From 653010d65361bf96730f251c5a2b34ff35a0a5ba Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sun, 29 May 2022 11:50:31 -0600
Subject: [PATCH] producer: allow the user to set Timestamp

I'm not sure why I introduced the restriction that timestamps must be
sequential. It appears the MaxTimestamp field just needs to be the max
timestamp of the batch, and FirstTimestamp (BaseTimestamp) just needs to
be the timestamp of the first record. If timestamps go backwards, nothing
in Kafka seems to care / check this.

Tested by producing two records with a linger where the second record has
a 1s earlier timestamp than the first record.
---
 pkg/kgo/produce_request_test.go |  9 ++++++---
 pkg/kgo/producer.go             |  3 ++-
 pkg/kgo/record_and_fetch.go     |  6 +++---
 pkg/kgo/sink.go                 | 25 ++++++++++++++-----------
 4 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/pkg/kgo/produce_request_test.go b/pkg/kgo/produce_request_test.go
index b8df7a0f..72791920 100644
--- a/pkg/kgo/produce_request_test.go
+++ b/pkg/kgo/produce_request_test.go
@@ -92,7 +92,8 @@ func TestRecBatchAppendTo(t *testing.T) {
 	ourBatch := seqRecBatch{
 		seq: 10,
 		recBatch: &recBatch{
-			firstTimestamp: 20,
+			firstTimestamp:    20,
+			maxTimestampDelta: 4,
 			records: []promisedRec{
 				{
 					Record: &Record{
@@ -268,7 +269,8 @@ func TestMessageSetAppendTo(t *testing.T) {
 	// input
 	ourBatch := seqRecBatch{
 		recBatch: &recBatch{
-			firstTimestamp: 12,
+			firstTimestamp:    12,
+			maxTimestampDelta: 1,
 			records: []promisedRec{
 				{
 					Record: &Record{
@@ -363,7 +365,8 @@ func BenchmarkAppendBatch(b *testing.B) {
 	ourBatch := seqRecBatch{
 		seq: 10,
 		recBatch: &recBatch{
-			firstTimestamp: 20,
+			firstTimestamp:    20,
+			maxTimestampDelta: 4,
 			records: []promisedRec{
 				{
 					Record: &Record{
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 37b64902..32d02af1 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -324,7 +324,8 @@ func (cl *Client) TryProduce(
 // produced in order per partition if the record is produced successfully.
 // Successfully produced records will have their attributes, offset, and
 // partition set before the promise is called. All promises are called serially
-// (and should be relatively fast).
+// (and should be relatively fast). If a record's timestamp is unset, this
+// sets the timestamp to time.Now.
 //
 // If the topic field is empty, the client will use the DefaultProduceTopic; if
 // that is also empty, the record is failed immediately. If the record is too
diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go
index 90c162cc..63cc4d8d 100644
--- a/pkg/kgo/record_and_fetch.go
+++ b/pkg/kgo/record_and_fetch.go
@@ -90,7 +90,7 @@ type Record struct {
 	// Record batches are always written with "CreateTime", meaning that
 	// timestamps are generated by clients rather than brokers.
 	//
-	// This field is always set in Produce.
+	// When producing, if this field is not yet set, it is set to time.Now.
 	Timestamp time.Time

 	// Topic is the topic that a record is written to.
@@ -143,11 +143,11 @@ type Record struct {
 // (also because number width affects encoding length). We repurpose the Offset
 // field to save space.
 func (r *Record) setLengthAndTimestampDelta(length, tsDelta int32) {
-	r.Offset = int64(uint64(length)<<32 | uint64(tsDelta))
+	r.Offset = int64(uint64(uint32(length))<<32 | uint64(uint32(tsDelta)))
 }

 func (r *Record) lengthAndTimestampDelta() (length, tsDelta int32) {
-	return int32(uint64(r.Offset) >> 32), int32(uint64(r.Offset))
+	return int32(uint32(uint64(r.Offset) >> 32)), int32(uint32(uint64(r.Offset)))
 }

 // AppendFormat appends a record to b given the layout or returns an error if
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index a534c177..a1a5cc17 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -1107,10 +1107,12 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool {
 	recBuf.mu.Lock()
 	defer recBuf.mu.Unlock()

-	// Timestamp after locking to ensure sequential, and truncate to
-	// milliseconds to avoid some accumulated rounding error problems
-	// (see Shopify/sarama#1455)
-	pr.Timestamp = time.Now().Truncate(time.Millisecond)
+	// We truncate to milliseconds to avoid some accumulated rounding error
+	// problems (see Shopify/sarama#1455)
+	if pr.Timestamp.IsZero() {
+		pr.Timestamp = time.Now()
+	}
+	pr.Timestamp = pr.Timestamp.Truncate(time.Millisecond)

 	if recBuf.purged {
 		recBuf.cl.producer.promiseRecord(pr, errPurged)
@@ -1326,8 +1328,9 @@ type recBatch struct {
 	wireLength   int32 // tracks total size this batch would currently encode as, including length prefix
 	v1wireLength int32 // same as wireLength, but for message set v1

-	attrs          int16 // updated during apending; read and converted to RecordAttrs on success
-	firstTimestamp int64 // since unix epoch, in millis
+	attrs             int16 // updated during apending; read and converted to RecordAttrs on success
+	firstTimestamp    int64 // since unix epoch, in millis
+	maxTimestampDelta int32

 	mu sync.Mutex // guards appendTo's reading of records against failAllRecords emptying it
 	records []promisedRec // record w/ length, ts calculated
@@ -1371,6 +1374,8 @@ func (b *recBatch) appendRecord(pr promisedRec, nums recordNumbers) {
 	b.v1wireLength += messageSet1Length(pr.Record)
 	if len(b.records) == 0 {
 		b.firstTimestamp = pr.Timestamp.UnixNano() / 1e6
+	} else if nums.tsDelta > b.maxTimestampDelta {
+		b.maxTimestampDelta = nums.tsDelta
 	}
 	b.records = append(b.records, pr)
 }
@@ -1903,6 +1908,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
 	if flexible {
 		dst = append(dst, 0)
 	}
+
 	return dst
 }

@@ -1924,6 +1930,7 @@ func (b seqRecBatch) appendTo(
 	transactional bool,
 	compressor *compressor,
 ) (dst []byte, m ProduceBatchMetrics) { // named return so that our defer for flexible versions can modify it
+
 	flexible := version >= 9
 	dst = in
 	nullableBytesLen := b.wireLength - 4 // NULLABLE_BYTES leading length, minus itself
@@ -1985,11 +1992,7 @@ func (b seqRecBatch) appendTo(
 	dst = kbin.AppendInt16(dst, b.attrs)
 	dst = kbin.AppendInt32(dst, int32(len(b.records)-1)) // lastOffsetDelta
 	dst = kbin.AppendInt64(dst, b.firstTimestamp)
-
-	// maxTimestamp is the timestamp of the last record in a batch.
-	lastRecord := b.records[len(b.records)-1]
-	_, tsDelta := lastRecord.lengthAndTimestampDelta()
-	dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(tsDelta))
+	dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(b.maxTimestampDelta))

 	seq := b.seq
 	if producerID < 0 { // a negative producer ID means we are not using idempotence
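A minimal sketch of what this patch enables from the caller's side, assuming a locally reachable broker and a throwaway topic name (both placeholders, not part of the patch): records may now carry an explicit, even out-of-order, Timestamp, and a zero Timestamp still defaults to time.Now.

package main

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),  // placeholder broker
		kgo.DefaultProduceTopic("ts-test"), // placeholder topic
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	now := time.Now()

	// The second record's timestamp is 1s earlier than the first,
	// mirroring the manual test described in the commit message.
	first := &kgo.Record{Value: []byte("first"), Timestamp: now}
	second := &kgo.Record{Value: []byte("second"), Timestamp: now.Add(-time.Second)}

	// A zero Timestamp is still filled in with time.Now, truncated to millis.
	third := &kgo.Record{Value: []byte("third")}

	if err := cl.ProduceSync(context.Background(), first, second, third).FirstErr(); err != nil {
		panic(err)
	}
}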
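The extra uint32 conversions in setLengthAndTimestampDelta matter once deltas can be negative: before this patch timestamps were assigned sequentially so the delta was never negative, but a record timestamped earlier than the batch's first record now yields a negative delta, and without masking to 32 bits the sign extension would bleed into the length half of the packed Offset. A self-contained sketch of that packing, where pack and unpack are stand-ins for the unexported Record methods and not part of the kgo API:

package main

import "fmt"

// pack mirrors the patched setLengthAndTimestampDelta: converting through
// uint32 keeps a negative tsDelta confined to the low 32 bits.
func pack(length, tsDelta int32) int64 {
	return int64(uint64(uint32(length))<<32 | uint64(uint32(tsDelta)))
}

// unpack mirrors the patched lengthAndTimestampDelta.
func unpack(packed int64) (length, tsDelta int32) {
	return int32(uint32(uint64(packed) >> 32)), int32(uint32(uint64(packed)))
}

func main() {
	// A record 1s earlier than the batch's first record: delta is -1000ms.
	length, tsDelta := unpack(pack(123, -1000))
	fmt.Println(length, tsDelta) // 123 -1000

	// The pre-patch packing sign-extends the negative delta over the length.
	l, d := int32(123), int32(-1000)
	badLength, _ := unpack(int64(uint64(l)<<32 | uint64(d)))
	fmt.Println(badLength) // -1, not 123
}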
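The new maxTimestampDelta field exists because the batch's MaxTimestamp is no longer necessarily the last record's timestamp. A small illustrative helper (not from the patch) showing the FirstTimestamp and MaxTimestamp a batch would encode for out-of-order timestamps, using the same 20/4 values as the updated tests:

package main

import "fmt"

// batchTimestamps computes the FirstTimestamp and MaxTimestamp a batch would
// encode for record timestamps (unix millis) given in append order, tracking
// the maximum delta the way the patched recBatch.appendRecord does.
func batchTimestamps(tsMillis []int64) (first, max int64) {
	var maxDelta int32
	for i, ts := range tsMillis {
		if i == 0 {
			first = ts
			continue
		}
		if d := int32(ts - first); d > maxDelta {
			maxDelta = d
		}
	}
	return first, first + int64(maxDelta)
}

func main() {
	// The last record is earlier than the second; MaxTimestamp is still the
	// maximum timestamp in the batch, not the last record's.
	first, max := batchTimestamps([]int64{20, 24, 21})
	fmt.Println(first, max) // 20 24
}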