Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk): Fix nested loop with cel outputs #1351

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 51 additions & 13 deletions sdk/python/kfp_tekton/compiler/_tekton_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,17 @@ def process_inline_cr_field(field_name):
nested_custom_task['root_ct'] = relationships['root_ct']

for nested_custom_task in nested_custom_tasks:
# Calculate root level tasks
root_tasks = []
root_task_tmp = [task["name"] for task in tasks]
for root_task in root_task_tmp:
append = True
for task_name in task_list:
if task_name in root_task:
append = False
break
if append:
root_tasks.append(root_task)
nested_custom_task_spec = custom_task[nested_custom_task['nested_custom_task']]['spec']
for custom_task_cr in custom_task_crs:
if custom_task_cr['metadata']['name'] == nested_custom_task['father_ct']:
Expand Down Expand Up @@ -415,6 +426,7 @@ def process_inline_cr_field(field_name):
if not has_nested_task:
break
nested_loop_counter_params = []
nested_loop_counter_param_name = ''
# Fetch nested loop counter params so that it won't find the nested parameters from
# global param level.
for task in custom_task_crs:
Expand All @@ -424,6 +436,8 @@ def process_inline_cr_field(field_name):
for iterate_name in iterate_param_list:
if task['spec'].get(iterate_name):
nested_loop_counter_params.append(task['spec'].get(iterate_name))
if task['metadata']['name'] == nested_custom_task_spec['name']:
nested_loop_counter_param_name = task['spec'].get(iterate_name)

for task in tasks:
if task['name'] == nested_custom_task['root_ct']:
Expand All @@ -435,30 +449,54 @@ def process_inline_cr_field(field_name):
task['params'] = sorted(task['params'], key=lambda k: k['name'])
if task['name'] in all_nested_loop:
for param in task['params']:
if '$(params.' in param['value'] and 'subvar-' not in param['value']:
add_global = False
search_param_results = re.findall('\$\(tasks.([^ \t\n.:,;\{\}]+).results.([^ \t\n.:,;\{\}]+)\)',
param['value']) if isinstance(param['value'], str) else []
if search_param_results and search_param_results[0][0] in root_tasks:
add_global = True
if ('$(params.' in param['value'] or add_global) and 'subvar-' not in param['value']:
global_task_values.add(param['value'])
# Add any pipeline global params to the nested loop layers
all_params = []
for custom_param in custom_task_cr['spec']['pipelineSpec']['params']:
all_params.append(''.join(['$(params.', custom_param['name'], ')']))
for global_task_value in global_task_values:
if global_task_value not in all_params and \
re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0] not in nested_loop_counter_params:
all_params.append(global_task_value)
custom_task_cr['spec']['pipelineSpec']['params'].append(
{'name': re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0],
'type': 'string'}
)
for task in tasks:
if task['name'] == nested_custom_task['father_ct']:
task['params'].append(
{'name': re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0],
'value': global_task_value}
if global_task_value not in all_params:
nested_counter_params = re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)
if not nested_counter_params:
nested_counter_params = re.findall('\$\(tasks.([^ \t\n.:,;\{\}]+).results.([^ \t\n.:,;\{\}]+)\)',
global_task_value)
nested_counter_params = ["-".join(nested_param) for nested_param in nested_counter_params]
if nested_counter_params[0] not in nested_loop_counter_params:
all_params.append(global_task_value)
inject_name = True
for p in custom_task_cr['spec']['pipelineSpec']['params']:
if nested_counter_params[0] == p['name']:
inject_name = False
break
if inject_name:
custom_task_cr['spec']['pipelineSpec']['params'].append(
{'name': nested_counter_params[0],
'type': 'string'}
)
for task in tasks:
if task['name'] == nested_custom_task['father_ct']:
task['params'].append(
{'name': nested_counter_params[0],
'value': global_task_value}
)
for special_param in nested_custom_task_special_params:
for nested_param in nested_custom_task_spec['params']:
if nested_param['name'] == special_param['name']:
nested_param['value'] = '$(params.%s)' % nested_param['name']
for nested_param in nested_custom_task_spec['params']:
if nested_param.get('value') in list(global_task_values) and \
nested_param.get('name') == nested_loop_counter_param_name:
nested_counter_params = re.findall('\$\(tasks.([^ \t\n.:,;\{\}]+).results.([^ \t\n.:,;\{\}]+)\)',
nested_param["value"])
if nested_counter_params:
nested_param["value"] = '$(params.%s)' % \
["-".join(nested_param) for nested_param in nested_counter_params][0]
# need process parameters to replace results
custom_task_cr_task_names = [cr_task['name'] for cr_task in custom_task_cr['spec']['pipelineSpec']['tasks']]
for nested_custom_task_param in nested_custom_task_spec['params']:
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ def test_nested_loop_counter_workflow(self):
from .testdata.nested_loop_counter import loop_3_range
self._test_pipeline_workflow(loop_3_range, 'nested_loop_counter.yaml')

def test_nested_cel_outputs_workflow(self):
"""
Test compiling nested cel outputs in workflow to verify parameters are generated correctly.
"""
from .testdata.nested_cel_outputs import pipeline
self._test_pipeline_workflow(pipeline, 'nested_cel_outputs.yaml')

def test_nested_loop_param_workflow(self):
"""
Test compiling nested loop param in workflow to verify parameters are generated correctly.
Expand Down
68 changes: 68 additions & 0 deletions sdk/python/tests/compiler/testdata/nested_cel_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2023 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from kfp import dsl, components
from kfp.components import load_component_from_text
from kfp_tekton.tekton import Loop

op1_yaml = '''\
name: 'my-in-coop1'
inputs:
- {name: item, type: Integer}
- {name: param, type: Integer}
implementation:
container:
image: library/bash:4.4.23
command: ['sh', '-c']
args:
- |
set -e
echo op1 "$0" "$1"
- {inputValue: item}
- {inputValue: param}
'''


@dsl.pipeline(name='pipeline')
def pipeline():

cel_run_bash_script = load_component_from_text(r"""
name: cel-run-bash-script
inputs: []
outputs:
- {name: env-variables}
implementation:
container:
image: aipipeline/cel-eval:latest
command: [cel]
args: [--apiVersion, custom.tekton.dev/v1alpha1, --kind, VariableStore, --name,
1234567-var-store, --lit_0, 'a', --taskSpec,
'{}']
fileOutputs: {}
""")()
with Loop(cel_run_bash_script.output) as item:
with Loop(cel_run_bash_script.output) as item2:
op1_template = components.load_component_from_text(op1_yaml)
op1 = op1_template(1, 10)


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler as Compiler
from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf
tekton_pipeline_conf = TektonPipelineConf()
tekton_pipeline_conf.set_tekton_inline_spec(True)
tekton_pipeline_conf.set_resource_in_separate_yaml(True)
Compiler().compile(pipeline, __file__.replace('.py', '.yaml'), tekton_pipeline_conf=tekton_pipeline_conf)

107 changes: 107 additions & 0 deletions sdk/python/tests/compiler/testdata/nested_cel_outputs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright 2021-2023 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
name: pipeline
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"my-in-coop1": []}'
sidecar.istio.io/inject: "false"
tekton.dev/template: ''
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"name": "pipeline"}'
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
spec:
pipelineSpec:
tasks:
- name: cel-run-bash-script
params:
- name: lit_0
value: a
taskSpec:
apiVersion: custom.tekton.dev/v1alpha1
kind: VariableStore
spec: {}
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "cel-run-bash-script",
"outputs": [{"name": "env-variables"}], "version": "cel-run-bash-script@sha256=2cb7a267ff559aa219e1f0554002d093cbd9efb18997692171ee99335950d35b"}'
- name: pipeline-for-loop-2
params:
- name: cel-run-bash-script-env-variables
value: $(tasks.cel-run-bash-script.results.env-variables)
- name: loop-item-param-1
value: $(tasks.cel-run-bash-script.results.env-variables)
taskSpec:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
spec:
pipelineSpec:
params:
- name: cel-run-bash-script-env-variables
type: string
- name: loop-item-param-1
type: string
tasks:
- name: pipeline-for-loop-4
params:
- name: loop-item-param-3
value: $(params.cel-run-bash-script-env-variables)
taskSpec:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
spec:
pipelineSpec:
params:
- name: loop-item-param-3
type: string
tasks:
- name: my-in-coop1
taskSpec:
steps:
- name: main
args:
- |
set -e
echo op1 "$0" "$1"
- '1'
- '10'
command:
- sh
- -c
image: library/bash:4.4.23
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name":
"my-in-coop1", "outputs": [], "version": "my-in-coop1@sha256=8ccab3a28a39a406554d964865f2ccb0aed854a43b6de827f613eff2bccd6f8f"}'
iterateParam: loop-item-param-3
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
iterateParam: loop-item-param-1
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2021-2023 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
name: pipeline
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"my-in-coop1": []}'
sidecar.istio.io/inject: "false"
tekton.dev/template: ''
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"name": "pipeline"}'
tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1",
"kind": "PipelineLoop", "metadata": {"name": "pipeline-for-loop-2"}, "spec":
{"iterateParam": "loop-item-param-1", "pipelineSpec": {"params": [{"name": "cel-run-bash-script-env-variables",
"type": "string"}, {"name": "loop-item-param-1", "type": "string"}], "tasks":
[{"name": "pipeline-for-loop-4", "params": [{"name": "loop-item-param-3", "value":
"$(params.cel-run-bash-script-env-variables)"}], "taskRef": {"apiVersion": "custom.tekton.dev/v1alpha1",
"kind": "PipelineLoop", "name": "pipeline-for-loop-4"}}]}}}, {"apiVersion":
"custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "metadata": {"name": "pipeline-for-loop-4"},
"spec": {"iterateParam": "loop-item-param-3", "pipelineSpec": {"params": [{"name":
"loop-item-param-3", "type": "string"}], "tasks": [{"name": "my-in-coop1", "taskSpec":
{"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
"{\"name\": \"my-in-coop1\", \"outputs\": [], \"version\": \"my-in-coop1@sha256=8ccab3a28a39a406554d964865f2ccb0aed854a43b6de827f613eff2bccd6f8f\"}"},
"labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "steps": [{"args":
["set -e\necho op1 \"$0\" \"$1\"\n", "1", "10"], "command": ["sh", "-c"], "image":
"library/bash:4.4.23", "name": "main"}]}}]}}}]'
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
spec:
pipelineSpec:
tasks:
- name: cel-run-bash-script
params:
- name: lit_0
value: a
taskSpec:
apiVersion: custom.tekton.dev/v1alpha1
kind: VariableStore
spec: {}
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "cel-run-bash-script",
"outputs": [{"name": "env-variables"}], "version": "cel-run-bash-script@sha256=2cb7a267ff559aa219e1f0554002d093cbd9efb18997692171ee99335950d35b"}'
- name: pipeline-for-loop-2
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: pipeline-for-loop-2
params:
- name: loop-item-param-1
value: $(tasks.cel-run-bash-script.results.env-variables)
- name: cel-run-bash-script-env-variables
value: $(tasks.cel-run-bash-script.results.env-variables)
Loading