Skip to content

Commit

Permalink
producing: set Attrs properly before calling produce callback
Browse files Browse the repository at this point in the history
The only missing field now is LeaderEpoch; setting that would involve
more guts changes than really are worth it at the moment. Currently, the
producer does not need to know the leader epoch when producing.
  • Loading branch information
twmb committed Jan 22, 2021
1 parent da0c4a9 commit 3866e0c
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,14 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
pnr.Partition = partition
pnr.ProducerID = producerID
pnr.ProducerEpoch = producerEpoch

// A recBuf.attrs is updated when appending to be written. For
// v0 && v1 produce requests, we set bit 8 in the attrs
// corresponding to our own RecordAttr's bit 8 being no
// timestamp type. Thus, we can directly convert the batch
// attrs to our own RecordAttrs.
pnr.Attrs = RecordAttrs{uint8(batch.attrs)}

cl.finishRecordPromise(pnr.promisedRec, err)
batch.records[i] = noPNR
}
Expand Down Expand Up @@ -1244,7 +1252,7 @@ type recBatch struct {
v1wireLength int32 // same as wireLength, but for message set v1
wireLength int32 // tracks total size this batch would currently encode as

attrs int16
attrs int16 // updated during apending; read and converted to RecordAttrs on success
firstTimestamp int64 // since unix epoch, in millis

// mu guards against records being concurrently modified in
Expand Down Expand Up @@ -1504,11 +1512,11 @@ func (r seqRecBatch) appendTo(
dst = kbin.AppendInt32(dst, 0) // reserved crc

attrsAt := len(dst) // in case compression adjusting
r.attrs = 0
if transactional {
r.attrs |= 0x0010 // bit 5 is the "is transactional" bit
}
attrs := r.attrs
dst = kbin.AppendInt16(dst, attrs)
dst = kbin.AppendInt16(dst, r.attrs)
dst = kbin.AppendInt32(dst, int32(len(r.records)-1)) // lastOffsetDelta
dst = kbin.AppendInt64(dst, r.firstTimestamp)

Expand Down Expand Up @@ -1543,10 +1551,10 @@ func (r seqRecBatch) appendTo(
savings := int32(len(toCompress) - len(compressed))
nullableBytesLen -= savings
batchLen -= savings
attrs |= int16(codec)
r.attrs |= int16(codec)
kbin.AppendInt32(dst[:nullableBytesLenAt], nullableBytesLen)
kbin.AppendInt32(dst[:batchLenAt], batchLen)
kbin.AppendInt16(dst[:attrsAt], attrs)
kbin.AppendInt16(dst[:attrsAt], r.attrs)
}
}

Expand Down Expand Up @@ -1586,6 +1594,18 @@ func (r seqRecBatch) appendToAsMessageSet(dst []byte, version uint8, compressor
)
}

r.attrs = 0

// Produce request v0 and v1 uses message set v0, which does not have
// timestamps. We set bit 8 in our attrs which corresponds with our own
// kgo.RecordAttrs's bit. The attrs field is unused in a sink / recBuf
// outside of the appending functions or finishing records; if we use
// more bits in our internal RecordAttrs, the below will need to
// change.
if version == 0 || version == 1 {
r.attrs |= 0b1000_0000
}

if compressor != nil {
toCompress := dst[nullableBytesLenAt+4:] // skip nullable bytes leading prefix
w := sliceWriters.Get().(*sliceWriter)
Expand All @@ -1601,6 +1621,8 @@ func (r seqRecBatch) appendToAsMessageSet(dst []byte, version uint8, compressor
if compressed != nil &&
int(wrappedLength) < len(toCompress) {

r.attrs |= int16(codec)

dst = appendMessageTo(
dst[:nullableBytesLenAt+4],
version,
Expand Down

0 comments on commit 3866e0c

Please sign in to comment.