RocketMQ分佈式事務原理介紹

RocketMQ實現分佈式事務原理

1.知識準備
在系統架構從單體到分佈式、SOA、微服務的發展過程中,因爲流量的增多出現了大量消息堆積問題的需求,在這種背景下,阿里開發出rocketmq來解決該問題,並且經過雙十一的考驗,在低延遲和高可靠性方面優於kafka,很好的應用在海量消息系統中。

異步解耦:一個大的交易系統中會有一些核心業務系統和非核心業務系統,每筆交易會與上下游交易系統進行交互,比如在下單的過程中會有物流/積分/優惠券等,整個業務系統龐大複雜,rocketmq通過異步通信和應用解耦,目的就是爲了不讓非核心的業務系統影響核心的業務系統運行而失去用戶。

削峯填谷:比如某應用的處理能力是每秒 10 個請求。在某一秒,突然到來了 30 個請求,而接下來兩秒,都沒有請求到達。在這種情況下,如果直接拒絕 20 個請求,應用在接下來的兩秒就會空閒。所以,需要把請求突刺均攤到一段時間內,讓系統負載保持在請求處理水位之內,同時儘可能地處理更多請求,從而起到「削峯填谷」的效果。Rocketmq可提供這種削峯填谷的服務來解決這種問題。

分佈式事務一致性:比如在分佈式系統的交易過程中,下單的用戶有紅包、優惠券等優惠,多個數據源事務,這種場景引入rocketmq既可以實現系統之間的解耦還可以保證最終的數據一致性。

在這裏插入圖片描述

Name Server 可集羣部署,節點之間無任何信息同步。提供服務發現和路由 ; Broker(消息中轉角色,負責存儲消息,轉發消息) 部署相對複雜,Broker 分爲Master 與Slave,一 個Master 可以對應多個Slave,但是一個Slave 只能對應一個Master,Master 與Slave 的對應關係通過 指定相同的BrokerName,不同的BrokerId來定 義,BrokerId爲0 表示Master,非0 表示Slave。 Producer,生產者,擁有相同 Producer Group 的 Producer 組成一個集羣, 與Name Server 集羣 中的其中一個節點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,並向提供Topic 服務的Master 建立長連接,且定時向Master 發送心跳。Producer 完全無狀態,可集羣部署; Consumer,消費者,接收消息進行消費的實例,擁有相同 Consumer Group 的 Consumer 組成 一個集羣,與Name Server 集羣中的其中一個節點(隨機選擇)建立長連接,定期從Name Server 取 Topic 路由信息,並向提供Topic 服務的Master、Slave 建立長連接,且定時向Master、Slave 發送心 跳。Consumer既可以從Master 訂閱消息,也可以從Slave 訂閱消息,訂閱規則由Broker 配置決定。

長鏈接:C、S建立連接,不斷開,報文發送、接收通訊連接一直存在; 短鏈接:C、S每進行一次報文收發交易時才建立鏈接,交易完後立即斷開。

在這裏插入圖片描述

首先來看本地通訊的情況,應用程序A和應用程序B運行於同一系統1,它們之間可以藉助消息隊列技術進行彼此的通訊:應用程序A向隊列1發送一條信息,而當應用程序B需要時就可以得到該信息。 其次是遠程通訊的情況,如果信息傳輸的目標改爲在系統2上的應用程序C,這種變化不會對應用程序A產生影響,應用程序A向隊列2發送一條信息,系統1的MQ發現Q2所指向的目的隊列實際上位於系統2,它將信息放到本地的一個特殊隊列——傳輸隊列(Transmission Queue)。建立一條從系統1到系統2的消息通道,消息通道代理將從傳輸隊列中讀取消息,並傳遞這條信息到系統2,然後等待確認。只有MQ接到系統2成功收到信息的確認之後,它才從傳輸隊列中真正將該信息刪除。如果通訊線路不通,或系統2不在運行,信息會留在傳輸隊列中,直到被成功地傳送到目的地。這是MQ最基本而最重要的技術–確保信息傳輸,並且是一次且僅一次(once-and-only-once)的傳遞。

在這裏插入圖片描述

2.實現分佈式事務原理
基本概念: 分佈式系統CAP理論

在這裏插入圖片描述

CAP:分爲A/C/P 可用性、一致性、分區容錯,這三個特徵是不可以兼得的,只能得到其中兩個 部分,
如果不要求P(不允許分區),則C(強一致性)和A(可用性)是可以保證的。但放棄P的同時也就意味着放棄了系統的擴展性,也就是分佈式節點受限,沒辦法部署子節點,這是違背分佈式系統設計的初衷的。
在分佈式網絡中P是必須的,在節點A、B、C上保留了三份,如果對節點A上的數據進行了修改,讓客戶端通過網絡對該數據進行讀取,此時數據從一個節點複製到另外的節點時至少需要時間(網絡暢通?),所以,當P發生時,這時候兩個選擇:
1.如果三份數據完全一致後返回。也就是說,這時從任何一個網絡節點讀取的數據都是一樣的,這就是所謂的強一致性讀,等數據在網絡中的複製,同時A、B、C三個節點中任何一個宕機,都會導致數據不可用。也就是說,要保證強一致性,網絡中的副本越多,數據的可用性就越差(C/P)。
2.如果允許讀操作立即返回,容忍B節點的讀取與A節點的讀取不一致的情況發生。這樣一來,可用性顯然得到了提高,網絡中的副本也可以多一些,唯一得不到保證的是數據一致性。(A/P)

強一致性:分佈式系統中,更新操作執行成功後所有的用戶都應該讀到最新的值,這樣的系統被認爲是具有強一致性的。 等同於所有節點訪問同一份最新的數據副本;
可用性:每一個操作總是能夠在一定的時間內返回結果,這裏需要注意的是"一定時間內"和"返回結果"。一定時間指的是,在可以容忍的範圍內返回結果,結果可以是成功或者失敗。 對數據更新具備高可用性(A);
分區容錯性:理解爲在存在網絡分區的情況下,仍然可以接受請求,系統如果不能在時限內達成數據一致性,就意味着發生了分區的情況,必須就當前操作在C和A之間做出選擇。

方式: 兩階段提交(2PC) 三階段提交(3PC) MQ事務消息(最終一致性)

兩階段提交:

階段一

a) 協調者向所有參與者發送事務內容,詢問是否可以提交事務,等待答覆。

b) 各參與者執行事務操作,將 undo 和 redo 信息記入事務日誌中(但不提交事務)。

c) 如參與者執行成功,給協調者反饋 yes,否則反饋 no。

階段二

如果協調者收到了參與者的失敗消息或者超時,直接給每個參與者發送回滾(rollback)消息;否則,發送提交(commit)消息。兩種情況處理如下:

情況1:當所有參與者均反饋 yes,提交事務

a) 協調者向所有參與者發出正式提交事務的請求(即 commit 請求)。

b) 參與者執行 commit 請求,並釋放整個事務期間佔用的資源。

c) 各參與者向協調者反饋 ack(應答)完成的消息。

d) 協調者收到所有參與者反饋的 ack 消息後,即完成事務提交。

情況2:當有一個參與者反饋 no,回滾事務

a) 協調者向所有參與者發出回滾請求(即 rollback 請求)。

b) 參與者使用階段 1 中的 undo 信息執行回滾操作,並釋放整個事務期間佔用的資源。

c) 各參與者向協調者反饋 ack 完成的消息。

d) 協調者收到所有參與者反饋的 ack 消息後,即完成事務。

三階段提交:

三階段提交是在二階段提交上的改進版本,最關鍵要解決的就是協調者和參與者同時掛掉的問題。

詢問
響應
提交

階段一

a) 協調者向所有參與者發出包含事務內容的 canCommit 請求,詢問是否可以提交事務,並等待所有參與者答覆。

b) 參與者收到 canCommit 請求後,如果認爲可以執行事務操作,則反饋 yes 並進入預備狀態,否則反饋 no。

階段二

協調者根據參與者響應情況,有以下兩種可能。

情況1:所有參與者均反饋 yes,協調者預執行事務

a) 協調者向所有參與者發出 preCommit 請求,進入準備階段。

b) 參與者收到 preCommit 請求後,執行事務操作,將 undo 和 redo 信息記入事務日誌中(但不提交事務)。

c) 各參與者向協調者反饋 ack 響應或 no 響應,並等待最終指令。

情況2:只要有一個參與者反饋 no,或者等待超時後協調者尚無法收到所有提供者的反饋,即中斷事務

a) 協調者向所有參與者發出 abort 請求。

b) 無論收到協調者發出的 abort 請求,或者在等待協調者請求過程中出現超時,參與者均會中斷事務。

階段三

該階段進行真正的事務提交,也可以分爲以下兩種情況。

情況 1:所有參與者均反饋 ack 響應,執行真正的事務提交

a) 如果協調者處於工作狀態,則向所有參與者發出 do Commit 請求。

b) 參與者收到 do Commit 請求後,會正式執行事務提交,並釋放整個事務期間佔用的資源。

c) 各參與者向協調者反饋 ack 完成的消息。

d) 協調者收到所有參與者反饋的 ack 消息後,即完成事務提交。

情況2:只要有一個參與者反饋 no,或者等待超時後協調組尚無法收到所有提供者的反饋,即回滾事務。

a) 如果協調者處於工作狀態,向所有參與者發出 rollback 請求。

b) 參與者使用階段 1 中的 undo 信息執行回滾操作,並釋放整個事務期間佔用的資源。

c) 各參與者向協調組反饋 ack 完成的消息。

d) 協調組收到所有參與者反饋的 ack 消息後,即完成事務回滾。

優點:比二階段提交,三階段提交降低阻塞範圍,在等待超時後協調者或參與者會中斷事務。避免了協調者單點問題。階段 3 中協調者出現問題時,參與者會繼續提交事務。
缺點:數據不一致問題依然存在,當在參與者收到 preCommit 請求後等待 do commite 指令時,此時如果協調者請求中斷事務,而協調者無法與參與者正常通信,會導致參與者繼續提交事務,造成數據不一致。

MQ事務消息(最終一致性)
優點:消息數據獨立存儲,降低業務系統與消息系統之間的耦合;吞吐量優於本地消息表方案。
缺點:一次消息發送需要兩次網絡請求(half消息 + commit/rollback),需要實現消息回查接口。

RocketMQ是一種最終一致性的分佈式事務,就是說它保證的是消息最終一致性,而不是像2PC、3PC那樣強一致分佈式事務,至於爲什麼說它是最終一致性事務下面詳細說明。

在這裏插入圖片描述

場景分析:

1.網絡正常,程序邏輯正確;

實現最終一致性的目的。

2.Brock服務器異常(步驟一、步驟四);

服務器正常後,進入下一步。

3.執行本地事務失敗(網絡原因,沒有返回結果);

回查執行情況。

4.A發送成功,B消費失敗。

一般是代碼有問題,因爲消費端RocketMQ有重試機制,如果不是代碼問題一般重試幾次就能成功;

人工兜底處理後,就可以讓事務達到最終的一致性。

3.問題
1.什麼叫半消息,爲什麼要先發送Half Message(半消息)?

半消息是指暫不能被Consumer消費的消息。Producer 已經把消息成功發送到了 Broker 端,但此消息被標記爲暫不能投遞狀態,處於該種狀態下的消息稱爲半消息。需要 Producer 對消息的二次確認後,Consumer才能去消費它。

半消息的作用:

1)可以先確認 Brock服務器是否正常 ,如果半消息都發送失敗了 那說明Brock掛了。

2)可以通過半消息來回查事務,如果半消息發送成功後一直沒有被二次確認,那麼就會回查事務狀態。

2.半消息二次確認的時機是什麼時候?

本地事務返回成功或者失敗,並且producer拿到了本地事務執行的結果。

3.什麼情況會回查?

由於網絡閃段,生產者應用重啓等原因。導致 Producer 端一直沒有對 Half Message(半消息) 進行 二次確認。這是Brock服務器會定時掃描長期處於半消息的消息,會主動詢問 Producer端 該消息的最終狀態(Commit或者Rollback),該消息即爲 消息回查。

1)執行本地事務的時候,由於突然網絡等原因一直沒有返回執行事務的結果(commit或者rollback)導致最終返回UNKNOW,那麼就會回查。
2) 本地事務執行成功後,返回Commit進行消息二次確認的時候的服務掛了,在重啓服務那麼這個時候在brock端 它還是個Half Message(半消息),這也會回查。

4.如回查,先查看當前事務的執行情況,還是是否需要重新執行本地事務?

先查看當前事務的執行情況,再看是否需要重新執行本地事務。 如果出現第二種情況而引起的回查,如果不先查看當前事務的執行情況,而是直接執行事務,那麼就相當於成功執行了兩個本地事務。

5.爲什麼說rocketMQ是最終一致性事務?

除了場景分析中的數據問題,數據可以達到一致性,還有人工兜底處理後,就可以讓事務達到最終的一致性。

6.實際事務場景會出現幾種情況,事務不一致的會有幾種情況中,是否會發生?

三種:A成功B成功;A成功B失敗;A失敗B失敗;A失敗B成功;

兩種:A成功B失敗;A失敗B成功;

A成功B失敗會發生。

7.A服務成功,B失敗發生後的處理?

1)代碼bug修復;

  1. 人工介入。

遺留問題:
1.回查是怎麼實現的;

應用發送事務消息的核心類是TransactionMQProducer,這個類的sendMessageTransaction方法,該方法做了兩件事:

檢查transactionListener是否存在;
調用父類執行事務消息發送;
TransactionListener在事務消息流程中起到至關重要的作用,executeLocalTransaction方法對應的就是執行本地事務操作,checkLocalTransaction對應的就是回查本地事務操作來實現對消息的回查。

在這裏插入圖片描述

2.半消息的完整解決方案;

Client端 中半消息是在DefaultMQProducer類的sendMessageInTransaction方法中進行發送,該方法做了四件事:

給消息打上事務消息相關的標記,用於MQ服務端區分普通消息和事務消息
發送半消息(half message)
發送成功則由transactionListener執行本地事務
執行endTransaction方法,如果半消息發送失敗或本地事務執行失敗告訴服務端是刪除半消息,半消息發送成功且本地事務執行成功則告訴服務端生效半消息。
Server在TransactionalMessageBridge類處理half message:

在這裏插入圖片描述

putHalfMessage(MessageExtBrokerInner messageInner)
parseHalfMessageInner(MessageExtBrokerInner msgInner)
這兩個方法主要做了:
將消息的topic,queueId放進消息體自身的map裏進行緩存
將消息的topic 設置爲「RMQ_SYS_TRANS_OP_HALF_TOPIC」,queueId設置爲0
將消息寫入磁盤持久化
可以看到所有的事務半消息都會被放進同一個topic的同一個queue裏面,通過對topic的區分,從而避免了半消息被consumer給消費到。

定時任務回查流程:

TransactionalMessageService類的線程中check方法都是對半消息進行過濾(如超過72小時的事務消息,就被算作過期),只保留符合條件的半消息對其進行回查。

在這裏插入圖片描述

服務器在收到Client發過來的Commit消息後會 讀出半消息——>恢復topic等原消息體的信息——>和普通消息一樣再次寫入磁盤——>刪除之前的半消息如果是Rollback消息則直接刪除之前的半消息,到此,整條RocketMQ 事務消息的調用鏈就結束了。

3.什麼是兩階段;

兩階段是執行事務過程分爲兩個階段:投票(詢問)、提交執行。

第一階段:

協調者向所有參與者發送prepare請求與事務內容,詢問是否可以準備事務提交,並等待參與者的響應。
參與者執行事務中包含的操作,並記錄undo日誌(用於回滾)和redo日誌(用於重放),但不真正提交。
參與者向協調者返回事務操作的執行結果,執行成功返回yes,否則返回no。
第二階段:

分爲成功與失敗兩種情況。

若所有參與者都返回yes,說明事務可以提交:

協調者向所有參與者發送commit請求。
參與者收到commit請求後,將事務真正地提交上去,並釋放佔用的事務資源,並向協調者返回ack。
協調者收到所有參與者的ack消息,事務成功完成。
若有參與者返回no或者超時未返回,說明事務中斷,需要回滾:

協調者向所有參與者發送rollback請求。
參與者收到rollback請求後,根據undo日誌回滾到事務執行前的狀態,釋放佔用的事務資源,並向協調者返回ack。
協調者收到所有參與者的ack消息,事務回滾完成。

4.什麼是三階段;

三階段是執行事務過程分爲三個階段:詢問、響應、提交。

詢問:協調者向參與者發送commit請求,參與者如果可以提交就返回Yes響應,否則返回No響應。

響應: Coordinator根據Cohort的反應情況來決定是否可以繼續事務的PreCommit操作。 根據響應情況,有以下兩種可能:

假如Coordinator從所有的Cohort獲得的反饋都是Yes響應,那麼就會進行事務的預執行;
假如有任何一個Cohort向Coordinator發送了No響應,或者等待超時之後,Coordinator都沒有接到Cohort的響應,那麼就中斷事務
提交:該階段進行真正的事務提交,也可以分爲以下兩種情況:

執行提交;
中斷事務。
Coordinator沒有接收到Cohort發送的ACK響應(可能是接受者發送的不是ACK響應,也可能響應超時),那麼就會執行中斷事務。

5.cap爲什麼只能滿足兩點。

C:一致性,就是說所有的服務器上面的數據都是一樣的;

A:可用性,用戶訪問服務器上面的數據,響應時間在可以接受的範圍內;

P:分區容忍性,其實就是高可用性,一個節點崩了,並不影響我們其它的節點。

滿足C和A,P不能滿足:

滿足C需要所有的服務器的數據要一樣,也就是說要實現數據的同步,那麼同步要時間,並且機器越多,同步的時間肯定越慢,問題是同時也滿足了A,也就是說,我要同步時間短才行。這樣的話,機器就不能太多了,也就是P是滿足不了的;

滿足C和P,A不能滿足:

滿足P需要很多服務器,假設有1000臺服務器,同時滿足了C,也就是說要保證每臺機器的數據都一樣,那麼同步的時間可就很大,在這種情況下,肯定是不能保證用戶隨時訪問每臺服務器獲取到的數據都是最新的,想要獲取最新的,就需要等待,等全部同步完了,就可以獲取到了,

但是A要求短時間就可以拿到想要的數據,所以A是滿足不了;

滿足A和P,C不能滿足:

滿足A要求短時間就可以拿到想要的數據,在高可用的分佈式環境中短時間拿到數據,C是滿足不了。