From 076aaf8bf135e8f808a6ff829b31ad9118685dc9 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Tue, 25 Apr 2023 13:14:00 -0700 Subject: [PATCH 1/4] compiles --- src/golang/lib/engine/aq_engine.go | 4 +++ src/golang/lib/workflow/artifact/artifact.go | 27 +++++++++++++------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/golang/lib/engine/aq_engine.go b/src/golang/lib/engine/aq_engine.go index 7c841c183..942e4437c 100644 --- a/src/golang/lib/engine/aq_engine.go +++ b/src/golang/lib/engine/aq_engine.go @@ -1003,6 +1003,10 @@ func (eng *aqEngine) execute( // and check operators with warning severity. if execState.HasBlockingFailure() { log.Infof("Stopping execution of operator %v", op.ID()) + if execState.Error != nil { + log.Infof("Reason: %s", execState.Error.Message()) + } + for id, dagOp := range workflowDag.Operators() { log.Infof("Checking status of operator %v", id) // Skip if this operator has already been completed or is in progress. diff --git a/src/golang/lib/workflow/artifact/artifact.go b/src/golang/lib/workflow/artifact/artifact.go index 5e84a52a7..ca75e2959 100644 --- a/src/golang/lib/workflow/artifact/artifact.go +++ b/src/golang/lib/workflow/artifact/artifact.go @@ -3,6 +3,7 @@ package artifact import ( "context" "encoding/json" + "fmt" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/models" @@ -207,9 +208,7 @@ func (a *ArtifactImpl) updateArtifactResultAfterComputation( execState *shared.ExecutionState, ) { changes := map[string]interface{}{ - models.ArtifactResultStatus: execState.Status, - models.ArtifactResultExecState: execState, - models.ArtifactResultMetadata: nil, + models.ArtifactResultMetadata: nil, } if a.Computed(ctx) { @@ -222,11 +221,18 @@ func (a *ArtifactImpl) updateArtifactResultAfterComputation( ) if err != nil { log.Errorf("Unable to read artifact result metadata from storage and unmarshal: %v", err) - return + execState.Error.Context = fmt.Sprintf( + "%s\nError reading metadata: %v", + execState.Error.Context, + err, + ) } changes[models.ArtifactResultMetadata] = &artifactResultMetadata } + changes[models.ArtifactResultStatus] = execState.Status + changes[models.ArtifactResultExecState] = execState + _, err := a.resultRepo.Update( ctx, a.resultID, @@ -322,12 +328,15 @@ func (a *ArtifactImpl) GetMetadata(ctx context.Context) (*shared.ArtifactResultM return nil, nil } - var metadata shared.ArtifactResultMetadata - err := utils.ReadFromStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath, &metadata) - if err != nil { - return nil, err + // If the path is not provided, we assume the data is not available. + if a.execPaths.ArtifactMetadataPath != "" { + var metadata shared.ArtifactResultMetadata + err := utils.ReadFromStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath, &metadata) + if err != nil { + return nil, err + } + a.resultMetadata = &metadata } - a.resultMetadata = &metadata } return a.resultMetadata, nil From 9ad500c9407b8d347f104164d34343c2c30909d7 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 28 Apr 2023 13:36:46 -0700 Subject: [PATCH 2/4] improve metadata handling --- src/golang/lib/workflow/artifact/artifact.go | 29 ++++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/golang/lib/workflow/artifact/artifact.go b/src/golang/lib/workflow/artifact/artifact.go index ca75e2959..921ca8d3a 100644 --- a/src/golang/lib/workflow/artifact/artifact.go +++ b/src/golang/lib/workflow/artifact/artifact.go @@ -3,7 +3,6 @@ package artifact import ( "context" "encoding/json" - "fmt" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/models" @@ -211,7 +210,9 @@ func (a *ArtifactImpl) updateArtifactResultAfterComputation( models.ArtifactResultMetadata: nil, } - if a.Computed(ctx) { + metadataExists := utils.ObjectExistsInStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath) + + if a.Computed(ctx) && metadataExists { var artifactResultMetadata shared.ArtifactResultMetadata err := utils.ReadFromStorage( ctx, @@ -221,11 +222,7 @@ func (a *ArtifactImpl) updateArtifactResultAfterComputation( ) if err != nil { log.Errorf("Unable to read artifact result metadata from storage and unmarshal: %v", err) - execState.Error.Context = fmt.Sprintf( - "%s\nError reading metadata: %v", - execState.Error.Context, - err, - ) + return } changes[models.ArtifactResultMetadata] = &artifactResultMetadata } @@ -328,15 +325,17 @@ func (a *ArtifactImpl) GetMetadata(ctx context.Context) (*shared.ArtifactResultM return nil, nil } - // If the path is not provided, we assume the data is not available. - if a.execPaths.ArtifactMetadataPath != "" { - var metadata shared.ArtifactResultMetadata - err := utils.ReadFromStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath, &metadata) - if err != nil { - return nil, err - } - a.resultMetadata = &metadata + // If the path is not available, we assume the data is not available. + if !utils.ObjectExistsInStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath) { + return nil, nil + } + + var metadata shared.ArtifactResultMetadata + err := utils.ReadFromStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath, &metadata) + if err != nil { + return nil, err } + a.resultMetadata = &metadata } return a.resultMetadata, nil From 3775dde376c00107cbccf9833980a3b84059eda2 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 28 Apr 2023 13:38:16 -0700 Subject: [PATCH 3/4] amend --- src/golang/lib/workflow/artifact/artifact.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/golang/lib/workflow/artifact/artifact.go b/src/golang/lib/workflow/artifact/artifact.go index 921ca8d3a..a8638bc3e 100644 --- a/src/golang/lib/workflow/artifact/artifact.go +++ b/src/golang/lib/workflow/artifact/artifact.go @@ -207,7 +207,9 @@ func (a *ArtifactImpl) updateArtifactResultAfterComputation( execState *shared.ExecutionState, ) { changes := map[string]interface{}{ - models.ArtifactResultMetadata: nil, + models.ArtifactResultMetadata: nil, + models.ArtifactResultStatus: execState.Status, + models.ArtifactResultExecState: execState, } metadataExists := utils.ObjectExistsInStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath) @@ -227,9 +229,6 @@ func (a *ArtifactImpl) updateArtifactResultAfterComputation( changes[models.ArtifactResultMetadata] = &artifactResultMetadata } - changes[models.ArtifactResultStatus] = execState.Status - changes[models.ArtifactResultExecState] = execState - _, err := a.resultRepo.Update( ctx, a.resultID, From 84ad3bfeb0274457f0fae8f650cf63bf5dfd13fe Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Fri, 28 Apr 2023 13:39:56 -0700 Subject: [PATCH 4/4] improve storage management --- src/golang/lib/workflow/utils/storage.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/golang/lib/workflow/utils/storage.go b/src/golang/lib/workflow/utils/storage.go index 7204b1f24..64f04c11c 100644 --- a/src/golang/lib/workflow/utils/storage.go +++ b/src/golang/lib/workflow/utils/storage.go @@ -24,6 +24,10 @@ func CleanupStorageFiles(ctx context.Context, storageConfig *shared.StorageConfi } func ObjectExistsInStorage(ctx context.Context, storageConfig *shared.StorageConfig, path string) bool { + if path == "" { + return false + } + return storage.NewStorage(storageConfig).Exists(ctx, path) }