`
阿尔萨斯
  • 浏览: 4189229 次
社区版块
存档分类
最新评论

RabbitMQ (五)主题(Topic)

 
阅读更多

转载请标明出处:http://blog.csdn.net/lmj623565791/article/details/37706355

上一篇博客中,我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发,如果你还不了解:RabbitMQ (四) 路由选择 (Routing)

虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。
在我们的日志系统中,我们有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而不是来自’kern’的。
为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。

1、 主题转发(Topic Exchange)
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。
2、 图解:
我们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:<speed>.<color>.<species>。
我们创建3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。
可以简单的认为:
Q1对所有的橙色动物感兴趣。
Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,但是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,所以会被丢弃。
如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。
注:主题类型的转发器非常强大,可以实现其他类型的转发器。
当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。
当绑定键中不包含任何#与*时,类似direct类型转发器。
3、 完整的例子

发送端EmitLogTopic.java:

package com.zhy.rabbit._05_topic_exchange;

import java.util.UUID;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "topic");

		String[] routing_keys = new String[] { "kernal.info", "cron.warning",
				"auth.info", "kernel.critical" };
		for (String routing_key : routing_keys)
		{
			String msg = UUID.randomUUID().toString();
			channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
					.getBytes());
			System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
		}

		channel.close();
		connection.close();
	}
}

我们发送了4条消息,分别设置了不同的选择键。

接收端1,ReceiveLogsTopicForKernel.java

package com.zhy.rabbit._05_topic_exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopicForKernel
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明转发器
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 随机生成一个队列
		String queueName = channel.queueDeclare().getQueue();
		
		//接收所有与kernel相关的消息
		channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");

		System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			String routingKey = delivery.getEnvelope().getRoutingKey();

			System.out.println(" [x] Received routingKey = " + routingKey
					+ ",msg = " + message + ".");
		}
	}
}

直接收和Kernel相关的日志消息。

接收端2,ReceiveLogsTopicForCritical.java

package com.zhy.rabbit._05_topic_exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopicForCritical
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明转发器
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 随机生成一个队列
		String queueName = channel.queueDeclare().getQueue();

		// 接收所有与kernel相关的消息
		channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");

		System.out
				.println(" [*] Waiting for critical messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			String routingKey = delivery.getEnvelope().getRoutingKey();

			System.out.println(" [x] Received routingKey = " + routingKey
					+ ",msg = " + message + ".");
		}
	}
}

只接收致命错误的日志消息。

运行结果:

[x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
[x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
[x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
[x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

[*] Waiting for messages about kernel. To exit press CTRL+C
[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

[*] Waiting for critical messages. To exit press CTRL+C
[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。


分享到:
评论

相关推荐

    rabbitmq_priority_topic_python:rabbitmq 带有优先级和主题功能,可以发送带优先级的物品,带主题的物品接收

    rabbitmq 带有优先级和主题功能,可以发送带优先级的物品,带主题的物品接收 确保插件 rabbitmq_priority_queue 已安装并启用。 安装库:python-pika 看这里的情况:你想用queue来保存不同优先级的items,比如0-10...

    spring boot使用RabbitMQ实现topic 主题

    本篇文章主要介绍了spring boot使用RabbitMQ实现topic 主题,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    rabbitmq_priority_topic_c:具有优先级和主题功能的rabbitmq,您可以发送具有优先级的项目并接收具有主题的项目

    具有优先级和主题功能的rabbitmq,您可以发送具有优先级的项目并接收具有主题的项目 确保已安装插件rabbitmq_priority_queue并将其启用。 该演示基于rabbimq-c库。 请参阅此处的情况:您想使用队列来保存具有不同...

    rabbitMQ.doc

    rabbitMq简单介绍还有下载,安装及配置,包括一些队列案例(Simble简单队列,.work queues 工作队列 公平分发 轮询分发,订阅模式 publish/subscribe,routing路由模式,Topic 主题模式,rabbitMq的消息确认机制)

    RabbitmqDemo:RabbitMQ相关示例

    RabbitMQ示例5:主题【topic切换】 RabbitMQ示例6:远程过程调用RPC Pom.xml &lt;? xml version = " 1.0 " encoding = " UTF-8 " ?&gt; &lt; project xmlns = " http://maven.apache.org/POM/4.0.0 " xmlns : ...

    java队列源码-rabbitmq-repository:RabbitMQ消息队列学习的源码记录

    rabbitmq主题示例 rabbitmq-java-rpc rabbitmq PRC通信示例 rabbitmq-spring-helloworld spring boot 使用rabbitmq的第一个demo rabbitmq-spring-work-queue spring boot使用rabbitmq的队列示例 rabbitmq-spring-...

    RabbitMQ 详细Demo

    包含以下实例 1、 简单消息队列 2、 工作队列 3、 广播式交换机(Fanout Exchange) 4、 直接式交换机(Direct Exchange) 5、 主题式交换机(Topic Exchange) 6、 主题式交换机(header Exchange)

    springboot整合rabbitmq使用​

    gitee仓库地址 https://gitee.com/ckl996/mq-demo 1.rabbitmq简单模式、工作模式、发布订阅模式 发布订阅模式: fanout广播模式 direct路由模式 topic主题订阅模式 资料内含docker部署mq文件及使用说明讲义

    kafka-connect-rabbitmq

    连接器用于读取RabbitMQ队列或主题。 配置 名称 类型 重要性 默认值 验证器 文献资料 kafka.topic 细绳 高的 Kafka主题,用于将消息写入。 兔子队列 列表 高的 兔子队列 rabbitmq.host 细绳 高的 本地主机 要...

    rabbitmq-study2

    java-publish-receiverabbitmq发布订阅示例rabbitmq-java-routingrabbitmq路由示例rabbitmq-spring-topic-exchangerabbitmq主题示例rabbitmq-java-rpcrabbitmq PRC通信示例rabbitmq-spring-helloworldspring boot ...

    my-spring-boot-rabbitmq

    producer: 分发消息生产者(发布订阅模式)direct-producer: 路由消息生产者(简单模式、工作队列模式、路由模式)topic-producer: 主题消息生产者(主题模式)advance-feature: RabbitMQ高级特性,包括死信队列、...

    mqttfx-1.7.1-windows最新版

    最新版1.7.1,安装可用,拆箱即用!...设备将当前所处的状态作为MQTT主题发送给IoT Hub,每个MQTT主题topic具有不同等级的名称,如“建筑/楼层/温度。” MQTT代理服务器将接收到的主题topic发送给给所有订阅的客户端。

    java8看不到源码-sns:假亚马逊SNS

    java8 看不到源码 假社交网络 用于测试的伪造 Amazon Simple Notification Service ...create-topic --name test1' 罐 从以下位置下载最新版本并运行: DB_PATH=/tmp/db.json java -jar sns-0.2.0.ja

    入门篇!大白话带你认识 Kafka

    我们现在经常提到 Kafka 的时候就已经默认它是一个非常优秀的消息队列了,我们也会经常拿它给 RocketMQ、RabbitMQ 对比。我觉得 Kafka 相比其他消息队列主要的优势如下: 极致的性能 :基于 Scala 和 Java 语言开发...

    1 入门篇!大白话带你认识 Kafka

    虽然之前用过 ActiveMQ 和 RabbitMQ,但是在 Kafka 这门技术面前我也算是一个初学者。文章中若有说法有点完善或者不准确的地方敬请指出。 今天我们来聊聊 Kafka ,主要是带你重新认识一下 Kafka,聊一下 Kafka 中...

Global site tag (gtag.js) - Google Analytics