Skip to content

Commit e3d854e

Browse files
authored
Merge pull request #53 from reugn/develop
Use generic transformation functions in Flows
2 parents 0dd3576 + 8187dcf commit e3d854e

File tree

10 files changed

+300
-180
lines changed

10 files changed

+300
-180
lines changed

.github/workflows/build.yml

+6-3
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,18 @@ jobs:
77
runs-on: ubuntu-latest
88
strategy:
99
matrix:
10-
go-version: [1.17.x]
10+
go-version: [1.19]
1111
steps:
1212
- name: Setup Go
13-
uses: actions/setup-go@v2
13+
uses: actions/setup-go@v3
1414
with:
1515
go-version: ${{ matrix.go-version }}
16+
1617
- name: Checkout code
17-
uses: actions/checkout@v2
18+
uses: actions/checkout@v3
19+
1820
- name: Run coverage
1921
run: go test ./... -coverprofile=coverage.out -covermode=atomic
22+
2023
- name: Upload coverage to Codecov
2124
run: bash <(curl -s https://codecov.io/bash)

.github/workflows/golangci-lint.yml

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ jobs:
1010
name: lint
1111
runs-on: ubuntu-latest
1212
steps:
13-
- uses: actions/checkout@v2
13+
- name: Checkout code
14+
uses: actions/checkout@v3
15+
1416
- name: golangci-lint
15-
uses: golangci/golangci-lint-action@v2
17+
uses: golangci/golangci-lint-action@v3

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ Flow capabilities ([flow](https://github.com/reugn/go-streams/tree/master/flow)
2020
* Map
2121
* FlatMap
2222
* Filter
23+
* Reduce
2324
* PassThrough
2425
* Split
2526
* FanOut
2627
* RoundRobin
2728
* Merge
29+
* Flatten
2830
* Throttler
2931
* SlidingWindow
3032
* TumblingWindow

flow/filter.go

+18-18
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
// FilterPredicate represents a filter predicate (boolean-valued function).
8-
type FilterPredicate func(interface{}) bool
8+
type FilterPredicate[T any] func(T) bool
99

1010
// Filter filters incoming elements using a filter predicate.
1111
// If an element matches the predicate, the element is passed downstream.
@@ -16,22 +16,22 @@ type FilterPredicate func(interface{}) bool
1616
// [ -------- FilterPredicate -------- ]
1717
//
1818
// out -- 1 -- 2 ------------------ 5 --
19-
type Filter struct {
20-
filterPredicate FilterPredicate
19+
type Filter[T any] struct {
20+
filterPredicate FilterPredicate[T]
2121
in chan interface{}
2222
out chan interface{}
2323
parallelism uint
2424
}
2525

2626
// Verify Filter satisfies the Flow interface.
27-
var _ streams.Flow = (*Filter)(nil)
27+
var _ streams.Flow = (*Filter[any])(nil)
2828

2929
// NewFilter returns a new Filter instance.
3030
//
3131
// filterPredicate is the boolean-valued filter function.
3232
// parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.
33-
func NewFilter(filterPredicate FilterPredicate, parallelism uint) *Filter {
34-
filter := &Filter{
33+
func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism uint) *Filter[T] {
34+
filter := &Filter[T]{
3535
filterPredicate: filterPredicate,
3636
in: make(chan interface{}),
3737
out: make(chan interface{}),
@@ -43,44 +43,44 @@ func NewFilter(filterPredicate FilterPredicate, parallelism uint) *Filter {
4343
}
4444

4545
// Via streams data through the given flow
46-
func (f *Filter) Via(flow streams.Flow) streams.Flow {
46+
func (f *Filter[T]) Via(flow streams.Flow) streams.Flow {
4747
go f.transmit(flow)
4848
return flow
4949
}
5050

5151
// To streams data to the given sink
52-
func (f *Filter) To(sink streams.Sink) {
52+
func (f *Filter[T]) To(sink streams.Sink) {
5353
f.transmit(sink)
5454
}
5555

5656
// Out returns an output channel for sending data
57-
func (f *Filter) Out() <-chan interface{} {
57+
func (f *Filter[T]) Out() <-chan interface{} {
5858
return f.out
5959
}
6060

6161
// In returns an input channel for receiving data
62-
func (f *Filter) In() chan<- interface{} {
62+
func (f *Filter[T]) In() chan<- interface{} {
6363
return f.in
6464
}
6565

66-
func (f *Filter) transmit(inlet streams.Inlet) {
67-
for elem := range f.Out() {
68-
inlet.In() <- elem
66+
func (f *Filter[T]) transmit(inlet streams.Inlet) {
67+
for element := range f.Out() {
68+
inlet.In() <- element
6969
}
7070
close(inlet.In())
7171
}
7272

7373
// doStream discards items that don't match the filter predicate.
74-
func (f *Filter) doStream() {
74+
func (f *Filter[T]) doStream() {
7575
sem := make(chan struct{}, f.parallelism)
7676
for elem := range f.in {
7777
sem <- struct{}{}
78-
go func(e interface{}) {
78+
go func(element T) {
7979
defer func() { <-sem }()
80-
if f.filterPredicate(e) {
81-
f.out <- e
80+
if f.filterPredicate(element) {
81+
f.out <- element
8282
}
83-
}(elem)
83+
}(elem.(T))
8484
}
8585
for i := 0; i < int(f.parallelism); i++ {
8686
sem <- struct{}{}

flow/flat_map.go

+18-18
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
// FlatMapFunction represents a FlatMap transformation function.
8-
type FlatMapFunction func(interface{}) []interface{}
8+
type FlatMapFunction[T, R any] func(T) []R
99

1010
// FlatMap takes one element and produces zero, one, or more elements.
1111
//
@@ -14,22 +14,22 @@ type FlatMapFunction func(interface{}) []interface{}
1414
// [ -------- FlatMapFunction -------- ]
1515
//
1616
// out -- 1' - 2' -------- 4'- 4" - 5' -
17-
type FlatMap struct {
18-
flatMapFunction FlatMapFunction
17+
type FlatMap[T, R any] struct {
18+
flatMapFunction FlatMapFunction[T, R]
1919
in chan interface{}
2020
out chan interface{}
2121
parallelism uint
2222
}
2323

2424
// Verify FlatMap satisfies the Flow interface.
25-
var _ streams.Flow = (*FlatMap)(nil)
25+
var _ streams.Flow = (*FlatMap[any, any])(nil)
2626

2727
// NewFlatMap returns a new FlatMap instance.
2828
//
2929
// flatMapFunction is the FlatMap transformation function.
3030
// parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.
31-
func NewFlatMap(flatMapFunction FlatMapFunction, parallelism uint) *FlatMap {
32-
flatMap := &FlatMap{
31+
func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism uint) *FlatMap[T, R] {
32+
flatMap := &FlatMap[T, R]{
3333
flatMapFunction: flatMapFunction,
3434
in: make(chan interface{}),
3535
out: make(chan interface{}),
@@ -41,44 +41,44 @@ func NewFlatMap(flatMapFunction FlatMapFunction, parallelism uint) *FlatMap {
4141
}
4242

4343
// Via streams data through the given flow
44-
func (fm *FlatMap) Via(flow streams.Flow) streams.Flow {
44+
func (fm *FlatMap[T, R]) Via(flow streams.Flow) streams.Flow {
4545
go fm.transmit(flow)
4646
return flow
4747
}
4848

4949
// To streams data to the given sink
50-
func (fm *FlatMap) To(sink streams.Sink) {
50+
func (fm *FlatMap[T, R]) To(sink streams.Sink) {
5151
fm.transmit(sink)
5252
}
5353

5454
// Out returns an output channel for sending data
55-
func (fm *FlatMap) Out() <-chan interface{} {
55+
func (fm *FlatMap[T, R]) Out() <-chan interface{} {
5656
return fm.out
5757
}
5858

5959
// In returns an input channel for receiving data
60-
func (fm *FlatMap) In() chan<- interface{} {
60+
func (fm *FlatMap[T, R]) In() chan<- interface{} {
6161
return fm.in
6262
}
6363

64-
func (fm *FlatMap) transmit(inlet streams.Inlet) {
65-
for elem := range fm.Out() {
66-
inlet.In() <- elem
64+
func (fm *FlatMap[T, R]) transmit(inlet streams.Inlet) {
65+
for element := range fm.Out() {
66+
inlet.In() <- element
6767
}
6868
close(inlet.In())
6969
}
7070

71-
func (fm *FlatMap) doStream() {
71+
func (fm *FlatMap[T, R]) doStream() {
7272
sem := make(chan struct{}, fm.parallelism)
7373
for elem := range fm.in {
7474
sem <- struct{}{}
75-
go func(e interface{}) {
75+
go func(element T) {
7676
defer func() { <-sem }()
77-
trans := fm.flatMapFunction(e)
78-
for _, item := range trans {
77+
result := fm.flatMapFunction(element)
78+
for _, item := range result {
7979
fm.out <- item
8080
}
81-
}(elem)
81+
}(elem.(T))
8282
}
8383
for i := 0; i < int(fm.parallelism); i++ {
8484
sem <- struct{}{}

0 commit comments

Comments
 (0)