# 批处理中心

Spring Batch是一个基于Spring的企业级批处理框架。

企业批处理就是指在企业级应用中,不需要人工干预,定期读取数据,进行相应的业务处理之后,再进行归档的这类操作。

批处理的整个流程可以明显地分为3个阶段:

1. 读数据
2. 业务处理
3. 归档结果数据

## 基础理论

![](https://box.kancloud.cn/5ba9778c0635bf655287b8ffb088b91a_831x374.png)

![](https://box.kancloud.cn/7e5290bbebece0178e6ce77174ff31cb_956x730.png)

[参考链接](https://gitee.com/kailing/partitionjob)

本工程不依赖mq,改为本地作业

![](https://box.kancloud.cn/95e3bbfacf3a2805490a75b8a52f5040_935x465.png)

![](https://box.kancloud.cn/f0eb92da5f0bd9ed73f85d44fb9ce4df_629x436.png)

# 设定读取处理写入规则

```java
/**
 * Worker step executed once per partition: read DeliverPost rows with the
 * step-scoped cursor reader, run them through the processor chain, and hand
 * each chunk to the writer.
 *
 * @param processorItem processor that queries the carriers for each row
 * @param reader        step-scoped cursor reader bound to one partition
 * @return the configured "slaveStep"
 */
@Bean("slaveStep")
public Step slaveStep(DeliverPostProcessorItem processorItem,
                      JdbcCursorItemReader<DeliverPost> reader) {
    // A composite with a single delegate: further processors can be appended to the list.
    List<ItemProcessor<DeliverPost, DeliverPost>> processorList = new ArrayList<>();
    processorList.add(processorItem);

    CompositeItemProcessor<DeliverPost, DeliverPost> itemProcessor = new CompositeItemProcessor<>();
    itemProcessor.setDelegates(processorList);

    return stepBuilderFactory.get("slaveStep")
            .<DeliverPost, DeliverPost>chunk(1000) // commit interval: one transaction per 1000 items
            .reader(reader)
            .processor(itemProcessor)
            .writer(dbItemWriter)
            .build();
}
```

## 数据分片

```java
/**
 * Splits the step into {@code gridSize} partitions numbered from 1.
 *
 * <p>Each partition's {@link ExecutionContext} carries only its own index
 * ({@code current_thread}) and the partition count ({@code total_thread}).
 * No data is queried here; the row selection per partition is done by the
 * SQL of the step-scoped reader, which receives these two values.
 *
 * <p>Created 2019-04-02.
 */
public class ColumnRangePartitioner implements Partitioner {

    // Built from the injected DataSource; partition() below does not use it.
    private final JdbcOperations jdbcTemplate;

    ColumnRangePartitioner(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * @param gridSize number of partitions to create
     * @return contexts keyed "partition1" .. "partition{gridSize}", in insertion order
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new LinkedHashMap<>();
        for (int index = 1; index <= gridSize; index++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("current_thread", index);
            context.putInt("total_thread", gridSize);
            result.put("partition" + index, context);
        }
        return result;
    }
}
```

## 本地基于游标方式读取分片信息

```java
/**
 * Step-scoped cursor reader: one instance per partition, selecting only the
 * rows that belong to that partition.
 *
 * <p>NOTE(review): the partitioner stores both values with {@code putInt},
 * while they are injected here as {@code Long} - confirm the container
 * converts Integer to Long in the Spring version in use.
 *
 * @param current_thread 1-based index of this partition
 * @param total_thread   total number of partitions
 * @return reader over the matching oc_deliver_post_t rows
 */
@Bean(destroyMethod = "") // empty string: no destroy method is registered for this bean
@StepScope
public JdbcCursorItemReader<DeliverPost> JdbcCursorItemReader(
        @Value("#{stepExecutionContext['current_thread']}") Long current_thread,
        @Value("#{stepExecutionContext['total_thread']}") Long total_thread) {
    // NOTE(review): consider routing this through the class logger if one exists.
    System.err.println("接收到分片参数["+total_thread+"->"+current_thread+"]");

    JdbcCursorItemReader<DeliverPost> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setFetchSize(100); // fetch-size hint handed to the JDBC driver
    reader.setRowMapper(new DeliverPostRowMapper()); // maps each row to a DeliverPost
    // Partition rule: (last four characters of order_id) mod total_thread == current_thread - 1.
    // The negative substring offset looks MySQL-style - TODO confirm the target database.
    reader.setSql("select order_id , post_id from oc_deliver_post_t where post_id is not null and post_id <> '0' and mod(substring(order_id , -4) ,? )= ( ? -1 )");
    reader.setPreparedStatementSetter(preparedStatement -> {
        preparedStatement.setLong(1, total_thread);   // divisor
        preparedStatement.setLong(2, current_thread); // compared as (current_thread - 1)
    });
    return reader;
}
```

### 分片数据处理过程

```java
/**
 * Item processor that asks the EMS and ZTO tracking interfaces about each
 * parcel and marks the DeliverPost as arrived when either one reports it.
 *
 * <p>Created 2019-04-02.
 */
@Service
public class DeliverPostProcessorItem implements ItemProcessor<DeliverPost, DeliverPost> {

    private static final Logger logger = LoggerFactory.getLogger(DeliverPostProcessorItem.class);

    /** Connect and read timeout for carrier calls, in milliseconds. */
    private static final int HTTP_TIMEOUT_MILLIS = 3000;

    /** Synthetic response returned when a carrier call fails; its code is never "0000". */
    private static final String FAILURE_RESPONSE = "{\"code\":\"8888\",\"detail\":\"失败\"}";

    @Autowired
    private CommonDao commonDao;

    @Autowired
    private ThirdServiceProp thirdServiceProp;

    /**
     * Queries both carriers for the item's post id and sets {@code isArrived}
     * to 1 if either response contains a matching trace. The item is always
     * returned, so nothing is filtered out of the chunk.
     *
     * @param deliverPost row read from oc_deliver_post_t
     * @return the same instance, possibly flagged as arrived
     */
    @Override
    public DeliverPost process(DeliverPost deliverPost) throws Exception {
        logger.info("订单号:【{}】经过处理器 ", deliverPost.getOrderId());
        String postId = deliverPost.getPostId();

        // Both carriers are always queried, one after the other.
        if (emsReportsSigned(postId, this.getEms(postId))) {
            deliverPost.setIsArrived(1);
        }
        if (ztoReportsSigned(postId, this.getZT(postId))) {
            deliverPost.setIsArrived(1);
        }
        return deliverPost;
    }

    /** Returns true when an EMS response lists a trace whose code is "10" (treated as signed for). */
    private boolean emsReportsSigned(String postId, String resp) {
        try {
            for (Object trace : childList(successMessage(resp), "traces")) {
                if (trace instanceof Map && "10".equals(((Map<?, ?>) trace).get("code"))) {
                    return true;
                }
            }
        } catch (RuntimeException e) {
            // Malformed payload: leave the item untouched, but keep the cause visible.
            logger.warn("EMS轨迹响应解析失败: postId=【{}】", postId, e);
        }
        return false;
    }

    /**
     * Returns true when a ZTO response lists a trace whose scanType is "收件".
     *
     * <p>NOTE(review): the original comment calls this "signed for", but "收件"
     * reads as "parcel collected" - verify the scanType value against the ZTO API.
     */
    private boolean ztoReportsSigned(String postId, String resp) {
        try {
            for (Object entry : childList(successMessage(resp), "data")) {
                if (!(entry instanceof Map)) {
                    continue;
                }
                for (Object trace : childList((Map<?, ?>) entry, "traces")) {
                    if (trace instanceof Map && "收件".equals(((Map<?, ?>) trace).get("scanType"))) {
                        return true;
                    }
                }
            }
        } catch (RuntimeException e) {
            // Malformed payload: leave the item untouched, but keep the cause visible.
            logger.warn("中通轨迹响应解析失败: postId=【{}】", postId, e);
        }
        return false;
    }

    /** Parses a carrier response and returns its rep.msg object, or null unless code is "0000". */
    private static Map<?, ?> successMessage(String resp) {
        Map<?, ?> respMap = JSONObject.parseObject(resp, Map.class);
        if (respMap == null || !"0000".equals(respMap.get("code"))) {
            return null;
        }
        return childMap(childMap(respMap, "rep"), "msg");
    }

    /** Returns parent[key] when it is a JSON object, otherwise null. */
    private static Map<?, ?> childMap(Map<?, ?> parent, String key) {
        Object value = parent == null ? null : parent.get(key);
        return value instanceof Map ? (Map<?, ?>) value : null;
    }

    /** Returns parent[key] when it is a JSON array, otherwise an empty list. */
    private static List<?> childList(Map<?, ?> parent, String key) {
        Object value = parent == null ? null : parent.get(key);
        return value instanceof List ? (List<?>) value : java.util.Collections.emptyList();
    }

    /**
     * Queries the EMS tracking interface for one parcel.
     *
     * @param postId tracking number, sent as "mailNo"
     * @return the raw response body, or the "8888" failure JSON if the call fails
     */
    public String getEms(String postId) {
        String transid = PointUtil.getRandom();
        try {
            com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
            msg.put("mailNo", postId);
            // NOTE(review): credential is hard-coded in source - consider moving it to configuration.
            msg.put("authorization", "408a6c32e61d3ad5cb5c4e0cb3d2b089");
            msg.put("timestamp", System.currentTimeMillis());

            String jsonOut = buildRequest("ems.inland.trace.query", "3th_ems", msg);
            logger.info("EMS请求处理开始: transid=【{}】 ,req=【{}】", transid ,jsonOut);

            String callurl = commonDao.getHttpUrl("104");
            String response = postJson(thirdServiceProp.getUrl() + callurl, jsonOut);
            logger.info("EMS请求处理结束: transid=【{}】 ,res=【{}】 ",transid, response);
            return response;
        } catch (Exception e) {
            logger.error("{}EMS转发接口报错!!!", postId, e);
            return FAILURE_RESPONSE;
        }
    }

    /**
     * Queries the ZTO tracking interface for one parcel.
     *
     * @param postId tracking number, sent as the single element of "data"
     * @return the raw response body, or the "8888" failure JSON if the call fails
     */
    public String getZT(String postId) {
        String transid = PointUtil.getRandom();
        try {
            com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
            // NOTE(review): identifier is hard-coded in source - consider moving it to configuration.
            msg.put("company_id", "20f74746141c4433a15e7ddd5aade604");
            msg.put("data", Arrays.asList(postId));
            msg.put("msg_type", "NEW_TRACES");

            String jsonOut = buildRequest("api.traceInterfaceNewTraces", "3th_zto", msg);
            logger.info("中通请求处理开始: transid=【{}】 ,req=【{}】 ",transid , jsonOut);

            String callurl = commonDao.getHttpUrl("103");
            // Fixed token substituted for the "tokenid" placeholder in the configured path.
            // NOTE(review): token is hard-coded in source - consider moving it to configuration.
            callurl = callurl.replace("tokenid", "798d3ed2ebaec83ae608c10207f783d6");

            String response = postJson(thirdServiceProp.getUrl() + callurl, jsonOut);
            logger.info("中通请求处理结束: transid=【{}】 ,res=【{}】 ",transid, response);
            return response;
        } catch (Exception e) {
            logger.error("{}中通转发接口报错!!!", postId, e);
            return FAILURE_RESPONSE;
        }
    }

    /** Wraps a carrier-specific msg object in the common envelope: method, action, prea, req.msg. */
    private static String buildRequest(String method, String action, com.alibaba.fastjson.JSONObject msg) {
        com.alibaba.fastjson.JSONObject req = new com.alibaba.fastjson.JSONObject();
        req.put("msg", msg);

        com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
        obj.put("method", method);
        obj.put("action", action);
        // A new formatter per call: SimpleDateFormat must not be shared between threads.
        obj.put("prea", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        obj.put("req", req);
        return obj.toString();
    }

    /**
     * POSTs a JSON body and returns the response body with line breaks removed.
     * Both streams are closed even when the exchange fails part-way.
     */
    private static String postJson(String endpoint, String body) throws java.io.IOException {
        byte[] payload = body.getBytes("UTF-8");

        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
        conn.setRequestProperty("Content-Length", String.valueOf(payload.length));
        conn.setRequestProperty("Accept-Encoding", "");
        conn.setRequestProperty("Accept", "application/json");
        conn.setConnectTimeout(HTTP_TIMEOUT_MILLIS);
        conn.setReadTimeout(HTTP_TIMEOUT_MILLIS);
        conn.setUseCaches(false);
        conn.setDoInput(true);
        conn.setDoOutput(true);
        conn.connect();

        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload); // send the request message
            out.flush();
        }

        StringBuilder response = new StringBuilder();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line);
            }
        }
        return response.toString();
    }
}
```

### 分片数据输出item

```java
/**
 * Item writer that hands each processed chunk to DeliverPostDao.batchInsert.
 *
 * <p>Created 2019-04-02.
 */
@Component
@StepScope
public class DBWriterItem<T> implements ItemWriter<T> {

    @Autowired
    private DeliverPostDao deliverPostDao;

    /**
     * @param list the items of one chunk
     */
    @Override
    @SuppressWarnings("unchecked") // the step is declared as <DeliverPost, DeliverPost>, so T is DeliverPost there
    public void write(List<? extends T> list) throws Exception {
        deliverPostDao.batchInsert((List<? extends DeliverPost>) list);
    }
}
```