企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 回顾 在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。 ## 本教程 添加一个功能 - 我们将只能订阅一部分消息。例如,我们只能将重要的错误消息导向日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 #### 绑定 前面的例子,我们已经完成了绑定。 `channel.queueBind(queueName,EXCHANGE_NAME,"");`//ReceiveLogs 绑定是交换和队列之间的关系。这可以简单地理解为:**队列对来自这个交换的消息感兴趣。** 绑定可以采用额外的routingKey参数。为了避免混淆basic_publish参数,我们将其称为 绑定键。这是我们如何创建一个关键的绑定:(绑定键的含义取决于交换类型。我们之前使用的 粉丝交换,简单地忽略了它的价值。) ``` channel.queueBind(queueName,EXCHANGE_NAME,"black"); ``` ## 直接交换 我们之前教程的日志记录系统将所有消息广播给所有消费者。我们希望扩展这个功能,以便根据消息的严重性来过滤消息。例如,我们可能需要一个将日志消息写入磁盘的程序,以仅接收严重错误,而不会在警告或信息日志消息中浪费磁盘空间。 原来的fanout交换,这并没有给我们太大的灵活性 - 它只能够无意识地播放。 我们将使用直接交换。直接交换背后的路由算法很简单 - 消息进入绑定密钥与消息的路由密钥完全匹配的队列 。 #### 图解 ![](http://www.rabbitmq.com/img/tutorials/direct-exchange.png =400x140) **第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个为绿色。通过路由键橙色发布到交换机的消息 将被路由到队列Q1。带有黑色 或绿色的路由键的消息将进入Q2。所有其他消息将被丢弃。** ## 多个绑定 ![](http://www.rabbitmq.com/img/tutorials/direct-exchange-multiple.png =400x140) **绑定多个队列和绑定键是完全合法的。在这种情况下,直接交换就像扇出一样,将消息 广播到所有的匹配队列。路由密钥为黑色的消息将传送到 Q1和Q2。** ## 发射日志 我们将把这个模型用于我们的日志系统。我们将消息发送到直接交流。我们将提供日志严重性作为路由键。 ![](http://www.rabbitmq.com/img/tutorials/python-four.png =400x140) **先创建一个交换** ``` channel.exchangeDeclare(EXCHANGE_NAME, "direct"); ``` **准备发送一条消息** severity为绑定的键 ``` channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); ``` ### 订阅 接收邮件的方式与上一个教程中的一样,除了一个例外 - 我们将为每个我们感兴趣的严重级别创建一个新的绑定。 ``` String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } ``` ## 最后的代码 **EmitLogDirect.java** ``` public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String message = "Direct"; String severity = "black"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } } ``` **ReceiveLogsDirect.java** ``` public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "black"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ````