Skip to content

Commit c4d8be8

Browse files
authored
feat: implement actions waiter (#407) [Backport release-1.x] (#433)
Implement a simpler and more versatile waiting functions for actions. Most use cases when waiting for actions is to return early if an action fails. If all actions must be waited until completion, the users should use the `WaitForFunc` function. If the final actions objects are needed, the users should use the `WaitForFunc` function to store the final actions using the `handleUpdate` callback. This deprecates the `ActionClient.WatchOverallProgress` and `ActionClient.WatchProgress` methods. (cherry picked from commit 1e3fa70) BEGIN_COMMIT_OVERRIDE feat: implement actions waiter END_COMMIT_OVERRIDE
1 parent 6aff0c0 commit c4d8be8

6 files changed

+439
-128
lines changed

hcloud/action_waiter.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package hcloud
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"maps"
7+
"slices"
8+
"time"
9+
)
10+
11+
type ActionWaiter interface {
12+
WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error
13+
WaitFor(ctx context.Context, actions ...*Action) error
14+
}
15+
16+
var _ ActionWaiter = (*ActionClient)(nil)
17+
18+
// WaitForFunc waits until all actions are completed by polling the API at the interval
19+
// defined by [WithPollBackoffFunc]. An action is considered as complete when its status is
20+
// either [ActionStatusSuccess] or [ActionStatusError].
21+
//
22+
// The handleUpdate callback is called every time an action is updated.
23+
func (c *ActionClient) WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error {
24+
running := make(map[int]struct{}, len(actions))
25+
for _, action := range actions {
26+
if action.Status == ActionStatusRunning {
27+
running[action.ID] = struct{}{}
28+
} else if handleUpdate != nil {
29+
// We filter out already completed actions from the API polling loop; while
30+
// this isn't a real update, the caller should be notified about the new
31+
// state.
32+
if err := handleUpdate(action); err != nil {
33+
return err
34+
}
35+
}
36+
}
37+
38+
retries := 0
39+
for {
40+
if len(running) == 0 {
41+
break
42+
}
43+
44+
select {
45+
case <-ctx.Done():
46+
return ctx.Err()
47+
case <-time.After(c.action.client.pollBackoffFunc(retries)):
48+
retries++
49+
}
50+
51+
opts := ActionListOpts{
52+
Sort: []string{"status", "id"},
53+
ID: make([]int, 0, len(running)),
54+
}
55+
for actionID := range running {
56+
opts.ID = append(opts.ID, actionID)
57+
}
58+
slices.Sort(opts.ID)
59+
60+
updates, err := c.AllWithOpts(ctx, opts)
61+
if err != nil {
62+
return err
63+
}
64+
65+
if len(updates) != len(running) {
66+
// Some actions may not exist in the API, also fail early to prevent an
67+
// infinite loop when updates == 0.
68+
69+
notFound := maps.Clone(running)
70+
for _, update := range updates {
71+
delete(notFound, update.ID)
72+
}
73+
notFoundIDs := make([]int, 0, len(notFound))
74+
for unknownID := range notFound {
75+
notFoundIDs = append(notFoundIDs, unknownID)
76+
}
77+
78+
return fmt.Errorf("actions not found: %v", notFoundIDs)
79+
}
80+
81+
for _, update := range updates {
82+
if update.Status != ActionStatusRunning {
83+
delete(running, update.ID)
84+
}
85+
86+
if handleUpdate != nil {
87+
if err := handleUpdate(update); err != nil {
88+
return err
89+
}
90+
}
91+
}
92+
}
93+
94+
return nil
95+
}
96+
97+
// WaitFor waits until all actions succeed by polling the API at the interval defined by
98+
// [WithPollBackoffFunc]. An action is considered as succeeded when its status is either
99+
// [ActionStatusSuccess].
100+
//
101+
// If a single action fails, the function will stop waiting and the error set in the
102+
// action will be returned as an [ActionError].
103+
//
104+
// For more flexibility, see the [WaitForFunc] function.
105+
func (c *ActionClient) WaitFor(ctx context.Context, actions ...*Action) error {
106+
return c.WaitForFunc(
107+
ctx,
108+
func(update *Action) error {
109+
if update.Status == ActionStatusError {
110+
return update.Error()
111+
}
112+
return nil
113+
},
114+
actions...,
115+
)
116+
}

hcloud/action_waiter_test.go

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package hcloud
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestWaitFor(t *testing.T) {
11+
RunMockedTestCases(t,
12+
[]MockedTestCase{
13+
{
14+
Name: "succeed",
15+
WantRequests: []MockedRequest{
16+
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200,
17+
`{
18+
"actions": [
19+
{ "id": 1509772237, "status": "running", "progress": 0 }
20+
],
21+
"meta": { "pagination": { "page": 1 }}
22+
}`},
23+
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200,
24+
`{
25+
"actions": [
26+
{ "id": 1509772237, "status": "success", "progress": 100 }
27+
],
28+
"meta": { "pagination": { "page": 1 }}
29+
}`},
30+
},
31+
Run: func(env testEnv) {
32+
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}
33+
34+
err := env.Client.Action.WaitFor(context.Background(), actions...)
35+
assert.NoError(t, err)
36+
},
37+
},
38+
{
39+
Name: "succeed with already succeeded action",
40+
Run: func(env testEnv) {
41+
actions := []*Action{{ID: 1509772237, Status: ActionStatusSuccess}}
42+
43+
err := env.Client.Action.WaitFor(context.Background(), actions...)
44+
assert.NoError(t, err)
45+
},
46+
},
47+
{
48+
Name: "fail with unknown action",
49+
WantRequests: []MockedRequest{
50+
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200,
51+
`{
52+
"actions": [],
53+
"meta": { "pagination": { "page": 1 }}
54+
}`},
55+
},
56+
Run: func(env testEnv) {
57+
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}
58+
59+
err := env.Client.Action.WaitFor(context.Background(), actions...)
60+
assert.Error(t, err)
61+
assert.Equal(t, "actions not found: [1509772237]", err.Error())
62+
},
63+
},
64+
{
65+
Name: "fail with canceled context",
66+
Run: func(env testEnv) {
67+
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}
68+
69+
ctx, cancelFunc := context.WithCancel(context.Background())
70+
cancelFunc()
71+
err := env.Client.Action.WaitFor(ctx, actions...)
72+
assert.Error(t, err)
73+
},
74+
},
75+
{
76+
Name: "fail with api error",
77+
WantRequests: []MockedRequest{
78+
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 503, ""},
79+
},
80+
Run: func(env testEnv) {
81+
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}
82+
83+
err := env.Client.Action.WaitFor(context.Background(), actions...)
84+
assert.Error(t, err)
85+
assert.Equal(t, "hcloud: server responded with status code 503", err.Error())
86+
},
87+
},
88+
},
89+
)
90+
}
91+
92+
func TestWaitForFunc(t *testing.T) {
93+
RunMockedTestCases(t,
94+
[]MockedTestCase{
95+
{
96+
Name: "succeed",
97+
WantRequests: []MockedRequest{
98+
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200,
99+
`{
100+
"actions": [
101+
{ "id": 1509772237, "status": "running", "progress": 40 },
102+
{ "id": 1509772238, "status": "running", "progress": 0 }
103+
],
104+
"meta": { "pagination": { "page": 1 }}
105+
}`},
106+
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200,
107+
`{
108+
"actions": [
109+
{ "id": 1509772237, "status": "running", "progress": 60 },
110+
{ "id": 1509772238, "status": "running", "progress": 50 }
111+
],
112+
"meta": { "pagination": { "page": 1 }}
113+
}`},
114+
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200,
115+
`{
116+
"actions": [
117+
{ "id": 1509772237, "status": "success", "progress": 100 },
118+
{ "id": 1509772238, "status": "running", "progress": 75 }
119+
],
120+
"meta": { "pagination": { "page": 1 }}
121+
}`},
122+
{"GET", "/actions?id=1509772238&page=1&sort=status&sort=id", nil, 200,
123+
`{
124+
"actions": [
125+
{ "id": 1509772238, "status": "error", "progress": 75,
126+
"error": {
127+
"code": "action_failed",
128+
"message": "Something went wrong with the action"
129+
}
130+
}
131+
],
132+
"meta": { "pagination": { "page": 1 }}
133+
}`},
134+
},
135+
Run: func(env testEnv) {
136+
actions := []*Action{
137+
{ID: 1509772236, Status: ActionStatusSuccess},
138+
{ID: 1509772237, Status: ActionStatusRunning},
139+
{ID: 1509772238, Status: ActionStatusRunning},
140+
}
141+
progress := make([]int, 0)
142+
143+
progressByAction := make(map[int]int, len(actions))
144+
err := env.Client.Action.WaitForFunc(context.Background(), func(update *Action) error {
145+
switch update.Status {
146+
case ActionStatusRunning:
147+
progressByAction[update.ID] = update.Progress
148+
case ActionStatusSuccess:
149+
progressByAction[update.ID] = 100
150+
case ActionStatusError:
151+
progressByAction[update.ID] = 100
152+
}
153+
154+
sum := 0
155+
for _, value := range progressByAction {
156+
sum += value
157+
}
158+
progress = append(progress, sum/len(actions))
159+
160+
return nil
161+
}, actions...)
162+
163+
assert.Nil(t, err)
164+
assert.Equal(t, []int{33, 46, 46, 53, 70, 83, 91, 100}, progress)
165+
},
166+
},
167+
},
168+
)
169+
}

0 commit comments

Comments
 (0)