RocketMQ原理源碼解析

本文的內容主要包括:MQ的對比和選型、RocketMQ原理、RocketMQ源碼解析、事務消息。

如發現本文中所寫有錯誤或者不準確之處,非常歡迎指出,我會及時調研修正!

Rocketmq性能壓測客戶端:https://github.com/Force-King/rocketmq-test

 

一、MQ的對比和選型

目前業界主要的消息中間件有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。其中開源的,社區比較活躍的是ActiveMQ、RabbitMQ、Kafka、RocketMQ。所以,我們做技術選型時,需要考慮從以上4種開源的MQ中選取。以下是這4種mq的特性:

RabbitMQ、Kafka、RocketMQ性能測試對比:

在同步發送場景中,三個消息中間件的表現區分明顯:

Kafka的吞吐量高達17.3w/s,不愧是高吞吐量消息中間件的行業老大。這主要取決於它的隊列模式保證了寫磁盤的過程是線性IO。此時broker磁盤IO已達瓶頸。

RocketMQ也表現不俗,吞吐量在11.6w/s,磁盤IO %util已接近100%。RocketMQ的消息寫入內存後即返回ack,由單獨的線程專門做刷盤的操作,所有的消息均是順序寫文件。

RabbitMQ的吞吐量5.95w/s,CPU資源消耗較高。它支持AMQP協議,實現非常重量級,爲了保證消息的可靠性在吞吐量上做了取捨。RabbitMQ在消息持久化場景下,吞吐量在2.6w/s左右。

 

Redis(補充)

是一個Key-Value的NoSQL數據庫,開發維護很活躍,同時它也支持MQ功能,所以完全可以當做一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不同大小的數據。實驗表明:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而如果數據大小超過了10K,Redis則慢的無法忍受;出隊時,無論數據大小,Redis都表現出非常好的性能,而RabbitMQ的出隊性能則遠低於Redis。

 

綜合以上對比,可以得出,RocketMQ 和 Kafka 的吞吐量最高,在高吞吐量的業務要求下,首選這兩個mq。

Kafka 在topic 達到一定量以上時,吞吐量急劇下降;而RocketMQ影響較小。如下圖:

 

從以上幾個圖可以看出,隨着業務增多,topic 增多,Kafka的穩定性比RocketMQ要差不少。

所以綜合來看,作爲互聯網行業,肯定會涉及資金轉賬等要求事務一致的場景, Kafka 不支持分佈式事務,RocketMQ 支持分佈式事務,而且理論上不會丟消息,整體對比來看,還是RocketMQ更加優越。而且RocketMQ是經歷了多次雙十一考驗的,可用性是毋庸置疑的!

 

二、RocketMQ原理

1. RocketMQ的發展歷史

        在2007年的時候,淘寶實施了「五彩石」項目,「五彩石」用於將交易系統從單機變成分佈式,也是在這個過程中產生了阿里巴巴第一代消息引擎——Notify。在2010年的時候,阿里巴巴B2B部門基於ActiveMQ的5.1版本也開發了自己的一款消息引擎,稱爲Napoli,這款消息引擎在B2B裏面廣泛地被使用,不僅僅是在交易領域,在很多的後臺異步解耦等方面也得到了廣泛的應用。在2011年的時候,業界出現了現在被很多大數據領域所推崇的Kafka消息引擎,阿里在研究了Kafka的整體機制和架構設計之後,基於Kafka的設計使用Java進行了完全重寫並推出了MetaQ 1.0版本,主要是用於解決順序消息和海量堆積的問題。而在2012年,阿里對於MetaQ進行了架構重組升級,開發出了MetaQ 2.0,這時就發現MetaQ原本基於Kafka的架構在阿里巴巴如此龐大的體系下很難進行水平擴展,所以在2012年的時候就開發了RocketMQ 3.0版本。很多人會問到RocketMQ 3.0和MetaQ 3.0的區別,其實這兩者是等價的版本,只不過阿里內部使用的稱爲MetaQ 3.0,外部開源稱之爲RocketMQ 3.0。在2015年,又基於RocketMQ開發了阿里雲上的Aliware MQ和Notify 3.0。在2016年的時候,阿里巴巴將RocketMQ的內核引擎捐贈給了Apache基金會。

        以上就是RocketMQ的整體發展歷史,其實在阿里巴巴內部圍繞着RocketMQ內核打造了三款產品,分別是MetaQ、Notify和Aliware MQ。這三者分別採用了不同的模型,MetaQ主要使用了拉模型,解決了順序消息和海量堆積問題;Notify主要使用了推模型,解決了事務消息;而云產品Aliware MQ則是提供了商業化的版本。如圖:

下圖是RocketMQ在歷年雙11的消息流轉體量:

單位:十億

 

2. RocketMQ的原理和架構

RocketMQ消息隊列的整體部署架構如下圖所示:

RocketMQ消息隊列集羣中的幾個角色:

  • NameServer:命名發現服務,更新和路由發現broker;其在RocketMQ中起着中轉承接的作用,是一個無狀態的服務,多個NameServer之間不通信。
  • Broker-Master:broker 消息主機服務器。
  • Broker-Slave:broker 消息從機服務器。
  • Producer:消息生產者。
  • Consumer:消息消費者。

RocketMQ集羣的一部分通信如下:

  • Broker啓動後需要完成一次將自己註冊至NameServer的操作;隨後每隔30s時間定期向NameServer上報Topic路由信息;
  • 消息生產者Producer作爲客戶端發送消息時候,需要根據Msg的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒有則更新路由信息會從 NameServer上重新拉取;
  • 消息生產者Producer根據所獲取的路由信息選擇一個隊列(MessageQueue)進行消息發送;Broker作爲消息的接收者接收消息並落盤存儲。

從上面可以看出在消息生產者,在Broker和NameServer間都會發生通信(這裏只說了MQ的部分通信),因此如何設計一個良好的網絡通信模塊在MQ中至關重要,它將決定RocketMQ集羣整體的消息傳輸能力與最終性能。

rocketmq-remoting 模塊是 RocketMQ消息隊列中負責網絡通信的模塊,它幾乎被其他所有需要網絡通信的模塊(諸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依賴和引用。

爲了實現客戶端與服務器之間高效的數據請求與接收,RocketMQ消息隊列自定義了通信協議並在Netty的基礎之上擴展了通信模塊。

 

鑑於RocketMQ通信模塊的底層源碼是Netty實現的,在閱讀RocketMQ的源碼之前,建議讀者先對Netty的多線程模型、JAVA NIO模型均有一定的瞭解,以便快速理解RocketMQ源碼。

 

三、RocketMQ源碼解析

1. 源碼目錄結構介紹

本文使用的RocketMQ版本是4.4.0, 依賴的netty版本是4.0.42.Final.

請參考:Netty基本原理

RocketMQ的代碼結構圖如下:

RocketMQ源碼主要分爲以下幾個package:

rocketmq-broker:mq的核心,它能接收producer和consumer的請求,並調用store層服務對消息進行處理。HA服務的基本單元,支持同步雙寫,異步雙寫等模式。

rocketmq-client:mq客戶端實現,目前官方僅僅開源了java版本的mq客戶端,c++,go客戶端有社區開源貢獻。

rocketmq-common:一些模塊間通用的功能類,比如一些配置文件、常量。

rocketmq-example:官方提供的例子,對典型的功能比如order message,push consumer,pull consumer的用法進行了示範。

rocketmq-filter:消息過濾服務,相當於在broker和consumer中間加入了一個filter代理。

rocketmq-namesrv:命名服務,更新和路由發現 broker服務。

rocketmq-remoting:基於netty的底層通信實現,所有服務間的交互都基於此模塊。

rocketmq-srvutil:解析命令行的工具類ServerUtil。

rocketmq-store:存儲層實現,同時包括了索引服務,高可用HA服務實現。

rocketmq-tools:mq集羣管理工具,提供了消息查詢等功能。

 

RocketMQ 的邏輯部署結構:

RocketMQ主要的功能集中在rocketmq-broker、rocketmq-remoting、rocketmq-store 三個模塊中,所以我們接下來主要分析這三個模塊的源碼。

先主要講解rocketmq 啓動時,都做了什麼。

 

2. RocketMQ 服務啓動

啓動類:org.apache.rocketmq.broker.BrokerController

這個類是服務啓動時執行,初始化了發送消息、消費消息、清理過期請求等各種線程池和監聽事件。

瞭解了mq服務啓動的過程,接下來,我們按照一條消息從客戶端發出,最終到服務端的存儲層並如何落盤,這一條調用鏈來分析源碼,瞭解一條消息是怎麼處理的。

 

3. Client

通常爲了不影響業務層性能,都是使用異步發送,所以接下來,我們分析消息異步發送的接口代碼。

消息由客戶端MQProducer發出

sendKernelImpl方法調用了MQClientAPIImpl類的sendMessage()。如下圖:

sendMessage() 通過判斷髮送類型,最終調用了MQClientAPIImpl類的sendMessageAsync()。如下圖:

可看出,該方法調用了通信層RemotingClient接口(實現類是NettyRemotingClient)的invokeAsync方法。

 

4. Remoting通信層

Remoting模塊類結構圖:

消息從Client調入通信層的NettyRemotingClient:

NettyRemotingAbstract類的invokeAsyncImpl():

寫入通道後,就等待Netty的Selector輪詢出來,調用後續broker處理任務。

 

5. Broke

發送的消息到達broker,調用 org.apache.rocketmq.broker.processor.SendMessageProcessor 類的processRequest()方法,processRequest()調用sendMessag(),如下圖:

下圖是sendMessag()方法:

該方法調用了MessageStore接口的putMessage()方法,而MessageStore的實現類是DefaultMessageStore,該類是位於存儲層store,在後續的存儲層會講解。

 

6. Store存儲層

存儲層時rocketmq的重中之重,也是決定mq性能的主要之處,這裏我們將詳細分析。

(1) 存儲層整體結構和源碼

業務層都是通過DefaultMessageStore類做爲操作入口。

DefaultMessageStore.putMessage()方法:

可以看出,前面的都是一些驗證,後邊最終調用了CommitLog的putMessage()方法。

CommitLog.putMessage():

RocketMQ下主要有6類文件,分別是三類大文件:Index文件,consumequeue文件,commitlog文件。三類小文件:checkpoint文件,config目錄下的配置文件.和abort。

而對於三類大文件,使用的就是NIO的MappedByteBuffer類來提高讀寫性能。這個類是文件內存映射的相關類,支持隨機讀和順序寫。在RocketMQ中,被封裝成了MappedFile類。

RocketMQ對於每類大文件,在存儲時候分割成了多個固定大小的文件,每個文件名爲前面所有的文件大小加1(也就是偏移量)。從而實現對整個大文件的串聯拼接。接下來就需要看看MappedFIle這個封裝類了。

MappedFile.appendMessagesInner():

這裏可以看到,調用了AppendMessageCallback.doAppend()方法,而AppendMessageCallback是個接口,它的實現類DefaultAppendMessageCallback就在CommitLog類中,是個內部類。

DefaultAppendMessageCallback..doAppend():

方法前面是內存寫入位置和偏移量的計算,以及一些消息數據的封裝準備。後邊最終調用byteBuffer.put()方法寫入內存,並計算了耗時,將消息寫入的內存的位置信息、寫入耗時封裝爲AppendMessageResult對象返回。

到這裏,消息寫入內存就算完成了,之後就是通過判斷配置文件的主從同步類型和刷盤類型,進行刷盤和HA主從同步。

(2) 刷盤

刷盤類型:

刷盤方式有三種:

其中CommitRealTimeService是老一些版本中沒有的,它爲開啓內存字節緩存的刷盤服務。

CommitRealTimeService和FlushRealTimeService都繼承了FlushCommitLogService。

這三個類都在CommitLog類中,是內部類。

FlushRealTimeService

使用配置參數:

flushCommitLogTimed=false // 刷新策略(默認是實時刷盤)

flushIntervalCommitLog=500 // 刷盤時間間隔(默認500ms)

flushCommitLogLeastPages=4 // 每次刷盤至少需要多少個page(默認是4個)

flushCommitLogThoroughInterval = 1000 * 10 // 徹底刷盤間隔時間(默認10s)

FlushRealTimeService的run方法:

大致邏輯:

  • 如果 當前時間 >(最後一次刷盤時間 + 徹底刷盤間隔時間(10s)),則將最新一次刷盤時間更新爲當前時間
  • 如果是實時刷盤,每隔一定時間間隔,該線程休眠500毫秒

如果不是實時刷盤,則調用waitForRunning,即每隔500毫秒或該刷盤服務線程調用了wakeup()方法之後結束阻塞。最後調用 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages)進行刷盤。

繼續深入MappedFileQueue.flush()方法:

可以看到這裏調用了mappedFile.flush(flushLeastPages):

這裏最終調用了NIO 的MappedByteBuffer.force();通過使用MappedByteBuffer,提高大文件磁盤讀寫性能。

CommitRealTimeService

包含提交和異步刷盤邏輯,專門爲開啓內存字節緩衝區的刷盤服務。transientStorePoolEnable=true 時使用這種方式。

使用配置參數:

commitIntervalCommitLog = 200 // 提交到FileChannel的時間間隔,默認0.2s

commitCommitLogLeastPages=4 //每次提交到File至少需要多少個page(默認是4個)

commitCommitLogThoroughInterval = 200 //提交完成間隔時間(默認0.2s)

代碼邏輯與FlushRealTimeService基本一樣,這裏就不貼出來了。

消息追加時,先直接寫入堆內內存writeBuffer,然後定時commit到堆外內存FileChannel,再定時flush。

總結:

1、同步刷盤

每次發送消息,消息都直接存儲在FileChannel中,使用的是(MapFile的MappdByteBuffer),然後直接調用force()方法刷寫到磁盤,等到force刷盤成功後,再返回給調用方,(GroupCommitRequest.waitForFlush)就是其同步調用的實現。

2、異步刷盤

分爲兩種情況,是否開啓堆內存緩存池,具體配置參數:MessageStoreConfig.transientStorePoolEnable

transientStorePoolEnable=true

先寫入內存字節緩衝區(writeBuffer) ----> 從內存字節緩衝區(write buffer)提交(commit)到文件通道(fileChannel) ----> 文件通道(fileChannel)定時flush到磁盤

transientStorePoolEnable=false(默認)

寫入映射文件字節緩衝區(mappedByteBuffer) ----> 映射文件字節緩衝區(mappedByteBuffer)定時flush

(3) 主從同步HA

有兩種類型:

同步雙寫 SYNC_MASTER

異步複製 ASYNC_MASTER

如果是SYNC_MASTER模式,消息發送者將消息刷寫到磁盤後,需要繼續等待新數據被傳輸到從服務器,從服務器數據的複製是在另外一個線程HAConnection中去拉取,所以消息發送者在這裏需要等待數據傳輸的結果,GroupTransferService就是實現該功能。而ASYNC_MASTER模式,消息在master寫入成功,即會返回成功,無需等待slave。

所以,異步複製性能高於同步雙寫,業務沒有主從強一致要求的話,推薦使用ASYNC_MASTER模式。

以下是異步複製的源碼:

 

7. RocketMQ 事務消息

在RocketMQ中生產者有三種角色NormalProducer(普通)、OrderProducer(順序)、TransactionProducer(事務),我們這裏用TransactionProducer(事務)來解決分佈式事務問題。

說到分佈式事務,就會談到經典的」賬戶轉賬」問題:2個賬戶,分佈處於2個不同的DB,或者說2個不同的系統裏面,A要扣錢,B要加錢,如何保證原子性?

一般的思路都是通過消息中間件來實現「最終一致性」:A系統扣錢,然後發條消息給中間件,B系統接收此消息,進行加錢。

但這裏面有個問題:A是先update DB,後發送消息呢?還是先發送消息,後update DB?

假設先update DB成功,發送消息網絡失敗,重發又失敗,怎麼辦? 

假設先發送消息成功,update DB失敗。消息已經發出去了,又不能撤回,怎麼辦?

所以,這裏下個結論:只要發送消息和update DB這2個操作不是原子的,無論誰先誰後,都是有問題的。

那這個問題怎麼解決呢??

爲了能解決該問題,同時又不和業務耦合,RocketMQ提出了「事務消息」的概念。

 

rocketmq實現分佈式事務:

通過消息的異步事務,可以保證本地事務和消息發送同時執行成功或失敗,從而保證了數據的最終一致性。

  • 發送prepare消息,該消息對Consumer不可見
  • 執行本地事務
  • 若本地事務執行成功,則向MQ提交消息確認發送指令;若本地事務執行失敗,則向MQ發送取消指令
  • 若MQ長時間未收到確認發送或取消發送的指令,則向業務系統詢問(回查)本地事務狀態,並做補償處理

具體來說,就是把消息的發送分成了2個階段:Prepare階段和確認階段。

流程圖如下:

源碼位於broker.transaction下:

注意:3.2.6之後的版本事務回查接口被刪除了,阿里不對外開源了,事務回查接口需要使用者自己編寫。