-
Notifications
You must be signed in to change notification settings - Fork 662
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Elect leader api(KIP-460) implemented #1311
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
6705733
to
c7960d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good
Please add an integration test
examples/.gitignore
Outdated
@@ -43,3 +43,4 @@ protobuf_consumer_example/protobuf_consumer_example | |||
protobuf_producer_example/protobuf_producer_example | |||
stats_example/stats_example | |||
transactions_example/transactions_example | |||
admin_elect_leaders/admin_elect_leaders |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add with all the other admin stuff (in alphabetical order)
examples/README.md
Outdated
@@ -86,6 +86,8 @@ Examples | |||
|
|||
[transactions_example](transactions_example) - Showcasing a transactional consume-process-produce application | |||
|
|||
[admin_elect_leaders](admin_elect_leaders) - Perform Preferred/ Unclean elections for the specified Topic Partitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[admin_elect_leaders](admin_elect_leaders) - Perform Preferred/ Unclean elections for the specified Topic Partitions | |
[admin_elect_leaders](admin_elect_leaders) - Perform Preferred/Unclean elections for the specified Topic Partitions |
And also keep it in alphabetical order
kafka/adminapi.go
Outdated
return result, newErrorFromString(ErrInvalidArg, "expected non-empty partitions") | ||
} | ||
|
||
result = ElectLeadersResult{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not needed
kafka/adminapi.go
Outdated
partitions = make([]TopicPartition, partCnt) | ||
|
||
for i := 0; i < partCnt; i++ { | ||
setupTopicPartitionFromCtopicPartitionResult(&partitions[i], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor this method so that you can just pass the C.rd_kafka_topic_partition_result_t directly into it and let it call the other on its own.
I also feel like both newTopicPartitionsFromCTopicPartitionResult and setupTopicPartitionFromCtopicPartitionResult should be in kafka.go, but that can wait for when we need it from another place.
kafka/adminapi.go
Outdated
// err returns any top level level that occured during the operation. | ||
func (a *AdminClient) ElectLeaders(ctx context.Context, electLeaderRequest ElectLeadersRequest, options ...ElectLeadersAdminOption) (result ElectLeadersResult, err error) { | ||
|
||
if len(electLeaderRequest.partitions) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed in the call with the team:
0-length should give an error
nil corresponds to null and should set for all partitions
But len(nil) == len([]) == 0
partitions []TopicPartition | ||
} | ||
|
||
func NewElectLeadersRequest(electionType ElectionType, partitions []TopicPartition) ElectLeadersRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment above function
kafka/adminapi.go
Outdated
// TopicPartitions for which election has been performed and specific error if any | ||
// that occured while election in the specific TopicPartition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// TopicPartitions for which election has been performed and specific error if any | |
// that occured while election in the specific TopicPartition. | |
// TopicPartitions for which election has been performed and the per-partition error, if any | |
// that occurred while running the election for the specific TopicPartition. |
kafka/adminapi.go
Outdated
// User has to check all the elements of the result to check any error occured per partition. | ||
// err returns any top level level that occured during the operation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// User has to check all the elements of the result to check any error occured per partition. | |
// err returns any top level level that occured during the operation. | |
// Individual TopicPartitions inside the ElectLeadersResult should be checked for errors. | |
// Additionally, an error that is not nil for client-level errors is returned. |
kafka/adminapi_test.go
Outdated
{}, | ||
{SetAdminRequestTimeout(time.Second)}, | ||
} { | ||
// empty list should fail |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this after the librdkafka change, so the empty list fails, but a nil
list times out instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually after the librdkafka change, both empty list and nil partitions is valid.
If there is an empty list in input, we are getting an empty list in the output and for nil, all partitions.
|
||
res, err := ac.ElectLeaders(ctx, kafka.NewElectLeadersRequest(electionType, topicPartitionList)) | ||
if err != nil { | ||
kafkaErr, ok := err.(kafka.Error) // Type assertion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kafkaErr, ok := err.(kafka.Error) // Type assertion | |
kafkaErr, ok := err.(kafka.Error) |
c7960d9
to
7c3f979
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great PR Pratyush! A few comments:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Well done Pratyush!
Implemented the ElectLeaders API and wrote unit tests for it