-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy patheventstreaming.go
141 lines (120 loc) · 3.78 KB
/
eventstreaming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package adaptertest
import (
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
clock_testing "k8s.io/utils/clock/testing"
"github.com/luno/workflow"
"github.com/luno/workflow/adapters/memrecordstore"
"github.com/luno/workflow/adapters/memrolescheduler"
"github.com/luno/workflow/adapters/memtimeoutstore"
)
type SyncStatus int
const (
SyncStatusUnknown SyncStatus = 0
SyncStatusStarted SyncStatus = 1
SyncStatusEmailSet SyncStatus = 2
SyncStatusRegulationTimeout SyncStatus = 3
SyncStatusCompleted SyncStatus = 4
)
func (s SyncStatus) String() string {
switch s {
case SyncStatusStarted:
return "Started"
case SyncStatusEmailSet:
return "Email set"
case SyncStatusRegulationTimeout:
return "Regulatory cool down period"
case SyncStatusCompleted:
return "Completed"
default:
return "Unknown"
}
}
func RunEventStreamerTest(t *testing.T, constructor workflow.EventStreamer) {
b := workflow.NewBuilder[User, SyncStatus]("sync user 2")
b.AddStep(
SyncStatusStarted,
setEmail(),
SyncStatusEmailSet,
).WithOptions(
workflow.PollingFrequency(time.Millisecond*200),
workflow.ParallelCount(5),
)
b.AddTimeout(
SyncStatusEmailSet,
coolDownTimerFunc(),
coolDownTimeout(),
SyncStatusRegulationTimeout,
).WithOptions(
workflow.PollingFrequency(time.Millisecond * 200),
)
b.AddStep(
SyncStatusRegulationTimeout,
generateUserID(),
SyncStatusCompleted,
).WithOptions(
workflow.PollingFrequency(time.Millisecond*200),
workflow.ParallelCount(5),
)
now := time.Date(2023, time.April, 9, 8, 30, 0, 0, time.UTC)
clock := clock_testing.NewFakeClock(now)
wf := b.Build(
constructor,
memrecordstore.New(),
memrolescheduler.New(),
workflow.WithClock(clock),
workflow.WithTimeoutStore(memtimeoutstore.New()),
)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
})
wf.Run(ctx)
t.Cleanup(wf.Stop)
foreignID := "1"
u := User{
CountryCode: "GB",
}
runId, err := wf.Trigger(ctx, foreignID, SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&u))
require.Nil(t, err)
workflow.AwaitTimeoutInsert(t, wf, foreignID, runId, SyncStatusEmailSet)
clock.Step(time.Hour)
record, err := wf.Await(ctx, foreignID, runId, SyncStatusCompleted)
require.Nil(t, err)
require.Equal(t, "[email protected]", record.Object.Email)
require.Equal(t, SyncStatusCompleted.String(), record.Status.String())
require.NotEmpty(t, record.Object.UID)
}
func setEmail() func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
return func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
t.Object.Email = "[email protected]"
return SyncStatusEmailSet, nil
}
}
func coolDownTimerFunc() func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (time.Time, error) {
return func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (time.Time, error) {
// Place a 1-hour cool down period for Great Britain users
if r.Object.CountryCode == "GB" {
return now.Add(time.Hour), nil
}
// Don't provide a timeout for users outside of GB
return time.Time{}, nil
}
}
func coolDownTimeout() func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (SyncStatus, error) {
return func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (SyncStatus, error) {
if r.Object.Email == "[email protected]" {
return SyncStatusRegulationTimeout, nil
}
return 0, nil
}
}
func generateUserID() func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
return func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
t.Object.UID = uuid.New().String()
return SyncStatusCompleted, nil
}
}