Skip to content

Commit

Permalink
producer: allow the user to set Timestamp
Browse files Browse the repository at this point in the history
I'm not sure why I introduced the restriction that timestamps must be
sequential. It appears the MaxTimestamp field just needs to be the max
timestamp of the batch, and FirstTimestamp (BaseTimestamp) just needs to
be the timestamp of the first record. If timestamps go backwards,
nothing in Kafka seems to care / check this.

Tested by producing two records with a linger where the second record
has a 1s earlier timestamp than the first record.
  • Loading branch information
twmb committed May 29, 2022
1 parent 6378ac1 commit 653010d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
9 changes: 6 additions & 3 deletions pkg/kgo/produce_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func TestRecBatchAppendTo(t *testing.T) {
ourBatch := seqRecBatch{
seq: 10,
recBatch: &recBatch{
firstTimestamp: 20,
firstTimestamp: 20,
maxTimestampDelta: 4,
records: []promisedRec{
{
Record: &Record{
Expand Down Expand Up @@ -268,7 +269,8 @@ func TestMessageSetAppendTo(t *testing.T) {
// input
ourBatch := seqRecBatch{
recBatch: &recBatch{
firstTimestamp: 12,
firstTimestamp: 12,
maxTimestampDelta: 1,
records: []promisedRec{
{
Record: &Record{
Expand Down Expand Up @@ -363,7 +365,8 @@ func BenchmarkAppendBatch(b *testing.B) {
ourBatch := seqRecBatch{
seq: 10,
recBatch: &recBatch{
firstTimestamp: 20,
firstTimestamp: 20,
maxTimestampDelta: 4,
records: []promisedRec{
{
Record: &Record{
Expand Down
3 changes: 2 additions & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ func (cl *Client) TryProduce(
// produced in order per partition if the record is produced successfully.
// Successfully produced records will have their attributes, offset, and
// partition set before the promise is called. All promises are called serially
// (and should be relatively fast).
// (and should be relatively fast). If a record's timestamp is unset, this
// sets the timestamp to time.Now.
//
// If the topic field is empty, the client will use the DefaultProduceTopic; if
// that is also empty, the record is failed immediately. If the record is too
Expand Down
6 changes: 3 additions & 3 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Record struct {
// Record batches are always written with "CreateTime", meaning that
// timestamps are generated by clients rather than brokers.
//
// This field is always set in Produce.
// When producing, if this field is not yet set, it is set to time.Now.
Timestamp time.Time

// Topic is the topic that a record is written to.
Expand Down Expand Up @@ -143,11 +143,11 @@ type Record struct {
// (also because number width affects encoding length). We repurpose the Offset
// field to save space.
func (r *Record) setLengthAndTimestampDelta(length, tsDelta int32) {
r.Offset = int64(uint64(length)<<32 | uint64(tsDelta))
r.Offset = int64(uint64(uint32(length))<<32 | uint64(uint32(tsDelta)))
}

func (r *Record) lengthAndTimestampDelta() (length, tsDelta int32) {
return int32(uint64(r.Offset) >> 32), int32(uint64(r.Offset))
return int32(uint32(uint64(r.Offset) >> 32)), int32(uint32(uint64(r.Offset)))
}

// AppendFormat appends a record to b given the layout or returns an error if
Expand Down
25 changes: 14 additions & 11 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,10 +1107,12 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool {
recBuf.mu.Lock()
defer recBuf.mu.Unlock()

// Timestamp after locking to ensure sequential, and truncate to
// milliseconds to avoid some accumulated rounding error problems
// (see Shopify/sarama#1455)
pr.Timestamp = time.Now().Truncate(time.Millisecond)
// We truncate to milliseconds to avoid some accumulated rounding error
// problems (see Shopify/sarama#1455)
if pr.Timestamp.IsZero() {
pr.Timestamp = time.Now()
}
pr.Timestamp = pr.Timestamp.Truncate(time.Millisecond)

if recBuf.purged {
recBuf.cl.producer.promiseRecord(pr, errPurged)
Expand Down Expand Up @@ -1326,8 +1328,9 @@ type recBatch struct {
wireLength int32 // tracks total size this batch would currently encode as, including length prefix
v1wireLength int32 // same as wireLength, but for message set v1

attrs int16 // updated during appending; read and converted to RecordAttrs on success
firstTimestamp int64 // since unix epoch, in millis
attrs int16 // updated during appending; read and converted to RecordAttrs on success
firstTimestamp int64 // since unix epoch, in millis
maxTimestampDelta int32

mu sync.Mutex // guards appendTo's reading of records against failAllRecords emptying it
records []promisedRec // record w/ length, ts calculated
Expand Down Expand Up @@ -1371,6 +1374,8 @@ func (b *recBatch) appendRecord(pr promisedRec, nums recordNumbers) {
b.v1wireLength += messageSet1Length(pr.Record)
if len(b.records) == 0 {
b.firstTimestamp = pr.Timestamp.UnixNano() / 1e6
} else if nums.tsDelta > b.maxTimestampDelta {
b.maxTimestampDelta = nums.tsDelta
}
b.records = append(b.records, pr)
}
Expand Down Expand Up @@ -1903,6 +1908,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
if flexible {
dst = append(dst, 0)
}

return dst
}

Expand All @@ -1924,6 +1930,7 @@ func (b seqRecBatch) appendTo(
transactional bool,
compressor *compressor,
) (dst []byte, m ProduceBatchMetrics) { // named return so that our defer for flexible versions can modify it

flexible := version >= 9
dst = in
nullableBytesLen := b.wireLength - 4 // NULLABLE_BYTES leading length, minus itself
Expand Down Expand Up @@ -1985,11 +1992,7 @@ func (b seqRecBatch) appendTo(
dst = kbin.AppendInt16(dst, b.attrs)
dst = kbin.AppendInt32(dst, int32(len(b.records)-1)) // lastOffsetDelta
dst = kbin.AppendInt64(dst, b.firstTimestamp)

// maxTimestamp is the timestamp of the last record in a batch.
lastRecord := b.records[len(b.records)-1]
_, tsDelta := lastRecord.lengthAndTimestampDelta()
dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(tsDelta))
dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(b.maxTimestampDelta))

seq := b.seq
if producerID < 0 { // a negative producer ID means we are not using idempotence
Expand Down

0 comments on commit 653010d

Please sign in to comment.