消息架構的設計難題以及應對之道

概述

在微服務開發中我們經常會引入消息中間件實現業務解耦,執行異步操作, 現在讓我們來看看使用消息中間件的好處和弊端。

首先需要肯定是使用消息組件有很多好處,其中最核心的三個是:解耦、異步、削峯。

  • 解耦:客戶端只要講請求發送給特定的通道即可,不需要感知接收請求實例的情況。
  • 異步:將消息寫入消息隊列,非必要的業務邏輯以異步的方式運行,加快響應速度。
  • 削峯:消息中間件在消息被消費之前一直緩存消息,消息處理端可以按照自己處理的併發量從消息隊列中慢慢處理消息,不會一瞬間壓垮業務。

當然消息中間件並不是銀彈,引入消息機制後也會有如下一些弊端:

  • 潛在的性能瓶頸:消息代理可能會存在性能瓶頸。幸運的是目前主流的消息中間件都支持高度的橫向擴展。
  • 潛在的單點故障:消息代理的高可用性至關重要,否則系統整體的可靠性將受到影響,幸運的是大多數消息中間件都是高可用的。
  • 額外的操作複雜性:消息系統是一個必須獨立安裝、配置和運維的系統組件,增加了運維的複雜度。

這些弊端我們藉助消息中間件本身提供的擴展、高可用能力可以解決,但是要真正用好消息中間件我們還需要關注可能會遇到的一些設計難題。

處理併發和順序消息

在生產環境中爲了提高消息處理的能力以及應用程序的吞吐量,一般會將消費者部署多個實例節點。那麼帶來的挑戰就是如何確保每個消息只被處理一次,並且是按照他們的發送順序來處理的。

例如:假設有3個相同的接收方實例從同一個點對點通道讀取消息,發送方按順序發佈了 Order CreatedOrder UpdatedOrder Cancelled 這3個事件消息。簡單的消息實現可能就會同事講每個消息給不同的接收方。若由於網絡問題導致延遲,消息可能沒有按照他們發出時的順序被處理,這將導致奇怪的行爲,服務實例可能在另一個服務器處理 Order Created 消息之前處理 Order Cancelled消息。

Kafka 使用的解決方案是使用分片(分區)通道。整體解決方案分爲三個部分:

  1. 一個主題通道由多個分片組成,每個分片的行爲類似一個通道。
  2. 發送方在消息頭部指定分片鍵如orderId,Kafka使用分片鍵將消息分配給特定的分片。
  3. 將接收方的多個實例組合在一起,並將他們視爲相同的邏輯接收方(消費者組)。kafka將每個分片分配給單個接收器,它在接收方啓動和關閉時重新分配分片。

在這裏插入圖片描述

如上圖所示,每個Order事件消息都將orderId作爲其分片鍵。特定訂單的每個事件都發布到同一個分片。而且該分片中的消息始終由同一個接收方實例讀取,因此這樣就能夠保證按順序處理這些消息。

處理重複消息

引入消息架構必須要解決的另一個挑戰是處理重複消息。在理想情況下,消息代理應該只傳遞一次消息,但保證消息有且僅有一次的消息傳遞的成本通常很高。相反,很多消息組件承諾至少保證成功傳遞一次消息。

在正常情況下,消息組件只會傳遞一次消息。但是當客戶端、網絡或消息組件故障可能導致消息被多次傳遞。假設客戶端在處理消息後發送確認消息前,他的數據庫崩潰了,這時消息組件將再次發送未確認的消息,在數據庫重新啓動時向該客戶端發送。

處理重複消息有以下兩種不同的方法:

  • 編寫冪等消息處理程序
  • 跟蹤消息並丟棄重複項

編寫冪等消息處理器

如果應用程序處理消息的邏輯是滿足冪等的,那麼重複消息就是無害的。程序的冪等性是指,即使這個應用被相同輸入參數多次重複調用時,也不會產生額外的效果。例如:取消一個已經取消的訂單,就是一個冪等性操作。同樣,創建一個已經存在的訂單操作也必是這樣。滿足冪等的消息處理程序可以被放心的執行多次,只要消息組件在傳遞消息時保持相同的消息順序。

但是不幸的是,應用程序通常不是冪等的。或者你現在正在使用的消息組件在重新傳遞消息時不會保留排序。重複或無序消息可能會導致錯誤。在這種情況下,你需要編寫跟蹤消息並丟棄重複消息的消息處理程序。

跟蹤消息並丟棄重複消息

考慮一個授權消費者信用卡的消息處理程序。它必須爲每個訂單僅執行一次信用卡授權操作。這段應用程序每次調用時都會產生不同的效果。如果重複消息導致消息處理程序多次執行該邏輯,則應用程序的行爲將不正確。執行此類應用程序邏輯的消息處理程序必須通過檢測和丟棄重複消息而讓它成爲冪等的。

一個簡單的解決方案是消息接收方使用 message id 跟蹤他已處理的消息並丟棄任何重複項。例如,在數據庫表中存儲它消費的每條消息的 message id。

在這裏插入圖片描述

當接收方處理消息時,它將消息的 message id 作爲創建和變更業務實體的事務的一部分記錄在數據表裏。如上圖所示,接收方將包含message id 的行插入 PROCESSED_MESSAGE表。如果消息是重複的,則INSERT將失敗,接收方可以選擇丟棄該消息。

另一個解決方案是消息處理程序在應用程序表,而不是專門表中記錄 message id。當時用具有受限事務模型的NoSQL數據庫時,此方法特別有用,因爲 NoSQL數據庫通常不支持將針對兩個表的更新作爲數據庫事務。

處理事務性消息

服務通常需要在更新數據庫的事務中發佈消息,數據庫更新和消息發送都必須在事務中進行,否則服務可能會更新數據庫然後在發送消息之前崩潰。

如果服務不以原子方式執行者兩個操作,則類似的故障可能使系統處於不一致狀態。

接下來我們看一下常用的保證事務消息的兩種解決方案,最後再看看現代消息組件RocketMQ的事務性消息解決方案。

使用數據庫表作爲消息隊列

如果你的應用程序正在使用關係型數據庫,要保證數據的更新和消息發送之間的事務可以直接使用事務性發件箱模式,Transactional Outbox

在這裏插入圖片描述

此模式使用數據庫表作爲臨時消息隊列。如上圖所示,發送消息的服務有個OUTBOX數據表,在進行INSERT、UPDATE、DELETE 業務操作時也會給OUTBOX數據表INSERT一條消息記錄,這樣可以保證原子性,因爲這是基於本地的ACID事務。

OUTBOX表充當臨時消息隊列,然後我們在引入一個消息中繼(MessageRelay)的服務,由他從OUTBOX表中讀取數據併發布消息到消息組件。

消息中繼的實現可以很簡單,只需要通過定時任務定期從OUTBOX表中拉取最新未發佈的數據,獲取到數據後將數據發送給消息組件,最後將完成發送的消息從OUTBOX表中刪除即可。

使用事務日誌發佈事件

另外一種保證事務性消息的方式是基於數據庫的事務日誌,也就是所謂的數據變更捕獲,Change Data Capture,簡稱CDC。

一般數據庫在數據發生變更的時候都會記錄事務日誌(Transaction Log),比如MySQL的binlog。事務日誌可以簡單的理解成數據庫本地的一個文件隊列,它主要記錄按時間順序發生的數據庫表變更記錄。

這裏我們利用alibaba開源的組件canal結合MySQL來說明下這種模式的工作原理。

更多操作說明可以參考官方文檔:https://github.com/alibaba/canal

canal工作原理

  • canal 模擬 MySQL slave 的交互協議,把自己僞裝成一個MySQL的 slave節點 ,向 MySQL master 發送dump 協議;
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal );
  • canal 解析 binary log 對象(原始爲 byte 流),然後可以將解析後的數據直接發送給消息組件。

RocketMQ事務消息解決方案

Apache RocketMQ在4.3.0版中已經支持分佈式事務消息,RocketMQ採用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。

在這裏插入圖片描述

RocketMQ實現事務消息主要分爲兩個階段:正常事務的發送及提交、事務信息的補償流程。

整體流程爲:

  • 正常事務發送與提交階段
    1、生產者發送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
    2、服務端響應消息寫入結果,半消息發送成功
    3、開始執行本地事務
    4、根據本地事務的執行狀態執行Commit或者Rollback操作
  • 事務信息的補償流程
    1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求
    2、生產者收到確認回查請求後,檢查本地事務的執行狀態
    3、根據檢查後的結果執行Commit或者Rollback操作
    補償階段主要是用於解決生產者在發送Commit或者Rollback操作時發生超時或失敗的情況。

在生產者使用RocketMQ發送事務消息的時候我們也會借鑑第一種方案即自建一張事務日誌表,然後在執行本地事務的時候同時生成一條事務日誌記錄,讓本地事務與日誌事務在同一個方法中,同時添加 @Transactional 註解,保證兩個操作事務是一個原子操作。

這樣如果事務日誌表中有這個本地事務的信息,那就代表本地事務執行成功,需要Commit,相反如果沒有對應的事務日誌,則表示沒執行成功,需要Rollback。

感興趣的同學可以翻看這篇文章:RocketMQ進階-事務消息