Skip to content

Commit

Permalink
fix: use ticker time in sliding window (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 10, 2024
1 parent 3821626 commit 66d65d4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 29 deletions.
3 changes: 1 addition & 2 deletions examples/aerospike/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

ext "github.com/reugn/go-streams/aerospike"
"github.com/reugn/go-streams/util"

aero "github.com/aerospike/aerospike-client-go/v6"
"github.com/reugn/go-streams/flow"
Expand Down Expand Up @@ -50,7 +49,7 @@ func main() {

var transform = func(msg *aero.Record) ext.AerospikeKeyBins {
log.Println(msg.Bins)
msg.Bins["ts"] = util.NowNano()
msg.Bins["ts"] = time.Now().UnixNano()
return ext.AerospikeKeyBins{
Key: msg.Key,
Bins: msg.Bins,
Expand Down
2 changes: 1 addition & 1 deletion examples/std/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func tickerChan(repeat time.Duration) chan interface{} {
nc := make(chan interface{})
go func() {
for range oc {
nc <- &message{strconv.FormatInt(time.Now().UTC().UnixNano(), 10)}
nc <- &message{strconv.FormatInt(time.Now().UnixNano(), 10)}
}
}()
return nc
Expand Down
8 changes: 4 additions & 4 deletions flow/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ package flow_test
import (
"container/heap"
"testing"
"time"

"github.com/reugn/go-streams/flow"
"github.com/reugn/go-streams/util"
)

func TestQueueOps(t *testing.T) {
queue := &flow.PriorityQueue{}
heap.Push(queue, flow.NewItem(1, util.NowNano(), 0))
heap.Push(queue, flow.NewItem(1, time.Now().UnixNano(), 0))
heap.Push(queue, flow.NewItem(2, 1234, 0))
heap.Push(queue, flow.NewItem(3, util.NowNano(), 0))
heap.Push(queue, flow.NewItem(3, time.Now().UnixNano(), 0))
queue.Swap(0, 1)
head := queue.Head()
queue.Update(head, util.NowNano())
queue.Update(head, time.Now().UnixNano())
first := heap.Pop(queue).(*flow.Item)

assertEquals(t, 2, first.Msg.(int))
Expand Down
14 changes: 7 additions & 7 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/reugn/go-streams"
"github.com/reugn/go-streams/util"
)

// SlidingWindow assigns elements to windows of fixed length configured by the window
Expand Down Expand Up @@ -106,7 +105,7 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) {
// It returns system clock time otherwise.
func (sw *SlidingWindow[T]) timestamp(element T) int64 {
if sw.timestampExtractor == nil {
return util.NowNano()
return time.Now().UnixNano()
}
return sw.timestampExtractor(element)
}
Expand All @@ -128,16 +127,17 @@ func (sw *SlidingWindow[T]) emit() {
// wait for the sliding window to start
time.Sleep(sw.windowSize - sw.slidingInterval)

lastTick := time.Now()
ticker := time.NewTicker(sw.slidingInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
sw.dispatchWindow()
case lastTick = <-ticker.C:
sw.dispatchWindow(lastTick)

case <-sw.done:
sw.dispatchWindow()
sw.dispatchWindow(lastTick.Add(sw.slidingInterval))
close(sw.out)
return
}
Expand All @@ -146,11 +146,11 @@ func (sw *SlidingWindow[T]) emit() {

// dispatchWindow creates a new window and slides the elements queue.
// It sends the slice of elements to the output channel if the window is not empty.
func (sw *SlidingWindow[T]) dispatchWindow() {
func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
sw.Lock()
// build a window of elements
var windowBottomIndex int
now := util.NowNano()
now := tick.UnixNano()
windowUpperIndex := sw.queue.Len()
slideUpperIndex := windowUpperIndex
slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds()
Expand Down
17 changes: 8 additions & 9 deletions flow/sliding_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
"github.com/reugn/go-streams/util"
)

func TestSlidingWindow(t *testing.T) {
Expand Down Expand Up @@ -74,15 +73,15 @@ func TestSlidingWindowWithExtractor(t *testing.T) {

sink := ext.NewChanSink(out)

now := util.NowNano()
now := time.Now()
inputValues := []element{
{"a", now + 2*int64(time.Millisecond)},
{"b", now + 17*int64(time.Millisecond)},
{"c", now + 29*int64(time.Millisecond)},
{"d", now + 35*int64(time.Millisecond)},
{"e", now + 77*int64(time.Millisecond)},
{"f", now + 93*int64(time.Millisecond)},
{"g", now + 120*int64(time.Millisecond)},
{"a", now.Add(2 * time.Millisecond).UnixNano()},
{"b", now.Add(17 * time.Millisecond).UnixNano()},
{"c", now.Add(29 * time.Millisecond).UnixNano()},
{"d", now.Add(35 * time.Millisecond).UnixNano()},
{"e", now.Add(77 * time.Millisecond).UnixNano()},
{"f", now.Add(93 * time.Millisecond).UnixNano()},
{"g", now.Add(120 * time.Millisecond).UnixNano()},
}
go ingestSlice(inputValues, in)
go closeDeferred(in, 250*time.Millisecond)
Expand Down
6 changes: 0 additions & 6 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package util

import (
"hash/fnv"
"time"
)

// Check panics if the given error is not nil.
Expand All @@ -12,11 +11,6 @@ func Check(e error) {
}
}

// NowNano returns UnixNano in UTC.
func NowNano() int64 {
return time.Now().UTC().UnixNano()
}

// HashCode returns a uint32 hash for the given byte array.
func HashCode(b []byte) uint32 {
h := fnv.New32a()
Expand Down

0 comments on commit 66d65d4

Please sign in to comment.