6-RabbitMQ工作模式-Routing路由模式

网友投稿 245 2022-11-05


6-RabbitMQ工作模式-Routing路由模式

6-RabbitMQ工作模式-Routing路由模式

Routing路由模式

1. 模式说明

路由模式特点:

队列与交换机的绑定,不能是任意绑定了,而是要指定一个​​RoutingKey​​(路由key)消息的发送方在 向 Exchange发送消息时,也必须指定消息的​​RoutingKey​​。Exchange不再把消息交给每一个绑定的队列,而是根据消息的​​Routing Key​​进行判断,只有队列的​​Routingkey​​与消息的 ​​Routing key​​完全一致,才会接收到消息

1556029284397

图解:

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列C1:消费者,其所在队列指定了需要routing key 为 error 的消息C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

2. 案例

在编码上与 ​​Publish/Subscribe发布与订阅模式​​ 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。

在写案例之前,我们首先定义一下需求:

生产者:发送两条消息,一条消息的用于插入数据,另一条消息用于更新数据消费者1:接收插入数据的消息,进行数据插入消费者2:接收更新数据的消息,进行数据更新

1)生产者

执行发送消息:

2)消费者1:专门接收 insert 的消息

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing1 { //队列名称 static final String DIRECT_QUEUE_INSERT = "direct_queue_insert"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收队列的数据 body: " + new String(body)); } }; channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer); //不需要关闭资源,因为消费者需要持续监听队列信息 }}

3)消费者2:专门接收 update 的消息

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing2 { //队列名称 static final String DIRECT_QUEUE_UPDATE = "direct_queue_update"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收队列的数据 body: " + new String(body)); } }; channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer); //不需要关闭资源,因为消费者需要持续监听队列信息 }}

3. 测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。

消费者1 收到了 insert 的消息

消费者2 收到了 update 的消息

4. 小结

Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:【Android】SharedPreferences梳理
下一篇:基于OpenCV的人脸口罩识别检测详细教程
相关文章

 发表评论

暂时没有评论,来抢沙发吧~