Skip to content

Commit

Permalink
Added topic id to describeTopics Response
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavrth committed Oct 2, 2023
1 parent 4504a27 commit e7de1b1
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 2 deletions.
1 change: 1 addition & 0 deletions examples/admin_describe_topics/admin_describe_topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func main() {
continue
}
fmt.Printf("Topic: %s has succeeded\n", t.Name)
fmt.Printf("Topic Id: %s\n", t.TopicId)
if include_authorized_operations {
fmt.Printf("Allowed operations: %s\n", t.AuthorizedOperations)
}
Expand Down
14 changes: 14 additions & 0 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ type TopicPartitionInfo struct {
type TopicDescription struct {
// Topic name.
Name string
// Topic Id
TopicId Uuid
// Error, if any, of result. Check with `Error.Code() != ErrNoError`.
Error Error
// Is the topic is internal to Kafka?
Expand Down Expand Up @@ -1097,6 +1099,16 @@ func (a *AdminClient) cToAuthorizedOperations(
return authorizedOperations
}

// cToUuid converts a C rd_kafka_uuid_t to a Go Uuid.
func (a *AdminClient) cToUuid(cUuid *C.rd_kafka_uuid_t) Uuid {
uuid := Uuid{
mostSignificantBits: int64(C.rd_kafka_uuid_most_significant_bits(cUuid)),
leastSignificantBits: int64(C.rd_kafka_uuid_least_significant_bits(cUuid)),
base64str: C.GoString(C.rd_kafka_uuid_base64str(cUuid)),
}
return uuid
}

// cToNode converts a C Node_t* to a Go Node.
// cNode must not be nil.
func (a *AdminClient) cToNode(cNode *C.rd_kafka_Node_t) Node {
Expand Down Expand Up @@ -1244,6 +1256,7 @@ func (a *AdminClient) cToTopicDescriptions(

topicName := C.GoString(
C.rd_kafka_TopicDescription_name(cTopic))
topicId := a.cToUuid(C.rd_kafka_TopicDescription_topic_id(cTopic))
err := newErrorFromCError(
C.rd_kafka_TopicDescription_error(cTopic))

Expand Down Expand Up @@ -1272,6 +1285,7 @@ func (a *AdminClient) cToTopicDescriptions(

result[idx] = TopicDescription{
Name: topicName,
TopicId: topicId,
Error: err,
Partitions: partitions,
AuthorizedOperations: authorizedOperations,
Expand Down
2 changes: 1 addition & 1 deletion kafka/generated_errors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package kafka

// Copyright 2016-2023 Confluent Inc.
// AUTOMATICALLY GENERATED ON 2023-07-12 11:45:05.970954589 +0200 CEST m=+0.000441527 USING librdkafka 2.2.0
// AUTOMATICALLY GENERATED ON 2023-09-29 04:54:21.039467052 +0530 IST m=+0.006323752 USING librdkafka 2.2.0

/*
#include "select_rdkafka.h"
Expand Down
1 change: 1 addition & 0 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,7 @@ func (its *IntegrationTestSuite) TestAdminClient_DescribeTopics() {
"Expected correct error for nonexistent topic")

topicDesc := topicDescs[0]
assert.NotNil(topicDesc.TopicId)
assert.Equal(topicDesc.Error.Code(), ErrNoError,
"Topic description should not have an error")
assert.False(topicDesc.IsInternal, "Topic should not be internal")
Expand Down
22 changes: 22 additions & 0 deletions kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,28 @@ func (n Node) String() string {
return fmt.Sprintf("[%s:%d]/%d", n.Host, n.Port, n.ID)
}

// Uuid.
type Uuid struct {
// Most Significant Bits.
mostSignificantBits int64
// Least Significant Bits.
leastSignificantBits int64
// Base64 representation
base64str string
}

func (uuid Uuid) String() string {
return fmt.Sprintf("%s", uuid.base64str)
}

func (uuid Uuid) getMostSignificantBits() int64 {
return uuid.mostSignificantBits
}

func (uuid Uuid) getLeastSignificantBits() int64 {
return uuid.leastSignificantBits
}

// ConsumerGroupTopicPartitions represents a consumer group's TopicPartitions.
type ConsumerGroupTopicPartitions struct {
// Group name
Expand Down
2 changes: 1 addition & 1 deletion kafka/testconf-example.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"Brokers": "mybroker or $BROKERS env",
"Brokers": "localhost:9092",
"Topic": "test",
"GroupID": "testgroup",
"PerfMsgCount": 1000000,
Expand Down

0 comments on commit e7de1b1

Please sign in to comment.