企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 回顾 在之前的教程中,我们改进了我们的日志系统 我们没有使用只有虚拟广播的扇出交换机,而是使用直接交换机,并有选择性地接收日志的可能性。 ## 需求 在我们的日志系统中,我们可能不仅要根据严重性来订阅日志,还要根据发出日志的来源进行订阅。你可能从syslog unix工具知道这个概念 ,它根据severity (info/warn/crit...) and facility (auth/cron/kern...). 这会给我们很大的灵活性 - 我们可能想要听取来自'cron'的严重错误,而且还要听取来自'kern'的所有日志。 ### Topic交换 发送到主题交换的消息不能有任意的 routing_key - 它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定连接到消息的一些功能。一些有效的路由键例子:"stock.usd.nyse","nyse.vmw ","quick.orange.rabbit"。在路由选择键中可以有任意数量的字,最多255个字节。 当在绑定中不使用特殊字符“ * ”(星号)和“ # ”(散列)时,主题交换将像直接交换一样。 绑定键也必须是相同的形式。主题交换背后的逻辑类似于direct。使用特定的路由密钥发送的消息将被传送到与匹配的绑定密钥绑定的所有队列。但是绑定键有两个重要的特殊情况: 1. *(星号)可以代替一个单词。 2. #(散列)可以代替零个或多个单词。 ![](http://www.rabbitmq.com/img/tutorials/python-five.png =400x140) **在这个例子中,我们将发送所有描述动物的信息。消息将使用由三个字(两个点)组成的路由键发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类**<speed>。<color>。<species> 我们创建了三个绑定: 1. Q1 绑定了绑定键"*.orange.*" 2. Q2 绑定了 "*.*.rabbit" and "lazy.#". **概括** Q1对所有的橙色动物感兴趣。 Q2希望听到有关兔子的一切,以及关于懒惰动物的一切。 **将路由键设置** 1. “ quick.orange.rabbit ”的消息将传递到两个队列。 2. “ lazy.orange.elephant ”也将去他们两个。 3. “ quick.orange.fox ”只会到第一个队列。 4. “ lazy.brown.fox ”只会到第二个队列。即使匹配两个绑定,“ lazy.pink.rabbit ”也只会被传递到第二个队列一次。 5. “ quick.brown.fox ”不匹配任何绑定,因此将被丢弃。 6. “ lazy.orange.male.rabbit ”即使有四个单词,也会匹配最后一个绑定,并被传递到第二个队列。 ### 代码 **EmitLogTopic** ``` private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String message = "topic"; String severity = "kern.critical"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } ``` **ReceiveLogsTopic** ``` public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueBind(queueName, EXCHANGE_NAME, "kern.*"); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } } ```