Add cooperative-sticky partition_assignment_strategy option to kafka_reader #882

Open: busma13 wants to merge 5 commits into base: master

busma13
Contributor

@busma13 busma13 commented Nov 27, 2024

This PR makes the following changes:

  • Bump the kafka-asset version from 5.3.0 to 5.4.0.
  • Update kafka_reader and kafka_reader_api to accept cooperative-sticky as a partition_assignment_strategy option.
  • Set minimum_teraslice_version to 2.9.0, since that release uses node-rdkafka v3.2.0, the first version to support incremental rebalancing with cooperative-sticky.
  • Remove the partition_assignment_strategy option from kafka_sender, kafka_sender_api, and kafka_dead_letter. These look like copy/paste errors, since this is not a valid property on a Kafka producer config.
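
For reviewers, here is a minimal sketch of how the reader option described above could be validated and mapped onto the librdkafka consumer property `partition.assignment.strategy`. It is not the code in this PR: `ReaderOpConfig`, `VALID_STRATEGIES`, and `toConsumerConfig` are hypothetical names, and the assumption that `range` and `roundrobin` are the other accepted values comes from librdkafka's documented strategies rather than from this diff.

```typescript
// Hypothetical illustration only; names and shapes are assumptions, not the asset's real code.

// Strategies librdkafka documents for `partition.assignment.strategy`.
const VALID_STRATEGIES: readonly string[] = ['range', 'roundrobin', 'cooperative-sticky'];

// Assumed, simplified shape of a kafka_reader opConfig.
interface ReaderOpConfig {
    topic: string;
    group: string;
    partition_assignment_strategy?: string;
}

// Build a consumer config, setting the strategy only when one is provided.
function toConsumerConfig(opConfig: ReaderOpConfig): Record<string, string> {
    const config: Record<string, string> = { 'group.id': opConfig.group };
    const strategy = opConfig.partition_assignment_strategy;

    if (strategy != null && strategy !== '') {
        if (VALID_STRATEGIES.indexOf(strategy) === -1) {
            throw new Error(`Invalid partition_assignment_strategy: ${strategy}`);
        }
        config['partition.assignment.strategy'] = strategy;
    }

    return config;
}
```

Leaving the property unset when the option is empty lets librdkafka fall back to its own default. Note that every consumer in a group should use the same rebalance protocol, so mixing cooperative-sticky readers with range or roundrobin readers in one group should be avoided.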

Ref: #869, #873

@busma13 busma13 marked this pull request as draft November 27, 2024 20:44
@busma13 busma13 marked this pull request as ready for review November 27, 2024 21:04
@busma13
Contributor Author

busma13 commented Nov 27, 2024

Logs confirming incremental rebalancing is being used:

Worker that is shutting down revoking its partition:

[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=CGRPOP)
    message: [thrd:main]: Group "rand-p-json-noop-r2" received op TERMINATE in state up (join-state steady)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=CGRPTERM)
    message: [thrd:main]: Terminating group "rand-p-json-noop-r2" in state up with 1 partition(s)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=UNSUBSCRIBE)
    message: [thrd:main]: Group "rand-p-json-noop-r2": unsubscribe from current subscription of size 1 (leave group=true, has joined=true, rdkafka-35228bc6-aed2-4248-9fa3-d493cfa49597, join-state steady)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=SUBSCRIPTION)
    message: [thrd:main]: Group "rand-p-json-noop-r2": clearing subscribed topics list (1)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=SUBSCRIPTION)
    message: [thrd:main]: Group "rand-p-json-noop-r2": effective subscription list changed from 1 to 0 topic(s):
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=GRPLEADER)
    message: [thrd:main]: Group "rand-p-json-noop-r2": resetting group leader info: unsubscribe
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=REBALANCE)
    message: [thrd:main]: Group "rand-p-json-noop-r2" initiating rebalance (COOPERATIVE) in state up (join-state steady) with 1 assigned partition(s): unsubscribe
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=REBALANCE)
    message: [thrd:main]: Group "rand-p-json-noop-r2": revoking all 1 partition(s) (terminating)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=CGRPJOINSTATE)
    message: [thrd:main]: Group "rand-p-json-noop-r2" changed join state steady -> wait-unassign-call (state up)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=ASSIGN)
    message: [thrd:main]: Group "rand-p-json-noop-r2": delegating incremental revoke of 1 partition(s) to application on queue rd_kafka_consumer_close: unsubscribe
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=PAUSE)
    message: [thrd:main]: Pausing fetchers for 1 assigned partition(s): incremental rebalance
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=PAUSE, message="[thrd:main]: Library pausing 1 partition(s)")
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=BARRIER)
    message: [thrd:main]: rand-p-json-v1 [1]: rd_kafka_toppar_op_pause_resume:2328: new version barrier v3
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=PAUSE, message="[thrd:main]: Pause rand-p-json-v1 [1] (v3)")
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=ASSIGNMENT)
    message: [thrd:main]: Group "rand-p-json-noop-r2": 1 partition(s) being removed from group assignment of 1 partition(s)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=ASSIGNMENT)
    message: [thrd:main]: Group "rand-p-json-noop-r2": clearing group assignment
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=DUMP)
    message: [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=DUMP_ALL, message="[thrd:main]: List with 1 partition(s):")
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=DUMP_ALL, message="[thrd:main]:  rand-p-json-v1 [1] offset STORED")
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=DUMP_PND, message="[thrd:main]: List with 0 partition(s):")
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=DUMP_QRY, message="[thrd:main]: List with 0 partition(s):")
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=DUMP_REM, message="[thrd:main]: List with 0 partition(s):")
[2024-11-27T20:32:32.929Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-hcqd2:  (assignment=worker, module=operation-api, worker_id=anC_ovLH, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=ASSIGNDONE)
    message: [thrd:main]: Group "rand-p-json-noop-r2": assignment operations done in join-state wait-unassign-call (rebalance rejoin=false)

Worker that received a partition:

[2024-11-27T20:33:16.774Z] DEBUG: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-54bfv: got new assignments partition: 1 (assignment=worker, module=operation-api, worker_id=M5BHBt3U, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0)
[2024-11-27T20:33:16.775Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-54bfv:  (assignment=worker, module=operation-api, worker_id=M5BHBt3U, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=ASSIGN)
    message: [thrd:main]: Group "rand-p-json-noop-r2": delegating incremental assign of 1 partition(s) to application on queue rd_kafka_cgrp_new: sync group assign
[2024-11-27T20:33:16.775Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-54bfv:  (assignment=worker, module=operation-api, worker_id=M5BHBt3U, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=PAUSE)
    message: [thrd:main]: Pausing fetchers for 1 assigned partition(s): incremental rebalance

Worker that did not have to rebalance:

[2024-11-27T20:33:16.776Z] DEBUG: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-7gkk2: got new assignments  (assignment=worker, module=operation-api, worker_id=4niw7A3r, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0)
[2024-11-27T20:33:16.776Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-7gkk2:  (assignment=worker, module=operation-api, worker_id=4niw7A3r, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=PAUSE, message="[thrd:main]: Library pausing 1 partition(s)")
[2024-11-27T20:33:16.776Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-7gkk2:  (assignment=worker, module=operation-api, worker_id=4niw7A3r, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=BARRIER)
    message: [thrd:main]: rand-p-json-v1 [0]: rd_kafka_toppar_op_pause_resume:2328: new version barrier v3
[2024-11-27T20:33:16.776Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-7gkk2:  (assignment=worker, module=operation-api, worker_id=4niw7A3r, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=PAUSE, message="[thrd:main]: Pause rand-p-json-v1 [0] (v3)")
[2024-11-27T20:33:16.776Z]  INFO: teraslice/10 on ts-wkr-peter-kafka-to-noop-2e33d9ba-489d-5ff7579f9-7gkk2:  (assignment=worker, module=operation-api, worker_id=4niw7A3r, ex_id=40d4e848-fcc4-43e6-926f-505b0a92249c, job_id=2e33d9ba-489d-4bfd-8242-bbf733536310, apiName=kafka_reader_api:kafka_reader-0, severity=7, fac=ASSIGNMENT)
    message: [thrd:main]: Group "rand-p-json-noop-r2": 0 partition(s) being added to group assignment of 1 partition(s)
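
To check logs like the ones above without reading them by eye, a small helper along these lines might work. This is only a sketch: `classifyRebalanceMessage` is a hypothetical function, and the patterns are taken from the librdkafka debug messages shown in this comment, so they may not match other librdkafka versions.

```typescript
// Hypothetical helper; the patterns mirror the debug messages quoted in this comment.
type RebalanceKind = 'incremental-assign' | 'incremental-revoke' | 'other';

// Classify a single librdkafka debug message by the kind of incremental rebalance step it reports.
function classifyRebalanceMessage(message: string): RebalanceKind {
    if (/delegating incremental assign of \d+ partition\(s\)/.test(message)) {
        return 'incremental-assign';
    }
    if (/delegating incremental revoke of \d+ partition\(s\)/.test(message)) {
        return 'incremental-revoke';
    }
    return 'other';
}
```

Seeing `incremental-assign` or `incremental-revoke` for a worker suggests the cooperative protocol is in effect; an eager rebalance would not produce these "incremental" messages.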
