Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use ticker time in sliding window #100

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/aerospike/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

ext "github.com/reugn/go-streams/aerospike"
"github.com/reugn/go-streams/util"

aero "github.com/aerospike/aerospike-client-go/v6"
"github.com/reugn/go-streams/flow"
Expand Down Expand Up @@ -50,7 +49,7 @@ func main() {

var transform = func(msg *aero.Record) ext.AerospikeKeyBins {
log.Println(msg.Bins)
msg.Bins["ts"] = util.NowNano()
msg.Bins["ts"] = time.Now().UnixNano()
return ext.AerospikeKeyBins{
Key: msg.Key,
Bins: msg.Bins,
Expand Down
2 changes: 1 addition & 1 deletion examples/std/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func tickerChan(repeat time.Duration) chan interface{} {
nc := make(chan interface{})
go func() {
for range oc {
nc <- &message{strconv.FormatInt(time.Now().UTC().UnixNano(), 10)}
nc <- &message{strconv.FormatInt(time.Now().UnixNano(), 10)}
}
}()
return nc
Expand Down
8 changes: 4 additions & 4 deletions flow/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ package flow_test
import (
"container/heap"
"testing"
"time"

"github.com/reugn/go-streams/flow"
"github.com/reugn/go-streams/util"
)

func TestQueueOps(t *testing.T) {
queue := &flow.PriorityQueue{}
heap.Push(queue, flow.NewItem(1, util.NowNano(), 0))
heap.Push(queue, flow.NewItem(1, time.Now().UnixNano(), 0))
heap.Push(queue, flow.NewItem(2, 1234, 0))
heap.Push(queue, flow.NewItem(3, util.NowNano(), 0))
heap.Push(queue, flow.NewItem(3, time.Now().UnixNano(), 0))
queue.Swap(0, 1)
head := queue.Head()
queue.Update(head, util.NowNano())
queue.Update(head, time.Now().UnixNano())
first := heap.Pop(queue).(*flow.Item)

assertEquals(t, 2, first.Msg.(int))
Expand Down
14 changes: 7 additions & 7 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/reugn/go-streams"
"github.com/reugn/go-streams/util"
)

// SlidingWindow assigns elements to windows of fixed length configured by the window
Expand Down Expand Up @@ -106,7 +105,7 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) {
// It returns system clock time otherwise.
func (sw *SlidingWindow[T]) timestamp(element T) int64 {
if sw.timestampExtractor == nil {
return util.NowNano()
return time.Now().UnixNano()
}
return sw.timestampExtractor(element)
}
Expand All @@ -128,16 +127,17 @@ func (sw *SlidingWindow[T]) emit() {
// wait for the sliding window to start
time.Sleep(sw.windowSize - sw.slidingInterval)

lastTick := time.Now()
ticker := time.NewTicker(sw.slidingInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
sw.dispatchWindow()
case lastTick = <-ticker.C:
sw.dispatchWindow(lastTick)

case <-sw.done:
sw.dispatchWindow()
sw.dispatchWindow(lastTick.Add(sw.slidingInterval))
close(sw.out)
return
}
Expand All @@ -146,11 +146,11 @@ func (sw *SlidingWindow[T]) emit() {

// dispatchWindow creates a new window and slides the elements queue.
// It sends the slice of elements to the output channel if the window is not empty.
func (sw *SlidingWindow[T]) dispatchWindow() {
func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
sw.Lock()
// build a window of elements
var windowBottomIndex int
now := util.NowNano()
now := tick.UnixNano()
windowUpperIndex := sw.queue.Len()
slideUpperIndex := windowUpperIndex
slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds()
Expand Down
17 changes: 8 additions & 9 deletions flow/sliding_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
"github.com/reugn/go-streams/util"
)

func TestSlidingWindow(t *testing.T) {
Expand Down Expand Up @@ -74,15 +73,15 @@ func TestSlidingWindowWithExtractor(t *testing.T) {

sink := ext.NewChanSink(out)

now := util.NowNano()
now := time.Now()
inputValues := []element{
{"a", now + 2*int64(time.Millisecond)},
{"b", now + 17*int64(time.Millisecond)},
{"c", now + 29*int64(time.Millisecond)},
{"d", now + 35*int64(time.Millisecond)},
{"e", now + 77*int64(time.Millisecond)},
{"f", now + 93*int64(time.Millisecond)},
{"g", now + 120*int64(time.Millisecond)},
{"a", now.Add(2 * time.Millisecond).UnixNano()},
{"b", now.Add(17 * time.Millisecond).UnixNano()},
{"c", now.Add(29 * time.Millisecond).UnixNano()},
{"d", now.Add(35 * time.Millisecond).UnixNano()},
{"e", now.Add(77 * time.Millisecond).UnixNano()},
{"f", now.Add(93 * time.Millisecond).UnixNano()},
{"g", now.Add(120 * time.Millisecond).UnixNano()},
}
go ingestSlice(inputValues, in)
go closeDeferred(in, 250*time.Millisecond)
Expand Down
6 changes: 0 additions & 6 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package util

import (
"hash/fnv"
"time"
)

// Check panics if the given error is not nil.
Expand All @@ -12,11 +11,6 @@ func Check(e error) {
}
}

// NowNano returns UnixNano in UTC.
func NowNano() int64 {
return time.Now().UTC().UnixNano()
}

// HashCode returns a uint32 hash for the given byte array.
func HashCode(b []byte) uint32 {
h := fnv.New32a()
Expand Down
Loading