Kafka核心總控製器Controller
在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控製器(Kafka Controller),它負責管理整個集群中所有分區和副本的狀態。
- 當某個分區的leader副本出現故障時,由控製器負責為該分區選舉新的leader副本。
- 當檢測到某個分區的ISR集合發生變化時,由控製器負責通知所有broker更新其元數據信息。
- 當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控製器負責讓新分區被其他節點感知到。
Controller選舉機制
在kafka集群啟動的時候,會自動選舉一台broker作為controller來管理整個集群,選舉的過程是集群中每個broker都會嘗試在Zookeeper上創建一個 /controller 臨時節點,zookeeper會保證有且僅有一個broker能創建成功,這個broker就會成為集群的總控器controller。 當這個controller角色的broker宕機了,此時zookeeper臨時節點會消失,集群里其他broker會一直監聽這個臨時節點,發現臨時節點消失了,就競爭再次創建臨時節點,就是我們上面說的選舉機制,zookeeper又會保證有一個broker成為新的controller。
- 監聽broker相關的變化。為Zookeeper中的/brokers/ids/節點添加BrokerChangeListener,用來處理broker增減的變化。
- 監聽topic相關的變化。為Zookeeper中的/brokers/topics節點添加TopicChangeListener,用來處理topic增減的變化;為Zookeeper中的/admin/delete_topics節點添加TopicDeletionListener,用來處理刪除topic的動作。
- 從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的信息並進行相應的管理。對於所有topic所對應的Zookeeper中的/brokers/topics/[topic]節點添加PartitionModificationsListener,用來監聽topic中的分區分配變化。
- 更新集群的元數據信息,同步到其他普通的broker節點中。
Partition副本選舉Leader機制
controller感知到分區leader所在的broker掛了(controller監聽了很多zk節點可以感知到broker存活),controller會從每個parititon的 replicas 副本列表中取出第一個broker作為leader,當然這個broker需要也同時在ISR列表里。
消費者消費消息的offset記錄機制
每個consumer會定期將自己消費分區的offset提交給kafka內部topic:__consumer_offsets,提交過去的時候,key是consumerGroupId+topic+分區號,value就是當前offset的值,kafka會定期清理topic里的消息,最後就保留最新的那條數據,因為__consumer_offsets可能會接收高並發的請求,kafka默認給其分配50個分區(可以通過offsets.topic.num.partitions設置),這樣可以通過加機器的方式抗大並發。
消費者Rebalance機制
消費者rebalance就是說如果consumer group中某個消費者掛了,此時會自動把分配給他的分區交給其他的消費者,如果他又重啟了,那麼又會把一些分區重新交還給他。
注意:rebalance只針對subscribe這種不指定分區消費的情況,如果通過assign這種消費方式指定了分區,kafka不會進行rebanlance。
如下情況可能會觸發消費者rebalance
- consumer所在服務重啟或宕機了
- 動態給topic增加了分區
- 消費組訂閱了更多的topic
Rebalance過程
第一階段:選擇組協調器
組協調器GroupCoordinator:每個consumer group都會選擇一個broker作為自己的組協調器coordinator,負責監控這個消費組里的所有消費者的心跳,以及判斷是否宕機,然後開啟消費者rebalance。
consumer group中的每個consumer啟動時會向kafka集群中的某個節點發送 FindCoordinatorRequest 請求來查找對應的組協調器GroupCoordinator,並跟其建立網絡連接。
組協調器選擇方式:通過如下公式可以選出consumer消費的offset要提交到__consumer_offsets的哪個分區,這個分區leader對應的broker就是這個consumer group的coordinator
公式:hash(consumer group id) % __consumer_offsets主題的分區數
第二階段:加入消費組JOIN GROUP
在成功找到消費組所對應的 GroupCoordinator 之後就進入加入消費組的階段,在此階段的消費者會向 GroupCoordinator 發送 JoinGroupRequest 請求,並處理響應。然後GroupCoordinator 從一個consumer group中選擇第一個加入group的consumer作為leader(消費組協調器),把consumer group情況發送給這個leader,接著這個leader會負責制定分區方案。
第三階段( SYNC GROUP)
consumer leader通過給GroupCoordinator發送SyncGroupRequest,接著GroupCoordinator就把分區方案下發給各個consumer,他們會根據指定分區的leader broker進行網絡連接以及消息消費。
消費者Rebalance分區分配策略
主要有三種rebalance的策略:range、round-robin、sticky。
Kafka 提供了消費者客戶端參數partition.assignment.strategy 來設置消費者與訂閱主題之間的分區分配策略。默認情況為range分配策略。
假設一個主題有10個分區(0-9),現在有三個consumer消費:
range策略:按照分區序號排序,假設 n=分區數/消費者數量 = 3, m=分區數%消費者數量 = 1,那麼前 m 個消費者每個分配 n+1 個分區,後面的(消費者數量-m )個消費者每個分配 n 個分區。比如分區0~3給一個consumer,分區4~6給一個consumer,分區7~9給一個consumer。
round-robin策略:輪詢分配,比如分區0、3、6、9給一個consumer,分區1、4、7給一個consumer,分區2、5、8給一個consumer
sticky策略:rebalance的時候,需要保證如下兩個原則。
- 分區的分配要儘可能均勻 。
- 分區的分配儘可能與上次分配的保持相同。
當兩者發生衝突時,第一個目標優先於第二個目標 。這樣可以最大程度維持原來的分區分配的策略。
比如對於第一種range情況的分配,如果第三個consumer掛了,那麼重新用sticky策略分配的結果如下:
consumer1除了原有的0~3,會再分配一個7
consumer2除了原有的4~6,會再分配8和9
producer發佈消息機制剖析
1、寫入方式
producer 採用 push 模式將消息發佈到 broker,每條消息都被 append 到 patition 中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障 kafka 吞吐率)。
2、消息路由
producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。其路由機制為:1. 指定了 patition,則直接使用; 2. 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition 3. patition 和 key 都未指定,使用輪詢選出一個 patition。
3、寫入流程
- producer 先從 zookeeper 的 “/brokers/…/state” 節點找到該 partition 的 leader
- producer 將消息發送給該 leader
- leader 將消息寫入本地 log
- followers 從 leader pull 消息,寫入本地 log 後 向leader 發送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 並向 producer 發送 ACK
HW與LEO詳解
HW俗稱高水位,HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO(log-end-offset)作為HW,consumer最多只能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀態。對於leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步後更新HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對於來自內部broker的讀取請求,沒有HW的限制。
舉例當producer生產消息至broker後,ISR以及HW和LEO的流轉過程
由此可見,Kafka的複製機制既不是完全的同步複製,也不是單純的異步複製。事實上,同步複製要求所有能工作的follower都複製完,這條消息才會被commit,這種複製方式極大的影響了吞吐率。而異步複製方式下,follower異步的從leader複製數據,數據只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有複製完,落後於leader時,突然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。
結合HW和LEO看下 acks=1的情況
來源:kknewsKafka設計原理詳解