From 640ffe3509e2a4711b45c11e4550e02e1e69b233 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 4 Apr 2023 17:57:39 +0200 Subject: [PATCH] KIP-320: Allow fetchers to detect and handle log truncation (#968) --- CHANGELOG.md | 3 +++ examples/transactions_example/txnhelpers.go | 1 + kafka/consumer.go | 5 +++++ kafka/integration_test.go | 12 +++++++++++ kafka/kafka.go | 21 ++++++++++++++----- kafka/message.go | 6 ++++++ .../soakclient_transaction.go | 1 + 7 files changed, 44 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 668c348bc..591469a54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ * Added Consumer `SeekPartitions()` method to seek multiple partitions at once and deprecated `Seek()`. + * [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation): + add offset leader epoch to the TopicPartition \ + and Message structs (#968). confluent-kafka-go is based on librdkafka v2.1.0, see the diff --git a/examples/transactions_example/txnhelpers.go b/examples/transactions_example/txnhelpers.go index fe3be7c87..926062f7e 100644 --- a/examples/transactions_example/txnhelpers.go +++ b/examples/transactions_example/txnhelpers.go @@ -149,6 +149,7 @@ func rewindConsumerPosition(partition int32) { if tp.Offset < 0 { // No committed offset, reset to earliest tp.Offset = kafka.OffsetBeginning + tp.LeaderEpoch = nil } addLog(fmt.Sprintf("Processor: rewinding input partition %v to offset %v", diff --git a/kafka/consumer.go b/kafka/consumer.go index 4d6897521..e6557fc55 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -364,6 +364,11 @@ func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err if m.TopicPartition.Offset < 0 { return nil, newErrorFromString(ErrInvalidArg, "Can't store message with offset less than 0") } + + if m.LeaderEpoch != nil && *m.LeaderEpoch < 0 { + return nil, newErrorFromString(ErrInvalidArg, "Can't store message with leader epoch less than 0") + } + offsets := []TopicPartition{m.TopicPartition} offsets[0].Offset++ return c.StoreOffsets(offsets) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index 211713bfa..63d641c8f 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -649,7 +649,9 @@ func (its *IntegrationTestSuite) TestConsumerSeekPartitions() { t.Fatalf("Failed to assign partition: %s", err) } + var leaderEpoch int32 = 0 tps[0].Offset = Offset(numMessages / 2) + tps[0].LeaderEpoch = &leaderEpoch seekedPartitions, err := consumer.SeekPartitions(tps) if err != nil { t.Errorf("SeekPartitions failed: %s", err) @@ -673,6 +675,16 @@ func (its *IntegrationTestSuite) TestConsumerSeekPartitions() { t.Errorf("Expected offset of read message is %d, got %d", numMessages/2, msg.TopicPartition.Offset) } + + if msg.TopicPartition.LeaderEpoch == nil { + t.Errorf("Expected leader epoch got nil") + return + } + + if *msg.TopicPartition.LeaderEpoch == 0 { + t.Errorf("Expected leader epoch of read message is %d, got %d", + 0, *msg.TopicPartition.LeaderEpoch) + } } // TestAdminClient_DeleteConsumerGroups verifies the working of the diff --git a/kafka/kafka.go b/kafka/kafka.go index 1b4d4f022..338f62874 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -276,11 +276,12 @@ const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA) // TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset. type TopicPartition struct { - Topic *string - Partition int32 - Offset Offset - Metadata *string - Error error + Topic *string + Partition int32 + Offset Offset + Metadata *string + Error error + LeaderEpoch *int32 } func (p TopicPartition) String() string { @@ -364,6 +365,11 @@ func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kaf rktpar.metadata = unsafe.Pointer(cmetadata) rktpar.metadata_size = C.size_t(len(*part.Metadata)) } + + if part.LeaderEpoch != nil { + cLeaderEpoch := C.int32_t(*part.LeaderEpoch) + C.rd_kafka_topic_partition_set_leader_epoch(rktpar, cLeaderEpoch) + } } return cparts @@ -384,6 +390,11 @@ func setupTopicPartitionFromCrktpar(partition *TopicPartition, crktpar *C.rd_kaf if crktpar.err != C.RD_KAFKA_RESP_ERR_NO_ERROR { partition.Error = newError(crktpar.err) } + + cLeaderEpoch := int32(C.rd_kafka_topic_partition_get_leader_epoch(crktpar)) + if cLeaderEpoch >= 0 { + partition.LeaderEpoch = &cLeaderEpoch + } } func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (partitions []TopicPartition) { diff --git a/kafka/message.go b/kafka/message.go index a8cb89185..a041e42fd 100644 --- a/kafka/message.go +++ b/kafka/message.go @@ -77,6 +77,7 @@ type Message struct { TimestampType TimestampType Opaque interface{} Headers []Header + LeaderEpoch *int32 // LeaderEpoch or nil if not available } // String returns a human readable representation of a Message. @@ -160,6 +161,11 @@ func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) { if cmsg.err != 0 { msg.TopicPartition.Error = newError(cmsg.err) } + + leaderEpoch := int32(C.rd_kafka_message_leader_epoch(cmsg)) + if leaderEpoch >= 0 { + msg.LeaderEpoch = &leaderEpoch + } } // newMessageFromC creates a new message object from a C rd_kafka_message_t diff --git a/soaktest/soakclient_transaction/soakclient_transaction.go b/soaktest/soakclient_transaction/soakclient_transaction.go index 3c17b5b0a..2e85e91a1 100644 --- a/soaktest/soakclient_transaction/soakclient_transaction.go +++ b/soaktest/soakclient_transaction/soakclient_transaction.go @@ -609,6 +609,7 @@ func rewindConsumerPosition(c *kafka.Consumer) error { for _, tp := range committed { if tp.Offset < 0 { tp.Offset = kafka.OffsetBeginning + tp.LeaderEpoch = nil } err := c.Seek(tp, 1) if err != nil {