Skip to content

Use double buffers in EventLoop #270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/events/events_suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
. "github.com/onsi/gomega"
)

func TestState(t *testing.T) {
func TestEvents(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Events Suite")
}
45 changes: 45 additions & 0 deletions internal/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package events

import (
"testing"

"github.com/google/go-cmp/cmp"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func TestEventLoop_SwapBatches(t *testing.T) {
eventLoop := NewEventLoop(nil, zap.New(), nil, nil)

eventLoop.currentBatch = EventBatch{
"event0",
"event1",
"event2",
}

nextBatch := EventBatch{
"event3",
"event4",
"event5",
"event6",
}

eventLoop.nextBatch = nextBatch

eventLoop.swapBatches()

if l := len(eventLoop.currentBatch); l != 4 {
t.Errorf("EventLoop.swapBatches() mismatch. Expected 4 events in the current batch, got %d", l)
}

if diff := cmp.Diff(eventLoop.currentBatch, nextBatch); diff != "" {
t.Errorf("EventLoop.swapBatches() mismatch on current batch events (-want +got):\n%s", diff)
}

if l := len(eventLoop.nextBatch); l != 0 {
t.Errorf("EventLoop.swapBatches() mismatch. Expected 0 events in the next batch, got %d", l)
}

if c := cap(eventLoop.nextBatch); c != 3 {
t.Errorf("EventLoop.swapBatches() mismatch. Expected capacity of 3 in the next batch, got %d", c)
}
}
60 changes: 36 additions & 24 deletions internal/events/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ type EventLoop struct {
preparer FirstEventBatchPreparer
eventCh <-chan interface{}
logger logr.Logger

// The EventLoop uses double buffering to handle event batch processing.
// The goroutine that handles the batch will always read from the currentBatch slice.
// While the current batch is being handled, new events are added to the nextBatch slice.
// The batches are swapped before starting the handler goroutine.
currentBatch EventBatch
nextBatch EventBatch
}

// NewEventLoop creates a new EventLoop.
Expand All @@ -36,37 +43,38 @@ func NewEventLoop(
preparer FirstEventBatchPreparer,
) *EventLoop {
return &EventLoop{
eventCh: eventCh,
logger: logger,
handler: handler,
preparer: preparer,
eventCh: eventCh,
logger: logger,
handler: handler,
preparer: preparer,
currentBatch: make(EventBatch, 0),
nextBatch: make(EventBatch, 0),
}
}

// Start starts the EventLoop.
// This method will block until the EventLoop stops, which will happen after the ctx is closed.
func (el *EventLoop) Start(ctx context.Context) error {
// The current batch.
var batch EventBatch
// handling tells if any batch is currently being handled.
var handling bool
// handlingDone is used to signal the completion of handling a batch.
handlingDone := make(chan struct{})

handleAndResetBatch := func() {
handleBatch := func() {
go func(batch EventBatch) {
el.logger.Info("Handling events from the batch", "total", len(batch))

el.handler.HandleEventBatch(ctx, batch)

el.logger.Info("Finished handling the batch")
handlingDone <- struct{}{}
}(batch)
}(el.currentBatch)
}

// FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations.
// Use a double-buffer approach - create two buffers and exchange them between the producer and consumer
// routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity.
batch = make([]interface{}, 0)
swapAndHandleBatch := func() {
el.swapBatches()
handleBatch()
handling = true
}

// Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources.
Expand All @@ -81,13 +89,13 @@ func (el *EventLoop) Start(ctx context.Context) error {
// not trigger any reconfiguration after receiving an upsert for an existing resource with the same Generation.

var err error
batch, err = el.preparer.Prepare(ctx)
el.currentBatch, err = el.preparer.Prepare(ctx)
if err != nil {
return fmt.Errorf("failed to prepare the first batch: %w", err)
}

// Handle the first batch
handleAndResetBatch()
handleBatch()
handling = true

// Note: at any point of time, no more than one batch is currently being handled.
Expand All @@ -103,28 +111,32 @@ func (el *EventLoop) Start(ctx context.Context) error {
return nil
case e := <-el.eventCh:
// Add the event to the current batch.
batch = append(batch, e)
el.nextBatch = append(el.nextBatch, e)

// FIXME(pleshakov): Log more details about the event like resource GVK and ns/name.
el.logger.Info(
"added an event to the current batch",
"added an event to the next batch",
"type", fmt.Sprintf("%T", e),
"total", len(batch),
"total", len(el.nextBatch),
)

// Handle the current batch if no batch is being handled.
// If no batch is currently being handled, swap batches and begin handling the batch.
if !handling {
handleAndResetBatch()
handling = true
swapAndHandleBatch()
}
case <-handlingDone:
handling = false

// Handle the current batch if it has at least one event.
if len(batch) > 0 {
handleAndResetBatch()
handling = true
// If there's at least one event in the next batch, swap batches and begin handling the batch.
if len(el.nextBatch) > 0 {
swapAndHandleBatch()
}
}
}
}

// swapBatches swaps the current and next batches.
func (el *EventLoop) swapBatches() {
el.currentBatch, el.nextBatch = el.nextBatch, el.currentBatch
el.nextBatch = el.nextBatch[:0]
}