-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathrolescheduler.go
109 lines (90 loc) · 2.86 KB
/
rolescheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package adaptertest
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/luno/workflow"
)
// RunRoleSchedulerTest runs the shared workflow.RoleScheduler test suite
// against the implementation produced by factory.
//
// factory is called by each individual test with the number of scheduler
// instances that test needs and must return that many instances. The suite
// runs, in order, the returned-context test, the locking test and the
// releasing test; each of them registers its own subtest on t.
func RunRoleSchedulerTest(t *testing.T, factory func(t *testing.T, instances int) []workflow.RoleScheduler) {
	testReturnedContext(t, factory)
	testLocking(t, factory)
	testReleasing(t, factory)
}
// testReturnedContext checks that the context handed back by Await is derived
// from the context passed in: a value stored on the caller's context must be
// readable from the returned one.
func testReturnedContext(t *testing.T, factory func(t *testing.T, instances int) []workflow.RoleScheduler) {
	t.Run("Ensure that the passed in context is a parent of the returned context", func(t *testing.T) {
		schedulers := factory(t, 1)

		// Tag the parent context so its lineage can be recognised afterwards.
		parent := context.WithValue(context.Background(), "parent", "context")

		roleCtx, release, err := schedulers[0].Await(parent, "leader-cancelled-ctx")
		require.Nil(t, err)

		// The returned cancel function is invoked before the lookup; the value
		// is still expected to be readable from the returned context.
		release()

		require.Equal(t, "context", roleCtx.Value("parent"))
	})
}
// testLocking checks that a role is exclusive: when several scheduler
// instances call Await for the same role at the same time, at most one of
// them may return successfully while the role has not been released.
//
// The test passes when the observation window (250ms) elapses without a second
// instance obtaining the role, and fails as soon as a second instance obtains
// it or any Await call returns an error.
func testLocking(t *testing.T, factory func(t *testing.T, instances int) []workflow.RoleScheduler) {
	t.Run("Ensure role is locked and successive calls are blocked", func(t *testing.T) {
		rs := factory(t, 5)

		// Cancel the shared context when the subtest ends so that instances
		// still waiting in Await get the chance to return instead of waiting
		// on a context that is never cancelled.
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		ctxWithValue := context.WithValue(ctx, "parent", "context")

		// Buffered to len(rs) so that no goroutine ever blocks on reporting
		// its result, even after the subtest has already returned.
		results := make(chan error, len(rs))
		for _, instance := range rs {
			// Pass the instance as an argument: with Go versions before 1.22
			// a captured loop variable is shared by every iteration.
			go func(instance workflow.RoleScheduler) {
				_, _, err := instance.Await(ctxWithValue, "leader-lock")
				// Only report here; assertions that call FailNow must run on
				// the goroutine executing the test.
				results <- err
			}(instance)
		}

		timeout := time.NewTimer(250 * time.Millisecond)
		defer timeout.Stop()

		obtained := 0
		for {
			select {
			case <-timeout.C:
				// Pass - timeout expected to return first as the role has not been released
				return
			case err := <-results:
				require.Nil(t, err)
				obtained++
				if obtained > 1 {
					require.FailNow(t, "more than one instance received a role lock")
				}
			}
		}
	})
}
// testReleasing checks that a caller waiting for a role that is already held
// stops waiting when its own context is cancelled.
//
// The first instance awaits the role; once that call has returned without
// error, the second instance awaits the same role with a separate context
// that is cancelled right away. The second Await is expected to return an
// error matching context.Canceled. The test fails if the first Await returns
// an error, if the second Await returns anything else, or if neither outcome
// is observed within 5 seconds.
func testReleasing(t *testing.T, factory func(t *testing.T, instances int) []workflow.RoleScheduler) {
	t.Run("Ensure role is released on context cancellation", func(t *testing.T) {
		instanceCount := 2
		rs := factory(t, instanceCount)

		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)

		// Both channels are buffered so the goroutines can always deliver
		// their result and exit, even if the subtest has already timed out.
		// All assertions run on the test goroutine because FailNow must not be
		// called from any other goroutine.
		holderErr := make(chan error, 1)
		waiterErr := make(chan error, 1)

		go func() {
			if _, _, err := rs[0].Await(ctx, "leader-releasing"); err != nil {
				holderErr <- err
				return
			}

			ctx2, cancel2 := context.WithCancel(context.Background())
			go func() {
				_, _, err := rs[1].Await(ctx2, "leader-releasing")
				waiterErr <- err
			}()

			// Cancel the other caller to test that it unlocks on context cancellation
			cancel2()
		}()

		timeout := time.NewTimer(5 * time.Second)
		defer timeout.Stop()

		select {
		case <-timeout.C:
			require.FailNow(t, "not all instances obtained the lock")
		case err := <-holderErr:
			// Only sent when the first Await failed; report it immediately
			// rather than waiting for the timeout.
			require.Nil(t, err)
		case err := <-waiterErr:
			// Expected call stack executed
			require.ErrorIs(t, err, context.Canceled)
		}
	})
}