Skip to content

Commit 0bd7b56

Browse files
authored
New helper collection: channel (#95)
* feat(channels): add ToChannel + Generator + Batch + BatchWithTimeout (WIP) * feat: return duration of Batch**** helpers * doc: improve BatchWithTimeout doc * fix(BatchWithTimeout): replace time.After by time.NewTimer in order to prevent memory leak * doc: improve BatchWithTimeout doc
1 parent cce6411 commit 0bd7b56

File tree

4 files changed

+322
-0
lines changed

4 files changed

+322
-0
lines changed

Diff for: CHANGELOG.md

+9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@
22

33
@samber: I sometimes forget to update this file. Ping me on [Twitter](https://twitter.com/samuelberthe) or open an issue in case of error. We need to keep a clear changelog for easier lib upgrade.
44

5+
## 1.31.0 (2022-10-06)
6+
7+
Adding:
8+
9+
- lo.SliceToChannel
10+
- lo.Generator
11+
- lo.Batch
12+
- lo.BatchWithTimeout
13+
514
## 1.30.1 (2022-10-06)
615

716
Fix:

Diff for: README.md

+132
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ Supported helpers for tuples:
135135
Supported helpers for channels:
136136

137137
- [ChannelDispatcher](#channeldispatcher)
138+
- [SliceToChannel](#slicetochannel)
139+
- [Generator](#generator)
140+
- [Batch](#batch)
141+
- [BatchWithTimeout](#batchwithtimeout)
138142

139143
Supported intersection helpers:
140144

@@ -1262,6 +1266,134 @@ children := lo.ChannelDispatcher(ch, 5, 10, customStrategy)
12621266
...
12631267
```
12641268

1269+
### SliceToChannel
1270+
1271+
Returns a read-only channels of collection elements. Channel is closed after last element. Channel capacity can be customized.
1272+
1273+
```go
1274+
list := []int{1, 2, 3, 4, 5}
1275+
1276+
for v := range lo.SliceToChannel(2, list) {
1277+
println(v)
1278+
}
1279+
// prints 1, then 2, then 3, then 4, then 5
1280+
```
1281+
1282+
### Generator
1283+
1284+
Implements the generator design pattern. Channel is closed after last element. Channel capacity can be customized.
1285+
1286+
```go
1287+
generator := func(yield func(int)) {
1288+
yield(1)
1289+
yield(2)
1290+
yield(3)
1291+
}
1292+
1293+
for v := range lo.Generator(2, generator) {
1294+
println(v)
1295+
}
1296+
// prints 1, then 2, then 3
1297+
```
1298+
1299+
### Batch
1300+
1301+
Creates a slice of n elements from a channel. Returns the slice, the slice length, the read time and the channel status (opened/closed).
1302+
1303+
```go
1304+
ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5})
1305+
1306+
items1, length1, duration1, ok1 := lo.Batch(ch, 3)
1307+
// []int{1, 2, 3}, 3, 0s, true
1308+
items2, length2, duration2, ok2 := lo.Batch(ch, 3)
1309+
// []int{4, 5}, 2, 0s, false
1310+
```
1311+
1312+
Example: RabbitMQ consumer 👇
1313+
1314+
```go
1315+
ch := readFromQueue()
1316+
1317+
for {
1318+
// read 1k items
1319+
items, length, _, ok := lo.Batch(ch, 1000)
1320+
1321+
// do batching stuff
1322+
1323+
if !ok {
1324+
break
1325+
}
1326+
}
1327+
```
1328+
1329+
### BatchWithTimeout
1330+
1331+
Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed).
1332+
1333+
```go
1334+
generator := func(yield func(int)) {
1335+
for i := 0; i < 5; i++ {
1336+
yield(i)
1337+
time.Sleep(35*time.Millisecond)
1338+
}
1339+
}
1340+
1341+
ch := lo.Generator(0, generator)
1342+
1343+
items1, length1, duration1, ok1 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
1344+
// []int{1, 2}, 2, 100ms, true
1345+
items2, length2, duration2, ok2 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
1346+
// []int{3, 4, 5}, 3, 75ms, true
1347+
items3, length3, duration2, ok3 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
1348+
// []int{}, 0, 10ms, false
1349+
```
1350+
1351+
Example: RabbitMQ consumer 👇
1352+
1353+
```go
1354+
ch := readFromQueue()
1355+
1356+
for {
1357+
// read 1k items
1358+
// wait up to 1 second
1359+
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)
1360+
1361+
// do batching stuff
1362+
1363+
if !ok {
1364+
break
1365+
}
1366+
}
1367+
```
1368+
1369+
Example: Multithreaded RabbitMQ consumer 👇
1370+
1371+
```go
1372+
ch := readFromQueue()
1373+
1374+
// 5 workers
1375+
// prefetch 1k messages per worker
1376+
children := lo.ChannelDispatcher(ch, 5, 1000, DispatchingStrategyFirst[int])
1377+
1378+
consumer := func(c <-chan int) {
1379+
for {
1380+
// read 1k items
1381+
// wait up to 1 second
1382+
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)
1383+
1384+
// do batching stuff
1385+
1386+
if !ok {
1387+
break
1388+
}
1389+
}
1390+
}
1391+
1392+
for i := range children {
1393+
go consumer(children[i])
1394+
}
1395+
```
1396+
12651397
### Contains
12661398

12671399
Returns true if an element is present in a collection.

Diff for: channel.go

+77
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,80 @@ func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) in
149149
return len(channels[item]) > len(channels[max]) && channelIsNotFull(channels[item])
150150
})
151151
}
152+
153+
// SliceToChannel returns a read-only channels of collection elements.
154+
func SliceToChannel[T any](bufferSize int, collection []T) <-chan T {
155+
ch := make(chan T, bufferSize)
156+
157+
go func() {
158+
for _, item := range collection {
159+
ch <- item
160+
}
161+
162+
close(ch)
163+
}()
164+
165+
return ch
166+
}
167+
168+
// Generator implements the generator design pattern.
169+
func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T {
170+
ch := make(chan T, bufferSize)
171+
172+
go func() {
173+
// WARNING: infinite loop
174+
generator(func(t T) {
175+
ch <- t
176+
})
177+
178+
close(ch)
179+
}()
180+
181+
return ch
182+
}
183+
184+
// Batch creates a slice of n elements from a channel. Returns the slice and the slice length.
185+
// @TODO: we should probaby provide an helper that reuse the same buffer.
186+
func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
187+
buffer := make([]T, 0, size)
188+
index := 0
189+
now := time.Now()
190+
191+
for ; index < size; index++ {
192+
item, ok := <-ch
193+
if !ok {
194+
return buffer, index, time.Since(now), false
195+
}
196+
197+
buffer = append(buffer, item)
198+
}
199+
200+
return buffer, index, time.Since(now), true
201+
}
202+
203+
// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
204+
// @TODO: we should probaby provide an helper that reuse the same buffer.
205+
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
206+
expire := time.NewTimer(timeout)
207+
defer expire.Stop()
208+
209+
buffer := make([]T, 0, size)
210+
index := 0
211+
now := time.Now()
212+
213+
for ; index < size; index++ {
214+
select {
215+
case item, ok := <-ch:
216+
if !ok {
217+
return buffer, index, time.Since(now), false
218+
}
219+
220+
buffer = append(buffer, item)
221+
222+
case <-expire.C:
223+
return buffer, index, time.Since(now), true
224+
}
225+
}
226+
227+
return buffer, index, time.Since(now), true
228+
}

Diff for: channel_test.go

+104
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,107 @@ func TestDispatchingStrategyMost(t *testing.T) {
187187
children[1] <- 1
188188
is.Equal(0, DispatchingStrategyMost(42, 0, rochildren))
189189
}
190+
191+
func TestSliceToChannel(t *testing.T) {
192+
t.Parallel()
193+
testWithTimeout(t, 10*time.Millisecond)
194+
is := assert.New(t)
195+
196+
ch := SliceToChannel[int](2, []int{1, 2, 3})
197+
198+
r1, ok1 := <-ch
199+
r2, ok2 := <-ch
200+
r3, ok3 := <-ch
201+
is.True(ok1)
202+
is.Equal(1, r1)
203+
is.True(ok2)
204+
is.Equal(2, r2)
205+
is.True(ok3)
206+
is.Equal(3, r3)
207+
208+
_, ok4 := <-ch
209+
is.False(ok4)
210+
}
211+
212+
func TestGenerate(t *testing.T) {
213+
t.Parallel()
214+
testWithTimeout(t, 10*time.Millisecond)
215+
is := assert.New(t)
216+
217+
generator := func(yield func(int)) {
218+
yield(0)
219+
yield(1)
220+
yield(2)
221+
yield(3)
222+
}
223+
224+
i := 0
225+
226+
for v := range Generator(2, generator) {
227+
is.Equal(i, v)
228+
i++
229+
}
230+
231+
is.Equal(i, 4)
232+
}
233+
234+
func TestBatch(t *testing.T) {
235+
t.Parallel()
236+
testWithTimeout(t, 10*time.Millisecond)
237+
is := assert.New(t)
238+
239+
ch := SliceToChannel(2, []int{1, 2, 3})
240+
241+
items1, length1, _, ok1 := Batch(ch, 2)
242+
items2, length2, _, ok2 := Batch(ch, 2)
243+
items3, length3, _, ok3 := Batch(ch, 2)
244+
245+
is.Equal([]int{1, 2}, items1)
246+
is.Equal(2, length1)
247+
is.True(ok1)
248+
is.Equal([]int{3}, items2)
249+
is.Equal(1, length2)
250+
is.False(ok2)
251+
is.Equal([]int{}, items3)
252+
is.Equal(0, length3)
253+
is.False(ok3)
254+
}
255+
256+
func TestBatchWithTimeout(t *testing.T) {
257+
t.Parallel()
258+
testWithTimeout(t, 200*time.Millisecond)
259+
is := assert.New(t)
260+
261+
generator := func(yield func(int)) {
262+
for i := 0; i < 5; i++ {
263+
yield(i)
264+
time.Sleep(10 * time.Millisecond)
265+
}
266+
}
267+
ch := Generator(0, generator)
268+
269+
items1, length1, _, ok1 := BatchWithTimeout(ch, 20, 15*time.Millisecond)
270+
is.Equal([]int{0, 1}, items1)
271+
is.Equal(2, length1)
272+
is.True(ok1)
273+
274+
items2, length2, _, ok2 := BatchWithTimeout(ch, 20, 2*time.Millisecond)
275+
is.Equal([]int{}, items2)
276+
is.Equal(0, length2)
277+
is.True(ok2)
278+
279+
items3, length3, _, ok3 := BatchWithTimeout(ch, 1, 30*time.Millisecond)
280+
is.Equal([]int{2}, items3)
281+
is.Equal(1, length3)
282+
is.True(ok3)
283+
284+
items4, length4, _, ok4 := BatchWithTimeout(ch, 2, 25*time.Millisecond)
285+
is.Equal([]int{3, 4}, items4)
286+
is.Equal(2, length4)
287+
is.True(ok4)
288+
289+
items5, length5, _, ok5 := BatchWithTimeout(ch, 3, 25*time.Millisecond)
290+
is.Equal([]int{}, items5)
291+
is.Equal(0, length5)
292+
is.False(ok5)
293+
}

0 commit comments

Comments
 (0)