Skip to content

Commit

Permalink
fix: close output channel properly in window flows (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 9, 2024
1 parent 70c1649 commit 3821626
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 90 deletions.
53 changes: 32 additions & 21 deletions flow/session_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ var _ streams.Flow = (*SessionWindow)(nil)
//
// inactivityGap is the gap of inactivity that closes a session window when occurred.
func NewSessionWindow(inactivityGap time.Duration) *SessionWindow {
window := &SessionWindow{
sessionWindow := &SessionWindow{
inactivityGap: inactivityGap,
timer: time.NewTimer(inactivityGap),
in: make(chan interface{}),
out: make(chan interface{}),
done: make(chan struct{}),
}
go window.emit()
go window.receive()
go sessionWindow.emit()
go sessionWindow.receive()

return window
return sessionWindow
}

// Via streams data through the given flow
Expand All @@ -60,44 +60,55 @@ func (sw *SessionWindow) In() chan<- interface{} {
return sw.in
}

// submit emitted windows to the next Inlet
// transmit submits closed windows to the next Inlet.
func (sw *SessionWindow) transmit(inlet streams.Inlet) {
for elem := range sw.Out() {
inlet.In() <- elem
for window := range sw.out {
inlet.In() <- window
}
close(inlet.In())
}

// receive buffers the incoming elements.
// It resets the inactivity timer on each new element.
func (sw *SessionWindow) receive() {
for elem := range sw.in {
for element := range sw.in {
sw.Lock()
sw.buffer = append(sw.buffer, elem)
sw.timer.Reset(sw.inactivityGap)
sw.buffer = append(sw.buffer, element)
sw.timer.Reset(sw.inactivityGap) // reset the inactivity timer
sw.Unlock()
}
close(sw.done)
close(sw.out)
}

// emit captures and emits a session window based on the gap of inactivity.
// When this period expires, the current session closes and subsequent elements
// are assigned to a new session window.
// emit returns after sw.done is closed; before returning it dispatches the
// remaining buffered elements and closes sw.out.
func (sw *SessionWindow) emit() {
	defer sw.timer.Stop()

	for {
		select {
		case <-sw.timer.C:
			// inactivity gap elapsed: close the current session window
			sw.dispatchWindow()

		case <-sw.done:
			// input is exhausted: flush what is left, then close the output
			sw.dispatchWindow()
			close(sw.out)
			return
		}
	}
}

// dispatchWindow takes ownership of the buffered elements, leaving the buffer
// empty, and forwards them to the output channel as a single window.
// Nothing is sent when no elements were buffered.
func (sw *SessionWindow) dispatchWindow() {
	// swap the buffer out under the lock; the send happens after unlocking
	sw.Lock()
	pending := sw.buffer
	sw.buffer = nil
	sw.Unlock()

	if len(pending) == 0 {
		return
	}
	sw.out <- pending
}
108 changes: 59 additions & 49 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewSlidingWindowWithTSExtractor[T any](
return nil, errors.New("slidingInterval is larger than windowSize")
}

window := &SlidingWindow[T]{
slidingWindow := &SlidingWindow[T]{
windowSize: windowSize,
slidingInterval: slidingInterval,
queue: &PriorityQueue{},
Expand All @@ -66,9 +66,9 @@ func NewSlidingWindowWithTSExtractor[T any](
done: make(chan struct{}),
timestampExtractor: timestampExtractor,
}
go window.receive()
go slidingWindow.receive()

return window, nil
return slidingWindow, nil
}

// Via streams data through the given flow
Expand All @@ -94,35 +94,36 @@ func (sw *SlidingWindow[T]) In() chan<- interface{} {
return sw.in
}

// transmit submits newly created windows to the next Inlet.
// transmit submits closed windows to the next Inlet.
func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) {
for elem := range sw.Out() {
inlet.In() <- elem
for window := range sw.out {
inlet.In() <- window
}
close(inlet.In())
}

// timestamp extracts the timestamp from a record if the timestampExtractor is set.
// Returns system clock time otherwise.
func (sw *SlidingWindow[T]) timestamp(elem T) int64 {
// timestamp extracts the timestamp from an element if the timestampExtractor is set.
// It returns system clock time otherwise.
func (sw *SlidingWindow[T]) timestamp(element T) int64 {
if sw.timestampExtractor == nil {
return util.NowNano()
}
return sw.timestampExtractor(elem)
return sw.timestampExtractor(element)
}

// receive buffers the incoming elements by pushing them into a priority queue,
// ordered by their creation time.
func (sw *SlidingWindow[T]) receive() {
for elem := range sw.in {
item := &Item{elem, sw.timestamp(elem.(T)), 0}
for element := range sw.in {
item := &Item{element, sw.timestamp(element.(T)), 0}
sw.Lock()
heap.Push(sw.queue, item)
sw.Unlock()
}
close(sw.done)
close(sw.out)
}

// emit is triggered by the sliding interval
// emit captures and emits a new window every sw.slidingInterval.
func (sw *SlidingWindow[T]) emit() {
// wait for the sliding window to start
time.Sleep(sw.windowSize - sw.slidingInterval)
Expand All @@ -133,48 +134,57 @@ func (sw *SlidingWindow[T]) emit() {
for {
select {
case <-ticker.C:
sw.Lock()
// build a window slice and send it to the out chan
var windowBottomIndex int
now := util.NowNano()
windowUpperIndex := sw.queue.Len()
slideUpperIndex := windowUpperIndex
slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds()
windowBottomTime := now - sw.windowSize.Nanoseconds()
for i, item := range *sw.queue {
if item.epoch < windowBottomTime {
windowBottomIndex = i
}
if item.epoch > slideUpperTime {
slideUpperIndex = i
break
}
}
windowSlice := extract(sw.queue.Slice(windowBottomIndex, windowUpperIndex))
if windowUpperIndex > 0 { // the queue is not empty
s := sw.queue.Slice(slideUpperIndex, windowUpperIndex)
// reset the queue
sw.queue = &s
heap.Init(sw.queue)
}
sw.Unlock()

// send window slice to the out chan
if len(windowSlice) > 0 {
sw.out <- windowSlice
}
sw.dispatchWindow()

case <-sw.done:
sw.dispatchWindow()
close(sw.out)
return
}
}
}

// extract generates a new window slice out of the given items.
func extract(items []*Item) []interface{} {
messages := make([]interface{}, len(items))
// dispatchWindow creates a new window and slides the elements queue.
// It sends the slice of elements to the output channel if the window is not empty.
func (sw *SlidingWindow[T]) dispatchWindow() {
sw.Lock()
// build a window of elements
var windowBottomIndex int
now := util.NowNano()
windowUpperIndex := sw.queue.Len()
slideUpperIndex := windowUpperIndex
slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds()
windowBottomTime := now - sw.windowSize.Nanoseconds()
for i, item := range *sw.queue {
if item.epoch < windowBottomTime {
windowBottomIndex = i
}
if item.epoch > slideUpperTime {
slideUpperIndex = i
break
}
}
windowElements := extractWindowElements(sw.queue.Slice(windowBottomIndex, windowUpperIndex))
if windowUpperIndex > 0 { // the queue is not empty
// slice the queue using the lower and upper bounds
sliced := sw.queue.Slice(slideUpperIndex, windowUpperIndex)
// reset the queue
sw.queue = &sliced
heap.Init(sw.queue)
}
sw.Unlock()

// send elements if the window is not empty
if len(windowElements) > 0 {
sw.out <- windowElements
}
}

// extractWindowElements generates a window of elements from a given slice of queue items.
// The result has the same length and order as items and holds each item's Msg.
func extractWindowElements(items []*Item) []interface{} {
	elements := make([]interface{}, len(items))
	for i, item := range items {
		elements[i] = item.Msg
	}
	return elements
}
48 changes: 28 additions & 20 deletions flow/tumbling_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ var _ streams.Flow = (*TumblingWindow)(nil)
//
// size is the Duration of generated windows.
func NewTumblingWindow(size time.Duration) *TumblingWindow {
window := &TumblingWindow{
tumblingWindow := &TumblingWindow{
windowSize: size,
in: make(chan interface{}),
out: make(chan interface{}),
done: make(chan struct{}),
}
go window.receive()
go window.emit()
go tumblingWindow.receive()
go tumblingWindow.emit()

return window
return tumblingWindow
}

// Via streams data through the given flow
Expand All @@ -58,44 +58,52 @@ func (tw *TumblingWindow) In() chan<- interface{} {
return tw.in
}

// submit emitted windows to the next Inlet
// transmit submits closed windows to the next Inlet.
func (tw *TumblingWindow) transmit(inlet streams.Inlet) {
for elem := range tw.Out() {
inlet.In() <- elem
for window := range tw.out {
inlet.In() <- window
}
close(inlet.In())
}

// receive buffers the incoming elements.
func (tw *TumblingWindow) receive() {
for elem := range tw.in {
for element := range tw.in {
tw.Lock()
tw.buffer = append(tw.buffer, elem)
tw.buffer = append(tw.buffer, element)
tw.Unlock()
}
close(tw.done)
close(tw.out)
}

// emit captures and emits a new window based on the fixed time interval.
// emit returns after tw.done is closed; before returning it dispatches the
// remaining buffered elements and closes tw.out.
func (tw *TumblingWindow) emit() {
	ticker := time.NewTicker(tw.windowSize)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// window interval elapsed: emit the buffered elements
			tw.dispatchWindow()

		case <-tw.done:
			// input is exhausted: flush what is left, then close the output
			tw.dispatchWindow()
			close(tw.out)
			return
		}
	}
}

// dispatchWindow takes ownership of the buffered elements, leaving the buffer
// empty, and forwards them to the output channel as a single window.
// Nothing is sent when no elements were buffered.
func (tw *TumblingWindow) dispatchWindow() {
	// swap the buffer out under the lock; the send happens after unlocking
	tw.Lock()
	pending := tw.buffer
	tw.buffer = nil
	tw.Unlock()

	if len(pending) == 0 {
		return
	}
	tw.out <- pending
}

0 comments on commit 3821626

Please sign in to comment.