Skip to content

Commit 0f8cf78

Browse files
committed
new object
1 parent 7ec6c8b commit 0f8cf78

File tree

5 files changed

+60
-55
lines changed

5 files changed

+60
-55
lines changed

README.md

+8-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ import (
1717
)
1818

1919
func main() {
20-
inputStream := func() <-chan string {
20+
ctx, cancel := context.WithCancel(context.TODO())
21+
defer cancel()
22+
23+
my := oproc.NewOrderedProc[string /*input param*/, string /*output param*/](ctx)
24+
my.InputStream = func() <-chan string {
2125
ch := make(chan string)
2226
go func() {
2327
defer close(ch)
@@ -28,20 +32,16 @@ func main() {
2832
}
2933
}()
3034
return ch
31-
}
32-
33-
doWork := func(str string) string {
35+
}()
36+
my.DoWork = func(str string) string {
3437
// sleep instead of fetching
3538
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
3639
return fmt.Sprintf("%s ... is fetched!", str)
3740
}
3841

39-
ctx, cancel := context.WithCancel(context.TODO())
40-
defer cancel()
41-
4242
start := time.Now()
4343

44-
for s := range oproc.OrderedProc(ctx, inputStream(), doWork /*, 10 optional # of goroutines */) {
44+
for s := range my.Process() {
4545
fmt.Println(s)
4646
}
4747

examples/ctan/main.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ type item struct {
2323
}
2424

2525
func main() {
26-
inputStream := func() <-chan item {
26+
ctx, cancel := context.WithCancel(context.TODO())
27+
defer cancel()
28+
29+
my := oproc.NewOrderedProc[item /*input param*/, item /*output param*/](ctx)
30+
my.InputStream = func() <-chan item {
2731
valStream := make(chan item)
2832
go func() {
2933
defer close(valStream)
@@ -43,9 +47,8 @@ func main() {
4347
}
4448
}()
4549
return valStream
46-
}
47-
48-
doWork := func(o item) item {
50+
}()
51+
my.DoWork = func(o item) item {
4952
resp, err := http.Get("https://ctan.org/json/2.0/pkg/" + o.Key)
5053
if err != nil {
5154
log.Fatal(err)
@@ -57,11 +60,9 @@ func main() {
5760
}
5861
return o
5962
}
63+
my.Size = 20
6064

61-
ctx, cancel := context.WithCancel(context.TODO())
62-
defer cancel()
63-
64-
for s := range oproc.OrderedProc(ctx, inputStream(), doWork, 20) {
65+
for s := range my.Process() {
6566
b, err := json.Marshal(s)
6667
if err == nil {
6768
fmt.Println(string(b))

examples/ordered/main.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import (
1010
)
1111

1212
func main() {
13-
inputStream := func() <-chan string {
13+
ctx, cancel := context.WithCancel(context.TODO())
14+
defer cancel()
15+
16+
my := oproc.NewOrderedProc[string /*input param*/, string /*output param*/](ctx)
17+
my.InputStream = func() <-chan string {
1418
ch := make(chan string)
1519
go func() {
1620
defer close(ch)
@@ -21,20 +25,16 @@ func main() {
2125
}
2226
}()
2327
return ch
24-
}
25-
26-
doWork := func(str string) string {
28+
}()
29+
my.DoWork = func(str string) string {
2730
// sleep instead of fetching
2831
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
2932
return fmt.Sprintf("%s ... is fetched!", str)
3033
}
3134

32-
ctx, cancel := context.WithCancel(context.TODO())
33-
defer cancel()
34-
3535
start := time.Now()
3636

37-
for s := range oproc.OrderedProc(ctx, inputStream(), doWork) {
37+
for s := range my.Process() {
3838
fmt.Println(s)
3939
}
4040

oproc.go

+27-23
Original file line numberDiff line numberDiff line change
@@ -5,76 +5,80 @@ import (
55
"runtime"
66
)
77

8-
func OrderedProc[T, V any](
9-
ctx context.Context,
10-
inStream <-chan V,
11-
doWork func(V) T,
12-
size ...int,
13-
) <-chan T {
14-
lvl := runtime.NumCPU()
15-
if len(size) > 0 {
16-
lvl = size[0]
8+
type OrderedProc[TI, TO any] struct {
9+
Ctx context.Context
10+
InputStream <-chan TI
11+
DoWork func(TI) TO
12+
Size int
13+
}
14+
15+
func NewOrderedProc[TI, TO any](ctx context.Context) *OrderedProc[TI, TO] {
16+
return &OrderedProc[TI, TO]{
17+
Ctx: ctx,
18+
Size: runtime.NumCPU(),
1719
}
20+
}
1821

19-
orDone := func(c <-chan T) <-chan T {
20-
ch := make(chan T)
22+
func (o *OrderedProc[TI, TO]) Process() <-chan TO {
23+
orDone := func(c <-chan TO) <-chan TO {
24+
ch := make(chan TO)
2125
go func() {
2226
defer close(ch)
2327
for {
2428
select {
25-
case <-ctx.Done():
29+
case <-o.Ctx.Done():
2630
return
2731
case v, ok := <-c:
2832
if !ok {
2933
return
3034
}
3135
select {
3236
case ch <- v:
33-
case <-ctx.Done():
37+
case <-o.Ctx.Done():
3438
}
3539
}
3640
}
3741
}()
3842
return ch
3943
}
4044

41-
chanchan := func() <-chan <-chan T {
42-
chch := make(chan (<-chan T), lvl)
45+
chanchan := func() <-chan <-chan TO {
46+
chch := make(chan (<-chan TO), o.Size)
4347
go func() {
4448
defer close(chch)
45-
for v := range inStream {
46-
ch := make(chan T)
49+
for v := range o.InputStream {
50+
ch := make(chan TO)
4751
chch <- ch
4852

4953
go func() {
5054
defer close(ch)
51-
ch <- doWork(v)
55+
ch <- o.DoWork(v)
5256
}()
5357
}
5458
}()
5559
return chch
5660
}
5761

5862
// bridge-channel
59-
return func(chch <-chan <-chan T) <-chan T {
60-
vch := make(chan T)
63+
return func(chch <-chan <-chan TO) <-chan TO {
64+
vch := make(chan TO)
6165
go func() {
6266
defer close(vch)
6367
for {
64-
var ch <-chan T
68+
var ch <-chan TO
6569
select {
6670
case maybe, ok := <-chch:
6771
if !ok {
6872
return
6973
}
7074
ch = maybe
71-
case <-ctx.Done():
75+
case <-o.Ctx.Done():
7276
return
7377
}
7478
for v := range orDone(ch) {
7579
select {
7680
case vch <- v:
77-
case <-ctx.Done():
81+
case <-o.Ctx.Done():
7882
}
7983
}
8084
}

oproc_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ import (
66
)
77

88
func ExampleOrderedProc() {
9-
inputStream := func() <-chan string {
9+
ctx, cancel := context.WithCancel(context.TODO())
10+
defer cancel()
11+
12+
my := NewOrderedProc[string /*input param*/, string /*output param*/](ctx)
13+
my.InputStream = func() <-chan string {
1014
ch := make(chan string)
1115
go func() {
1216
defer close(ch)
@@ -15,16 +19,12 @@ func ExampleOrderedProc() {
1519
}
1620
}()
1721
return ch
18-
}
19-
20-
doWork := func(str string) string {
22+
}()
23+
my.DoWork = func(str string) string {
2124
return fmt.Sprintf("line:%s", str)
2225
}
2326

24-
ctx, cancel := context.WithCancel(context.TODO())
25-
defer cancel()
26-
27-
for s := range OrderedProc(ctx, inputStream(), doWork) {
27+
for s := range my.Process() {
2828
fmt.Println(s)
2929
// Output:
3030
// line:0

0 commit comments

Comments
 (0)