Skip to content

Commit

Permalink
feat(backend): implement subdag output resolution (#11196)
Browse files Browse the repository at this point in the history
* fix(backend): implement subdag output resolution

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>

* Add support for subdags of subdags

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>

* handle edge case

Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>

* Handle artifact outputs as well

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Co-authored-by: edmondop <[email protected]>

* Simplify parameter handling logic

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Begin decomposition

Signed-off-by: droctothorpe <[email protected]>

* Add support for multiple artifacts and params

Signed-off-by: Tyler Kalbach <[email protected]>

* Implement large tests for subdagio

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>

* Address PR comments & handle oneof

Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>

* update backend metadata client test

Signed-off-by: zazulam <[email protected]>

* Enable nested pipeline IO large tests in CI

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Execute narrow lookup before broad task lookup

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>

* reimplement getDAGTasks to address edge cases

Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>

* handle parallelfor in getDAGTask

Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>

* Check if iterationIndex is nil

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* check iteration_count for parallel tasks

Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>

* update comment & disable v(4) logs

Signed-off-by: zazulam <[email protected]>

* Address Chen's feedback

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Clarify comment

Signed-off-by: droctothorpe <[email protected]>

---------

Signed-off-by: droctothorpe <[email protected]>
Signed-off-by: zazulam <[email protected]>
Signed-off-by: Tyler Kalbach <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Co-authored-by: edmondop <[email protected]>
Co-authored-by: Tyler Kalbach <[email protected]>
  • Loading branch information
5 people authored Oct 31, 2024
1 parent f2fead5 commit c5b787a
Show file tree
Hide file tree
Showing 17 changed files with 945 additions and 108 deletions.
11 changes: 11 additions & 0 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
}
}
glog.Infof("publish success.")
// At the end of the current task, we check the statuses of all tasks in
// the current DAG and update the DAG's status accordingly.
dag, err := l.metadataClient.GetDAG(ctx, execution.GetExecution().CustomProperties["parent_dag_id"].GetIntValue())
if err != nil {
glog.Errorf("DAG Status Update: failed to get DAG: %s", err.Error())
}
pipeline, _ := l.metadataClient.GetPipelineFromExecution(ctx, execution.GetID())
err = l.metadataClient.UpdateDAGExecutionsState(ctx, dag, pipeline)
if err != nil {
glog.Errorf("failed to update DAG state: %s", err.Error())
}
}()
executedStartedTime := time.Now().Unix()
execution, err = l.prePublish(ctx)
Expand Down
Loading

0 comments on commit c5b787a

Please sign in to comment.