Skip to content

Commit 266b47c

Browse files
authored
fix(hooks): sync outgoing tasks (#5854)
1 parent a0920e1 commit 266b47c

File tree

9 files changed

+157
-13
lines changed

9 files changed

+157
-13
lines changed

engine/api/api_helper.go

+8
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ func isCDN(ctx context.Context) bool {
9898
return c.Service != nil && c.Service.Type == sdk.TypeCDN
9999
}
100100

101+
func isHooks(ctx context.Context) bool {
102+
c := getAPIConsumer(ctx)
103+
if c == nil {
104+
return false
105+
}
106+
return c.Service != nil && c.Service.Type == sdk.TypeHooks
107+
}
108+
101109
func isMFA(ctx context.Context) bool {
102110
s := getAuthSession(ctx)
103111
if s == nil {

engine/api/api_routes.go

+1
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,7 @@ func (api *API) InitRouter() {
419419

420420
r.Handle("/workflow/search", Scope(sdk.AuthConsumerScopeProject), r.GET(api.getSearchWorkflowHandler))
421421
r.Handle("/workflow/hook", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHooksHandler))
422+
r.Handle("/workflow/hook/executions", Scope(sdk.AuthConsumerScopeHooks), r.GET(api.getWorkflowHookExecutionsHandler))
422423
r.Handle("/workflow/hook/model/{model}", ScopeNone(), r.GET(api.getWorkflowHookModelHandler), r.POST(api.postWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware)), r.PUT(api.putWorkflowHookModelHandler, service.OverrideAuth(api.authAdminMiddleware)))
423424

424425
// SSE

engine/api/workflow/dao_node_run.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ const withLightNodeRunTestsField string = ", json_build_object('ko', workflow_no
6161
func LoadNodeRunIDsWithLogs(db gorp.SqlExecutor, wIDs []int64, status []string) ([]sdk.WorkflowNodeRunIdentifiers, error) {
6262
query := `
6363
WITH noderun as (
64-
SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status
64+
SELECT distinct workflow_node_run_id as id, workflow_node_run.workflow_run_id, status
6565
FROM workflow_node_run_job_logs
6666
JOIN workflow_node_run ON workflow_node_run.id = workflow_node_run_id
6767
WHERE workflow_node_run.workflow_id = ANY($1)
@@ -866,3 +866,16 @@ func RunExist(db gorp.SqlExecutor, projectKey string, workflowID int64, hash str
866866
count, err := db.SelectInt(query, projectKey, workflowID, hash)
867867
return count != 0, err
868868
}
869+
870+
func LoadNodeRunDistinctExecutionIDs(db gorp.SqlExecutor) ([]string, error) {
871+
query := `
872+
SELECT DISTINCT execution_id
873+
FROM workflow_node_run
874+
WHERE execution_id <> '' AND execution_id IS NOT NULL;
875+
`
876+
var executionIDs []string
877+
if _, err := db.Select(&executionIDs, query); err != nil {
878+
return nil, sdk.WithStack(err)
879+
}
880+
return executionIDs, nil
881+
}

engine/api/workflow_hook.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ import (
1717

1818
func (api *API) getWorkflowHooksHandler() service.Handler {
1919
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
20-
// This handler can only be called by a service managed by an admin
21-
if isService := isService(ctx); !isService && !isAdmin(ctx) {
20+
if !isHooks(ctx) {
2221
return sdk.WithStack(sdk.ErrForbidden)
2322
}
2423

@@ -31,6 +30,21 @@ func (api *API) getWorkflowHooksHandler() service.Handler {
3130
}
3231
}
3332

33+
func (api *API) getWorkflowHookExecutionsHandler() service.Handler {
34+
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
35+
if !isHooks(ctx) {
36+
return sdk.WithStack(sdk.ErrForbidden)
37+
}
38+
39+
executionIDs, err := workflow.LoadNodeRunDistinctExecutionIDs(api.mustDB())
40+
if err != nil {
41+
return err
42+
}
43+
44+
return service.WriteJSON(w, executionIDs, http.StatusOK)
45+
}
46+
}
47+
3448
func (api *API) getWorkflowOutgoingHookModelsHandler() service.Handler {
3549
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
3650
m, err := workflow.LoadOutgoingHookModels(api.mustDB())

engine/hooks/tasks.go

+23-10
Original file line numberDiff line numberDiff line change
@@ -69,29 +69,41 @@ func (s *Service) synchronizeTasks(ctx context.Context) error {
6969

7070
log.Info(ctx, "Hooks> Synchronizing tasks from CDS API (%s)", s.Cfg.API.HTTP.URL)
7171

72-
//Get all hooks from CDS, and synchronize the tasks in cache
72+
// Get all hooks from CDS, and synchronize the tasks in cache
7373
hooks, err := s.Client.WorkflowAllHooksList()
7474
if err != nil {
75-
return sdk.WrapError(err, "Unable to get hooks")
75+
return sdk.WrapError(err, "unable to get hooks")
76+
}
77+
mHookIDs := make(map[string]struct{}, len(hooks))
78+
for i := range hooks {
79+
mHookIDs[hooks[i].UUID] = struct{}{}
80+
}
81+
82+
// Get all node run execution ids from CDS, and synchronize the outgoing tasks in cache
83+
executionIDs, err := s.Client.WorkflowAllHooksExecutions()
84+
if err != nil {
85+
return sdk.WrapError(err, "unable to get hook execution ids")
86+
}
87+
mExecutionIDs := make(map[string]struct{}, len(executionIDs))
88+
for i := range executionIDs {
89+
mExecutionIDs[executionIDs[i]] = struct{}{}
7690
}
7791

7892
allOldTasks, err := s.Dao.FindAllTasks(ctx)
7993
if err != nil {
8094
return sdk.WrapError(err, "Unable to get allOldTasks")
8195
}
8296

83-
//Delete all old task which are not referenced in CDS API anymore
97+
// Delete all old task which are not referenced in CDS API anymore
8498
for i := range allOldTasks {
8599
t := &allOldTasks[i]
86100
var found bool
87-
for _, h := range hooks {
88-
if h.UUID == t.UUID {
89-
found = true
90-
log.Debug(ctx, "Hook> Synchronizing %s task %s", h.HookModelName, t.UUID)
91-
break
92-
}
101+
if t.Type == TypeOutgoingWebHook || t.Type == TypeOutgoingWorkflow {
102+
_, found = mExecutionIDs[t.UUID]
103+
} else {
104+
_, found = mHookIDs[t.UUID]
93105
}
94-
if !found && t.Type != TypeOutgoingWebHook && t.Type != TypeOutgoingWorkflow {
106+
if !found {
95107
if err := s.deleteTask(ctx, t); err != nil {
96108
log.Error(ctx, "Hook> Error on task %s delete on synchronization: %v", t.UUID, err)
97109
} else {
@@ -100,6 +112,7 @@ func (s *Service) synchronizeTasks(ctx context.Context) error {
100112
}
101113
}
102114

115+
// Create or update hook tasks from CDS API data
103116
for _, h := range hooks {
104117
confProj := h.Config[sdk.HookConfigProject]
105118
confWorkflow := h.Config[sdk.HookConfigWorkflow]

engine/hooks/tasks_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package hooks
33
import (
44
"context"
55
"net/http"
6+
"sort"
67
"testing"
78
"time"
89

@@ -88,6 +89,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
8889
// Mock the sync of tasks
8990
// It will remove all the tasks from the database
9091
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil)
92+
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
9193
m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes()
9294
require.NoError(t, s.synchronizeTasks(ctx))
9395

@@ -164,6 +166,7 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
164166
// Now we will triggered another hooks sync
165167
// The mock must return one hook
166168
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{*h}, nil)
169+
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
167170
require.NoError(t, s.synchronizeTasks(context.Background()))
168171

169172
// We must be able to find the task
@@ -177,3 +180,55 @@ func Test_dequeueTaskExecutions_ScheduledTask(t *testing.T) {
177180
assert.Equal(t, "DONE", execs[0].Status)
178181
assert.Equal(t, "SCHEDULED", execs[1].Status)
179182
}
183+
184+
func Test_synchronizeTasks(t *testing.T) {
185+
log.Factory = log.NewTestingWrapper(t)
186+
s, cancel := setupTestHookService(t)
187+
defer cancel()
188+
189+
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
190+
defer cancel()
191+
192+
// Get the mock
193+
m := s.Client.(*mock_cdsclient.MockInterface)
194+
195+
m.EXPECT().VCSConfiguration().Return(nil, nil).AnyTimes()
196+
197+
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{}, nil)
198+
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{}, nil)
199+
require.NoError(t, s.synchronizeTasks(ctx))
200+
201+
tasks, err := s.Dao.FindAllTasks(ctx)
202+
require.NoError(t, err)
203+
require.Len(t, tasks, 0)
204+
205+
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
206+
UUID: "1",
207+
Type: TypeScheduler,
208+
}))
209+
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
210+
UUID: sdk.UUID(),
211+
Type: TypeScheduler,
212+
}))
213+
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
214+
UUID: "2",
215+
Type: TypeOutgoingWorkflow,
216+
}))
217+
require.NoError(t, s.Dao.SaveTask(&sdk.Task{
218+
UUID: sdk.UUID(),
219+
Type: TypeOutgoingWorkflow,
220+
}))
221+
222+
m.EXPECT().WorkflowAllHooksList().Return([]sdk.NodeHook{{UUID: "1"}}, nil)
223+
m.EXPECT().WorkflowAllHooksExecutions().Return([]string{"2"}, nil)
224+
require.NoError(t, s.synchronizeTasks(ctx))
225+
226+
tasks, err = s.Dao.FindAllTasks(ctx)
227+
require.NoError(t, err)
228+
require.Len(t, tasks, 2)
229+
sort.Slice(tasks, func(i, j int) bool { return tasks[i].UUID < tasks[j].UUID })
230+
require.Equal(t, "1", tasks[0].UUID)
231+
require.Equal(t, TypeScheduler, tasks[0].Type)
232+
require.Equal(t, "2", tasks[1].UUID)
233+
require.Equal(t, TypeOutgoingWorkflow, tasks[1].Type)
234+
}

sdk/cdsclient/client_workflow_hooks.go

+9
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,12 @@ func (c *client) WorkflowAllHooksList() ([]sdk.NodeHook, error) {
1515
}
1616
return w, nil
1717
}
18+
19+
func (c *client) WorkflowAllHooksExecutions() ([]string, error) {
20+
url := fmt.Sprintf("/workflow/hook/executions")
21+
var res []string
22+
if _, err := c.GetJSON(context.Background(), url, &res); err != nil {
23+
return nil, err
24+
}
25+
return res, nil
26+
}

sdk/cdsclient/interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ type WorkflowClient interface {
356356
WorkflowLogDownload(ctx context.Context, link sdk.CDNLogLink) ([]byte, error)
357357
WorkflowNodeRunRelease(projectKey string, workflowName string, runNumber int64, nodeRunID int64, release sdk.WorkflowNodeRunRelease) error
358358
WorkflowAllHooksList() ([]sdk.NodeHook, error)
359+
WorkflowAllHooksExecutions() ([]string, error)
359360
WorkflowCachePush(projectKey, integrationName, ref string, tarContent io.Reader, size int) error
360361
WorkflowCachePull(projectKey, integrationName, ref string) (io.Reader, error)
361362
WorkflowTransformAsCode(projectKey, workflowName, branch, message string) (*sdk.Operation, error)

sdk/cdsclient/mock_cdsclient/interface_mock.go

+30
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)