Skip to content

Commit

Permalink
Added topic id to describe topics response
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavrth committed Oct 11, 2023
1 parent 22210ad commit 34f496c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/admin_describe_topics/admin_describe_topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func main() {
continue
}
fmt.Printf("Topic: %s has succeeded\n", t.Name)
fmt.Printf("Topic Id: %s\n", t.TopicId)
if include_authorized_operations {
fmt.Printf("Allowed operations: %s\n", t.AuthorizedOperations)
}
Expand Down
14 changes: 14 additions & 0 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ type TopicPartitionInfo struct {
type TopicDescription struct {
// Topic name.
Name string
// Topic Id
TopicId Uuid
// Error, if any, of result. Check with `Error.Code() != ErrNoError`.
Error Error
// Is the topic is internal to Kafka?
Expand Down Expand Up @@ -1097,6 +1099,16 @@ func (a *AdminClient) cToAuthorizedOperations(
return authorizedOperations
}

// cToUuid converts a C rd_kafka_Uuid_t to a Go Uuid.
func (a *AdminClient) cToUuid(cUuid *C.rd_kafka_Uuid_t) Uuid {
uuid := Uuid{
mostSignificantBits: int64(C.rd_kafka_Uuid_most_significant_bits(cUuid)),
leastSignificantBits: int64(C.rd_kafka_Uuid_least_significant_bits(cUuid)),
base64str: C.GoString(C.rd_kafka_Uuid_base64str(cUuid)),
}
return uuid
}

// cToNode converts a C Node_t* to a Go Node.
// cNode must not be nil.
func (a *AdminClient) cToNode(cNode *C.rd_kafka_Node_t) Node {
Expand Down Expand Up @@ -1244,6 +1256,7 @@ func (a *AdminClient) cToTopicDescriptions(

topicName := C.GoString(
C.rd_kafka_TopicDescription_name(cTopic))
topicId := a.cToUuid(C.rd_kafka_TopicDescription_topic_id(cTopic))
err := newErrorFromCError(
C.rd_kafka_TopicDescription_error(cTopic))

Expand Down Expand Up @@ -1272,6 +1285,7 @@ func (a *AdminClient) cToTopicDescriptions(

result[idx] = TopicDescription{
Name: topicName,
TopicId: topicId,
Error: err,
Partitions: partitions,
AuthorizedOperations: authorizedOperations,
Expand Down
1 change: 1 addition & 0 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,7 @@ func (its *IntegrationTestSuite) TestAdminClient_DescribeTopics() {
"Expected correct error for nonexistent topic")

topicDesc := topicDescs[0]
assert.NotNil(topicDesc.TopicId)
assert.Equal(topicDesc.Error.Code(), ErrNoError,
"Topic description should not have an error")
assert.False(topicDesc.IsInternal, "Topic should not be internal")
Expand Down
22 changes: 22 additions & 0 deletions kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,28 @@ func (n Node) String() string {
return fmt.Sprintf("[%s:%d]/%d", n.Host, n.Port, n.ID)
}

// Uuid.
type Uuid struct {
// Most Significant Bits.
mostSignificantBits int64
// Least Significant Bits.
leastSignificantBits int64
// Base64 representation
base64str string
}

func (uuid Uuid) String() string {
return fmt.Sprintf("%s", uuid.base64str)
}

func (uuid Uuid) getMostSignificantBits() int64 {
return uuid.mostSignificantBits
}

func (uuid Uuid) getLeastSignificantBits() int64 {
return uuid.leastSignificantBits
}

// ConsumerGroupTopicPartitions represents a consumer group's TopicPartitions.
type ConsumerGroupTopicPartitions struct {
// Group name
Expand Down

0 comments on commit 34f496c

Please sign in to comment.