-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ListOffsets KIP : 396 #4225
ListOffsets KIP : 396 #4225
Conversation
cbd499c
to
9f89db5
Compare
6b35ec6
to
3689716
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First review comments about general checks and rdkafka.h
additions.
09524c4
to
6fcce61
Compare
src/rdkafka.h
Outdated
@@ -9732,6 +9767,26 @@ rd_kafka_error_t *rd_kafka_commit_transaction(rd_kafka_t *rk, int timeout_ms); | |||
RD_EXPORT | |||
rd_kafka_error_t *rd_kafka_abort_transaction(rd_kafka_t *rk, int timeout_ms); | |||
|
|||
typedef enum rd_kafka_OffsetSpec_s { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing to move the section.
You can also put it after DeleteConsumerGroupOffsets
, before Admin API - User SCRAM credentials
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some things to change to keep backward compatibility for consumer rd_kafka_query_watermark_offsets
and rd_kafka_offsets_for_times
functions and to upgrade internal consumer calls too.
rd_kafka_buf_set_maker and upgrade to version 7
8f43390
to
f97979d
Compare
rd_kafka_query_watermark_offsets if partition leader changes
|
if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { | ||
/* Remove its cache in case the topic isn't a known topic. */ | ||
rd_kafka_wrlock(rk); | ||
rd_kafka_metadata_cache_delete_by_name(rk, state->topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can check here also that if cache is hinted for this topic, and in that case, do nothing as a refresh is already ongoing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for ListOffsets
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already ensured by rd_kafka_metadata_cache_delete_by_name
because it does a rd_kafka_metadata_cache_find with valid
equal to 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It mostly looks great, thanks for the PR @mahajanadhitya and @emasab !
Some minor comments about the conventions and declarations of the variables
Two comments for the tests.
Good fix for the not leader issue, let's slowly (in other PRs) see if we can extend it to other admin functions which have similar retriable errors.
rd_usleep(100000, 0); | ||
} else if (err) { | ||
TEST_FAIL("Failed with error: %s", | ||
rd_kafka_err2name(err)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: break the outer loop here, it's cleaner that way without the else
src/rdkafka_admin.c
Outdated
goto err; | ||
} | ||
|
||
rd_list_t *topic_partitions_sorted = rd_list_new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move declaration up
src/rdkafka_admin.c
Outdated
|
||
rd_list_t *topic_partitions_sorted = rd_list_new( | ||
topic_partitions->cnt, rd_kafka_topic_partition_destroy_free); | ||
for (i = 0; i < topic_partitions->cnt; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (i = 0; i < topic_partitions->cnt; i++) { | |
for (i = 0; i < topic_partitions->cnt; i++) |
src/rdkafka_admin.c
Outdated
rd_list_add( | ||
topic_partitions_sorted, | ||
rd_kafka_topic_partition_copy(&topic_partitions->elems[i])); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | |
err = rd_kafka_event_error(event); | ||
if (err == RD_KAFKA_RESP_ERR__NOENT) { | ||
/* Still looking for the leader */ | ||
rd_usleep(100000, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: continue the loop here, avoids if elses
src/rdkafka.h
Outdated
@@ -6829,7 +6850,9 @@ typedef enum rd_kafka_admin_op_t { | |||
RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS, | |||
RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */ | |||
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */ | |||
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ | |||
/** ListOffsets */ | |||
RD_KAFKA_ADMIN_OP_LISTOFFSETS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RD_KAFKA_ADMIN_OP_LISTOFFSETS, | |
RD_KAFKA_ADMIN_OP_LISTOFFSETS, /**< ListOffsets */ |
src/rdkafka.h
Outdated
@@ -6829,7 +6850,9 @@ typedef enum rd_kafka_admin_op_t { | |||
RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS, | |||
RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */ | |||
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */ | |||
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ | |||
/** ListOffsets */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/** ListOffsets */ |
tests/0138-admin_mock.c
Outdated
|
||
rd_kafka_event_destroy(rkev); | ||
|
||
rd_kafka_mock_broker_push_request_error_rtts( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the watermarks test - replace push of errors with rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
instead and let the mock handler introduce the error
tests/0081-admin.c
Outdated
|
||
TEST_SAY("Testing offset %" PRId64 "\n", test_fixture.query); | ||
|
||
rd_kafka_topic_partition_list_t *topic_partitions_copy = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
declaration might need to be moved up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I approve. Given there are my changes too, let's wait for a second approval by @milindl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for these changes!
rd_kafka_query_watermark_offsets metadata refresh bug
… queue implemented