Skip to content

Commit b9fc0d6

Browse files
committed
update: expose ProducerMessage.byteSize() function
Signed-off-by: K8sCat <[email protected]>
1 parent 7a92423 commit b9fc0d6

File tree

3 files changed

+5
-5
lines changed

3 files changed

+5
-5
lines changed

async_producer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ type ProducerMessage struct {
232232

233233
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
234234

235-
func (m *ProducerMessage) byteSize(version int) int {
235+
func (m *ProducerMessage) ByteSize(version int) int {
236236
var size int
237237
if version >= 2 {
238238
size = maximumRecordOverhead
@@ -366,7 +366,7 @@ func (p *asyncProducer) dispatcher() {
366366
p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
367367
continue
368368
}
369-
if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
369+
if msg.ByteSize(version) > p.conf.Producer.MaxMessageBytes {
370370
p.returnError(msg, ErrMessageSizeTooLarge)
371371
continue
372372
}

produce_set.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,11 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
235235

236236
switch {
237237
// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
238-
case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
238+
case ps.bufferBytes+msg.ByteSize(version) >= int(MaxRequestSize-(10*1024)):
239239
return true
240240
// Would we overflow the size-limit of a message-batch for this partition?
241241
case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
242-
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
242+
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.ByteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
243243
return true
244244
// Would we overflow simply in number of messages?
245245
case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:

produce_set_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) {
7373

7474
msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
7575

76-
for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes {
76+
for ps.bufferBytes+msg.ByteSize(2) < parent.conf.Producer.MaxMessageBytes {
7777
if ps.wouldOverflow(msg) {
7878
t.Error("set shouldn't fill up before 1000 bytes")
7979
}

0 commit comments

Comments
 (0)