# 消息队列
在日常业务开发需求中,经常会用到消息队列 rocketmq,nsq,kafka等工具来完成诸如排队,秒杀,异步处理等业务;
为了能让我们方便快捷的接入各种消息队列,Orange框架对日常使用的队列生产/队列消费进行了封装,基于接口进行实现,无论使用何种消息队列,都只是一个配置的问题;
领导说用rocketmq我们就用rocketmq,领导说换成redis,我们就给他换成redis,完全不慌,极大简化接入成本;😁😁😁
## 组件说明
- 组件基于接口实现,不同的消息队列通过配置文件指定
- 目前支持了 rocketmq、kafka、redis驱动;
## 配置说明
如下是完整的配置选项:
~~~
[queue]
driver="redis" // rocketmq 或 redis
retry=2 // 重试次数
version="2.0.0.0" // kafka专属配置,默认2.0.0.0
randClient=true // kafka专属配置,默认true,开启随机生成clientID,可以实现启动多实例同时一起消费相同topic,加速消费能力的特性
endpoints=["192.168.1.1:6379"] // brocker 或 redis 服务地址,当用redis驱动是只用填一个主库地址即可
passwd="123456" // redis驱动专属配置,redis密码
redisdb=4 // redis驱动专属配置,redis库编号
timeout=3600 // redis专属配置,队列超时时间(s) ,当队列一直没有被消费到达超时时间则队列会被销毁
~~~
## 使用指南
### 注册生产者
- 通过如下代码注册生产者并向队列发送一条消息;
- 参数说明,注册方法 `NewProducer` 需要传入队列 group 名称;
- 消息发送方法 `SendMsg` 需要指定消息队列 topic 名称和消息体内容;
~~~
client, _:= queue.NewProducer("test")
ret, err := client.SendMsg("test-topic", "Hello World!")
~~~
### 注册消费者
消费者采用的是监听并执行回调方法的模式实现,注册好消费者后,程序会持续地监听队列并消费数据;
和生产者一样,消费者方法 `NewConsumer` 需要传入队列 group 名称;
消费者消息监听方法 `ListenReceiveMsgDo` 传入消息 topic 和一个消息处理的回调方法,接受到消息后的处理逻辑都放在该回调方法中即可完成消费流程;
~~~
client, _ := queue.NewConsumer("test")
client.ListenReceiveMsgDo("test", func(mqMsg queue.MqMsg) {
fmt.Println("recive===>", mqMsg, mqMsg.BodyString())
})
~~~
### 完整示例
- 先准备好一个redis服务作为示例
- 在 GOPATH 下创建一个demo目录,目录中创建两个文件 main.go, config.toml;
~~~
main.go
------------------
package main
import (
"gitee.com/zhucheer/orange/app"
"gitee.com/zhucheer/orange/queue"
"fmt"
)
func main(){
router := &Route{}
app.AppStart(router)
}
type Route struct {
}
func (s *Route) ServeMux() {
app.NewRouter("").GET("/", func(ctx *app.Context) error {
client, _ := queue.NewProducer("test")
ret, _ := client.SendMsg("test", "hello mq producer!")
return ctx.ToJson(ret)
})
}
func (s *Route) Register() {
client, _ := queue.NewConsumer("test")
client.ListenReceiveMsgDo("test", func(mqMsg queue.MqMsg) {
fmt.Println("recive msg===>", mqMsg.BodyString())
})
}
~~~
在配置文件中添加好对应的redis服务信息
~~~
config.toml
------------------
[queue]
driver="redis"
groupName= "dev-api"
retry=2
endpoints=["192.168.1.1:6379"]
passwd="123456"
redisdb=4
timeout=3600
~~~
最后拉取依赖并执行(开启 go moduls)
```
go mod init
go mod tidy
go run main.go --config=config.toml
```
程序启动后访问 http://127.0.0.1:8088/ 即可看到生产和消费的过程;