Skip to content

Commit ac91461

Browse files
Sidddddarthlvrach
andauthored
feat: sync.Group (#425)
Co-authored-by: Leonidas Vrachnis <[email protected]>
1 parent a27349a commit ac91461

File tree

7 files changed

+205
-0
lines changed

7 files changed

+205
-0
lines changed

.github/workflows/labeler.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ name: "Pull request labeler"
22
on:
33
- pull_request
44

5+
concurrency:
6+
group: ${{ github.workflow }}-${{ github.head_ref }}
7+
cancel-in-progress: true
58
jobs:
69
triage:
710
permissions:

.github/workflows/pr-description-enforcer.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ on:
66
- edited
77
- reopened
88

9+
concurrency:
10+
group: ${{ github.workflow }}-${{ github.head_ref }}
11+
cancel-in-progress: true
912
jobs:
1013
enforce:
1114
runs-on: ubuntu-latest

.github/workflows/semantic-pr.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ on:
1111
- ready_for_review
1212
- synchronize
1313

14+
concurrency:
15+
group: ${{ github.workflow }}-${{ github.head_ref }}
16+
cancel-in-progress: true
1417
jobs:
1518
main:
1619
name: title

.github/workflows/tests.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ on:
66
- main
77
- "release/*"
88
pull_request:
9+
concurrency:
10+
group: ${{ github.workflow }}-${{ github.head_ref || github.sha }}
11+
cancel-in-progress: true
912
jobs:
1013
unit:
1114
name: unit

.github/workflows/verify.yml

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ on:
77
- master
88
- main
99
pull_request:
10+
concurrency:
11+
group: ${{ github.workflow }}-${{ github.head_ref || github.sha }}
12+
cancel-in-progress: true
1013
jobs:
1114
generate:
1215
name: generated files

sync/group.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
// A EagerGroup is a collection of goroutines working on subtasks that are part of
9+
// the same overall task.
10+
//
11+
// Use NewEagerGroup to create a new group.
12+
type EagerGroup struct {
13+
ctx context.Context
14+
cancel context.CancelCauseFunc
15+
wg sync.WaitGroup
16+
sem chan struct{}
17+
errOnce sync.Once
18+
err error
19+
}
20+
21+
// NewEagerGroup returns a new eager group and an associated Context derived from ctx.
22+
//
23+
// The derived Context is canceled the first time a function passed to Go
24+
// returns a non-nil error or the first time Wait returns, whichever occurs
25+
// first.
26+
//
27+
// limit < 1 means no limit on the number of active goroutines.
28+
func NewEagerGroup(ctx context.Context, limit int) (*EagerGroup, context.Context) {
29+
ctx, cancel := context.WithCancelCause(ctx)
30+
g := &EagerGroup{
31+
ctx: ctx,
32+
cancel: cancel,
33+
}
34+
if limit > 0 {
35+
g.sem = make(chan struct{}, limit)
36+
}
37+
return g, ctx
38+
}
39+
40+
// Go calls the given function in a new goroutine.
41+
// It blocks until the new goroutine can be added without the number of
42+
// active goroutines in the group exceeding the configured limit.
43+
//
44+
// The first call to return a non-nil error cancels the group's context.
45+
// The error will be returned by Wait.
46+
//
47+
// If the group was created by calling NewEagerGroup with limit < 1, there is no
48+
// limit on the number of active goroutines.
49+
//
50+
// If the group's context is canceled, routines that have not executed yet due to the limit won't be executed.
51+
// Additionally, there is a best effort not to execute `f()` once the context is canceled
52+
// and that happens whether or not a limit has been specified.
53+
func (g *EagerGroup) Go(f func() error) {
54+
if err := g.ctx.Err(); err != nil {
55+
g.errOnce.Do(func() {
56+
g.err = g.ctx.Err()
57+
g.cancel(g.err)
58+
})
59+
return
60+
}
61+
62+
if g.sem != nil {
63+
select {
64+
case <-g.ctx.Done():
65+
g.errOnce.Do(func() {
66+
g.err = g.ctx.Err()
67+
g.cancel(g.err)
68+
})
69+
return
70+
case g.sem <- struct{}{}:
71+
}
72+
}
73+
74+
g.wg.Add(1)
75+
go func() {
76+
err := g.ctx.Err()
77+
if err == nil {
78+
err = f()
79+
}
80+
if err != nil {
81+
g.errOnce.Do(func() {
82+
g.err = err
83+
g.cancel(g.err)
84+
})
85+
}
86+
if g.sem != nil {
87+
<-g.sem
88+
}
89+
g.wg.Done()
90+
}()
91+
}
92+
93+
// Wait blocks until all function calls from the Go method have returned, then
94+
// returns the first non-nil error (if any) from them.
95+
func (g *EagerGroup) Wait() error {
96+
g.wg.Wait()
97+
g.cancel(g.err)
98+
return g.err
99+
}

sync/group_test.go

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestEagerGroupWithLimit(t *testing.T) {
14+
g, ctx := NewEagerGroup(context.Background(), 2)
15+
var count atomic.Int64
16+
// One of the following three goroutines should DEFINITELY NOT be executed due to the limit of 2 and the context being cancelled.
17+
// The context should get cancelled automatically because the first two routines returned an error.
18+
g.Go(func() error {
19+
t.Log("one")
20+
count.Add(1)
21+
return fmt.Errorf("one")
22+
})
23+
g.Go(func() error {
24+
t.Log("two")
25+
count.Add(1)
26+
return fmt.Errorf("two")
27+
})
28+
g.Go(func() error {
29+
t.Log("three")
30+
count.Add(1)
31+
return fmt.Errorf("three")
32+
})
33+
require.Error(t, g.Wait(), "We expect group.Wait() to return an error")
34+
ok := true
35+
select {
36+
case <-ctx.Done():
37+
_, ok = <-ctx.Done()
38+
case <-time.After(time.Second):
39+
}
40+
require.False(t, ok, "We expect the context to be cancelled")
41+
require.True(t, 1 <= count.Load() && count.Load() <= 2, "We expect count to be between 1 and 2")
42+
}
43+
44+
func TestEagerGroupWithNoLimit(t *testing.T) {
45+
ctx, cancel := context.WithCancel(context.Background())
46+
g, ctx := NewEagerGroup(ctx, 0)
47+
funcCounter := &atomic.Int64{}
48+
49+
go func() {
50+
for {
51+
if funcCounter.Load() > 10 {
52+
cancel()
53+
return
54+
}
55+
}
56+
}()
57+
58+
for i := 0; i < 10000; i++ {
59+
g.Go(func() error {
60+
select {
61+
case <-ctx.Done():
62+
return ctx.Err()
63+
default:
64+
}
65+
funcCounter.Add(1)
66+
return nil
67+
})
68+
}
69+
require.ErrorIs(t, g.Wait(), ctx.Err(), "We expect group.Wait() to return the context error")
70+
_, ok := <-ctx.Done()
71+
require.False(t, ok, "We expect the context to be cancelled")
72+
t.Log(funcCounter.Load(), "funcs executed")
73+
// We expect between 10 and 10000 funcs to be executed
74+
// because group tries to return early if context is cancelled
75+
require.Less(
76+
t,
77+
funcCounter.Load(),
78+
int64(10000),
79+
"Expected less than 1000 funcs to be executed",
80+
)
81+
}
82+
83+
func TestNoInitEagerGroup(t *testing.T) {
84+
g := &EagerGroup{}
85+
f := func() error { return nil }
86+
require.Panics(
87+
t,
88+
func() { g.Go(f) },
89+
"We expect a panic when calling Go on a group that has not been initialized with NewEagerGroup",
90+
)
91+
}

0 commit comments

Comments
 (0)