Skip to content

Commit

Permalink
feat(V2 backend): Update compiler to produce the merger task of drive…
Browse files Browse the repository at this point in the history
…r + executor (#1412)

* update ci and move manifests to 2.0.3

* update compiler and manifests with the new kfptask controller

* remove comments'

* update parameters

* update tekton parameters to hyphen
  • Loading branch information
Tomcli authored Nov 2, 2023
1 parent baf14d5 commit f825a03
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 1,075 deletions.
96 changes: 22 additions & 74 deletions backend/src/v2/compiler/tektoncompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,39 +237,35 @@ func (i *containerDriverInputs) getParentDagCondition(isExitHandler bool) string

func (c *pipelinerunCompiler) containerDriverTask(name string, inputs *containerDriverInputs) error {

containerDriverName := getContainerDriverTaskName(name)
t, err := c.containerExecutorTemplate(name, inputs.containerDef, c.spec.PipelineInfo.GetName())

if err != nil {
return err
}
driverTask := &pipelineapi.PipelineTask{
Name: containerDriverName,
TaskRef: &pipelineapi.TaskRef{
APIVersion: "kfp-driver.tekton.dev/v1alpha1",
Kind: "KFPDriver",
},
Name: name,
TaskSpec: t,
Params: []pipelineapi.Param{
// "--type", "CONTAINER",
{
Name: paramNameType,
Value: pipelineapi.ParamValue{Type: "string", StringVal: "CONTAINER"},
},
// "--pipeline_name", c.spec.GetPipelineInfo().GetName(),
// "--pipeline-name", c.spec.GetPipelineInfo().GetName(),
{
Name: paramNamePipelineName,
Value: pipelineapi.ParamValue{Type: "string", StringVal: c.spec.GetPipelineInfo().GetName()},
},
// "--run_id", runID(),
// "--run-id", runID(),
{
Name: paramNameRunId,
Name: paramRunId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: runID()},
},
// "--dag_execution_id"
// "--dag-execution-id"
{
Name: paramNameDagExecutionId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.getParentDagID(c.ExitHandlerScope())},
},
// "--component"
{
Name: paramComponent,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.component},
},
// "--task"
{
Name: paramTask,
Expand All @@ -280,30 +276,33 @@ func (c *pipelinerunCompiler) containerDriverTask(name string, inputs *container
Name: paramContainer,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.container},
},
// "--iteration_index", inputValue(paramIterationIndex),
// "--iteration-index", inputValue(paramIterationIndex),
{
Name: paramNameIterationIndex,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.iterationIndex},
},
// "--kubernetes_config"
// "--kubernetes-config"
{
Name: paramKubernetesConfig,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.kubernetesConfig},
},
// "--mlmd_server_address"
// "--mlmd-server-address"
{
Name: paramNameMLMDServerHost,
Value: pipelineapi.ParamValue{Type: "string", StringVal: GetMLMDHost()},
},
// "--mlmd_server_port"
// "--mlmd-server-port"
{
Name: paramNameMLMDServerPort,
Value: pipelineapi.ParamValue{Type: "string", StringVal: GetMLMDPort()},
},
// "--component"
{
Name: paramComponent,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.component},
},
// produce the following outputs:
// - execution-id
// - executor-input
// - cached-decision
// - condition
},
}
Expand All @@ -325,57 +324,6 @@ func (c *pipelinerunCompiler) containerDriverTask(name string, inputs *container

c.addPipelineTask(driverTask)

// need container driver's output for executor
containerDriverOutputs := containerDriverOutputs{
executionId: taskOutputParameter(containerDriverName, paramExecutionID),
condition: taskOutputParameter(containerDriverName, paramCondition),
executiorInput: taskOutputParameter(containerDriverName, paramExecutorInput),
cached: taskOutputParameter(containerDriverName, paramCachedDecision),
podSpecPatch: taskOutputParameter(containerDriverName, paramPodSpecPatch),
}

t, err := c.containerExecutorTemplate(name, inputs.containerDef, c.spec.PipelineInfo.GetName())

if err != nil {
return err
}

executorTask := &pipelineapi.PipelineTask{
Name: name,
TaskSpec: t,
When: pipelineapi.WhenExpressions{
{
Input: containerDriverOutputs.cached,
Operator: "in",
Values: []string{"false"},
},
},
Params: []pipelineapi.Param{
{
Name: paramExecutorInput,
Value: pipelineapi.ParamValue{Type: "string", StringVal: containerDriverOutputs.executiorInput},
},
{
Name: paramExecutionID,
Value: pipelineapi.ParamValue{Type: "string", StringVal: containerDriverOutputs.executionId},
},
{
Name: paramRunId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: runID()},
},
{
Name: paramComponentSpec,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.component},
},
{
Name: paramPodSpecPatch,
Value: pipelineapi.ParamValue{Type: "string", StringVal: containerDriverOutputs.podSpecPatch},
},
},
}

c.addPipelineTask(executorTask)

return nil
}

Expand All @@ -393,7 +341,7 @@ func (c *pipelinerunCompiler) containerExecutorTemplate(
"--run_id", inputValue(paramRunId),
"--execution_id", inputValue(paramExecutionID),
"--executor_input", inputValue(paramExecutorInput),
"--component_spec", inputValue(paramComponentSpec),
"--component_spec", inputValue(paramComponent),
"--pod_name",
"$(KFP_POD_NAME)",
"--pod_uid",
Expand All @@ -411,7 +359,7 @@ func (c *pipelinerunCompiler) containerExecutorTemplate(
{Name: paramExecutorInput, Type: "string"}, // --executor_input
{Name: paramExecutionID, Type: "string"}, // --execution_id
{Name: paramRunId, Type: "string"}, // --run_id
{Name: paramComponentSpec, Type: "string"}, // --component_spec
{Name: paramComponent, Type: "string"}, // --component
},
Steps: []pipelineapi.Step{
// step 1: copy launcher
Expand Down
36 changes: 18 additions & 18 deletions backend/src/v2/compiler/tektoncompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,26 +233,26 @@ func (c *pipelinerunCompiler) dagDriverTask(
t := &pipelineapi.PipelineTask{
Name: name,
TaskRef: &pipelineapi.TaskRef{
APIVersion: "kfp-driver.tekton.dev/v1alpha1",
Kind: "KFPDriver",
APIVersion: "custom.tekton.dev/v1alpha1",
Kind: "KFPTask",
},
Params: []pipelineapi.Param{
// "--type"
{
Name: paramNameType,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.getDagType()},
},
// "--pipeline_name"
// "--pipeline-name"
{
Name: paramNamePipelineName,
Value: pipelineapi.ParamValue{Type: "string", StringVal: c.spec.GetPipelineInfo().GetName()},
},
// "--run_id"
// "--run-id"
{
Name: paramNameRunId,
Name: paramRunId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: runID()},
},
// "--dag_execution_id"
// "--dag-execution-id"
{
Name: paramNameDagExecutionId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.getParentDagID(c.ExitHandlerScope())},
Expand All @@ -267,17 +267,17 @@ func (c *pipelinerunCompiler) dagDriverTask(
Name: paramTask,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.task},
},
// "--runtime_config"
// "--runtime-config"
{
Name: paramNameRuntimeConfig,
Value: pipelineapi.ParamValue{Type: "string", StringVal: runtimeConfigJson},
},
// "--iteration_index"
// "--iteration-index"
{
Name: paramNameIterationIndex,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.getIterationIndex()},
},
// "--mlmd_server_address"
// "--mlmd-server-address"
{
Name: paramNameMLMDServerHost,
Value: pipelineapi.ParamValue{Type: "string", StringVal: GetMLMDHost()},
Expand Down Expand Up @@ -309,36 +309,36 @@ func (c *pipelinerunCompiler) dagPubDriverTask(
t := &pipelineapi.PipelineTask{
Name: name,
TaskRef: &pipelineapi.TaskRef{
APIVersion: "kfp-driver.tekton.dev/v1alpha1",
Kind: "KFPDriver",
APIVersion: "custom.tekton.dev/v1alpha1",
Kind: "KFPTask",
},
Params: []pipelineapi.Param{
// "--type"
{
Name: paramNameType,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.getDagType()},
},
// "--pipeline_name"
// "--pipeline-name"
{
Name: paramNamePipelineName,
Value: pipelineapi.ParamValue{Type: "string", StringVal: c.spec.GetPipelineInfo().GetName()},
},
// "--run_id"
// "--run-id"
{
Name: paramNameRunId,
Name: paramRunId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: runID()},
},
// "--dag_execution_id"
// "--dag-execution-id"
{
Name: paramNameDagExecutionId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: inputs.getParentDagID(c.ExitHandlerScope() || rootDagPub)},
},
// "--mlmd_server_address"
// "--mlmd-server-address"
{
Name: paramNameMLMDServerHost,
Value: pipelineapi.ParamValue{Type: "string", StringVal: GetMLMDHost()},
},
// "--mlmd_server_port"
// "--mlmd-server-port"
{
Name: paramNameMLMDServerPort,
Value: pipelineapi.ParamValue{Type: "string", StringVal: GetMLMDPort()},
Expand Down Expand Up @@ -368,7 +368,7 @@ type pubDagDriverInputs struct {
}

func (i *pubDagDriverInputs) getDagType() string {
return "DAG-PUB"
return "DAG_PUB"
}

func (i *pubDagDriverInputs) getParentDagID(isExitHandler bool) string {
Expand Down
4 changes: 2 additions & 2 deletions backend/src/v2/compiler/tektoncompiler/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *pipelinerunCompiler) Importer(name string,
"--component_spec", inputValue(paramComponent),
"--importer_spec", inputValue(paramImporter),
"--pipeline_name", c.spec.PipelineInfo.GetName(),
"--run_id", inputValue(paramRunId),
"--run_id", inputValue(paramNameRunId),
"--parent_dag_id", inputValue(paramParentDagID),
"--pod_name",
"$(KFP_POD_NAME)",
Expand Down Expand Up @@ -140,7 +140,7 @@ func (c *pipelinerunCompiler) Importer(name string,
Value: pipelineapi.ParamValue{Type: "string", StringVal: taskOutputParameter(getDAGDriverTaskName(c.CurrentDag()), paramExecutionID)},
},
{
Name: paramRunId,
Name: paramNameRunId,
Value: pipelineapi.ParamValue{Type: "string", StringVal: runID()},
},
},
Expand Down
30 changes: 15 additions & 15 deletions backend/src/v2/compiler/tektoncompiler/tekton.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,33 +634,33 @@ const (
paramImporter = "importer" // importer spec
paramRuntimeConfig = "runtime-config" // job runtime config, pipeline level inputs
paramParentDagID = "parent-dag-id"
paramExecutionID = "execution-id"
paramIterationItem = "iteration-item"
paramIterationCount = "iteration-count"
paramIterationIndex = "iteration-index"
paramExecutorInput = "executor-input"
paramDriverType = "driver-type"
paramCachedDecision = "cached-decision" // indicate hit cache or not
paramPodSpecPatch = "pod-spec-patch" // a strategic patch merged with the pod spec
paramCondition = "condition" // condition = false -> skip the task
paramRunId = "run-id"
paramComponentSpec = "component-spec"
paramExecutionID = "execution-id"
paramExecutorInput = "executor-input"

paramNameType = "type"
paramNamePipelineName = "pipeline_name"
paramNameRunId = "run_id"
paramNameDagExecutionId = "dag_execution_id"
paramNameRuntimeConfig = "runtime_config"
paramNameIterationIndex = "iteration_index"
paramNameExecutionId = "execution_id"
paramNameIterationCount = "iteration_count"
paramNamePipelineName = "pipeline-name"
paramNameRunId = "run-id"
paramNameDagExecutionId = "dag-execution-id"
paramNameRuntimeConfig = "runtime-config"
paramNameIterationIndex = "iteration-index"
paramNameExecutionId = "execution-id"
paramNameIterationCount = "iteration-count"
paramNameCondition = "condition"
paramNameCachedDecision = "cached_decision"
paramNamePodSpecPatchPath = "pod_spec_patch_path"
paramNameExecutorInput = "executor_input"
paramNameMLMDServerHost = "mlmd_server_address"
paramNameMLMDServerPort = "mlmd_server_port"
paramKubernetesConfig = "kubernetes_config" // stores Kubernetes config
paramNameCachedDecision = "cached-decision"
paramNamePodSpecPatchPath = "pod-spec-patch-path"
paramNameExecutorInput = "executor-input"
paramNameMLMDServerHost = "mlmd-server-address"
paramNameMLMDServerPort = "mlmd-server-port"
paramKubernetesConfig = "kubernetes-config" // stores Kubernetes config

kindPipelineLoop = "PipelineLoop"
subfixPipelineLoop = "-pipelineloop"
Expand Down
Loading

0 comments on commit f825a03

Please sign in to comment.