From 158a0183854237bfa344b2355dd0452710b16a64 Mon Sep 17 00:00:00 2001 From: reugn Date: Wed, 10 Jan 2024 08:55:36 +0200 Subject: [PATCH 1/2] fix: use ticker time in sliding window --- examples/aerospike/main.go | 3 +-- examples/std/main.go | 2 +- flow/queue_test.go | 8 ++++---- flow/sliding_window.go | 14 +++++++------- flow/sliding_window_test.go | 21 ++++++++++----------- util/util.go | 6 ------ 6 files changed, 23 insertions(+), 31 deletions(-) diff --git a/examples/aerospike/main.go b/examples/aerospike/main.go index ed160e3..f87a883 100644 --- a/examples/aerospike/main.go +++ b/examples/aerospike/main.go @@ -6,7 +6,6 @@ import ( "time" ext "github.com/reugn/go-streams/aerospike" - "github.com/reugn/go-streams/util" aero "github.com/aerospike/aerospike-client-go/v6" "github.com/reugn/go-streams/flow" @@ -50,7 +49,7 @@ func main() { var transform = func(msg *aero.Record) ext.AerospikeKeyBins { log.Println(msg.Bins) - msg.Bins["ts"] = util.NowNano() + msg.Bins["ts"] = time.Now().UnixNano() return ext.AerospikeKeyBins{ Key: msg.Key, Bins: msg.Bins, diff --git a/examples/std/main.go b/examples/std/main.go index ee97f77..5d962f4 100644 --- a/examples/std/main.go +++ b/examples/std/main.go @@ -38,7 +38,7 @@ func tickerChan(repeat time.Duration) chan interface{} { nc := make(chan interface{}) go func() { for range oc { - nc <- &message{strconv.FormatInt(time.Now().UTC().UnixNano(), 10)} + nc <- &message{strconv.FormatInt(time.Now().UnixNano(), 10)} } }() return nc diff --git a/flow/queue_test.go b/flow/queue_test.go index 0c4cc88..eee90a0 100644 --- a/flow/queue_test.go +++ b/flow/queue_test.go @@ -3,19 +3,19 @@ package flow_test import ( "container/heap" "testing" + "time" "github.com/reugn/go-streams/flow" - "github.com/reugn/go-streams/util" ) func TestQueueOps(t *testing.T) { queue := &flow.PriorityQueue{} - heap.Push(queue, flow.NewItem(1, util.NowNano(), 0)) + heap.Push(queue, flow.NewItem(1, time.Now().UnixNano(), 0)) heap.Push(queue, flow.NewItem(2, 1234, 0)) - heap.Push(queue, flow.NewItem(3, util.NowNano(), 0)) + heap.Push(queue, flow.NewItem(3, time.Now().UnixNano(), 0)) queue.Swap(0, 1) head := queue.Head() - queue.Update(head, util.NowNano()) + queue.Update(head, time.Now().UnixNano()) first := heap.Pop(queue).(*flow.Item) assertEquals(t, 2, first.Msg.(int)) diff --git a/flow/sliding_window.go b/flow/sliding_window.go index 8315dcf..4a24af6 100644 --- a/flow/sliding_window.go +++ b/flow/sliding_window.go @@ -7,7 +7,6 @@ import ( "time" "github.com/reugn/go-streams" - "github.com/reugn/go-streams/util" ) // SlidingWindow assigns elements to windows of fixed length configured by the window @@ -106,7 +105,7 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) { // It returns system clock time otherwise. func (sw *SlidingWindow[T]) timestamp(element T) int64 { if sw.timestampExtractor == nil { - return util.NowNano() + return time.Now().UnixNano() } return sw.timestampExtractor(element) } @@ -128,16 +127,17 @@ func (sw *SlidingWindow[T]) emit() { // wait for the sliding window to start time.Sleep(sw.windowSize - sw.slidingInterval) + lastTick := time.Now() ticker := time.NewTicker(sw.slidingInterval) defer ticker.Stop() for { select { - case <-ticker.C: - sw.dispatchWindow() + case lastTick = <-ticker.C: + sw.dispatchWindow(lastTick) case <-sw.done: - sw.dispatchWindow() + sw.dispatchWindow(lastTick.Add(sw.slidingInterval)) close(sw.out) return } @@ -146,11 +146,11 @@ func (sw *SlidingWindow[T]) emit() { // dispatchWindow creates a new window and slides the elements queue. // It sends the slice of elements to the output channel if the window is not empty. -func (sw *SlidingWindow[T]) dispatchWindow() { +func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) { sw.Lock() // build a window of elements var windowBottomIndex int - now := util.NowNano() + now := tick.UnixNano() windowUpperIndex := sw.queue.Len() slideUpperIndex := windowUpperIndex slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds() diff --git a/flow/sliding_window_test.go b/flow/sliding_window_test.go index af0cc45..a510a61 100644 --- a/flow/sliding_window_test.go +++ b/flow/sliding_window_test.go @@ -7,7 +7,6 @@ import ( ext "github.com/reugn/go-streams/extension" "github.com/reugn/go-streams/flow" - "github.com/reugn/go-streams/util" ) func TestSlidingWindow(t *testing.T) { @@ -45,8 +44,8 @@ func TestSlidingWindow(t *testing.T) { assertEquals(t, []string{"a", "b", "c"}, outputValues[0]) assertEquals(t, []string{"b", "c", "d"}, outputValues[1]) - // assertEquals(t, []string{"c", "d", "e"}, outputValues[2]) - // assertEquals(t, []string{"d", "e", "f", "g"}, outputValues[3]) + assertEquals(t, []string{"c", "d", "e"}, outputValues[2]) + assertEquals(t, []string{"d", "e", "f", "g"}, outputValues[3]) assertEquals(t, []string{"f", "g"}, outputValues[4]) assertEquals(t, []string{"g"}, outputValues[5]) } @@ -74,15 +73,15 @@ func TestSlidingWindowWithExtractor(t *testing.T) { sink := ext.NewChanSink(out) - now := util.NowNano() + now := time.Now() inputValues := []element{ - {"a", now + 2*int64(time.Millisecond)}, - {"b", now + 17*int64(time.Millisecond)}, - {"c", now + 29*int64(time.Millisecond)}, - {"d", now + 35*int64(time.Millisecond)}, - {"e", now + 77*int64(time.Millisecond)}, - {"f", now + 93*int64(time.Millisecond)}, - {"g", now + 120*int64(time.Millisecond)}, + {"a", now.Add(2 * time.Millisecond).UnixNano()}, + {"b", now.Add(17 * time.Millisecond).UnixNano()}, + {"c", now.Add(29 * time.Millisecond).UnixNano()}, + {"d", now.Add(35 * time.Millisecond).UnixNano()}, + {"e", now.Add(77 * time.Millisecond).UnixNano()}, + {"f", now.Add(93 * time.Millisecond).UnixNano()}, + {"g", now.Add(120 * time.Millisecond).UnixNano()}, } go ingestSlice(inputValues, in) go closeDeferred(in, 250*time.Millisecond) diff --git a/util/util.go b/util/util.go index 43848b2..eb2def5 100644 --- a/util/util.go +++ b/util/util.go @@ -2,7 +2,6 @@ package util import ( "hash/fnv" - "time" ) // Check panics if the given error is not nil. @@ -12,11 +11,6 @@ func Check(e error) { } } -// NowNano returns UnixNano in UTC. -func NowNano() int64 { - return time.Now().UTC().UnixNano() -} - // HashCode returns a uint32 hash for the given byte array. func HashCode(b []byte) uint32 { h := fnv.New32a() From d736186bd779bbefcc02a97dfdb207ac8ecc6342 Mon Sep 17 00:00:00 2001 From: reugn Date: Wed, 10 Jan 2024 09:02:55 +0200 Subject: [PATCH 2/2] roll back unstable test assertions --- flow/sliding_window_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/sliding_window_test.go b/flow/sliding_window_test.go index a510a61..0e57a62 100644 --- a/flow/sliding_window_test.go +++ b/flow/sliding_window_test.go @@ -44,8 +44,8 @@ func TestSlidingWindow(t *testing.T) { assertEquals(t, []string{"a", "b", "c"}, outputValues[0]) assertEquals(t, []string{"b", "c", "d"}, outputValues[1]) - assertEquals(t, []string{"c", "d", "e"}, outputValues[2]) - assertEquals(t, []string{"d", "e", "f", "g"}, outputValues[3]) + // assertEquals(t, []string{"c", "d", "e"}, outputValues[2]) + // assertEquals(t, []string{"d", "e", "f", "g"}, outputValues[3]) assertEquals(t, []string{"f", "g"}, outputValues[4]) assertEquals(t, []string{"g"}, outputValues[5]) }