Publish模式
===
Publish/Subscribe,订阅模式
> 消息被路由投递个给多个队列,一个消息被多个消费者获取 (可以被多个消费者重复消费)
![](https://box.kancloud.cn/a4bc3086a03c17c8083eb13ae56d0b12_551x186.png)
~~~
// 订阅模式: 创建RabbitMQ实例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
mq := NewRabbitMQ("", exchangeName, "")
return mq
}
// 订阅模式: 发送消息
func (r *RabbitMQ) PublishPub(message string) error {
// 1.尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"fanout",// 交换机类型 [发布订阅模式下 交换机类型 fanout ]
true, // 是否持久化
false, // 是否自动删除
false, // 如果是true表示这个exchange不可以被client用来推送exchange和exchange之间绑定
false, //
nil, //其他参数
)
if err != nil {
return err
}
// 2. 发送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
return err
}
// 订阅模式: 消费消息
func (r *RabbitMQ) ConsumptionSub() {
// 1.尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"fanout",// 交换机类型 [发布订阅模式下 交换机类型 fanout ]
true, // 是否持久化
false, // 是否自动删除
false, // 如果是true表示这个exchange不可以被client用来推送exchange和exchange之间绑定
false, //
nil, //其他参数
)
if err != nil {
log.Print(err.Error())
return
}
// 2.试探性创建队列,注意队列名称不要写
queue, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true, // 排他
false,
nil,
)
if err != nil {
log.Print(err.Error())
return
}
// 3.绑定队列到exchange中
err = r.channel.QueueBind(
queue.Name, // 队列名称
"", // 订阅写key必须为空
r.Exchange,// 交换机
false,
nil,
)
if err != nil {
log.Print(err.Error())
return
}
// 4. 消费消息
msgch, err := r.channel.Consume(
queue.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for {
select {
case data := <-msgch:
fmt.Printf("%s\n",data.Body)
}
}
}()
<-forever
}
~~~
### 生产者
~~~
// 生产者
func main() {
sub := RabbitMQ.NewRabbitMQPubSub("newProduct")
for i:=0;i<1000;i++ {
sub.PublishPub("订阅模式生产第: " + strconv.Itoa(i))
}
}
~~~
### 消费者
~~~
func main() {
sub := RabbitMQ.NewRabbitMQPubSub("newProduct")
sub.ConsumptionSub()
}
~~~