[TOC]
## 1. 消息出来
topic:队列的集合称为topic
## 1.1 Simple
消息直接发送,无法保证
## 1.2 Order
### 1.2.1 使用场景
> 1. 一个生产者可以发送消息给多给topic
> 2. 一个topic默认有4个队列
> 3. producer以roundrobin(轮询)的方式给多个队列发送消息
> 4. 同一个队列消息遵守FIFO
> * 顺序消费:
> 例如在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。
> RocketMQ可以保证顺序消费,他的实现是生产者将这个三个消息放在topic的一个队列里面,单机支持上万个持久化队列,消费端去消费的时候也是只能有一个Consumer去取得这个队列里面的数据,然后顺序消费。
> * rocketmq的顺序消息需要满足2点:
> 1.Producer端保证发送消息有序,且发送到同一个队列。
> 2.consumer端只能让一个consumer保证消费同一个队列。
### 1.2.2 使用场景 如何实现
### 1.2.3 使用场景producer顺序发送消息到同一队列
> 1. 默认的情况下,producer会向topic(队列的集合,默认四个队列)中的队列轮询式的发生消息,这就不满足顺序消费一系列消息发送到一个队列的要求,所以要修改向队列发送消息的方法。
> 2. 重写MessageQueueSelector,从字面理解就是消息队列选择器,非常的贴切!原理就是在队列数量不变的情况下,通过一系列事务的编号(订单id)和队列叔取模
~~~
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 3); # 这里的3用于取模运算,相同编号的数据会路由到同一个队列当中去
~~~
这里设置编号为1
![](https://box.kancloud.cn/e55d1a164eae168f29585825ddc278a9_1737x549.png)
这里设置编号为3,验证了topic默认四个队列,且可以指定消息用于取模的id
![](https://box.kancloud.cn/f01f5866332953af0a98498f32d1f5de_1139x484.png)
以上可以保证同一系列事务被发送到了一个队列当中。
### 1.2.4 使用场景 某一个Consumer顺序消费同一个队列
通过设置Listener实现
1. MessageListenerOrderly(有序的)
实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列
自动实现顺序消费
~~~
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
;
return ConsumeOrderlyStatus.SUCCESS;
}
});
~~~
2. MessageListenerConcurrently(无序的)
需要把线程池改为单线程模式。
> 1. ConsumeMessageOrderlyService类的start()方法,如果是集群消费,则启动定时任务,定时向broker发送批量锁住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合。
> consumer收到后,设置是否锁住标志位。
> 这里注意2个变量:
> consumer端的RebalanceImpl里的ConcurrentHashMap processQueueTable,是否锁住设置在ProcessQueue里。
> broker端的RebalanceLockManager里的ConcurrentHashMap> mqLockTable,这里维护着全局队列锁。
> 2. ConsumeMessageOrderlyService.ConsumeRequest的run方法是消费消息,这里还有个MessageQueueLock messageQueueLock,维护当前consumer端的本地队列锁。保证当前只有一个线程能够进行消费。
> 3. 拉到消息存入ProcessQueue,然后判断,本地是否获得锁,全局队列是否被锁住,然后从ProcessQueue里取出消息,用MessageListenerOrderly进行消费。
> 拉到消息后调用ProcessQueue.putMessage(final List msgs) 存入,具体是存入TreeMap msgTreeMap。
> 然后是调用ProcessQueue.takeMessags(final int batchSize)消费,具体是把msgTreeMap里消费过的消息,转移到TreeMap msgTreeMapTemp。
> 4. 本地消费的事务控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起一会再消费),在此之前还有一个变量ConsumeOrderlyContext context的setAutoCommit()是否自动提交。
> 当SUSPEND_CURRENT_QUEUE_A_MOMENT时,autoCommit设置为true或者false没有区别,本质跟消费相反,把消息从msgTreeMapTemp转移回msgTreeMap,等待下次消费。
> 当SUCCESS时,autoCommit设置为true时比设置为false多做了2个动作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
> ProcessQueue.commit() :本质是删除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消费时从msgTreeMap转移过来的。
> this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。
> 那么少了这2个动作会怎么样呢,随着消息的消费进行,msgTreeMapTemp里的消息堆积越来越多,消费消息的偏移量一直没有更新到broker导致consumer每次重新启动后都要从头开始重复消费。
> 就算更新了offset到broker,那么msgTreeMapTemp里的消息堆积呢?不知道这算不算bug。
> 所以,还是把autoCommit设置为true比较好。
## 2. 生产中的使用
### 2.1 使用注意事项
> 1. 消费者处理MQ消息时必须幂等性(即无论接收到多少相同的消息,执行后的结果一致),如果不具有幂等性,则转换成幂等性处理方法;
> 2. 业务方自己保证每条发送到RocketMQ消息都有唯一的ID,这样消费者根据消息的唯一ID去重,并确保消息处理成功。
### 2.2 java 交互RocketMQ
#### 2.2.1 producer
~~~
package com.aixin.lovetocar.rocketmq.util;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
/**
* Created by dailin on 2018/4/25.
*/
public class RocketMQProducer {
private static DefaultMQProducer defaultMQProducer;
/**
* @param groupName 指定producer组
* @param nameServer namerserver地址
*/
public RocketMQProducer(String groupName, String nameServer) throws MQClientException {
defaultMQProducer = new DefaultMQProducer(groupName);
defaultMQProducer.setNamesrvAddr(nameServer);
defaultMQProducer.start(); //producer开始
}
/**
* 同步发送消息
*
* @param topic
* @param tags
* @param key
* @param data
* @throws Exception
*/
public void sentSynData(String topic, String tags, String key, String data) throws Exception {
Message msg = new Message(topic, tags, key, data.getBytes()); //封装消息
SendResult sendResult = defaultMQProducer.send(msg); //发送消息
System.out.printf("%s%n", sendResult);
}
/**
* 同步发送消息
*
* @param topic
* @param tags
* @param data
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
* @throws MQBrokerException
*/
public void sentSynData(String topic, String tags, String data) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
Message msg = new Message(topic, tags, data.getBytes()); //封装消息
SendResult sendResult = defaultMQProducer.send(msg); //发送消息
System.out.printf("%s%n", sendResult);
}
/**
* 发送顺序消息
*
* @param topic
* @param tags
* @param data
* @param order
* @throws InterruptedException
* @throws RemotingException
* @throws MQClientException
* @throws MQBrokerException
*/
public void sentOrderDate(String topic, String tags, String key, String data, Integer order) throws InterruptedException, RemotingException,
MQClientException, MQBrokerException {
Message msg = new Message(topic, tags, key, data.getBytes());
SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, order);
System.out.println(sendResult);
}
/**
* 发送顺序消息
*
* @param topic
* @param tags
* @param data
* @param order
* @throws InterruptedException
* @throws RemotingException
* @throws MQClientException
* @throws MQBrokerException
*/
public void sentOrderDate(String topic, String tags, String data, Integer order) throws InterruptedException, RemotingException,
MQClientException, MQBrokerException {
Message msg = new Message(topic, tags, data.getBytes());
//队列选择
SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, order);
System.out.println(sendResult);
}
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
/**
* 关闭producer与RocketMQ的连接
*/
public void shudownProducer() {
defaultMQProducer.shutdown();
}
}
~~~
- Docker
- 什么是docker
- Docker安装、组件启动
- docker网络
- docker命令
- docker swarm
- dockerfile
- mesos
- 运维
- Linux
- Linux基础
- Linux常用命令_1
- Linux常用命令_2
- ip命令
- 什么是Linux
- SELinux
- Linux GCC编译警告:Clock skew detected. 错误解决办法
- 文件描述符
- find
- 资源统计
- LVM
- Linux相关配置
- 服务自启动
- 服务器安全
- 字符集
- shell脚本
- shell命令
- 实用脚本
- shell 数组
- 循环与判断
- 系统级别进程开启和停止
- 函数
- java调用shell脚本
- 发送邮件
- Linux网络配置
- Ubuntu
- Ubuntu发送邮件
- 更换apt-get源
- centos
- 防火墙
- 虚拟机下配置网络
- yum重新安装
- 安装mysql5.7
- 配置本地yum源
- 安装telnet
- 忘记root密码
- rsync+ crontab
- Zabbix
- Zabbix监控
- Zabbix安装
- 自动报警
- 自动发现主机
- 监控MySQL
- 安装PHP常见错误
- 基于nginx安装zabbix
- 监控Tomcat
- 监控redis
- web监控
- 监控进程和端口号
- zabbix自定义监控
- 触发器函数
- zabbix监控mysql主从同步状态
- Jenkins
- 安装Jenkins
- jenkins+svn+maven
- jenkins执行shell脚本
- 参数化构建
- maven区分环境打包
- jenkins使用注意事项
- nginx
- nginx认证功能
- ubuntu下编译安装Nginx
- 编译安装
- Nginx搭建本地yum源
- 文件共享
- Haproxy
- 初识Haproxy
- haproxy安装
- haproxy配置
- virtualbox
- virtualbox 复制新的虚拟机
- ubuntu下vitrualbox安装redhat
- centos配置双网卡
- 配置存储
- Windows
- Windows安装curl
- VMware vSphere
- 磁盘管理
- 增加磁盘
- gitlab
- 安装
- tomcat
- Squid
- bigdata
- FastDFS
- FastFDS基础
- FastFDS安装及简单实用
- api介绍
- 数据存储
- FastDFS防盗链
- python脚本
- ELK
- logstash
- 安装使用
- kibana
- 安准配置
- elasticsearch
- elasticsearch基础_1
- elasticsearch基础_2
- 安装
- 操作
- java api
- 中文分词器
- term vector
- 并发控制
- 对text字段排序
- 倒排和正排索引
- 自定义分词器
- 自定义dynamic策略
- 进阶练习
- 共享锁和排它锁
- nested object
- 父子关系模型
- 高亮
- 搜索提示
- Redis
- redis部署
- redis基础
- redis运维
- redis-cluster的使用
- redis哨兵
- redis脚本备份还原
- rabbitMQ
- rabbitMQ安装使用
- rpc
- RocketMQ
- 架构概念
- 安装
- 实例
- 好文引用
- 知乎
- ACK
- postgresql
- 存储过程
- 编程语言
- 计算机网络
- 基础_01
- tcp/ip
- http转https
- Let's Encrypt免费ssl证书(基于haproxy负载)
- what's the http?
- 网关
- 网络IO
- http
- 无状态网络协议
- Python
- python基础
- 基础数据类型
- String
- List
- 遍历
- Python基础_01
- python基础_02
- python基础03
- python基础_04
- python基础_05
- 函数
- 网络编程
- 系统编程
- 类
- Python正则表达式
- pymysql
- java调用python脚本
- python操作fastdfs
- 模块导入和sys.path
- 编码
- 安装pip
- python进阶
- python之setup.py构建工具
- 模块动态导入
- 内置函数
- 内置变量
- path
- python模块
- 内置模块_01
- 内置模块_02
- log模块
- collections
- Twisted
- Twisted基础
- 异步编程初探与reactor模式
- yield-inlineCallbacks
- 系统编程
- 爬虫
- urllib
- xpath
- scrapy
- 爬虫基础
- 爬虫种类
- 入门基础
- Rules
- 反反爬虫策略
- 模拟登陆
- problem
- 分布式爬虫
- 快代理整站爬取
- 与es整合
- 爬取APP数据
- 爬虫部署
- collection for ban of web
- crawlstyle
- API
- 多次请求
- 向调度器发送请求
- 源码学习
- LinkExtractor源码分析
- 构建工具-setup.py
- selenium
- 基础01
- 与scrapy整合
- Django
- Django开发入门
- Django与MySQL
- java
- 设计模式
- 单例模式
- 工厂模式
- java基础
- java位移
- java反射
- base64
- java内部类
- java高级
- 多线程
- springmvc-restful
- pfx数字证书
- 生成二维码
- 项目中使用log4j
- 自定义注解
- java发送post请求
- Date时间操作
- spring
- 基础
- spring事务控制
- springMVC
- 注解
- 参数绑定
- springmvc+spring+mybatis+dubbo
- MVC模型
- SpringBoot
- java配置入门
- SpringBoot基础入门
- SpringBoot web
- 整合
- SpringBoot注解
- shiro权限控制
- CommandLineRunner
- mybatis
- 静态资源
- SSM整合
- Aware
- Spring API使用
- Aware接口
- mybatis
- 入门
- mybatis属性自动映射、扫描
- 问题
- @Param 注解在Mybatis中的使用 以及传递参数的三种方式
- mybatis-SQL
- 逆向生成dao、model层代码
- 反向工程中Example的使用
- 自增id回显
- SqlSessionDaoSupport
- invalid bound statement(not found)
- 脉络
- beetl
- beetl是什么
- 与SpringBoot整合
- shiro
- 什么是shiro
- springboot+shrio+mybatis
- 拦截url
- 枚举
- 图片操作
- restful
- java项目中日志处理
- JSON
- 文件工具类
- KeyTool生成证书
- 兼容性问题
- 开发规范
- 工具类开发规范
- 压缩图片
- 异常处理
- web
- JavaScript
- 基础语法
- 创建对象
- BOM
- window对象
- DOM
- 闭包
- form提交-文件上传
- td中内容过长
- 问题1
- js高级
- js文件操作
- 函数_01
- session
- jQuery
- 函数01
- data()
- siblings
- index()与eq()
- select2
- 动态样式
- bootstrap
- 表单验证
- 表格
- MUI
- HTML
- iframe
- label标签
- 规范编程
- layer
- sss
- 微信小程序
- 基础知识
- 实践
- 自定义组件
- 修改自定义组件的样式
- 基础概念
- appid
- 跳转
- 小程序发送ajax
- 微信小程序上下拉刷新
- if
- 工具
- idea
- Git
- maven
- svn
- Netty
- 基础概念
- Handler
- SimpleChannelInboundHandler 与 ChannelInboundHandler
- 网络编程
- 网络I/O
- database
- oracle
- 游标
- PLSQL Developer
- mysql
- MySQL基准测试
- mysql备份
- mysql主从不同步
- mysql安装
- mysql函数大全
- SQL语句
- 修改配置
- 关键字
- 主从搭建
- centos下用rpm包安装mysql
- 常用sql
- information_scheme数据库
- 值得学的博客
- mysql学习
- 运维
- mysql权限
- 配置信息
- 好文mark
- jsp
- jsp EL表达式
- C
- test