Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-320: Allow fetchers to detect #968

Merged
merged 4 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

* Added Consumer `SeekPartitions()` method to seek multiple partitions at
once and deprecated `Seek()`.
* [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation):
add offset leader epoch to the TopicPartition \
and Message structs (#968).


confluent-kafka-go is based on librdkafka v2.1.0, see the
Expand Down
1 change: 1 addition & 0 deletions examples/transactions_example/txnhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func rewindConsumerPosition(partition int32) {
if tp.Offset < 0 {
// No committed offset, reset to earliest
tp.Offset = kafka.OffsetBeginning
tp.LeaderEpoch = nil
}

addLog(fmt.Sprintf("Processor: rewinding input partition %v to offset %v",
Expand Down
5 changes: 5 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err
if m.TopicPartition.Offset < 0 {
return nil, newErrorFromString(ErrInvalidArg, "Can't store message with offset less than 0")
}

if m.LeaderEpoch != nil && *m.LeaderEpoch < 0 {
return nil, newErrorFromString(ErrInvalidArg, "Can't store message with leader epoch less than 0")
}

offsets := []TopicPartition{m.TopicPartition}
offsets[0].Offset++
return c.StoreOffsets(offsets)
Expand Down
12 changes: 12 additions & 0 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,9 @@ func (its *IntegrationTestSuite) TestConsumerSeekPartitions() {
t.Fatalf("Failed to assign partition: %s", err)
}

var leaderEpoch int32 = 0
tps[0].Offset = Offset(numMessages / 2)
tps[0].LeaderEpoch = &leaderEpoch
seekedPartitions, err := consumer.SeekPartitions(tps)
if err != nil {
t.Errorf("SeekPartitions failed: %s", err)
Expand All @@ -673,6 +675,16 @@ func (its *IntegrationTestSuite) TestConsumerSeekPartitions() {
t.Errorf("Expected offset of read message is %d, got %d",
numMessages/2, msg.TopicPartition.Offset)
}

if msg.TopicPartition.LeaderEpoch == nil {
t.Errorf("Expected leader epoch got nil")
return
}

if *msg.TopicPartition.LeaderEpoch == 0 {
t.Errorf("Expected leader epoch of read message is %d, got %d",
0, *msg.TopicPartition.LeaderEpoch)
}
}

// TestAdminClient_DeleteConsumerGroups verifies the working of the
Expand Down
21 changes: 16 additions & 5 deletions kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,12 @@ const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)

// TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
type TopicPartition struct {
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
LeaderEpoch *int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this just be int32, we can use a negative value (-1) similar to C case (as illustrated inif cLeaderEpoch >= 0).
We can change if it makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just for the bindings I've followed the Java client that has an Option so I've put an optional field here, in Python and .NET

}

func (p TopicPartition) String() string {
Expand Down Expand Up @@ -364,6 +365,11 @@ func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kaf
rktpar.metadata = unsafe.Pointer(cmetadata)
rktpar.metadata_size = C.size_t(len(*part.Metadata))
}

if part.LeaderEpoch != nil {
cLeaderEpoch := C.int32_t(*part.LeaderEpoch)
C.rd_kafka_topic_partition_set_leader_epoch(rktpar, cLeaderEpoch)
}
}

return cparts
Expand All @@ -384,6 +390,11 @@ func setupTopicPartitionFromCrktpar(partition *TopicPartition, crktpar *C.rd_kaf
if crktpar.err != C.RD_KAFKA_RESP_ERR_NO_ERROR {
partition.Error = newError(crktpar.err)
}

cLeaderEpoch := int32(C.rd_kafka_topic_partition_get_leader_epoch(crktpar))
if cLeaderEpoch >= 0 {
partition.LeaderEpoch = &cLeaderEpoch
}
}

func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (partitions []TopicPartition) {
Expand Down
6 changes: 6 additions & 0 deletions kafka/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Message struct {
TimestampType TimestampType
Opaque interface{}
Headers []Header
LeaderEpoch *int32 // LeaderEpoch or nil if not available
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto, the thing about this not being a pointer here, too

}

// String returns a human readable representation of a Message.
Expand Down Expand Up @@ -160,6 +161,11 @@ func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
if cmsg.err != 0 {
msg.TopicPartition.Error = newError(cmsg.err)
}

leaderEpoch := int32(C.rd_kafka_message_leader_epoch(cmsg))
if leaderEpoch >= 0 {
msg.LeaderEpoch = &leaderEpoch
}
}

// newMessageFromC creates a new message object from a C rd_kafka_message_t
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ func rewindConsumerPosition(c *kafka.Consumer) error {
for _, tp := range committed {
if tp.Offset < 0 {
tp.Offset = kafka.OffsetBeginning
tp.LeaderEpoch = nil
}
err := c.Seek(tp, 1)
if err != nil {
Expand Down