diff --git a/pkg/event/processor.go b/pkg/event/processor.go index 11c9bf316..5d2aed312 100644 --- a/pkg/event/processor.go +++ b/pkg/event/processor.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" + "github.com/optimizely/go-sdk/pkg/logging" "github.com/optimizely/go-sdk/pkg/notification" "github.com/optimizely/go-sdk/pkg/registry" @@ -41,9 +43,10 @@ type BatchEventProcessor struct { FlushInterval time.Duration // in milliseconds BatchSize int Q Queue - Mux sync.Mutex + flushLock sync.Mutex Ticker *time.Ticker EventDispatcher Dispatcher + processing *semaphore.Weighted } // DefaultBatchSize holds the default value for the batch size @@ -55,6 +58,8 @@ const DefaultEventQueueSize = 100 // DefaultEventFlushInterval holds the default value for the event flush interval const DefaultEventFlushInterval = 30 * time.Second +const maxFlushWorkers = 1 + var pLogger = logging.GetLogger("EventProcessor") // BPOptionConfig is the BatchProcessor options that give you the ability to add one more more options before the processor is initialized. @@ -105,7 +110,7 @@ func WithSDKKey(sdkKey string) BPOptionConfig { // NewBatchEventProcessor returns a new instance of BatchEventProcessor with queueSize and flushInterval func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor { - p := &BatchEventProcessor{} + p := &BatchEventProcessor{processing:semaphore.NewWeighted(int64(maxFlushWorkers))} for _, opt := range options { opt(p) @@ -123,6 +128,11 @@ func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor { p.BatchSize = DefaultBatchSize } + if p.BatchSize > p.MaxQueueSize { + pLogger.Warning("Batch size is larger than queue size. Swapping") + p.BatchSize, p.MaxQueueSize = p.MaxQueueSize, p.BatchSize + } + if p.Q == nil { p.Q = NewInMemoryQueue(p.MaxQueueSize) } @@ -142,13 +152,28 @@ func (p *BatchEventProcessor) Start(exeCtx utils.ExecutionCtx) { // ProcessEvent processes the given impression event func (p *BatchEventProcessor) ProcessEvent(event UserEvent) { - p.Q.Add(event) if p.Q.Size() >= p.MaxQueueSize { + pLogger.Warning("MaxQueueSize has been met. Discarding event") + return + } + + p.Q.Add(event) + + if p.Q.Size() < p.BatchSize { + return + } + + if p.processing.TryAcquire(1) { + // it doesn't matter if the timer has kicked in here. + // we just want to start one go routine when the batch size is met. + pLogger.Debug("batch size reached. Flushing routine being called") go func() { p.FlushEvents() + p.processing.Release(1) }() } + } // EventsCount returns size of an event queue @@ -171,7 +196,7 @@ func (p *BatchEventProcessor) startTicker(exeCtx utils.ExecutionCtx) { if p.Ticker != nil { return } - p.Ticker = time.NewTicker(p.FlushInterval * time.Millisecond) + p.Ticker = time.NewTicker(p.FlushInterval) wg := exeCtx.GetWaitSync() wg.Add(1) go func() { @@ -214,7 +239,9 @@ func (p *BatchEventProcessor) addToBatch(current *Batch, visitor Visitor) { func (p *BatchEventProcessor) FlushEvents() { // we flush when queue size is reached. // however, if there is a ticker cycle already processing, we should wait - p.Mux.Lock() + p.flushLock.Lock() + defer p.flushLock.Unlock() + var batchEvent Batch var batchEventCount = 0 var failedToSend = false @@ -272,7 +299,6 @@ func (p *BatchEventProcessor) FlushEvents() { } } } - p.Mux.Unlock() } // OnEventDispatch registers a handler for LogEvent notifications diff --git a/pkg/event/processor_test.go b/pkg/event/processor_test.go index 7e5742b13..ff0fd7ab3 100644 --- a/pkg/event/processor_test.go +++ b/pkg/event/processor_test.go @@ -32,12 +32,12 @@ type MockDispatcher struct { Events Queue } -func (f *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) { - if f.ShouldFail { +func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) { + if m.ShouldFail { return false, errors.New("Failed to dispatch") } - f.Events.Add(event) + m.Events.Add(event) return true, nil } @@ -116,10 +116,44 @@ func TestDefaultEventProcessor_LogEventNotification(t *testing.T) { assert.Nil(t, err) } +func TestDefaultEventProcessor_DefaultConfig(t *testing.T) { + exeCtx := utils.NewCancelableExecutionCtx() + processor := NewBatchEventProcessor(WithEventDispatcher(NewMockDispatcher(100, false))) + processor.Start(exeCtx) + + impression := BuildTestImpressionEvent() + conversion := BuildTestConversionEvent() + + processor.ProcessEvent(impression) + processor.ProcessEvent(impression) + processor.ProcessEvent(conversion) + processor.ProcessEvent(conversion) + + assert.Equal(t, 4, processor.EventsCount()) + + time.Sleep(31 * time.Second) + + assert.NotNil(t, processor.Ticker) + + assert.Equal(t, 0, processor.EventsCount()) + + result, ok := (processor.EventDispatcher).(*MockDispatcher) + + if ok { + assert.Equal(t, 1, result.Events.Size()) + evs := result.Events.Get(1) + logEvent, _ := evs[0].(LogEvent) + assert.Equal(t, 4, len(logEvent.Event.Visitors)) + } +} + func TestDefaultEventProcessor_ProcessBatch(t *testing.T) { exeCtx := utils.NewCancelableExecutionCtx() - processor := NewBatchEventProcessor(WithFlushInterval(100), WithQueueSize(100), - WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(NewMockDispatcher(100, false))) + processor := NewBatchEventProcessor( + WithFlushInterval(1 * time.Second), + WithQueueSize(100), + WithQueue(NewInMemoryQueue(100)), + WithEventDispatcher(NewMockDispatcher(100, false))) processor.Start(exeCtx) impression := BuildTestImpressionEvent() @@ -132,7 +166,7 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) { assert.Equal(t, 4, processor.EventsCount()) - exeCtx.TerminateAndWait() + time.Sleep(1500 * time.Millisecond) assert.NotNil(t, processor.Ticker) @@ -148,10 +182,13 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) { } } -func TestDefaultEventProcessor_QSizeMet(t *testing.T) { +func TestDefaultEventProcessor_BatchSizeMet(t *testing.T) { exeCtx := utils.NewCancelableExecutionCtx() - processor := NewBatchEventProcessor(WithQueueSize(2), WithFlushInterval(1000), - WithQueue(NewInMemoryQueue(2)), WithEventDispatcher(NewMockDispatcher(100, false))) + processor := NewBatchEventProcessor( + WithBatchSize(2), + WithFlushInterval(1000 * time.Millisecond), + WithQueue(NewInMemoryQueue(2)), + WithEventDispatcher(NewMockDispatcher(100, false))) processor.Start(exeCtx) impression := BuildTestImpressionEvent() @@ -190,10 +227,49 @@ func TestDefaultEventProcessor_QSizeMet(t *testing.T) { } } +func TestDefaultEventProcessor_BatchSizeLessThanQSize(t *testing.T) { + processor := NewBatchEventProcessor( + WithQueueSize(2), + WithFlushInterval(1000 * time.Millisecond), + WithQueue(NewInMemoryQueue(100)), + WithEventDispatcher(NewMockDispatcher(100, false))) + + assert.Equal(t, 2, processor.BatchSize) + assert.Equal(t, 10, processor.MaxQueueSize) + +} + +func TestDefaultEventProcessor_QSizeExceeded(t *testing.T) { + exeCtx := utils.NewCancelableExecutionCtx() + processor := NewBatchEventProcessor( + WithQueueSize(2), + WithBatchSize(2), + WithFlushInterval(1000 * time.Millisecond), + WithQueue(NewInMemoryQueue(2)), + WithEventDispatcher(NewMockDispatcher(100, true))) + processor.Start(exeCtx) + + impression := BuildTestImpressionEvent() + + processor.ProcessEvent(impression) + processor.ProcessEvent(impression) + + assert.Equal(t, 2, processor.EventsCount()) + + processor.ProcessEvent(impression) + processor.ProcessEvent(impression) + + assert.Equal(t, 2, processor.EventsCount()) + +} + func TestDefaultEventProcessor_FailedDispatch(t *testing.T) { exeCtx := utils.NewCancelableExecutionCtx() - processor := NewBatchEventProcessor(WithQueueSize(100), WithFlushInterval(100), - WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(&MockDispatcher{ShouldFail: true, Events: NewInMemoryQueue(100)})) + processor := NewBatchEventProcessor( + WithQueueSize(100), + WithFlushInterval(100), + WithQueue(NewInMemoryQueue(100)), + WithEventDispatcher(&MockDispatcher{ShouldFail: true, Events: NewInMemoryQueue(100)})) processor.Start(exeCtx) impression := BuildTestImpressionEvent() @@ -221,7 +297,9 @@ func TestDefaultEventProcessor_FailedDispatch(t *testing.T) { func TestBatchEventProcessor_FlushesOnClose(t *testing.T) { exeCtx := utils.NewCancelableExecutionCtx() - processor := NewBatchEventProcessor(WithQueueSize(100), WithQueue(NewInMemoryQueue(100)), + processor := NewBatchEventProcessor( + WithQueueSize(100), + WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(NewMockDispatcher(100, false))) processor.Start(exeCtx)