Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer rebalance實驗 #431

Closed
harryteng9527 opened this issue Jun 27, 2022 · 14 comments
Closed

Consumer rebalance實驗 #431

harryteng9527 opened this issue Jun 27, 2022 · 14 comments

Comments

@harryteng9527
Copy link
Collaborator

06/27

目前進度

https://github.com/harryteng9527/ConsumerExperiment

Main

實驗的主要流程:

public class Main {
    private static final int threadNumber = 8;

    private static boolean isPattern = false;

    public static void main(String[] args) throws InterruptedException {
        Random r = new Random(System.currentTimeMillis());
        ArrayList<MultipleThreadConsumer> multipleThreadConsumers = Utility.createConsumers(threadNumber, isPattern);
        multipleThreadConsumers.forEach((MultipleThreadConsumer consumer) ->
                consumer.start());
        int consumerIds = multipleThreadConsumers.size();
        TriggerTask trigger = new TriggerTask(multipleThreadConsumers.size());
        int taskType = 0;


        KafkaConsumer<String, String> c = Utility.startUpConsumer(-1, isPattern).getConsumer();

        for(int i = 0; i < 20; i++) {
            System.out.println("round "+i);
            taskType = r.nextInt(3);
            if(taskType == 0) {
                trigger.killConsumer(multipleThreadConsumers);
            }
            else if(taskType == 1) {
                System.out.println("add consumer"+ consumerIds + " into the group");
                trigger.appendNewConsumer(multipleThreadConsumers, consumerIds, isPattern);
                consumerIds++;
            }
            else if(taskType == 2) {
                trigger.modifyPartitionsCount();
                TimeUnit.SECONDS.sleep(3);
                c.enforceRebalance();
                c.poll(Duration.ofSeconds(1));
                System.out.println("enforce rebalance");
            }
            TimeUnit.SECONDS.sleep(10);
        }
            Utility.printTimes();
    }

}
  1. 建立multi-thread consumer multipleThreadConsumers、觸發rebalance事件的trigger
  2. 隨機選擇事件來觸發rebalance
  3. 結束時印出每個consumer的generation、rebalance時間

增加某個topic的partition數量後,要等待5分鐘才會觸發rebalance,因為要等metadata五分鐘一次的更新。為了不等五分鐘,利用consumer的enforceRebalance()觸發rebalance

紀錄時間

public class Listener implements ConsumerRebalanceListener {
    private long revokedTime = System.currentTimeMillis();
    private final KafkaConsumer<String, String> consumer;
    private final int id;
    private ArrayList<DownTime> downTimes;

    public Listener(KafkaConsumer<String, String> consumer, int id) {
        this.consumer = consumer;
        this.id = id;
        this.downTimes = new ArrayList<>();
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked #" + id);
        revokedTime = System.currentTimeMillis();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned #" + id);
        long assignedTime = System.currentTimeMillis();
        Duration duration = Duration.ofMillis(assignedTime - revokedTime);
        DownTime downTime = new DownTime(duration, consumer.groupMetadata().generationId());
        downTimes.add(downTime);
        if(id != -1)
            Utility.timeMap.put(id, downTimes);
    }
}

實作ConsumerRebalanceListener,利用callback來紀錄時間

  • onPartitionsRevoked : 在consumer revoke自己的partition時被呼叫
  • onPartitionsAssigned : 在consumer得到自己的assignment、fetch資料前被呼叫

TriggerTask

負責觸發rebalance事件,目前有

  1. 殺死consumer,讓coordinator去發現有人沒送heartbeat
  2. 新增一個consumer到consumer group中
  3. 增加Kafka cluster上隨機一個topic的partition count

Utility

放一些helper method、做其他事情的method,例如: create consumer instance、詢問topics、印出downtime時間

DownTime

儲存rebalanceTimegenerationId的物件

預計修、新增的功能

  • enforceRebalance()的使用方式重寫,現在是new一個KafkaConsumer去enforce
  • 新增unsubscribe()事件觸發rebalance
  • 新增符合pattern的topic事件,目前想法也是搭配enforceRebalance(),因為也要等五分鐘refresh metadata
  • 看能不能找到coordinator,故意把coordinator殺死,觸發rebalance事件
@chia7712
Copy link
Contributor

@harryteng9527 可否試著整合到此專案內,也就是放到App裡面

@harryteng9527
Copy link
Collaborator Author

08/09實驗

實驗環境

3 台 Kafka brokers 為 Intel i9-12900
叢集內有 : 20 個 Topic,650 個 partitions
1 個 consumer group,25個Consumers
20 個 Producers,每個 producer send 一個 topic

目前量測消費速率與時間的方式為 Prometheus

實驗流程

開啟consumers與producers,執行五分鐘後殺死 1 個 consumer,觸發rebalance後觀察Downtime時間及消費速率的變化

實驗結果

Range assignor

  • Partition counts diff :
    • rebalance 前為 18 , max = 36, min = 18
    • rebalance 後為 20 , max = 38, min = 18
  • Down time 為 45 seconds
  • Rebalance time 為 0.3 ms
  • Down time 期間的 Throughput 減少 3.8%

range_downtime_detail

RoundRobin assignor

  • Partition counts diff:
    • rebalance前為 0 max = 26, min = 26
    • rebalance後為 1 max = 28, min = 27
  • Down time 為 45 seconds
  • Rebalance time 為 0.1 ms
  • Down time 期間的 Throughput 減少 4%

roundrobin_downtime_detail

Sticky assignor

  • Partition counts diff:
    • rebalance前為 0 max = 26, min = 26
    • rebalance後為 1 max = 28, min = 27
  • Down time 為 105 seconds
  • Rebalance time 為 0.1 ms
  • Down time 期間的 Throughput 減少 6.3%

sticky_downtime_detail

Cooperative Sticky

  • Partition counts diff:
    • rebalance前為 0 max = 26, min = 26
    • rebalance後為 1 max = 28, min = 27
  • Down time 為 90 seconds
  • Rebalance time 為 10 min
  • Downtime 期間 減少約 40 % 的消費速率

程式開始執行五分鐘後殺死一個consumer,rebalance在殺死consumer後 10 分鐘被觸發。

CooperativeSticky_downtime_detail

總結

Down time總時間消費速率下降比例恢復Rebalance前的消費速率時間各Assignor 整合成一張圖

Down time時間

image

恢復消費速率的時間

image

消費速率下降比例

image

@chia7712
Copy link
Contributor

chia7712 commented Aug 9, 2022

@harryteng9527 這次的實驗做的非常的好,數據的整理也相當不錯。不過我有幾個疑問:

  1. 請試著說明Cooperative Sticky為何無法在五分鐘內爬到跟其他人一樣的讀取速度
  2. X軸的數字是「速率」嗎?它是怎麼算的?
  3. 請試著用簡短的文字總結

@harryteng9527
Copy link
Collaborator Author

  1. 請試著說明Cooperative Sticky為何無法在五分鐘內爬到跟其他人一樣的讀取速度

目前我只想到 Producers send資料到broker也是送這樣大小的資料
因為Cooperative StickySticky第一次分配assignment時消費速率應該會差不多。並且當時實驗Consumer assignment分配,沒有出現skew-assignment的現象,所以也不會出現有某些Consumer會被分配到較多的partitions。
這部份我再嘗試多跑幾次實驗看看能不能找出什麼能夠說明。

  1. X軸的數字是「速率」嗎?它是怎麼算的?

它是怎麼算的?
用JMX exporter去監控 Consumer 的 application,因為看到Kafka官網有提供Metrics

X軸的數字是「速率」嗎?
我把group內所有consumer的每秒平均消費的bytes加總起來( bytes-consumed-rate),所以是 consumer group 的消費速率(單位為Byte)

  1. 請試著用簡短的文字總結

因為一開始想測試Range assignor在不平衡的分配下的Downtime時間,所以故意創造Topic內partitions數量少於Consumer的環境。
總結:
1. 有察覺到Range消費的速率比其他的策略來得低且rebalance後消費速率還會下降
2. 發現要恢復Rebalance前的消費速率要花較久的時間
3. 這次實驗時間粒度有點大(15 sec),若要再更精準的實驗,可能要調到一秒或以下才能夠看到消費速率降至 0 的情況。Prometheus scrape 資料的速度,目前差不多 1s 以內能完成, 若之後仍要以 Prometheus 來撈取資料並再 Grafana 上監測,最緊繃可能每秒紀錄一次,若一秒紀錄一次還是不行觀察得更詳細,可能就要用Performance tool 紀錄實驗結果。

@chia7712
Copy link
Contributor

目前我只想到 Producers send資料到broker也是送這樣大小的資料。
因為Cooperative Sticky與Sticky第一次分配assignment時消費速率應該會差不多。並且當時實驗Consumer assignment分配,沒有出現skew-assignment的現象,所以也不會出現有某些Consumer會被分配到較多的partitions。
這部份我再嘗試多跑幾次實驗看看能不能找出什麼能夠說明。

我其實是想問是否代表Cooperative的平衡效果不好?

若一秒紀錄一次還是不行觀察得更詳細,可能就要用Performance tool 紀錄實驗結果。

請看一下 #573

@harryteng9527
Copy link
Collaborator Author

我其實是想問是否代表Cooperative的平衡效果不好?

對,我覺得平衡效果不好。
經過我多次實驗,我發現觸發rebalance後,雖然不會stop-the-world,但是消費速率會長時間的低落,也可能造成多次的rebalance。

@chia7712
Copy link
Contributor

對,我覺得平衡效果不好。

麻煩把這點記下來,作為調查行為的一個重點

不過實驗中一開始就沒有跑到足夠快的地方…

@harryteng9527
Copy link
Collaborator Author

不過實驗中一開始就沒有跑到足夠快的地方…

這邊的足夠快是指其他assignor一開始的速率嗎?

@chia7712
Copy link
Contributor

這邊的足夠快是指其他assignor一開始的速率嗎?

是的

@harryteng9527
Copy link
Collaborator Author

harryteng9527 commented Sep 3, 2022

09/02 對 Round-Robin assignor 不利之實驗

Round-Robin assignor 的分配方式為 : 將所有 topic-partition 輪流分配給有訂閱該 topic 的 consumer。

對於任何一個 partition assignor 來說,若 group 裡面的 consumers 訂閱不同的 topic,都很容易發生 skew-assignment 的情境

實驗環境

  • 3 台 brokers (intel 12 代)
  • 2 台電腦執行 performance tool, 分別訂閱 topics (如下圖)
    • 每個 performance tool 皆有 4 個 consumers
    • 每個 performance tool 運行 15 分鐘,每 180 秒觸發一次 re-balance
    • performance tool 每 100 ms 紀錄一次消費速率
  • 1 個 consumer group,8 個 consumers
  • 兩個 Topics
    • Topic A : 1000 個 partitions
    • Topic B : 2000 個 partitions

實驗指令大致如下 :

  • Performance tool A
./start_app.sh performance --producers 4 --consumers 4 --bootstrap.servers 192.168.103.171:9092,192.168.103.172:9092 --topics hot --partitions 2000 --replicas 1 --group.id seul --report.path /tmp/exp --run.until 15m --configs partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
  • Performance tool B
./start_app.sh performance --producers 4 --consumers 4 --bootstrap.servers 192.168.103.171:9092,192.168.103.172:9092 --topics test,hot --partitions 1000,2000 --replicas 1,1 --group.id seul --report.path /tmp/exp --run.until 15m --chaos.frequency 180 --configs partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

如下圖所示 :

exp-env

Range assignor

  • Partition count diff :
    • re-balance 前 : max = 500, min = 250
    • re-balance 後 : max = 620, min = 285
  • Down time : 沒明顯變化,只有消費速率下降。
  • Re-balance 後消費的速率差不多在 75 MB/s,約略下降 6% 左右的消費速率
  • 整體 throughput 大約 75 MB/sec
  • Re-balance 時間為 : 40 ms

實驗圖 :
range-rebalance

Round-Robin assignor

  • Partition count diff :
    • re-balance 前 : max = 500, min = 250
    • re-balance 後 : max = 620, min = 285
  • 整體 throughput 大約 68 MB/sec
  • Down time 為 : 8.7 sec
  • 實驗圖 :
    round-robin-rebalance

Down time 詳細圖 :
round-robin precise downtime

Sticky assignor

  • Partition count diff :
    • re-balance 前 : max = 375, min = 375
    • re-balance 後 : max = 477, min = 392
  • Down time 為 : 1.9 sec
  • 整體 throughput : 70 MB/sec

實驗圖 :
sticky-rebalance

Down time 詳細圖 :
sticky precise downtime

Cooperative assignor

  • Partition count diff :
    • re-balance 前 : max = 375, min = 375
    • re-balance 後 : max = 477, min = 392
  • Down time 為 : 8.8 秒
  • 整體 throughput : 67 MB/sec

實驗圖 :

cs re-balance

down time 圖 :
cs precise downtime

實驗結論

  • 發現 Round-Robin 的 Downtime 時間較長,個人覺得有下列因素所影響
    • 演算法時間,因為每次分配 topic-partition 時都會去檢查 consumer 有沒有訂閱 topic,時間複雜度會與 partition 數成正比
    • clean up state
  • Range assignor 也有 skew-assignment, re-balance 時間太短沒有紀錄到,但消費速率有下降
  • Cooperative Sticky re-balance 後的平均表現相較於上次對於 range assignor 不利的實驗還來得好。不過這次的消費速率有到 0 MB/sec

@chia7712
Copy link
Contributor

chia7712 commented Sep 3, 2022

@harryteng9527 感謝最新的實驗,我覺得內容有越來越專業和進入核心,有幾個疑問請試著解答:

1)Range 和 RR 為何會出現「re-balance 後 : max = 620, min = 285」這個結果?這個演算法不是每次都清掉所有訂閱然後重新分配嗎?那直覺應該保持 max 500 min 250的狀況
2)sticky 的下線時間為何比 RR 還短?難道sticky不用檢查 consumer 是否有訂閱該 partitions?
3) 另外 sticky 應該是儘量避免訂閱新舊差異,可否有這個統計值?

@harryteng9527
Copy link
Collaborator Author

1)Range 和 RR 為何會出現「re-balance 後 : max = 620, min = 285」這個結果?那直覺應該保持 max 500 min 250的狀況

re-balance 前的 consumer group 有 8 個而re-balance 後的 consumer group 只剩下 7 個 consumers,所以 re-balance 後的 max-assignment 與 min-assignment 才會是 620 與 285。

難道sticky不用檢查 consumer 是否有訂閱該 partitions?

Sticky 也會去檢查

sticky 的下線時間為何比 RR 還短?

這兩個 assignor 的 re-balance 時間幾乎都在 1~2 sec,。 而 Sticky 與 Round-Robin 的 downtime 差異為下列兩點 :

  1. Sticky 需要 re-fresh 的 partition state 比 Round-Robin 少,因為 Round-Robin 每次 re-balance 後,前後兩次的 assignment 差異都很大,導致 consumer 需要更新 partitions 的 fetch position, 更新 fetch position 會有 overhead (需要發送 request 給 broker)。
  2. Pre-fetched records 的消費, 使用 Sticky assignor 的 consumer 能在 re-balance 後,poll 資料時立刻消費到 records

我覺得最主要是 : 消費 Pre-fetched records 讓 sticky assignor 的 downtime 時間短

另外 sticky 應該是儘量避免訂閱新舊差異,可否有這個統計值?

這個可以開個 PR 為 performance tool 新增此功能,紀錄此統計值,讓使用者能知道 Sticky assignor 有保留多少 partition,也可以拿來做對 Sticky assignor 不利的情境

除了為 Performance tool 新增 sticky 的統計值外,也可以新增 re-balance 時間,也許能添加在 trackerfile writer

@chia7712
Copy link
Contributor

chia7712 commented Sep 7, 2022

這個可以開個 PR 為 performance tool 新增此功能,紀錄此統計值,讓使用者能知道 Sticky assignor 有保留多少 partition,也可以拿來做對 Sticky assignor 不利的情境

這個概念不錯,麻煩獨立到一個議題去

另外一個疑問,Cooperative assignor 的優勢從圖片似乎看不出來,downtime沒有最短,吞吐量的的穩定度似乎也不高,可否試著說明看看?

@chia7712
Copy link
Contributor

chia7712 commented Jan 1, 2023

#238 中我們已經介紹過各個 assignor 的優缺點和差異

@chia7712 chia7712 closed this as completed Jan 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants