Skip to content

Commit

Permalink
KIP-320: Allow fetchers to detect and handle log truncation (#968)
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Apr 4, 2023
1 parent 3c01752 commit 640ffe3
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

* Added Consumer `SeekPartitions()` method to seek multiple partitions at
once and deprecated `Seek()`.
* [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation):
add offset leader epoch to the TopicPartition \
and Message structs (#968).


confluent-kafka-go is based on librdkafka v2.1.0, see the
Expand Down
1 change: 1 addition & 0 deletions examples/transactions_example/txnhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func rewindConsumerPosition(partition int32) {
if tp.Offset < 0 {
// No committed offset, reset to earliest
tp.Offset = kafka.OffsetBeginning
tp.LeaderEpoch = nil
}

addLog(fmt.Sprintf("Processor: rewinding input partition %v to offset %v",
Expand Down
5 changes: 5 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err
if m.TopicPartition.Offset < 0 {
return nil, newErrorFromString(ErrInvalidArg, "Can't store message with offset less than 0")
}

if m.LeaderEpoch != nil && *m.LeaderEpoch < 0 {
return nil, newErrorFromString(ErrInvalidArg, "Can't store message with leader epoch less than 0")
}

offsets := []TopicPartition{m.TopicPartition}
offsets[0].Offset++
return c.StoreOffsets(offsets)
Expand Down
12 changes: 12 additions & 0 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,9 @@ func (its *IntegrationTestSuite) TestConsumerSeekPartitions() {
t.Fatalf("Failed to assign partition: %s", err)
}

var leaderEpoch int32 = 0
tps[0].Offset = Offset(numMessages / 2)
tps[0].LeaderEpoch = &leaderEpoch
seekedPartitions, err := consumer.SeekPartitions(tps)
if err != nil {
t.Errorf("SeekPartitions failed: %s", err)
Expand All @@ -673,6 +675,16 @@ func (its *IntegrationTestSuite) TestConsumerSeekPartitions() {
t.Errorf("Expected offset of read message is %d, got %d",
numMessages/2, msg.TopicPartition.Offset)
}

if msg.TopicPartition.LeaderEpoch == nil {
t.Errorf("Expected leader epoch got nil")
return
}

if *msg.TopicPartition.LeaderEpoch == 0 {
t.Errorf("Expected leader epoch of read message is %d, got %d",
0, *msg.TopicPartition.LeaderEpoch)
}
}

// TestAdminClient_DeleteConsumerGroups verifies the working of the
Expand Down
21 changes: 16 additions & 5 deletions kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,12 @@ const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)

// TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
type TopicPartition struct {
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
LeaderEpoch *int32
}

func (p TopicPartition) String() string {
Expand Down Expand Up @@ -364,6 +365,11 @@ func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kaf
rktpar.metadata = unsafe.Pointer(cmetadata)
rktpar.metadata_size = C.size_t(len(*part.Metadata))
}

if part.LeaderEpoch != nil {
cLeaderEpoch := C.int32_t(*part.LeaderEpoch)
C.rd_kafka_topic_partition_set_leader_epoch(rktpar, cLeaderEpoch)
}
}

return cparts
Expand All @@ -384,6 +390,11 @@ func setupTopicPartitionFromCrktpar(partition *TopicPartition, crktpar *C.rd_kaf
if crktpar.err != C.RD_KAFKA_RESP_ERR_NO_ERROR {
partition.Error = newError(crktpar.err)
}

cLeaderEpoch := int32(C.rd_kafka_topic_partition_get_leader_epoch(crktpar))
if cLeaderEpoch >= 0 {
partition.LeaderEpoch = &cLeaderEpoch
}
}

func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (partitions []TopicPartition) {
Expand Down
6 changes: 6 additions & 0 deletions kafka/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Message struct {
TimestampType TimestampType
Opaque interface{}
Headers []Header
LeaderEpoch *int32 // LeaderEpoch or nil if not available
}

// String returns a human readable representation of a Message.
Expand Down Expand Up @@ -160,6 +161,11 @@ func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
if cmsg.err != 0 {
msg.TopicPartition.Error = newError(cmsg.err)
}

leaderEpoch := int32(C.rd_kafka_message_leader_epoch(cmsg))
if leaderEpoch >= 0 {
msg.LeaderEpoch = &leaderEpoch
}
}

// newMessageFromC creates a new message object from a C rd_kafka_message_t
Expand Down
1 change: 1 addition & 0 deletions soaktest/soakclient_transaction/soakclient_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ func rewindConsumerPosition(c *kafka.Consumer) error {
for _, tp := range committed {
if tp.Offset < 0 {
tp.Offset = kafka.OffsetBeginning
tp.LeaderEpoch = nil
}
err := c.Seek(tp, 1)
if err != nil {
Expand Down

0 comments on commit 640ffe3

Please sign in to comment.