RocketMQ发送消息的三种方式(六)

多种消息发送模式对比

SYNC:同步
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
应用场景:重要通知邮件、报名短信通知、营销短信系统等

ASYNC:异步
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。
消息队列 RocketMQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。
应用场景:对RT时间铭感,可以支持更高的并发,回调成功出发相对应的业务,比如注册成功后通知积分系统发放优惠券

ONEWAY:无需等待响应
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。
应用场景:主要是日志收集,适用于某些耗时非常短,但对可靠性要求并不高的场景,例如LogServer只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答

三种模式对比

发送方式发送 TPS发送结果反馈可靠性
同步发送不丢失消息
异步发送不丢失消息
单项发送最快可能丢失消息

三种方式代码示例

/**
 * 1、同步发送消息
 */
private  void sync() throws Exception {
    //创建消息
    Message message = new Message("topic_family", ("  同步发送  ").getBytes());
    //同步发送消息
    SendResult sendResult = producer.send(message);
    log.info("Product-同步发送-Product信息={}", sendResult);
}
/**
 * 2、异步发送消息
 */
private  void async() throws Exception {
    //创建消息
    Message message = new Message("topic_family", ("  异步发送  ").getBytes());
    //异步发送消息
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("Product-异步发送-输出信息={}", sendResult);
        }
        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
            //补偿机制,根据业务情况进行使用,看是否进行重试
        }
    });
}
/**
 * 3、单项发送消息
 */
private  void oneWay() throws Exception {
    //创建消息
    Message message = new Message("topic_family", (" 单项发送 ").getBytes());
    //同步发送消息
    producer.sendOneway(message);
}

SendStatus状态

当Product发送消息的时候,会返回SendResult对象,该对象又包含了一个SendStatus对象。

package org.apache.rocketmq.client.producer;
public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

SEND_OK
代表发送成功!但并不保证它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。

SLAVE_NOT_AVAILABLE
如果Broker的角色是SYNC_MASTER(同步复制)(默认为异步),但没有配置Slave(从节点) Broker,将获得此状态。

FLUSH_DISK_TIMEOUT
如果Broker设置为 SYNC_FLUSH(同步刷盘)(默认为ASYNC_FLUSH),并且Broker的syncFlushTimeout(默认为5秒)内完成刷新磁盘,将获得此状态。

FLUSH_SLAVE_TIMEOUT
如果Broker的角色是SYNC_MASTER(同步复制)(默认为ASYNC_MASTER),并且从Broker的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,将获得此状态。

添加新评论

评论列表