Spring Boot 整合RocketMq实现消息过滤功能

网友投稿 567 2022-07-26


目录简介根据TAG过滤消息生产者消费者测试结果根据SQL表达式过滤消息生产者消费者启动程序报错The broker does not support consumer to filter message by SQL92测试结果总结

简介

消息过滤是指消费者一端在消费消息时,对消息进行选择性过滤,只消费符合过滤条件的消息。 RocketMQ的消息过滤机制大致分为两种:标签过滤和类过滤。其中标签过滤又分为Tag过滤和SQL92过滤。

根据TAG过滤消息

消息发送端只能设置一个tag,消息接收端可以设置多个tag。

生产者

public void sendTagMessage()

{

String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};

for(int i=oDGeGTpilQ0;i<10;i++)

{

String tag = tags[i % oDGeGTpilQtags.length];

logger.info("sendTagMessage tag is :{}",tag);

String msg = "hello, 这是第" + (i + 1) + "条消息";

org.springframework.messaging.Message msg1 = MessageBuilder.withPayload(msg).build();

rocketMQTemplate.convertAndSend("test-tag-rocketmq" + ":" + tag, msg1);

}

}

说明:示例中循环发送了10条消息,每条消息设置了一个tag发送过滤消息的格式为:topic:tag的形式,注意发送端只能设定一个tag。

消费者

@Component

@RocketMQMessageListener(consumerGroup="test-tagrocketmq-group",topic="test-tag-rocketmq",selectorExpression="TagA || TagC || TagD",selectorType=SelectorType.TAG, messageModel = MessageModel.CLUSTERING)

public class TagConsumer implements RocketMQListener

{

private Logger logger =LoggerFactory.getLogger(getClhttp://ass());

@Override

public void onMessage(Object o)

{

String msg=jsON.toJSONString(o);

logger.info("send TagA || TagC || TagD succss content is:{}", msg);

}

}

说明:

selectorType:指定消息通过的tag的方式,默认为SelectorType.TAGmessageModel:指定消息的消费模式,默认为MessageModel.CLUSTERING模式每条消息只能由一个消费者消费,而MessageModel.BROADCASTING模式为广播模式,所有订阅者都能消费。selectorExpression :指定那些Tag消息能够被消费,多个采用||分割。

测试结果

从结果我可以看出第2条为TAGC、第7条为TAGC、第8条为TAGD,第3条为TAGD,第5条为TAGA,第0条为TAGA,而消费端监听的TAG为TAGA、TAGC、TAGD所以对于不符合条件的消息进行了过滤。

根据SQL表达式过滤消息

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。

RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

数字比较,如>,>=,<,<=,BETWEEN,=;字符比较,如:=,<>,IN;IS NULL or IS NOT NULL;逻辑运算符:AND, OR, NOT;常量类型:数值,如:123, 3.1415;字符, 如:‘abc’, 必须使用单引号;NULL,特殊常量Boolean, TRUE or FALSE;

生产者

public void sendSQLMessage()

{

String msg = "hello, 这是第1条消息";

org.springframework.messaging.Message message = MessageBuilder.withPayload(msg).build() ;

Map headers = new HashMap<>() ;

headers.put("i", 5) ;

rocketMQTemplate.convertAndSend("test-sql-rocketmq", message, headers);

}

说明:传递了参数为5进行条件判断。

消费者

@Component

@RocketMQMessageListener(consumerGroup="test-sqlrocketmq-group",topic="test-sql-rocketmq",selectorExpression = "i=5",selectorType=SelectorType.SQL92, messageModel = MessageModel.CLUSTERING)

public class SQLConsumer implements RocketMQListener

{

private Logger logger =LoggerFactory.getLogger(getClass());

@Override

public void onMessage(MessageExt message)

{

String msg=new String(message.getBody());

String paramStr=JSON.toJSONString(message.getProperties());

//消息内容

logger.info("send succss content is:{}", msg);

//消息参数

logger.info("send mssage parma is:{}", paramStr);

}

}

说明:

selectorType:指定消息通过的tag的方式,默认为SelectorType.CLUSTERINGmessageModel:指定消息的消费模式,默认为MessageModel.CLUSTERING模式每条消息只能由一个消费者消费,而MessageModel.BROADCASTING模式为广播模式,所有订阅者都能消费。selectorExpression : 采用rocketMQ支持的表达式。例如i=5

启动程序报错The broker does not support consumer to filter message by SQL92

原因:默认情况下broke没有开启对SQL语法的支持,需要修改配置

1.打开rocketmq服务下的broke.conf文件,添加如下配置即可。

2.重启broke服务即可.

测试结果

说明:只有满足SQL条件能进行消费。

总结

本文讲解了RocketMQ实现消息过滤,针对不同的业务场景选择合适的方案即可,如果疑问,请随时反馈,


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java poi导出excel时如何设置手动换行
下一篇:mybatis配置mapper
相关文章

 发表评论

暂时没有评论,来抢沙发吧~