From 9fda57249f382b5afbbc01652cb23ca951a15e45 Mon Sep 17 00:00:00 2001 From: Ivan Stankov Date: Thu, 2 Feb 2023 04:02:05 +0200 Subject: [PATCH 1/5] add Flush method --- batcher/batcher.go | 24 ++++++++++++++++++--- batcher/batcher_test.go | 47 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/batcher/batcher.go b/batcher/batcher.go index 2d1ccb4..5caa5f4 100644 --- a/batcher/batcher.go +++ b/batcher/batcher.go @@ -16,9 +16,11 @@ type Batcher struct { timeout time.Duration prefilter func(interface{}) error - lock sync.Mutex - submit chan *work - doWork func([]interface{}) error + lock sync.Mutex + submit chan *work + doWork func([]interface{}) error + workCount int + callbackWorkAdded func(count int) } // New constructs a new batcher that will batch all calls to Run that occur within @@ -75,6 +77,11 @@ func (b *Batcher) submitWork(w *work) { } b.submit <- w + b.workCount++ + if b.callbackWorkAdded != nil { + go b.callbackWorkAdded(b.workCount) + } + } func (b *Batcher) batch() { @@ -100,9 +107,20 @@ func (b *Batcher) batch() { func (b *Batcher) timer() { time.Sleep(b.timeout) + b.Flush() +} + +// Flush saves the changes before the timer expires. +// It is useful to flush the changes when you shutdown your application +func (b *Batcher) Flush() { b.lock.Lock() defer b.lock.Unlock() + if b.submit == nil { + return + } + close(b.submit) b.submit = nil + b.workCount = 0 } diff --git a/batcher/batcher_test.go b/batcher/batcher_test.go index f1b8d40..0bcf9df 100644 --- a/batcher/batcher_test.go +++ b/batcher/batcher_test.go @@ -41,6 +41,53 @@ func TestBatcherSuccess(t *testing.T) { } } +func TestFlushSuccess(t *testing.T) { + durationLimit := 10 * time.Millisecond + timeout := 2 * durationLimit + total := 0 + doSum := func(params []interface{}) error { + for _, param := range params { + intValue, ok := param.(int) + if !ok { + t.Error("expected type int") + } + total += intValue + } + return nil + } + + b := New(timeout, doSum) + b.callbackWorkAdded = func(count int) { + if count == 10 { + b.Flush() + } + } + + wg := &sync.WaitGroup{} + expectedTotal := 0 + start := time.Now() + for i := 0; i < 10; i++ { + expectedTotal += i + wg.Add(1) + go func(i int) { + if err := b.Run(i); err != nil { + t.Error(err) + } + wg.Done() + }(i) + } + wg.Wait() + + duration := time.Since(start) + if duration >= durationLimit { + t.Errorf("expected duration[%v] < durationLimit[%v]", duration, durationLimit) + } + + if total != expectedTotal { + t.Errorf("expected processed count[%v] < actual[%v]", expectedTotal, total) + } +} + func TestBatcherError(t *testing.T) { b := New(10*time.Millisecond, returnsError) From 5474485b94aa61b14542a9f5688175b146391ce5 Mon Sep 17 00:00:00 2001 From: Ivan Stankov Date: Thu, 2 Feb 2023 05:35:47 +0200 Subject: [PATCH 2/5] apply review comments for TestFlushSuccess --- batcher/batcher.go | 14 +++----------- batcher/batcher_test.go | 12 ++++++------ 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/batcher/batcher.go b/batcher/batcher.go index 5caa5f4..892a9c8 100644 --- a/batcher/batcher.go +++ b/batcher/batcher.go @@ -16,11 +16,9 @@ type Batcher struct { timeout time.Duration prefilter func(interface{}) error - lock sync.Mutex - submit chan *work - doWork func([]interface{}) error - workCount int - callbackWorkAdded func(count int) + lock sync.Mutex + submit chan *work + doWork func([]interface{}) error } // New constructs a new batcher that will batch all calls to Run that occur within @@ -77,11 +75,6 @@ func (b *Batcher) submitWork(w *work) { } b.submit <- w - b.workCount++ - if b.callbackWorkAdded != nil { - go b.callbackWorkAdded(b.workCount) - } - } func (b *Batcher) batch() { @@ -122,5 +115,4 @@ func (b *Batcher) Flush() { close(b.submit) b.submit = nil - b.workCount = 0 } diff --git a/batcher/batcher_test.go b/batcher/batcher_test.go index 0bcf9df..1e44f18 100644 --- a/batcher/batcher_test.go +++ b/batcher/batcher_test.go @@ -42,7 +42,8 @@ func TestBatcherSuccess(t *testing.T) { } func TestFlushSuccess(t *testing.T) { - durationLimit := 10 * time.Millisecond + sleepDuration := 5 * time.Millisecond + durationLimit := 2 * sleepDuration timeout := 2 * durationLimit total := 0 doSum := func(params []interface{}) error { @@ -57,11 +58,10 @@ func TestFlushSuccess(t *testing.T) { } b := New(timeout, doSum) - b.callbackWorkAdded = func(count int) { - if count == 10 { - b.Flush() - } - } + go func() { + time.Sleep(sleepDuration) + b.Flush() + }() wg := &sync.WaitGroup{} expectedTotal := 0 From 50368df963a0f8ed92f8ee3bb5352ef6d4357ab3 Mon Sep 17 00:00:00 2001 From: Ivan Stankov Date: Thu, 2 Feb 2023 06:01:30 +0200 Subject: [PATCH 3/5] Shutdown method --- batcher/batcher.go | 17 +++++++++++++++-- batcher/batcher_test.go | 4 ++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/batcher/batcher.go b/batcher/batcher.go index 892a9c8..6dbe22a 100644 --- a/batcher/batcher.go +++ b/batcher/batcher.go @@ -19,6 +19,7 @@ type Batcher struct { lock sync.Mutex submit chan *work doWork func([]interface{}) error + done chan bool } // New constructs a new batcher that will batch all calls to Run that occur within @@ -70,6 +71,7 @@ func (b *Batcher) submitWork(w *work) { defer b.lock.Unlock() if b.submit == nil { + b.done = make(chan bool, 1) b.submit = make(chan *work, 4) go b.batch() } @@ -95,17 +97,28 @@ func (b *Batcher) batch() { future <- ret close(future) } + b.done <- true } func (b *Batcher) timer() { time.Sleep(b.timeout) - b.Flush() + b.flush() +} + +// Shutdown flush the changes and wait to be saved +func (b *Batcher) Shutdown(wait bool) { + b.flush() + + if wait { + // wait done channel + <-b.done + } } // Flush saves the changes before the timer expires. // It is useful to flush the changes when you shutdown your application -func (b *Batcher) Flush() { +func (b *Batcher) flush() { b.lock.Lock() defer b.lock.Unlock() diff --git a/batcher/batcher_test.go b/batcher/batcher_test.go index 1e44f18..bff13a7 100644 --- a/batcher/batcher_test.go +++ b/batcher/batcher_test.go @@ -41,7 +41,7 @@ func TestBatcherSuccess(t *testing.T) { } } -func TestFlushSuccess(t *testing.T) { +func TestShutdownSuccess(t *testing.T) { sleepDuration := 5 * time.Millisecond durationLimit := 2 * sleepDuration timeout := 2 * durationLimit @@ -60,7 +60,7 @@ func TestFlushSuccess(t *testing.T) { b := New(timeout, doSum) go func() { time.Sleep(sleepDuration) - b.Flush() + b.Shutdown(true) }() wg := &sync.WaitGroup{} From dc4266b7caaf157bceb1ef575563385950508869 Mon Sep 17 00:00:00 2001 From: Ivan Stankov Date: Thu, 2 Feb 2023 15:30:06 +0200 Subject: [PATCH 4/5] apply code review comments --- batcher/batcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/batcher/batcher.go b/batcher/batcher.go index 6dbe22a..2261805 100644 --- a/batcher/batcher.go +++ b/batcher/batcher.go @@ -71,7 +71,7 @@ func (b *Batcher) submitWork(w *work) { defer b.lock.Unlock() if b.submit == nil { - b.done = make(chan bool, 1) + b.done = make(chan bool) b.submit = make(chan *work, 4) go b.batch() } @@ -98,6 +98,7 @@ func (b *Batcher) batch() { close(future) } b.done <- true + close(b.done) } func (b *Batcher) timer() { From 2fd7dddc7a119b1a0dabbccc53cbd0a9fe4b0e45 Mon Sep 17 00:00:00 2001 From: Ivan Stankov Date: Thu, 2 Feb 2023 15:33:50 +0200 Subject: [PATCH 5/5] apply code review comments --- batcher/batcher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/batcher/batcher.go b/batcher/batcher.go index 2261805..6aed64a 100644 --- a/batcher/batcher.go +++ b/batcher/batcher.go @@ -97,7 +97,6 @@ func (b *Batcher) batch() { future <- ret close(future) } - b.done <- true close(b.done) }