Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ArtifactGC Fails for Stopped Workflows. Fixes #11879 #11947

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
4226bc3
test: Add failing e2e test for stopped DAG workflow ArtifactGC. Towar…
Garett-MacGowan Sep 29, 2023
4fee946
test: Improve robustness of stopped DAG workflow ArtifactGC test. Tow…
Garett-MacGowan Oct 2, 2023
4ffeb5f
test: Move task reconciliation in operator.go to before garbage colle…
Garett-MacGowan Oct 3, 2023
59b2e80
Merge branch 'master' into fix/artifactgc_stopped_workflow
Garett-MacGowan Oct 7, 2023
a0aa696
test: Add new E2E_WAIT_TIMEOUT_BIAS environment variable to enable a b…
Garett-MacGowan Oct 10, 2023
b328314
test: Minor refactors to TestStoppedWorkflow ArtifactGC test. Fixes #…
Garett-MacGowan Oct 11, 2023
50cadb6
test: Adjust TestStoppedWorkflow test to prevent potential unwanted w…
Garett-MacGowan Oct 12, 2023
07fec09
fix: Add new WorkflowTaskResult label to prevent race condition in Ar…
Garett-MacGowan Oct 16, 2023
5d267c9
test: Fix ArtifactGC tests. Add tests for refactor. Fixes #11879
Garett-MacGowan Oct 17, 2023
edb8ed6
Merge branch 'master' into fix/artifactgc_stopped_workflow
Garett-MacGowan Oct 17, 2023
f9e026a
test: Add 30s to WaitForWorkflow timeouts to account for WaitForWorkf…
Garett-MacGowan Oct 17, 2023
5ed2ed9
fix: Add state to ArtGCStatus to keep track of completed task results…
Garett-MacGowan Oct 26, 2023
29e8b29
Merge branch 'master' into fix/artifactgc_stopped_workflow
Garett-MacGowan Oct 26, 2023
d37e911
fix: Undo context rework in wait container since it did not have the …
Garett-MacGowan Oct 26, 2023
02b58a1
fix: Move TaskResultsCompleted from ArtGCStatus to WorkflowStatus. Si…
Garett-MacGowan Oct 27, 2023
7b058ca
fix: Refactors to address PR comments. Add context.Background() to wa…
Garett-MacGowan Oct 31, 2023
65bd7dd
test: Fix ReportOutputs test. Fixes #11879
Garett-MacGowan Oct 31, 2023
db7b4e0
fix: fix typo. Towards #11974
Garett-MacGowan Oct 31, 2023
f7a3a82
Merge branch 'master' into fix/artifactgc_stopped_workflow
Garett-MacGowan Nov 1, 2023
025c22c
Merge branch 'master' into fix/artifactgc_stopped_workflow
Garett-MacGowan Nov 1, 2023
df91580
fix: Bug fix for marking task result complete in context of legacy/in…
Garett-MacGowan Nov 2, 2023
5398870
Merge branch 'fix/artifactgc_stopped_workflow' of github.com:GarettSo…
Garett-MacGowan Nov 2, 2023
ff69ba3
fix: return error from AddAnnotation for AnnotationKeyReportOutputsCo…
Garett-MacGowan Nov 2, 2023
948444d
test: Add 30 seconds or more to WaitForWorkflow timeout to account fo…
Garett-MacGowan Nov 2, 2023
97af87a
test: Add 30 seconds or more to WaitForWorkflow timeout to account fo…
Garett-MacGowan Nov 2, 2023
f7022a3
fix: Run linter. Towards #11879
Garett-MacGowan Nov 2, 2023
ff0adc5
test: Make TestMetricGC more robust. Towards #11879
Garett-MacGowan Nov 2, 2023
1eedcf1
test: rerun tests. Towards #11879
Garett-MacGowan Nov 2, 2023
a78e413
test: rerun tests. Towards #11879
Garett-MacGowan Nov 2, 2023
5168734
fix: Move ArtifactGC to before workflow level synchronization lock. T…
Garett-MacGowan Nov 2, 2023
95aa027
test: rerun tests. Towards #11879
Garett-MacGowan Nov 2, 2023
6564221
Merge branch 'master' into fix/artifactgc_stopped_workflow
Garett-MacGowan Nov 2, 2023
f5af96d
test: rerun tests. Towards #11879
Garett-MacGowan Nov 2, 2023
bf3a4e7
fix: merge master. Towards #11879
Garett-MacGowan Nov 2, 2023
6da8567
fix: Make signaled container set test more robust. Prevent premature …
Garett-MacGowan Nov 10, 2023
941bab0
test: Fix signals test. comment & logging tweaks
Garett-MacGowan Nov 10, 2023
c5cb204
test: Fix ordering of assert.contains in signals test
Garett-MacGowan Nov 10, 2023
b35c058
test: empty commit for ci re-submit.
Garett-MacGowan Nov 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func initExecutor() *executor.WorkflowExecutor {
podName,
types.UID(os.Getenv(common.EnvVarPodUID)),
os.Getenv(common.EnvVarWorkflowName),
types.UID(os.Getenv(common.EnvVarWorkflowUID)),
os.Getenv(common.EnvVarNodeID),
namespace,
cre,
Expand Down
26 changes: 20 additions & 6 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ func NewWaitCommand() *cobra.Command {

func waitContainer(ctx context.Context) error {
wfExecutor := initExecutor()
defer wfExecutor.HandleError(ctx) // Must be placed at the bottom of defers stack.

// Don't allow cancellation to impact capture of results, parameters, artifacts, or defers.
bgCtx := context.Background()
Garett-MacGowan marked this conversation as resolved.
Show resolved Hide resolved

defer wfExecutor.HandleError(bgCtx) // Must be placed at the bottom of defers stack.
defer wfExecutor.FinalizeOutput(bgCtx) // Ensures the LabelKeyReportOutputsCompleted is set to true.
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

Expand All @@ -35,24 +40,33 @@ func waitContainer(ctx context.Context) error {
if err != nil {
wfExecutor.AddError(err)
}
ctx = context.Background() // don't allow cancellation to impact capture of results, parameters,or artifacts

// Capture output script result
err = wfExecutor.CaptureScriptResult(ctx)
err = wfExecutor.CaptureScriptResult(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Saving output parameters
err = wfExecutor.SaveParameters(ctx)
err = wfExecutor.SaveParameters(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Saving output artifacts
err = wfExecutor.SaveArtifacts(ctx)
err = wfExecutor.SaveArtifacts(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(bgCtx)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
if err != nil {
wfExecutor.AddError(err)
}

wfExecutor.SaveLogs(ctx)
return wfExecutor.HasError()
}
1 change: 1 addition & 0 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ WorkflowStatus contains overall status information about a workflow
|`storedTemplates`|[`Template`](#template)|StoredTemplates is a mapping between a template ref and the node's status.|
|`storedWorkflowTemplateSpec`|[`WorkflowSpec`](#workflowspec)|StoredWorkflowSpec stores the WorkflowTemplate spec for future execution.|
|`synchronization`|[`SynchronizationStatus`](#synchronizationstatus)|Synchronization stores the status of synchronization locks|
|`taskResultsCompleted`|`Map< boolean , string >`|TaskResultsCompleted records, keyed by Pod name, whether each task result has been completed. It is used to prevent premature garbage collection of artifacts.|

## CronWorkflowSpec

Expand Down
4 changes: 4 additions & 0 deletions manifests/base/crds/full/argoproj.io_workflows.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,527 changes: 847 additions & 680 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,30 @@ type WorkflowStatus struct {

// ArtifactGCStatus maintains the status of Artifact Garbage Collection
ArtifactGCStatus *ArtGCStatus `json:"artifactGCStatus,omitempty" protobuf:"bytes,19,opt,name=artifactGCStatus"`

// Have task results been completed? (mapped by Pod name) used to prevent premature garbage collection of artifacts.
TaskResultsCompleted map[string]bool `json:"taskResultsCompleted,omitempty" protobuf:"bytes,20,opt,name=taskResultsCompleted"`
}

// InitializeTaskResultIncomplete starts tracking resultName as an incomplete
// task result. The TaskResultsCompleted map is allocated on first use, and an
// entry that already exists (complete or not) is left untouched.
func (ws *WorkflowStatus) InitializeTaskResultIncomplete(resultName string) {
	if ws.TaskResultsCompleted == nil {
		ws.TaskResultsCompleted = map[string]bool{}
	}
	_, alreadyTracked := ws.TaskResultsCompleted[resultName]
	if alreadyTracked {
		return
	}
	ws.MarkTaskResultIncomplete(resultName)
}
// MarkTaskResultComplete records the task result identified by name as
// completed. The TaskResultsCompleted map is allocated on demand, so calling
// this before InitializeTaskResultIncomplete does not panic on a nil map.
func (ws *WorkflowStatus) MarkTaskResultComplete(name string) {
	if ws.TaskResultsCompleted == nil {
		ws.TaskResultsCompleted = make(map[string]bool)
	}
	ws.TaskResultsCompleted[name] = true
}
// MarkTaskResultIncomplete records the task result identified by name as not
// yet completed. The TaskResultsCompleted map is allocated on demand, so
// calling this on a WorkflowStatus with a nil map does not panic.
func (ws *WorkflowStatus) MarkTaskResultIncomplete(name string) {
	if ws.TaskResultsCompleted == nil {
		ws.TaskResultsCompleted = make(map[string]bool)
	}
	ws.TaskResultsCompleted[name] = false
}
// GetTaskResultCompleted reports whether the task result identified by name is
// marked as completed. A name with no entry (including when the map is nil)
// yields false.
func (ws *WorkflowStatus) GetTaskResultCompleted(name string) bool {
	completed, tracked := ws.TaskResultsCompleted[name]
	return tracked && completed
}
// GetTaskResultsCompleted returns the completion map itself, not a copy, so
// changes made through the returned map are visible on the WorkflowStatus.
// The result is nil when nothing has been tracked yet.
func (ws *WorkflowStatus) GetTaskResultsCompleted() map[string]bool {
	results := ws.TaskResultsCompleted
	return results
}

func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions sdks/python/client/docs/WorkflowServiceApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion test/e2e/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ spec:
`).
When().
SubmitWorkflow().
WaitForWorkflow(time.Minute + 5*time.Second).
WaitForWorkflow(2 * time.Minute).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
Expand Down
95 changes: 93 additions & 2 deletions test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/common"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
Expand Down Expand Up @@ -66,6 +67,96 @@ type artifactState struct {
deletedAtWFDeletion bool
}

// TestStoppedWorkflow runs the same artifact life-cycle check against each
// workflow manifest in the table below: the two expected artifacts must be
// absent before submission, present once the workflow has completed, and
// absent again after the workflow has been deleted.
func (s *ArtifactsSuite) TestStoppedWorkflow() {
	const bucketName = "my-bucket-3"
	artifactKeys := []string{"on-deletion-wf-stopped-1", "on-deletion-wf-stopped-2"}
	ctx := context.Background()

	// Create the minio client for interacting with the bucket. It does not
	// depend on the test case, so it is built once for all of them.
	c, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("admin", "password", ""),
	})
	if !assert.NoError(s.T(), err) {
		// assert only records the failure; without a usable client every
		// call below would dereference a nil pointer, so stop here.
		return
	}

	// Shared expectations for ExpectArtifactByKey.
	expectMissing := func(t *testing.T, object minio.ObjectInfo, err error) {
		assert.Error(t, err)
	}
	expectPresent := func(t *testing.T, object minio.ObjectInfo, err error) {
		assert.NoError(t, err)
	}

	for _, tt := range []struct {
		workflowFile string
	}{
		{workflowFile: "@testdata/artifactgc/artgc-dag-wf-stopped.yaml"},
		{workflowFile: "@testdata/artifactgc/artgc-dag-wf-stopped-pod-gc-on-pod-completion.yaml"},
	} {
		// Ensure the artifacts aren't in the bucket (e.g. left over from an
		// earlier iteration or run).
		for _, key := range artifactKeys {
			if _, statErr := c.StatObject(ctx, bucketName, key, minio.StatObjectOptions{}); statErr == nil {
				assert.NoError(s.T(), c.RemoveObject(ctx, bucketName, key, minio.RemoveObjectOptions{}))
			}
		}

		then := s.Given().
			Workflow(tt.workflowFile).
			When().
			Then()

		// Assert the artifacts don't exist.
		for _, key := range artifactKeys {
			then.ExpectArtifactByKey(key, bucketName, expectMissing)
		}

		when := then.When().
			SubmitWorkflow().
			WaitForWorkflow(
				fixtures.WorkflowCompletionOkay(true),
				fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
					condition := "for artifacts to exist"

					// The condition holds only once every artifact can be stat'ed.
					for _, key := range artifactKeys {
						if _, statErr := c.StatObject(ctx, bucketName, key, minio.StatObjectOptions{}); statErr != nil {
							return false, condition
						}
					}

					return true, condition
				}))

		then = when.Then()

		// Assert the artifacts exist.
		for _, key := range artifactKeys {
			then.ExpectArtifactByKey(key, bucketName, expectPresent)
		}

		when = then.When()

		when.
			DeleteWorkflow().
			WaitForWorkflowDeletion()

		then = when.Then()

		// Assert the artifacts don't exist.
		for _, key := range artifactKeys {
			then.ExpectArtifactByKey(key, bucketName, expectMissing)
		}

		when = then.When()

		// Remove the finalizers so the workflow gets deleted in case the test failed.
		when.RemoveFinalizers(false)
	}
}

func (s *ArtifactsSuite) TestArtifactGC() {

s.Given().
Expand Down Expand Up @@ -231,7 +322,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
} else {
fmt.Printf("verifying artifact %s is not deleted\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.Nil(t, err)
assert.NoError(t, err)
})
}
}
Expand Down Expand Up @@ -270,7 +361,7 @@ spec:

// create a ServiceAccount which won't be tied to the artifactgc role and attempt to use that service account in the GC Pod
// Want to verify that this causes the ArtifactGCError Condition in the Workflow
func (s *ArtifactsSuite) TestArtifactGC_InsufficientRole() {
func (s *ArtifactsSuite) TestInsufficientRole() {
ctx := context.Background()
_, err := s.KubeClient.CoreV1().ServiceAccounts(fixtures.Namespace).Create(ctx, &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: "artgc-role-test-sa"}}, metav1.CreateOptions{})
assert.NoError(s.T(), err)
Expand Down
Loading