网站优化

网站优化

Products

当前位置:首页 > 网站优化 >

RocketMQ如何高效应对高并发下的消息持久化?

GG网络技术分享 2026-04-17 00:18 0


上篇文章我们分析完RocketMQ发送消息的原理, 得到后来啊客户端会通过RPC组件向Broker进行通信

Broker收到请求后需要将消息进行持久化,一旦涉及到持久化,服务器的性能会急速下降,甚至出现卡顿、掉帧的现象!消费者在读取磁盘时也会被迫慢下来简直是“血泪史”。

RocketMQ(三):面对高并发请求,如何高效持久化消息?

先说说那些神奇的文件:CommitLog、 ConsumerQueue、IndexFile

Broker提供三套文件来满足高并发下的持久化需求:,这事儿我得说道说道。

  • CommitLog——顺序写入的大肥肉文件。
  • ConsumerQueue——二级索引的“小碎片”。
  • IndexFile——哈希索引,像是给消息装上了GPS。

深得我心。 为了避免写入时产生巨大的IO开销, RocketMQ干脆把所有Topic和队列的消息都塞进同一个CommitLog里顺序写、顺序读,省得每次都去找“哪儿”。

CommitLog 的命名哲学

踩个点。 CommitLog 文件以起始偏移量命名, 每个文件固定 1GB:

00000000000000000000
00000000001073741824
00000000002147483648
……

这些文件在源码里叫 MappedFile用的是 mmap所以写入几乎没有拷贝开销。 实不相瞒... 可别小看它们,一个 MappedFile 就相当于一块“大块头”。

ConsumerQueue:二级索引的灵魂伴侣

ConsumerQueue 按 Topic 分一级, 再按队列 ID 分二级, 拭目以待。 每个文件大小固定 6MB,记录长度只有 20B:

  • 8B —— CommitLog 偏移量。
  • 4B —— 消息体大小。
  • 8B —— Tag 哈希,用于过滤。

这套结构可以类比 MySQL 的聚簇索引 总的来说... + 二级索引——先定位二级,再回表找主键。

IndexFile:哈希索引的终极武器

IndexFile 由三部分组成:文件头+ 哈希槽+ 索引项。它把 的 hash 值映射到槽位, 然后链式指向真正的索引项,实现 O 查询,没眼看。。

常见消息存储组件对比表
组件名称存储方式单条记录大小适用场景
CommitLogMMap 顺序写100B高吞吐写入场景
ConsumerQueueMMap 顺序追加20B 固定长PULL 拉取定位
IndexFileMMap 哈希+链表20B 固定长精确查询/Tag过滤

刷盘与同步:异步 VS 同步,那你选哪个?🤔

刷盘 = 把内核缓冲区的数据强制落地磁盘。

  • 同步刷盘🔒:生产者必须等磁盘真正落地才返回,平安但慢。
  • 异步刷盘⚡:BROKER 自己每隔几百毫秒批量落地,吞吐高但有丢数据风险。
  • SLA 模式:If you love risk, choose async; if you love data integrity, choose sync.

同步刷盘内部实现小窥视👀

public void run {
    while ) {
        // 交换写队列和读队列
        swapRequests;
        // 遍历读队列施行 flush
        for  {
            boolean flushOK = this.mappedFile.flush;
            req.wakeupFuture;
        }
    }
}

异步刷盘小剧场🎭

public void run {
    while ) {
        // 等待配置好的间隔时间或被唤醒
        waitForRunning;
        // 按最少页数阈值刷盘
        mappedFileQueue.flushLeastPages;
    }
}

主从复制:又是一出好戏!📡

探探路。 BROKER 主节点在本地完成持久化后会把同样的数据通过网络推送给从节点。复制同样支持同步/异步两种模式。默认异步,这意味着如果主节点挂了从节点可能只收到了部分日志,需要在启动时进行“日志补齐”。这就是所谓的 “到头来一致性”——数据到头来会统一,但期间可能出现短暂的不一致。

复制流程速记⚙️

  1. #Producer 发 RPC 给 Master Broker。
  2. #Master 写 CommitLog 并提交 Flush 请求。
  3. #Master 将已写入的字节通过 HAService 推送给 Slave。
  4. #Slave 收到后同样走一次 Flush 流程保证磁盘落地。
  5. #若 Master 挂掉,则 Slave 会晋升为新的 Master 并继续服务。

再聊聊 ReputMessageService & Dispatcher 系列 🚀

The story doesn't end at CommitLog! 当一条消息成功写入 CommitLog 后 它还要经过 "重投" 步骤, 实不相瞒... 把信息拆解成多个子任务交给不同的 Dispatcher:

  • CommitLogDispatcherBuildConsumeQueue: 把消息位置写进 ConsumerQueue;
  • CommitLogDispatcherBuildIndex: 把 key‑hash 写进 IndexFile;

This design decouples heavy I/O of message storage from lightwei 别怕... ght indexing work, allowing each part to be flushed independently.

代码片段乱弹 🎸

// 重投核心循环
while  {
    SelectMappedBufferResult result = commitLog.getData;
    if  {
        try {
            DispatchRequest dr = DefaultMessageStore.decodeDispatchRequest;
            doDispatch; // 调度到各个 Dispatcher
            offset += dr.getMsgSize;
        } finally { result.release; }
    } else { doNext = false; }
}
}

清理机制:垃圾不怕, 我有扫帚🧹

BROKER 并不是把旧日志永久保存,它会定期跑两个清理线程:

清理服务一览表
Name  Description  
`CleanCommitLogService` 删除已消费完且过期的 CommitLog 文件
`CleanConsumeQueueService` 删除无用 ConsumerQueue & IndexFile
`ReputMessageService` 负责将 CommitLog 中的新数据重新投递到索引层
`HAService` 处理主从复制相关清理与恢复工作
🌟 随机产品功能对比 🌟
功能点 RocketMQ Kafka ActiveMQ 备注
顺序写 ✔︎ ✔︎ ✖︎ All in one
MMap 支持 ✔︎ ✖︎ ✖︎ 唯一优势
事务消息 ✔︎ ✖︎ ✔︎

  • A] 顺序写 + MMap = 磁盘 IO 最低消耗 🐢➡🐇;

白嫖。 B] 异步刷盘 + 批量提交 = CPU 利用率飙升 🚀;

  • C] 两层索引 = 查询毫秒级完成;
  • D] 主从异步复制 = 高可用 + 可容忍短暂数据不一致;
  • E] 清理线程 + 定期回收 = 磁盘空间永不枯竭。


  • 提交需求或反馈

    Demand feedback