Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix): cleanup event processor #167

Merged
merged 10 commits into from
Nov 7, 2019
38 changes: 32 additions & 6 deletions pkg/event/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"golang.org/x/sync/semaphore"

"github.com/optimizely/go-sdk/pkg/logging"
"github.com/optimizely/go-sdk/pkg/notification"
"github.com/optimizely/go-sdk/pkg/registry"
Expand All @@ -41,9 +43,10 @@ type BatchEventProcessor struct {
FlushInterval time.Duration // in milliseconds
BatchSize int
Q Queue
Mux sync.Mutex
flushLock sync.Mutex
Ticker *time.Ticker
EventDispatcher Dispatcher
processing *semaphore.Weighted
}

// DefaultBatchSize holds the default value for the batch size
Expand All @@ -55,6 +58,8 @@ const DefaultEventQueueSize = 100
// DefaultEventFlushInterval holds the default value for the event flush interval
const DefaultEventFlushInterval = 30 * time.Second

const maxFlushWorkers = 1

var pLogger = logging.GetLogger("EventProcessor")

// BPOptionConfig is the BatchProcessor options that give you the ability to add one more more options before the processor is initialized.
Expand Down Expand Up @@ -105,7 +110,7 @@ func WithSDKKey(sdkKey string) BPOptionConfig {

// NewBatchEventProcessor returns a new instance of BatchEventProcessor with queueSize and flushInterval
func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor {
p := &BatchEventProcessor{}
p := &BatchEventProcessor{processing:semaphore.NewWeighted(int64(maxFlushWorkers))}

for _, opt := range options {
opt(p)
Expand All @@ -123,6 +128,11 @@ func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor {
p.BatchSize = DefaultBatchSize
}

if p.BatchSize > p.MaxQueueSize {
pLogger.Warning("Batch size is larger than queue size. Swapping")
p.BatchSize, p.MaxQueueSize = p.MaxQueueSize, p.BatchSize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit sneaky and I'm not sure we want to swap parameters like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, it was a lint suggestion. i love it! :)

}

if p.Q == nil {
p.Q = NewInMemoryQueue(p.MaxQueueSize)
}
Expand All @@ -142,13 +152,28 @@ func (p *BatchEventProcessor) Start(exeCtx utils.ExecutionCtx) {

// ProcessEvent processes the given impression event
func (p *BatchEventProcessor) ProcessEvent(event UserEvent) {
p.Q.Add(event)

if p.Q.Size() >= p.MaxQueueSize {
pLogger.Warning("MaxQueueSize has been met. Discarding event")
return
}

p.Q.Add(event)

if p.Q.Size() < p.BatchSize {
return
}

if p.processing.TryAcquire(1) {
// it doesn't matter if the timer has kicked in here.
// we just want to start one go routine when the batch size is met.
pLogger.Debug("batch size reached. Flushing routine being called")
go func() {
p.FlushEvents()
p.processing.Release(1)
}()
}

}

// EventsCount returns size of an event queue
Expand All @@ -171,7 +196,7 @@ func (p *BatchEventProcessor) startTicker(exeCtx utils.ExecutionCtx) {
if p.Ticker != nil {
return
}
p.Ticker = time.NewTicker(p.FlushInterval * time.Millisecond)
p.Ticker = time.NewTicker(p.FlushInterval)
wg := exeCtx.GetWaitSync()
wg.Add(1)
go func() {
Expand Down Expand Up @@ -214,7 +239,9 @@ func (p *BatchEventProcessor) addToBatch(current *Batch, visitor Visitor) {
func (p *BatchEventProcessor) FlushEvents() {
// we flush when queue size is reached.
// however, if there is a ticker cycle already processing, we should wait
p.Mux.Lock()
p.flushLock.Lock()
defer p.flushLock.Unlock()

var batchEvent Batch
var batchEventCount = 0
var failedToSend = false
Expand Down Expand Up @@ -272,7 +299,6 @@ func (p *BatchEventProcessor) FlushEvents() {
}
}
}
p.Mux.Unlock()
}

// OnEventDispatch registers a handler for LogEvent notifications
Expand Down
102 changes: 90 additions & 12 deletions pkg/event/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ type MockDispatcher struct {
Events Queue
}

func (f *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
if f.ShouldFail {
func (m *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
if m.ShouldFail {
return false, errors.New("Failed to dispatch")
}

f.Events.Add(event)
m.Events.Add(event)
return true, nil
}

Expand Down Expand Up @@ -116,10 +116,44 @@ func TestDefaultEventProcessor_LogEventNotification(t *testing.T) {
assert.Nil(t, err)
}

func TestDefaultEventProcessor_DefaultConfig(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
conversion := BuildTestConversionEvent()

processor.ProcessEvent(impression)
processor.ProcessEvent(impression)
processor.ProcessEvent(conversion)
processor.ProcessEvent(conversion)

assert.Equal(t, 4, processor.EventsCount())

time.Sleep(31 * time.Second)

assert.NotNil(t, processor.Ticker)

assert.Equal(t, 0, processor.EventsCount())

result, ok := (processor.EventDispatcher).(*MockDispatcher)

if ok {
assert.Equal(t, 1, result.Events.Size())
evs := result.Events.Get(1)
logEvent, _ := evs[0].(LogEvent)
assert.Equal(t, 4, len(logEvent.Event.Visitors))
}
}

func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithFlushInterval(100), WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(NewMockDispatcher(100, false)))
processor := NewBatchEventProcessor(
WithFlushInterval(1 * time.Second),
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
Expand All @@ -132,7 +166,7 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {

assert.Equal(t, 4, processor.EventsCount())

exeCtx.TerminateAndWait()
time.Sleep(1500 * time.Millisecond)

assert.NotNil(t, processor.Ticker)

Expand All @@ -148,10 +182,13 @@ func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
}
}

func TestDefaultEventProcessor_QSizeMet(t *testing.T) {
func TestDefaultEventProcessor_BatchSizeMet(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(2), WithFlushInterval(1000),
WithQueue(NewInMemoryQueue(2)), WithEventDispatcher(NewMockDispatcher(100, false)))
processor := NewBatchEventProcessor(
WithBatchSize(2),
WithFlushInterval(1000 * time.Millisecond),
WithQueue(NewInMemoryQueue(2)),
WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
Expand Down Expand Up @@ -190,10 +227,49 @@ func TestDefaultEventProcessor_QSizeMet(t *testing.T) {
}
}

func TestDefaultEventProcessor_BatchSizeLessThanQSize(t *testing.T) {
processor := NewBatchEventProcessor(
WithQueueSize(2),
WithFlushInterval(1000 * time.Millisecond),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))

assert.Equal(t, 2, processor.BatchSize)
assert.Equal(t, 10, processor.MaxQueueSize)

}

func TestDefaultEventProcessor_QSizeExceeded(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(
WithQueueSize(2),
WithBatchSize(2),
WithFlushInterval(1000 * time.Millisecond),
WithQueue(NewInMemoryQueue(2)),
WithEventDispatcher(NewMockDispatcher(100, true)))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()

processor.ProcessEvent(impression)
processor.ProcessEvent(impression)

assert.Equal(t, 2, processor.EventsCount())

processor.ProcessEvent(impression)
processor.ProcessEvent(impression)

assert.Equal(t, 2, processor.EventsCount())

}

func TestDefaultEventProcessor_FailedDispatch(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(100), WithFlushInterval(100),
WithQueue(NewInMemoryQueue(100)), WithEventDispatcher(&MockDispatcher{ShouldFail: true, Events: NewInMemoryQueue(100)}))
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithFlushInterval(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(&MockDispatcher{ShouldFail: true, Events: NewInMemoryQueue(100)}))
processor.Start(exeCtx)

impression := BuildTestImpressionEvent()
Expand Down Expand Up @@ -221,7 +297,9 @@ func TestDefaultEventProcessor_FailedDispatch(t *testing.T) {

func TestBatchEventProcessor_FlushesOnClose(t *testing.T) {
exeCtx := utils.NewCancelableExecutionCtx()
processor := NewBatchEventProcessor(WithQueueSize(100), WithQueue(NewInMemoryQueue(100)),
processor := NewBatchEventProcessor(
WithQueueSize(100),
WithQueue(NewInMemoryQueue(100)),
WithEventDispatcher(NewMockDispatcher(100, false)))
processor.Start(exeCtx)

Expand Down