Skip to content

Commit

Permalink
kgo.Record: add ProducerID and ProducerEpoch
Browse files Browse the repository at this point in the history
These are relatively straightforward to copy from the record batch
struct, similar to how attrs are copied.
  • Loading branch information
twmb committed Nov 10, 2020
1 parent 243bf55 commit 37d6868
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
30 changes: 21 additions & 9 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,30 +88,42 @@ type Record struct {
// This field is always set in Produce.
Timestamp time.Time

// Attrs specifies what attributes were on this record.
Attrs RecordAttrs

// Topic is the topic that a record is written to.
//
// This must be set for producing.
Topic string

// Partition is the partition that a record is written to.
//
// For producing, this is left unset. If acks are required, this field
// will be filled in before the produce callback if the produce is
// successful.
// For producing, this is left unset. This will be set by the client
// as appropriate.
Partition int32

// Attrs specifies what attributes were on this record.
Attrs RecordAttrs

// ProducerEpoch is the producer epoch of this message if it was
// produced with a producer ID. An epoch and ID of 0 means it was not.
//
// For producing, this is left unset. This will be set by the client
// as appropriate.
ProducerEpoch int16

// ProducerEpoch is the producer ID of this message if it was produced
// with a producer ID. An epoch and ID of 0 means it was not.
//
// For producing, this is left unset. This will be set by the client
// as appropriate.
ProducerID int64

// LeaderEpoch is the leader epoch of the broker at the time this
// record was written, or -1 if on message sets.
LeaderEpoch int32

// Offset is the offset that a record is written as.
//
// For producing, this is left unset. If acks are required, this field
// will be filled in before the produce callback if the produce is
// successful.
// For producing, this is left unset. This will be set by the client
// as appropriate.
Offset int64
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
"err", err,
)
s.cl.failProducerID(req.producerID, req.producerEpoch, err)
s.cl.finishBatch(batch.recBatch, partition, rPartition.BaseOffset, err)
s.cl.finishBatch(batch.recBatch, req.producerID, req.producerEpoch, partition, rPartition.BaseOffset, err)
continue
}
if s.cl.cfg.onDataLoss != nil {
Expand Down Expand Up @@ -676,7 +676,7 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
"max_retries_reached", batch.tries == s.cl.cfg.retries,
)
}
s.cl.finishBatch(batch.recBatch, partition, rPartition.BaseOffset, err)
s.cl.finishBatch(batch.recBatch, req.producerID, req.producerEpoch, partition, rPartition.BaseOffset, err)
}
}

Expand Down Expand Up @@ -704,7 +704,7 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
//
// This is safe even if the owning recBuf migrated sinks, since we are
// finishing based off the status of an inflight req from the original sink.
func (cl *Client) finishBatch(batch *recBatch, partition int32, baseOffset int64, err error) {
func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch int16, partition int32, baseOffset int64, err error) {
recBuf := batch.owner
recBuf.mu.Lock()
defer recBuf.mu.Unlock()
Expand Down Expand Up @@ -733,6 +733,8 @@ func (cl *Client) finishBatch(batch *recBatch, partition int32, baseOffset int64
for i, pnr := range batch.records {
pnr.Offset = baseOffset + int64(i)
pnr.Partition = partition
pnr.ProducerID = producerID
pnr.ProducerEpoch = producerEpoch
cl.finishRecordPromise(pnr.promisedRec, err)
batch.records[i] = noPNR
}
Expand Down
54 changes: 30 additions & 24 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,15 +977,17 @@ func recordToRecord(
}

return &Record{
Key: record.Key,
Value: record.Value,
Headers: h,
Timestamp: timeFromMillis(batch.FirstTimestamp + int64(record.TimestampDelta)),
Attrs: RecordAttrs{uint8(batch.Attributes)},
Topic: topic,
Partition: partition,
LeaderEpoch: batch.PartitionLeaderEpoch,
Offset: batch.FirstOffset + int64(record.OffsetDelta),
Key: record.Key,
Value: record.Value,
Headers: h,
Timestamp: timeFromMillis(batch.FirstTimestamp + int64(record.TimestampDelta)),
Topic: topic,
Partition: partition,
Attrs: RecordAttrs{uint8(batch.Attributes)},
ProducerID: batch.ProducerID,
ProducerEpoch: batch.ProducerEpoch,
LeaderEpoch: batch.PartitionLeaderEpoch,
Offset: batch.FirstOffset + int64(record.OffsetDelta),
}
}

Expand All @@ -1005,13 +1007,15 @@ func v0MessageToRecord(
message *kmsg.MessageV0,
) *Record {
return &Record{
Key: message.Key,
Value: message.Value,
Attrs: messageAttrsToRecordAttrs(message.Attributes, true),
Topic: topic,
Partition: partition,
LeaderEpoch: -1,
Offset: message.Offset,
Key: message.Key,
Value: message.Value,
Topic: topic,
Partition: partition,
Attrs: messageAttrsToRecordAttrs(message.Attributes, true),
ProducerID: -1,
ProducerEpoch: -1,
LeaderEpoch: -1,
Offset: message.Offset,
}
}

Expand All @@ -1021,14 +1025,16 @@ func v1MessageToRecord(
message *kmsg.MessageV1,
) *Record {
return &Record{
Key: message.Key,
Value: message.Value,
Timestamp: timeFromMillis(message.Timestamp),
Attrs: messageAttrsToRecordAttrs(message.Attributes, false),
Topic: topic,
Partition: partition,
LeaderEpoch: -1,
Offset: message.Offset,
Key: message.Key,
Value: message.Value,
Timestamp: timeFromMillis(message.Timestamp),
Topic: topic,
Partition: partition,
Attrs: messageAttrsToRecordAttrs(message.Attributes, false),
ProducerID: -1,
ProducerEpoch: -1,
LeaderEpoch: -1,
Offset: message.Offset,
}
}

Expand Down

0 comments on commit 37d6868

Please sign in to comment.