diff --git a/pkg/kgo/produce_request_test.go b/pkg/kgo/produce_request_test.go index b8df7a0f..72791920 100644 --- a/pkg/kgo/produce_request_test.go +++ b/pkg/kgo/produce_request_test.go @@ -92,7 +92,8 @@ func TestRecBatchAppendTo(t *testing.T) { ourBatch := seqRecBatch{ seq: 10, recBatch: &recBatch{ - firstTimestamp: 20, + firstTimestamp: 20, + maxTimestampDelta: 4, records: []promisedRec{ { Record: &Record{ @@ -268,7 +269,8 @@ func TestMessageSetAppendTo(t *testing.T) { // input ourBatch := seqRecBatch{ recBatch: &recBatch{ - firstTimestamp: 12, + firstTimestamp: 12, + maxTimestampDelta: 1, records: []promisedRec{ { Record: &Record{ @@ -363,7 +365,8 @@ func BenchmarkAppendBatch(b *testing.B) { ourBatch := seqRecBatch{ seq: 10, recBatch: &recBatch{ - firstTimestamp: 20, + firstTimestamp: 20, + maxTimestampDelta: 4, records: []promisedRec{ { Record: &Record{ diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 37b64902..32d02af1 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -324,7 +324,8 @@ func (cl *Client) TryProduce( // produced in order per partition if the record is produced successfully. // Successfully produced records will have their attributes, offset, and // partition set before the promise is called. All promises are called serially -// (and should be relatively fast). +// (and should be relatively fast). If a record's timestamp is unset, this +// sets the timestamp to time.Now. // // If the topic field is empty, the client will use the DefaultProduceTopic; if // that is also empty, the record is failed immediately. If the record is too diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 90c162cc..63cc4d8d 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -90,7 +90,7 @@ type Record struct { // Record batches are always written with "CreateTime", meaning that // timestamps are generated by clients rather than brokers. // - // This field is always set in Produce. + // When producing, if this field is not yet set, it is set to time.Now. Timestamp time.Time // Topic is the topic that a record is written to. @@ -143,11 +143,11 @@ type Record struct { // (also because number width affects encoding length). We repurpose the Offset // field to save space. func (r *Record) setLengthAndTimestampDelta(length, tsDelta int32) { - r.Offset = int64(uint64(length)<<32 | uint64(tsDelta)) + r.Offset = int64(uint64(uint32(length))<<32 | uint64(uint32(tsDelta))) } func (r *Record) lengthAndTimestampDelta() (length, tsDelta int32) { - return int32(uint64(r.Offset) >> 32), int32(uint64(r.Offset)) + return int32(uint32(uint64(r.Offset) >> 32)), int32(uint32(uint64(r.Offset))) } // AppendFormat appends a record to b given the layout or returns an error if diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index a534c177..a1a5cc17 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -1107,10 +1107,12 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool { recBuf.mu.Lock() defer recBuf.mu.Unlock() - // Timestamp after locking to ensure sequential, and truncate to - // milliseconds to avoid some accumulated rounding error problems - // (see Shopify/sarama#1455) - pr.Timestamp = time.Now().Truncate(time.Millisecond) + // We truncate to milliseconds to avoid some accumulated rounding error + // problems (see Shopify/sarama#1455) + if pr.Timestamp.IsZero() { + pr.Timestamp = time.Now() + } + pr.Timestamp = pr.Timestamp.Truncate(time.Millisecond) if recBuf.purged { recBuf.cl.producer.promiseRecord(pr, errPurged) @@ -1326,8 +1328,9 @@ type recBatch struct { wireLength int32 // tracks total size this batch would currently encode as, including length prefix v1wireLength int32 // same as wireLength, but for message set v1 - attrs int16 // updated during apending; read and converted to RecordAttrs on success - firstTimestamp int64 // since unix epoch, in millis + attrs int16 // updated during apending; read and converted to RecordAttrs on success + firstTimestamp int64 // since unix epoch, in millis + maxTimestampDelta int32 mu sync.Mutex // guards appendTo's reading of records against failAllRecords emptying it records []promisedRec // record w/ length, ts calculated @@ -1371,6 +1374,8 @@ func (b *recBatch) appendRecord(pr promisedRec, nums recordNumbers) { b.v1wireLength += messageSet1Length(pr.Record) if len(b.records) == 0 { b.firstTimestamp = pr.Timestamp.UnixNano() / 1e6 + } else if nums.tsDelta > b.maxTimestampDelta { + b.maxTimestampDelta = nums.tsDelta } b.records = append(b.records, pr) } @@ -1903,6 +1908,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte { if flexible { dst = append(dst, 0) } + return dst } @@ -1924,6 +1930,7 @@ func (b seqRecBatch) appendTo( transactional bool, compressor *compressor, ) (dst []byte, m ProduceBatchMetrics) { // named return so that our defer for flexible versions can modify it + flexible := version >= 9 dst = in nullableBytesLen := b.wireLength - 4 // NULLABLE_BYTES leading length, minus itself @@ -1985,11 +1992,7 @@ func (b seqRecBatch) appendTo( dst = kbin.AppendInt16(dst, b.attrs) dst = kbin.AppendInt32(dst, int32(len(b.records)-1)) // lastOffsetDelta dst = kbin.AppendInt64(dst, b.firstTimestamp) - - // maxTimestamp is the timestamp of the last record in a batch. - lastRecord := b.records[len(b.records)-1] - _, tsDelta := lastRecord.lengthAndTimestampDelta() - dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(tsDelta)) + dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(b.maxTimestampDelta)) seq := b.seq if producerID < 0 { // a negative producer ID means we are not using idempotence