Skip to content

Commit e825a5c

Browse files
authored
fix(engine): check md5 on artifact download + fix error printing (#5846)
1 parent 41181d7 commit e825a5c

17 files changed

+488
-272
lines changed

cli/cdsctl/workflow_artifact.go

+6-21
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"io"
76
"os"
87
"regexp"
98
"strconv"
@@ -180,33 +179,19 @@ func workflowArtifactDownloadRun(v cli.Values) error {
180179
}
181180

182181
if toDownload {
183-
var err error
184-
f, err = os.OpenFile(artifactData.Name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(artifactData.Perm))
185-
if err != nil {
186-
return err
187-
}
188182
fmt.Printf("Downloading %s...\n", artifactData.Name)
189-
r, err := client.CDNItemDownload(context.Background(), cdnURL, artifactData.CDNRefHash, sdk.CDNTypeItemRunResult)
183+
f, err := os.OpenFile(artifactData.Name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(artifactData.Perm))
190184
if err != nil {
191-
return err
185+
return sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", artifactData.Name, err))
192186
}
193-
if _, err := io.Copy(f, r); err != nil {
194-
return cli.WrapError(err, "unable to write file")
187+
if err := client.CDNItemDownload(context.Background(), cdnURL, artifactData.CDNRefHash, sdk.CDNTypeItemRunResult, artifactData.MD5, f); err != nil {
188+
_ = f.Close()
189+
return err
195190
}
196191
if err := f.Close(); err != nil {
197-
return cli.WrapError(err, "unable to close file")
192+
return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", artifactData.Name, err)
198193
}
199194
}
200-
201-
md5Sum, err := sdk.FileMd5sum(artifactData.Name)
202-
if err != nil {
203-
return err
204-
}
205-
206-
if md5Sum != artifactData.MD5 {
207-
return cli.NewError("Invalid md5Sum \ndownloaded file:%s\n%s:%s", md5Sum, f.Name(), artifactData.MD5)
208-
}
209-
210195
if toDownload {
211196
fmt.Printf("File %s created, checksum OK\n", f.Name())
212197
} else {

cli/cdsctl/workflow_run_result.go

+4-18
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"io"
76
"os"
87
"regexp"
98

@@ -129,30 +128,17 @@ func workflowRunResultGet(v cli.Values) error {
129128
var err error
130129
f, err = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(perm))
131130
if err != nil {
132-
return err
131+
return sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", fileName, err))
133132
}
134133
fmt.Printf("Downloading %s...\n", fileName)
135-
r, err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, cdnHash, sdk.CDNTypeItemRunResult)
136-
if err != nil {
134+
if err := client.CDNItemDownload(context.Background(), confCDN.HTTPURL, cdnHash, sdk.CDNTypeItemRunResult, md5, f); err != nil {
135+
_ = f.Close()
137136
return err
138137
}
139-
if _, err := io.Copy(f, r); err != nil {
140-
return cli.WrapError(err, "unable to write file")
141-
}
142138
if err := f.Close(); err != nil {
143-
return cli.WrapError(err, "unable to close file")
139+
return sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", fileName, err)
144140
}
145141
}
146-
147-
md5Sum, err := sdk.FileMd5sum(fileName)
148-
if err != nil {
149-
return err
150-
}
151-
152-
if md5Sum != md5 {
153-
return cli.NewError("Invalid md5Sum \ndownloaded file:%s\n%s:%s", md5Sum, f.Name(), md5)
154-
}
155-
156142
if toDownload {
157143
fmt.Printf("File %s created, checksum OK\n", f.Name())
158144
} else {

engine/api/workflow_application.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler {
173173
var lastErr error
174174
for {
175175
attempt++
176-
reader, err := api.Client.CDNItemDownload(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemRunResult)
176+
reader, err := api.Client.CDNItemStream(ctx, cdnHTTP, r.CDNRefHash, sdk.CDNTypeItemRunResult)
177177
if err != nil {
178178
return err
179179
}
@@ -189,9 +189,7 @@ func (api *API) releaseApplicationWorkflowHandler() service.Handler {
189189
if lastErr != nil {
190190
return err
191191
}
192-
193192
}
194-
195193
return nil
196194
}
197195
}

engine/cdn/cdn_file.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/hex"
99
"encoding/json"
1010
"io"
11+
"net/http"
1112
"os"
1213

1314
"github.com/ovh/cds/engine/cdn/item"
@@ -42,7 +43,7 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
4243
// Check Item unicity
4344
_, err = item.LoadByAPIRefHashAndType(ctx, s.Mapper, s.mustDBWithCtx(ctx), hashRef, itemType)
4445
if err == nil {
45-
return sdk.WrapError(sdk.ErrConflictData, "cannot upload the same file twice")
46+
return sdk.NewErrorFrom(sdk.ErrInvalidData, "cannot upload the same file twice")
4647
}
4748
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
4849
return err
@@ -67,7 +68,11 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
6768
RunNodeID: runResultApiRef.RunNodeID,
6869
RunJobID: runResultApiRef.RunJobID,
6970
}
70-
if err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck); err != nil {
71+
code, err := s.Client.QueueWorkflowRunResultCheck(ctx, sig.JobID, runResultCheck)
72+
if err != nil {
73+
if code == http.StatusConflict {
74+
return sdk.NewErrorFrom(sdk.ErrInvalidData, "unable to upload the same file twice")
75+
}
7176
return err
7277
}
7378
}

engine/cdn/cdn_sync_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,9 @@ func TestSyncLog(t *testing.T) {
141141

142142
var cdsStorage *cds.CDS
143143
for _, sto := range s.Units.Storages {
144-
cdsStorage = sto.(*cds.CDS)
145-
if cdsStorage != nil {
144+
var is bool
145+
cdsStorage, is = sto.(*cds.CDS)
146+
if is {
146147
break
147148
}
148149
}

engine/worker/internal/action/builtin_artifact_download.go

+9-16
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package action
33
import (
44
"context"
55
"fmt"
6-
"io"
76
"os"
87
"path"
98
"path/filepath"
@@ -115,34 +114,28 @@ func RunArtifactDownload(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac
115114
go func(a sdk.CDNItem) {
116115
defer wg.Done()
117116
destFile := path.Join(destPath, a.APIRef.ToFilename())
118-
f, err := wkDirFS.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm))
119-
if err != nil {
120-
res.Status = sdk.StatusFail
121-
res.Reason = err.Error()
122-
log.Warn(ctx, "Cannot download artifact (OpenFile) %s: %s", destFile, err)
123-
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
124-
return
125-
}
126117
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("Downloading artifact %s from workflow %s/%s on run %d...", destFile, project, workflow, n))
127118

128-
r, err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult)
119+
f, err := os.OpenFile(destFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(apiRef.Perm))
129120
if err != nil {
130121
res.Status = sdk.StatusFail
131-
res.Reason = err.Error()
132-
log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err)
122+
res.Reason = sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("cannot create file (OpenFile) %s: %s", destFile, err)).Error()
123+
log.Warn(ctx, "%s", res.Reason)
133124
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
134125
return
135126
}
136-
if _, err := io.Copy(f, r); err != nil {
127+
if err := wk.Client().CDNItemDownload(ctx, wk.CDNHttpURL(), item.APIRefHash, sdk.CDNTypeItemRunResult, a.MD5, f); err != nil {
128+
_ = f.Close()
137129
res.Status = sdk.StatusFail
138130
res.Reason = err.Error()
139-
log.Warn(ctx, "cannot download artifact %s: %s", destFile, sdk.WithStack(err))
131+
log.Warn(ctx, "Cannot download artifact %s: %s", destFile, err)
140132
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
133+
return
141134
}
142135
if err := f.Close(); err != nil {
143136
res.Status = sdk.StatusFail
144-
res.Reason = err.Error()
145-
log.Warn(ctx, "Cannot close file %s: %s", destFile, err)
137+
res.Reason = sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to close file %s: %v", destFile, err).Error()
138+
log.Warn(ctx, "%s", res.Reason)
146139
wk.SendLog(ctx, workerruntime.LevelError, res.Reason)
147140
return
148141
}

engine/worker/internal/action/builtin_artifact_upload.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
9595
// 3. CDN activated or not
9696
if integrationName != sdk.DefaultStorageIntegrationName {
9797
if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag); err != nil {
98-
log.Warn(ctx, "queueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err)
9998
chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path)
10099
wgErrors.Add(1)
101100
}
@@ -107,14 +106,12 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
107106
}
108107
} else if !cdnArtifactEnabled {
109108
if err := uploadArtifactByApiCall(path, wk, ctx, projectKey, integrationName, jobID, tag); err != nil {
110-
log.Warn(ctx, "queueArtifactUpload(%s, %s, %d, %s, %s) failed: %v", projectKey, integrationName, jobID, tag.Value, path, err)
111109
chanError <- sdk.WrapError(err, "Error while uploading artifact by api call %s", path)
112110
wgErrors.Add(1)
113111
}
114112
return
115113
} else {
116114
if err := uploadArtifactIntoCDN(path, ctx, wk); err != nil {
117-
log.Error(ctx, "unable to upload artifact into cdn %q: %v", path, err)
118115
chanError <- err
119116
wgErrors.Add(1)
120117
}
@@ -133,7 +130,7 @@ func RunArtifactUpload(ctx context.Context, wk workerruntime.Runtime, a sdk.Acti
133130
wgErrors.Wait()
134131

135132
if !globalError.IsEmpty() {
136-
return res, sdk.NewError(sdk.ErrUnknownError, fmt.Errorf("error: fail to upload artifact"))
133+
return res, fmt.Errorf("%s", globalError.Error())
137134
}
138135

139136
return res, nil
@@ -143,7 +140,11 @@ func uploadArtifactByIntegrationPlugin(path string, ctx context.Context, wk work
143140
_, fileName := filepath.Split(path)
144141

145142
// Check run result
146-
if err := checkArtifactUpload(ctx, wk, fileName, sdk.WorkflowRunResultTypeArtifactManager); err != nil {
143+
code, err := checkArtifactUpload(ctx, wk, fileName, sdk.WorkflowRunResultTypeArtifactManager)
144+
if err != nil {
145+
if code == 409 {
146+
return fmt.Errorf("unable to upload the same file twice: %s", fileName)
147+
}
147148
return fmt.Errorf("unable to check artifact upload authorization: %v", err)
148149
}
149150

@@ -233,7 +234,7 @@ func uploadArtifactIntoCDN(path string, ctx context.Context, wk workerruntime.Ru
233234

234235
duration, err := wk.Client().CDNItemUpload(ctx, wk.CDNHttpURL(), signature, afero.NewOsFs(), path)
235236
if err != nil {
236-
return sdk.WrapError(err, "Error while uploading artifact %s", path)
237+
return err
237238
}
238239
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("File '%s' uploaded in %.2fs to CDS CDN", path, duration.Seconds()))
239240
return nil
@@ -252,7 +253,7 @@ func uploadArtifactByApiCall(path string, wk workerruntime.Runtime, ctx context.
252253
return nil
253254
}
254255

255-
func checkArtifactUpload(ctx context.Context, wk workerruntime.Runtime, fileName string, runResultType sdk.WorkflowRunResultType) error {
256+
func checkArtifactUpload(ctx context.Context, wk workerruntime.Runtime, fileName string, runResultType sdk.WorkflowRunResultType) (int, error) {
256257
runID, runNodeID, runJobID := wk.GetJobIdentifiers()
257258
runResultCheck := sdk.WorkflowRunResultCheck{
258259
RunJobID: runJobID,

0 commit comments

Comments
 (0)