企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## RabbitMQ - windows下安装 - 安装Erlan 全部勾选 - 安装RabbitMQ 全部勾选 - cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin 目录 - rabbitmq-plugins.bat enable rabbitmq\_management 开启网页版控制台 - rabbitmq-server 启动RabbitMQ服务 - 在本地浏览器中输入:localhost:15672访问RabbitMQ的后台管理页面(初始化用户名和密码都是guest) >rabbitmqctl.bat list_queues #查看队列里的任务 >rabbitmqctl add_user admin 123456 #增加访问用户,默认用户guest只能本地访问。 >rabbitmqctl set_user_tags admin administrator 设置角色: >rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 设置默认vhost(“/”)访问权限 ` [启动rabbitmq,提示ERROR: node with name "rabbit" already running on "U57..."]` - 在任务管理器中结束进程 epmd.exe erl.exe - 在再sbin命令,rabbitmq-server start - linux安装 - 安装Erlang >yum install erlang - 安装RabbitMQ >wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm >yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm systemctl startr abbitmq-server.service systemctl status rabbitmq-server.service ``` # 查看当前所有用户 rabbitmqctl list_users # 查看默认guest用户的权限 rabbitmqctl list_user_permissions guest # 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户 rabbitmqctl delete_user guest # 添加新用户 rabbitmqctl add_user admin lssadmin # 设置用户tag rabbitmqctl set_user_tags admin administrator # 赋予用户默认vhost的全部操作权限 rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" # 查看用户的权限 rabbitmqctl list_user_permissions admin ``` - 端口放行 >firewall-cmd --zone=public --add-port=5672/tcp --permanent >firewall-cmd --zone=public --add-port=15672/tcp --permanent >firewall-cmd --reload ## RabbitMQ 默认不返回结果,默认不是队列持久化,服务关闭队列消失,多消费者,默认是公平分发 >pip install pika==0.12.0 - 消费者 receive.py ~~~ import pika # 注意,guest用户只是被容许从localhost访问 user_pwd = pika.PlainCredentials('guest', 'guest') # 创建连接 s_conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=user_pwd)) # 在连接上创建一个频道 chan = s_conn.channel() # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行 chan.queue_declare(queue='hello') # 定义一个回调函数,用来接收生产者发送的消息 def callback(ch,method,properties,body): print("[消费者] recv %s" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 手动对消息进行确认,确定后列队会自动删除消息 # 只有在consumer处理并确认了上一个message后才分配新的message给他 ,否则分给另一个空闲的consumer, 避免积压任务 chan.basic_qos(prefetch_count=1) chan.basic_consume( callback, # 调用回调函数 'hello', # 指定取消息的队列名 no_ack=False, # 该参数默认为false, True自动确认 ) # 开始循环取消息 chan.start_consuming() ~~~ - 生产者 sender.py ~~~ import pika # 注意,guest用户只是被容许从localhost访问 user_pwd = pika.PlainCredentials('guest', 'guest') # 创建连接 s_conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=user_pwd)) # 在连接上创建一个频道 chan = s_conn.channel() #声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行 chan.queue_declare(queue='hello') # 交换机, 路由键,写明将消息发往哪个队列,本例是将消息发往队列hello,生产者要发送的消息 chan.basic_publish(exchange='', routing_key='hello', body='hello world') # 当生产者发送完消息后,可选择关闭连接 s_conn.close() ~~~ - mq_rpc mq_rpc_server.py ~~~ import pika import time # 链接socket connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 生成rpc queue channel.queue_declare(queue='rpc_queue') # 斐波那契数列 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) # 收到消息就调用 # ch 管道内存对象地址 # method 消息发给哪个queue # props 返回给消费的返回参数 # body数据对象 def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) # 调用斐波那契函数 传入结果 response = fib(n) ch.basic_publish(exchange='', # 生产端随机生成的queue routing_key=props.reply_to, # 获取UUID唯一 字符串数值 properties=pika.BasicProperties(correlation_id = \ props.correlation_id), # 消息返回给生产端 body=str(response)) # 确保任务完成 ch.basic_ack(delivery_tag=method.delivery_tag) # rpc_queue收到消息:调用on_request回调函数 # queue='rpc_queue'从rpc内收 channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming() ~~~ mq_rpc_client.py ~~~ import pika import uuid import time # 斐波那契数列 前两个数相加依次排列 class FibonacciRpcClient(object): def __init__(self): # 链接远程 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() # 生成随机queue result = self.channel.queue_declare(exclusive=True) # 随机取queue名字,发给消费端 self.callback_queue = result.method.queue # self.on_response 回调函数:只要收到消息就调用这个函数。 # 声明收到消息后就 收queue=self.callback_queue内的消息 self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 收到消息就调用 # ch 管道内存对象地址 # method 消息发给哪个queue # body数据对象 def on_response(self, ch, method, props, body): # 判断本机生成的ID 与 生产端发过来的ID是否相等 if self.corr_id == props.correlation_id: # 将body值 赋值给self.response self.response = body def call(self, n): # 赋值变量,一个循环值 self.response = None # 随机一次唯一的字符串 self.corr_id = str(uuid.uuid4()) # routing_key='rpc_queue' 发一个消息到rpc_queue内 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( # 执行命令之后结果返回给self.callaback_queue这个队列中 reply_to=self.callback_queue, # 生成UUID 发送给消费端 correlation_id=self.corr_id, ), # 发的消息,必须传入字符串,不能传数字 body=str(n)) # 没有数据就循环收 while self.response is None: # 非阻塞版的start_consuming() # 没有消息不阻塞 self.connection.process_data_events() print("no msg...") time.sleep(0.5) return self.response # 实例化 fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(6) print(" [.] Got %r" % response) ~~~