diff --git a/examples/admin_describe_topics/admin_describe_topics.go b/examples/admin_describe_topics/admin_describe_topics.go index 7cffba19e..42ad101aa 100644 --- a/examples/admin_describe_topics/admin_describe_topics.go +++ b/examples/admin_describe_topics/admin_describe_topics.go @@ -79,6 +79,7 @@ func main() { continue } fmt.Printf("Topic: %s has succeeded\n", t.Name) + fmt.Printf("Topic Id: %s\n", t.TopicID) if includeAuthorizedOperations { fmt.Printf("Allowed operations: %s\n", t.AuthorizedOperations) } diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 2316a5443..e603736a1 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -331,6 +331,8 @@ type TopicPartitionInfo struct { type TopicDescription struct { // Topic name. Name string + // Topic Id + TopicID UUID // Error, if any, of the result. Check with `Error.Code() != ErrNoError`. Error Error // Is the topic internal to Kafka? @@ -1098,6 +1100,16 @@ func (a *AdminClient) cToAuthorizedOperations( return authorizedOperations } +// cToUUID converts a C rd_kafka_Uuid_t to a Go UUID. +func (a *AdminClient) cToUUID(cUUID *C.rd_kafka_Uuid_t) UUID { + uuid := UUID{ + mostSignificantBits: int64(C.rd_kafka_Uuid_most_significant_bits(cUUID)), + leastSignificantBits: int64(C.rd_kafka_Uuid_least_significant_bits(cUUID)), + base64str: C.GoString(C.rd_kafka_Uuid_base64str(cUUID)), + } + return uuid +} + // cToNode converts a C Node_t* to a Go Node. // If cNode is nil returns a Node with ID: -1. func (a *AdminClient) cToNode(cNode *C.rd_kafka_Node_t) Node { @@ -1251,6 +1263,7 @@ func (a *AdminClient) cToTopicDescriptions( topicName := C.GoString( C.rd_kafka_TopicDescription_name(cTopic)) + TopicID := a.cToUUID(C.rd_kafka_TopicDescription_topic_id(cTopic)) err := newErrorFromCError( C.rd_kafka_TopicDescription_error(cTopic)) @@ -1279,6 +1292,7 @@ func (a *AdminClient) cToTopicDescriptions( result[idx] = TopicDescription{ Name: topicName, + TopicID: TopicID, Error: err, Partitions: partitions, AuthorizedOperations: authorizedOperations, diff --git a/kafka/integration_test.go b/kafka/integration_test.go index 783ad9481..ed57d20ee 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -1420,6 +1420,9 @@ func (its *IntegrationTestSuite) TestAdminClient_DescribeTopics() { "Expected correct error for nonexistent topic") topicDesc := topicDescs[0] + assert.NotZero(topicDesc.TopicID.GetLeastSignificantBits()) + assert.NotZero(topicDesc.TopicID.GetMostSignificantBits()) + assert.NotEmpty(topicDesc.TopicID.String()) assert.Equal(topicDesc.Error.Code(), ErrNoError, "Topic description should not have an error") assert.False(topicDesc.IsInternal, "Topic should not be internal") diff --git a/kafka/kafka.go b/kafka/kafka.go index 69550b1ec..d349f0e86 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -334,6 +334,31 @@ func (n Node) String() string { return fmt.Sprintf("[%s:%d]/%d", n.Host, n.Port, n.ID) } +// UUID Kafka UUID representation +type UUID struct { + // Most Significant Bits. + mostSignificantBits int64 + // Least Significant Bits. + leastSignificantBits int64 + // Base64 representation + base64str string +} + +// Base64 string representation of the UUID +func (uuid UUID) String() string { + return uuid.base64str +} + +// GetMostSignificantBits returns Most Significant 64 bits of the 128 bits UUID +func (uuid UUID) GetMostSignificantBits() int64 { + return uuid.mostSignificantBits +} + +// GetLeastSignificantBits returns Least Significant 64 bits of the 128 bits UUID +func (uuid UUID) GetLeastSignificantBits() int64 { + return uuid.leastSignificantBits +} + // ConsumerGroupTopicPartitions represents a consumer group's TopicPartitions. type ConsumerGroupTopicPartitions struct { // Group name