# 循环分发
启动一个发送端往队列发消息,此时启动多个接收端。发送的消息会对接收端一个一个挨着发送消息。
![](https://box.kancloud.cn/0dfbb61885d91b32807858d2ea66e669_1584x709.png)
这就是默认情况下,多个接收端轮流消费消息。队列发送给消费端后,就立即删除。那么问题来了,当某个消费者在处理消息的时候,异常终止了怎么办?此时,我们更希望这样:若是那个消费者挂掉了,消息自动转给另一个消费者处理。
幸好,rabbitmq就有效确认机制。消费者收到消息后,正常处理完成,此时才通知队列可以自由删除。那么问题又来了,消费者挂掉了连确认消息都发不出,该怎么办?rabbitmq维持了消费者的连接信息。消费者挂掉,与server的连接通道会关闭或tcp连接丢失。这时server知道了这个情况,就自动重发消息。
这里还有个问题,就是server挂掉了怎么办?**注意: durable=True。这个就是,当server挂了队列还存在。delivery_mode=2:server挂了消息还存在。若是保证消息不丢,这两个参数都要设置。**
发送端
```
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# durable:server挂了队列仍然存在
channel.queue_declare(queue='task_queue', durable=True)
# 使用默认的交换机发送消息。exchange为空就使用默认的。delivery_mode=2:使消息持久化。和队列名称绑定routing_key
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
))
print(" [x] Sent %r" % message)
connection.close()
```
接收端
```
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
# 手动对消息进行确认
ch.basic_ack(delivery_tag=method.delivery_tag)
# basic_consume:这个函数有no_ack参数。该参数默认为false。表示:需要对message进行确认。怎么理解:no设置成false,表示要确认
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
```
# 公平派遣
此刻,我们已经知道如何保证消息不丢,那么问题又来了。有的消费干得快,有的干得慢。这样分发消息,有的累死有的没事干。这个问题如何解决?
![](https://box.kancloud.cn/c967559388971bc1277b7021edfcf1c9_396x111.png)
rabbitmq已经考虑到了。那就是:那个干完了,通知给server,server就发送给那个。
在上面的接收端的
```
channel.basic_consume(callback, queue='task_queue')
```
代码前加:
```
channel.basic_qos(prefetch_count=1)
```
# 发布订阅模式
我们要将同一个消息发给多个客户端。
发送端:
```
import pika
import sys
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 原则上,消息,只能有交换机传到队列。就像我们家里面的交换机道理一样。
# 有多个设备连接到交换机,那么,这个交换机把消息发给那个设备呢,就是根据
# 交换机的类型来定。类型有:direct\topic\headers\fanout
# fanout:这个就是,所有的设备都能收到消息,就是广播。
# 此处定义一个名称为'logs'的'fanout'类型的exchange
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 将消息发送到名为log的exchange中
# 因为是fanout类型的exchange,所以无需指定routing_key
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
```
接收端:
```
import pika
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 这里需要和发送端保持一致(习惯和要求)
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 类似的,比如log,我们其实最想看的,当连接上的时刻到消费者退出,这段时间的日志
# 有些消息,过期了的对我们并没有什么用
# 并且,一个终端,我们要收到队列的所有消息,比如:这个队列收到两个消息,一个终端收到一个。
# 我们现在要做的是:两个终端都要收到两个
# 那么,我们就只需做个临时队列。消费端断开后就自动删除
result = channel.queue_declare(exclusive=True)
# 取得队列名称
queue_name = result.method.queue
# 将队列和交换机绑定一起
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
# no_ack=True:此刻没必要回应了
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
```
# 根据类型订阅消息
一些消息,仍然发送给所有接收端。其中,某个接收端,只对其中某些消息感兴趣,它只想接收这一部分消息。如下图:C1,只对error感兴趣,C2对其他三种甚至对所有都感兴趣,我们该怎么搞呢?
![](https://box.kancloud.cn/65d418b54973525e03432d314234141d_423x171.png)
发送端:
```
import pika
import sys
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 创建一个交换机:direct_logs 类型是:direct
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# 向exchage按照设置的 routing_key=severity 发送message
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
```
接收端:
```
import pika
import sys
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
# 跟发送端一致
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
# 还是声明临时队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 使用routing_key绑定交换机和队列。广播类型,无需使用这个
# direct类型:会对消息进行精确匹配
# 对个队列使用相同路由key是可以的
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
```
# 进行RPC调用
RPC:是远程过程调用。简单点说:比如,我们在本地的代码中调用一个函数,那么这个函数不一定有返回值, 但一定有返回。若是在分布式环境中,前面的例子,发送消息出去后,发送端是不清楚客户端处理完后的结果的。由于rabbitmq的响应机制,顶多能获取到客户端的处理状态,但并不能获取处理结果。那么,我们想像本地调用那样,需要客户端处理后返回结果该怎么办呢。就是如下图:
![](https://box.kancloud.cn/2e78d61bca855f6c743a3b7fe0e26eaf_576x200.png)
client发送请求,同时告诉server处理完后要发送消息给:回调队列的ID:correlation_id=abc,并调用replay_to回调队列对应的回调函数。
## 客户端
客户端:发消息也收消息
```
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
# 创建连接
credentials = pika.PlainCredentials("yang", "123456")
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
self.channel = self.connection.channel()
# 创建回调队列
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
# 这里:这个是消息发送方,当要执行回调的时候,它又是接收方
# 使用callback_queue 实现消息接收。即是回调。注意:这里的回调
# 不需要对消息进行确认。反复确认,没玩没了就成了死循环
#这里设置回调
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
# 定义回调的响应函数。
# 判断:若是当前的回调ID和响应的回调ID相同,即表示,是本次请求的回调
# 原因:若是发起上百个请求,发送端总得知道回来的对应的是哪一个发送的
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
# 设置响应和回调通道的ID
self.response = None
self.corr_id = str(uuid.uuid4())
# properties中指定replay_to:表示回调要调用那个函数
# 指定correlation_id:表示回调返回的请求ID是那个
# body:是要交给接收端的参数
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=self.callback_queue,
correlation_id=self.corr_id,),
body=str(n))
# 监听回调
while self.response is None:
self.connection.process_data_events()
# 返回的结果是整数,这里进行强制转换
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
```
服务端:
```
import pika
credentials = pika.PlainCredentials("yang", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.176', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
#收到的消息
n = int(body)
print(" [.] fib(%s)" % n)
#要处理的任务
response = fib(n)
#发布消息。通知到客户端
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= props.correlation_id),
body=str(response))
#手动响应
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
```