Skip to content
27 changes: 3 additions & 24 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ type ackLoop struct {
sig chan batchAckMsg
lst chanList

totalACK uint64
totalSched uint64

batchesSched uint64
batchesACKed uint64
totalACK uint64

processACK func(chanList, int)
}
Expand Down Expand Up @@ -59,26 +55,14 @@ func (l *ackLoop) run() {
for {
select {
case <-l.broker.done:
// TODO: handle pending ACKs?
// TODO: panic on pending batches?
return

case acks <- acked:
acks, acked = nil, 0

case lst := <-l.broker.scheduledACKs:
count, events := lst.count()
l.lst.concat(&lst)

// log.Debug("ACK List:")
// for current := l.lst.head; current != nil; current = current.next {
// log.Debugf(" ack entry(seq=%v, start=%v, count=%v",
// current.seq, current.start, current.count)
// }

l.batchesSched += uint64(count)
l.totalSched += uint64(events)

case <-l.sig:
acked += l.handleBatchSig()
if acked > 0 {
Expand Down Expand Up @@ -136,15 +120,15 @@ func (l *ackLoop) collectAcked() chanList {
lst := chanList{}

acks := l.lst.pop()
l.onACK(acks)
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
lst.append(acks)

done := false
for !l.lst.empty() && !done {
acks := l.lst.front()
select {
case <-acks.ch:
l.onACK(acks)
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
lst.append(l.lst.pop())

default:
Expand All @@ -154,8 +138,3 @@ func (l *ackLoop) collectAcked() chanList {

return lst
}

func (l *ackLoop) onACK(acks *ackChan) {
l.batchesACKed++
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
}
15 changes: 0 additions & 15 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ func (b *batchBuffer) init(sz int) {
b.clients = make([]clientState, 0, sz)
}

func (b *batchBuffer) initWith(sz int, old batchBuffer) {
events, clients := old.events, old.clients
L := len(events)

b.events = make([]publisher.Event, L, sz)
b.clients = make([]clientState, L, sz)

copy(b.events, events)
copy(b.clients, clients)
}

func (b *batchBuffer) add(event publisher.Event, st clientState) {
b.events = append(b.events, event)
b.clients = append(b.clients, st)
Expand All @@ -57,10 +46,6 @@ func (b *batchBuffer) length() int {
return len(b.events)
}

func (b *batchBuffer) capacity() int {
return cap(b.events)
}

func (b *batchBuffer) cancel(st *produceState) int {
events := b.events[:0]
clients := b.clients[:0]
Expand Down
22 changes: 4 additions & 18 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
type broker struct {
done chan struct{}

logger logger
logger *logp.Logger

bufSize int

Expand All @@ -51,16 +51,14 @@ type broker struct {
ackListener queue.ACKListener

// wait group for worker shutdown
wg sync.WaitGroup
waitOnClose bool
wg sync.WaitGroup
}

type Settings struct {
ACKListener queue.ACKListener
Events int
FlushMinEvents int
FlushTimeout time.Duration
WaitOnClose bool
InputQueueSize int
}

Expand Down Expand Up @@ -112,7 +110,7 @@ func create(
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewQueue(
logger logger,
logger *logp.Logger,
settings Settings,
) queue.Queue {
var (
Expand Down Expand Up @@ -151,8 +149,6 @@ func NewQueue(
acks: make(chan int),
scheduledACKs: make(chan chanList),

waitOnClose: settings.WaitOnClose,

ackListener: settings.ACKListener,
}

Expand Down Expand Up @@ -185,9 +181,6 @@ func NewQueue(

func (b *broker) Close() error {
close(b.done)
if b.waitOnClose {
b.wg.Wait()
}
return nil
}

Expand All @@ -214,6 +207,7 @@ var ackChanPool = sync.Pool{
}

func newACKChan(seq uint, start, count int, states []clientState) *ackChan {
//nolint: errcheck // Return value doesn't need to be checked before conversion.
ch := ackChanPool.Get().(*ackChan)
ch.next = nil
ch.seq = seq
Expand Down Expand Up @@ -259,14 +253,6 @@ func (l *chanList) append(ch *ackChan) {
l.tail = ch
}

func (l *chanList) count() (elems, count int) {
for ch := l.head; ch != nil; ch = ch.next {
elems++
count += ch.count
}
return
}

func (l *chanList) empty() bool {
return l.head == nil
}
Expand Down
18 changes: 0 additions & 18 deletions libbeat/publisher/queue/memqueue/buf.go

This file was deleted.

9 changes: 4 additions & 5 deletions libbeat/publisher/queue/memqueue/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ type consumer struct {
}

type batch struct {
consumer *consumer
events []publisher.Event
clientStates []clientState
ack *ackChan
state ackState
consumer *consumer
events []publisher.Event
ack *ackChan
state ackState
}

type ackState uint8
Expand Down
21 changes: 0 additions & 21 deletions libbeat/publisher/queue/memqueue/doc.go

This file was deleted.

11 changes: 5 additions & 6 deletions libbeat/publisher/queue/memqueue/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"math"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
)

// directEventLoop implements the broker main event loop. It buffers events,
Expand Down Expand Up @@ -145,7 +147,7 @@ func (l *directEventLoop) insert(req *pushRequest) (int, bool) {
log := l.broker.logger

if req.state == nil {
_, avail = l.buf.insert(req.event, clientState{})
avail = l.buf.insert(req.event, clientState{})
return avail, true
}

Expand All @@ -155,7 +157,7 @@ func (l *directEventLoop) insert(req *pushRequest) (int, bool) {
return -1, false
}

_, avail = l.buf.insert(req.event, clientState{
avail = l.buf.insert(req.event, clientState{
seq: req.seq,
state: st,
})
Expand Down Expand Up @@ -233,9 +235,6 @@ func (l *directEventLoop) processACK(lst chanList, N int) {
start := acks.start
states := acks.states

// TODO: global boolean to check if clients will need an ACK
// no need to report ACKs if no client is interested in ACKs

idx := start + N - 1
if idx >= len(states) {
idx -= len(states)
Expand Down Expand Up @@ -578,7 +577,7 @@ func (l *flushList) add(b *batchBuffer) {
}
}

func reportCancelledState(log logger, req *pushRequest) {
func reportCancelledState(log *logp.Logger, req *pushRequest) {
log.Debugf("cancelled producer - ignore event: %v\t%v\t%p", req.event, req.seq, req.state)

// do not add waiting events if producer did send cancel signal
Expand Down
2 changes: 0 additions & 2 deletions libbeat/publisher/queue/memqueue/internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,3 @@ type getResponse struct {
}

type batchAckMsg struct{}

type batchCancelRequest struct{ ack *ackChan }
23 changes: 0 additions & 23 deletions libbeat/publisher/queue/memqueue/log.go

This file was deleted.

3 changes: 2 additions & 1 deletion libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package memqueue

import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -37,7 +38,7 @@ type ackProducer struct {
}

type openState struct {
log logger
log *logp.Logger
done chan struct{}
events chan pushRequest
}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ func TestProduceConsumer(t *testing.T) {
minEvents := 32

rand.Seed(seed)
//nolint: gosec // These calls don't need to be cryptographically secure.
events := rand.Intn(maxEvents-minEvents) + minEvents
//nolint: gosec // These calls don't need to be cryptographically secure.
batchSize := rand.Intn(events-8) + 4
//nolint: gosec // These calls don't need to be cryptographically secure.
bufferSize := rand.Intn(batchSize*2) + 4

// events := 4
Expand Down Expand Up @@ -81,7 +84,6 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu
Events: sz,
FlushMinEvents: minEvents,
FlushTimeout: flushTimeout,
WaitOnClose: true,
})
}
}
Expand Down
Loading