From b1da706f4f6b6fb5f75b7a47cee59f4e78c472c3 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Wed, 11 Sep 2024 12:40:28 -0400 Subject: [PATCH 01/19] fix(backend): implement subdag output resolution Signed-off-by: droctothorpe Co-authored-by: zazulam Co-authored-by: CarterFendley --- backend/src/v2/driver/driver.go | 111 ++++++++++++++++++++++++++++-- backend/src/v2/metadata/client.go | 16 ++++- 2 files changed, 117 insertions(+), 10 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 12d7a377182..f461196fc8f 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -17,10 +17,11 @@ import ( "context" "encoding/json" "fmt" - "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "strconv" "time" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" + "github.com/golang/glog" "github.com/golang/protobuf/ptypes/timestamp" "github.com/google/uuid" @@ -125,6 +126,8 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio err = fmt.Errorf("driver.RootDAG(%s) failed: %w", opts.info(), err) } }() + b, _ := json.Marshal(opts) + glog.V(4).Info("RootDAG opts: ", string(b)) err = validateRootDAG(opts) if err != nil { return nil, err @@ -230,6 +233,8 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl err = fmt.Errorf("driver.Container(%s) failed: %w", opts.info(), err) } }() + b, _ := json.Marshal(opts) + glog.V(4).Info("Container opts: ", string(b)) err = validateContainer(opts) if err != nil { return nil, err @@ -699,6 +704,8 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E err = fmt.Errorf("driver.DAG(%s) failed: %w", opts.info(), err) } }() + b, _ := json.Marshal(opts) + glog.V(4).Info("DAG opts: ", string(b)) err = validateDAG(opts) if err != nil { return nil, err @@ -749,6 +756,27 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E ecfg.ParentDagID = dag.Execution.GetID() ecfg.IterationIndex = iterationIndex ecfg.NotTriggered = !execution.WillTrigger() + + outputParameters := opts.Component.GetDag().GetOutputs().GetParameters() + glog.V(4).Info("outputParameters: ", outputParameters) + for _, value := range outputParameters { + outputParameterKey := value.GetValueFromParameter().OutputParameterKey + producerSubTask := value.GetValueFromParameter().ProducerSubtask + glog.V(4).Info("outputParameterKey: ", outputParameterKey) + glog.V(4).Info("producerSubtask: ", producerSubTask) + + outputParameterMap := map[string]interface{}{ + "output_parameter_key": outputParameterKey, + "producer_subtask": producerSubTask, + } + + outputParameterStruct, _ := structpb.NewValue(outputParameterMap) + + ecfg.OutputParameters = map[string]*structpb.Value{ + value.GetValueFromParameter().OutputParameterKey: outputParameterStruct, + } + } + if opts.Task.GetArtifactIterator() != nil { return execution, fmt.Errorf("ArtifactIterator is not implemented") } @@ -793,6 +821,12 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E ecfg.IterationCount = &count execution.IterationCount = &count } + + glog.V(4).Info("pipeline: ", pipeline) + b, _ = json.Marshal(*ecfg) + glog.V(4).Info("ecfg: ", string(b)) + glog.V(4).Infof("dag: %v", dag) + // TODO(Bobgy): change execution state to pending, because this is driver, execution hasn't started. 
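	// For illustration (hypothetical names): a DAG output parameter "Output"
	// wired to sub-task "inner-task" is recorded by the loop above as a
	// struct value, roughly:
	//
	//   v, _ := structpb.NewValue(map[string]interface{}{
	//       "output_parameter_key": "Output",
	//       "producer_subtask":     "inner-task",
	//   })
	//   ecfg.OutputParameters["Output"] = v
	//
	// i.e. the DAG execution stores a reference to the sub-task's output
	// rather than a concrete value; resolveInputs follows that reference
	// when a downstream task consumes the DAG's output.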
createdExecution, err := mlmd.CreateExecution(ctx, pipeline, ecfg) if err != nil { @@ -939,6 +973,8 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, err = fmt.Errorf("failed to resolve inputs: %w", err) } }() + glog.V(4).Infof("dag: %v", dag) + glog.V(4).Infof("task: %v", task) inputParams, _, err := dag.Execution.GetParameters() if err != nil { return nil, err @@ -1112,10 +1148,31 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if err != nil { return nil, err } + // TODO: Make this recursive. + for _, v := range tasks { + if v.GetExecution().GetType() == "system.DAGExecution" { + glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName()) + dag, err := mlmd.GetDAG(ctx, v.GetExecution().GetId()) + if err != nil { + return nil, err + } + subdagTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) + if err != nil { + return nil, err + } + for k, v := range subdagTasks { + tasks[k] = v + } + } + } tasksCache = tasks + return tasks, nil } + for name, paramSpec := range task.GetInputs().GetParameters() { + glog.V(4).Infof("name: %v", name) + glog.V(4).Infof("paramSpec: %v", paramSpec) paramError := func(err error) error { return fmt.Errorf("resolving input parameter %s with spec %s: %w", name, paramSpec, err) } @@ -1131,8 +1188,11 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } inputs.ParameterValues[name] = v + // This is the case where we are consuming an output parameter from an + // upstream task. That task can be a container or a DAG. case *pipelinespec.TaskInputsSpec_InputParameterSpec_TaskOutputParameter: taskOutput := paramSpec.GetTaskOutputParameter() + glog.V(4).Info("taskOutput: ", taskOutput) if taskOutput.GetProducerTask() == "" { return nil, paramError(fmt.Errorf("producer task is empty")) } @@ -1143,19 +1203,56 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if err != nil { return nil, paramError(err) } + + // The producer is the task that produces the output that we need to + // consume. producer, ok := tasks[taskOutput.GetProducerTask()] - if !ok { - return nil, paramError(fmt.Errorf("cannot find producer task %q", taskOutput.GetProducerTask())) - } - _, outputs, err := producer.GetParameters() + + glog.V(4).Info("producer: ", producer) + + // Get the producer's outputs. + _, producerOutputs, err := producer.GetParameters() if err != nil { return nil, paramError(fmt.Errorf("get producer output parameters: %w", err)) } - param, ok := outputs[taskOutput.GetOutputParameterKey()] + glog.V(4).Info("producer output parameters: ", producerOutputs) + // Deserialize them. + var producerOutputsMap map[string]string + b, err := producerOutputs["Output"].GetStructValue().MarshalJSON() + if err != nil { + return nil, err + } + json.Unmarshal(b, &producerOutputsMap) + glog.V(4).Info("producerOutputsMap: ", producerOutputsMap) + + // If the producer's output includes a producer subtask, which means + // that the producer is a DAG that is getting its output from one of + // the tasks in the DAG, then we want to roll up the output from the + // producer subtask to the producer, so that the downstream logic + // can retrieve it appropriately. 
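+ // For example (hypothetical values), a DAG producer's serialized
+ // "Output" entry deserializes to something like:
+ //
+ //   map[string]string{
+ //       "producer_subtask":     "inner-task",
+ //       "output_parameter_key": "msg",
+ //   }
+ //
+ // whereas a plain container producer carries concrete output values, so
+ // the lookup below finds no "producer_subtask" key and the override is
+ // skipped.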
+ if producerSubTask, ok := producerOutputsMap["producer_subtask"]; ok { + glog.V(4).Infof( + "Overriding producer task, %v, output with producer_subtask, %v, output.", + producer.TaskName(), + producerSubTask, + ) + _, producerOutputs, err = tasks[producerSubTask].GetParameters() + if err != nil { + return nil, err + } + glog.V(4).Info("producerSubTask output parameters: ", producerOutputs) + // The only reason we're updating this is to make the downstream + // logging more accurate. + taskOutput.ProducerTask = producerOutputsMap["producer_subtask"] + } + + // Grab the value of the producer output. + producerOutputValue, ok := producerOutputs[taskOutput.GetOutputParameterKey()] if !ok { return nil, paramError(fmt.Errorf("cannot find output parameter key %q in producer task %q", taskOutput.GetOutputParameterKey(), taskOutput.GetProducerTask())) } - inputs.ParameterValues[name] = param + // Update the input to be the producer output value. + inputs.ParameterValues[name] = producerOutputValue case *pipelinespec.TaskInputsSpec_InputParameterSpec_RuntimeValue: runtimeValue := paramSpec.GetRuntimeValue() switch t := runtimeValue.Value.(type) { diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index a292c1fe643..1a670c9d12e 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -21,14 +21,15 @@ import ( "encoding/json" "errors" "fmt" - "github.com/kubeflow/pipelines/backend/src/common/util" - "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "path" "strconv" "strings" "sync" "time" + "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" + "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/golang/glog" @@ -134,6 +135,7 @@ type ExecutionConfig struct { NotTriggered bool // optional, not triggered executions will have CANCELED state. ParentDagID int64 // parent DAG execution ID. Only the root DAG does not have a parent DAG. InputParameters map[string]*structpb.Value + OutputParameters map[string]*structpb.Value InputArtifactIDs map[string][]int64 IterationIndex *int // Index of the iteration. @@ -448,6 +450,8 @@ func getArtifactName(eventPath *pb.Event_Path) (string, error) { func (c *Client) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error { e := execution.execution e.LastKnownState = state.Enum() + glog.V(4).Infof("outputParameters: %v", outputParameters) + glog.V(4).Infof("outputArtifacts: %v", outputArtifacts) if outputParameters != nil { // Record output parameters. 
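 // Note the division of labor: container executions publish concrete
 // output values at completion time via PublishExecution, while DAG
 // executions have output *references* recorded up front by
 // CreateExecution (see the OutputParameters handling in the next hunk),
 // which the driver dereferences when a downstream task consumes them.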
@@ -576,7 +580,13 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config }, }} } - + if config.OutputParameters != nil { + e.CustomProperties[keyOutputs] = &pb.Value{Value: &pb.Value_StructValue{ + StructValue: &structpb.Struct{ + Fields: config.OutputParameters, + }, + }} + } req := &pb.PutExecutionRequest{ Execution: e, Contexts: []*pb.Context{pipeline.pipelineCtx, pipeline.pipelineRunCtx}, From 4c5d97e126c2718573eef8e0e91421fa193883c3 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Wed, 11 Sep 2024 16:44:55 -0400 Subject: [PATCH 02/19] Add support for subdags of subdags Signed-off-by: droctothorpe Co-authored-by: zazulam Co-authored-by: CarterFendley --- backend/src/v2/driver/driver.go | 167 ++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 75 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index f461196fc8f..2f029939dc9 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -967,6 +967,44 @@ func validateNonRoot(opts Options) error { return nil } +// getDAGTasks gets all the tasks associated with the specified DAG and all of +// its subDAGs. +func getDAGTasks( + ctx context.Context, + dag *metadata.DAG, + pipeline *metadata.Pipeline, + mlmd *metadata.Client, + flattenedTasks map[string]*metadata.Execution, +) (map[string]*metadata.Execution, error) { + if flattenedTasks == nil { + flattenedTasks = make(map[string]*metadata.Execution) + } + currentExecutionTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) + if err != nil { + return nil, err + } + for k, v := range currentExecutionTasks { + flattenedTasks[k] = v + } + for _, v := range currentExecutionTasks { + if v.GetExecution().GetType() == "system.DAGExecution" { + glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName()) + subDAG, err := mlmd.GetDAG(ctx, v.GetExecution().GetId()) + if err != nil { + return nil, err + } + // Pass the subDAG into a recursive call to getDAGTasks and update + // tasks to include the subDAG's tasks. + flattenedTasks, err = getDAGTasks(ctx, subDAG, pipeline, mlmd, flattenedTasks) + if err != nil { + return nil, err + } + } + } + + return flattenedTasks, nil +} + func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, task *pipelinespec.PipelineTaskSpec, inputsSpec *pipelinespec.ComponentInputsSpec, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) { defer func() { if err != nil { @@ -1138,37 +1176,6 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } return inputs, nil } - // get executions in context on demand - var tasksCache map[string]*metadata.Execution - getDAGTasks := func() (map[string]*metadata.Execution, error) { - if tasksCache != nil { - return tasksCache, nil - } - tasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) - if err != nil { - return nil, err - } - // TODO: Make this recursive. - for _, v := range tasks { - if v.GetExecution().GetType() == "system.DAGExecution" { - glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. 
Adding its tasks to the task list.", v.TaskName()) - dag, err := mlmd.GetDAG(ctx, v.GetExecution().GetId()) - if err != nil { - return nil, err - } - subdagTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) - if err != nil { - return nil, err - } - for k, v := range subdagTasks { - tasks[k] = v - } - } - } - tasksCache = tasks - - return tasks, nil - } for name, paramSpec := range task.GetInputs().GetParameters() { glog.V(4).Infof("name: %v", name) @@ -1199,60 +1206,70 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if taskOutput.GetOutputParameterKey() == "" { return nil, paramError(fmt.Errorf("output parameter key is empty")) } - tasks, err := getDAGTasks() + tasks, err := getDAGTasks(ctx, dag, pipeline, mlmd, nil) if err != nil { return nil, paramError(err) } - // The producer is the task that produces the output that we need to - // consume. - producer, ok := tasks[taskOutput.GetProducerTask()] + // If the producer is a DAG, AND its output / producer subtask is + // ALSO a DAG, then we need to cycle through this loop until we + // arrive at a non-DAG subtask and essentially bubble up that + // non-DAG subtask so that its value can be consumed. + producerSubTaskMaybeDAG := true + for producerSubTaskMaybeDAG { + // The producer is the task that produces the output that we need to + // consume. + producer := tasks[taskOutput.GetProducerTask()] - glog.V(4).Info("producer: ", producer) + glog.V(4).Info("producer: ", producer) - // Get the producer's outputs. - _, producerOutputs, err := producer.GetParameters() - if err != nil { - return nil, paramError(fmt.Errorf("get producer output parameters: %w", err)) - } - glog.V(4).Info("producer output parameters: ", producerOutputs) - // Deserialize them. - var producerOutputsMap map[string]string - b, err := producerOutputs["Output"].GetStructValue().MarshalJSON() - if err != nil { - return nil, err - } - json.Unmarshal(b, &producerOutputsMap) - glog.V(4).Info("producerOutputsMap: ", producerOutputsMap) - - // If the producer's output includes a producer subtask, which means - // that the producer is a DAG that is getting its output from one of - // the tasks in the DAG, then we want to roll up the output from the - // producer subtask to the producer, so that the downstream logic - // can retrieve it appropriately. - if producerSubTask, ok := producerOutputsMap["producer_subtask"]; ok { - glog.V(4).Infof( - "Overriding producer task, %v, output with producer_subtask, %v, output.", - producer.TaskName(), - producerSubTask, - ) - _, producerOutputs, err = tasks[producerSubTask].GetParameters() + // Get the producer's outputs. + _, producerOutputs, err := producer.GetParameters() + if err != nil { + return nil, paramError(fmt.Errorf("get producer output parameters: %w", err)) + } + glog.V(4).Info("producer output parameters: ", producerOutputs) + // Deserialize them. + var producerOutputsMap map[string]string + b, err := producerOutputs["Output"].GetStructValue().MarshalJSON() if err != nil { return nil, err } - glog.V(4).Info("producerSubTask output parameters: ", producerOutputs) - // The only reason we're updating this is to make the downstream - // logging more accurate. 
- taskOutput.ProducerTask = producerOutputsMap["producer_subtask"] + json.Unmarshal(b, &producerOutputsMap) + glog.V(4).Info("producerOutputsMap: ", producerOutputsMap) + + // If the producer's output includes a producer subtask, which means + // that the producer is a DAG that is getting its output from one of + // the tasks in the DAG, then we want to roll up the output from the + // producer subtask to the producer, so that the downstream logic + // can retrieve it appropriately. + if producerSubTask, ok := producerOutputsMap["producer_subtask"]; ok { + glog.V(4).Infof( + "Overriding producer task, %v, output with producer_subtask, %v, output.", + producer.TaskName(), + producerSubTask, + ) + _, producerOutputs, err = tasks[producerSubTask].GetParameters() + if err != nil { + return nil, err + } + glog.V(4).Info("producerSubTask output parameters: ", producerOutputs) + // The only reason we're updating this is to make the downstream + // logging more accurate. + taskOutput.ProducerTask = producerOutputsMap["producer_subtask"] + // Grab the value of the producer output. + producerOutputValue, ok := producerOutputs[taskOutput.GetOutputParameterKey()] + if !ok { + return nil, paramError(fmt.Errorf("cannot find output parameter key %q in producer task %q", taskOutput.GetOutputParameterKey(), taskOutput.GetProducerTask())) + } + // Update the input to be the producer output value. + inputs.ParameterValues[name] = producerOutputValue + } else { + // The producer subtask is not a DAG, so we exit the loop. + producerSubTaskMaybeDAG = false + } } - // Grab the value of the producer output. - producerOutputValue, ok := producerOutputs[taskOutput.GetOutputParameterKey()] - if !ok { - return nil, paramError(fmt.Errorf("cannot find output parameter key %q in producer task %q", taskOutput.GetOutputParameterKey(), taskOutput.GetProducerTask())) - } - // Update the input to be the producer output value. - inputs.ParameterValues[name] = producerOutputValue case *pipelinespec.TaskInputsSpec_InputParameterSpec_RuntimeValue: runtimeValue := paramSpec.GetRuntimeValue() switch t := runtimeValue.Value.(type) { @@ -1292,7 +1309,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if taskOutput.GetOutputArtifactKey() == "" { return nil, artifactError(fmt.Errorf("output artifact key is empty")) } - tasks, err := getDAGTasks() + tasks, err := getDAGTasks(ctx, dag, pipeline, mlmd, nil) if err != nil { return nil, artifactError(err) } From 5235a84ef0538e600a8f73b487ea48a7dc749cad Mon Sep 17 00:00:00 2001 From: zazulam Date: Tue, 17 Sep 2024 09:58:39 -0400 Subject: [PATCH 03/19] handle edge case Signed-off-by: zazulam Co-authored-by: droctothorpe --- backend/src/v2/driver/driver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 2f029939dc9..31cde91d005 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1267,6 +1267,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } else { // The producer subtask is not a DAG, so we exit the loop. 
producerSubTaskMaybeDAG = false + inputs.ParameterValues[name] = producerOutputs[taskOutput.GetOutputParameterKey()] } } From 1cefad2cf494f2d3ee017de0895a351076434ebb Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Fri, 13 Sep 2024 09:14:30 -0400 Subject: [PATCH 04/19] Handle artifact outputs as well Signed-off-by: droctothorpe Co-authored-by: zazulam Co-authored-by: CarterFendley Co-authored-by: edmondop --- backend/src/v2/cmd/driver/main.go | 1 + backend/src/v2/driver/driver.go | 100 +++++++++++++++++++++--------- backend/src/v2/metadata/client.go | 18 +++++- 3 files changed, 90 insertions(+), 29 deletions(-) diff --git a/backend/src/v2/cmd/driver/main.go b/backend/src/v2/cmd/driver/main.go index 793ccfe1b80..9437d889862 100644 --- a/backend/src/v2/cmd/driver/main.go +++ b/backend/src/v2/cmd/driver/main.go @@ -85,6 +85,7 @@ func init() { flag.Set("logtostderr", "true") // Change the WARNING to INFO level for debugging. flag.Set("stderrthreshold", "WARNING") + flag.Set("v", "4") } func validate() error { diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 31cde91d005..9415b6f35b0 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -757,6 +757,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E ecfg.IterationIndex = iterationIndex ecfg.NotTriggered = !execution.WillTrigger() + // Handle writing output parameters to MLMD. outputParameters := opts.Component.GetDag().GetOutputs().GetParameters() glog.V(4).Info("outputParameters: ", outputParameters) for _, value := range outputParameters { @@ -777,6 +778,11 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } } + // Handle writing output artifacts to MLMD. + outputArtifacts := opts.Component.GetDag().GetOutputs().GetArtifacts() + glog.V(4).Info("outputArtifacts: ", outputArtifacts) + ecfg.OutputArtifacts = outputArtifacts + if opts.Task.GetArtifactIterator() != nil { return execution, fmt.Errorf("ArtifactIterator is not implemented") } @@ -1177,6 +1183,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, return inputs, nil } + // Handle parameters. for name, paramSpec := range task.GetInputs().GetParameters() { glog.V(4).Infof("name: %v", name) glog.V(4).Infof("paramSpec: %v", paramSpec) @@ -1224,50 +1231,50 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, glog.V(4).Info("producer: ", producer) // Get the producer's outputs. - _, producerOutputs, err := producer.GetParameters() + _, producerOutputParameters, err := producer.GetParameters() if err != nil { return nil, paramError(fmt.Errorf("get producer output parameters: %w", err)) } - glog.V(4).Info("producer output parameters: ", producerOutputs) + glog.V(4).Info("producer output parameters: ", producerOutputParameters) // Deserialize them. 
- var producerOutputsMap map[string]string - b, err := producerOutputs["Output"].GetStructValue().MarshalJSON() + var producerOutputParametersMap map[string]string + b, err := producerOutputParameters["Output"].GetStructValue().MarshalJSON() if err != nil { return nil, err } - json.Unmarshal(b, &producerOutputsMap) - glog.V(4).Info("producerOutputsMap: ", producerOutputsMap) + json.Unmarshal(b, &producerOutputParametersMap) + glog.V(4).Info("producerOutputParametersMap: ", producerOutputParametersMap) // If the producer's output includes a producer subtask, which means // that the producer is a DAG that is getting its output from one of // the tasks in the DAG, then we want to roll up the output from the // producer subtask to the producer, so that the downstream logic // can retrieve it appropriately. - if producerSubTask, ok := producerOutputsMap["producer_subtask"]; ok { + if producerSubTask, ok := producerOutputParametersMap["producer_subtask"]; ok { glog.V(4).Infof( "Overriding producer task, %v, output with producer_subtask, %v, output.", producer.TaskName(), producerSubTask, ) - _, producerOutputs, err = tasks[producerSubTask].GetParameters() + _, producerOutputParameters, err = tasks[producerSubTask].GetParameters() if err != nil { return nil, err } - glog.V(4).Info("producerSubTask output parameters: ", producerOutputs) + glog.V(4).Info("producerSubTask output parameters: ", producerOutputParameters) // The only reason we're updating this is to make the downstream // logging more accurate. - taskOutput.ProducerTask = producerOutputsMap["producer_subtask"] + taskOutput.ProducerTask = producerOutputParametersMap["producer_subtask"] // Grab the value of the producer output. - producerOutputValue, ok := producerOutputs[taskOutput.GetOutputParameterKey()] + producerOutputParameterValue, ok := producerOutputParameters[taskOutput.GetOutputParameterKey()] if !ok { return nil, paramError(fmt.Errorf("cannot find output parameter key %q in producer task %q", taskOutput.GetOutputParameterKey(), taskOutput.GetProducerTask())) } // Update the input to be the producer output value. - inputs.ParameterValues[name] = producerOutputValue + inputs.ParameterValues[name] = producerOutputParameterValue } else { // The producer subtask is not a DAG, so we exit the loop. producerSubTaskMaybeDAG = false - inputs.ParameterValues[name] = producerOutputs[taskOutput.GetOutputParameterKey()] + inputs.ParameterValues[name] = producerOutputParameters[taskOutput.GetOutputParameterKey()] } } @@ -1286,6 +1293,8 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, return nil, paramError(fmt.Errorf("parameter spec of type %T not implemented yet", t)) } } + + // Handle artifacts. 
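+ // As with parameters, a DAG's artifact outputs are stored as selectors
+ // rather than concrete artifacts. A DAG execution's "output_artifacts"
+ // custom property might deserialize to (hypothetical values):
+ //
+ //   {"Output": {"artifactSelectors": [{
+ //       "producerSubtask":   "inner-task",
+ //       "outputArtifactKey": "model"}]}}
+ //
+ // and the loop below chases such selectors until it reaches a container
+ // task whose artifact actually exists in MLMD.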
for name, artifactSpec := range task.GetInputs().GetArtifacts() { artifactError := func(err error) error { return fmt.Errorf("failed to resolve input artifact %s with spec %s: %w", name, artifactSpec, err) @@ -1314,25 +1323,60 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if err != nil { return nil, artifactError(err) } + producer, ok := tasks[taskOutput.GetProducerTask()] if !ok { return nil, artifactError(fmt.Errorf("cannot find producer task %q", taskOutput.GetProducerTask())) } - // TODO(Bobgy): cache results - outputs, err := mlmd.GetOutputArtifactsByExecutionId(ctx, producer.GetID()) - if err != nil { - return nil, artifactError(err) - } - artifact, ok := outputs[taskOutput.GetOutputArtifactKey()] - if !ok { - return nil, artifactError(fmt.Errorf("cannot find output artifact key %q in producer task %q", taskOutput.GetOutputArtifactKey(), taskOutput.GetProducerTask())) - } - runtimeArtifact, err := artifact.ToRuntimeArtifact() - if err != nil { - return nil, artifactError(err) - } - inputs.Artifacts[name] = &pipelinespec.ArtifactList{ - Artifacts: []*pipelinespec.RuntimeArtifact{runtimeArtifact}, + glog.V(4).Info("producer: ", producer) + currentTask := producer + var outputArtifactKey string + currentSubTaskMaybeDAG := true + for currentSubTaskMaybeDAG { + // If the current task is a DAG: + glog.V(4).Info("currentTask: ", currentTask.TaskName()) + if *currentTask.GetExecution().Type == "system.DAGExecution" { + // Get the sub-task. + outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["output_artifacts"] + // Deserialize the output artifacts. + var outputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec + err := json.Unmarshal([]byte(outputArtifactsCustomProperty.GetStringValue()), &outputArtifacts) + if err != nil { + return nil, err + } + glog.V(4).Infof("Deserialized outputArtifacts: %v", outputArtifacts) + artifactSelectors := outputArtifacts["Output"].GetArtifactSelectors() + // TODO: Add support for multiple output artifacts. + subTaskName := artifactSelectors[0].ProducerSubtask + outputArtifactKey = artifactSelectors[0].OutputArtifactKey + glog.V(4).Info("subTaskName: ", subTaskName) + glog.V(4).Info("outputArtifactKey: ", outputArtifactKey) + currentSubTask := tasks[subTaskName] + // If the sub-task is a DAG, reassign currentTask and run + // through the loop again. + currentTask = currentSubTask + // } + } else { + // Base case, subtask is a container, not a DAG. + outputs, err := mlmd.GetOutputArtifactsByExecutionId(ctx, currentTask.GetID()) + if err != nil { + return nil, artifactError(err) + } + glog.V(4).Infof("outputs: %#v", outputs) + artifact, ok := outputs[outputArtifactKey] + if !ok { + return nil, artifactError(fmt.Errorf("cannot find output artifact key %q in producer task %q", taskOutput.GetOutputArtifactKey(), taskOutput.GetProducerTask())) + } + runtimeArtifact, err := artifact.ToRuntimeArtifact() + if err != nil { + return nil, artifactError(err) + } + inputs.Artifacts[name] = &pipelinespec.ArtifactList{ + Artifacts: []*pipelinespec.RuntimeArtifact{runtimeArtifact}, + } + // Since we are in the base case, escape the loop. 
+ currentSubTaskMaybeDAG = false + } } default: return nil, artifactError(fmt.Errorf("artifact spec of type %T not implemented yet", t)) diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index 1a670c9d12e..c6729cb67b5 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -136,6 +136,9 @@ type ExecutionConfig struct { ParentDagID int64 // parent DAG execution ID. Only the root DAG does not have a parent DAG. InputParameters map[string]*structpb.Value OutputParameters map[string]*structpb.Value + // OutputArtifacts map[string]*structpb.Value + // OutputArtifacts []*pipelinespec.DagOutputsSpec_ArtifactSelectorSpec + OutputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec InputArtifactIDs map[string][]int64 IterationIndex *int // Index of the iteration. @@ -516,7 +519,8 @@ const ( keyCacheFingerPrint = "cache_fingerprint" keyCachedExecutionID = "cached_execution_id" keyInputs = "inputs" - keyOutputs = "outputs" + keyOutputs = "outputs" // TODO: Consider renaming this to output_parameters to be consistent. + keyOutputArtifacts = "output_artifacts" keyParentDagID = "parent_dag_id" // Parent DAG Execution ID. keyIterationIndex = "iteration_index" keyIterationCount = "iteration_count" @@ -580,6 +584,10 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config }, }} } + // We save the output parameter and output artifact relationships in MLMD in + // case they're provided by a sub-task so that we can follow the + // relationships and retrieve outputs downstream in components that depend + // on said outputs as inputs. if config.OutputParameters != nil { e.CustomProperties[keyOutputs] = &pb.Value{Value: &pb.Value_StructValue{ StructValue: &structpb.Struct{ @@ -587,6 +595,14 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config }, }} } + if config.OutputArtifacts != nil { + b, err := json.Marshal(config.OutputArtifacts) + if err != nil { + return nil, err + } + e.CustomProperties[keyOutputArtifacts] = StringValue(string(b)) + } + req := &pb.PutExecutionRequest{ Execution: e, Contexts: []*pb.Context{pipeline.pipelineCtx, pipeline.pipelineRunCtx}, From 87914caa05225f62c4dccff9c7bd290008d59bce Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Fri, 13 Sep 2024 09:14:30 -0400 Subject: [PATCH 05/19] Simplify parameter handling logic Signed-off-by: droctothorpe Co-authored-by: zazulam --- backend/src/v2/driver/driver.go | 84 +++++++++++++------------------ backend/src/v2/metadata/client.go | 2 - 2 files changed, 34 insertions(+), 52 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 9415b6f35b0..1539748dfd0 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1218,63 +1218,48 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, return nil, paramError(err) } + // The producer is the task that produces the output that we need to + // consume. + producer := tasks[taskOutput.GetProducerTask()] + glog.V(4).Info("producer: ", producer) + currentTask := producer // If the producer is a DAG, AND its output / producer subtask is // ALSO a DAG, then we need to cycle through this loop until we // arrive at a non-DAG subtask and essentially bubble up that // non-DAG subtask so that its value can be consumed. - producerSubTaskMaybeDAG := true - for producerSubTaskMaybeDAG { - // The producer is the task that produces the output that we need to - // consume. 
- producer := tasks[taskOutput.GetProducerTask()] - - glog.V(4).Info("producer: ", producer) - - // Get the producer's outputs. - _, producerOutputParameters, err := producer.GetParameters() - if err != nil { - return nil, paramError(fmt.Errorf("get producer output parameters: %w", err)) - } - glog.V(4).Info("producer output parameters: ", producerOutputParameters) - // Deserialize them. - var producerOutputParametersMap map[string]string - b, err := producerOutputParameters["Output"].GetStructValue().MarshalJSON() + currentSubTaskMaybeDAG := true + for currentSubTaskMaybeDAG { + glog.V(4).Info("currentTask: ", currentTask.TaskName()) + _, outputParametersCustomProperty, err := currentTask.GetParameters() if err != nil { return nil, err } - json.Unmarshal(b, &producerOutputParametersMap) - glog.V(4).Info("producerOutputParametersMap: ", producerOutputParametersMap) - - // If the producer's output includes a producer subtask, which means - // that the producer is a DAG that is getting its output from one of - // the tasks in the DAG, then we want to roll up the output from the - // producer subtask to the producer, so that the downstream logic - // can retrieve it appropriately. - if producerSubTask, ok := producerOutputParametersMap["producer_subtask"]; ok { - glog.V(4).Infof( - "Overriding producer task, %v, output with producer_subtask, %v, output.", - producer.TaskName(), - producerSubTask, - ) - _, producerOutputParameters, err = tasks[producerSubTask].GetParameters() + // If the current task is a DAG: + if *currentTask.GetExecution().Type == "system.DAGExecution" { + // Since currentTask is a DAG, we need to deserialize its + // output parameter map so that we can look its + // corresponding producer sub-task, reassign currentTask, + // and iterate through this loop again. + var outputParametersMap map[string]string + b, err := outputParametersCustomProperty["Output"].GetStructValue().MarshalJSON() if err != nil { return nil, err } - glog.V(4).Info("producerSubTask output parameters: ", producerOutputParameters) - // The only reason we're updating this is to make the downstream - // logging more accurate. - taskOutput.ProducerTask = producerOutputParametersMap["producer_subtask"] - // Grab the value of the producer output. - producerOutputParameterValue, ok := producerOutputParameters[taskOutput.GetOutputParameterKey()] - if !ok { - return nil, paramError(fmt.Errorf("cannot find output parameter key %q in producer task %q", taskOutput.GetOutputParameterKey(), taskOutput.GetProducerTask())) - } - // Update the input to be the producer output value. - inputs.ParameterValues[name] = producerOutputParameterValue + json.Unmarshal(b, &outputParametersMap) + glog.V(4).Info("Deserialized outputParametersMap: ", outputParametersMap) + subTaskName := outputParametersMap["producer_subtask"] + glog.V(4).Infof( + "Overriding currentTask, %v, output with currentTask's producer_subtask, %v, output.", + currentTask.TaskName(), + subTaskName, + ) + + // Reassign sub-task before running through the loop again. + currentTask = tasks[subTaskName] } else { - // The producer subtask is not a DAG, so we exit the loop. - producerSubTaskMaybeDAG = false - inputs.ParameterValues[name] = producerOutputParameters[taskOutput.GetOutputParameterKey()] + inputs.ParameterValues[name] = outputParametersCustomProperty[taskOutput.GetOutputParameterKey()] + // Exit the loop. 
+ currentSubTaskMaybeDAG = false } } @@ -1333,8 +1318,8 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, var outputArtifactKey string currentSubTaskMaybeDAG := true for currentSubTaskMaybeDAG { - // If the current task is a DAG: glog.V(4).Info("currentTask: ", currentTask.TaskName()) + // If the current task is a DAG: if *currentTask.GetExecution().Type == "system.DAGExecution" { // Get the sub-task. outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["output_artifacts"] @@ -1351,13 +1336,12 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, outputArtifactKey = artifactSelectors[0].OutputArtifactKey glog.V(4).Info("subTaskName: ", subTaskName) glog.V(4).Info("outputArtifactKey: ", outputArtifactKey) - currentSubTask := tasks[subTaskName] // If the sub-task is a DAG, reassign currentTask and run // through the loop again. - currentTask = currentSubTask + currentTask = tasks[subTaskName] // } } else { - // Base case, subtask is a container, not a DAG. + // Base case, currentTask is a container, not a DAG. outputs, err := mlmd.GetOutputArtifactsByExecutionId(ctx, currentTask.GetID()) if err != nil { return nil, artifactError(err) diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index c6729cb67b5..c11d5f356f0 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -136,8 +136,6 @@ type ExecutionConfig struct { ParentDagID int64 // parent DAG execution ID. Only the root DAG does not have a parent DAG. InputParameters map[string]*structpb.Value OutputParameters map[string]*structpb.Value - // OutputArtifacts map[string]*structpb.Value - // OutputArtifacts []*pipelinespec.DagOutputsSpec_ArtifactSelectorSpec OutputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec InputArtifactIDs map[string][]int64 IterationIndex *int // Index of the iteration. From e3baa895ee3bed284a6596d9d200f3ec48bb773e Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Thu, 19 Sep 2024 11:58:53 -0400 Subject: [PATCH 06/19] Begin decomposition Signed-off-by: droctothorpe --- backend/src/v2/driver/driver.go | 387 +++++++++++++++++++------------- 1 file changed, 231 insertions(+), 156 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 1539748dfd0..493d1e15c1e 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -973,44 +973,6 @@ func validateNonRoot(opts Options) error { return nil } -// getDAGTasks gets all the tasks associated with the specified DAG and all of -// its subDAGs. -func getDAGTasks( - ctx context.Context, - dag *metadata.DAG, - pipeline *metadata.Pipeline, - mlmd *metadata.Client, - flattenedTasks map[string]*metadata.Execution, -) (map[string]*metadata.Execution, error) { - if flattenedTasks == nil { - flattenedTasks = make(map[string]*metadata.Execution) - } - currentExecutionTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) - if err != nil { - return nil, err - } - for k, v := range currentExecutionTasks { - flattenedTasks[k] = v - } - for _, v := range currentExecutionTasks { - if v.GetExecution().GetType() == "system.DAGExecution" { - glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. 
Adding its tasks to the task list.", v.TaskName()) - subDAG, err := mlmd.GetDAG(ctx, v.GetExecution().GetId()) - if err != nil { - return nil, err - } - // Pass the subDAG into a recursive call to getDAGTasks and update - // tasks to include the subDAG's tasks. - flattenedTasks, err = getDAGTasks(ctx, subDAG, pipeline, mlmd, flattenedTasks) - if err != nil { - return nil, err - } - } - } - - return flattenedTasks, nil -} - func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, task *pipelinespec.PipelineTaskSpec, inputsSpec *pipelinespec.ComponentInputsSpec, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) { defer func() { if err != nil { @@ -1202,65 +1164,20 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } inputs.ParameterValues[name] = v - // This is the case where we are consuming an output parameter from an - // upstream task. That task can be a container or a DAG. + // This is the case where the input comes from the output of an upstream task. case *pipelinespec.TaskInputsSpec_InputParameterSpec_TaskOutputParameter: - taskOutput := paramSpec.GetTaskOutputParameter() - glog.V(4).Info("taskOutput: ", taskOutput) - if taskOutput.GetProducerTask() == "" { - return nil, paramError(fmt.Errorf("producer task is empty")) + cfg := resolveUpstreamParametersConfig{ + ctx: ctx, + paramSpec: paramSpec, + dag: dag, + pipeline: pipeline, + mlmd: mlmd, + inputs: inputs, + name: name, + paramError: paramError, } - if taskOutput.GetOutputParameterKey() == "" { - return nil, paramError(fmt.Errorf("output parameter key is empty")) - } - tasks, err := getDAGTasks(ctx, dag, pipeline, mlmd, nil) - if err != nil { - return nil, paramError(err) - } - - // The producer is the task that produces the output that we need to - // consume. - producer := tasks[taskOutput.GetProducerTask()] - glog.V(4).Info("producer: ", producer) - currentTask := producer - // If the producer is a DAG, AND its output / producer subtask is - // ALSO a DAG, then we need to cycle through this loop until we - // arrive at a non-DAG subtask and essentially bubble up that - // non-DAG subtask so that its value can be consumed. - currentSubTaskMaybeDAG := true - for currentSubTaskMaybeDAG { - glog.V(4).Info("currentTask: ", currentTask.TaskName()) - _, outputParametersCustomProperty, err := currentTask.GetParameters() - if err != nil { - return nil, err - } - // If the current task is a DAG: - if *currentTask.GetExecution().Type == "system.DAGExecution" { - // Since currentTask is a DAG, we need to deserialize its - // output parameter map so that we can look its - // corresponding producer sub-task, reassign currentTask, - // and iterate through this loop again. - var outputParametersMap map[string]string - b, err := outputParametersCustomProperty["Output"].GetStructValue().MarshalJSON() - if err != nil { - return nil, err - } - json.Unmarshal(b, &outputParametersMap) - glog.V(4).Info("Deserialized outputParametersMap: ", outputParametersMap) - subTaskName := outputParametersMap["producer_subtask"] - glog.V(4).Infof( - "Overriding currentTask, %v, output with currentTask's producer_subtask, %v, output.", - currentTask.TaskName(), - subTaskName, - ) - - // Reassign sub-task before running through the loop again. - currentTask = tasks[subTaskName] - } else { - inputs.ParameterValues[name] = outputParametersCustomProperty[taskOutput.GetOutputParameterKey()] - // Exit the loop. 
- currentSubTaskMaybeDAG = false - } + if err := resolveUpstreamParameters(cfg); err != nil { + return nil, err } case *pipelinespec.TaskInputsSpec_InputParameterSpec_RuntimeValue: @@ -1297,77 +1214,235 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, inputs.Artifacts[name] = v case *pipelinespec.TaskInputsSpec_InputArtifactSpec_TaskOutputArtifact: - taskOutput := artifactSpec.GetTaskOutputArtifact() - if taskOutput.GetProducerTask() == "" { - return nil, artifactError(fmt.Errorf("producer task is empty")) + cfg := resolveUpstreamArtifactsConfig{ + ctx: ctx, + artifactSpec: artifactSpec, + dag: dag, + pipeline: pipeline, + mlmd: mlmd, + inputs: inputs, + name: name, + artifactError: artifactError, } - if taskOutput.GetOutputArtifactKey() == "" { - return nil, artifactError(fmt.Errorf("output artifact key is empty")) + if err := resolveUpstreamArtifacts(cfg); err != nil { + return nil, err } - tasks, err := getDAGTasks(ctx, dag, pipeline, mlmd, nil) + default: + return nil, artifactError(fmt.Errorf("artifact spec of type %T not implemented yet", t)) + } + } + // TODO(Bobgy): validate executor inputs match component inputs definition + return inputs, nil +} + +// resolveUpstreamParametersConfig is just a config struct used to store the +// input parameters of the resolveUpstreamParameters function. +type resolveUpstreamParametersConfig struct { + ctx context.Context + paramSpec *pipelinespec.TaskInputsSpec_InputParameterSpec + dag *metadata.DAG + pipeline *metadata.Pipeline + mlmd *metadata.Client + inputs *pipelinespec.ExecutorInput_Inputs + name string + paramError func(error) error +} + +// resolveUpstreamParameters resolves input parameters that come from upstream +// tasks. These tasks can be components/containers, which is relatively +// straightforward, or DAGs, in which case, we need to traverse the graph until +// we arrive at a component/container (since there can be n nested DAGs). +func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { + taskOutput := cfg.paramSpec.GetTaskOutputParameter() + glog.V(4).Info("taskOutput: ", taskOutput) + if taskOutput.GetProducerTask() == "" { + return cfg.paramError(fmt.Errorf("producer task is empty")) + } + if taskOutput.GetOutputParameterKey() == "" { + return cfg.paramError(fmt.Errorf("output parameter key is empty")) + } + tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil) + if err != nil { + return cfg.paramError(err) + } + + // The producer is the task that produces the output that we need to + // consume. + producer := tasks[taskOutput.GetProducerTask()] + glog.V(4).Info("producer: ", producer) + currentTask := producer + currentSubTaskMaybeDAG := true + // Continue looping until we reach a sub-task that is NOT a DAG. + for currentSubTaskMaybeDAG { + glog.V(4).Info("currentTask: ", currentTask.TaskName()) + _, outputParametersCustomProperty, err := currentTask.GetParameters() + if err != nil { + return err + } + // If the current task is a DAG: + if *currentTask.GetExecution().Type == "system.DAGExecution" { + // Since currentTask is a DAG, we need to deserialize its + // output parameter map so that we can look its + // corresponding producer sub-task, reassign currentTask, + // and iterate through this loop again. 
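+ // These entries were written by the DAG driver via
+ // ecfg.OutputParameters; a deserialized map might look like
+ // (hypothetical values):
+ //
+ //   {"producer_subtask": "middle-dag", "output_parameter_key": "msg"}
+ //
+ // where "middle-dag" may itself be a DAG, hence the enclosing loop.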
+ var outputParametersMap map[string]string + b, err := outputParametersCustomProperty["Output"].GetStructValue().MarshalJSON() if err != nil { - return nil, artifactError(err) + return err } + json.Unmarshal(b, &outputParametersMap) + glog.V(4).Info("Deserialized outputParametersMap: ", outputParametersMap) + subTaskName := outputParametersMap["producer_subtask"] + glog.V(4).Infof( + "Overriding currentTask, %v, output with currentTask's producer_subtask, %v, output.", + currentTask.TaskName(), + subTaskName, + ) + + // Reassign sub-task before running through the loop again. + currentTask = tasks[subTaskName] + } else { + cfg.inputs.ParameterValues[cfg.name] = outputParametersCustomProperty[taskOutput.GetOutputParameterKey()] + // Exit the loop. + currentSubTaskMaybeDAG = false + } + } + + return nil +} + +// resolveUpstreamArtifactsConfig is just a config struct used to store the +// input parameters of the resolveUpstreamArtifacts function. +type resolveUpstreamArtifactsConfig struct { + ctx context.Context + artifactSpec *pipelinespec.TaskInputsSpec_InputArtifactSpec + dag *metadata.DAG + pipeline *metadata.Pipeline + mlmd *metadata.Client + inputs *pipelinespec.ExecutorInput_Inputs + name string + artifactError func(error) error +} - producer, ok := tasks[taskOutput.GetProducerTask()] +// resolveUpstreamArtifacts resolves input artifacts that come from upstream +// tasks. These tasks can be components/containers, which is relatively +// straightforward, or DAGs, in which case, we need to traverse the graph until +// we arrive at a component/container (since there can be n nested DAGs). +func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { + taskOutput := cfg.artifactSpec.GetTaskOutputArtifact() + if taskOutput.GetProducerTask() == "" { + return cfg.artifactError(fmt.Errorf("producer task is empty")) + } + if taskOutput.GetOutputArtifactKey() == "" { + cfg.artifactError(fmt.Errorf("output artifact key is empty")) + } + tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil) + if err != nil { + cfg.artifactError(err) + } + + producer, ok := tasks[taskOutput.GetProducerTask()] + if !ok { + cfg.artifactError( + fmt.Errorf("cannot find producer task %q", taskOutput.GetProducerTask()), + ) + } + glog.V(4).Info("producer: ", producer) + currentTask := producer + var outputArtifactKey string + currentSubTaskMaybeDAG := true + // Continue looping until we reach a sub-task that is NOT a DAG. + for currentSubTaskMaybeDAG { + glog.V(4).Info("currentTask: ", currentTask.TaskName()) + // If the current task is a DAG: + if *currentTask.GetExecution().Type == "system.DAGExecution" { + // Get the sub-task. + outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["output_artifacts"] + // Deserialize the output artifacts. + var outputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec + err := json.Unmarshal([]byte(outputArtifactsCustomProperty.GetStringValue()), &outputArtifacts) + if err != nil { + return err + } + glog.V(4).Infof("Deserialized outputArtifacts: %v", outputArtifacts) + artifactSelectors := outputArtifacts["Output"].GetArtifactSelectors() + // TODO: Add support for multiple output artifacts. + subTaskName := artifactSelectors[0].ProducerSubtask + outputArtifactKey = artifactSelectors[0].OutputArtifactKey + glog.V(4).Info("subTaskName: ", subTaskName) + glog.V(4).Info("outputArtifactKey: ", outputArtifactKey) + // If the sub-task is a DAG, reassign currentTask and run + // through the loop again. 
+ currentTask = tasks[subTaskName] + // } + } else { + // Base case, currentTask is a container, not a DAG. + outputs, err := cfg.mlmd.GetOutputArtifactsByExecutionId(cfg.ctx, currentTask.GetID()) + if err != nil { + cfg.artifactError(err) + } + glog.V(4).Infof("outputs: %#v", outputs) + artifact, ok := outputs[outputArtifactKey] if !ok { - return nil, artifactError(fmt.Errorf("cannot find producer task %q", taskOutput.GetProducerTask())) + cfg.artifactError( + fmt.Errorf( + "cannot find output artifact key %q in producer task %q", + taskOutput.GetOutputArtifactKey(), + taskOutput.GetProducerTask(), + ), + ) } - glog.V(4).Info("producer: ", producer) - currentTask := producer - var outputArtifactKey string - currentSubTaskMaybeDAG := true - for currentSubTaskMaybeDAG { - glog.V(4).Info("currentTask: ", currentTask.TaskName()) - // If the current task is a DAG: - if *currentTask.GetExecution().Type == "system.DAGExecution" { - // Get the sub-task. - outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["output_artifacts"] - // Deserialize the output artifacts. - var outputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec - err := json.Unmarshal([]byte(outputArtifactsCustomProperty.GetStringValue()), &outputArtifacts) - if err != nil { - return nil, err - } - glog.V(4).Infof("Deserialized outputArtifacts: %v", outputArtifacts) - artifactSelectors := outputArtifacts["Output"].GetArtifactSelectors() - // TODO: Add support for multiple output artifacts. - subTaskName := artifactSelectors[0].ProducerSubtask - outputArtifactKey = artifactSelectors[0].OutputArtifactKey - glog.V(4).Info("subTaskName: ", subTaskName) - glog.V(4).Info("outputArtifactKey: ", outputArtifactKey) - // If the sub-task is a DAG, reassign currentTask and run - // through the loop again. - currentTask = tasks[subTaskName] - // } - } else { - // Base case, currentTask is a container, not a DAG. - outputs, err := mlmd.GetOutputArtifactsByExecutionId(ctx, currentTask.GetID()) - if err != nil { - return nil, artifactError(err) - } - glog.V(4).Infof("outputs: %#v", outputs) - artifact, ok := outputs[outputArtifactKey] - if !ok { - return nil, artifactError(fmt.Errorf("cannot find output artifact key %q in producer task %q", taskOutput.GetOutputArtifactKey(), taskOutput.GetProducerTask())) - } - runtimeArtifact, err := artifact.ToRuntimeArtifact() - if err != nil { - return nil, artifactError(err) - } - inputs.Artifacts[name] = &pipelinespec.ArtifactList{ - Artifacts: []*pipelinespec.RuntimeArtifact{runtimeArtifact}, - } - // Since we are in the base case, escape the loop. - currentSubTaskMaybeDAG = false - } + runtimeArtifact, err := artifact.ToRuntimeArtifact() + if err != nil { + cfg.artifactError(err) } - default: - return nil, artifactError(fmt.Errorf("artifact spec of type %T not implemented yet", t)) + cfg.inputs.Artifacts[cfg.name] = &pipelinespec.ArtifactList{ + Artifacts: []*pipelinespec.RuntimeArtifact{runtimeArtifact}, + } + // Since we are in the base case, escape the loop. + currentSubTaskMaybeDAG = false } } - // TODO(Bobgy): validate executor inputs match component inputs definition - return inputs, nil + + return nil +} + +// getDAGTasks gets all the tasks associated with the specified DAG and all of +// its subDAGs. 
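+// Callers pass flattenedTasks=nil at the top level; the map then
+// accumulates across recursive calls. For a root DAG containing a sub-DAG
+// "inner" (hypothetical names), the result is a single flat map along the
+// lines of {"task-a": ..., "inner": ..., "task-b": ...}, keyed by task
+// name across all nesting levels, so identically named tasks in different
+// DAGs would collide (last one wins).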
+func getDAGTasks( + ctx context.Context, + dag *metadata.DAG, + pipeline *metadata.Pipeline, + mlmd *metadata.Client, + flattenedTasks map[string]*metadata.Execution, +) (map[string]*metadata.Execution, error) { + if flattenedTasks == nil { + flattenedTasks = make(map[string]*metadata.Execution) + } + currentExecutionTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) + if err != nil { + return nil, err + } + for k, v := range currentExecutionTasks { + flattenedTasks[k] = v + } + for _, v := range currentExecutionTasks { + if v.GetExecution().GetType() == "system.DAGExecution" { + glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName()) + subDAG, err := mlmd.GetDAG(ctx, v.GetExecution().GetId()) + if err != nil { + return nil, err + } + // Pass the subDAG into a recursive call to getDAGTasks and update + // tasks to include the subDAG's tasks. + flattenedTasks, err = getDAGTasks(ctx, subDAG, pipeline, mlmd, flattenedTasks) + if err != nil { + return nil, err + } + } + } + + return flattenedTasks, nil } func provisionOutputs(pipelineRoot, taskName string, outputsSpec *pipelinespec.ComponentOutputsSpec, outputUriSalt string) *pipelinespec.ExecutorInput_Outputs { From 2958960b154d2603182ea360d150bc15ab67ddda Mon Sep 17 00:00:00 2001 From: Tyler Kalbach Date: Mon, 23 Sep 2024 22:56:56 -0400 Subject: [PATCH 07/19] Add support for multiple artifacts and params Signed-off-by: Tyler Kalbach --- backend/src/v2/driver/driver.go | 39 ++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 493d1e15c1e..5c180da8376 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -760,9 +760,10 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E // Handle writing output parameters to MLMD. outputParameters := opts.Component.GetDag().GetOutputs().GetParameters() glog.V(4).Info("outputParameters: ", outputParameters) - for _, value := range outputParameters { - outputParameterKey := value.GetValueFromParameter().OutputParameterKey - producerSubTask := value.GetValueFromParameter().ProducerSubtask + ecfg.OutputParameters = make(map[string]*structpb.Value) + for name, value := range outputParameters { + outputParameterKey := value.GetValueFromParameter().GetOutputParameterKey() + producerSubTask := value.GetValueFromParameter().GetProducerSubtask() glog.V(4).Info("outputParameterKey: ", outputParameterKey) glog.V(4).Info("producerSubtask: ", producerSubTask) @@ -773,9 +774,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E outputParameterStruct, _ := structpb.NewValue(outputParameterMap) - ecfg.OutputParameters = map[string]*structpb.Value{ - value.GetValueFromParameter().OutputParameterKey: outputParameterStruct, - } + ecfg.OutputParameters[name] = outputParameterStruct } // Handle writing output artifacts to MLMD. @@ -1198,6 +1197,8 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, // Handle artifacts. 
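 // With the multiple-output support added in this commit, a DAG's
 // "output_artifacts" property can carry one selector list per named
 // output key, e.g. (hypothetical values):
 //
 //   {"model":   {"artifactSelectors": [{"producerSubtask": "train",
 //                "outputArtifactKey": "model"}]},
 //    "metrics": {"artifactSelectors": [{"producerSubtask": "evaluate",
 //                "outputArtifactKey": "metrics"}]}}
 //
 // so resolveUpstreamArtifacts below indexes by the requested key instead
 // of assuming a single "Output" entry.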
for name, artifactSpec := range task.GetInputs().GetArtifacts() { + glog.V(4).Infof("inputs: %#v", task.GetInputs()) + glog.V(4).Infof("artifacts: %#v", task.GetInputs().GetArtifacts()) artifactError := func(err error) error { return fmt.Errorf("failed to resolve input artifact %s with spec %s: %w", name, artifactSpec, err) } @@ -1269,6 +1270,7 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { // The producer is the task that produces the output that we need to // consume. producer := tasks[taskOutput.GetProducerTask()] + outputParameterKey := taskOutput.GetOutputParameterKey() glog.V(4).Info("producer: ", producer) currentTask := producer currentSubTaskMaybeDAG := true @@ -1286,13 +1288,14 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { // corresponding producer sub-task, reassign currentTask, // and iterate through this loop again. var outputParametersMap map[string]string - b, err := outputParametersCustomProperty["Output"].GetStructValue().MarshalJSON() + b, err := outputParametersCustomProperty[outputParameterKey].GetStructValue().MarshalJSON() if err != nil { return err } json.Unmarshal(b, &outputParametersMap) glog.V(4).Info("Deserialized outputParametersMap: ", outputParametersMap) subTaskName := outputParametersMap["producer_subtask"] + outputParameterKey = outputParametersMap["output_parameter_key"] glog.V(4).Infof( "Overriding currentTask, %v, output with currentTask's producer_subtask, %v, output.", currentTask.TaskName(), @@ -1302,7 +1305,7 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { // Reassign sub-task before running through the loop again. currentTask = tasks[subTaskName] } else { - cfg.inputs.ParameterValues[cfg.name] = outputParametersCustomProperty[taskOutput.GetOutputParameterKey()] + cfg.inputs.ParameterValues[cfg.name] = outputParametersCustomProperty[outputParameterKey] // Exit the loop. currentSubTaskMaybeDAG = false } @@ -1329,6 +1332,7 @@ type resolveUpstreamArtifactsConfig struct { // straightforward, or DAGs, in which case, we need to traverse the graph until // we arrive at a component/container (since there can be n nested DAGs). func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { + glog.V(4).Infof("artifactSpec: %#v", cfg.artifactSpec) taskOutput := cfg.artifactSpec.GetTaskOutputArtifact() if taskOutput.GetProducerTask() == "" { return cfg.artifactError(fmt.Errorf("producer task is empty")) @@ -1349,7 +1353,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { } glog.V(4).Info("producer: ", producer) currentTask := producer - var outputArtifactKey string + var outputArtifactKey string = taskOutput.GetOutputArtifactKey() currentSubTaskMaybeDAG := true // Continue looping until we reach a sub-task that is NOT a DAG. for currentSubTaskMaybeDAG { @@ -1365,12 +1369,17 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { return err } glog.V(4).Infof("Deserialized outputArtifacts: %v", outputArtifacts) - artifactSelectors := outputArtifacts["Output"].GetArtifactSelectors() - // TODO: Add support for multiple output artifacts. 
- subTaskName := artifactSelectors[0].ProducerSubtask - outputArtifactKey = artifactSelectors[0].OutputArtifactKey - glog.V(4).Info("subTaskName: ", subTaskName) - glog.V(4).Info("outputArtifactKey: ", outputArtifactKey) + // Adding support for multiple output artifacts + var subTaskName string + value := outputArtifacts[outputArtifactKey].GetArtifactSelectors() + + for _, v := range value { + glog.V(4).Infof("v: %v", v) + glog.V(4).Infof("v.ProducerSubtask: %v", v.ProducerSubtask) + glog.V(4).Infof("v.OutputArtifactKey: %v", v.OutputArtifactKey) + subTaskName = v.ProducerSubtask + outputArtifactKey = v.OutputArtifactKey + } // If the sub-task is a DAG, reassign currentTask and run // through the loop again. currentTask = tasks[subTaskName] From 7c66a33cf7069f7e9e6b64f8dbbf76727b39be2e Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Tue, 1 Oct 2024 11:05:31 -0400 Subject: [PATCH 08/19] Implement large tests for subdagio Signed-off-by: droctothorpe Co-authored-by: zazulam Co-authored-by: CarterFendley --- backend/src/v2/driver/driver.go | 6 +- samples/v2/sample_test.py | 65 +++++++++++++----- samples/v2/subdagio/__init__.py | 7 ++ samples/v2/subdagio/artifact.py | 47 +++++++++++++ samples/v2/subdagio/artifact_cache.py | 42 ++++++++++++ samples/v2/subdagio/mixed_parameters.py | 48 ++++++++++++++ .../subdagio/multiple_artifacts_namedtuple.py | 66 +++++++++++++++++++ .../multiple_parameters_namedtuple.py | 51 ++++++++++++++ samples/v2/subdagio/parameter.py | 45 +++++++++++++ samples/v2/subdagio/parameter_cache.py | 40 +++++++++++ .../kfp/compiler/pipeline_spec_builder.py | 11 +++- 11 files changed, 406 insertions(+), 22 deletions(-) create mode 100644 samples/v2/subdagio/__init__.py create mode 100644 samples/v2/subdagio/artifact.py create mode 100644 samples/v2/subdagio/artifact_cache.py create mode 100644 samples/v2/subdagio/mixed_parameters.py create mode 100644 samples/v2/subdagio/multiple_artifacts_namedtuple.py create mode 100644 samples/v2/subdagio/multiple_parameters_namedtuple.py create mode 100644 samples/v2/subdagio/parameter.py create mode 100644 samples/v2/subdagio/parameter_cache.py diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 5c180da8376..8e1801864c2 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1353,7 +1353,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { } glog.V(4).Info("producer: ", producer) currentTask := producer - var outputArtifactKey string = taskOutput.GetOutputArtifactKey() + outputArtifactKey := taskOutput.GetOutputArtifactKey() currentSubTaskMaybeDAG := true // Continue looping until we reach a sub-task that is NOT a DAG. 
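	// With nested pipelines like the subdagio samples introduced below
	// (crust -> mantle -> core -> core_comp), this loop starts at the
	// outermost producer DAG and follows one artifact selector per
	// iteration until it lands on the container task whose MLMD artifact
	// is ultimately consumed.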
for currentSubTaskMaybeDAG { @@ -1371,9 +1371,9 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { glog.V(4).Infof("Deserialized outputArtifacts: %v", outputArtifacts) // Adding support for multiple output artifacts var subTaskName string - value := outputArtifacts[outputArtifactKey].GetArtifactSelectors() + artifactSelectors := outputArtifacts[outputArtifactKey].GetArtifactSelectors() - for _, v := range value { + for _, v := range artifactSelectors { glog.V(4).Infof("v: %v", v) glog.V(4).Infof("v.ProducerSubtask: %v", v.ProducerSubtask) glog.V(4).Infof("v.OutputArtifactKey: %v", v.OutputArtifactKey) diff --git a/samples/v2/sample_test.py b/samples/v2/sample_test.py index d34599a3c18..ed5fa0da825 100644 --- a/samples/v2/sample_test.py +++ b/samples/v2/sample_test.py @@ -11,20 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os -import unittest -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import as_completed +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass +import inspect +import os from pprint import pprint from typing import List +import unittest +import component_with_optional_inputs +import hello_world import kfp from kfp.dsl.graph_component import GraphComponent -import component_with_optional_inputs +import pipeline_container_no_input import pipeline_with_env -import hello_world import producer_consumer_param -import pipeline_container_no_input import two_step_pipeline_containerized _MINUTE = 60 # seconds @@ -38,16 +40,21 @@ class TestCase: class SampleTest(unittest.TestCase): - _kfp_host_and_port = os.getenv('KFP_API_HOST_AND_PORT', 'http://localhost:8888') - _kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT', 'http://localhost:8080') + _kfp_host_and_port = os.getenv('KFP_API_HOST_AND_PORT', + 'http://localhost:8888') + _kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT', + 'http://localhost:8080') _client = kfp.Client(host=_kfp_host_and_port, ui_host=_kfp_ui_and_port) def test(self): test_cases: List[TestCase] = [ TestCase(pipeline_func=hello_world.pipeline_hello_world), - TestCase(pipeline_func=producer_consumer_param.producer_consumer_param_pipeline), - TestCase(pipeline_func=pipeline_container_no_input.pipeline_container_no_input), - TestCase(pipeline_func=two_step_pipeline_containerized.two_step_pipeline_containerized), + TestCase(pipeline_func=producer_consumer_param + .producer_consumer_param_pipeline), + TestCase(pipeline_func=pipeline_container_no_input + .pipeline_container_no_input), + TestCase(pipeline_func=two_step_pipeline_containerized + .two_step_pipeline_containerized), TestCase(pipeline_func=component_with_optional_inputs.pipeline), TestCase(pipeline_func=pipeline_with_env.pipeline_with_env), @@ -56,27 +63,51 @@ def test(self): # TestCase(pipeline_func=pipeline_with_volume.pipeline_with_volume), # TestCase(pipeline_func=pipeline_with_secret_as_volume.pipeline_secret_volume), # TestCase(pipeline_func=pipeline_with_secret_as_env.pipeline_secret_env), + + # This next set of tests needs to be commented out until issue + # https://github.com/kubeflow/pipelines/issues/11239#issuecomment-2374792592 + # is addressed or the driver image that is used in CI is updated + # because otherwise the tests are run against incompatible version + # of the driver. 
In the meantime, for local validation, these tests + # can be executed (once you've manually deployed an updated driver + # image). + + # TestCase(pipeline_func=subdagio.parameter.crust), + # TestCase(pipeline_func=subdagio.parameter_cache.crust), + # TestCase(pipeline_func=subdagio.artifact_cache.crust), + # TestCase(pipeline_func=subdagio.artifact.crust), + # TestCase(pipeline_func=subdagio.mixed_parameters.crust), + # TestCase(pipeline_func=subdagio.multiple_parameters_namedtuple.crust) + # TestCase(pipeline_func=subdagio.multiple_artifacts_namedtuple.crust), ] with ThreadPoolExecutor() as executor: futures = [ - executor.submit(self.run_test_case, test_case.pipeline_func, test_case.timeout) - for test_case in test_cases + executor.submit(self.run_test_case, test_case.pipeline_func, + test_case.timeout) for test_case in test_cases ] for future in as_completed(futures): future.result() def run_test_case(self, pipeline_func: GraphComponent, timeout: int): with self.subTest(pipeline=pipeline_func, msg=pipeline_func.name): - run_result = self._client.create_run_from_pipeline_func(pipeline_func=pipeline_func) + print( + f'Running pipeline: {inspect.getmodule(pipeline_func.pipeline_func).__name__}/{pipeline_func.name}.' + ) + run_result = self._client.create_run_from_pipeline_func( + pipeline_func=pipeline_func) run_response = run_result.wait_for_run_completion(timeout) pprint(run_response.run_details) - print("Run details page URL:") - print(f"{self._kfp_ui_and_port}/#/runs/details/{run_response.run_id}") + print('Run details page URL:') + print( + f'{self._kfp_ui_and_port}/#/runs/details/{run_response.run_id}') - self.assertEqual(run_response.state, "SUCCEEDED") + self.assertEqual(run_response.state, 'SUCCEEDED') + print( + f'Pipeline, {inspect.getmodule(pipeline_func.pipeline_func).__name__}/{pipeline_func.name}, succeeded.' 
+ ) if __name__ == '__main__': diff --git a/samples/v2/subdagio/__init__.py b/samples/v2/subdagio/__init__.py new file mode 100644 index 00000000000..dc8f8b3ceae --- /dev/null +++ b/samples/v2/subdagio/__init__.py @@ -0,0 +1,7 @@ +from subdagio import artifact +from subdagio import artifact_cache +from subdagio import mixed_parameters +from subdagio import multiple_artifacts_namedtuple +from subdagio import multiple_parameters_namedtuple +from subdagio import parameter +from subdagio import parameter_cache diff --git a/samples/v2/subdagio/artifact.py b/samples/v2/subdagio/artifact.py new file mode 100644 index 00000000000..8f425662a1b --- /dev/null +++ b/samples/v2/subdagio/artifact.py @@ -0,0 +1,47 @@ +import os + +from kfp import Client +from kfp import dsl + + +@dsl.component +def core_comp(dataset: dsl.Output[dsl.Dataset]): + with open(dataset.path, 'w') as f: + f.write('foo') + + +@dsl.component +def crust_comp(input: dsl.Dataset): + with open(input.path, 'r') as f: + print('input: ', f.read()) + + +@dsl.pipeline +def core() -> dsl.Dataset: + task = core_comp() + task.set_caching_options(False) + + return task.output + + +@dsl.pipeline +def mantle() -> dsl.Dataset: + dag_task = core() + dag_task.set_caching_options(False) + + return dag_task.output + + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') +def crust(): + dag_task = mantle() + dag_task.set_caching_options(False) + + task = crust_comp(input=dag_task.output) + task.set_caching_options(False) + + +if __name__ == '__main__': + # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) diff --git a/samples/v2/subdagio/artifact_cache.py b/samples/v2/subdagio/artifact_cache.py new file mode 100644 index 00000000000..5b52b25fb23 --- /dev/null +++ b/samples/v2/subdagio/artifact_cache.py @@ -0,0 +1,42 @@ +import os + +from kfp import Client +from kfp import dsl + + +@dsl.component +def core_comp(dataset: dsl.Output[dsl.Dataset]): + with open(dataset.path, 'w') as f: + f.write('foo') + + +@dsl.component +def crust_comp(input: dsl.Dataset): + with open(input.path, 'r') as f: + print('input: ', f.read()) + + +@dsl.pipeline +def core() -> dsl.Dataset: + task = core_comp() + + return task.output + + +@dsl.pipeline +def mantle() -> dsl.Dataset: + dag_task = core() + return dag_task.output + + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') +def crust(): + dag_task = mantle() + + task = crust_comp(input=dag_task.output) + + +if __name__ == '__main__': + # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) diff --git a/samples/v2/subdagio/mixed_parameters.py b/samples/v2/subdagio/mixed_parameters.py new file mode 100644 index 00000000000..0a660d335d9 --- /dev/null +++ b/samples/v2/subdagio/mixed_parameters.py @@ -0,0 +1,48 @@ +import os + +from kfp import Client +from kfp import dsl +from kfp.compiler import Compiler + + +@dsl.component +def core_comp() -> int: + return 1 + + +@dsl.component +def crust_comp(x: int, y: int): + print('sum :', x + y) + + +@dsl.pipeline +def core() -> int: + task = core_comp() + task.set_caching_options(False) + + return task.output + + +@dsl.pipeline +def mantle() -> int: + dag_task = core() + dag_task.set_caching_options(False) + + return dag_task.output + + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') 
+def crust(): + dag_task = mantle() + dag_task.set_caching_options(False) + + task = crust_comp(x=2, y=dag_task.output) + task.set_caching_options(False) + + +if __name__ == '__main__': + Compiler().compile( + pipeline_func=crust, + package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) diff --git a/samples/v2/subdagio/multiple_artifacts_namedtuple.py b/samples/v2/subdagio/multiple_artifacts_namedtuple.py new file mode 100644 index 00000000000..7d2777d38b0 --- /dev/null +++ b/samples/v2/subdagio/multiple_artifacts_namedtuple.py @@ -0,0 +1,66 @@ +import os +from typing import NamedTuple + +from kfp import Client +from kfp import dsl + + +@dsl.component +def core_comp(ds1: dsl.Output[dsl.Dataset], ds2: dsl.Output[dsl.Dataset]): + with open(ds1.path, 'w') as f: + f.write('foo') + with open(ds2.path, 'w') as f: + f.write('bar') + + +@dsl.component +def crust_comp( + ds1: dsl.Dataset, + ds2: dsl.Dataset, +): + with open(ds1.path, 'r') as f: + print('ds1: ', f.read()) + with open(ds2.path, 'r') as f: + print('ds2: ', f.read()) + + +@dsl.pipeline +def core() -> NamedTuple( + 'outputs', + ds1=dsl.Dataset, + ds2=dsl.Dataset, +): # type: ignore + task = core_comp() + task.set_caching_options(False) + + return task.outputs + + +@dsl.pipeline +def mantle() -> NamedTuple( + 'outputs', + ds1=dsl.Dataset, + ds2=dsl.Dataset, +): # type: ignore + dag_task = core() + dag_task.set_caching_options(False) + + return dag_task.outputs + + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') +def crust(): + dag_task = mantle() + dag_task.set_caching_options(False) + + task = crust_comp( + ds1=dag_task.outputs['ds1'], + ds2=dag_task.outputs['ds2'], + ) + task.set_caching_options(False) + + +if __name__ == '__main__': + # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) diff --git a/samples/v2/subdagio/multiple_parameters_namedtuple.py b/samples/v2/subdagio/multiple_parameters_namedtuple.py new file mode 100644 index 00000000000..29699088554 --- /dev/null +++ b/samples/v2/subdagio/multiple_parameters_namedtuple.py @@ -0,0 +1,51 @@ +import os +from typing import NamedTuple + +from kfp import Client +from kfp import dsl + + +@dsl.component +def core_comp() -> NamedTuple('outputs', val1=str, val2=str): # type: ignore + outputs = NamedTuple('outputs', val1=str, val2=str) + return outputs('foo', 'bar') + + +@dsl.component +def crust_comp(val1: str, val2: str): + print('val1: ', val1) + print('val2: ', val2) + + +@dsl.pipeline +def core() -> NamedTuple('outputs', val1=str, val2=str): # type: ignore + task = core_comp() + task.set_caching_options(False) + + return task.outputs + + +@dsl.pipeline +def mantle() -> NamedTuple('outputs', val1=str, val2=str): # type: ignore + dag_task = core() + dag_task.set_caching_options(False) + + return dag_task.outputs + + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') +def crust(): + dag_task = mantle() + dag_task.set_caching_options(False) + + task = crust_comp( + val1=dag_task.outputs['val1'], + val2=dag_task.outputs['val2'], + ) + task.set_caching_options(False) + + +if __name__ == '__main__': + # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) diff --git a/samples/v2/subdagio/parameter.py b/samples/v2/subdagio/parameter.py new file mode 
100644 index 00000000000..c00439dd1c8 --- /dev/null +++ b/samples/v2/subdagio/parameter.py @@ -0,0 +1,45 @@ +import os + +from kfp import Client +from kfp import dsl + + +@dsl.component +def core_comp() -> str: + return 'foo' + + +@dsl.component +def crust_comp(input: str): + print('input :', input) + + +@dsl.pipeline +def core() -> str: + task = core_comp() + task.set_caching_options(False) + + return task.output + + +@dsl.pipeline +def mantle() -> str: + dag_task = core() + dag_task.set_caching_options(False) + + return dag_task.output + + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') +def crust(): + dag_task = mantle() + dag_task.set_caching_options(False) + + task = crust_comp(input=dag_task.output) + task.set_caching_options(False) + + +if __name__ == '__main__': + # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) diff --git a/samples/v2/subdagio/parameter_cache.py b/samples/v2/subdagio/parameter_cache.py new file mode 100644 index 00000000000..9fe2402e2b8 --- /dev/null +++ b/samples/v2/subdagio/parameter_cache.py @@ -0,0 +1,40 @@ +import os + +from kfp import Client +from kfp import dsl + + +@dsl.component +def core_comp() -> str: + return 'foo' + + +@dsl.component +def crust_comp(input: str): + print('input :', input) + + +@dsl.pipeline +def core() -> str: + task = core_comp() + + return task.output + + +@dsl.pipeline +def mantle() -> str: + dag_task = core() + + return dag_task.output + + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') +def crust(): + dag_task = mantle() + task = crust_comp(input=dag_task.output) + + +if __name__ == '__main__': + # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 6e4bc4e8690..fbc3bb463df 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -1872,7 +1872,9 @@ def validate_pipeline_outputs_dict( f'Pipeline outputs may only be returned from the top level of the pipeline function scope. Got pipeline output from within the control flow group dsl.{channel.task.parent_task_group.__class__.__name__}.' ) else: - raise ValueError(f'Got unknown pipeline output: {channel}.') + raise ValueError( + f'Got unknown pipeline output, {channel}, of type {type(channel)}.' + ) def create_pipeline_spec( @@ -2006,13 +2008,18 @@ def convert_pipeline_outputs_to_dict( output name to PipelineChannel.""" if pipeline_outputs is None: return {} + elif isinstance(pipeline_outputs, dict): + # This condition is required to support pipelines that return NamedTuples. + return pipeline_outputs elif isinstance(pipeline_outputs, pipeline_channel.PipelineChannel): return {component_factory.SINGLE_OUTPUT_NAME: pipeline_outputs} elif isinstance(pipeline_outputs, tuple) and hasattr( pipeline_outputs, '_asdict'): return dict(pipeline_outputs._asdict()) else: - raise ValueError(f'Got unknown pipeline output: {pipeline_outputs}') + raise ValueError( + f'Got unknown pipeline output, {pipeline_outputs}, of type {type(pipeline_outputs)}.' 
+ ) def write_pipeline_spec_to_file( From dd65ce17fd79ebb552cf5256e03e3ceab4404f01 Mon Sep 17 00:00:00 2001 From: zazulam Date: Mon, 7 Oct 2024 13:46:56 -0400 Subject: [PATCH 09/19] Address PR comments & handle oneof Signed-off-by: zazulam Co-authored-by: droctothorpe --- backend/src/v2/component/launcher_v2.go | 11 ++ backend/src/v2/driver/driver.go | 167 ++++++++++----------- backend/src/v2/metadata/client.go | 135 ++++++++++++++--- backend/src/v2/metadata/client_test.go | 4 +- backend/src/v2/objectstore/object_store.go | 13 +- samples/v2/sample_test.py | 7 +- samples/v2/subdagio/__init__.py | 1 + samples/v2/subdagio/parameter_oneof.py | 54 +++++++ 8 files changed, 273 insertions(+), 119 deletions(-) create mode 100644 samples/v2/subdagio/parameter_oneof.py diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index 2b5297c7b09..8b40ff346e0 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -148,6 +148,17 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) { } } glog.Infof("publish success.") + // At the end of the current task, we check the statuses of all tasks in the current DAG and update the DAG's status accordingly. + // TODO: If there's a pipeline whose only components are DAGs, this launcher logic will never run and as a result the dag status will never be updated. We need to implement a mechanism to handle this edge case. + dag, err := l.metadataClient.GetDAG(ctx, execution.GetExecution().CustomProperties["parent_dag_id"].GetIntValue()) + if err != nil { + glog.Errorf("DAG Status Update: failed to get DAG: %s", err.Error()) + } + pipeline, _ := l.metadataClient.GetPipelineFromExecution(ctx, execution.GetID()) + err = l.metadataClient.UpdateDAGExecutionsState(ctx, dag, pipeline) + if err != nil { + glog.Errorf("failed to update DAG state: %s", err.Error()) + } }() executedStartedTime := time.Now().Unix() execution, err = l.prePublish(ctx) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 8e1801864c2..f70e22c6392 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -344,7 +344,7 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl return execution, err } if opts.KubernetesExecutorConfig != nil { - dagTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) + dagTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline, true) if err != nil { return execution, err } @@ -758,29 +758,12 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E ecfg.NotTriggered = !execution.WillTrigger() // Handle writing output parameters to MLMD. 
- outputParameters := opts.Component.GetDag().GetOutputs().GetParameters() - glog.V(4).Info("outputParameters: ", outputParameters) - ecfg.OutputParameters = make(map[string]*structpb.Value) - for name, value := range outputParameters { - outputParameterKey := value.GetValueFromParameter().GetOutputParameterKey() - producerSubTask := value.GetValueFromParameter().GetProducerSubtask() - glog.V(4).Info("outputParameterKey: ", outputParameterKey) - glog.V(4).Info("producerSubtask: ", producerSubTask) - - outputParameterMap := map[string]interface{}{ - "output_parameter_key": outputParameterKey, - "producer_subtask": producerSubTask, - } - - outputParameterStruct, _ := structpb.NewValue(outputParameterMap) - - ecfg.OutputParameters[name] = outputParameterStruct - } + ecfg.OutputParameters = opts.Component.GetDag().GetOutputs().GetParameters() + glog.V(4).Info("outputParameters: ", ecfg.OutputParameters) // Handle writing output artifacts to MLMD. - outputArtifacts := opts.Component.GetDag().GetOutputs().GetArtifacts() - glog.V(4).Info("outputArtifacts: ", outputArtifacts) - ecfg.OutputArtifacts = outputArtifacts + ecfg.OutputArtifacts = opts.Component.GetDag().GetOutputs().GetArtifacts() + glog.V(4).Info("outputArtifacts: ", ecfg.OutputArtifacts) if opts.Task.GetArtifactIterator() != nil { return execution, fmt.Errorf("ArtifactIterator is not implemented") @@ -1256,55 +1239,109 @@ type resolveUpstreamParametersConfig struct { func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { taskOutput := cfg.paramSpec.GetTaskOutputParameter() glog.V(4).Info("taskOutput: ", taskOutput) - if taskOutput.GetProducerTask() == "" { - return cfg.paramError(fmt.Errorf("producer task is empty")) + producerTaskName := taskOutput.GetProducerTask() + if producerTaskName == "" { + return cfg.paramError(fmt.Errorf("producerTaskName is empty")) } - if taskOutput.GetOutputParameterKey() == "" { + outputParameterKey := taskOutput.GetOutputParameterKey() + if outputParameterKey == "" { return cfg.paramError(fmt.Errorf("output parameter key is empty")) } - tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil) + tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false) if err != nil { return cfg.paramError(err) } - + glog.V(4).Infof("tasks: %#v", tasks) // The producer is the task that produces the output that we need to // consume. - producer := tasks[taskOutput.GetProducerTask()] - outputParameterKey := taskOutput.GetOutputParameterKey() + producer, ok := tasks[producerTaskName] + if !ok { + return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName)) + } glog.V(4).Info("producer: ", producer) currentTask := producer currentSubTaskMaybeDAG := true // Continue looping until we reach a sub-task that is NOT a DAG. for currentSubTaskMaybeDAG { glog.V(4).Info("currentTask: ", currentTask.TaskName()) - _, outputParametersCustomProperty, err := currentTask.GetParameters() - if err != nil { - return err - } // If the current task is a DAG: if *currentTask.GetExecution().Type == "system.DAGExecution" { // Since currentTask is a DAG, we need to deserialize its - // output parameter map so that we can look its + // output parameter map so that we can look up its // corresponding producer sub-task, reassign currentTask, // and iterate through this loop again. 
- var outputParametersMap map[string]string - b, err := outputParametersCustomProperty[outputParameterKey].GetStructValue().MarshalJSON() - if err != nil { - return err + outputParametersCustomProperty, ok := currentTask.GetExecution().GetCustomProperties()["parameter_producer_task"] + if !ok { + return cfg.paramError(fmt.Errorf("Task, %v, does not have a parameter_producer_task custom property", currentTask.TaskName())) + } + glog.V(4).Infof("outputParametersCustomProperty: %#v", outputParametersCustomProperty) + + dagOutputParametersMap := make(map[string]*pipelinespec.DagOutputsSpec_DagOutputParameterSpec) + glog.V(4).Infof("outputParametersCustomProperty: %v", outputParametersCustomProperty.GetStructValue()) + + for name, value := range outputParametersCustomProperty.GetStructValue().GetFields() { + outputSpec := &pipelinespec.DagOutputsSpec_DagOutputParameterSpec{} + err := protojson.Unmarshal([]byte(value.GetStringValue()), outputSpec) + if err != nil { + return err + } + dagOutputParametersMap[name] = outputSpec + } + + glog.V(4).Infof("Deserialized dagOutputParametersMap: %v", dagOutputParametersMap) + + // Support for the 2 DagOutputParameterSpec types: + // ValueFromParameter & ValueFromOneof + var subTaskName string + switch dagOutputParametersMap[outputParameterKey].Kind.(type) { + case *pipelinespec.DagOutputsSpec_DagOutputParameterSpec_ValueFromParameter: + subTaskName = dagOutputParametersMap[outputParameterKey].GetValueFromParameter().GetProducerSubtask() + outputParameterKey = dagOutputParametersMap[outputParameterKey].GetValueFromParameter().GetOutputParameterKey() + case *pipelinespec.DagOutputsSpec_DagOutputParameterSpec_ValueFromOneof: + // When OneOf is specified in a pipeline, the output of only 1 task is consumed even though there may be more than 1 task output set. In this case we will attempt to grab the first successful task output. + paramSelectors := dagOutputParametersMap[outputParameterKey].GetValueFromOneof().GetParameterSelectors() + glog.V(4).Infof("paramSelectors: %v", paramSelectors) + // Since we have the tasks map, we can iterate through the parameterSelectors if the ProducerSubTask is not present in the task map and then assign the new OutputParameterKey only if it exists. 
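+				// For example, dsl.OneOf(t1.output, t2.outputs['output_key'])
+				// in the parameter_oneof sample compiles to one parameter
+				// selector per conditional branch; only the branch that
+				// actually ran has a producer sub-task in a COMPLETE or
+				// CACHED state, so taking the first successful selector
+				// recovers the value that the OneOf resolved to at runtime.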
+ successfulOneOfTask := false + for !successfulOneOfTask { + for _, paramSelector := range paramSelectors { + subTaskName = paramSelector.GetProducerSubtask() + glog.V(4).Infof("subTaskName from paramSelector: %v", subTaskName) + glog.V(4).Infof("outputParameterKey from paramSelector: %v", paramSelector.GetOutputParameterKey()) + if subTask, ok := tasks[subTaskName]; ok { + subTaskState := subTask.GetExecution().LastKnownState.String() + glog.V(4).Infof("subTask: %w , subTaskState: %v", subTaskName, subTaskState) + if subTaskState == "CACHED" || subTaskState == "COMPLETE" { + + outputParameterKey = paramSelector.GetOutputParameterKey() + successfulOneOfTask = true + break + } + } + } + return cfg.paramError(fmt.Errorf("Processing OneOf: No successful task found")) + } + } + glog.V(4).Infof("SubTaskName from outputParams: %v", subTaskName) + glog.V(4).Infof("OutputParameterKey from outputParams: %v", outputParameterKey) + if subTaskName == "" { + return cfg.paramError(fmt.Errorf("producer_subtask not in outputParams")) } - json.Unmarshal(b, &outputParametersMap) - glog.V(4).Info("Deserialized outputParametersMap: ", outputParametersMap) - subTaskName := outputParametersMap["producer_subtask"] - outputParameterKey = outputParametersMap["output_parameter_key"] glog.V(4).Infof( "Overriding currentTask, %v, output with currentTask's producer_subtask, %v, output.", currentTask.TaskName(), subTaskName, ) + currentTask, ok = tasks[subTaskName] + if !ok { + return cfg.paramError(fmt.Errorf("subTaskName, %v, not in tasks", subTaskName)) + } - // Reassign sub-task before running through the loop again. - currentTask = tasks[subTaskName] } else { + _, outputParametersCustomProperty, err := currentTask.GetParameters() + if err != nil { + return err + } cfg.inputs.ParameterValues[cfg.name] = outputParametersCustomProperty[outputParameterKey] // Exit the loop. currentSubTaskMaybeDAG = false @@ -1340,7 +1377,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { if taskOutput.GetOutputArtifactKey() == "" { cfg.artifactError(fmt.Errorf("output artifact key is empty")) } - tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil) + tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false) if err != nil { cfg.artifactError(err) } @@ -1361,7 +1398,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { // If the current task is a DAG: if *currentTask.GetExecution().Type == "system.DAGExecution" { // Get the sub-task. - outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["output_artifacts"] + outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["artifact_producer_task"] // Deserialize the output artifacts. var outputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec err := json.Unmarshal([]byte(outputArtifactsCustomProperty.GetStringValue()), &outputArtifacts) @@ -1416,44 +1453,6 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { return nil } -// getDAGTasks gets all the tasks associated with the specified DAG and all of -// its subDAGs. 
-func getDAGTasks( - ctx context.Context, - dag *metadata.DAG, - pipeline *metadata.Pipeline, - mlmd *metadata.Client, - flattenedTasks map[string]*metadata.Execution, -) (map[string]*metadata.Execution, error) { - if flattenedTasks == nil { - flattenedTasks = make(map[string]*metadata.Execution) - } - currentExecutionTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline) - if err != nil { - return nil, err - } - for k, v := range currentExecutionTasks { - flattenedTasks[k] = v - } - for _, v := range currentExecutionTasks { - if v.GetExecution().GetType() == "system.DAGExecution" { - glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName()) - subDAG, err := mlmd.GetDAG(ctx, v.GetExecution().GetId()) - if err != nil { - return nil, err - } - // Pass the subDAG into a recursive call to getDAGTasks and update - // tasks to include the subDAG's tasks. - flattenedTasks, err = getDAGTasks(ctx, subDAG, pipeline, mlmd, flattenedTasks) - if err != nil { - return nil, err - } - } - } - - return flattenedTasks, nil -} - func provisionOutputs(pipelineRoot, taskName string, outputsSpec *pipelinespec.ComponentOutputsSpec, outputUriSalt string) *pipelinespec.ExecutorInput_Outputs { outputs := &pipelinespec.ExecutorInput_Outputs{ Artifacts: make(map[string]*pipelinespec.ArtifactList), diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index c11d5f356f0..a3702b20f3b 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -89,7 +89,9 @@ type ClientInterface interface { GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error) GetExecution(ctx context.Context, id int64) (*Execution, error) GetPipelineFromExecution(ctx context.Context, id int64) (*Pipeline, error) - GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline) (executionsMap map[string]*Execution, err error) + GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error) + UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) (err error) + PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) (err error) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error) GetArtifactName(ctx context.Context, artifactId int64) (string, error) GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error) @@ -135,7 +137,7 @@ type ExecutionConfig struct { NotTriggered bool // optional, not triggered executions will have CANCELED state. ParentDagID int64 // parent DAG execution ID. Only the root DAG does not have a parent DAG. InputParameters map[string]*structpb.Value - OutputParameters map[string]*structpb.Value + OutputParameters map[string]*pipelinespec.DagOutputsSpec_DagOutputParameterSpec OutputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec InputArtifactIDs map[string][]int64 IterationIndex *int // Index of the iteration. 
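The ExecutionConfig change above is what carries a DAG's output wiring into MLMD: instead of pre-flattened structpb values, OutputParameters now holds the typed DagOutputParameterSpec, which CreateExecution serializes into a custom property and resolveUpstreamParameters later decodes. A minimal sketch of that round trip, reusing the packages the diff already uses (protojson, structpb, pipelinespec) and eliding error handling:

// roundTripOutputSpec mirrors the encode path in CreateExecution and the
// decode path in resolveUpstreamParameters; it is an illustration, not
// code from this patch.
func roundTripOutputSpec(spec *pipelinespec.DagOutputsSpec_DagOutputParameterSpec) *pipelinespec.DagOutputsSpec_DagOutputParameterSpec {
	b, _ := protojson.Marshal(spec)      // stored as a protojson string...
	v, _ := structpb.NewValue(string(b)) // ...inside a structpb.Struct custom property
	decoded := &pipelinespec.DagOutputsSpec_DagOutputParameterSpec{}
	_ = protojson.Unmarshal([]byte(v.GetStringValue()), decoded)
	return decoded
}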
@@ -505,23 +507,25 @@ func (c *Client) PublishExecution(ctx context.Context, execution *Execution, out // metadata keys const ( - keyDisplayName = "display_name" - keyTaskName = "task_name" - keyImage = "image" - keyPodName = "pod_name" - keyPodUID = "pod_uid" - keyNamespace = "namespace" - keyResourceName = "resource_name" - keyPipelineRoot = "pipeline_root" - keyStoreSessionInfo = "store_session_info" - keyCacheFingerPrint = "cache_fingerprint" - keyCachedExecutionID = "cached_execution_id" - keyInputs = "inputs" - keyOutputs = "outputs" // TODO: Consider renaming this to output_parameters to be consistent. - keyOutputArtifacts = "output_artifacts" - keyParentDagID = "parent_dag_id" // Parent DAG Execution ID. - keyIterationIndex = "iteration_index" - keyIterationCount = "iteration_count" + keyDisplayName = "display_name" + keyTaskName = "task_name" + keyImage = "image" + keyPodName = "pod_name" + keyPodUID = "pod_uid" + keyNamespace = "namespace" + keyResourceName = "resource_name" + keyPipelineRoot = "pipeline_root" + keyStoreSessionInfo = "store_session_info" + keyCacheFingerPrint = "cache_fingerprint" + keyCachedExecutionID = "cached_execution_id" + keyInputs = "inputs" + keyOutputs = "outputs" + keyParameterProducerTask = "parameter_producer_task" + keyOutputArtifacts = "output_artifacts" + keyArtifactProducerTask = "artifact_producer_task" + keyParentDagID = "parent_dag_id" // Parent DAG Execution ID. + keyIterationIndex = "iteration_index" + keyIterationCount = "iteration_count" ) // CreateExecution creates a new MLMD execution under the specified Pipeline. @@ -587,9 +591,25 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config // relationships and retrieve outputs downstream in components that depend // on said outputs as inputs. if config.OutputParameters != nil { - e.CustomProperties[keyOutputs] = &pb.Value{Value: &pb.Value_StructValue{ + // Convert OutputParameters to a format that can be saved in MLMD. + glog.V(4).Info("outputParameters: ", config.OutputParameters) + outputParametersCustomPropertyProtoMap := make(map[string]*structpb.Value) + + for name, value := range config.OutputParameters { + if outputParameterProtoMsg, ok := interface{}(value).(proto.Message); ok { + glog.V(4).Infof("name: %v, value: %w", name, value) + glog.V(4).Info("protoMessage: ", outputParameterProtoMsg) + b, err := protojson.Marshal(outputParameterProtoMsg) + if err != nil { + return nil, err + } + outputValue, _ := structpb.NewValue(string(b)) + outputParametersCustomPropertyProtoMap[name] = outputValue + } + } + e.CustomProperties[keyParameterProducerTask] = &pb.Value{Value: &pb.Value_StructValue{ StructValue: &structpb.Struct{ - Fields: config.OutputParameters, + Fields: outputParametersCustomPropertyProtoMap, }, }} } @@ -598,7 +618,7 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config if err != nil { return nil, err } - e.CustomProperties[keyOutputArtifacts] = StringValue(string(b)) + e.CustomProperties[keyArtifactProducerTask] = StringValue(string(b)) } req := &pb.PutExecutionRequest{ @@ -664,6 +684,61 @@ func (c *Client) PrePublishExecution(ctx context.Context, execution *Execution, return execution, nil } +// UpdateDAGExecutionState checks all the statuses of the tasks in the given DAG, based on that it will update the DAG to the corresponding status if necessary. 
+func (c *Client) UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) error { + tasks, err := c.GetExecutionsInDAG(ctx, dag, pipeline, true) + if err != nil { + return err + } + glog.V(4).Infof("tasks: %v", tasks) + glog.V(4).Infof("Checking Tasks' State") + completedTasks := 0 + failedTasks := 0 + totalTasks := len(tasks) + for _, task := range tasks { + taskState := task.GetExecution().LastKnownState.String() + glog.V(4).Infof("task: %s", task.TaskName()) + glog.V(4).Infof("task state: %s", taskState) + switch taskState { + case "FAILED": + failedTasks++ + case "COMPLETE": + completedTasks++ + case "CACHED": + completedTasks++ + case "CANCELED": + completedTasks++ + } + } + glog.V(4).Infof("completedTasks: %d", completedTasks) + glog.V(4).Infof("failedTasks: %d", failedTasks) + glog.V(4).Infof("totalTasks: %d", totalTasks) + + glog.Infof("Attempting to update DAG state") + if completedTasks == totalTasks { + c.PutDAGExecutionState(ctx, dag.Execution.GetID(), pb.Execution_COMPLETE) + } else if failedTasks > 0 { + c.PutDAGExecutionState(ctx, dag.Execution.GetID(), pb.Execution_FAILED) + } else { + glog.V(4).Infof("DAG is still running") + } + return nil +} + +// PutDAGExecutionState updates the given DAG Id to the state provided. +func (c *Client) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) error { + + e, err := c.GetExecution(ctx, executionID) + if err != nil { + return err + } + e.execution.LastKnownState = state.Enum() + _, err = c.svc.PutExecution(ctx, &pb.PutExecutionRequest{ + Execution: e.execution, + }) + return err +} + // GetExecutions ... func (c *Client) GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error) { req := &pb.GetExecutionsByIDRequest{ExecutionIds: ids} @@ -728,7 +803,7 @@ func (c *Client) GetPipelineFromExecution(ctx context.Context, id int64) (*Pipel // GetExecutionsInDAG gets all executions in the DAG, and organize them // into a map, keyed by task name. -func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline) (executionsMap map[string]*Execution, err error) { +func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error) { defer func() { if err != nil { err = fmt.Errorf("failed to get executions in %s: %w", dag.Info(), err) @@ -737,7 +812,12 @@ func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pip executionsMap = make(map[string]*Execution) // Documentation on query syntax: // https://github.com/google/ml-metadata/blob/839c3501a195d340d2855b6ffdb2c4b0b49862c9/ml_metadata/proto/metadata_store.proto#L831 - parentDAGFilter := fmt.Sprintf("custom_properties.parent_dag_id.int_value = %v", dag.Execution.GetID()) + // If filter is set to true, the MLMD call will only grab executions for the current DAG, else it would grab all the execution for the context which includes sub-DAGs. + parentDAGFilter := "" + if filter { + parentDAGFilter = fmt.Sprintf("custom_properties.parent_dag_id.int_value = %v", dag.Execution.GetID()) + } + // Note, because MLMD does not have index on custom properties right now, we // take a pipeline run context to limit the number of executions the DB needs to // iterate through to find sub-executions. 
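UpdateDAGExecutionsState above reduces the child task states to a single DAG state: when every task has finished (COMPLETE, CACHED, and CANCELED all count as completed) the DAG is marked COMPLETE, otherwise any FAILED task marks it FAILED, and anything else leaves the DAG untouched. The same rule, condensed into a standalone sketch (state strings as MLMD reports them; pb aliases the ml_metadata proto package as elsewhere in this file):

func reduceDAGState(taskStates []string) pb.Execution_State {
	completed, failed := 0, 0
	for _, s := range taskStates {
		switch s {
		case "COMPLETE", "CACHED", "CANCELED":
			completed++
		case "FAILED":
			failed++
		}
	}
	switch {
	case completed == len(taskStates):
		return pb.Execution_COMPLETE
	case failed > 0:
		return pb.Execution_FAILED
	default:
		return pb.Execution_RUNNING // UpdateDAGExecutionsState writes nothing in this case
	}
}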
@@ -756,11 +836,16 @@ func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pip } execs := res.GetExecutions() + glog.V(4).Infof("execs: %v", execs) for _, e := range execs { execution := &Execution{execution: e} taskName := execution.TaskName() if taskName == "" { - return nil, fmt.Errorf("empty task name for execution ID: %v", execution.GetID()) + if e.GetCustomProperties()[keyParentDagID] != nil { + return nil, fmt.Errorf("empty task name for execution ID: %v", execution.GetID()) + } + // When retrieving executions without the parentDAGFilter, the rootDAG execution is supplied but does not have an associated TaskName nor is the parentDagID set, therefore we won't include it in the executionsMap. + continue } existing, ok := executionsMap[taskName] if ok { diff --git a/backend/src/v2/metadata/client_test.go b/backend/src/v2/metadata/client_test.go index 94f081b32b0..3cb5e1cc64c 100644 --- a/backend/src/v2/metadata/client_test.go +++ b/backend/src/v2/metadata/client_test.go @@ -311,7 +311,7 @@ func Test_DAG(t *testing.T) { t.Fatal(err) } rootDAG := &metadata.DAG{Execution: root} - rootChildren, err := client.GetExecutionsInDAG(ctx, rootDAG, pipeline) + rootChildren, err := client.GetExecutionsInDAG(ctx, rootDAG, pipeline, true) if err != nil { t.Fatal(err) } @@ -324,7 +324,7 @@ func Test_DAG(t *testing.T) { if rootChildren["task2"].GetID() != task2.GetID() { t.Errorf("executions[\"task2\"].GetID()=%v, task2.GetID()=%v. Not equal", rootChildren["task2"].GetID(), task2.GetID()) } - task1Children, err := client.GetExecutionsInDAG(ctx, &metadata.DAG{Execution: task1DAG}, pipeline) + task1Children, err := client.GetExecutionsInDAG(ctx, &metadata.DAG{Execution: task1DAG}, pipeline, true) if len(task1Children) != 1 { t.Errorf("len(task1Children)=%v, expect 1", len(task1Children)) } diff --git a/backend/src/v2/objectstore/object_store.go b/backend/src/v2/objectstore/object_store.go index 41b5118c49f..42ec6418c43 100644 --- a/backend/src/v2/objectstore/object_store.go +++ b/backend/src/v2/objectstore/object_store.go @@ -17,6 +17,13 @@ package objectstore import ( "context" "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "strings" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" @@ -27,14 +34,8 @@ import ( "gocloud.dev/blob/s3blob" "gocloud.dev/gcp" "golang.org/x/oauth2/google" - "io" - "io/ioutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "os" - "path/filepath" - "regexp" - "strings" ) func OpenBucket(ctx context.Context, k8sClient kubernetes.Interface, namespace string, config *Config) (bucket *blob.Bucket, err error) { diff --git a/samples/v2/sample_test.py b/samples/v2/sample_test.py index ed5fa0da825..b925e29d762 100644 --- a/samples/v2/sample_test.py +++ b/samples/v2/sample_test.py @@ -28,6 +28,8 @@ import pipeline_with_env import producer_consumer_param import two_step_pipeline_containerized +# import subdagio + _MINUTE = 60 # seconds _DEFAULT_TIMEOUT = 5 * _MINUTE @@ -74,10 +76,11 @@ def test(self): # TestCase(pipeline_func=subdagio.parameter.crust), # TestCase(pipeline_func=subdagio.parameter_cache.crust), + # TestCase(pipeline_func=subdagio.mixed_parameters.crust), + # TestCase(pipeline_func=subdagio.multiple_parameters_namedtuple.crust), + # TestCase(pipeline_func=subdagio.parameter_oneof.crust), # TestCase(pipeline_func=subdagio.artifact_cache.crust), # TestCase(pipeline_func=subdagio.artifact.crust), - # 
TestCase(pipeline_func=subdagio.mixed_parameters.crust), - # TestCase(pipeline_func=subdagio.multiple_parameters_namedtuple.crust) # TestCase(pipeline_func=subdagio.multiple_artifacts_namedtuple.crust), ] diff --git a/samples/v2/subdagio/__init__.py b/samples/v2/subdagio/__init__.py index dc8f8b3ceae..024415d6bd2 100644 --- a/samples/v2/subdagio/__init__.py +++ b/samples/v2/subdagio/__init__.py @@ -5,3 +5,4 @@ from subdagio import multiple_parameters_namedtuple from subdagio import parameter from subdagio import parameter_cache +from subdagio import parameter_oneof diff --git a/samples/v2/subdagio/parameter_oneof.py b/samples/v2/subdagio/parameter_oneof.py new file mode 100644 index 00000000000..6459c155ef6 --- /dev/null +++ b/samples/v2/subdagio/parameter_oneof.py @@ -0,0 +1,54 @@ +import os + +from kfp import Client +from kfp import dsl + +@dsl.component +def flip_coin() -> str: + import random + return 'heads' if random.randint(0, 1) == 0 else 'tails' + +@dsl.component +def core_comp(input: str) -> str: + print('input :', input) + return input + +@dsl.component +def core_output_comp(input: str, output_key: dsl.OutputPath(str)): + print('input :', input) + with open(output_key, 'w') as f: + f.write(input) + +@dsl.component +def crust_comp(input: str): + print('input :', input) + +@dsl.pipeline +def core() -> str: + flip_coin_task = flip_coin().set_caching_options(False) + with dsl.If(flip_coin_task.output == 'heads'): + t1 = core_comp(input='Got heads!').set_caching_options(False) + with dsl.Else(): + t2 = core_output_comp(input='Got tails!').set_caching_options(False) + return dsl.OneOf(t1.output, t2.outputs['output_key']) + +@dsl.pipeline +def mantle() -> str: + dag_task = core() + dag_task.set_caching_options(False) + + return dag_task.output + +@dsl.pipeline(name=os.path.basename(__file__).removesuffix('.py') + '-pipeline') +def crust(): + dag_task = mantle() + dag_task.set_caching_options(False) + + task = crust_comp(input=dag_task.output) + task.set_caching_options(False) + + +if __name__ == '__main__': + # Compiler().compile(pipeline_func=crust, package_path=f"{__file__.removesuffix('.py')}.yaml") + client = Client() + client.create_run_from_pipeline_func(crust) From d15884a545e30ea4525c29e96f33f3b547da68b5 Mon Sep 17 00:00:00 2001 From: zazulam Date: Mon, 7 Oct 2024 14:07:29 -0400 Subject: [PATCH 10/19] update backend metadata client test Signed-off-by: zazulam --- backend/src/v2/metadata/client_fake.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/backend/src/v2/metadata/client_fake.go b/backend/src/v2/metadata/client_fake.go index 8e9b7b84677..beaddcc098c 100644 --- a/backend/src/v2/metadata/client_fake.go +++ b/backend/src/v2/metadata/client_fake.go @@ -19,6 +19,7 @@ package metadata import ( "context" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" @@ -64,10 +65,15 @@ func (c *FakeClient) GetPipelineFromExecution(ctx context.Context, id int64) (*P return nil, nil } -func (c *FakeClient) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline) (executionsMap map[string]*Execution, err error) { +func (c *FakeClient) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error) { return nil, nil } - +func (c *FakeClient) UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) (err error) { + return nil +} +func (c *FakeClient) PutDAGExecutionState(ctx context.Context, 
executionID int64, state pb.Execution_State) (err error) { + return nil +} func (c *FakeClient) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error) { return nil, nil } From 93d1f3ce5e092063ca64c3287871e908395d343e Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Mon, 21 Oct 2024 10:38:14 -0400 Subject: [PATCH 11/19] Enable nested pipeline IO large tests in CI Signed-off-by: droctothorpe Co-authored-by: CarterFendley Co-authored-by: zazulam --- samples/v2/sample_test.py | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/samples/v2/sample_test.py b/samples/v2/sample_test.py index b925e29d762..2af7c4fba7d 100644 --- a/samples/v2/sample_test.py +++ b/samples/v2/sample_test.py @@ -27,9 +27,8 @@ import pipeline_container_no_input import pipeline_with_env import producer_consumer_param +import subdagio import two_step_pipeline_containerized -# import subdagio - _MINUTE = 60 # seconds _DEFAULT_TIMEOUT = 5 * _MINUTE @@ -65,23 +64,16 @@ def test(self): # TestCase(pipeline_func=pipeline_with_volume.pipeline_with_volume), # TestCase(pipeline_func=pipeline_with_secret_as_volume.pipeline_secret_volume), # TestCase(pipeline_func=pipeline_with_secret_as_env.pipeline_secret_env), - - # This next set of tests needs to be commented out until issue - # https://github.com/kubeflow/pipelines/issues/11239#issuecomment-2374792592 - # is addressed or the driver image that is used in CI is updated - # because otherwise the tests are run against incompatible version - # of the driver. In the meantime, for local validation, these tests - # can be executed (once you've manually deployed an updated driver - # image). - - # TestCase(pipeline_func=subdagio.parameter.crust), - # TestCase(pipeline_func=subdagio.parameter_cache.crust), - # TestCase(pipeline_func=subdagio.mixed_parameters.crust), - # TestCase(pipeline_func=subdagio.multiple_parameters_namedtuple.crust), - # TestCase(pipeline_func=subdagio.parameter_oneof.crust), - # TestCase(pipeline_func=subdagio.artifact_cache.crust), - # TestCase(pipeline_func=subdagio.artifact.crust), - # TestCase(pipeline_func=subdagio.multiple_artifacts_namedtuple.crust), + TestCase(pipeline_func=subdagio.parameter.crust), + TestCase(pipeline_func=subdagio.parameter_cache.crust), + TestCase(pipeline_func=subdagio.mixed_parameters.crust), + TestCase( + pipeline_func=subdagio.multiple_parameters_namedtuple.crust), + TestCase(pipeline_func=subdagio.parameter_oneof.crust), + TestCase(pipeline_func=subdagio.artifact_cache.crust), + TestCase(pipeline_func=subdagio.artifact.crust), + TestCase( + pipeline_func=subdagio.multiple_artifacts_namedtuple.crust), ] with ThreadPoolExecutor() as executor: From b8efd3c87ad78b21fae7d876af50bb8147f43d80 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Mon, 21 Oct 2024 15:45:23 -0400 Subject: [PATCH 12/19] Execute narrow lookup before broad task lookup Signed-off-by: droctothorpe Co-authored-by: zazulam Co-authored-by: CarterFendley --- backend/src/v2/driver/driver.go | 22 +++++++++++++++++----- backend/src/v2/metadata/client.go | 7 +++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index f70e22c6392..a6469daae55 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1247,18 +1247,30 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { if outputParameterKey == "" { return cfg.paramError(fmt.Errorf("output parameter key is empty")) } 
- tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false) + + // Get a list of tasks for the current DAG first. + tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, true) if err != nil { return cfg.paramError(err) } - glog.V(4).Infof("tasks: %#v", tasks) - // The producer is the task that produces the output that we need to - // consume. + + // Check to see if the producer is in the list of tasks. producer, ok := tasks[producerTaskName] if !ok { - return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName)) + // If the producer is not in the list of tasks for the current DAG, + // lookup all of the tasks in the context (which includes other DAGs). + tasks, err = cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false) + if err != nil { + return cfg.paramError(err) + } + producer, ok = tasks[producerTaskName] + if !ok { + return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName)) + } } + glog.V(4).Info("producer: ", producer) + glog.V(4).Infof("tasks: %#v", tasks) currentTask := producer currentSubTaskMaybeDAG := true // Continue looping until we reach a sub-task that is NOT a DAG. diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index a3702b20f3b..b261c2e1ee2 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -849,6 +849,13 @@ func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pip } existing, ok := executionsMap[taskName] if ok { + // TODO: The failure to handle this results in a specific edge + // case which has yet to be solved for. If you have three nested + // pipelines: A, which calls B, which calls C, and B and C share + // a task that A does not have but depends on in a producer + // subtask, when GetExecutionsInDAG is called, it will raise + // this error. + // TODO(Bobgy): to support retry, we need to handle multiple tasks with the same task name. return nil, fmt.Errorf("two tasks have the same task name %q, id1=%v id2=%v", taskName, existing.GetID(), execution.GetID()) } From 78beb55110636859075a2233e0941dbc8e43b468 Mon Sep 17 00:00:00 2001 From: zazulam Date: Tue, 22 Oct 2024 11:16:48 -0400 Subject: [PATCH 13/19] reimplement getDAGTasks to address edge cases Signed-off-by: zazulam Co-authored-by: droctothorpe --- backend/src/v2/driver/driver.go | 59 ++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index a6469daae55..222bf15b314 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1219,6 +1219,43 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, return inputs, nil } +// getDAGTasks is a recursive function that returns a map of all tasks across all DAGs in the context of nested DAGs. 
+func getDAGTasks(
+	ctx context.Context,
+	dag *metadata.DAG,
+	pipeline *metadata.Pipeline,
+	mlmd *metadata.Client,
+	flattenedTasks map[string]*metadata.Execution,
+) (map[string]*metadata.Execution, error) {
+	if flattenedTasks == nil {
+		flattenedTasks = make(map[string]*metadata.Execution)
+	}
+	currentExecutionTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline, true)
+	if err != nil {
+		return nil, err
+	}
+	for k, v := range currentExecutionTasks {
+		flattenedTasks[k] = v
+	}
+	for _, v := range currentExecutionTasks {
+		if v.GetExecution().GetType() == "system.DAGExecution" {
+			glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName())
+			subDAG, err := mlmd.GetDAG(ctx, v.GetExecution().GetId())
+			if err != nil {
+				return nil, err
+			}
+			// Pass the subDAG into a recursive call to getDAGTasks and update
+			// tasks to include the subDAG's tasks.
+			flattenedTasks, err = getDAGTasks(ctx, subDAG, pipeline, mlmd, flattenedTasks)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	return flattenedTasks, nil
+}
+
 // resolveUpstreamParametersConfig is just a config struct used to store the
 // input parameters of the resolveUpstreamParameters function.
 type resolveUpstreamParametersConfig struct {
@@ -1249,26 +1286,16 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error {
 	}
 
 	// Get a list of tasks for the current DAG first.
-	tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, true)
+	// The reason we use getDAGTasks instead of mlmd.GetExecutionsInDAG is that the latter does not handle task name collisions in the map, which results in a number of unhandled edge cases and test failures.
+	tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil)
 	if err != nil {
 		return cfg.paramError(err)
 	}
-
 	// Check to see if the producer is in the list of tasks.
 	producer, ok := tasks[producerTaskName]
 	if !ok {
-		// If the producer is not in the list of tasks for the current DAG,
-		// lookup all of the tasks in the context (which includes other DAGs).
-		tasks, err = cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false)
-		if err != nil {
-			return cfg.paramError(err)
-		}
-		producer, ok = tasks[producerTaskName]
-		if !ok {
-			return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName))
-		}
+		return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName))
 	}
-
 	glog.V(4).Info("producer: ", producer)
 	glog.V(4).Infof("tasks: %#v", tasks)
 	currentTask := producer
@@ -1284,7 +1311,7 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error {
 			// and iterate through this loop again.
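 			// The parameter_producer_task custom property read below is the
 			// struct that CreateExecution wrote for this DAG: one field per
 			// DAG output name, each holding a protojson-encoded
 			// DagOutputParameterSpec. Decoding it reveals whether the output
 			// is a plain ValueFromParameter or a ValueFromOneof, and which
 			// producer sub-task to visit next.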
outputParametersCustomProperty, ok := currentTask.GetExecution().GetCustomProperties()["parameter_producer_task"] if !ok { - return cfg.paramError(fmt.Errorf("Task, %v, does not have a parameter_producer_task custom property", currentTask.TaskName())) + return cfg.paramError(fmt.Errorf("task, %v, does not have a parameter_producer_task custom property", currentTask.TaskName())) } glog.V(4).Infof("outputParametersCustomProperty: %#v", outputParametersCustomProperty) @@ -1331,7 +1358,9 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { } } } - return cfg.paramError(fmt.Errorf("Processing OneOf: No successful task found")) + if !successfulOneOfTask { + return cfg.paramError(fmt.Errorf("processing OneOf: No successful task found")) + } } } glog.V(4).Infof("SubTaskName from outputParams: %v", subTaskName) From 0076a2d814496ebef96fe97de6804d48dc7f2c24 Mon Sep 17 00:00:00 2001 From: zazulam Date: Tue, 22 Oct 2024 13:35:40 -0400 Subject: [PATCH 14/19] handle parallelfor in getDAGTask Signed-off-by: zazulam Co-authored-by: droctothorpe --- backend/src/v2/driver/driver.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 222bf15b314..0a9f3684649 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1238,7 +1238,16 @@ func getDAGTasks( flattenedTasks[k] = v } for _, v := range currentExecutionTasks { + if v.GetExecution().GetType() == "system.DAGExecution" { + // Iteration index is only applied when using ParallelFor, and in + // that scenario you're guaranteed to have redundant task names even + // within a single DAG, which results in an error when + // mlmd.GetExecutionsInDAG is called. ParallelFor outputs should be + // handled with dsl.Collected. + if _, ok := v.GetExecution().GetCustomProperties()["iteration_index"]; !ok { + continue + } glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName()) subDAG, err := mlmd.GetDAG(ctx, v.GetExecution().GetId()) if err != nil { From 9127cc13f573272d021999aaace721149b95f729 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Tue, 22 Oct 2024 15:17:50 -0400 Subject: [PATCH 15/19] Check if iterationIndex is nil Signed-off-by: droctothorpe Co-authored-by: zazulam --- backend/src/v2/driver/driver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 0a9f3684649..1fb4bcb37bd 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1245,7 +1245,8 @@ func getDAGTasks( // within a single DAG, which results in an error when // mlmd.GetExecutionsInDAG is called. ParallelFor outputs should be // handled with dsl.Collected. - if _, ok := v.GetExecution().GetCustomProperties()["iteration_index"]; !ok { + iterationIndex, ok := v.GetExecution().GetCustomProperties()["iteration_index"] + if ok && iterationIndex != nil { continue } glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. 
Adding its tasks to the task list.", v.TaskName()) From 9d9b578896626279f0830db94b69c0811e288f48 Mon Sep 17 00:00:00 2001 From: zazulam Date: Tue, 22 Oct 2024 18:01:44 -0400 Subject: [PATCH 16/19] check iteration_count for parallel tasks Signed-off-by: zazulam Co-authored-by: droctothorpe --- backend/src/v2/driver/driver.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 1fb4bcb37bd..e1133f27517 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1245,8 +1245,9 @@ func getDAGTasks( // within a single DAG, which results in an error when // mlmd.GetExecutionsInDAG is called. ParallelFor outputs should be // handled with dsl.Collected. - iterationIndex, ok := v.GetExecution().GetCustomProperties()["iteration_index"] - if ok && iterationIndex != nil { + _, ok := v.GetExecution().GetCustomProperties()["iteration_count"] + if ok { + glog.Infof("Found a ParallelFor task, %v. Skipping it.", v.TaskName()) continue } glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName()) From 704d349faafd8c21cc978c9dacbda504605de912 Mon Sep 17 00:00:00 2001 From: zazulam Date: Wed, 23 Oct 2024 13:40:53 -0400 Subject: [PATCH 17/19] update comment & disable v(4) logs Signed-off-by: zazulam --- backend/src/v2/cmd/driver/main.go | 3 ++- backend/src/v2/driver/driver.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/src/v2/cmd/driver/main.go b/backend/src/v2/cmd/driver/main.go index 9437d889862..49d28297280 100644 --- a/backend/src/v2/cmd/driver/main.go +++ b/backend/src/v2/cmd/driver/main.go @@ -85,7 +85,8 @@ func init() { flag.Set("logtostderr", "true") // Change the WARNING to INFO level for debugging. flag.Set("stderrthreshold", "WARNING") - flag.Set("v", "4") + // Enable V(4) logging level for more verbose debugging. + // flag.Set("v", "4") } func validate() error { diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index e1133f27517..7c7cf47b00d 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1240,7 +1240,7 @@ func getDAGTasks( for _, v := range currentExecutionTasks { if v.GetExecution().GetType() == "system.DAGExecution" { - // Iteration index is only applied when using ParallelFor, and in + // Iteration count is only applied when using ParallelFor, and in // that scenario you're guaranteed to have redundant task names even // within a single DAG, which results in an error when // mlmd.GetExecutionsInDAG is called. ParallelFor outputs should be From 4d14cfe9fe44dc431396bdf2b2349dfd58fb85a9 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Mon, 28 Oct 2024 19:23:26 -0400 Subject: [PATCH 18/19] Address Chen's feedback Signed-off-by: droctothorpe Co-authored-by: zazulam --- backend/src/v2/cmd/driver/main.go | 2 - backend/src/v2/component/launcher_v2.go | 4 +- backend/src/v2/driver/driver.go | 107 +++++++++++------------- 3 files changed, 50 insertions(+), 63 deletions(-) diff --git a/backend/src/v2/cmd/driver/main.go b/backend/src/v2/cmd/driver/main.go index 49d28297280..793ccfe1b80 100644 --- a/backend/src/v2/cmd/driver/main.go +++ b/backend/src/v2/cmd/driver/main.go @@ -85,8 +85,6 @@ func init() { flag.Set("logtostderr", "true") // Change the WARNING to INFO level for debugging. flag.Set("stderrthreshold", "WARNING") - // Enable V(4) logging level for more verbose debugging. 
- // flag.Set("v", "4") } func validate() error { diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index 8b40ff346e0..a80091699bc 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -148,8 +148,8 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) { } } glog.Infof("publish success.") - // At the end of the current task, we check the statuses of all tasks in the current DAG and update the DAG's status accordingly. - // TODO: If there's a pipeline whose only components are DAGs, this launcher logic will never run and as a result the dag status will never be updated. We need to implement a mechanism to handle this edge case. + // At the end of the current task, we check the statuses of all tasks in + // the current DAG and update the DAG's status accordingly. dag, err := l.metadataClient.GetDAG(ctx, execution.GetExecution().CustomProperties["parent_dag_id"].GetIntValue()) if err != nil { glog.Errorf("DAG Status Update: failed to get DAG: %s", err.Error()) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 7c7cf47b00d..960ec6148e8 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -1148,15 +1148,15 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, // This is the case where the input comes from the output of an upstream task. case *pipelinespec.TaskInputsSpec_InputParameterSpec_TaskOutputParameter: - cfg := resolveUpstreamParametersConfig{ - ctx: ctx, - paramSpec: paramSpec, - dag: dag, - pipeline: pipeline, - mlmd: mlmd, - inputs: inputs, - name: name, - paramError: paramError, + cfg := resolveUpstreamOutputsConfig{ + ctx: ctx, + paramSpec: paramSpec, + dag: dag, + pipeline: pipeline, + mlmd: mlmd, + inputs: inputs, + name: name, + err: paramError, } if err := resolveUpstreamParameters(cfg); err != nil { return nil, err @@ -1198,15 +1198,15 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, inputs.Artifacts[name] = v case *pipelinespec.TaskInputsSpec_InputArtifactSpec_TaskOutputArtifact: - cfg := resolveUpstreamArtifactsConfig{ - ctx: ctx, - artifactSpec: artifactSpec, - dag: dag, - pipeline: pipeline, - mlmd: mlmd, - inputs: inputs, - name: name, - artifactError: artifactError, + cfg := resolveUpstreamOutputsConfig{ + ctx: ctx, + artifactSpec: artifactSpec, + dag: dag, + pipeline: pipeline, + mlmd: mlmd, + inputs: inputs, + name: name, + err: artifactError, } if err := resolveUpstreamArtifacts(cfg); err != nil { return nil, err @@ -1267,45 +1267,47 @@ func getDAGTasks( return flattenedTasks, nil } -// resolveUpstreamParametersConfig is just a config struct used to store the -// input parameters of the resolveUpstreamParameters function. -type resolveUpstreamParametersConfig struct { - ctx context.Context - paramSpec *pipelinespec.TaskInputsSpec_InputParameterSpec - dag *metadata.DAG - pipeline *metadata.Pipeline - mlmd *metadata.Client - inputs *pipelinespec.ExecutorInput_Inputs - name string - paramError func(error) error +// resolveUpstreamOutputsConfig is just a config struct used to store the input +// parameters of the resolveUpstreamParameters and resolveUpstreamArtifacts +// functions. 
+type resolveUpstreamOutputsConfig struct {
+	ctx          context.Context
+	paramSpec    *pipelinespec.TaskInputsSpec_InputParameterSpec
+	artifactSpec *pipelinespec.TaskInputsSpec_InputArtifactSpec
+	dag          *metadata.DAG
+	pipeline     *metadata.Pipeline
+	mlmd         *metadata.Client
+	inputs       *pipelinespec.ExecutorInput_Inputs
+	name         string
+	err          func(error) error
 }
 
 // resolveUpstreamParameters resolves input parameters that come from upstream
 // tasks. These tasks can be components/containers, which is relatively
 // straightforward, or DAGs, in which case, we need to traverse the graph until
 // we arrive at a component/container (since there can be n nested DAGs).
-func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error {
+func resolveUpstreamParameters(cfg resolveUpstreamOutputsConfig) error {
 	taskOutput := cfg.paramSpec.GetTaskOutputParameter()
 	glog.V(4).Info("taskOutput: ", taskOutput)
 	producerTaskName := taskOutput.GetProducerTask()
 	if producerTaskName == "" {
-		return cfg.paramError(fmt.Errorf("producerTaskName is empty"))
+		return cfg.err(fmt.Errorf("producerTaskName is empty"))
 	}
 	outputParameterKey := taskOutput.GetOutputParameterKey()
 	if outputParameterKey == "" {
-		return cfg.paramError(fmt.Errorf("output parameter key is empty"))
+		return cfg.err(fmt.Errorf("output parameter key is empty"))
 	}
 
 	// Get a list of tasks for the current DAG first.
 	// We use getDAGTasks instead of mlmd.GetExecutionsInDAG because the latter does not handle task name collisions in the map, which results in a number of unhandled edge cases and test failures.
 	tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil)
 	if err != nil {
-		return cfg.paramError(err)
+		return cfg.err(err)
 	}
 
 	producer, ok := tasks[producerTaskName]
 	if !ok {
-		return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName))
+		return cfg.err(fmt.Errorf("producer task, %v, not in tasks", producerTaskName))
 	}
 	glog.V(4).Info("producer: ", producer)
 	glog.V(4).Infof("tasks: %#v", tasks)
 	currentTask := producer
@@ -1322,7 +1324,7 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error {
 		// and iterate through this loop again.
outputParametersCustomProperty, ok := currentTask.GetExecution().GetCustomProperties()["parameter_producer_task"] if !ok { - return cfg.paramError(fmt.Errorf("task, %v, does not have a parameter_producer_task custom property", currentTask.TaskName())) + return cfg.err(fmt.Errorf("task, %v, does not have a parameter_producer_task custom property", currentTask.TaskName())) } glog.V(4).Infof("outputParametersCustomProperty: %#v", outputParametersCustomProperty) @@ -1370,14 +1372,14 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { } } if !successfulOneOfTask { - return cfg.paramError(fmt.Errorf("processing OneOf: No successful task found")) + return cfg.err(fmt.Errorf("processing OneOf: No successful task found")) } } } glog.V(4).Infof("SubTaskName from outputParams: %v", subTaskName) glog.V(4).Infof("OutputParameterKey from outputParams: %v", outputParameterKey) if subTaskName == "" { - return cfg.paramError(fmt.Errorf("producer_subtask not in outputParams")) + return cfg.err(fmt.Errorf("producer_subtask not in outputParams")) } glog.V(4).Infof( "Overriding currentTask, %v, output with currentTask's producer_subtask, %v, output.", @@ -1386,7 +1388,7 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { ) currentTask, ok = tasks[subTaskName] if !ok { - return cfg.paramError(fmt.Errorf("subTaskName, %v, not in tasks", subTaskName)) + return cfg.err(fmt.Errorf("subTaskName, %v, not in tasks", subTaskName)) } } else { @@ -1403,40 +1405,27 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error { return nil } -// resolveUpstreamArtifactsConfig is just a config struct used to store the -// input parameters of the resolveUpstreamArtifacts function. -type resolveUpstreamArtifactsConfig struct { - ctx context.Context - artifactSpec *pipelinespec.TaskInputsSpec_InputArtifactSpec - dag *metadata.DAG - pipeline *metadata.Pipeline - mlmd *metadata.Client - inputs *pipelinespec.ExecutorInput_Inputs - name string - artifactError func(error) error -} - // resolveUpstreamArtifacts resolves input artifacts that come from upstream // tasks. These tasks can be components/containers, which is relatively // straightforward, or DAGs, in which case, we need to traverse the graph until // we arrive at a component/container (since there can be n nested DAGs). -func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { +func resolveUpstreamArtifacts(cfg resolveUpstreamOutputsConfig) error { glog.V(4).Infof("artifactSpec: %#v", cfg.artifactSpec) taskOutput := cfg.artifactSpec.GetTaskOutputArtifact() if taskOutput.GetProducerTask() == "" { - return cfg.artifactError(fmt.Errorf("producer task is empty")) + return cfg.err(fmt.Errorf("producer task is empty")) } if taskOutput.GetOutputArtifactKey() == "" { - cfg.artifactError(fmt.Errorf("output artifact key is empty")) + cfg.err(fmt.Errorf("output artifact key is empty")) } tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false) if err != nil { - cfg.artifactError(err) + cfg.err(err) } producer, ok := tasks[taskOutput.GetProducerTask()] if !ok { - cfg.artifactError( + cfg.err( fmt.Errorf("cannot find producer task %q", taskOutput.GetProducerTask()), ) } @@ -1477,12 +1466,12 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { // Base case, currentTask is a container, not a DAG. 
outputs, err := cfg.mlmd.GetOutputArtifactsByExecutionId(cfg.ctx, currentTask.GetID()) if err != nil { - cfg.artifactError(err) + cfg.err(err) } glog.V(4).Infof("outputs: %#v", outputs) artifact, ok := outputs[outputArtifactKey] if !ok { - cfg.artifactError( + cfg.err( fmt.Errorf( "cannot find output artifact key %q in producer task %q", taskOutput.GetOutputArtifactKey(), @@ -1492,7 +1481,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error { } runtimeArtifact, err := artifact.ToRuntimeArtifact() if err != nil { - cfg.artifactError(err) + cfg.err(err) } cfg.inputs.Artifacts[cfg.name] = &pipelinespec.ArtifactList{ Artifacts: []*pipelinespec.RuntimeArtifact{runtimeArtifact}, From 17e0c5d9a358a251934fa24fd28e9f2298547570 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Wed, 30 Oct 2024 15:31:03 -0400 Subject: [PATCH 19/19] Clarify comment Signed-off-by: droctothorpe --- sdk/python/kfp/compiler/pipeline_spec_builder.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index fbc3bb463df..e4ad0938891 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -2009,7 +2009,9 @@ def convert_pipeline_outputs_to_dict( if pipeline_outputs is None: return {} elif isinstance(pipeline_outputs, dict): - # This condition is required to support pipelines that return NamedTuples. + # This condition is required to support the case where a nested pipeline + # returns a namedtuple but its output is converted into a dict by + # earlier invocations of this function (a few lines down). return pipeline_outputs elif isinstance(pipeline_outputs, pipeline_channel.PipelineChannel): return {component_factory.SINGLE_OUTPUT_NAME: pipeline_outputs}
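
For reviewers who want to exercise the traversal logic above in isolation, here is a minimal, self-contained Go sketch of the two patterns this series introduces: recursively flattening nested DAG executions into one task map (the shape of getDAGTasks, including the ParallelFor skip), and then walking producer-subtask links down to a concrete container task (the descent loop in resolveUpstreamParameters). The Execution type and its field names below are hypothetical, simplified stand-ins for illustration only; they are not the real metadata package API, which records these links as MLMD custom properties ("iteration_count", "parameter_producer_task").

package main

import "fmt"

// Execution is a simplified stand-in for metadata.Execution.
type Execution struct {
	Name            string
	Type            string       // "system.ContainerExecution" or "system.DAGExecution"
	Children        []*Execution // tasks recorded inside this DAG execution
	IsParallelFor   bool         // stand-in for the "iteration_count" custom property
	ProducerSubtask string       // stand-in for the "parameter_producer_task" rollup
}

// flattenTasks mirrors getDAGTasks: collect the current DAG's tasks into one
// map, then recurse into sub-DAG executions, skipping ParallelFor DAGs whose
// iteration tasks are guaranteed to collide on name.
func flattenTasks(tasks []*Execution, flattened map[string]*Execution) map[string]*Execution {
	if flattened == nil {
		flattened = make(map[string]*Execution)
	}
	for _, t := range tasks {
		flattened[t.Name] = t
	}
	for _, t := range tasks {
		if t.Type != "system.DAGExecution" {
			continue
		}
		if t.IsParallelFor {
			continue // ParallelFor outputs are handled with dsl.Collected.
		}
		flattened = flattenTasks(t.Children, flattened)
	}
	return flattened
}

// resolveProducer mirrors the descent loop in resolveUpstreamParameters:
// while the producer is itself a DAG, follow its recorded producer subtask
// until a container execution is reached.
func resolveProducer(tasks map[string]*Execution, producerName string) (*Execution, error) {
	current, ok := tasks[producerName]
	if !ok {
		return nil, fmt.Errorf("producer task, %v, not in tasks", producerName)
	}
	for current.Type == "system.DAGExecution" {
		next := current.ProducerSubtask
		if next == "" {
			return nil, fmt.Errorf("producer_subtask not recorded for %v", current.Name)
		}
		candidate, ok := tasks[next]
		if !ok {
			return nil, fmt.Errorf("subTaskName, %v, not in tasks", next)
		}
		current = candidate
	}
	return current, nil
}

func main() {
	producer := &Execution{Name: "double", Type: "system.ContainerExecution"}
	subDAG := &Execution{
		Name:            "inner-pipeline",
		Type:            "system.DAGExecution",
		Children:        []*Execution{producer},
		ProducerSubtask: "double",
	}
	tasks := flattenTasks([]*Execution{subDAG}, nil)
	resolved, err := resolveProducer(tasks, "inner-pipeline")
	if err != nil {
		panic(err)
	}
	fmt.Println(resolved.Name) // prints "double"
}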
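
One design consequence the sketch makes easy to see: the flattened map is last-write-wins on task name, so a name that recurs across nesting levels overwrites its earlier entry. The ParallelFor skip exists precisely because loop iterations are guaranteed to collide on name within a single DAG, which is why those outputs are deferred to dsl.Collected rather than resolved through this path.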