RocketMQ顺序、延迟、等消息发送样例(三)

如果要保证顺序消费,那么他的核心店就是 生产者有序存储、消费者有序消费

顺序消息概念

什么是无序消息
无需消息也就是普通的消息,Producer只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证,也无所谓先后顺序

什么是全局顺序
对于指定的一个Topic,所有消息按照先进先出(FIFO)的顺序进行发布和消费。
比如Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。

什么是局部顺序
在实际开发场景中,并不需要消息完全按照先进先出的的顺序,而是某些消息保证先进先出就可以了
比如一个订单涉及订单生成订单支付订单完成。我不用管其他的订单,只用保证当前订单的消息先后顺序

顺序消息实现原理

生产的Message最终会存放在Queue(队列)中,如果一个Topic(主题)关联了16个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到16个queue,就无法保证一个订单消息的先后顺序了
好比有100条消息,那么这100条消息会平均分配在这16个Queue上,那么每个Queue大概放5~6个左右。
这里有一点很重的是:同一个queue,存储在里面的message 是按照先进先出的原则

这个时候思路就来了,好比有orderId=1的3条消息,分别是 订单生产、订单付款、订单完成。
只要保证它们放到同一个Queue那就保证消费者先进先出了。

请输入图片描述
请输入图片描述

这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么这面就保证了发送消息的先后顺序
保证了消息发送的先后顺序还不行,在消费者集群的情况下,可能消费者1先去Queue拿消息,它拿到了 订单生成,然后消费者2也去拿消息拿到了订单支付,拿到的顺序是没问题了,但关键点事拿到不代表它先消费完了,可能存在消费者一拿到消息订单生成,但由于网络等原因,消费者2比消费者1更先一步消费消息,这种情况就很尴尬了

订单付款还是可能会比订单生成更早消费的情况。
消费消息的时候注册的监听器需要用MessageListenerOrderly替换之前的MessageListenerConcurrently,因为它里面实现了分段锁。消费每个消息前,需要获得这个消息所在的Queue的锁,这样同个时间,同个Queue的消息不被并发消费

顺序代码示例

生产端保证发送消息有序,且发送到同一个Topic的同个queue里面,RocketMQ即可以保证FIFO
例子:订单的顺序流程是:创建、付款、完成,订单号相同的消息会被先后发送到同一个队列中,根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一个queue
消费者Producer

producer.send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
        Long id = (Long)o;
        long index = id % list.size();
        return list.get((int)index);
    }
},0);

消费者Consumer
消费消息的时候注册的监听器需要用MessageListenerOrderly替换之前的MessageListenerConcurrently,因为它里面实现了分段锁。消费每个消息前,需要获得这个消息所在的Queue的锁,这样同个时间,同个Queue的消息不被并发消费

延时发送消息

例如在电商系统里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
使用延迟消息时候只需要在发送消息的时候对消息进行设置延迟级别即可
message.setDelayTimeLevel(1) 1表示配置里面的第一个级别,2表示第二个级别
现在只支持固定的几个时间,详看delayTimeLevel

延迟消息的使用场景
通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息,例如生日提醒
比如订单的检测,用户下单之后创建一条消息,30分钟后消费者检测用户订单是否已经付款没付款则关闭订单,如支付完成忽略即可

指定投递到某队列

指定消息投递到哪一个队列,默认topic下的queue数量是 4
支持同步、异步投递到指定的queue,选择的queue数量必须小于配置的数量,否则会报错

payProducer.getProducer().send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
        Integer queueNum = Integer.parseInt(o.toString());
        return list.get(queueNum);
    }
},0);

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //处理error
}

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割处理

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
       @Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

添加新评论

评论列表