From be59c783cf20b8f77d64805b04400c01e0cc47c3 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 13 Apr 2022 15:13:50 -0400 Subject: [PATCH 01/11] remove no-op struct fields --- libbeat/publisher/pipeline/pipeline.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index c7e7f0b47fb1..d28bf4c4e706 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -118,9 +118,6 @@ type OutputReloader interface { } type pipelineEventer struct { - mutex sync.Mutex - modifyable bool - observer queueObserver waitClose *waitCloser } @@ -161,7 +158,6 @@ func New( p.observer = newMetricsObserver(monitors.Metrics) } p.eventer.observer = p.observer - p.eventer.modifyable = true if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { p.waitCloser = &waitCloser{} @@ -254,10 +250,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return nil, err } - p.eventer.mutex.Lock() - p.eventer.modifyable = false - p.eventer.mutex.Unlock() - switch cfg.PublishMode { case beat.GuaranteedSend: eventFlags = publisher.GuaranteedSend From 7bd9ff5fe4a3e0ca1f261166fee8614e483b0cf3 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 13 Apr 2022 16:13:26 -0400 Subject: [PATCH 02/11] placate linter --- libbeat/publisher/pipeline/pipeline.go | 27 +++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index d28bf4c4e706..8ce828403dc3 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -211,7 +211,7 @@ func (p *Pipeline) Close() error { } - // TODO: close/disconnect still active clients + // Note: active clients are not closed / disconnected. // close output before shutting down queue p.output.Close() @@ -359,18 +359,19 @@ func (p *Pipeline) runSignalPropagation() { } // new client -> register client for signal propagation. - client := recv.Interface().(*client) - channels = append(channels, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(client.closeRef.Done()), - }, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(client.done), - }, - ) - clients = append(clients, client) + if client := recv.Interface().(*client); client != nil { + channels = append(channels, + reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(client.closeRef.Done()), + }, + reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(client.done), + }, + ) + clients = append(clients, client) + } continue } From 40baa04c3620fe6a1a926a9212dda2f9356d0fe1 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 13 Apr 2022 16:35:24 -0400 Subject: [PATCH 03/11] placate linter --- libbeat/publisher/pipeline/pipeline.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 8ce828403dc3..0671598b8df4 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -359,6 +359,7 @@ func (p *Pipeline) runSignalPropagation() { } // new client -> register client for signal propagation. + //nolint: errcheck // The linter doesn't understand that `client != nil` checks the return value of Interface(). if client := recv.Interface().(*client); client != nil { channels = append(channels, reflect.SelectCase{ From 0c9509817992e2ffd226d935993ef11cba06bf7c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 13 Apr 2022 17:51:39 -0400 Subject: [PATCH 04/11] clean up some unneeded helper structs --- libbeat/publisher/pipeline/client.go | 6 +- libbeat/publisher/pipeline/pipeline.go | 76 ++++++-------------------- 2 files changed, 19 insertions(+), 63 deletions(-) diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index 29d243257d1c..89711f2ff2ec 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -124,7 +124,7 @@ func (c *client) publish(e beat.Event) { } if c.reportEvents { - c.pipeline.waitCloser.inc() + c.pipeline.waitCloseGroup.Add(1) } var published bool @@ -139,7 +139,7 @@ func (c *client) publish(e beat.Event) { } else { c.onDroppedOnPublish(e) if c.reportEvents { - c.pipeline.waitCloser.dec(1) + c.pipeline.waitCloseGroup.Add(-1) } } } @@ -189,7 +189,7 @@ func (c *client) unlink() { if c.reportEvents { log.Debugf("client: remove client events") if n > 0 { - c.pipeline.waitCloser.dec(n) + c.pipeline.waitCloseGroup.Add(-n) } } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 0671598b8df4..af4b53146d9c 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -63,12 +63,10 @@ type Pipeline struct { observer observer - eventer pipelineEventer - // wait close support - waitCloseMode WaitCloseMode + waitOnClose bool waitCloseTimeout time.Duration - waitCloser *waitCloser + waitCloseGroup sync.WaitGroup // closeRef signal propagation support guardStartSigPropagation sync.Once @@ -117,16 +115,6 @@ type OutputReloader interface { ) error } -type pipelineEventer struct { - observer queueObserver - waitClose *waitCloser -} - -type waitCloser struct { - // keep track of total number of active events (minus dropped by processors) - events sync.WaitGroup -} - type queueFactory func(queue.ACKListener) (queue.Queue, error) // New create a new Pipeline instance from a queue instance and a set of outputs. @@ -149,7 +137,7 @@ func New( beatInfo: beat, monitors: monitors, observer: nilObserver, - waitCloseMode: settings.WaitCloseMode, + waitOnClose: settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0, waitCloseTimeout: settings.WaitClose, processors: settings.Processors, } @@ -157,16 +145,8 @@ func New( if monitors.Metrics != nil { p.observer = newMetricsObserver(monitors.Metrics) } - p.eventer.observer = p.observer - - if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { - p.waitCloser = &waitCloser{} - - // waitCloser decrements counter on queue ACK (not per client) - p.eventer.waitClose = p.waitCloser - } - p.queue, err = queueFactory(&p.eventer) + p.queue, err = queueFactory(p) if err != nil { return nil, err } @@ -185,6 +165,14 @@ func New( return p, nil } +func (p *Pipeline) OnACK(n int) { + p.observer.queueACKed(n) + + if p.waitOnClose { + p.waitCloseGroup.Add(-n) + } +} + // Close stops the pipeline, outputs and queue. // If WaitClose with WaitOnPipelineClose mode is configured, Close will block // for a duration of WaitClose, if there are still active events in the pipeline. @@ -194,10 +182,10 @@ func (p *Pipeline) Close() error { log.Debug("close pipeline") - if p.waitCloser != nil { + if p.waitOnClose { ch := make(chan struct{}) go func() { - p.waitCloser.wait() + p.waitCloseGroup.Wait() ch <- struct{}{} }() @@ -208,7 +196,6 @@ func (p *Pipeline) Close() error { case <-time.After(p.waitCloseTimeout): // timeout -> close pipeline with pending events } - } // Note: active clients are not closed / disconnected. @@ -258,16 +245,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } waitClose := cfg.WaitClose - reportEvents := p.waitCloser != nil - - switch p.waitCloseMode { - case NoWaitOnClose: - - case WaitOnClientClose: - if waitClose <= 0 { - waitClose = p.waitCloseTimeout - } - } + reportEvents := p.waitOnClose processors, err := p.createEventProcessing(cfg.Processing, publishDisabled) if err != nil { @@ -296,7 +274,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { cfg.Events.DroppedOnPublish(event) } if reportEvents { - p.waitCloser.dec(1) + p.waitCloseGroup.Add(-1) } } } @@ -420,28 +398,6 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo return p.processors.Create(cfg, noPublish) } -func (e *pipelineEventer) OnACK(n int) { - e.observer.queueACKed(n) - - if wc := e.waitClose; wc != nil { - wc.dec(n) - } -} - -func (e *waitCloser) inc() { - e.events.Add(1) -} - -func (e *waitCloser) dec(n int) { - for i := 0; i < n; i++ { - e.events.Done() - } -} - -func (e *waitCloser) wait() { - e.events.Wait() -} - // OutputReloader returns a reloadable object for the output section of this pipeline func (p *Pipeline) OutputReloader() OutputReloader { return p.output From cdba49937fce2bb190d7ecbd98599a259dbb6be2 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 13 Apr 2022 18:12:30 -0400 Subject: [PATCH 05/11] add comment --- libbeat/publisher/pipeline/pipeline.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index af4b53146d9c..068bca08c52e 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -100,10 +100,6 @@ const ( // to ACK any outstanding events. This is independent of Clients asking for // ACK and/or WaitClose. Clients can still optionally configure WaitClose themselves. WaitOnPipelineClose - - // WaitOnClientClose applies WaitClose timeout to each client connecting to - // the pipeline. Clients are still allowed to overwrite WaitClose with a timeout > 0s. - WaitOnClientClose ) // OutputReloader interface, that can be queried from an active publisher pipeline. @@ -165,6 +161,8 @@ func New( return p, nil } +// OnACK implements the queue.ACKListener interface, so the queue can notify the +// Pipeline when events are acknowledged. func (p *Pipeline) OnACK(n int) { p.observer.queueACKed(n) From 62fc1c11dc9864e007de40a689a6ca8075970150 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 14 Apr 2022 07:18:47 -0400 Subject: [PATCH 06/11] placate linter --- libbeat/publisher/pipeline/client.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index 89711f2ff2ec..84224e41f33e 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -30,10 +30,6 @@ import ( ) // client connects a beat with the processors and pipeline queue. -// -// TODO: All ackers currently drop any late incoming ACK. Some beats still might -// be interested in handling/waiting for event ACKs more globally -// -> add support for not dropping pending ACKs type client struct { pipeline *Pipeline processors beat.Processor @@ -101,8 +97,8 @@ func (c *client) publish(e beat.Event) { event, err = c.processors.Run(event) publish = event != nil if err != nil { - // TODO: introduce dead-letter queue? - + // If we introduce a dead-letter queue, this is where we should + // route the event to it. log.Errorf("Failed to publish event: %v", err) } } @@ -178,7 +174,7 @@ func (c *client) Close() error { return nil } -// unlink is the final step of closing a client. It cancells the connect of the +// unlink is the final step of closing a client. It cancels the connect of the // client as producer to the queue. func (c *client) unlink() { log := c.logger() @@ -271,7 +267,7 @@ func (w *clientCloseWaiter) ACKEvents(n int) { } // The Close signal from the pipeline is ignored. Instead the client -// explicitely uses `signalClose` and `wait` before it continues with the +// explicitly uses `signalClose` and `wait` before it continues with the // closing sequence. func (w *clientCloseWaiter) Close() {} From a1bc2f91452ab7295d5aaf648ed66557e58ce9e4 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 14 Apr 2022 07:57:14 -0400 Subject: [PATCH 07/11] remove unused data and code --- libbeat/publisher/queue/memqueue/ackloop.go | 19 +------------------ libbeat/publisher/queue/memqueue/batchbuf.go | 15 --------------- libbeat/publisher/queue/memqueue/broker.go | 17 +---------------- libbeat/publisher/queue/memqueue/consume.go | 9 ++++----- .../publisher/queue/memqueue/internal_api.go | 2 -- .../publisher/queue/memqueue/queue_test.go | 1 - libbeat/publisher/queue/memqueue/ringbuf.go | 14 +------------- 7 files changed, 7 insertions(+), 70 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 0c804824539a..8b92441eb4e0 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -27,11 +27,7 @@ type ackLoop struct { sig chan batchAckMsg lst chanList - totalACK uint64 - totalSched uint64 - - batchesSched uint64 - batchesACKed uint64 + totalACK uint64 processACK func(chanList, int) } @@ -59,26 +55,14 @@ func (l *ackLoop) run() { for { select { case <-l.broker.done: - // TODO: handle pending ACKs? - // TODO: panic on pending batches? return case acks <- acked: acks, acked = nil, 0 case lst := <-l.broker.scheduledACKs: - count, events := lst.count() l.lst.concat(&lst) - // log.Debug("ACK List:") - // for current := l.lst.head; current != nil; current = current.next { - // log.Debugf(" ack entry(seq=%v, start=%v, count=%v", - // current.seq, current.start, current.count) - // } - - l.batchesSched += uint64(count) - l.totalSched += uint64(events) - case <-l.sig: acked += l.handleBatchSig() if acked > 0 { @@ -156,6 +140,5 @@ func (l *ackLoop) collectAcked() chanList { } func (l *ackLoop) onACK(acks *ackChan) { - l.batchesACKed++ l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count) } diff --git a/libbeat/publisher/queue/memqueue/batchbuf.go b/libbeat/publisher/queue/memqueue/batchbuf.go index 30019cadeb63..d44bfbe9f38b 100644 --- a/libbeat/publisher/queue/memqueue/batchbuf.go +++ b/libbeat/publisher/queue/memqueue/batchbuf.go @@ -37,17 +37,6 @@ func (b *batchBuffer) init(sz int) { b.clients = make([]clientState, 0, sz) } -func (b *batchBuffer) initWith(sz int, old batchBuffer) { - events, clients := old.events, old.clients - L := len(events) - - b.events = make([]publisher.Event, L, sz) - b.clients = make([]clientState, L, sz) - - copy(b.events, events) - copy(b.clients, clients) -} - func (b *batchBuffer) add(event publisher.Event, st clientState) { b.events = append(b.events, event) b.clients = append(b.clients, st) @@ -57,10 +46,6 @@ func (b *batchBuffer) length() int { return len(b.events) } -func (b *batchBuffer) capacity() int { - return cap(b.events) -} - func (b *batchBuffer) cancel(st *produceState) int { events := b.events[:0] clients := b.clients[:0] diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 83c995da31b5..3d04e0dcf7e4 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -51,8 +51,7 @@ type broker struct { ackListener queue.ACKListener // wait group for worker shutdown - wg sync.WaitGroup - waitOnClose bool + wg sync.WaitGroup } type Settings struct { @@ -60,7 +59,6 @@ type Settings struct { Events int FlushMinEvents int FlushTimeout time.Duration - WaitOnClose bool InputQueueSize int } @@ -151,8 +149,6 @@ func NewQueue( acks: make(chan int), scheduledACKs: make(chan chanList), - waitOnClose: settings.WaitOnClose, - ackListener: settings.ACKListener, } @@ -185,9 +181,6 @@ func NewQueue( func (b *broker) Close() error { close(b.done) - if b.waitOnClose { - b.wg.Wait() - } return nil } @@ -259,14 +252,6 @@ func (l *chanList) append(ch *ackChan) { l.tail = ch } -func (l *chanList) count() (elems, count int) { - for ch := l.head; ch != nil; ch = ch.next { - elems++ - count += ch.count - } - return -} - func (l *chanList) empty() bool { return l.head == nil } diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go index a995fbdc0ca1..1e4c7b76729b 100644 --- a/libbeat/publisher/queue/memqueue/consume.go +++ b/libbeat/publisher/queue/memqueue/consume.go @@ -35,11 +35,10 @@ type consumer struct { } type batch struct { - consumer *consumer - events []publisher.Event - clientStates []clientState - ack *ackChan - state ackState + consumer *consumer + events []publisher.Event + ack *ackChan + state ackState } type ackState uint8 diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index d693645587f6..b3b4c234ffed 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -49,5 +49,3 @@ type getResponse struct { } type batchAckMsg struct{} - -type batchCancelRequest struct{ ack *ackChan } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index ffb887ec5bbe..7e8068ef4c0e 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -81,7 +81,6 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu Events: sz, FlushMinEvents: minEvents, FlushTimeout: flushTimeout, - WaitOnClose: true, }) } } diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go index 202411c9c432..d5d35bf57009 100644 --- a/libbeat/publisher/queue/memqueue/ringbuf.go +++ b/libbeat/publisher/queue/memqueue/ringbuf.go @@ -70,12 +70,6 @@ func (b *eventBuffer) Set(idx int, event publisher.Event, st clientState) { b.clients[idx] = st } -func newRingBuffer(log logger, size int) *ringBuffer { - b := &ringBuffer{} - b.init(log, size) - return b -} - func (b *ringBuffer) init(log logger, size int) { *b = ringBuffer{} b.buf.init(size) @@ -200,12 +194,6 @@ func (b *ringBuffer) cancelRegion(st *produceState, reg region) (removed int) { return len(events) } -// activeBufferOffsets returns start and end offset -// of all available events in region A. -func (b *ringBuffer) activeBufferOffsets() (int, int) { - return b.regA.index, b.regA.index + b.regA.size -} - // reserve returns up to `sz` events from the brokerBuffer, // exclusively marking the events as 'reserved'. Subsequent calls to `reserve` // will only return enqueued and non-reserved events from the buffer. @@ -253,7 +241,7 @@ func (b *ringBuffer) ack(sz int) { // }() if b.regA.size < sz { - panic(fmt.Errorf("Commit region to big (commit region=%v, buffer size=%v)", + panic(fmt.Errorf("commit region to big (commit region=%v, buffer size=%v)", sz, b.regA.size, )) } From e8334ed75378ff381188c7a33ba0977017ed319f Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 14 Apr 2022 08:32:01 -0400 Subject: [PATCH 08/11] remove more unused bits --- libbeat/publisher/queue/memqueue/ackloop.go | 8 ++----- libbeat/publisher/queue/memqueue/broker.go | 4 ++-- libbeat/publisher/queue/memqueue/buf.go | 18 --------------- libbeat/publisher/queue/memqueue/doc.go | 21 ----------------- libbeat/publisher/queue/memqueue/eventloop.go | 4 +++- libbeat/publisher/queue/memqueue/log.go | 23 ------------------- libbeat/publisher/queue/memqueue/ringbuf.go | 16 ------------- 7 files changed, 7 insertions(+), 87 deletions(-) delete mode 100644 libbeat/publisher/queue/memqueue/buf.go delete mode 100644 libbeat/publisher/queue/memqueue/doc.go delete mode 100644 libbeat/publisher/queue/memqueue/log.go diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 8b92441eb4e0..d6a78ae4ed7c 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -120,7 +120,7 @@ func (l *ackLoop) collectAcked() chanList { lst := chanList{} acks := l.lst.pop() - l.onACK(acks) + l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count) lst.append(acks) done := false @@ -128,7 +128,7 @@ func (l *ackLoop) collectAcked() chanList { acks := l.lst.front() select { case <-acks.ch: - l.onACK(acks) + l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count) lst.append(l.lst.pop()) default: @@ -138,7 +138,3 @@ func (l *ackLoop) collectAcked() chanList { return lst } - -func (l *ackLoop) onACK(acks *ackChan) { - l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count) -} diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 3d04e0dcf7e4..41d311f48198 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -35,7 +35,7 @@ const ( type broker struct { done chan struct{} - logger logger + logger *logp.Logger bufSize int @@ -110,7 +110,7 @@ func create( // If waitOnClose is set to true, the broker will block on Close, until all internal // workers handling incoming messages and ACKs have been shut down. func NewQueue( - logger logger, + logger *logp.Logger, settings Settings, ) queue.Queue { var ( diff --git a/libbeat/publisher/queue/memqueue/buf.go b/libbeat/publisher/queue/memqueue/buf.go deleted file mode 100644 index bca06d9bd0e1..000000000000 --- a/libbeat/publisher/queue/memqueue/buf.go +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package memqueue diff --git a/libbeat/publisher/queue/memqueue/doc.go b/libbeat/publisher/queue/memqueue/doc.go deleted file mode 100644 index 333fe04b9dd2..000000000000 --- a/libbeat/publisher/queue/memqueue/doc.go +++ /dev/null @@ -1,21 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package memqueue provides an in-memory queue.Queue implementation for -// use with the publisher pipeline. -// The queue implementation is registered as queue type "mem". -package memqueue diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index 83ab4fb7b5b1..5c3ad28d1992 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -21,6 +21,8 @@ import ( "fmt" "math" "time" + + "github.com/elastic/beats/v7/libbeat/logp" ) // directEventLoop implements the broker main event loop. It buffers events, @@ -578,7 +580,7 @@ func (l *flushList) add(b *batchBuffer) { } } -func reportCancelledState(log logger, req *pushRequest) { +func reportCancelledState(log *logp.Logger, req *pushRequest) { log.Debugf("cancelled producer - ignore event: %v\t%v\t%p", req.event, req.seq, req.state) // do not add waiting events if producer did send cancel signal diff --git a/libbeat/publisher/queue/memqueue/log.go b/libbeat/publisher/queue/memqueue/log.go deleted file mode 100644 index b85c2a0ff455..000000000000 --- a/libbeat/publisher/queue/memqueue/log.go +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package memqueue - -type logger interface { - Debug(...interface{}) - Debugf(string, ...interface{}) -} diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go index d5d35bf57009..cbc26306f564 100644 --- a/libbeat/publisher/queue/memqueue/ringbuf.go +++ b/libbeat/publisher/queue/memqueue/ringbuf.go @@ -263,26 +263,10 @@ func (b *ringBuffer) ack(sz int) { } } -func (b *ringBuffer) Empty() bool { - return (b.regA.size - b.reserved) == 0 -} - func (b *ringBuffer) Avail() int { return b.regA.size - b.reserved } -func (b *ringBuffer) RegionBActive() bool { - return b.regB.size > 0 -} - -func (b *ringBuffer) RegionSizes() (int, int) { - return b.regA.size, b.regB.size -} - -func (b *ringBuffer) TotalAvail() int { - return b.regA.size + b.regB.size - b.reserved -} - func (b *ringBuffer) Full() bool { var avail int if b.regB.size > 0 { From 0c5691629fe4bd185a18ab2a5a306b7d93fd6ffe Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 14 Apr 2022 09:06:08 -0400 Subject: [PATCH 09/11] remove more occurrences of internal logger --- libbeat/publisher/queue/memqueue/produce.go | 3 ++- libbeat/publisher/queue/memqueue/ringbuf.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index f6898d100127..625ab473d949 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -19,6 +19,7 @@ package memqueue import ( "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -37,7 +38,7 @@ type ackProducer struct { } type openState struct { - log logger + log *logp.Logger done chan struct{} events chan pushRequest } diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go index cbc26306f564..55129d6d36d0 100644 --- a/libbeat/publisher/queue/memqueue/ringbuf.go +++ b/libbeat/publisher/queue/memqueue/ringbuf.go @@ -20,6 +20,7 @@ package memqueue import ( "fmt" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/publisher" ) @@ -43,7 +44,7 @@ type region struct { } type eventBuffer struct { - logger logger + logger *logp.Logger events []publisher.Event clients []clientState @@ -70,7 +71,7 @@ func (b *eventBuffer) Set(idx int, event publisher.Event, st clientState) { b.clients[idx] = st } -func (b *ringBuffer) init(log logger, size int) { +func (b *ringBuffer) init(log *logp.Logger, size int) { *b = ringBuffer{} b.buf.init(size) b.buf.logger = log From 159693417cba7fea5686854b8e8a247c534bcf77 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 14 Apr 2022 10:03:10 -0400 Subject: [PATCH 10/11] make linter happy --- libbeat/publisher/queue/memqueue/eventloop.go | 7 ++----- libbeat/publisher/queue/memqueue/queue_test.go | 3 +++ libbeat/publisher/queue/memqueue/ringbuf.go | 14 ++++++-------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index 5c3ad28d1992..4fb10a7b83a9 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -147,7 +147,7 @@ func (l *directEventLoop) insert(req *pushRequest) (int, bool) { log := l.broker.logger if req.state == nil { - _, avail = l.buf.insert(req.event, clientState{}) + avail = l.buf.insert(req.event, clientState{}) return avail, true } @@ -157,7 +157,7 @@ func (l *directEventLoop) insert(req *pushRequest) (int, bool) { return -1, false } - _, avail = l.buf.insert(req.event, clientState{ + avail = l.buf.insert(req.event, clientState{ seq: req.seq, state: st, }) @@ -235,9 +235,6 @@ func (l *directEventLoop) processACK(lst chanList, N int) { start := acks.start states := acks.states - // TODO: global boolean to check if clients will need an ACK - // no need to report ACKs if no client is interested in ACKs - idx := start + N - 1 if idx >= len(states) { idx -= len(states) diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 7e8068ef4c0e..45cd055d91e1 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -41,8 +41,11 @@ func TestProduceConsumer(t *testing.T) { minEvents := 32 rand.Seed(seed) + //nolint: gosec // These calls don't need to be cryptographically secure. events := rand.Intn(maxEvents-minEvents) + minEvents + //nolint: gosec // These calls don't need to be cryptographically secure. batchSize := rand.Intn(events-8) + 4 + //nolint: gosec // These calls don't need to be cryptographically secure. bufferSize := rand.Intn(batchSize*2) + 4 // events := 4 diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go index 55129d6d36d0..80ece2c00c41 100644 --- a/libbeat/publisher/queue/memqueue/ringbuf.go +++ b/libbeat/publisher/queue/memqueue/ringbuf.go @@ -77,7 +77,7 @@ func (b *ringBuffer) init(log *logp.Logger, size int) { b.buf.logger = log } -func (b *ringBuffer) insert(event publisher.Event, client clientState) (bool, int) { +func (b *ringBuffer) insert(event publisher.Event, client clientState) int { // log := b.buf.logger // log.Debug("insert:") // log.Debug(" region A:", b.regA) @@ -97,13 +97,13 @@ func (b *ringBuffer) insert(event publisher.Event, client clientState) (bool, in idx := b.regB.index + b.regB.size avail := b.regA.index - idx if avail == 0 { - return false, 0 + return 0 } b.buf.Set(idx, event, client) b.regB.size++ - return true, avail - 1 + return avail - 1 } // region B does not exist yet, check if region A is available for use @@ -119,7 +119,7 @@ func (b *ringBuffer) insert(event publisher.Event, client clientState) (bool, in // log.Debug(" - no space in region B") - return false, 0 + return 0 } // create region B and insert events @@ -127,14 +127,14 @@ func (b *ringBuffer) insert(event publisher.Event, client clientState) (bool, in b.regB.index = 0 b.regB.size = 1 b.buf.Set(0, event, client) - return true, b.regA.index - 1 + return b.regA.index - 1 } // space available in region A -> let's append the event // log.Debug(" - push into region A") b.buf.Set(idx, event, client) b.regA.size++ - return true, avail - 1 + return avail - 1 } // cancel removes all buffered events matching `st`, not yet reserved by @@ -151,8 +151,6 @@ func (b *ringBuffer) cancel(st *produceState) int { // log.Debug(" -> reserved:", b.reserved) // }() - // TODO: return if st has no pending events - cancelB := b.cancelRegion(st, b.regB) b.regB.size -= cancelB From f49dc9543a59d477fef3952b1e1da9c527ee84fd Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 14 Apr 2022 10:14:56 -0400 Subject: [PATCH 11/11] lint lint lint --- libbeat/publisher/queue/memqueue/broker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 41d311f48198..1315091c8406 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -207,6 +207,7 @@ var ackChanPool = sync.Pool{ } func newACKChan(seq uint, start, count int, states []clientState) *ackChan { + //nolint: errcheck // Return value doesn't need to be checked before conversion. ch := ackChanPool.Get().(*ackChan) ch.next = nil ch.seq = seq