企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
Publish模式 === Publish/Subscribe,订阅模式 > 消息被路由投递个给多个队列,一个消息被多个消费者获取 (可以被多个消费者重复消费) ![](https://box.kancloud.cn/a4bc3086a03c17c8083eb13ae56d0b12_551x186.png) ~~~ // 订阅模式: 创建RabbitMQ实例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ { mq := NewRabbitMQ("", exchangeName, "") return mq } // 订阅模式: 发送消息 func (r *RabbitMQ) PublishPub(message string) error { // 1.尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 交换机名称 "fanout",// 交换机类型 [发布订阅模式下 交换机类型 fanout ] true, // 是否持久化 false, // 是否自动删除 false, // 如果是true表示这个exchange不可以被client用来推送exchange和exchange之间绑定 false, // nil, //其他参数 ) if err != nil { return err } // 2. 发送消息 err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }, ) return err } // 订阅模式: 消费消息 func (r *RabbitMQ) ConsumptionSub() { // 1.尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 交换机名称 "fanout",// 交换机类型 [发布订阅模式下 交换机类型 fanout ] true, // 是否持久化 false, // 是否自动删除 false, // 如果是true表示这个exchange不可以被client用来推送exchange和exchange之间绑定 false, // nil, //其他参数 ) if err != nil { log.Print(err.Error()) return } // 2.试探性创建队列,注意队列名称不要写 queue, err := r.channel.QueueDeclare( "", //随机生产队列名称 false, false, true, // 排他 false, nil, ) if err != nil { log.Print(err.Error()) return } // 3.绑定队列到exchange中 err = r.channel.QueueBind( queue.Name, // 队列名称 "", // 订阅写key必须为空 r.Exchange,// 交换机 false, nil, ) if err != nil { log.Print(err.Error()) return } // 4. 消费消息 msgch, err := r.channel.Consume( queue.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for { select { case data := <-msgch: fmt.Printf("%s\n",data.Body) } } }() <-forever } ~~~ ### 生产者 ~~~ // 生产者 func main() { sub := RabbitMQ.NewRabbitMQPubSub("newProduct") for i:=0;i<1000;i++ { sub.PublishPub("订阅模式生产第: " + strconv.Itoa(i)) } } ~~~ ### 消费者 ~~~ func main() { sub := RabbitMQ.NewRabbitMQPubSub("newProduct") sub.ConsumptionSub() } ~~~