From 12e3c112c4d42a80246fc716e9bb5994d0d57788 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 28 Jan 2023 18:57:26 -0700 Subject: [PATCH] franz-go: support 64 bit timestamp deltas Most importantly, in a backwards compatible way. --- pkg/kbin/primitives.go | 6 ++++++ pkg/kbin/primitives_test.go | 36 +++++++++++++++++++++++++++++++- pkg/kgo/produce_request_test.go | 37 ++++++++++++++++++++------------- pkg/kgo/producer.go | 1 + pkg/kgo/record_and_fetch.go | 9 ++++---- pkg/kgo/sink.go | 12 +++++------ pkg/kgo/source.go | 2 +- 7 files changed, 76 insertions(+), 27 deletions(-) diff --git a/pkg/kbin/primitives.go b/pkg/kbin/primitives.go index 2c5990d0..487e7f6c 100644 --- a/pkg/kbin/primitives.go +++ b/pkg/kbin/primitives.go @@ -88,6 +88,12 @@ func UvarintLen(u uint32) int { return int(uvarintLens[byte(bits.Len32(u))]) } +// VarlongLen returns how long i would be if it were varlong encoded. +func VarlongLen(i int64) int { + u := uint64(i)<<1 ^ uint64(i>>63) + return uvarlongLen(u) +} + func uvarlongLen(u uint64) int { return int(uvarintLens[byte(bits.Len64(u))]) } diff --git a/pkg/kbin/primitives_test.go b/pkg/kbin/primitives_test.go index 0432e952..d3f81e02 100644 --- a/pkg/kbin/primitives_test.go +++ b/pkg/kbin/primitives_test.go @@ -8,6 +8,35 @@ import ( "testing/quick" ) +func TestVarint(t *testing.T) { + if err := quick.Check(func(x int32) bool { + var expPut [10]byte + n := binary.PutVarint(expPut[:], int64(x)) + + gotPut := AppendVarint(nil, x) + if !bytes.Equal(expPut[:n], gotPut) { + return false + } + if len(gotPut) != n { + return false + } + if VarintLen(x) != n { + return false + } + + expRead, expN := binary.Varint(expPut[:n]) + gotRead, gotN := Varint(gotPut) + + if expN != gotN || expRead != int64(gotRead) { + return false + } + + return true + }, nil); err != nil { + t.Error(err) + } +} + func TestUvarint(t *testing.T) { if err := quick.Check(func(u uint32) bool { var expPut [10]byte @@ -38,7 +67,12 @@ func TestVarlong(t *testing.T) { gotPut := AppendVarlong(nil, x) if !bytes.Equal(expPut[:n], gotPut) { - fmt.Println(expPut[:n], gotPut) + return false + } + if len(gotPut) != n { + return false + } + if VarlongLen(x) != n { return false } diff --git a/pkg/kgo/produce_request_test.go b/pkg/kgo/produce_request_test.go index ae399d57..8a652a2f 100644 --- a/pkg/kgo/produce_request_test.go +++ b/pkg/kgo/produce_request_test.go @@ -37,7 +37,8 @@ func TestPromisedRecAppendTo(t *testing.T) { {Key: "header key 1", Value: []byte("header value 1")}, {Key: "header key 2", Value: []byte("header value 2")}, }, - Offset: 1<<32 | 2, + LeaderEpoch: 1, + Offset: 2, }, } @@ -103,14 +104,16 @@ func TestRecBatchAppendTo(t *testing.T) { {"header key 1", []byte("header value 1")}, {"header key 2", []byte("header value 2")}, }, - Offset: 1<<32 | 2, + LeaderEpoch: 1, + Offset: 2, }, }, { Record: &Record{ - Key: []byte("key 2"), - Value: []byte("value 2"), - Offset: 3<<32 | 4, + Key: []byte("key 2"), + Value: []byte("value 2"), + LeaderEpoch: 3, + Offset: 4, }, }, }, @@ -274,16 +277,18 @@ func TestMessageSetAppendTo(t *testing.T) { records: []promisedRec{ { Record: &Record{ - Key: []byte("loooooong key 1"), - Value: []byte("loooooong value 1"), - Offset: 1 << 32, + Key: []byte("loooooong key 1"), + Value: []byte("loooooong value 1"), + LeaderEpoch: 1, + Offset: 0, }, }, { Record: &Record{ - Key: []byte("loooooong key 2"), - Value: []byte("loooooong value 2"), - Offset: 3<<32 | 1, + Key: []byte("loooooong key 2"), + Value: []byte("loooooong value 2"), + LeaderEpoch: 3, + Offset: 1, }, }, }, @@ -376,14 +381,16 @@ func BenchmarkAppendBatch(b *testing.B) { {"header key 1", []byte("header value 1")}, {"header key 2", []byte("header value 2")}, }, - Offset: 1<<32 | 2, + LeaderEpoch: 1, + Offset: 2, }, }, { Record: &Record{ - Key: []byte("key 2"), - Value: bytes.Repeat([]byte("value 2"), 1000), - Offset: 3<<32 | 4, + Key: []byte("key 2"), + Value: bytes.Repeat([]byte("value 2"), 1000), + LeaderEpoch: 3, + Offset: 4, }, }, }, diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 72bfdfad..ff5616a2 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -443,6 +443,7 @@ func (p *producer) finishPromises(b batchPromise) { start: p.promisesMu.Lock() for i, pr := range b.recs { + pr.LeaderEpoch = 0 pr.Offset = b.baseOffset + int64(i) pr.Partition = b.partition pr.ProducerID = b.pid diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 5022751a..7e050518 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -154,12 +154,13 @@ type Record struct { // When buffering records, we calculate the length and tsDelta ahead of time // (also because number width affects encoding length). We repurpose the Offset // field to save space. -func (r *Record) setLengthAndTimestampDelta(length, tsDelta int32) { - r.Offset = int64(uint64(uint32(length))<<32 | uint64(uint32(tsDelta))) +func (r *Record) setLengthAndTimestampDelta(length int32, tsDelta int64) { + r.LeaderEpoch = length + r.Offset = tsDelta } -func (r *Record) lengthAndTimestampDelta() (length, tsDelta int32) { - return int32(uint32(uint64(r.Offset) >> 32)), int32(uint32(uint64(r.Offset))) +func (r *Record) lengthAndTimestampDelta() (length int32, tsDelta int64) { + return r.LeaderEpoch, r.Offset } // AppendFormat appends a record to b given the layout or returns an error if diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index c0ee9ecb..77cc0952 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -1376,7 +1376,7 @@ type recBatch struct { attrs int16 // updated during apending; read and converted to RecordAttrs on success firstTimestamp int64 // since unix epoch, in millis - maxTimestampDelta int32 + maxTimestampDelta int64 mu sync.Mutex // guards appendTo's reading of records against failAllRecords emptying it records []promisedRec // record w/ length, ts calculated @@ -1775,7 +1775,7 @@ func messageSet1Length(r *Record) int32 { // Returns the numbers for a record if it were added to the record batch. func (b *recBatch) calculateRecordNumbers(r *Record) recordNumbers { tsMillis := r.Timestamp.UnixNano() / 1e6 - tsDelta := int32(tsMillis - b.firstTimestamp) + tsDelta := tsMillis - b.firstTimestamp // If this is to be the first record in the batch, then our timestamp // delta is actually 0. @@ -1786,7 +1786,7 @@ func (b *recBatch) calculateRecordNumbers(r *Record) recordNumbers { offsetDelta := int32(len(b.records)) // since called before adding record, delta is the current end l := 1 + // attributes, int8 unused - kbin.VarintLen(tsDelta) + + kbin.VarlongLen(tsDelta) + kbin.VarintLen(offsetDelta) + kbin.VarintLen(int32(len(r.Key))) + len(r.Key) + @@ -1813,7 +1813,7 @@ func uvarlen(l int) int32 { return int32(kbin.UvarintLen(uvar32(int32(l)))) } // recordNumbers tracks a few numbers for a record that is buffered. type recordNumbers struct { lengthField int32 // the length field prefix of a record encoded on the wire - tsDelta int32 // the ms delta of when the record was added against the first timestamp + tsDelta int64 // the ms delta of when the record was added against the first timestamp } // wireLength is the wire length of a record including its length field prefix. @@ -2036,7 +2036,7 @@ func (b seqRecBatch) appendTo( dst = kbin.AppendInt16(dst, b.attrs) dst = kbin.AppendInt32(dst, int32(len(b.records)-1)) // lastOffsetDelta dst = kbin.AppendInt64(dst, b.firstTimestamp) - dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(b.maxTimestampDelta)) + dst = kbin.AppendInt64(dst, b.firstTimestamp+b.maxTimestampDelta) seq := b.seq if producerID < 0 { // a negative producer ID means we are not using idempotence @@ -2092,7 +2092,7 @@ func (pr promisedRec) appendTo(dst []byte, offsetDelta int32) []byte { length, tsDelta := pr.lengthAndTimestampDelta() dst = kbin.AppendVarint(dst, length) dst = kbin.AppendInt8(dst, 0) // attributes, currently unused - dst = kbin.AppendVarint(dst, tsDelta) + dst = kbin.AppendVarlong(dst, tsDelta) dst = kbin.AppendVarint(dst, offsetDelta) dst = kbin.AppendVarintBytes(dst, pr.Key) dst = kbin.AppendVarintBytes(dst, pr.Value) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index e94a7c15..c6418a25 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1470,7 +1470,7 @@ func recordToRecord( Offset: batch.FirstOffset + int64(record.OffsetDelta), } if r.Attrs.TimestampType() == 0 { - r.Timestamp = timeFromMillis(batch.FirstTimestamp + int64(record.TimestampDelta)) + r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64) } else { r.Timestamp = timeFromMillis(batch.MaxTimestamp) }