-
Notifications
You must be signed in to change notification settings - Fork 17
/
teststeps.go
214 lines (193 loc) · 7.25 KB
/
teststeps.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package teststeps
import (
"encoding/json"
"fmt"
"reflect"
"sync"
"github.com/facebookincubator/contest/pkg/target"
"github.com/facebookincubator/contest/pkg/test"
"github.com/facebookincubator/contest/pkg/xcontext"
)
// PerTargetFunc is a function type that is called on each target by the
// ForEachTarget function below.
type PerTargetFunc func(ctx xcontext.Context, target *target.Target) error
// ForEachTarget is a simple helper to write plugins that apply a given PerTargetFunc
// independenly to each target. This helper handles routing through the in/out channels,
// and forwards cancel/pause signals to the PerTargetFunc.
// Note this helper does NOT saving state and pausing a test step, so it is only suited for
// short-running tests or for environments that don't use job resumption.
// Use ForEachTargetWithResume below for a similar helper with full resumption support.
// This function wraps the logic that handles target routing through the in/out
// The implementation of the per-target function is responsible for
// reacting to cancel/pause signals and return quickly.
func ForEachTarget(pluginName string, ctx xcontext.Context, ch test.TestStepChannels, f PerTargetFunc) (json.RawMessage, error) {
reportTarget := func(t *target.Target, err error) {
if err != nil {
ctx.Errorf("%s: ForEachTarget: failed to apply test step function on target %s: %v", pluginName, t, err)
} else {
ctx.Debugf("%s: ForEachTarget: target %s completed successfully", pluginName, t)
}
select {
case ch.Out <- test.TestStepResult{Target: t, Err: err}:
case <-ctx.Done():
ctx.Debugf("%s: ForEachTarget: received cancellation signal while reporting result", pluginName)
}
}
var wg sync.WaitGroup
func() {
for {
select {
case tgt, ok := <-ch.In:
if !ok {
ctx.Debugf("%s: ForEachTarget: all targets have been received", pluginName)
return
}
ctx.Debugf("%s: ForEachTarget: received target %s", pluginName, tgt)
wg.Add(1)
go func() {
defer wg.Done()
err := f(ctx, tgt)
reportTarget(tgt, err)
}()
case <-ctx.Done():
ctx.Debugf("%s: ForEachTarget: incoming loop canceled", pluginName)
return
}
}
}()
wg.Wait()
return nil, nil
}
// MarshalState serializes the provided state struct as JSON.
// It sets the Version field to the specified value.
func MarshalState(state interface{}, version int) (json.RawMessage, error) {
{ // Set the version.
vs := reflect.Indirect(reflect.ValueOf(state))
if vs.Kind() != reflect.Struct {
return nil, fmt.Errorf("state must be a struct")
}
vf := vs.FieldByName("Version")
if vf.Kind() == 0 {
return nil, fmt.Errorf("no Version field in struct")
}
vf.SetInt(int64(version))
}
data, err := json.Marshal(state)
if err != nil {
return nil, err
}
return data, xcontext.ErrPaused
}
// TargetWithData holds a step target and the pause/resumption data for it
// Each per-target function gets this passed in and can store any data
// required to resume in data.
type TargetWithData struct {
Target *target.Target
Data json.RawMessage
}
// PerTargetWithResumeFunc is the function that is called per target by ForEachTargetWithResume
// It must obey the context and quickly return on cancellation and pause signals.
// Functions can modify target and store any data required for resumption in target.data.
type PerTargetWithResumeFunc func(ctx xcontext.Context, target *TargetWithData) error
// parallelTargetsState is the internal state of ForEachTargetWithResume.
type parallelTargetsState struct {
Version int `json:"V"`
Targets []*TargetWithData `json:"TWD,omitempty"`
}
// ForEachTargetWithResume is a helper to write plugins that support job resumption.
// This helper is for plugins that want to apply a single function to all targets, independently.
// This helper calls the supplied PerTargetWithResumeFunc immediately for each target received,
// in a separate goroutine. When the function returns, the target is sent to the output channels so it can
// run through the next test step.
// This helper directly accepts the resumeState from the Run method of the TestStep interface, and the
// return value can directly be passed back to the framework.
// The helper automatically manages data returned on pause and makes sure the function is called again
// with the same data on job resumption. The helper will not call functions again that succeeded or failed
// before the pause signal was received.
// The supplied PerTargetWithResumeFunc must react to pause and cancellation signals as normal.
func ForEachTargetWithResume(ctx xcontext.Context, ch test.TestStepChannels, resumeState json.RawMessage, currentStepStateVersion int, f PerTargetWithResumeFunc) (json.RawMessage, error) {
var ss parallelTargetsState
// Parse resume state, if any.
if len(resumeState) > 0 {
if err := json.Unmarshal(resumeState, &ss); err != nil {
return nil, fmt.Errorf("invalid resume state: %w", err)
}
if ss.Version != currentStepStateVersion {
return nil, fmt.Errorf("incompatible resume state: want %d, got %d", currentStepStateVersion, ss.Version)
}
}
var wg sync.WaitGroup
pauseStates := make(chan *TargetWithData)
handleTarget := func(tgt2 *TargetWithData) {
defer wg.Done()
err := f(ctx, tgt2)
switch err {
case xcontext.ErrCanceled:
// nothing to do for failed
case xcontext.ErrPaused:
select {
case pauseStates <- tgt2:
case <-ctx.Done():
ctx.Debugf("ForEachTargetWithResume: received cancellation signal while pausing")
}
default:
// nil or error
if err != nil {
ctx.Errorf("ForEachTargetWithResume: failed to apply test step function on target %s: %v", tgt2.Target.ID, err)
} else {
ctx.Debugf("ForEachTargetWithResume: target %s completed successfully", tgt2.Target.ID)
}
select {
case ch.Out <- test.TestStepResult{Target: tgt2.Target, Err: err}:
case <-ctx.Done():
ctx.Debugf("ForEachTargetWithResume: received cancellation signal while reporting result")
}
}
}
// restart paused targets
for _, state := range ss.Targets {
ctx.Debugf("ForEachTargetWithResume: resuming target %s", state.Target.ID)
wg.Add(1)
go handleTarget(state)
}
// delete info about running targets
ss.Targets = nil
var err error
mainloop:
for {
select {
// no need to check for pause here, pausing closes the channel
case tgt, ok := <-ch.In:
if !ok {
break mainloop
}
ctx.Debugf("ForEachTargetWithResume: received target %s", tgt)
wg.Add(1)
go handleTarget(&TargetWithData{Target: tgt})
case <-ctx.Done():
ctx.Debugf("ForEachTargetWithResume: canceled, terminating")
err = xcontext.ErrCanceled
break mainloop
}
}
// close pauseStates to signal all handlers are done
go func() {
wg.Wait()
close(pauseStates)
}()
for ps := range pauseStates {
ss.Targets = append(ss.Targets, ps)
}
// wrap up
if !ctx.IsSignaledWith(xcontext.ErrPaused) && len(ss.Targets) > 0 {
return nil, fmt.Errorf("ForEachTargetWithResume: some target functions paused, but no pause signal received: %v ", ss.Targets)
}
if ctx.IsSignaledWith(xcontext.ErrPaused) {
return MarshalState(&ss, currentStepStateVersion)
}
return nil, err
}