企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
Routing模式 === > 一个消息可以被多个消费者获取.并且消息的目标队列可以被生产者指定 ![](https://box.kancloud.cn/4c09f3c0680f64b28290375d4915b22d_646x195.png) ~~~ // 路由模式: 创建RabbitMQ实例 func NewRabbitMQPubRoutiong(exchangeName,routingKey string) *RabbitMQ { mq := NewRabbitMQ("", exchangeName, routingKey) return mq } // 路由模式: 发送消息 func (r *RabbitMQ) PublishRouting(message string) error { // 1.尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 交换机名称 "direct",// 交换机类型 [路由模式下 交换机类型 direct ] true, // 是否持久化 false, // 是否自动删除 false, // 如果是true表示这个exchange不可以被client用来推送exchange和exchange之间绑定 false, // nil, //其他参数 ) if err != nil { return err } // 2. 发送消息 err = r.channel.Publish( r.Exchange, r.Key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }, ) return err } // 路由模式模式: 消费消息 func (r *RabbitMQ) ConsumptionRouting() { // 1.尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 交换机名称 "direct",// 交换机类型 [路由模式下 交换机类型 direct ] true, // 是否持久化 false, // 是否自动删除 false, // 如果是true表示这个exchange不可以被client用来推送exchange和exchange之间绑定 false, // nil, //其他参数 ) if err != nil { log.Print(err.Error()) return } // 2.试探性创建队列,注意队列名称不要写 queue, err := r.channel.QueueDeclare( "", //随机生产队列名称 false, false, true, // 排他 false, nil, ) if err != nil { log.Print(err.Error()) return } // 3.绑定队列到exchange中 err = r.channel.QueueBind( queue.Name, // 队列名称 r.Key, // 订阅写key必须为空 r.Exchange,// 交换机 false, nil, ) if err != nil { log.Print(err.Error()) return } // 4. 消费消息 msgch, err := r.channel.Consume( queue.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for { select { case data := <-msgch: fmt.Printf("%s\n",data.Body) } } }() <-forever } ~~~ ### 生产者: ~~~ func main() { one := RabbitMQ.NewRabbitMQPubRoutiong("exOne","ones") two := RabbitMQ.NewRabbitMQPubRoutiong("exOne","twos") for i:=0 ;i<30;i++ { one.PublishRouting("Hello one!" + strconv.Itoa(i)) two.PublishRouting("Hello two!" + strconv.Itoa(i)) time.Sleep(time.Second) } } ~~~ ### 消费者 ~~~ package main import "High-concurrent-spike-system/RabbitMQ" func main() { one := RabbitMQ.NewRabbitMQPubRoutiong("exOne","ones") one.ConsumptionRouting() } ~~~