> ### 十三例 NSQ操作
* 软件下载(服务) :[https://nsq.io/deployment/installing.html](https://nsq.io/deployment/installing.html)
* * *
* nsqlookupd.exe
* nsqd.exe --lookupd-tcp-address=127.0.0.1:4160
* nsqadmin.exe --lookupd-http-address localhost:4161
* * *
* 一个节点下创建多个消息队列, 给多个模块消费
![img](https://box.kancloud.cn/6f832a53734d2fff7b802f9bdfa2e9de_1906x816.png)
> ### 生产者
~~~
package main
import (
"bufio"
"fmt"
"os"
"strings"
"github.com/nsqio/go-nsq"
)
func main() {
nsqAddress := "127.0.0.1:4150"
config := nsq.NewConfig()
producer, err := nsq.NewProducer(nsqAddress, config)
if err != nil{
fmt.Println(err)
}
//读取控制台输入
reader := bufio.NewReader(os.Stdin)
for {
data, err := reader.ReadString('\n')
if err != nil {
fmt.Printf("read string failed, err:%v\n", err)
continue
}
data = strings.TrimSpace(data)
if data == "stop" {
break
}
err = producer.Publish("queue", []byte(data))
if err != nil {
fmt.Printf("publish message failed, err:%v\n", err)
continue
}
fmt.Printf("publish data:%s succ\n", data)
}
}
~~~
> ### 消费者
~~~
package main
import (
"fmt"
"os"
"syscall"
"time"
"os/signal"
"github.com/nsqio/go-nsq"
)
// 消费者
type Consumer struct {
}
//处理消息
func (*Consumer) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
//初始化消费者
func initConsumer(topic string, channel string, address string) error {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = 15 * time.Second //设置服务发现的轮询时间
c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
if err != nil {
return err
}
consumer := &Consumer{}
c.AddHandler(consumer) // 添加消费者接口
//建立NSQLookupd连接
if err := c.ConnectToNSQLookupd(address); err != nil {
return err
}
return nil
}
// 主函数
func main() {
err := initConsumer("queue", "channel1", "127.0.0.1:4161")
if err != nil {
fmt.Printf("init consumer failed, err:%v\n", err)
return
}
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
<-c
}
~~~
- 第一例 留言板
- 第二例 gRPC使用例子
- 第三例 基于go-micro做服务注册和服务发现
- 第四例 聊天室
- 第五例 工具库 第五例 并发安全字典
- dao
- common
- common.go
- config
- config.go
- gorm
- grom.go
- sqlx
- sqlx.go
- kafka
- kafka.go
- log
- log.go
- log2.go
- redis
- redis.go
- zookeeper
- zookeeper.go
- init
- main.go
- 第六例 原生sql操作
- 第七例 sqlx操作
- 第八例 Redis数据库(gomodule/redigo)
- 第九例 Redis消息队列
- 第十例 Redis集群连接
- 第十一例 Zookeeper操作
- 第十二例 Kafka操作
- 第十三例 NSQ操作
- 第十四例 二分查找
- 第十五例 交换排序 - 冒泡排序
- 第十六例 插入排序 - 直接插入排序
- 第十七例 插入排序 - 希尔排序
- 第十八例 交换排序 - 快速排序
- 第十九例 算法求解应用
- 第二十例 pprof性能分析
- 第二一例 CPU信息采集
- 第二二例 Heap信息采集
- 第二三例 Http信息采集
- 第二四例 单元测试(功能测试)
- 第二五例 基准测试(压力测试/性能测试)
- 第二六例 gdb调试
- 第二七例 json序列化和反序列化
- 第二八例 protobuf序列化和反序列化
- 第二九例 包管理工具 go vendor
- 第三十例 包管理工具 go mod
- 第三一例 zip压缩
- 第三二例 交叉编译
- 第三三例 线上环境部署
- 第三四例 业务:实现固定周期维护
- 第三五例 聊天室(精简版)
- 第三六例 并发安全字典
- 第三七例 导出Excel表格
- 第三八例 导出CSV表格
- 第三九例 聊天室(高并发)
- 第四十例 JWT (Json Web Token)
- 第四一例 雪花算法生成 Id
- 第四二例 对称加密 AES
- 第四三例 非对称加密 RSA
- 第四四例 签名算法 SHA1
- 第四五例 数据库操作 gorm
- 第四六例 数据库操作 gorm 集合
- 数据库连接和创建表
- 查询 - 分页
- 查询所有数据
- 查询单条数据
- 插入一条或多条数据
- 更新一条或多条数据
- 更新一条或多条数据(有零值)
- 第四七例 RSA(MD5WithRSA 算法)签名和验签方式
- 第四八例 线上部署脚本
- 第四九例 Elasticsearch
- 第五十例 对象池
- 第五十一例 相关阅读