laomao , Life is like a play

RocketMQ中Topic、Tag使用及消息过滤(八)

默认分类 0 评

概念

1、Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
2、Tag:消息标签,二级分类,用来进一步区分某个 Topic 下的消息分类,RocketMQ 允许消费者按照 Tag 对消息进行过滤,确保消费者最终只消费到他关注的消息类型。
Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以说是二级分类
请输入图片描述

区分

你可能会有这样的疑问:到底什么时候该用 Topic,什么时候该用 Tag?
建议你从以下几个方面进行判断:
1、类型是否一致
如普通消息,事务消息,定时消息、顺序消息不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
2、业务是否相关联
没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分。而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
3、优先级是否一致
如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会会慢一些,不同优先级的消息用不同的 Topic 进行区分。
4、量级是否相当
有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而『饿死』,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

示例
以天猫交易平台为例,订单消息,支付消息属于不同业务类型的消息,分别创建 Topic_Order 和 Topic_Pay,其中订单消息根据商品品类以不同的 Tag 再进行细分,如电器类、男装类、女装类、化妆品类,最后他们都被各个不同的系统所接收。

通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。

消息过滤

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算
在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

生产消息

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();

消息消费
使用MessageSelector.bySql来使用sql筛选消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();
Redis安装、使用及配置笔记
发表评论
撰写评论
文章目录