ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
# 消息队列 在日常业务开发需求中,经常会用到消息队列 rocketmq,nsq,kafka等工具来完成诸如排队,秒杀,异步处理等业务; 为了能让我们方便快捷的接入各种消息队列,Orange框架对日常使用的队列生产/队列消费进行了封装,基于接口进行实现,无论使用何种消息队列,都只是一个配置的问题; 领导说用rocketmq我们就用rocketmq,领导说换成redis,我们就给他换成redis,完全不慌,极大简化接入成本;😁😁😁 ## 组件说明 - 组件基于接口实现,不同的消息队列通过配置文件指定 - 目前支持了 rocketmq、kafka、redis驱动; ## 配置说明 如下是完整的配置选项: ~~~ [queue] driver="redis" // rocketmq 或 redis retry=2 // 重试次数 version="2.0.0.0" // kafka专属配置,默认2.0.0.0 randClient=true // kafka专属配置,默认true,开启随机生成clientID,可以实现启动多实例同时一起消费相同topic,加速消费能力的特性 endpoints=["192.168.1.1:6379"] // brocker 或 redis 服务地址,当用redis驱动是只用填一个主库地址即可 passwd="123456" // redis驱动专属配置,redis密码 redisdb=4 // redis驱动专属配置,redis库编号 timeout=3600 // redis专属配置,队列超时时间(s) ,当队列一直没有被消费到达超时时间则队列会被销毁 ~~~ ## 使用指南 ### 注册生产者 - 通过如下代码注册生产者并向队列发送一条消息; - 参数说明,注册方法 `NewProducer` 需要传入队列 group 名称; - 消息发送方法 `SendMsg` 需要指定消息队列 topic 名称和消息体内容; ~~~ client, _:= queue.NewProducer("test") ret, err := client.SendMsg("test-topic", "Hello World!") ~~~ ### 注册消费者 消费者采用的是监听并执行回调方法的模式实现,注册好消费者后,程序会持续地监听队列并消费数据; 和生产者一样,消费者方法 `NewConsumer` 需要传入队列 group 名称; 消费者消息监听方法 `ListenReceiveMsgDo` 传入消息 topic 和一个消息处理的回调方法,接受到消息后的处理逻辑都放在该回调方法中即可完成消费流程; ~~~ client, _ := queue.NewConsumer("test") client.ListenReceiveMsgDo("test", func(mqMsg queue.MqMsg) { fmt.Println("recive===>", mqMsg, mqMsg.BodyString()) }) ~~~ ### 完整示例 - 先准备好一个redis服务作为示例 - 在 GOPATH 下创建一个demo目录,目录中创建两个文件 main.go, config.toml; ~~~ main.go ------------------ package main import ( "gitee.com/zhucheer/orange/app" "gitee.com/zhucheer/orange/queue" "fmt" ) func main(){ router := &Route{} app.AppStart(router) } type Route struct { } func (s *Route) ServeMux() { app.NewRouter("").GET("/", func(ctx *app.Context) error { client, _ := queue.NewProducer("test") ret, _ := client.SendMsg("test", "hello mq producer!") return ctx.ToJson(ret) }) } func (s *Route) Register() { client, _ := queue.NewConsumer("test") client.ListenReceiveMsgDo("test", func(mqMsg queue.MqMsg) { fmt.Println("recive msg===>", mqMsg.BodyString()) }) } ~~~ 在配置文件中添加好对应的redis服务信息 ~~~ config.toml ------------------ [queue] driver="redis" groupName= "dev-api" retry=2 endpoints=["192.168.1.1:6379"] passwd="123456" redisdb=4 timeout=3600 ~~~ 最后拉取依赖并执行(开启 go moduls) ``` go mod init go mod tidy go run main.go --config=config.toml ``` 程序启动后访问 http://127.0.0.1:8088/ 即可看到生产和消费的过程;