Skip to content

Commit

Permalink
sink: bugfix firstTimestamp
Browse files Browse the repository at this point in the history
In the producer guts overhaul, I changed where firstTimestamp is
calculated. Unfortunately, calculateRecordNumbers now happens before
setting firstTimestamp, so the **first** record added to a batch
calculated a timestamp delta off of 0, which was large and would put the
batch creation time into the future. All other records would then
calculate a timestamp delta off of the actual real firstTimestamp,
because the first record would be buffered, and the buffering is what
sets firstTimestamp.

Now, in calculateRecordNumbers, if the record is to be the first record,
then we zero out tsDelta.
  • Loading branch information
twmb committed May 3, 2021
1 parent d9c34c0 commit 5475f6b
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,13 @@ func messageSet1Length(r *Record) int32 {
func (b *recBatch) calculateRecordNumbers(r *Record) recordNumbers {
tsMillis := r.Timestamp.UnixNano() / 1e6
tsDelta := int32(tsMillis - b.firstTimestamp)

// If this is to be the first record in the batch, then our timestamp
// delta is actually 0.
if len(b.records) == 0 {
tsDelta = 0
}

offsetDelta := int32(len(b.records)) // since called before adding record, delta is the current end

l := 1 + // attributes, int8 unused
Expand Down

0 comments on commit 5475f6b

Please sign in to comment.