Skip to content

Commit 3771cd7

Browse files
authored
Merge pull request #60 from reugn/develop
v0.9.0
2 parents 40a2384 + c8018b3 commit 3771cd7

8 files changed

+376
-93
lines changed

.github/workflows/build.yml

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
name: Build
22

3-
on: [push, pull_request]
3+
on:
4+
push:
5+
branches:
6+
- '**'
7+
pull_request:
8+
branches:
9+
- master
410

511
jobs:
612
build:
713
runs-on: ubuntu-latest
814
strategy:
915
matrix:
10-
go-version: [1.19]
16+
go-version: [1.18.x, 1.19.x]
1117
steps:
1218
- name: Setup Go
1319
uses: actions/setup-go@v3
@@ -21,4 +27,5 @@ jobs:
2127
run: go test ./... -coverprofile=coverage.out -covermode=atomic
2228

2329
- name: Upload coverage to Codecov
30+
if: ${{ matrix.go-version == '1.18.x' }}
2431
run: bash <(curl -s https://codecov.io/bash)

flow/flow_test.go

+40-56
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package flow_test
22

33
import (
4-
"container/heap"
5-
"fmt"
64
"reflect"
75
"sort"
86
"strings"
@@ -11,7 +9,6 @@ import (
119

1210
ext "github.com/reugn/go-streams/extension"
1311
"github.com/reugn/go-streams/flow"
14-
"github.com/reugn/go-streams/util"
1512
)
1613

1714
var addAsterisk = func(in string) []string {
@@ -31,7 +28,6 @@ var reduceSum = func(a int, b int) int {
3128

3229
func ingestSlice[T any](source []T, in chan interface{}) {
3330
for _, e := range source {
34-
fmt.Printf("ingest: %v", e)
3531
in <- e
3632
}
3733
}
@@ -53,26 +49,23 @@ func TestComplexFlow(t *testing.T) {
5349
source := ext.NewChanSource(in)
5450
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
5551
appendAsteriskFlatMapFlow := flow.NewFlatMap(addAsterisk, 1)
56-
throttler := flow.NewThrottler(10, time.Second, 50, flow.Backpressure)
57-
slidingWindow := flow.NewSlidingWindow(2*time.Second, 2*time.Second)
58-
tumblingWindow := flow.NewTumblingWindow(time.Second)
59-
filterNotContainsA := flow.NewFilter(filterNotContainsA, 1)
52+
throttler := flow.NewThrottler(10, 200*time.Millisecond, 50, flow.Backpressure)
53+
tumblingWindow := flow.NewTumblingWindow(200 * time.Millisecond)
54+
filterFlow := flow.NewFilter(filterNotContainsA, 1)
6055
sink := ext.NewChanSink(out)
6156

6257
inputValues := []string{"a", "b", "c"}
6358
go ingestSlice(inputValues, in)
64-
go closeDeferred(in, 3*time.Second)
59+
go closeDeferred(in, time.Second)
6560

6661
go func() {
6762
source.
6863
Via(toUpperMapFlow).
6964
Via(appendAsteriskFlatMapFlow).
7065
Via(tumblingWindow).
7166
Via(flow.Flatten(1)).
72-
Via(slidingWindow).
7367
Via(throttler).
74-
Via(flow.Flatten(1)).
75-
Via(filterNotContainsA).
68+
Via(filterFlow).
7669
To(sink)
7770
}()
7871

@@ -85,42 +78,41 @@ func TestComplexFlow(t *testing.T) {
8578
assertEquals(t, expectedValues, outputValues)
8679
}
8780

88-
func TestFanOutFlow(t *testing.T) {
89-
in := make(chan interface{})
90-
out := make(chan interface{})
81+
func TestSplitFlow(t *testing.T) {
82+
in := make(chan interface{}, 3)
83+
out := make(chan interface{}, 3)
9184

9285
source := ext.NewChanSource(in)
93-
filterNotContainsA := flow.NewFilter(filterNotContainsA, 1)
9486
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
9587
sink := ext.NewChanSink(out)
9688

9789
inputValues := []string{"a", "b", "c"}
98-
go ingestSlice(inputValues, in)
99-
go closeDeferred(in, 100*time.Millisecond)
90+
ingestSlice(inputValues, in)
91+
close(in)
10092

101-
go func() {
102-
fanOut := flow.FanOut(source.Via(filterNotContainsA).Via(toUpperMapFlow), 2)
103-
flow.
104-
Merge(fanOut...).
105-
To(sink)
106-
}()
93+
split := flow.Split(
94+
source.
95+
Via(toUpperMapFlow), filterNotContainsA)
96+
97+
flow.Merge(split[0], split[1]).
98+
To(sink)
10799

108100
var outputValues []string
109101
for e := range sink.Out {
110102
outputValues = append(outputValues, e.(string))
111103
}
112104
sort.Strings(outputValues)
113105

114-
expectedValues := []string{"B", "B", "C", "C"}
106+
expectedValues := []string{"A", "B", "C"}
115107
assertEquals(t, expectedValues, outputValues)
116108
}
117109

118-
func TestRoundRobinFlow(t *testing.T) {
110+
func TestFanOutFlow(t *testing.T) {
119111
in := make(chan interface{})
120112
out := make(chan interface{})
121113

122114
source := ext.NewChanSource(in)
123-
filterNotContainsA := flow.NewFilter(filterNotContainsA, 1)
115+
filterFlow := flow.NewFilter(filterNotContainsA, 1)
124116
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
125117
sink := ext.NewChanSink(out)
126118

@@ -129,9 +121,12 @@ func TestRoundRobinFlow(t *testing.T) {
129121
go closeDeferred(in, 100*time.Millisecond)
130122

131123
go func() {
132-
roundRobin := flow.RoundRobin(source.Via(filterNotContainsA).Via(toUpperMapFlow), 2)
124+
fanOut := flow.FanOut(
125+
source.
126+
Via(filterFlow).
127+
Via(toUpperMapFlow), 2)
133128
flow.
134-
Merge(roundRobin...).
129+
Merge(fanOut...).
135130
To(sink)
136131
}()
137132

@@ -141,39 +136,41 @@ func TestRoundRobinFlow(t *testing.T) {
141136
}
142137
sort.Strings(outputValues)
143138

144-
expectedValues := []string{"B", "C"}
139+
expectedValues := []string{"B", "B", "C", "C"}
145140
assertEquals(t, expectedValues, outputValues)
146141
}
147142

148-
func TestSessionWindow(t *testing.T) {
143+
func TestRoundRobinFlow(t *testing.T) {
149144
in := make(chan interface{})
150145
out := make(chan interface{})
151146

152147
source := ext.NewChanSource(in)
153-
sessionWindow := flow.NewSessionWindow(200 * time.Millisecond)
148+
filterFlow := flow.NewFilter(filterNotContainsA, 1)
149+
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
154150
sink := ext.NewChanSink(out)
155151

156152
inputValues := []string{"a", "b", "c"}
157153
go ingestSlice(inputValues, in)
158-
go ingestDeferred("d", in, 300*time.Millisecond)
159-
go ingestDeferred("e", in, 700*time.Millisecond)
160-
go closeDeferred(in, time.Second)
154+
go closeDeferred(in, 100*time.Millisecond)
161155

162156
go func() {
163-
source.
164-
Via(sessionWindow).
157+
roundRobin := flow.RoundRobin(
158+
source.
159+
Via(filterFlow).
160+
Via(toUpperMapFlow), 2)
161+
flow.
162+
Merge(roundRobin...).
165163
To(sink)
166164
}()
167165

168-
var outputValues [][]interface{}
166+
var outputValues []string
169167
for e := range sink.Out {
170-
outputValues = append(outputValues, e.([]interface{}))
168+
outputValues = append(outputValues, e.(string))
171169
}
170+
sort.Strings(outputValues)
172171

173-
assertEquals(t, 3, len(outputValues))
174-
assertEquals(t, 3, len(outputValues[0]))
175-
assertEquals(t, 1, len(outputValues[1]))
176-
assertEquals(t, 1, len(outputValues[2]))
172+
expectedValues := []string{"B", "C"}
173+
assertEquals(t, expectedValues, outputValues)
177174
}
178175

179176
func TestReduceFlow(t *testing.T) {
@@ -201,19 +198,6 @@ func TestReduceFlow(t *testing.T) {
201198
assertEquals(t, expectedValues, outputValues)
202199
}
203200

204-
func TestQueue(t *testing.T) {
205-
queue := &flow.PriorityQueue{}
206-
heap.Push(queue, flow.NewItem(1, util.NowNano(), 0))
207-
heap.Push(queue, flow.NewItem(2, 1234, 0))
208-
heap.Push(queue, flow.NewItem(3, util.NowNano(), 0))
209-
queue.Swap(0, 1)
210-
head := queue.Head()
211-
queue.Update(head, util.NowNano())
212-
first := heap.Pop(queue).(*flow.Item)
213-
214-
assertEquals(t, 2, first.Msg.(int))
215-
}
216-
217201
func assertEquals[T any](t *testing.T, expected T, actual T) {
218202
if !reflect.DeepEqual(expected, actual) {
219203
t.Fatalf("%v != %v", expected, actual)

flow/queue_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package flow_test
2+
3+
import (
4+
"container/heap"
5+
"testing"
6+
7+
"github.com/reugn/go-streams/flow"
8+
"github.com/reugn/go-streams/util"
9+
)
10+
11+
func TestQueueOps(t *testing.T) {
12+
queue := &flow.PriorityQueue{}
13+
heap.Push(queue, flow.NewItem(1, util.NowNano(), 0))
14+
heap.Push(queue, flow.NewItem(2, 1234, 0))
15+
heap.Push(queue, flow.NewItem(3, util.NowNano(), 0))
16+
queue.Swap(0, 1)
17+
head := queue.Head()
18+
queue.Update(head, util.NowNano())
19+
first := heap.Pop(queue).(*flow.Item)
20+
21+
assertEquals(t, 2, first.Msg.(int))
22+
}
23+
24+
func TestQueueOrder(t *testing.T) {
25+
queue := &flow.PriorityQueue{}
26+
27+
pushItem(queue, 5)
28+
pushItem(queue, 4)
29+
pushItem(queue, 6)
30+
pushItem(queue, 3)
31+
pushItem(queue, 7)
32+
pushItem(queue, 2)
33+
pushItem(queue, 8)
34+
pushItem(queue, 1)
35+
pushItem(queue, 9)
36+
37+
assertEquals(t, 1, popMsg(queue))
38+
assertEquals(t, 2, popMsg(queue))
39+
assertEquals(t, 3, popMsg(queue))
40+
assertEquals(t, 4, popMsg(queue))
41+
assertEquals(t, 5, popMsg(queue))
42+
assertEquals(t, 6, popMsg(queue))
43+
assertEquals(t, 7, popMsg(queue))
44+
assertEquals(t, 8, popMsg(queue))
45+
assertEquals(t, 9, popMsg(queue))
46+
}
47+
48+
func pushItem(queue *flow.PriorityQueue, timestamp int64) {
49+
item := flow.NewItem(timestamp, timestamp, 0)
50+
heap.Push(queue, item)
51+
}
52+
53+
func popMsg(queue *flow.PriorityQueue) int64 {
54+
return (heap.Pop(queue).(*flow.Item)).Msg.(int64)
55+
}

flow/session_window_test.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package flow_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
ext "github.com/reugn/go-streams/extension"
9+
"github.com/reugn/go-streams/flow"
10+
)
11+
12+
func TestSessionWindow(t *testing.T) {
13+
in := make(chan interface{})
14+
out := make(chan interface{})
15+
16+
source := ext.NewChanSource(in)
17+
sessionWindow := flow.NewSessionWindow(20 * time.Millisecond)
18+
sink := ext.NewChanSink(out)
19+
20+
inputValues := []string{"a", "b", "c"}
21+
go ingestSlice(inputValues, in)
22+
go ingestDeferred("d", in, 30*time.Millisecond)
23+
go ingestDeferred("e", in, 70*time.Millisecond)
24+
go closeDeferred(in, 100*time.Millisecond)
25+
26+
go func() {
27+
source.
28+
Via(sessionWindow).
29+
To(sink)
30+
}()
31+
32+
var outputValues [][]interface{}
33+
for e := range sink.Out {
34+
outputValues = append(outputValues, e.([]interface{}))
35+
}
36+
fmt.Println(outputValues)
37+
38+
assertEquals(t, 3, len(outputValues)) // [[a b c] [d] [e]]
39+
40+
assertEquals(t, []interface{}{"a", "b", "c"}, outputValues[0])
41+
assertEquals(t, []interface{}{"d"}, outputValues[1])
42+
assertEquals(t, []interface{}{"e"}, outputValues[2])
43+
}

0 commit comments

Comments
 (0)