企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## 1. 安装 ### 1.1 安装 下载deb包就行,要求有erlang环境 ### 1.2 配置 1. 第一件事要创建用户,因为缺省的guest/guest用户只能在本地登录,所以先用命令行创建一个tuna/tuna,并让他成为管理员。 ~~~ rabbitmqctl add_user tuna tuna rabbitmqctl set_user_tags tuna administrator ~~~ 2. 启用WEB管理。 `rabbitmq-plugins enable rabbitmq_management` 用新建的tuna用户,密码:tuna登录 ~~~ http://192.168.56.130:15672 ~~~ ![](https://box.kancloud.cn/a70aaa4baf8ed9e2251f2f0e77200510_1183x316.png) 3. 修改tuna访问权限 ![](https://box.kancloud.cn/a76a3e0a496a1e312508f293ca08d973_949x451.png) ## 2. 使用 ### 2.1 简单收发 1. producer ~~~ __author__ = 'dailin' import pika class MessageProducer: def getConnection(self): credentials = pika.PlainCredentials('tuna', 'tuna') connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.56.130', credentials=credentials )) return connection if __name__ == '__main__': producer = MessageProducer() # 获取连接 connection = producer.getConnection() # 打开管道 channel = connection.channel() # 在管道中声明队列 channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='hello', body='hello world', properties=pika.BasicProperties( delivery_mode=2 # 消息持久化 ) ) print(" [x] Sent 'Hello World!'") connection.close() ~~~ 2. consumer ~~~ __author__ = 'dailin' class Consumer: import pika # 使用tuna用户登录 credentials = pika.PlainCredentials('tuna', 'tuna') connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.56.130', port=5672, credentials=credentials )) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello',durable=True) # durable=True 队列持久化(队列中的消息没有持久化) def callback(ch, method, properties, body): print(ch, method, properties, body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() ~~~ ## 3. 消息持久化 主要有两步: 1. channel.queue_declare(queue='hello') 声明队列时(生产者和消费者),队列持久化,rabbitmq重启后,队列依然存在 2. 生产者:队列中的消息持久化 ~~~ channel.basic_publish(exchange='', routing_key='hello', body='hello world', properties=pika.BasicProperties( delivery_mode=2 # 消息持久化 ) ) ~~~ ## 4. 订阅发布 > 之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了, > An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type. > Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息 > fanout: 所有bind到此exchange的queue都可以接收消息(实时的,收到就收到了,类似于广播) > direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 > topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息 >    表达式符号说明:#代表一个或多个字符,*代表任何字符 > 例:#.a会匹配a.a,aa.a,aaa.a等 > *.a会匹配a.a,b.a,c.a等 > 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout  > headers: 通过headers 来决定把消息发给哪些queue 1. publisher ~~~ import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', # 声明exchange,类型为fanout,名称为logs type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', # 使用logs(exchange) routing_key='', body=message) print(" [x] Sent %r" % message) connection.close() ~~~ 2. consumer ~~~ #_*_coding:utf-8_*_ import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', # 声明exchange,类型为fanout,名称为logs type='fanout') # 不指定queue名字,rabbit会随机分配一个名字,然后exchange向这个queue里发消息,消费者就可以收到了。clusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', # 让queue绑定exchange queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() ~~~