RocketMQ消息重试机制(五)

由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。
如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。
所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。

RocketMQ为使用者封装了消息重试的处理流程,无需开发人员手动处理。
RocketMQ支持了生产端和消费端两类重试机制。

生产端重试

如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

相关API
DefaultMQProducer可以设置消息发送失败的最大重试次数
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed)
public SendResult send(Message msg, long timeout)

生产端的重试实现

//同步发送消息,如果5秒内没有发送成功,则重试5次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);

消费端重试

消费者消费消息后,需要给Broker返回消费状态。以MessageListenerConcurrently监听器为例,Consumer消费完成后需要返回ConsumeConcurrentlyStatus并发消费状态,查看源码可以看到ConsumeConcurrentlyStatus是一个枚举,并且有两种状态

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     * 成功消费
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     * 消费失败,一段时间后重试
     */
    RECONSUME_LATER;
}

Consumer端的重试包括两种情况
异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,
那么Broker就会在一段时间后尝试重试。
超时重试:这里的超时异常并非真正意义上的超时,它指的是指获取消息后,因为某种原因没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 RECONSUME_LATER。那么 RocketMQ会认为该消息没有发送,会一直发送。因为它会认为该消息根本就没有发送给消费者,所以肯定没消费。

因此,如果Consumer端正常消费成功,一定要返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态

两种重试演示

异常重试
RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

但是在大部分情况下,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数

consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
      MessageExt message = list.get(0);
        try {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(message.getBody()));
            //制造异常
            int a = 1 / 0;
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            if (message.getReconsumeTimes()>=2){
              //TODO 根据有任务发送短信通知,进入人工介入处理
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});

消息重试指定的次数后,就返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS不再重试了。
注:只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息是不会重试的。

超时重试
超时重试可以直接在返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS之前sleep两分钟即可

消费者和生产者重试的区别主要有两点
1、默认重试次数:生产者默认是2次,而消费者默认是16次。
2、重试时间间隔:生产者是立刻重试,消费者是有一定时间间隔,按照1S,5S,10S,30S,1M,2M····2H进行重试。
注意:生产者在异步情况重试失效,而对于消费者在广播情况下重试失效。

添加新评论

评论列表