🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
> ### 十三例 NSQ操作 * 软件下载(服务) :[https://nsq.io/deployment/installing.html](https://nsq.io/deployment/installing.html) * * * * nsqlookupd.exe * nsqd.exe --lookupd-tcp-address=127.0.0.1:4160 * nsqadmin.exe --lookupd-http-address localhost:4161 * * * * 一个节点下创建多个消息队列, 给多个模块消费 ![img](https://box.kancloud.cn/6f832a53734d2fff7b802f9bdfa2e9de_1906x816.png) > ### 生产者 ~~~ package main import ( "bufio" "fmt" "os" "strings" "github.com/nsqio/go-nsq" ) func main() { nsqAddress := "127.0.0.1:4150" config := nsq.NewConfig() producer, err := nsq.NewProducer(nsqAddress, config) if err != nil{ fmt.Println(err) } //读取控制台输入 reader := bufio.NewReader(os.Stdin) for { data, err := reader.ReadString('\n') if err != nil { fmt.Printf("read string failed, err:%v\n", err) continue } data = strings.TrimSpace(data) if data == "stop" { break } err = producer.Publish("queue", []byte(data)) if err != nil { fmt.Printf("publish message failed, err:%v\n", err) continue } fmt.Printf("publish data:%s succ\n", data) } } ~~~ > ### 消费者 ~~~ package main import ( "fmt" "os" "syscall" "time" "os/signal" "github.com/nsqio/go-nsq" ) // 消费者 type Consumer struct { } //处理消息 func (*Consumer) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } //初始化消费者 func initConsumer(topic string, channel string, address string) error { cfg := nsq.NewConfig() cfg.LookupdPollInterval = 15 * time.Second //设置服务发现的轮询时间 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者 if err != nil { return err } consumer := &Consumer{} c.AddHandler(consumer) // 添加消费者接口 //建立NSQLookupd连接 if err := c.ConnectToNSQLookupd(address); err != nil { return err } return nil } // 主函数 func main() { err := initConsumer("queue", "channel1", "127.0.0.1:4161") if err != nil { fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT) <-c } ~~~