定时任务使用spring-task,分布式是基于redis实现,可保证同一任务同时只有一个节点在执行。
*****
**使用技术:**
* 分布式锁,基于redis实现
* 队列,基于redis实现
* 发布/订阅,基于redis实现
* 分布式缓存,基于redis实现
*****
实现原理:
1. 每个启动的节点服务都执行一个内置任务,每秒执行一次,基于分布式锁尝试锁定自己为master节点,集群环境中只有一个master节点。
2. master节点的每个定时任务执行时向队列推入一个令牌,同时向集群所有节点广播一条消息,各节点收到消息后从队列中抢令牌,谁抢到谁执行真正的任务
```
/**
* 尝试锁定本服务作为master
*
* @param timeoutMillis
*/
@SuppressWarnings("unchecked")
public void tryLockMaster(final long timeoutMillis) {
final ICache healthCache = cacheManager.getICache(TaskConstants.TASK_HEALTH_CACHE_NAME);
if (healthCache.getNativeCache() instanceof RedisOperations) {
RedisOperations redisOperations = (RedisOperations) healthCache.getNativeCache();
//利用redis全局锁
RedisLock lock = new RedisLock(redisOperations, TaskConstants.TASK_HEALTH_CACHE_NAME);
lock.execute(new LockCallback<Boolean>() {
@Override
public Boolean doInLock(RedisConnection connection) {
if (!existMaster() || isMaster()) {
//将全局唯一ID置入缓存
healthCache.put(TaskConstants.TASK_HEALTH_KEY_NAME, TaskConstants.UU_ID);
//设置过期时间
healthCache.expire(TaskConstants.TASK_HEALTH_KEY_NAME, timeoutMillis / 1000);
log.info("I'm master! UUID[{}]", TaskConstants.UU_ID);
}
return true;
}
});
}
}
```
```
/**
* 集群中本任务只有一个服务(master)在执行
*
* @return
*/
@SuppressWarnings("unchecked")
public void doTask() {
MasterFactory masterFactory = getMasterFactory();
//如果使用集群模式则发送消息
if (masterFactory.isCluster()) {
if (masterFactory.isMaster()) {
//令牌入队。使用队列保证集群中同时只有一个服务执行任务,即集群中只有拿到令牌的服务才会执行任务
masterFactory.getQueueManager().getQueue(CURRENT_QUEUE).offer(1);
//发送执行任务命令
masterFactory.getPubSubManager().publish(1, CURRENT_CHANNEL);
}
}
//否则直接执行任务
else {
doExecute();
}
}
```
```
/**
* 接收消息并处理
*
* @param channel
* @param messageBody
* @param pattern
*/
@SuppressWarnings("unchecked")
@Override
public void onMessage(String channel, Object messageBody, String pattern) {
if (CURRENT_CHANNEL.equals(channel)) {
//使用队列保证集群中同时只有一个服务执行任务
IQueueManager queueManager = masterFactory.getQueueManager();
IQueue queue = queueManager.getQueue(CURRENT_QUEUE);
if (queue.poll() != null) {
//同步执行任务,即同一个任务直到上一次执行完成时再执行下次。单机运行时,spring task内部已经实现,但集群任务下需要另外实现,此处基于redis全局锁实现
if (queueManager instanceof RedisQueueManager) {
//基于redis全局锁实现
RedisOperations redisOperations = ((RedisQueueManager) queueManager).getRedisOperations();
RedisLock lock = new RedisLock(redisOperations, CURRENT_TASK_LOCK_NAME);
lock.execute(new LockCallback<Boolean>() {
@Override
public Boolean doInLock(RedisConnection connection) {
//执行具体的任务
doExecute();
return true;
}
});
}
//其他方式待实现...
else {
doExecute();
}
}
}
}
```
要想启用分布式任务模式需设置以下参数
--cache.cacheDriver=rediscache --mq.pubsub.import=true --mq.queue.import=true --redis.host=xxx.xxx.xxx.xxx
- walk简介
- 核心模块
- walk-data
- IData
- EntityHelper
- walk-cache
- 缓存管理器
- 缓存对象
- 缓存注解
- walk-batis
- 单表操作
- 批量操作
- 列表/分页查询
- 所有方法列表
- sql热部署
- 二级缓存
- 数据库方言
- 其他使用技巧
- 实体类生成工具
- walk-mq
- 队列管理器
- 队列对象
- 订阅/发布管理器
- 订阅器
- 发布器
- walk-shiro
- 用户认证/授权
- url动态授权/回收
- 分布式会话
- 无状态会话支持
- walk-base
- 前端基础框架
- 公共页面
- 自定义标签
- 自定义函数
- 组件及工具
- 后端基础框架
- 基础结构
- 表单校验
- 数据导入
- 数据导出
- 上传下载
- 静态参数加载器
- 静态参数翻译器
- 实体类翻译器
- sql翻译器
- 自定义翻译器
- 静态参数校验器
- 分布式任务
- 增删改查代码生成器
- walk-restful
- 请求报文
- 返回报文
- 节点翻译器
- api代码生成
- walk-activiti
- 接口封装
- 模型管理
- 流程图展示
- 集成方法
- walk-console
- 在线会话管理
- 静态参数表缓存管理
- 缓存管理
- 队列管理
- 发布/订阅管理
- walk-boot
- 常用功能
- 持久层操作
- 分布式缓存
- 分布式会话
- 分布式任务
- 前端常用功能
- 后端常用功能
- 工作流封装
- 多数据源支持
- 关于读写分离
- 常用工具类
- 代码生成工具
- SpringCloud集成
- 阿里edas平台支持
- 其他
- 开发规约
- 环境要求
- 工程示例
- 工程结构
- web工程
- API工程
- 后台任务
- 常见问题
- 事务不生效
- 分布式任务不生效
- 事务锁
- 变更历史