From f86d40d963ef74c989caf09beadf1305a05a050c Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 24 Mar 2015 13:13:35 +0000 Subject: [PATCH 1/2] producer: make Flush.Frequency behaviour better Previously the timer would tick every Flush.Frequency regardless of if a message was actually queued. This meant that if no message arrived for Flush.Frequency then the next message would be sent immediately, which is unexpected if not *exactly* wrong. With this change the timer is only started when the first message arrives, and is cleared when a flush occurs. This should result in slightly better batching for low-volume topics at the result of slightly higher latency (although the delay will still never be more than Flush.Frequency). --- async_producer.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/async_producer.go b/async_producer.go index 374bce2eb..4c1aad29d 100644 --- a/async_producer.go +++ b/async_producer.go @@ -421,17 +421,13 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch // groups messages together into appropriately-sized batches for sending to the broker // based on https://godoc.org/github.com/eapache/channels#BatchingChannel func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) { - var ticker *time.Ticker - var timer <-chan time.Time - if p.conf.Producer.Flush.Frequency > 0 { - ticker = time.NewTicker(p.conf.Producer.Flush.Frequency) - timer = ticker.C - } - - var buffer []*ProducerMessage - var doFlush chan []*ProducerMessage - var bytesAccumulated int - var defaultFlush bool + var ( + timer <-chan time.Time + buffer []*ProducerMessage + doFlush chan []*ProducerMessage + bytesAccumulated int + defaultFlush bool + ) if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 { defaultFlush = true @@ -454,6 +450,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe flusher <- buffer buffer = nil doFlush = nil + timer = nil bytesAccumulated = 0 } @@ -465,20 +462,20 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe (p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) || (p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) { doFlush = flusher + } else if p.conf.Producer.Flush.Frequency > 0 && timer == nil { + timer = time.After(p.conf.Producer.Flush.Frequency) } case <-timer: doFlush = flusher case doFlush <- buffer: buffer = nil doFlush = nil + timer = nil bytesAccumulated = 0 } } shutdown: - if ticker != nil { - ticker.Stop() - } if len(buffer) > 0 { flusher <- buffer } From 509eef33e086e3bce1d1f6b9e149071c40f8e083 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 24 Mar 2015 13:38:24 +0000 Subject: [PATCH 2/2] Rename some things --- async_producer.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/async_producer.go b/async_producer.go index 4c1aad29d..0674e6a9f 100644 --- a/async_producer.go +++ b/async_producer.go @@ -424,7 +424,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe var ( timer <-chan time.Time buffer []*ProducerMessage - doFlush chan []*ProducerMessage + flushTriggered chan []*ProducerMessage bytesAccumulated int defaultFlush bool ) @@ -433,8 +433,8 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe defaultFlush = true } - flusher := make(chan []*ProducerMessage) - go withRecover(func() { p.flusher(broker, flusher) }) + output := make(chan []*ProducerMessage) + go withRecover(func() { p.flusher(broker, output) }) for { select { @@ -447,10 +447,10 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe (p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) || (p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) { Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush") - flusher <- buffer - buffer = nil - doFlush = nil + output <- buffer timer = nil + buffer = nil + flushTriggered = nil bytesAccumulated = 0 } @@ -461,25 +461,25 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe msg.flags&chaser == chaser || (p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) || (p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) { - doFlush = flusher + flushTriggered = output } else if p.conf.Producer.Flush.Frequency > 0 && timer == nil { timer = time.After(p.conf.Producer.Flush.Frequency) } case <-timer: - doFlush = flusher - case doFlush <- buffer: - buffer = nil - doFlush = nil + flushTriggered = output + case flushTriggered <- buffer: timer = nil + buffer = nil + flushTriggered = nil bytesAccumulated = 0 } } shutdown: if len(buffer) > 0 { - flusher <- buffer + output <- buffer } - close(flusher) + close(output) } // one per broker