From a0b5329d415df8f0b6f83493281ea90d714185e3 Mon Sep 17 00:00:00 2001 From: josix Date: Sat, 21 Dec 2024 03:13:51 +0800 Subject: [PATCH] --wip-- [skip ci] --- random-thoughts/2024-12-23.md | 931 ++++++++++++++++++++++++++++++++++ 1 file changed, 931 insertions(+) create mode 100644 random-thoughts/2024-12-23.md diff --git a/random-thoughts/2024-12-23.md b/random-thoughts/2024-12-23.md new file mode 100644 index 0000000..a38cb52 --- /dev/null +++ b/random-thoughts/2024-12-23.md @@ -0,0 +1,931 @@ +--- +title: Kafka Architecture Deep Dive - From Components to Implementation Details +date: 2024-12-21T00:00:00 +dg-publish: true +dg-permalink: random-thoughts/2024-12-21 +description: A comprehensive technical deep dive into Kafka's architecture, covering core components, implementation details, and the transition from ZooKeeper to KRaft. Includes detailed diagrams of consumer behavior, node management, and data flow patterns. +tags: + - kafka + - architecture + - distributed-systems + - message-queue + - backend +--- + +- Kafka First Touch + - Architecture Overview + + ```mermaid + graph TB + subgraph Kafka Cluster + subgraph ZooKeeper Ensemble + Z1[ZooKeeper 1] + Z2[ZooKeeper 2] + Z3[ZooKeeper 3] + end + + subgraph Brokers + B1[Broker 1] + B2[Broker 2] + B3[Broker 3] + C[Controller - one type of Broker] + end + + Z1 --- Z2 + Z2 --- Z3 + Z3 --- Z1 + + Z1 --> B1 + Z2 --> B2 + Z3 --> B3 + + Z1 --> C + C --> B1 + C --> B2 + C --> B3 + end + + P1[Producer 1] --> B1 + P2[Producer 2] --> B2 + + B1 --> Con1[Consumer 1] + B2 --> Con2[Consumer 2] + B3 --> Con3[Consumer 3] + + classDef zk fill:#e1d5e7,stroke:#9673a6 + classDef broker fill:#dae8fc,stroke:#6c8ebf + classDef client fill:#d5e8d4,stroke:#82b366 + classDef controller fill:#fff2cc,stroke:#d6b656 + + class Z1,Z2,Z3 zk + class B1,B2,B3 broker + class P1,P2,Con1,Con2,Con3 client + class C controller + ``` + + - 分散式事件串流平台 (Distributed Event Streaming Platform) + - 核心架構元件 + - ZooKeeper/KRaft:負責叢集協調與管理 + - Controller:管理 Broker 狀態和分區分配 + - Broker 叢集:處理資料儲存和傳輸 + - 資料流向 + - Producer → Broker → Consumer 的單向流動 + - 支援多 Producer 和多 Consumer 同時運作 + - 擴展性設計 + - 分區 (Partition) 實現平行處理 + - 複製 (Replication) 確保資料可靠性 + - Leader-Follower 模式管理資料一致性 + - 效能優化 + - 零拷貝技術提升傳輸效率 + - 批次處理減少網路開銷 + - 頁面快取優化讀寫性能 + - Topic + - 訊息的邏輯分類單位,類似資料庫的 Table + - 可以設定 retention period 來控制資料保留時間 + - 可以被分割成多個 Partition 以提升平行處理能力 + - Broker + - Kafka 叢集中的節點,負責儲存和管理 Topic + - 每個 Broker 可以管理多個 Partition + - 負責處理讀寫請求、複製資料和故障恢復 + - 透過 ZooKeeper 或 KRaft 來進行叢集協調 + - 選舉 Controller 節點 + - 追蹤叢集成員狀態 + - 維護 Topic 配置資訊 + - 支援水平擴展,可動態增減節點 + - 提供資料複製機制確保高可用性 + - Producer + - 負責產生和發送訊息到指定的 Topic + - 可以選擇同步或非同步發送 + - 可以實作自定義的分區策略 + - Consumer + - 從 Topic 讀取和處理訊息 + - 可以指定讀取特定 Partition + - 透過 Consumer Group 實現負載平衡 + - Practice + - Consumer Group + - 消費者群組機制,用於實現訊息的負載平衡 + - 同一群組中的消費者共同消費 Topic 的訊息 + - 每個 Partition 在同一時間只能被群組中的一個消費者處理 + - 支援動態增減消費者,自動重新分配 Partition + - Partition + - Topic 的邏輯分區單位,提供平行處理能力 + - 每個 Partition 是有序且不可變的訊息序列 + - 訊息寫入後會被分配一個 offset 值 + - 支援自定義分區策略(例如:根據 key 進行分區) + - Offset + - 每條訊息在 Partition 中的唯一標識符 + - 消費者透過追蹤 offset 來管理消費進度 + - 支援多種 offset 重置策略 + - earliest:從最早的訊息開始消費 + - latest:從最新的訊息開始消費 + - specific:指定特定 offset 開始消費 + - Duplicated or Lost Message Handling + - Message Duplication Scenarios + - Consumer crashes before committing offset but after processing + - Network issues during offset commit + - Rebalancing of consumer group members + - Message Loss Prevention + - Producer acks configuration (all, 1, 0) + - Minimum in-sync replicas setting + - Proper replication factor + - Handling Strategies + - Idempotent processing design + - Unique message IDs for deduplication + - Transaction support for exactly-once semantics + - Manual offset management when needed + - Consumer in Celery Task or Consumer in standalone service + - Consumer in Celery Task + - Pros: + - Easy to integrate with existing Celery infrastructure + - Built-in retry mechanism + - Monitoring and logging through Celery's ecosystem + - Simpler deployment if already using Celery + - Cons: + - Additional overhead from Celery's task queue + - Less control over consumer behavior + - May not be optimal for high-throughput scenarios + - Mixing message queue systems could increase complexity + - Careful handling of the message processing logic like consuming the message in wrong partition (should read all the partitions), and using batch strategy instead of long running task. + - handling the worker is down unexpectedly + - Consumer in standalone service + - Pros: + - Better control over consumer behavior + - Direct connection to Kafka without middleware + - Better performance for high-throughput scenarios + - Clearer separation of concerns + - Cons: + - Need to implement retry mechanism + - Additional service to maintain and monitor + - More complex deployment + - Need to handle scaling independently + + - Topic to Partition to to Broker are all many-to-many relationships. A topic can have multiple partitions, and a partition can be assigned to multiple brokers. + - The Partition has a leader and multiple followers. The leader is responsible for handling read and write requests, while the followers replicate the data from the leader. + - The Controller is responsible for managing the broker state and partition assignment. It is elected from the brokers in the cluster and maintains the metadata about the cluster. + - The Producer sends messages to a specific topic, and the Consumer reads and processes messages from the topic. The Consumer Group mechanism is used to balance the load among consumers in the same group. + +- ZooKeeper 作為分散式協調服務在 Kafka 中扮演核心角色: + - Controller 與 ZooKeeper 的關係 + - Controller 不是 ZooKeeper 的一員 + - Controller 是 Kafka 叢集中的一個特殊 Broker + - 由普通的 Broker 經由選舉產生 + - 僅使用 ZooKeeper 作為協調服務 + - Controller 與 ZooKeeper 的合作模式 + - ZooKeeper 負責 Controller 的選舉流程 + - 選舉機制 + - 使用 ZooKeeper 的臨時節點(Ephemeral Node) + - 在 /controller 路徑創建臨時節點 + - 首個成功創建節點的 Broker 成為 Controller + - 節點包含 Controller 的 epoch 和 broker ID + - 自動故障檢測 + - 臨時節點會在 session 過期時自動刪除 + - 其他 Broker 監聽節點變化 + - Controller 失效時自動觸發重新選舉 + - 選舉流程 + - 所有 Broker 嘗試創建 controller 節點 + - 只有一個 Broker 能成功創建 + - 其他 Broker 進入待命狀態 + - 監聽節點變化以備接管 + - 防止腦裂 + - 使用 epoch number 追蹤 Controller 版本 + - 舊 epoch 的 Controller 指令會被忽略 + - 確保同時只有一個活躍 Controller + - Controller 將狀態儲存在 ZooKeeper 中 + - Controller 在 ZooKeeper 中儲存的關鍵狀態: + - Broker 相關資訊 + - 活躍 Broker 清單 + - Broker 配置和元數據 + - Broker 運行狀態 + - Topic 和分區資訊 + - Topic 配置和分區分配 + - 分區領導者資訊 + - 複製因子設定 + - ISR (In-Sync Replicas) 資訊 + - 各分區的 ISR 清單 + - 複製延遲監控數據 + - 控制器選舉資訊 + - 當前活躍控制器標識 + - 控制器 epoch 號 + - Controller 透過 ZooKeeper 監控叢集變化 + - 職責區分 + - ZooKeeper:提供分散式協調服務 + - Controller:執行 Kafka 叢集管理決策 + - Controller 是 Kafka 中的核心元件 + - 負責管理 Broker 狀態 + - 執行分區重新分配 + - 維護叢集一致性 + - 主要職責 + - 為每個分區選舉領導者 + - 處理分區重新分配 + - 監控 Broker 健康狀態 + - 與 ZooKeeper 的互動 + - 透過 ZooKeeper 選舉單一活躍 Controller + - 使用 ZooKeeper 儲存 Controller 狀態 + - 監聽 ZooKeeper 事件進行故障轉移 + + - 叢集協調與管理 + - Broker 註冊與健康狀態監控 + - 追蹤活躍的 Broker 列表 + - 即時監測 Broker 存活狀態 + - 觸發 Broker 故障轉移機制 + - Controller 選舉 + - 選出主要 Controller 節點 + - 管理 Controller 故障轉移 + - 確保叢集中只有一個活躍 Controller + - 配置管理 + - 儲存 Topic 配置資訊 + - 管理 ACL 和配額設定 + - 追蹤分區分配狀態 + - 元數據管理 + - Topic 管理 + - 儲存 Topic 清單 + - 追蹤分區數量和複製因子 + - 管理分區領導者選舉 + - 分區狀態 + - 追蹤 ISR (In-Sync Replicas) 列表 + - 監控分區領導者和追隨者狀態 + - 管理分區重新分配 + - Consumer 群組協調 + - 追蹤 Consumer 群組成員 + - 管理分區消費分配 + - 處理群組成員變更 + - 高可用性保證 + - 資料一致性 + - ZAB (ZooKeeper Atomic Broadcast) 協議運作機制: + - Leader 選舉:確保叢集中只有一個 Leader 節點 + - 原子廣播:所有寫入操作必須經過 Leader 處理並同步到 Follower + - 狀態同步:新的 Follower 加入時會與 Leader 同步最新狀態 + - ZAB 一致性保證實例: + - 寫入操作(類似但不同於 2PC): + - ZAB 使用類似但更強大的協議機制 + - 與 2PC 的差異: + - ZAB 有明確的 Leader,而 2PC 使用協調者 + - ZAB 支援故障恢復和自動選舉 + - ZAB 提供更強的一致性保證 + - 實際流程:Client 發送寫入請求 → Leader 提議變更 → Follower 確認 → Leader 提交並回應 + - 領導者切換:舊 Leader 故障 → 選舉新 Leader → 同步未完成的提交 → 恢復正常服務 + - 資料恢復:節點重啟時,從 Leader 獲取最新狀態 → 追趕錯過的更新 → 加入可用節點列表 + - 故障恢復 + - 自動偵測節點故障 + - 心跳機制 (Heartbeat) + - 定期發送心跳訊息檢查節點存活狀態 + - 設定超時閾值自動判定節點失效 + - 支援可配置的檢測間隔和超時時間 + - 會話追蹤 (Session Tracking) + - 維護每個連接節點的會話狀態 + - 即時監控會話存活狀態 + - 自動清理過期會話資源 + - 協調故障節點恢復 + - 資料同步機制 + - 使用事務日誌記錄所有操作 + - 支援增量同步和完整同步 + - 確保節點重啟後資料一致性 + - 狀態恢復流程 + - 讀取本地快照和事務日誌 + - 與 Leader 節點同步最新狀態 + - 驗證資料完整性後重新加入叢集 + - 維護服務可用性 + - 自動容錯處理 + - 快速切換到備用節點 + - 重新分配工作負載 + - 保持服務不中斷運作 + - 一致性保證 + - 確保節點間資料同步 + - 維護操作順序一致性 + - 提供強一致性保證 + - 擴展性支援 + - 支援動態增減節點 + - 處理叢集配置變更 + - 確保擴展過程中的穩定性 +- Kafka 3.0 正將 zookeeper 移除,將 controller 與 broker 整合,並將 metadata 存儲在 broker 中,這將大幅簡化架構,提升效能,降低維護成本。 + +- The flow of how consumer consumes messages from Kafka: + - The consumer subscribes to a topic and starts polling for messages. + - The consumer sends a fetch request to the broker to fetch messages from the assigned partitions. + - The broker responds with the messages available in the partition. + - The consumer processes the messages and commits the offset to the broker. + - The consumer continues to poll for new messages. + + ```mermaid + sequenceDiagram + participant App + participant KafkaConsumer + participant ConsumerNetworkClient + participant Fetcher + participant Broker + + App->>KafkaConsumer: new KafkaConsumer(props) + App->>KafkaConsumer: subscribe(topics) + + loop Poll Loop + App->>KafkaConsumer: poll(Duration) + activate KafkaConsumer + + KafkaConsumer->>Fetcher: sendFetches() + activate Fetcher + + Fetcher->>ConsumerNetworkClient: send(Node, FetchRequest) + activate ConsumerNetworkClient + + ConsumerNetworkClient->>Broker: FetchRequest + Broker-->>ConsumerNetworkClient: FetchResponse + + ConsumerNetworkClient-->>Fetcher: RequestFuture + deactivate ConsumerNetworkClient + + Fetcher->>Fetcher: handleFetchSuccess() + Fetcher-->>KafkaConsumer: ConsumerRecords + deactivate Fetcher + + KafkaConsumer-->>App: ConsumerRecords + deactivate KafkaConsumer + + App->>App: process records + end + + App->>KafkaConsumer: close() + ``` + The data_model and behavior of Fetcher in Kafka Consumer: + + ```mermaid + classDiagram + class Fetcher { + -ConsumerNetworkClient client + -FetchCollector fetchCollector + -FetchBuffer fetchBuffer + -Logger log + -Time time + -ApiVersions apiVersions + +sendFetches() int + +collectFetch() Fetch~K,V~ + +close(Timer timer) + #closeInternal(Timer timer) + -handleFetchSuccess(Node, FetchRequestData, ClientResponse) + -handleFetchFailure(Node, FetchRequestData, RuntimeException) + -sendFetchesInternal(Map~Node,FetchRequestData~ requests) + -createFetchRequest(Node, FetchRequestData) + +clearBufferedDataForUnassignedPartitions(Collection~TopicPartition~) + } + + class Node { + -int id + -String host + -int port + -String rack + +idString() String + +hasRack() boolean + } + + class FetchSessionHandler { + -int sessionId + -int epoch + +FetchRequestData build() + +handleResponse(FetchResponse) + } + + class FetchCollector { + -Deserializers~K,V~ deserializers + -FetchMetricsManager metricsManager + +collectFetch(FetchBuffer) Fetch~K,V~ + -parseRecord(ByteBuffer) ConsumerRecord~K,V~ + } + + class ConsumerNetworkClient { + -KafkaClient client + -Time time + +send(Node, Request) RequestFuture + +poll(Timer, PollCondition) + -trySend(long now) + } + + Fetcher --> Node : sends requests to + Fetcher --> FetchSessionHandler : manages sessions + Fetcher --> FetchCollector : collects records + Fetcher --> ConsumerNetworkClient : network operations + ``` + + Sequence Diagram with detailed operations: + + ```mermaid + sequenceDiagram + participant Consumer as KafkaConsumer + participant Fetcher + participant Network as ConsumerNetworkClient + participant Session as FetchSessionHandler + participant Node + participant Broker + + Note over Consumer,Broker: Fetch Operation Start + + Consumer->>Fetcher: sendFetches() + activate Fetcher + + Note over Fetcher: Prepare fetch requests for each node + Fetcher->>Fetcher: prepareFetchRequests() + + loop For each Node with pending fetches + Fetcher->>Session: build() + Note over Session: Creates fetch session data
with epoch and session ID + Session-->>Fetcher: FetchRequestData + + Fetcher->>Network: send(Node, FetchRequest) + activate Network + + Note over Network: Attempts to send request
if node is ready + Network->>Node: checkReady() + + alt Node Ready + Node-->>Network: true + Network->>Broker: FetchRequest + Note over Broker: Process fetch request
with session handling + Broker-->>Network: FetchResponse + + alt Success Response + Network->>Fetcher: handleFetchSuccess(Node, Data, Response) + Note over Fetcher: Updates fetch positions
Processes received records + Fetcher->>Session: handleResponse(FetchResponse) + Note over Session: Updates session state
Validates epoch + else Failure Response + Network->>Fetcher: handleFetchFailure(Node, Data, Exception) + Note over Fetcher: Handles errors
Updates metrics + end + + else Node Not Ready + Node-->>Network: false + Note over Network: Request queued for retry + end + + deactivate Network + end + + Consumer->>Fetcher: collectFetch() + activate Fetcher + Note over Fetcher: Collects accumulated records + Fetcher->>Consumer: Fetch + deactivate Fetcher + + Note over Consumer,Broker: Fetch Operation Complete + ``` + + detailed about prepareFetchRequests(): + + ```mermaid + sequenceDiagram + participant Client + participant AbstractFetch + participant MetricsManager + participant SubscriptionState + participant Metadata + participant FetchSessionHandler + + Client->>AbstractFetch: prepareFetchRequests() + AbstractFetch->>MetricsManager: maybeUpdateAssignment(subscriptions) + + AbstractFetch->>AbstractFetch: fetchablePartitions() + + loop For each fetchable partition + AbstractFetch->>SubscriptionState: position(partition) + + AbstractFetch->>Metadata: fetch() for leader info + + AbstractFetch->>AbstractFetch: selectReadReplica(partition, leader, currentTimeMs) + + alt Node is available & no pending requests + AbstractFetch->>FetchSessionHandler: newBuilder() + AbstractFetch->>Metadata: topicIds() + AbstractFetch->>FetchSessionHandler: builder.add(partition, partitionData) + else Node unavailable or has pending requests + Note over AbstractFetch: Skip fetch for this partition + end + end + + AbstractFetch-->>Client: Return Map + ``` + + Key Features Explained: + + - Node Management: + - Each Node represents a Kafka broker + - Contains connection details (host, port, rack) + - Used for targeting fetch requests + + - Fetch Session Handling: + - FetchSessionHandler manages incremental fetch sessions + - Maintains session ID and epoch + - Optimizes subsequent fetch requests + + - Network Operations: + - ConsumerNetworkClient handles all network communication + - Manages connection states and retries + - Handles timeouts and failures + + - Data Collection: + - FetchCollector deserializes and processes received records + - Manages buffering and batching + - Handles record parsing and validation + + - Key Method Descriptions: + - sendFetches(): Initiates fetch requests to all needed nodes + - handleFetchSuccess(): Processes successful fetch responses + - handleFetchFailure(): Handles network or broker errors + - collectFetch(): Aggregates fetched records for consumer + + - Record Processing Flow: + + ```mermaid + sequenceDiagram + participant Consumer as KafkaConsumer + participant Fetcher + participant FC as FetchCollector + participant FB as FetchBuffer + participant Deserializer + + Note over Consumer,Deserializer: Record Processing Flow + + Consumer->>Fetcher: poll(Duration) + activate Fetcher + + Fetcher->>FB: addFetchedData(PartitionData) + activate FB + Note over FB: Stores raw records
by TopicPartition + FB-->>Fetcher: completed + deactivate FB + + Fetcher->>FC: collectFetch(FetchBuffer) + activate FC + + loop For each TopicPartition + FC->>FB: getRecords(TopicPartition) + FB-->>FC: raw records + + loop For each Record + FC->>Deserializer: deserializeKey(byte[]) + Deserializer-->>FC: key object + FC->>Deserializer: deserializeValue(byte[]) + Deserializer-->>FC: value object + + FC->>FC: createConsumerRecord(...) + Note over FC: Creates record with:
- topic, partition, offset
- timestamp, headers
- key, value + end + end + + FC-->>Fetcher: ConsumerRecords + deactivate FC + + Fetcher-->>Consumer: ConsumerRecords + deactivate Fetcher + ``` + + Detailed Explanation: + + ```java + // In Fetcher class + public Fetch collectFetch() { + return fetchCollector.collectFetch(fetchBuffer); + } + + // In FetchCollector class + public ConsumerRecords collectFetch(FetchBuffer buffer) { + Map>> records = new HashMap<>(); + + for (TopicPartition partition : buffer.partitions()) { + // Get raw records for partition + Records rawRecords = buffer.getRecords(partition); + + // Process each record + for (Record raw : rawRecords) { + // Deserialize key and value + K key = keyDeserializer.deserialize(raw.key()); + V value = valueDeserializer.deserialize(raw.value()); + + // Create ConsumerRecord with metadata + ConsumerRecord record = new ConsumerRecord<>( + partition.topic(), + partition.partition(), + raw.offset(), + raw.timestamp(), + raw.timestampType(), + raw.keySize(), + raw.valueSize(), + key, + value, + raw.headers(), + Optional.of(raw.leaderEpoch()) + ); + + records.computeIfAbsent(partition, p -> new ArrayList<>()) + .add(record); + } + } + + return new ConsumerRecords<>(records); + } + ``` + + Key Aspects: + - Handles batched records by partition + - Manages deserialization + - Maintains offset ordering + - Handles record metadata (timestamps, headers) + - Provides memory efficiency through buffering + + - Node Management: + - Node 在 Kafka 中代表一個 Broker 節點,而不是 Controller + - Node 是一個基礎抽象,代表叢集中的任何 Broker + - Controller 是一個特殊的角色,由其中一個 Broker 擔任 + - Consumer 只與一般的 Broker Node 互動,不直接與 Controller 通訊 + - Controller 相關的協調工作由 Broker 內部處理 + + ```mermaid + classDiagram + class Node { + -int id + -String host + -int port + -String rack + +idString() String + +hasRack() boolean + +isConnected() boolean + } + + class Fetcher { + -Map~Node, FetchSessionHandler~ sessions + -ConsumerNetworkClient client + +sendFetches() int + -prepareFetchRequests() Map~Node, RequestData~ + -handleNodeState(Node) + } + + class ConsumerNetworkClient { + -Map~Node, ConnectionState~ connections + +ready(Node, long) boolean + +connectionFailed(Node) boolean + +tryConnect(Node) + -handleDisconnection(Node) + } + + class FetchSessionHandler { + -int sessionId + -Node node + -int epoch + +build() FetchRequestData + +handleResponse(FetchResponse) + } + + Fetcher --> Node + Fetcher --> ConsumerNetworkClient + Fetcher --> FetchSessionHandler + ``` + + - Detailed Explanation: + + ```java + // In Fetcher class + private Map prepareFetchRequests() { + Map fetchRequestMap = new HashMap<>(); + + // Get list of nodes we need to fetch from + for (TopicPartition partition : subscriptions.fetchablePartitions()) { + Node node = metadata.currentLeader(partition); + + if (node == null) { + // Handle missing leader + continue; + } + + // Check node state + if (client.isUnavailable(node)) { + // Node is in backoff period after failure + continue; + } + + if (!client.ready(node, time.milliseconds())) { + // Connection not ready, may need to initiate + client.tryConnect(node); + continue; + } + + // Get or create fetch session for node + FetchSessionHandler session = sessions.computeIfAbsent( + node, + n -> new FetchSessionHandler(logContext, n) + ); + + // Build fetch request for this node + FetchSessionHandler.FetchRequestData data = session.build( + fetchConfig, + partition, + nextFetchOffset(partition) + ); + + fetchRequestMap.put(node, data); + } + + return fetchRequestMap; + } + ``` + + - Key Aspects: + - Tracks node connection states + - Handles leader/follower relationships + - Manages fetch sessions per node + - Implements backoff for failed nodes + - Optimizes connection handling + + - the difference between Node and Broker in Kafka Implementation: + + ```mermaid + classDiagram + class Node { + -int id + -String host + -int port + -String rack + +idString() String + +hasRack() boolean + +isConnected() boolean + } + + class Broker { + -int id + -Node node + -Map~String,Object~ config + -List~String~ endpoints + -Set~String~ roles + +rack() Optional~String~ + +isEmpty() boolean + +hasMoved(Node) boolean + } + + class KafkaCluster { + -List~Broker~ brokers + -Map~Integer,Node~ nodes + } + + Broker --> Node : contains + KafkaCluster --> Broker : manages + ``` + + - Key Differences: + + - Node: + - A lightweight representation of network endpoint + - Contains basic connection information: + + ```java + public class Node { + private final int id; + private final String host; + private final int port; + private final String rack; + // Used for network connections + } + ``` + + - Used primarily by the client for network operations + - Represents any network endpoint (could be broker or controller) + + - Broker: + - A full Kafka broker instance + - Contains complete broker configuration and state: + + ```java + public class Broker { + private final int id; + private final Node node; + private final Map config; + private final List endpoints; + private final Set roles; + // Contains complete broker metadata + } + ``` + + - Includes additional metadata like: + - Broker roles (controller, broker) + - Configuration + - Multiple endpoints + - State information + + - Usage Example + + ```java + // In Fetcher class + private Map prepareFetchRequests() { + Map fetchRequestMap = new HashMap<>(); + + for (TopicPartition partition : subscriptions.fetchablePartitions()) { + // Gets Node (network endpoint) for the broker leading this partition + Node node = metadata.currentLeader(partition); + + if (node == null) { + // Handle missing leader + continue; + } + + // Network operations use Node + if (client.isUnavailable(node)) { + continue; + } + + // Fetch sessions are maintained per Node + FetchSessionHandler session = sessions.computeIfAbsent( + node, + n -> new FetchSessionHandler(logContext, n) + ); + + // Build fetch request for this node + fetchRequestMap.put(node, session.build()); + } + + return fetchRequestMap; + } + ``` + + - Interaction Flow: + + ```mermaid + sequenceDiagram + participant Client + participant AbstractFetch + participant LocalMetadataCache + participant KafkaBroker + participant ZK_or_Controller + + Note over ZK_or_Controller: Maintains authoritative
cluster metadata + Note over KafkaBroker: Gets metadata from
ZK/Controller + Note over LocalMetadataCache: Client's local cache
of cluster metadata + + Client->>AbstractFetch: prepareFetchRequests() + AbstractFetch->>LocalMetadataCache: fetch() for leader info + + alt Metadata is stale or missing + LocalMetadataCache->>KafkaBroker: Request metadata update + KafkaBroker->>ZK_or_Controller: Get latest metadata + ZK_or_Controller-->>KafkaBroker: Return metadata + KafkaBroker-->>LocalMetadataCache: Update metadata + end + + LocalMetadataCache-->>AbstractFetch: Return cached metadata + ``` + + - Key Points: + - Node is used for: + - Network connections + - Client-side operations + - Connection management + - Basic endpoint information + + - Broker is used for: + - Server-side state + - Cluster metadata + - Role management + - Configuration + + - In Consumer Context: + - Fetcher primarily works with Nodes + - Metadata service maps between Brokers and Nodes + - Network client uses Nodes for connections + - Broker information is mainly handled by metadata service + +- The decision to get metadata from ZooKeeper vs. Controller is based on whether the Kafka cluster is running in legacy mode (ZooKeeper-based) or KRaft (ZooKeeper-less) mode. This isn't decided at runtime - it's determined by how the Kafka cluster is configured when it starts up. + + ```mermaid + sequenceDiagram + participant Client + participant Broker + participant Controller + participant ZooKeeper + + alt KRaft Mode (KIP-500) + Note over Broker,Controller: Cluster started with
controller.quorum.voters set + Client->>Broker: Metadata Request + Broker->>Controller: Get metadata + Controller-->>Broker: Return metadata + Broker-->>Client: Metadata Response + else Legacy Mode (ZooKeeper-based) + Note over Broker,ZooKeeper: Cluster started with
zookeeper.connect set + Client->>Broker: Metadata Request + Broker->>ZooKeeper: Get metadata + ZooKeeper-->>Broker: Return metadata + Broker-->>Client: Metadata Response + end + ``` + + - The key configuration that determines this: + + - **KRaft Mode (Controller-based)**: + - Configured with `controller.quorum.voters` + - No ZooKeeper configuration + - Uses the Kafka Raft metadata mode + - Introduced in KIP-500 + - Will be the only mode supported in future versions + + - **Legacy Mode (ZooKeeper-based)**: + - Configured with `zookeeper.connect` + - Uses ZooKeeper for metadata management + - Original/legacy mode + - Being phased out + + - The client code doesn't need to know or decide which mode is being used - it simply makes metadata requests to brokers, and the brokers handle getting the metadata from the appropriate source based on how they were configured. + + - This is part of Kafka's evolution to remove the ZooKeeper dependency (KIP-500), making the system simpler to deploy and maintain.