RocketMQ发送消息原理,你真的懂了吗?

2026-05-23 20:597阅读0评论服务器VPS
  • 内容介绍
  • 文章标签
  • 相关推荐

你有没有觉得,现在这些技术文章写得都太“干净”了?

放心去做... 什么叫做“干净”呢?就是那种结构清晰、逻辑分明、段落整齐、语言规范,看起来像是AI写的那种文章。我就不喜欢这种, 我更喜欢那种“乱糟糟”的文章,有血有肉,有情绪,有错误,有废话,有错别字,有逻辑跳跃,有情绪波动,有你意想不到的转折,还有我自己的主观臆断和偏见。对,我就是想让你看到一个“烂”文章,而不是那种千篇一律的“标准答案”。

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

一、RocketMQ发送消息的“三板斧”

你知道RocketMQ发送消息有哪几种方式吗,研究研究。?

有三种:同步、异步、单向。听起来很牛逼,对吧?但你真的懂了吗?

别急,我们先来聊聊这三种方式到底是个啥。

1. 同步发送

同步发送, 就是你发一个消息,然后就站在那等,等它回来。就像你给老板发微信,然后就一直盯着手机看有没有回复一样。这种发送方式, 适合对数据一致性要求高的场景,比如你发个订单消息,必须确保它到了Broker,不然你敢下单吗?

2. 异步发送

异步发送呢, 就是你发完消息就走人,不管它有没有收到。它会响应时间很重要。

3. 单向发送

单向发送,就是你发完就完事了不等后来啊。这种适合对性能要求高,但对可靠性要求不高的场景,比如你发个日志,发完就完事了谁管你呢?

来我们看看这三种方式的对比:

发送方式 是否等待响应 是否重试 适用场景
同步发送 数据一致性要求高
异步发送 响应时间敏感
单向发送 性能要求高

我给跪了。 是不是觉得这三种方式很清晰?但你真的懂了吗?

二、 RocketMQ发送消息的流程

谨记... 我们先来想想,如果我们要自己设计一个消息发送系统,需要哪些步骤?

  1. 参数校验
  2. 获取路由信息
  3. 选择队列
  4. 计算超时和重试次数
  5. 选择RPC工具

我们再来看看RocketMQ是怎么做的:

  1. 参数校验
  2. 获取路由信息
  3. 选择队列
  4. 计算超时和重试次数
  5. 选择RPC工具

是不是很像?但你真的懂了吗?


private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 参数校验
    // 获取路由信息
    TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo);
    if ) {
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
        int times = 0;
        for  {
            String lastBrokerName = null == mq ? null : mq.getBrokerName;
            MessageQueue mqSelected = selectOneMessageQueue;
            if  {
                mq = mqSelected;
                try {
                    beginTimestampPrev = System.currentTimeMillis;
                    if  {
                        // 重发时设置topic
                        msg.setTopic);
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if  {
                        callTimeout = true;
                        break;
                    }
                    sendResult = sendKernelImpl;
                    endTimestamp = System.currentTimeMillis;
                    switch  {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if  != SendStatus.SEND_OK) {
                                if  {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                }
            }
        }
    }
}

public MessageQueue selectOneMessageQueue {
    if  {
        return selectOneMessageQueue;
    } else {
        for  {
            int index = this.sendWhichQueue.getAndIncrement;
            int pos = Math.abs % this.messageQueueList.size;
            if  pos = 0;
            MessageQueue mq = this.messageQueueList.get;
            if .equals) {
                return mq;
            }
        }
        return selectOneMessageQueue;
    }
}

public void sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    String brokerName = this.factory.findBrokerAddressInPublish;
    String brokerAddr = this.factory.findBrokerAddressInPublish;
    if  {
        brokerAddr = this.factory.findBrokerAddressInPublish;
        byte prevBody = msg.getBody;
        try {
            if ) {
                msg.setMsgId, prevBody));
            }
            if ) {
                checkForbiddenContext.setBody;
                this.checkForbiddenHook.processCheckForbidden;
            }
            if ) {
                context.setMsgId);
                this.sendMessageHook.sendMessageBefore;
            }
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader;
            requestHeader.setTopic);
            requestHeader.setBodyLength.length);
            requestHeader.setProperties);
            SendResult sendResult = null;
            switch  {
                case ASYNC:
                    Message tmpMessage = msg;
                    sendResult = this.mQClientAPIImpl.sendMessage(
                        brokerAddr,
                        brokerName,
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.producerGroup,
                        this.defaultMQProducer.getVipChannelEnabled,
                        this.defaultMQProducer.getSendMsgTimeout,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getCompressMsgBodyOverHowmuch,
                        this.defaultMQProducer.getRetryResponseAndReputIfNecessary,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQ

你有没有觉得,现在这些技术文章写得都太“干净”了?

放心去做... 什么叫做“干净”呢?就是那种结构清晰、逻辑分明、段落整齐、语言规范,看起来像是AI写的那种文章。我就不喜欢这种, 我更喜欢那种“乱糟糟”的文章,有血有肉,有情绪,有错误,有废话,有错别字,有逻辑跳跃,有情绪波动,有你意想不到的转折,还有我自己的主观臆断和偏见。对,我就是想让你看到一个“烂”文章,而不是那种千篇一律的“标准答案”。

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

一、RocketMQ发送消息的“三板斧”

你知道RocketMQ发送消息有哪几种方式吗,研究研究。?

有三种:同步、异步、单向。听起来很牛逼,对吧?但你真的懂了吗?

别急,我们先来聊聊这三种方式到底是个啥。

1. 同步发送

同步发送, 就是你发一个消息,然后就站在那等,等它回来。就像你给老板发微信,然后就一直盯着手机看有没有回复一样。这种发送方式, 适合对数据一致性要求高的场景,比如你发个订单消息,必须确保它到了Broker,不然你敢下单吗?

2. 异步发送

异步发送呢, 就是你发完消息就走人,不管它有没有收到。它会响应时间很重要。

3. 单向发送

单向发送,就是你发完就完事了不等后来啊。这种适合对性能要求高,但对可靠性要求不高的场景,比如你发个日志,发完就完事了谁管你呢?

来我们看看这三种方式的对比:

发送方式 是否等待响应 是否重试 适用场景
同步发送 数据一致性要求高
异步发送 响应时间敏感
单向发送 性能要求高

我给跪了。 是不是觉得这三种方式很清晰?但你真的懂了吗?

二、 RocketMQ发送消息的流程

谨记... 我们先来想想,如果我们要自己设计一个消息发送系统,需要哪些步骤?

  1. 参数校验
  2. 获取路由信息
  3. 选择队列
  4. 计算超时和重试次数
  5. 选择RPC工具

我们再来看看RocketMQ是怎么做的:

  1. 参数校验
  2. 获取路由信息
  3. 选择队列
  4. 计算超时和重试次数
  5. 选择RPC工具

是不是很像?但你真的懂了吗?


private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 参数校验
    // 获取路由信息
    TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo);
    if ) {
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
        int times = 0;
        for  {
            String lastBrokerName = null == mq ? null : mq.getBrokerName;
            MessageQueue mqSelected = selectOneMessageQueue;
            if  {
                mq = mqSelected;
                try {
                    beginTimestampPrev = System.currentTimeMillis;
                    if  {
                        // 重发时设置topic
                        msg.setTopic);
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if  {
                        callTimeout = true;
                        break;
                    }
                    sendResult = sendKernelImpl;
                    endTimestamp = System.currentTimeMillis;
                    switch  {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if  != SendStatus.SEND_OK) {
                                if  {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                }
            }
        }
    }
}

public MessageQueue selectOneMessageQueue {
    if  {
        return selectOneMessageQueue;
    } else {
        for  {
            int index = this.sendWhichQueue.getAndIncrement;
            int pos = Math.abs % this.messageQueueList.size;
            if  pos = 0;
            MessageQueue mq = this.messageQueueList.get;
            if .equals) {
                return mq;
            }
        }
        return selectOneMessageQueue;
    }
}

public void sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    String brokerName = this.factory.findBrokerAddressInPublish;
    String brokerAddr = this.factory.findBrokerAddressInPublish;
    if  {
        brokerAddr = this.factory.findBrokerAddressInPublish;
        byte prevBody = msg.getBody;
        try {
            if ) {
                msg.setMsgId, prevBody));
            }
            if ) {
                checkForbiddenContext.setBody;
                this.checkForbiddenHook.processCheckForbidden;
            }
            if ) {
                context.setMsgId);
                this.sendMessageHook.sendMessageBefore;
            }
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader;
            requestHeader.setTopic);
            requestHeader.setBodyLength.length);
            requestHeader.setProperties);
            SendResult sendResult = null;
            switch  {
                case ASYNC:
                    Message tmpMessage = msg;
                    sendResult = this.mQClientAPIImpl.sendMessage(
                        brokerAddr,
                        brokerName,
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.producerGroup,
                        this.defaultMQProducer.getVipChannelEnabled,
                        this.defaultMQProducer.getSendMsgTimeout,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getCompressMsgBodyOverHowmuch,
                        this.defaultMQProducer.getRetryResponseAndReputIfNecessary,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendFailed,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
                        this.defaultMQ