流處理系統

引言

清楚數據的類型有助於我們設計一個性能更高,更有針對性的數據系統,比如在線系統,離線系統(批處理)。近實時系統(流處理)等等。比如說批處理系統,這顯然就適合用戶對實時性要求不高,而對於吞吐量邀請較高,且數據流可以分爲多個數據塊,比如生成索引,顯然我們需要一整塊的數據,然後經過一系列的處理可以生成一個索引。那麼流處理系統呢,這就適合與用戶對於數據的實時性有要求,可以在事件發生後不久進行處理,這裏數據是無界的,所以顯然無法像批處理一樣分批處理,就比如某個業務其實一直在進行,且我們希望處理這些業務的請求,並生成一些數據,但是不希望像批處理一樣一次處理一段時間內的請求,這樣實時性不好,所以需要把數據當做水流一樣,源源不斷的進行處理,這樣的場景也是很多的,比如用戶的活動事件、傳感器的感應事件,數據庫的寫入操作等。
在這裏插入圖片描述

如何發送事件流

上面提到了批處理系統其實並不適合流式數據處理,所以我們需要一些工具來處理流式數據,當然數據庫也可以完成這個任務,生產者發送任務到數據庫中,消費者輪詢查找,但是當數據多了以後顯然效率上是有問題的,所以這其實是需要某些通知機制或者更爲高效的模型,如下:

  1. 發佈訂閱系統
  2. 生產者消費者直接通信
  3. 消息隊列

這裏其實就有一點豁然開朗的感覺,以前只是知道這是分佈式通信的方法,卻不知道這其實本質是流式數據處理。

我們拿消息隊列和數據庫作對比,其實本質上和數據庫非常類似,但還是存在差異:

  1. 數據庫一般數據在明確要求刪除時刪除,而消息隊列實在數據被消費時刪除。
  2. 如果消息隊列內數據過多的話要麼進行限流,要麼把數據轉儲回磁盤,效率降低。
  3. 數據庫通常支持加快查找的方法,而消息隊列確沒有。
  4. 消息隊列不支持查詢,但可以支持發佈訂閱。

這些被視爲傳統觀點,體現在JMS/AMQP這樣的標準中,但因爲其對序列不敏感的原因,並不是只有這樣的實現。一些MQ實現了日誌式的消息隊列,它們保證數據的持久性,且保證順序,這樣就導致一個處理時間長的任務可能阻塞後面任務的處理。比如kafka,RoketMQ等。因爲這些原因,當消息處理代價過高,希望並行處理,且消息排序並不重要的時候我們可以使用JMS/AMQP類型的消息隊列,但對於消息處理很快,且對於順序要求高的任務,基於日誌的消息隊列工作的更好

有一點值得一提,就是在消息隊列上客戶端出現的冪等問題,這裏的通用解決方案是偏移量,也就是客戶端在處理了每個消息以後向MQ發送自己的偏移量,下一次的消息處理從偏移量開始。但這樣仍有危險,就是在處理了消息後還沒有記錄偏移量,在重新啓動後還會處理那些已經處理過的消息,如果從MQ下手問題可能不好解決,這裏我們可以在客戶端下手,保證冪等性。
在這裏插入圖片描述

流處理

現在我們清楚了流的來源,知道了流的傳輸,那麼我們該如何處理這些數據呢?可能有如下幾種可能:

  1. 寫入數據庫,緩存等存儲系統,直接客戶端使用。
  2. 直接把數據推送到用戶,這樣的話人是流的最終消費者(比如熱點新聞)。
  3. 處理輸入流然後產生另一個數據流,向批處理系統一樣最後的數據流向前兩步一樣。

我們可以發現一個有意思的事實,就是流式數據處理和批處理的過程非常類似,但是我們仍能發現以下差異:

  1. 排序對於流式數據來說沒有什麼意義(無界的)。
  2. 容錯機制不再相同,批處理中可以選擇重新運行作業,因爲輸入不變,但是流處理不可能重新運行整個數據集,因爲可能已經運行了很長時間,這並不現實。

那麼流處理適用於哪些事件呢?如下:

  1. 複雜事件處理:我們可以其實就是在流中搜索特定類型的事件,我們只需要在流處理系統中維護一個所要匹配的特定模式即可。當發現匹配模式的事件時產生一個輸出。
  2. 流分析:這裏我們更爲關心大量事件的累計效果和統計指標。比如測量某種事件的速率,統計一段時間某個值的平均值等。
  3. 在流中搜索:與複雜事件處理類似,不過這裏是匹配單個事件而不是模式匹配。

不可靠的時鐘

以上我們提到了一個問題,就是時間段,我們知道在多機器之間時鐘是不可信任的,那麼如果確定時間段呢?首先可以使用本地時間的話,這樣的話優點是簡單,但是可能本地時鐘與客戶端存在明顯的滯後,且還有不可控的網絡,這樣的話可能準確度就沒辦法保證。如果使用客戶端時間呢?顯然存在很大的問題,就是客戶端時間不可信任,如果惡意攻擊的話會有很大的安全問題。再來考慮一個問題,我們現在需要統計每一分鐘的流量,假如現在需要統計第十分鐘的流量,十分鐘已經過去了,現在到了十一分鐘或者十二分鐘,那麼何時終止第十分鐘的計算呢,因爲還可能有第十分鐘的事件沒有到來,此時有兩種方法,一個是維護一個全局偏移量,當在十一分鐘收到十分鐘數據時簡單的丟棄,或者針對這個滯後時間發佈一個更正事件,顯然後者會增加系統的複雜度

那麼時鐘問題如何解決呢,一種有效的方法是記錄多個事件戳:

  1. 根據設備的時鐘記錄時間按發生的時間
  2. 根據設備的時鐘記錄將事件發送到服務器的時間
  3. 根據服務器的時鐘記錄服務器收到事件的時間

這樣我們可以使用第三個時間戳減去第二個時間戳,去估計出服務器和設備之間的偏移量,然後可以將該偏移量應用於事件時間戳。這樣我們可以確定真實發生的時間戳。這個時候我們想要確定一個時間段就容易了,因爲我們獲取了近似的真實時間戳,然後只需要選擇一個合理的窗口用以表示時間段即可。
在這裏插入圖片描述

容錯

我們上面提到流處理的容錯機制與批處理有所不同,主要原因就是流處理系統是無界的,也就是說我們不能簡單的重啓任務,所以我們需要其他的方法。

  1. 微批處理:其實就是把流式處理看做一個個小的批處理,這需要我們存儲一段時間內的數據,這裏通常爲1S左右,顯然較小的批處理會導致更大的協調開銷,較大的微處理會導致較高的延遲,這顯然是一個性能折中的考慮,這樣我們只需要在宕機的時候重啓這個微處理即可。
  2. 原子提交:我們可以把多個輸入當做一個事務,並在失敗時重試,這樣我們可以丟棄掉失敗任務的部分輸出(未提交)。其實要達到相同的目的還有其他的方法,就是依賴於冪等性,我們只要保證處理一個輸入不會造成多餘的問題就可以達到丟棄失敗任務的部分輸出。上面我們提到了偏移量可以解決這個問題。

總結

流式數據顯然是一種非常普遍的數據場景,這也是很多相關框架崛起的原因。從處理數據中得出的見解(insights)是有價值的。這樣的見解(insights)並非都是生來平等的。一些見解(insights)在發生後不久就具有很高的價值,並且隨着時間的流逝,這種價值會迅速減少。流處理針對這樣的場景。流處理的關鍵優勢在於相比於批處理它能夠更快地提供見解(insights)。

參考:

  • 博文《淺談流處理
  • 書籍《Designing Data-Intensive Application》