Skip to content

Commit

Permalink
Added topic id to describeTopics Response (#1068)
Browse files Browse the repository at this point in the history
Added topic id to describe topics response
  • Loading branch information
pranavrth committed Oct 21, 2023
1 parent 8b99272 commit b4a69cc
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/admin_describe_topics/admin_describe_topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func main() {
continue
}
fmt.Printf("Topic: %s has succeeded\n", t.Name)
fmt.Printf("Topic Id: %s\n", t.TopicID)
if includeAuthorizedOperations {
fmt.Printf("Allowed operations: %s\n", t.AuthorizedOperations)
}
Expand Down
14 changes: 14 additions & 0 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ type TopicPartitionInfo struct {
type TopicDescription struct {
// Topic name.
Name string
// Topic Id
TopicID UUID
// Error, if any, of the result. Check with `Error.Code() != ErrNoError`.
Error Error
// Is the topic internal to Kafka?
Expand Down Expand Up @@ -1098,6 +1100,16 @@ func (a *AdminClient) cToAuthorizedOperations(
return authorizedOperations
}

// cToUUID converts a C rd_kafka_Uuid_t to a Go UUID.
func (a *AdminClient) cToUUID(cUUID *C.rd_kafka_Uuid_t) UUID {
uuid := UUID{
mostSignificantBits: int64(C.rd_kafka_Uuid_most_significant_bits(cUUID)),
leastSignificantBits: int64(C.rd_kafka_Uuid_least_significant_bits(cUUID)),
base64str: C.GoString(C.rd_kafka_Uuid_base64str(cUUID)),
}
return uuid
}

// cToNode converts a C Node_t* to a Go Node.
// If cNode is nil returns a Node with ID: -1.
func (a *AdminClient) cToNode(cNode *C.rd_kafka_Node_t) Node {
Expand Down Expand Up @@ -1251,6 +1263,7 @@ func (a *AdminClient) cToTopicDescriptions(

topicName := C.GoString(
C.rd_kafka_TopicDescription_name(cTopic))
TopicID := a.cToUUID(C.rd_kafka_TopicDescription_topic_id(cTopic))
err := newErrorFromCError(
C.rd_kafka_TopicDescription_error(cTopic))

Expand Down Expand Up @@ -1279,6 +1292,7 @@ func (a *AdminClient) cToTopicDescriptions(

result[idx] = TopicDescription{
Name: topicName,
TopicID: TopicID,
Error: err,
Partitions: partitions,
AuthorizedOperations: authorizedOperations,
Expand Down
3 changes: 3 additions & 0 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,9 @@ func (its *IntegrationTestSuite) TestAdminClient_DescribeTopics() {
"Expected correct error for nonexistent topic")

topicDesc := topicDescs[0]
assert.NotZero(topicDesc.TopicID.GetLeastSignificantBits())
assert.NotZero(topicDesc.TopicID.GetMostSignificantBits())
assert.NotEmpty(topicDesc.TopicID.String())
assert.Equal(topicDesc.Error.Code(), ErrNoError,
"Topic description should not have an error")
assert.False(topicDesc.IsInternal, "Topic should not be internal")
Expand Down
25 changes: 25 additions & 0 deletions kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,31 @@ func (n Node) String() string {
return fmt.Sprintf("[%s:%d]/%d", n.Host, n.Port, n.ID)
}

// UUID Kafka UUID representation
type UUID struct {
// Most Significant Bits.
mostSignificantBits int64
// Least Significant Bits.
leastSignificantBits int64
// Base64 representation
base64str string
}

// Base64 string representation of the UUID
func (uuid UUID) String() string {
return uuid.base64str
}

// GetMostSignificantBits returns Most Significant 64 bits of the 128 bits UUID
func (uuid UUID) GetMostSignificantBits() int64 {
return uuid.mostSignificantBits
}

// GetLeastSignificantBits returns Least Significant 64 bits of the 128 bits UUID
func (uuid UUID) GetLeastSignificantBits() int64 {
return uuid.leastSignificantBits
}

// ConsumerGroupTopicPartitions represents a consumer group's TopicPartitions.
type ConsumerGroupTopicPartitions struct {
// Group name
Expand Down

0 comments on commit b4a69cc

Please sign in to comment.