(fix): cleanup event processor (#167)
* clean up event processor

* better swap of batch size and queue size; used defer for the flush lock (debated on that one)

* add unit tests to cover the changes

* use an atomic property for state that is accessed from different threads

* fix lint errors

* fix lint errors

* replace the atomic property with a semaphore (a sketch of this gating pattern follows below)
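For context on the last item: the flush gate that replaced the atomic flag is a golang.org/x/sync/semaphore weighted semaphore used non-blockingly. TryAcquire starts at most one background flush goroutine, and Release frees the slot when the flush finishes. A minimal, self-contained sketch of that pattern (the flushGate and maybeFlush names are illustrative, not part of the SDK):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

// At most one flush goroutine at a time, mirroring maxFlushWorkers = 1 in the diff.
var flushGate = semaphore.NewWeighted(1)

// maybeFlush launches flush in the background only if no flush is already running.
// TryAcquire never blocks, so the caller's hot path is unaffected.
func maybeFlush(flush func()) {
	if flushGate.TryAcquire(1) {
		go func() {
			defer flushGate.Release(1) // always give the slot back
			flush()
		}()
	}
}

func main() {
	for i := 0; i < 3; i++ {
		maybeFlush(func() {
			fmt.Println("flushing batch")
			time.Sleep(50 * time.Millisecond)
		})
	}
	time.Sleep(200 * time.Millisecond) // let the single flush finish
}
```

Only one of the three calls acquires the gate, so at most one flush runs even when several callers cross the batch-size threshold at the same time.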
thomaszurkan-optimizely authored Nov 7, 2019
1 parent 5a32678 commit dbac8f6
Showing 2 changed files with 122 additions and 18 deletions.
38 changes: 32 additions & 6 deletions pkg/event/processor.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"

"golang.org/x/sync/semaphore"

"github.com/optimizely/go-sdk/pkg/logging"
"github.com/optimizely/go-sdk/pkg/notification"
"github.com/optimizely/go-sdk/pkg/registry"
@@ -41,9 +43,10 @@ type BatchEventProcessor struct {
FlushInterval time.Duration // in milliseconds
BatchSize int
Q Queue
Mux sync.Mutex
flushLock sync.Mutex
Ticker *time.Ticker
EventDispatcher Dispatcher
processing *semaphore.Weighted
}

// DefaultBatchSize holds the default value for the batch size
@@ -55,6 +58,8 @@ const DefaultEventQueueSize = 100
// DefaultEventFlushInterval holds the default value for the event flush interval
const DefaultEventFlushInterval = 30 * time.Second

const maxFlushWorkers = 1

var pLogger = logging.GetLogger("EventProcessor")

// BPOptionConfig is the BatchProcessor options that give you the ability to add one or more options before the processor is initialized.
@@ -105,7 +110,7 @@ func WithSDKKey(sdkKey string) BPOptionConfig

// NewBatchEventProcessor returns a new instance of BatchEventProcessor with queueSize and flushInterval
func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor {
p := &BatchEventProcessor{}
p := &BatchEventProcessor{processing: semaphore.NewWeighted(int64(maxFlushWorkers))}

for _, opt := range options {
opt(p)
@@ -123,6 +128,11 @@ func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor {
p.BatchSize = DefaultBatchSize
}

if p.BatchSize > p.MaxQueueSize {
pLogger.Warning("Batch size is larger than queue size. Swapping")
p.BatchSize, p.MaxQueueSize = p.MaxQueueSize, p.BatchSize
}

if p.Q == nil {
p.Q = NewInMemoryQueue(p.MaxQueueSize)
}
@@ -142,13 +152,28 @@ func (p *BatchEventProcessor) Start(exeCtx utils.ExecutionCtx) {

// ProcessEvent processes the given impression event
func (p *BatchEventProcessor) ProcessEvent(event UserEvent) {
p.Q.Add(event)

if p.Q.Size() >= p.MaxQueueSize {
pLogger.Warning("MaxQueueSize has been met. Discarding event")
return
}

p.Q.Add(event)

if p.Q.Size() < p.BatchSize {
return
}

if p.processing.TryAcquire(1) {
// it doesn't matter if the timer has kicked in here.
// we just want to start one go routine when the batch size is met.
pLogger.Debug("batch size reached. Flushing routine being called")
go func() {
p.FlushEvents()
p.processing.Release(1)
}()
}

}

// EventsCount returns size of an event queue
@@ -171,7 +196,7 @@ func (p *BatchEventProcessor) startTicker(exeCtx utils.ExecutionCtx) {
if p.Ticker != nil {
return
}
p.Ticker = time.NewTicker(p.FlushInterval * time.Millisecond)
p.Ticker = time.NewTicker(p.FlushInterval)
wg := exeCtx.GetWaitSync()
wg.Add(1)
go func() {
Expand Down Expand Up @@ -214,7 +239,9 @@ func (p *BatchEventProcessor) addToBatch(current *Batch, visitor Visitor) {
func (p *BatchEventProcessor) FlushEvents() {
// we flush when queue size is reached.
// however, if there is a ticker cycle already processing, we should wait
p.Mux.Lock()
p.flushLock.Lock()
defer p.flushLock.Unlock()

var batchEvent Batch
var batchEventCount = 0
var failedToSend = false
@@ -272,7 +299,6 @@ }
}
}
}
p.Mux.Unlock()
}

// OnEventDispatch registers a handler for LogEvent notifications
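One easy-to-miss fix above is the ticker construction. FlushInterval is already a time.Duration, so the old expression FlushInterval * time.Millisecond multiplied the underlying nanosecond count by one million. A small standalone illustration of the difference (the 30-second value is only an example, matching DefaultEventFlushInterval):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	flushInterval := 30 * time.Second

	// Before the fix: multiplying a Duration by time.Millisecond scales the
	// underlying nanosecond count by 1e6, turning 30s into roughly 347 days.
	wrong := flushInterval * time.Millisecond
	fmt.Println(wrong) // 8333h20m0s

	// After the fix: the Duration is handed to time.NewTicker as-is.
	fmt.Println(flushInterval) // 30s
}
```

This is also why the updated tests pass explicit durations such as WithFlushInterval(1 * time.Second) rather than bare integers.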
102 changes: 90 additions & 12 deletions pkg/event/processor_test.go
@@ -32,12 +32,12 @@ type MockDispatcher struct {
Events Queue
}

func (f *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
if f.ShouldFail {
func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
if m.ShouldFail {
return false, errors.New("Failed to dispatch")
}

f.Events.Add(event)
m.Events.Add(event)
return true, nil
}

@@ -116,10 +116,44 @@ func TestDefaultEventProcessor_LogEventNotification(t *testing.T) {
assert.Nil(t, err)
}

func TestDefaultEventProcessor_DefaultConfig(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
conversion := BuildTestConversionEvent()

processor.ProcessEvent(impression)
processor.ProcessEvent(impression)
processor.ProcessEvent(conversion)
processor.ProcessEvent(conversion)

assert.Equal(t, 4, processor.EventsCount())

time.Sleep(31 * time.Second)

assert.NotNil(t, processor.Ticker)

assert.Equal(t, 0, processor.EventsCount())

result, ok := (processor.EventDispatcher).(*MockDispatcher)

if ok {
assert.Equal(t, 1, result.Events.Size())
evs := result.Events.Get(1)
logEvent, _ := evs[0].(LogEvent)
assert.Equal(t, 4, len(logEvent.Event.Visitors))
}
}

func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithFlushInterval(100), WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(NewMockDispatcher(100, false)))
processor := NewBatchEventProcessor(
WithFlushInterval(1 * time.Second),
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
@@ -132,7 +166,7 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {

assert.Equal(t, 4, processor.EventsCount())

exeCtx.TerminateAndWait()
time.Sleep(1500 * time.Millisecond)

assert.NotNil(t, processor.Ticker)

@@ -148,10 +182,13 @@ }
}
}

func TestDefaultEventProcessor_QSizeMet(t *testing.T) {
func TestDefaultEventProcessor_BatchSizeMet(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(2), WithFlushInterval(1000),
WithQueue(NewInMemoryQueue(2)), WithEventDispatcher(NewMockDispatcher(100, false)))
processor := NewBatchEventProcessor(
WithBatchSize(2),
WithFlushInterval(1000 * time.Millisecond),
WithQueue(NewInMemoryQueue(2)),
WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
Expand Down Expand Up @@ -190,10 +227,49 @@ func TestDefaultEventProcessor_QSizeMet(t *testing.T) {
}
}

func TestDefaultEventProcessor_BatchSizeLessThanQSize(t *testing.T) {
processor := NewBatchEventProcessor(
WithQueueSize(2),
WithFlushInterval(1000 * time.Millisecond),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))

assert.Equal(t, 2, processor.BatchSize)
assert.Equal(t, 10, processor.MaxQueueSize)

}

func TestDefaultEventProcessor_QSizeExceeded(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(
WithQueueSize(2),
WithBatchSize(2),
WithFlushInterval(1000 * time.Millisecond),
WithQueue(NewInMemoryQueue(2)),
WithEventDispatcher(NewMockDispatcher(100, true)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()

processor.ProcessEvent(impression)
processor.ProcessEvent(impression)

assert.Equal(t, 2, processor.EventsCount())

processor.ProcessEvent(impression)
processor.ProcessEvent(impression)

assert.Equal(t, 2, processor.EventsCount())

}

func TestDefaultEventProcessor_FailedDispatch(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(100), WithFlushInterval(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(&MockDispatcher{ShouldFail: true, Events: NewInMemoryQueue(100)}))
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithFlushInterval(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(&MockDispatcher{ShouldFail: true, Events: NewInMemoryQueue(100)}))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
Expand Down Expand Up @@ -221,7 +297,9 @@ func TestDefaultEventProcessor_FailedDispatch(t *testing.T) {

func TestBatchEventProcessor_FlushesOnClose(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(100), WithQueue(NewInMemoryQueue(100)),
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

