RocketMQ发送消息原理,你真的懂了吗?
- 内容介绍
- 文章标签
- 相关推荐
你有没有觉得,现在这些技术文章写得都太“干净”了?
放心去做... 什么叫做“干净”呢?就是那种结构清晰、逻辑分明、段落整齐、语言规范,看起来像是AI写的那种文章。我就不喜欢这种, 我更喜欢那种“乱糟糟”的文章,有血有肉,有情绪,有错误,有废话,有错别字,有逻辑跳跃,有情绪波动,有你意想不到的转折,还有我自己的主观臆断和偏见。对,我就是想让你看到一个“烂”文章,而不是那种千篇一律的“标准答案”。

一、RocketMQ发送消息的“三板斧”
你知道RocketMQ发送消息有哪几种方式吗,研究研究。?
有三种:同步、异步、单向。听起来很牛逼,对吧?但你真的懂了吗?
别急,我们先来聊聊这三种方式到底是个啥。
1. 同步发送
同步发送, 就是你发一个消息,然后就站在那等,等它回来。就像你给老板发微信,然后就一直盯着手机看有没有回复一样。这种发送方式, 适合对数据一致性要求高的场景,比如你发个订单消息,必须确保它到了Broker,不然你敢下单吗?
2. 异步发送
异步发送呢, 就是你发完消息就走人,不管它有没有收到。它会响应时间很重要。
3. 单向发送
单向发送,就是你发完就完事了不等后来啊。这种适合对性能要求高,但对可靠性要求不高的场景,比如你发个日志,发完就完事了谁管你呢?
来我们看看这三种方式的对比:
| 发送方式 | 是否等待响应 | 是否重试 | 适用场景 |
|---|---|---|---|
| 同步发送 | 是 | 是 | 数据一致性要求高 |
| 异步发送 | 否 | 是 | 响应时间敏感 |
| 单向发送 | 否 | 否 | 性能要求高 |
我给跪了。 是不是觉得这三种方式很清晰?但你真的懂了吗?
二、 RocketMQ发送消息的流程
谨记... 我们先来想想,如果我们要自己设计一个消息发送系统,需要哪些步骤?
- 参数校验
- 获取路由信息
- 选择队列
- 计算超时和重试次数
- 选择RPC工具
我们再来看看RocketMQ是怎么做的:
- 参数校验
- 获取路由信息
- 选择队列
- 计算超时和重试次数
- 选择RPC工具
是不是很像?但你真的懂了吗?
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 参数校验
// 获取路由信息
TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo);
if ) {
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
int times = 0;
for {
String lastBrokerName = null == mq ? null : mq.getBrokerName;
MessageQueue mqSelected = selectOneMessageQueue;
if {
mq = mqSelected;
try {
beginTimestampPrev = System.currentTimeMillis;
if {
// 重发时设置topic
msg.setTopic);
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if {
callTimeout = true;
break;
}
sendResult = sendKernelImpl;
endTimestamp = System.currentTimeMillis;
switch {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if != SendStatus.SEND_OK) {
if {
continue;
}
}
return sendResult;
default:
break;
}
}
}
}
}
}
public MessageQueue selectOneMessageQueue {
if {
return selectOneMessageQueue;
} else {
for {
int index = this.sendWhichQueue.getAndIncrement;
int pos = Math.abs % this.messageQueueList.size;
if pos = 0;
MessageQueue mq = this.messageQueueList.get;
if .equals) {
return mq;
}
}
return selectOneMessageQueue;
}
}
public void sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerName = this.factory.findBrokerAddressInPublish;
String brokerAddr = this.factory.findBrokerAddressInPublish;
if {
brokerAddr = this.factory.findBrokerAddressInPublish;
byte prevBody = msg.getBody;
try {
if ) {
msg.setMsgId, prevBody));
}
if ) {
checkForbiddenContext.setBody;
this.checkForbiddenHook.processCheckForbidden;
}
if ) {
context.setMsgId);
this.sendMessageHook.sendMessageBefore;
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader;
requestHeader.setTopic);
requestHeader.setBodyLength.length);
requestHeader.setProperties);
SendResult sendResult = null;
switch {
case ASYNC:
Message tmpMessage = msg;
sendResult = this.mQClientAPIImpl.sendMessage(
brokerAddr,
brokerName,
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.producerGroup,
this.defaultMQProducer.getVipChannelEnabled,
this.defaultMQProducer.getSendMsgTimeout,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getCompressMsgBodyOverHowmuch,
this.defaultMQProducer.getRetryResponseAndReputIfNecessary,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQ
你有没有觉得,现在这些技术文章写得都太“干净”了?
放心去做... 什么叫做“干净”呢?就是那种结构清晰、逻辑分明、段落整齐、语言规范,看起来像是AI写的那种文章。我就不喜欢这种, 我更喜欢那种“乱糟糟”的文章,有血有肉,有情绪,有错误,有废话,有错别字,有逻辑跳跃,有情绪波动,有你意想不到的转折,还有我自己的主观臆断和偏见。对,我就是想让你看到一个“烂”文章,而不是那种千篇一律的“标准答案”。

一、RocketMQ发送消息的“三板斧”
你知道RocketMQ发送消息有哪几种方式吗,研究研究。?
有三种:同步、异步、单向。听起来很牛逼,对吧?但你真的懂了吗?
别急,我们先来聊聊这三种方式到底是个啥。
1. 同步发送
同步发送, 就是你发一个消息,然后就站在那等,等它回来。就像你给老板发微信,然后就一直盯着手机看有没有回复一样。这种发送方式, 适合对数据一致性要求高的场景,比如你发个订单消息,必须确保它到了Broker,不然你敢下单吗?
2. 异步发送
异步发送呢, 就是你发完消息就走人,不管它有没有收到。它会响应时间很重要。
3. 单向发送
单向发送,就是你发完就完事了不等后来啊。这种适合对性能要求高,但对可靠性要求不高的场景,比如你发个日志,发完就完事了谁管你呢?
来我们看看这三种方式的对比:
| 发送方式 | 是否等待响应 | 是否重试 | 适用场景 |
|---|---|---|---|
| 同步发送 | 是 | 是 | 数据一致性要求高 |
| 异步发送 | 否 | 是 | 响应时间敏感 |
| 单向发送 | 否 | 否 | 性能要求高 |
我给跪了。 是不是觉得这三种方式很清晰?但你真的懂了吗?
二、 RocketMQ发送消息的流程
谨记... 我们先来想想,如果我们要自己设计一个消息发送系统,需要哪些步骤?
- 参数校验
- 获取路由信息
- 选择队列
- 计算超时和重试次数
- 选择RPC工具
我们再来看看RocketMQ是怎么做的:
- 参数校验
- 获取路由信息
- 选择队列
- 计算超时和重试次数
- 选择RPC工具
是不是很像?但你真的懂了吗?
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 参数校验
// 获取路由信息
TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo);
if ) {
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
int times = 0;
for {
String lastBrokerName = null == mq ? null : mq.getBrokerName;
MessageQueue mqSelected = selectOneMessageQueue;
if {
mq = mqSelected;
try {
beginTimestampPrev = System.currentTimeMillis;
if {
// 重发时设置topic
msg.setTopic);
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if {
callTimeout = true;
break;
}
sendResult = sendKernelImpl;
endTimestamp = System.currentTimeMillis;
switch {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if != SendStatus.SEND_OK) {
if {
continue;
}
}
return sendResult;
default:
break;
}
}
}
}
}
}
public MessageQueue selectOneMessageQueue {
if {
return selectOneMessageQueue;
} else {
for {
int index = this.sendWhichQueue.getAndIncrement;
int pos = Math.abs % this.messageQueueList.size;
if pos = 0;
MessageQueue mq = this.messageQueueList.get;
if .equals) {
return mq;
}
}
return selectOneMessageQueue;
}
}
public void sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerName = this.factory.findBrokerAddressInPublish;
String brokerAddr = this.factory.findBrokerAddressInPublish;
if {
brokerAddr = this.factory.findBrokerAddressInPublish;
byte prevBody = msg.getBody;
try {
if ) {
msg.setMsgId, prevBody));
}
if ) {
checkForbiddenContext.setBody;
this.checkForbiddenHook.processCheckForbidden;
}
if ) {
context.setMsgId);
this.sendMessageHook.sendMessageBefore;
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader;
requestHeader.setTopic);
requestHeader.setBodyLength.length);
requestHeader.setProperties);
SendResult sendResult = null;
switch {
case ASYNC:
Message tmpMessage = msg;
sendResult = this.mQClientAPIImpl.sendMessage(
brokerAddr,
brokerName,
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.producerGroup,
this.defaultMQProducer.getVipChannelEnabled,
this.defaultMQProducer.getSendMsgTimeout,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getCompressMsgBodyOverHowmuch,
this.defaultMQProducer.getRetryResponseAndReputIfNecessary,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQProducer.getRetryTimesWhenSendFailed,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed,
this.defaultMQ

