# 批处理中心
Spring Batch是一个基于Spring的企业级批处理框架。
企业批处理就是指在企业级应用中,不需要人工干预,定期读取数据,进行相应的业务处理之后,再进行归档的这类操作
批处理的整个流程可以明显的分为3个阶段:
* 1、读数据
* 2、业务处理
* 3、归档结果数据
## 基础理论
![](https://box.kancloud.cn/5ba9778c0635bf655287b8ffb088b91a_831x374.png)
![](https://box.kancloud.cn/7e5290bbebece0178e6ce77174ff31cb_956x730.png)
[参考链接](https://gitee.com/kailing/partitionjob)
本工程不依赖mq,改为本地作业
![](https://box.kancloud.cn/95e3bbfacf3a2805490a75b8a52f5040_935x465.png)
![](https://box.kancloud.cn/f0eb92da5f0bd9ed73f85d44fb9ce4df_629x436.png)
# 设定读取处理写入规则
```
@Bean("slaveStep")
public Step slaveStep(DeliverPostProcessorItem processorItem,
JdbcCursorItemReader reader) {
CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
List<ItemProcessor> processorList = new ArrayList<>();
processorList.add(processorItem);
itemProcessor.setDelegates(processorList);
return stepBuilderFactory.get("slaveStep")
.<DeliverPost, DeliverPost>chunk(1000)//事务提交批次
.reader(reader)
.processor(itemProcessor)
.writer(dbItemWriter)
.build();
}
```
## 数据分片
```
/**
* @create 2019年4月2日
* Content :根据数据ID分片
*/
public class ColumnRangePartitioner implements Partitioner {
private JdbcOperations jdbcTemplate;
ColumnRangePartitioner(DataSource dataSource){
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new LinkedHashMap<String, ExecutionContext>();
int current_thread = 1 ;
int total_thread = gridSize ;
while (current_thread <= total_thread) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + current_thread, value);
value.putInt("current_thread", current_thread);
value.putInt("total_thread", total_thread);
current_thread++;
}
return result;
}
}
```
## 本地基于游标方式读取分片信息
```
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<DeliverPost> JdbcCursorItemReader(
@Value("#{stepExecutionContext['current_thread']}") Long current_thread,
@Value("#{stepExecutionContext['total_thread']}") Long total_thread) {
System.err.println("接收到分片参数["+total_thread+"->"+current_thread+"]");
JdbcCursorItemReader<DeliverPost> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource); // 设置数据源
reader.setFetchSize(100); // 设置一次最大读取条数
reader.setRowMapper(new DeliverPostRowMapper()); // 把数据库中的每条数据映射到Person对中
reader.setSql("select order_id , post_id from oc_deliver_post_t where post_id is not null and post_id <> '0' and mod(substring(order_id , -4) ,? )= ( ? -1 )");
reader.setPreparedStatementSetter(new PreparedStatementSetter() {
public void setValues(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setLong(1, total_thread);
preparedStatement.setLong(2, current_thread);
}
});
return reader;
}
```
### 分片数据处理过程
```
/**
* @create 2019年4月2日
* Content :数据处理Item
*/
@Service
public class DeliverPostProcessorItem implements ItemProcessor<DeliverPost, DeliverPost> {
Logger logger = LoggerFactory.getLogger(DeliverPostProcessorItem.class);
@Autowired
private CommonDao commonDao ;
@Autowired
private ThirdServiceProp thirdServiceProp;
@Override
public DeliverPost process(DeliverPost deliverPost) throws Exception {
logger.info("订单号:【{}】经过处理器 ", deliverPost.getOrderId());
{
// ems是否签收
String resp = this.getEms(deliverPost.getPostId());
try {
Map respMap = JSONObject.parseObject(resp, Map.class);
if ("0000".equals(respMap.get("code"))) {
Map rep = (Map) respMap.get("rep");
Map msg = (Map) rep.get("msg");
List<Map> traces = (List<Map>) msg.get("traces");
for (Iterator<Map> it = traces.iterator(); it.hasNext();) {
Map temp = it.next();
if ("10".equals(temp.get("code"))) {
// 已签收
deliverPost.setIsArrived(1);
}
}
}
} catch (Exception e) {
System.out.println(e);
}
}
{
// 中通是否签收
String resp = this.getZT(deliverPost.getPostId());
try {
Map respMap = JSONObject.parseObject(resp, Map.class);
if ("0000".equals(respMap.get("code"))) {
Map rep = (Map) respMap.get("rep");
Map msg = (Map) rep.get("msg");
List<Map> data = (List<Map>) msg.get("data");
for (Iterator<Map> it = data.iterator(); it.hasNext();) {
Map temp = it.next();
List<Map> traces = (List<Map>) temp.get("traces");
for (Iterator<Map> it1 = traces.iterator(); it1.hasNext();) {
Map tempT = it1.next();
if ("收件".equals(tempT.get("scanType"))) {
// 已签收
deliverPost.setIsArrived(1);
}
}
}
}
} catch (Exception e) {
}
}
return deliverPost;
}
public String getEms(String postId) {
String transid= PointUtil.getRandom() ;
// JSONObject resultJosn = JSONObject.fromObject(result);
StringBuffer strbuf = new StringBuffer();
String jsonOut = "";
try {
com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
obj.put("method", "ems.inland.trace.query");
obj.put("action", "3th_ems");
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
obj.put("prea", sdf.format(date));// 163315236523523
com.alibaba.fastjson.JSONObject req = new com.alibaba.fastjson.JSONObject();
obj.put("req", req);
com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
req.put("msg", msg);
msg.put("mailNo", postId);
msg.put("authorization", "408a6c32e61d3ad5cb5c4e0cb3d2b089");
msg.put("timestamp", System.currentTimeMillis());
// 请求数据
jsonOut = obj.toString();
logger.info("EMS请求处理开始: transid=【{}】 ,req=【{}】", transid ,jsonOut);
String callurl = commonDao.getHttpUrl("104");
int timeOut = 3000;
URL url = new URL(thirdServiceProp.getUrl() + callurl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
byte[] contentbyte = jsonOut.getBytes("UTF-8");
conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
conn.setRequestProperty("Content-Length", contentbyte.length + "");
conn.setRequestProperty("Accept-Encoding", "");
conn.setRequestProperty("Accept", "application/json");
conn.setConnectTimeout(3000);
conn.setReadTimeout(3000);
conn.setUseCaches(false);
conn.setDoInput(true);
conn.setDoOutput(true);
conn.connect();
OutputStream out = conn.getOutputStream();
out.write(contentbyte); // 发送请求报文
out.flush();
out.close();
InputStream in = conn.getInputStream();
BufferedReader dr = new BufferedReader(new InputStreamReader(in, "utf-8"));
String text_rsp = null;
while ((text_rsp = dr.readLine()) != null) {
strbuf.append(text_rsp);
}
in.close();
logger.info("EMS请求处理结束: transid=【{}】 ,res=【{}】 ",transid, strbuf);
} catch (Exception e) {
strbuf.setLength(0);
strbuf.append("{\"code\":\"8888\",\"detail\":\"失败\"}");
logger.error(postId + "EMS转发接口报错!!!");
}
return strbuf.toString();
}
public String getZT(String postId) {
String transid= PointUtil.getRandom() ;
// JSONObject resultJosn = JSONObject.fromObject(result);
StringBuffer strbuf = new StringBuffer();
String jsonOut = "";
try {
com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
obj.put("method", "api.traceInterfaceNewTraces");
obj.put("action", "3th_zto");
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
obj.put("prea", sdf.format(date));// 163315236523523
com.alibaba.fastjson.JSONObject req = new com.alibaba.fastjson.JSONObject();
obj.put("req", req);
com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
req.put("msg", msg);
msg.put("company_id", "20f74746141c4433a15e7ddd5aade604");
msg.put("data", Arrays.asList(postId));
msg.put("msg_type", "NEW_TRACES");
// 请求数据
jsonOut = obj.toString();
logger.info("中通请求处理开始: transid=【{}】 ,req=【{}】 ",transid , jsonOut);
String callurl = commonDao.getHttpUrl("103");
//固定token
callurl =callurl.replace("tokenid", "798d3ed2ebaec83ae608c10207f783d6") ;
int timeOut = 3000;
URL url = new URL(thirdServiceProp.getUrl() + callurl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
byte[] contentbyte = jsonOut.getBytes("UTF-8");
conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
conn.setRequestProperty("Content-Length", contentbyte.length + "");
conn.setRequestProperty("Accept-Encoding", "");
conn.setRequestProperty("Accept", "application/json");
conn.setConnectTimeout(3000);
conn.setReadTimeout(3000);
conn.setUseCaches(false);
conn.setDoInput(true);
conn.setDoOutput(true);
conn.connect();
OutputStream out = conn.getOutputStream();
out.write(contentbyte); // 发送请求报文
out.flush();
out.close();
InputStream in = conn.getInputStream();
BufferedReader dr = new BufferedReader(new InputStreamReader(in, "utf-8"));
String text_rsp = null;
while ((text_rsp = dr.readLine()) != null) {
strbuf.append(text_rsp);
}
in.close();
logger.info("中通请求处理结束: transid=【{}】 ,res=【{}】 ",transid, strbuf);
} catch (Exception e) {
strbuf.setLength(0);
strbuf.append("{\"code\":\"8888\",\"detail\":\"失败\"}");
logger.error(postId + "中通转发接口报错!!!");
}
return strbuf.toString();
}
}
```
分片数据输出item
```
/**
* @create 2019年4月2日
* Content :数据输出item
*/
@Component
@StepScope
public class DBWriterItem<T> implements ItemWriter<T> {
@Autowired
private DeliverPostDao deliverPostDao ;
@Override
public void write(List<? extends T> list) throws Exception {
deliverPostDao.batchInsert((List<? extends DeliverPost>) list);
}
}
```
- 前言
- 1.项目说明
- 2.项目更新日志
- 3.文档更新日志
- 01.快速开始
- 01.maven构建项目
- 02.环境安装
- 03.STS项目导入
- 03.IDEA项目导入
- 04.数据初始化
- 05.项目启动
- 06.付费文档说明
- 02.总体流程
- 1.oauth接口
- 2.架构设计图
- 3.微服务介绍
- 4.功能介绍
- 5.梳理流程
- 03.模块详解
- 01.老版本1.0.1分支模块讲解
- 01.db-core模块
- 02.api-commons模块
- 03.log-core模块
- 04.security-core模块
- 05.swagger-core模块
- 06.eureka-server模块
- 07.auth-server模块
- 08.auth-sso模块解析
- 09.user-center模块
- 10.api-gateway模块
- 11.file-center模块
- 12.log-center模块
- 13.batch-center模块
- 14.back-center模块
- 02.spring-boot-starter-web那点事
- 03.自定义db-spring-boot-starter
- 04.自定义log-spring-boot-starter
- 05.自定义redis-spring-boot-starter
- 06.自定义common-spring-boot-starter
- 07.自定义swagger-spring-boot-starter
- 08.自定义uaa-server-spring-boot-starter
- 09.自定义uaa-client-spring-boot-starter
- 10.自定义ribbon-spring-boot-starter
- 11.springboot启动原理
- 12.eureka-server模块
- 13.auth-server模块
- 14.user-center模块
- 15.api-gateway模块
- 16.file-center模块
- 17.log-center模块
- 18.back-center模块
- 19.auth-sso模块
- 20.admin-server模块
- 21.zipkin-center模块
- 22.job-center模块
- 23.batch-center
- 04.全新网关
- 01.基于spring cloud gateway的new-api-gateway
- 02.spring cloud gateway整合Spring Security Oauth
- 03.基于spring cloud gateway的redis动态路由
- 04.spring cloud gateway聚合swagger文档
- 05.技术详解
- 01.互联网系统设计原则
- 02.系统幂等性设计与实践
- 03.Oauth最简向导开发指南
- 04.oauth jdbc持久化策略
- 05.JWT token方式启用
- 06.token有效期的处理
- 07.@PreAuthorize注解分析
- 08.获取当前用户信息
- 09.认证授权白名单配置
- 10.OCP权限设计
- 11.服务安全流程
- 12.认证授权详解
- 13.验证码技术
- 14.短信验证码登录
- 15.动态数据源配置
- 16.分页插件使用
- 17.缓存击穿
- 18.分布式主键生成策略
- 19.分布式定时任务
- 20.分布式锁
- 21.网关多维度限流
- 22.跨域处理
- 23.容错限流
- 24.应用访问次数控制
- 25.统一业务异常处理
- 26.日志埋点
- 27.GPRC内部通信
- 28.服务间调用
- 29.ribbon负载均衡
- 30.微服务分布式跟踪
- 31.异步与线程传递变量
- 32.死信队列延时消息
- 33.单元测试用例
- 34.Greenwich.RELEASE升级
- 35.混沌工程质量保证
- 06.开发初探
- 1.开发技巧
- 2.crud例子
- 3.新建服务
- 4.区分前后台用户
- 07.分表分库
- 08.分布式事务
- 1.Seata介绍
- 2.Seata部署
- 09.shell部署
- 01.eureka-server
- 02.user-center
- 03.auth-server
- 04.api-gateway
- 05.file-center
- 06.log-center
- 07.back-center
- 08.编写shell脚本
- 09.集群shell部署
- 10.集群shell启动
- 11.部署阿里云问题
- 10.网关安全
- 1.openresty https保障服务安全
- 2.openresty WAF应用防火墙
- 3.openresty 高可用
- 11.docker配置
- 01.docker安装
- 02.Docker 开启远程API
- 03.采用docker方式打包到服务器
- 04.docker创建mysql
- 05.docker网络原理
- 06.docker实战
- 6.01.安装docker
- 6.02.管理镜像基本命令
- 6.03.容器管理
- 6.04容器数据持久化
- 6.05网络模式
- 6.06.Dockerfile
- 6.07.harbor部署
- 6.08.使用自定义镜像
- 12.统一监控中心
- 01.spring boot admin监控
- 02.Arthas诊断利器
- 03.nginx监控(filebeat+es+grafana)
- 04.Prometheus监控
- 05.redis监控(redis+prometheus+grafana)
- 06.mysql监控(mysqld_exporter+prometheus+grafana)
- 07.elasticsearch监控(elasticsearch-exporter+prometheus+grafana)
- 08.linux监控(node_exporter+prometheus+grafana)
- 09.micoservice监控
- 10.nacos监控
- 11.druid数据源监控
- 12.prometheus.yml
- 13.grafana告警
- 14.Alertmanager告警
- 15.监控微信告警
- 16.关于接口监控告警
- 17.prometheus-HA架构
- 18.总结
- 13.统一日志中心
- 01.统一日志中心建设意义
- 02.通过ELK收集mysql慢查询日志
- 03.通过elk收集微服务模块日志
- 04.通过elk收集nginx日志
- 05.统一日志中心性能优化
- 06.kibana安装部署
- 07.日志清理方案
- 08.日志性能测试指标
- 09.总结
- 14.数据查询平台
- 01.数据查询平台架构
- 02.mysql配置bin-log
- 03.单节点canal-server
- 04.canal-ha部署
- 05.canal-kafka部署
- 06.实时增量数据同步mysql
- 07.canal监控
- 08.clickhouse运维常见脚本
- 15.APM监控
- 1.Elastic APM
- 2.Skywalking
- 01.docker部署es
- 02.部署skywalking-server
- 03.部署skywalking-agent
- 16.压力测试
- 1.ocp.jmx
- 2.test.bat
- 3.压测脚本
- 4.压力报告
- 5.报告分析
- 6.压测平台
- 7.并发测试
- 8.wrk工具
- 9.nmon
- 10.jmh测试
- 17.SQL优化
- 1.oracle篇
- 01.基线测试
- 02.调优前奏
- 03.线上瓶颈定位
- 04.执行计划解读
- 05.高级SQL语句
- 06.SQL tuning
- 07.数据恢复
- 08.深入10053事件
- 09.深入10046事件
- 2.mysql篇
- 01.innodb存储引擎
- 02.BTree索引
- 03.执行计划
- 04.查询优化案例分析
- 05.为什么会走错索引
- 06.表连接优化问题
- 07.Connection连接参数
- 08.Centos7系统参数调优
- 09.mysql监控
- 10.高级SQL语句
- 11.常用维护脚本
- 12.percona-toolkit
- 18.redis高可用方案
- 1.免密登录
- 2.安装部署
- 3.配置文件
- 4.启动脚本
- 19.消息中间件搭建
- 19-01.rabbitmq集群搭建
- 01.rabbitmq01
- 02.rabbitmq02
- 03.rabbitmq03
- 04.镜像队列
- 05.haproxy搭建
- 06.keepalived
- 19-02.rocketmq搭建
- 19-03.kafka集群
- 20.mysql高可用方案
- 1.环境
- 2.mysql部署
- 3.Xtrabackup部署
- 4.Galera部署
- 5.galera for mysql 集群
- 6.haproxy+keepalived部署
- 21.es集群部署
- 22.生产实施优化
- 1.linux优化
- 2.jvm优化
- 3.feign优化
- 4.zuul性能优化
- 23.线上问题诊断
- 01.CPU性能评估工具
- 02.内存性能评估工具
- 03.IO性能评估工具
- 04.网络问题工具
- 05.综合诊断评估工具
- 06.案例诊断01
- 07.案例诊断02
- 08.案例诊断03
- 09.案例诊断04
- 10.远程debug
- 24.fiddler抓包实战
- 01.fiddler介绍
- 02.web端抓包
- 03.app抓包
- 25.疑难解答交流
- 01.有了auth/token获取token了为啥还要配置security的登录配置
- 02.权限数据存放在redis吗,代码在哪里啊
- 03.其他微服务和认证中心的关系
- 04.改包问题
- 05.use RequestContextListener or RequestContextFilter to expose the current request
- 06./oauth/token对应代码在哪里
- 07.验证码出不来
- 08./user/login
- 09.oauth无法自定义权限表达式
- 10.sleuth引发线程数过高问题
- 11.elk中使用7x版本问题
- 12.RedisCommandTimeoutException问题
- 13./oauth/token CPU过高
- 14.feign与权限标识符问题
- 15.动态路由RedisCommandInterruptedException: Command interrupted
- 26.学习资料
- 海量学习资料等你来拿
- 27.持续集成
- 01.git安装
- 02.代码仓库gitlab
- 03.代码仓库gogs
- 04.jdk&&maven
- 05.nexus安装
- 06.sonarqube
- 07.jenkins
- 28.Rancher部署
- 1.rancher-agent部署
- 2.rancher-server部署
- 3.ocp后端部署
- 4.演示前端部署
- 5.elk部署
- 6.docker私服搭建
- 7.rancher-server私服
- 8.rancher-agent docker私服
- 29.K8S部署OCP
- 01.准备OCP的构建环境和部署环境
- 02.部署顺序
- 03.在K8S上部署eureka-server
- 04.在K8S上部署mysql
- 05.在K8S上部署redis
- 06.在K8S上部署auth-server
- 07.在K8S上部署user-center
- 08.在K8S上部署api-gateway
- 09.在K8S上部署back-center
- 30.Spring Cloud Alibaba
- 01.统一的依赖管理
- 02.nacos-server
- 03.生产可用的Nacos集群
- 04.nacos配置中心
- 05.common.yaml
- 06.user-center
- 07.auth-server
- 08.api-gateway
- 09.log-center
- 10.file-center
- 11.back-center
- 12.sentinel-dashboard
- 12.01.sentinel流控规则
- 12.02.sentinel熔断降级规则
- 12.03.sentinel热点规则
- 12.04.sentinel系统规则
- 12.05.sentinel规则持久化
- 12.06.sentinel总结
- 13.sentinel整合openfeign
- 14.sentinel整合网关
- 1.sentinel整合zuul
- 2.sentinel整合scg
- 15.Dubbo与Nacos共存
- 31.Java源码剖析
- 01.基础数据类型和String
- 02.Arrays工具类
- 03.ArrayList源码分析
- 32.面试专题汇总
- 01.JVM专题汇总
- 02.多线程专题汇总
- 03.Spring专题汇总
- 04.springboot专题汇总
- 05.springcloud面试汇总
- 文档问题跟踪处理