kafka原理

Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源項目。

**1.前言**
消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。下面將從Kafka文件存儲機制和物理結構角度,分析Kafka是如何實現高效文件存儲,及實際應用效果。
1.1  Kafka的特性:
- 高吞吐量、低延遲:kafka每秒能夠處理幾十萬條消息,它的延遲最低只有幾毫秒,每一個topic能夠分多個partition, consumer group 對partition進行consume操做。
- 可擴展性:kafka集羣支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,而且支持數據備份防止數據丟失
- 容錯性:容許集羣中節點失敗(若副本數量爲n,則容許n-1個節點失敗)
- 高併發:支持數千個客戶端同時讀寫

1.2   Kafka的使用場景:
- 日誌收集:一個公司能夠用Kafka能夠收集各類服務的log,經過kafka以統一接口服務的方式開放給各類consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到hadoop、數據倉庫中作離線分析和挖掘。
- 運營指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
- 流式處理:好比spark streaming和storm

1.3  Kakfa的設計思想
- Kakfa Broker Leader的選舉:
  Kakfa Broker集羣受Zookeeper管理。全部的Kafka Broker節點一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper註冊Watch)。
  這個Controller會監聽其餘的Kafka Broker的全部信息,若是這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時全部的kafka broker又會一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。
  例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上全部的partition在zookeeper上的狀態,並選取ISR列表中的一個replica做爲partition leader(若是ISR列表中的replica全掛,選一個倖存的replica做爲leader; 若是該partition的全部的replica都宕機了,則將新的leader設置爲-1,等待恢復,等待ISR中的任一個Replica「活」過來,而且選它做爲Leader;或選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其餘的kafka broker。

- Consumergroup:
  各個consumer(consumer 線程)能夠組成一個組(Consumer group ),partition中的每一個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,若是一個message能夠被多個consumer(consumer 線程)消費的話,那麼這些consumer必須在不一樣的組。
  Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啓動一個新的consumer group。因此若是想同時對一個topic作消費的話,啓動多個consumer group就能夠了,可是要注意的是,這裏的多個consumer的消費都必須是順序讀取partition裏面的message,新啓動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。
  當啓動一個consumer group去消費一個topic的時候,不管topic裏面有多個少個partition,不管咱們consumer group裏面配置了多少個consumer thread,這個consumer group下面的全部consumer thread必定會消費所有的partition;即使這個consumer group下只有一個consumer thread,那麼這個consumer thread也會去消費全部的partition。所以,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。
  同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不可以一個consumer group的多個consumer同時消費一個partition。

- Consumer Rebalance的觸發條件:
  (1): Consumer增長或刪除會觸發 Consumer Group的Rebalance(2)Broker的增長或者減小都會觸發 Consumer Rebalance
  
- Consumer:
  Consumer處理partition裏面的message的時候是o(1)順序讀取的。因此必須維護着上一次讀到哪裏的offsite信息。high level API,offset存於Zookeeper中,low level API的offset由本身維護。通常來講都是使用high level api的。
  Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也能夠配置成讀完消息處理再commit,這種狀況下consumer端的響應就會比較慢的,須要等處理完才行。
  若是producer的流量增大,當前的topic的parition數量=consumer數量,這時候的應對方式就是很想擴展:增長topic下的partition,同時增長這個consumer group下的consumer。
![image.png](/img/bVcSl2y)

- Delivery Mode:
  Kafka producer 發送message不用維護message的offsite信息,由於這個時候,offsite就至關於一個自增id,producer就儘管發送message就行了。
  可是Consumer端是須要維護這個partition當前消費到哪一個message的offsite信息的,這個offsite信息,high level api是維護在Zookeeper上,low level api是本身的程序維護。
  當使用high level api的時候,先拿message處理,再定時自動commit offsite+1(也能夠改爲手動), 而且kakfa處理message是沒有鎖操做的。所以若是處理message失敗,此時尚未commit offsite+1,當consumer thread重啓後會重複消費這個message。可是做爲高吞吐量高併發的實時處理系統,at least once的狀況下,至少一次會被處理到,是能夠容忍的。若是沒法容忍,就得使用low level api來本身程序維護這個offsite信息,那麼想何時commit offsite+1就本身搞定了。
  
  - Topic & Partition:
    Topic至關於傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪一個topic,可是不須要指定topic下的哪一個partition,由於kafka會把收到的message進行load balance,均勻的分佈在這個topic下的不一樣的partition上( hash(message) % [broker數量]  )。
    在物理結構上,每一個partition對應一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic能夠有無數多的partition,根據業務需求和數據量來設置。
    在kafka配置文件中可隨時更高num.partitions參數來配置更改topic的partition數量,在建立Topic時經過參數指定parittion數量。Topic建立以後經過Kafka提供的工具也能夠修改partiton數量。
     通常來講,(1)一個Topic的Partition數量大於等於Broker的數量,能夠提升吞吐率。(2)同一個Partition的Replica儘可能分散到不一樣的機器,高可用。
     當add a new partition的時候,partition裏面的message不會從新進行分配,原來的partition裏面的message數據不會變,新加的這個partition剛開始是空的,隨後進入這個topic的message就會從新參與全部partition的load balance。
     
     - Partition Replica:
       每一個partition能夠在其餘的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集羣。存replica副本的方式是按照kafka broker的順序存。例若有5個kafka broker節點,某個topic有3個partition,每一個partition存2個副本,那麼partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數目不能大於kafka broker節點的數目,不然報錯。這裏的replica數其實就是partition的副本總數,其中包括一個leader,其餘的就是copy副本)。這樣若是某個broker宕機,其實整個kafka內數據依然是完整的。可是,replica副本數越高,系統雖然越穩定,可是回來帶資源和性能上的降低;replica副本少的話,也會形成系統丟數據的風險。
       (1)怎樣傳送消息:producer先把message發送到partition leader,再由leader發送給其餘partition follower。
       (2)在向Producer發送ACK前須要保證有多少個Replica已經收到該消息:根據ack配的個數而定。
       (3)怎樣處理某個Replica不工做的狀況:若是這個部工做的partition replica不在ack列表中,就是producer在發送消息到partition leader上,partition leader向partition follower發送message沒有響應而已,這個不會影響整個系統,也不會有什麼問題。若是這個不工做的partition replica在ack列表中的話,producer發送的message的時候會等待這個不工做的partition replca寫message成功,可是會等到time out,而後返回失敗由於某個ack列表中的partition replica沒有響應,此時kafka會自動的把這個部工做的partition replica從ack列表中移除,之後的producer發送message的時候就不會有這個ack列表下的這個部工做的partition replica了。
       (4)怎樣處理Failed Replica恢復回來的狀況:若是這個partition replica以前不在ack列表中,那麼啓動後從新受Zookeeper管理便可,以後producer發送message的時候,partition leader會繼續發送message到這個partition follower上。若是這個partition replica以前在ack列表中,此時重啓後,須要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現某個部工做的partition replica的時候自動從ack列表中移除的)

     - Partition leader與follower:
       partition也有leader和follower之分。leader是主partition,producer寫kafka的時候先寫partition leader,再由partition leader push給其餘的partition follower。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節點宕機,zookeeper會衝其餘的broker的partition follower上選擇follower變爲parition leader。
      
     - Topic分配partition和partition replica的算法:
       (1)將Broker(size=n)和待分配的Partition排序。(2)將第i個Partition分配到第(i%n)個Broker上。(3)將第i個Partition的第j個Replica分配到第((i + j) % n)個Broker上
                       
      - Partition ack:
        當ack=1,表示producer寫partition leader成功後,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其餘一個follower成功的時候,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=-1[parition的數量]的時候,表示只有producer所有寫成功的時候,纔算成功,kafka broker才返回成功信息。這裏須要注意的是,若是ack=1的時候,一旦有個broker宕機致使partition的follower和leader切換,會致使丟數據。                         
         ![image.png](/img/bVcSl4b)
                            
      - message狀態:                                            
        在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪一個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着若是consumer處理很差的話,broker上的一個消息可能會被消費屢次。                                                   
                                                                    
      - message持久化:                                                                       
        Kafka中會把消息持久化到本地文件系統中,而且保持o(1)極高的效率。咱們衆所周知IO讀取是很是耗資源的性能也是最慢的,這就是爲了數據庫的瓶頸常常在IO上,須要換SSD硬盤的緣由。可是Kafka做爲吞吐量極高的MQ,卻能夠很是高效的message持久化到文件。這是由於Kafka是順序寫入o(1)的時間複雜度,速度很是快。也是高吞吐量的緣由。因爲message的寫入持久化是順序寫入的,所以message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。通常的機器,單機每秒100k條數據。                                                                              
                                                                                               
          https://www.cnblogs.com/cxxjohnson/p/8921661.html