From 849367dc56697887a2c9caf953c516a099902c25 Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Wed, 27 Sep 2023 17:56:28 -0700 Subject: [PATCH] fix nested loop with cel outputs --- .../kfp_tekton/compiler/_tekton_handler.py | 64 ++++++++--- sdk/python/tests/compiler/compiler_tests.py | 7 ++ .../compiler/testdata/nested_cel_outputs.py | 68 +++++++++++ .../compiler/testdata/nested_cel_outputs.yaml | 107 ++++++++++++++++++ .../nested_cel_outputs_noninlined.yaml | 74 ++++++++++++ 5 files changed, 307 insertions(+), 13 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/nested_cel_outputs.py create mode 100644 sdk/python/tests/compiler/testdata/nested_cel_outputs.yaml create mode 100644 sdk/python/tests/compiler/testdata/nested_cel_outputs_noninlined.yaml diff --git a/sdk/python/kfp_tekton/compiler/_tekton_handler.py b/sdk/python/kfp_tekton/compiler/_tekton_handler.py index 98e78cfb02..78e8f15da3 100644 --- a/sdk/python/kfp_tekton/compiler/_tekton_handler.py +++ b/sdk/python/kfp_tekton/compiler/_tekton_handler.py @@ -374,6 +374,17 @@ def process_inline_cr_field(field_name): nested_custom_task['root_ct'] = relationships['root_ct'] for nested_custom_task in nested_custom_tasks: + # Calculate root level tasks + root_tasks = [] + root_task_tmp = [task["name"] for task in tasks] + for root_task in root_task_tmp: + append = True + for task_name in task_list: + if task_name in root_task: + append = False + break + if append: + root_tasks.append(root_task) nested_custom_task_spec = custom_task[nested_custom_task['nested_custom_task']]['spec'] for custom_task_cr in custom_task_crs: if custom_task_cr['metadata']['name'] == nested_custom_task['father_ct']: @@ -415,6 +426,7 @@ def process_inline_cr_field(field_name): if not has_nested_task: break nested_loop_counter_params = [] + nested_loop_counter_param_name = '' # Fetch nested loop counter params so that it won't find the nested parameters from # global param level. for task in custom_task_crs: @@ -424,6 +436,8 @@ def process_inline_cr_field(field_name): for iterate_name in iterate_param_list: if task['spec'].get(iterate_name): nested_loop_counter_params.append(task['spec'].get(iterate_name)) + if task['metadata']['name'] == nested_custom_task_spec['name']: + nested_loop_counter_param_name = task['spec'].get(iterate_name) for task in tasks: if task['name'] == nested_custom_task['root_ct']: @@ -435,30 +449,54 @@ def process_inline_cr_field(field_name): task['params'] = sorted(task['params'], key=lambda k: k['name']) if task['name'] in all_nested_loop: for param in task['params']: - if '$(params.' in param['value'] and 'subvar-' not in param['value']: + add_global = False + search_param_results = re.findall('\$\(tasks.([^ \t\n.:,;\{\}]+).results.([^ \t\n.:,;\{\}]+)\)', + param['value']) if isinstance(param['value'], str) else [] + if search_param_results and search_param_results[0][0] in root_tasks: + add_global = True + if ('$(params.' in param['value'] or add_global) and 'subvar-' not in param['value']: global_task_values.add(param['value']) # Add any pipeline global params to the nested loop layers all_params = [] for custom_param in custom_task_cr['spec']['pipelineSpec']['params']: all_params.append(''.join(['$(params.', custom_param['name'], ')'])) for global_task_value in global_task_values: - if global_task_value not in all_params and \ - re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0] not in nested_loop_counter_params: - all_params.append(global_task_value) - custom_task_cr['spec']['pipelineSpec']['params'].append( - {'name': re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0], - 'type': 'string'} - ) - for task in tasks: - if task['name'] == nested_custom_task['father_ct']: - task['params'].append( - {'name': re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0], - 'value': global_task_value} + if global_task_value not in all_params: + nested_counter_params = re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value) + if not nested_counter_params: + nested_counter_params = re.findall('\$\(tasks.([^ \t\n.:,;\{\}]+).results.([^ \t\n.:,;\{\}]+)\)', + global_task_value) + nested_counter_params = ["-".join(nested_param) for nested_param in nested_counter_params] + if nested_counter_params[0] not in nested_loop_counter_params: + all_params.append(global_task_value) + inject_name = True + for p in custom_task_cr['spec']['pipelineSpec']['params']: + if nested_counter_params[0] == p['name']: + inject_name = False + break + if inject_name: + custom_task_cr['spec']['pipelineSpec']['params'].append( + {'name': nested_counter_params[0], + 'type': 'string'} ) + for task in tasks: + if task['name'] == nested_custom_task['father_ct']: + task['params'].append( + {'name': nested_counter_params[0], + 'value': global_task_value} + ) for special_param in nested_custom_task_special_params: for nested_param in nested_custom_task_spec['params']: if nested_param['name'] == special_param['name']: nested_param['value'] = '$(params.%s)' % nested_param['name'] + for nested_param in nested_custom_task_spec['params']: + if nested_param.get('value') in list(global_task_values) and \ + nested_param.get('name') == nested_loop_counter_param_name: + nested_counter_params = re.findall('\$\(tasks.([^ \t\n.:,;\{\}]+).results.([^ \t\n.:,;\{\}]+)\)', + nested_param["value"]) + if nested_counter_params: + nested_param["value"] = '$(params.%s)' % \ + ["-".join(nested_param) for nested_param in nested_counter_params][0] # need process parameters to replace results custom_task_cr_task_names = [cr_task['name'] for cr_task in custom_task_cr['spec']['pipelineSpec']['tasks']] for nested_custom_task_param in nested_custom_task_spec['params']: diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index e63369c58e..b92ace5acd 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -235,6 +235,13 @@ def test_nested_loop_counter_workflow(self): from .testdata.nested_loop_counter import loop_3_range self._test_pipeline_workflow(loop_3_range, 'nested_loop_counter.yaml') + def test_nested_cel_outputs_workflow(self): + """ + Test compiling nested cel outputs in workflow to verify parameters are generated correctly. + """ + from .testdata.nested_cel_outputs import pipeline + self._test_pipeline_workflow(pipeline, 'nested_cel_outputs.yaml') + def test_nested_loop_param_workflow(self): """ Test compiling nested loop param in workflow to verify parameters are generated correctly. diff --git a/sdk/python/tests/compiler/testdata/nested_cel_outputs.py b/sdk/python/tests/compiler/testdata/nested_cel_outputs.py new file mode 100644 index 0000000000..a2006f9a46 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/nested_cel_outputs.py @@ -0,0 +1,68 @@ +# Copyright 2023 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from kfp import dsl, components +from kfp.components import load_component_from_text +from kfp_tekton.tekton import Loop + +op1_yaml = '''\ +name: 'my-in-coop1' +inputs: +- {name: item, type: Integer} +- {name: param, type: Integer} +implementation: + container: + image: library/bash:4.4.23 + command: ['sh', '-c'] + args: + - | + set -e + echo op1 "$0" "$1" + - {inputValue: item} + - {inputValue: param} +''' + + +@dsl.pipeline(name='pipeline') +def pipeline(): + + cel_run_bash_script = load_component_from_text(r""" + name: cel-run-bash-script + inputs: [] + outputs: + - {name: env-variables} + implementation: + container: + image: aipipeline/cel-eval:latest + command: [cel] + args: [--apiVersion, custom.tekton.dev/v1alpha1, --kind, VariableStore, --name, + 1234567-var-store, --lit_0, 'a', --taskSpec, + '{}'] + fileOutputs: {} + """)() + with Loop(cel_run_bash_script.output) as item: + with Loop(cel_run_bash_script.output) as item2: + op1_template = components.load_component_from_text(op1_yaml) + op1 = op1_template(1, 10) + + +if __name__ == '__main__': + from kfp_tekton.compiler import TektonCompiler as Compiler + from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf + tekton_pipeline_conf = TektonPipelineConf() + tekton_pipeline_conf.set_tekton_inline_spec(True) + tekton_pipeline_conf.set_resource_in_separate_yaml(True) + Compiler().compile(pipeline, __file__.replace('.py', '.yaml'), tekton_pipeline_conf=tekton_pipeline_conf) + diff --git a/sdk/python/tests/compiler/testdata/nested_cel_outputs.yaml b/sdk/python/tests/compiler/testdata/nested_cel_outputs.yaml new file mode 100644 index 0000000000..bfa9eb7fc0 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/nested_cel_outputs.yaml @@ -0,0 +1,107 @@ +# Copyright 2021-2023 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: pipeline + annotations: + tekton.dev/output_artifacts: '{}' + tekton.dev/input_artifacts: '{}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"my-in-coop1": []}' + sidecar.istio.io/inject: "false" + tekton.dev/template: '' + pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME + pipelines.kubeflow.org/pipeline_spec: '{"name": "pipeline"}' + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' +spec: + pipelineSpec: + tasks: + - name: cel-run-bash-script + params: + - name: lit_0 + value: a + taskSpec: + apiVersion: custom.tekton.dev/v1alpha1 + kind: VariableStore + spec: {} + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": "cel-run-bash-script", + "outputs": [{"name": "env-variables"}], "version": "cel-run-bash-script@sha256=2cb7a267ff559aa219e1f0554002d093cbd9efb18997692171ee99335950d35b"}' + - name: pipeline-for-loop-2 + params: + - name: cel-run-bash-script-env-variables + value: $(tasks.cel-run-bash-script.results.env-variables) + - name: loop-item-param-1 + value: $(tasks.cel-run-bash-script.results.env-variables) + taskSpec: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + spec: + pipelineSpec: + params: + - name: cel-run-bash-script-env-variables + type: string + - name: loop-item-param-1 + type: string + tasks: + - name: pipeline-for-loop-4 + params: + - name: loop-item-param-3 + value: $(params.cel-run-bash-script-env-variables) + taskSpec: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + spec: + pipelineSpec: + params: + - name: loop-item-param-3 + type: string + tasks: + - name: my-in-coop1 + taskSpec: + steps: + - name: main + args: + - | + set -e + echo op1 "$0" "$1" + - '1' + - '10' + command: + - sh + - -c + image: library/bash:4.4.23 + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": + "my-in-coop1", "outputs": [], "version": "my-in-coop1@sha256=8ccab3a28a39a406554d964865f2ccb0aed854a43b6de827f613eff2bccd6f8f"}' + iterateParam: loop-item-param-3 + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + iterateParam: loop-item-param-1 + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" diff --git a/sdk/python/tests/compiler/testdata/nested_cel_outputs_noninlined.yaml b/sdk/python/tests/compiler/testdata/nested_cel_outputs_noninlined.yaml new file mode 100644 index 0000000000..dbb155ae52 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/nested_cel_outputs_noninlined.yaml @@ -0,0 +1,74 @@ +# Copyright 2021-2023 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: pipeline + annotations: + tekton.dev/output_artifacts: '{}' + tekton.dev/input_artifacts: '{}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"my-in-coop1": []}' + sidecar.istio.io/inject: "false" + tekton.dev/template: '' + pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME + pipelines.kubeflow.org/pipeline_spec: '{"name": "pipeline"}' + tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1", + "kind": "PipelineLoop", "metadata": {"name": "pipeline-for-loop-2"}, "spec": + {"iterateParam": "loop-item-param-1", "pipelineSpec": {"params": [{"name": "cel-run-bash-script-env-variables", + "type": "string"}, {"name": "loop-item-param-1", "type": "string"}], "tasks": + [{"name": "pipeline-for-loop-4", "params": [{"name": "loop-item-param-3", "value": + "$(params.cel-run-bash-script-env-variables)"}], "taskRef": {"apiVersion": "custom.tekton.dev/v1alpha1", + "kind": "PipelineLoop", "name": "pipeline-for-loop-4"}}]}}}, {"apiVersion": + "custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "metadata": {"name": "pipeline-for-loop-4"}, + "spec": {"iterateParam": "loop-item-param-3", "pipelineSpec": {"params": [{"name": + "loop-item-param-3", "type": "string"}], "tasks": [{"name": "my-in-coop1", "taskSpec": + {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest": + "{\"name\": \"my-in-coop1\", \"outputs\": [], \"version\": \"my-in-coop1@sha256=8ccab3a28a39a406554d964865f2ccb0aed854a43b6de827f613eff2bccd6f8f\"}"}, + "labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "steps": [{"args": + ["set -e\necho op1 \"$0\" \"$1\"\n", "1", "10"], "command": ["sh", "-c"], "image": + "library/bash:4.4.23", "name": "main"}]}}]}}}]' + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' +spec: + pipelineSpec: + tasks: + - name: cel-run-bash-script + params: + - name: lit_0 + value: a + taskSpec: + apiVersion: custom.tekton.dev/v1alpha1 + kind: VariableStore + spec: {} + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": "cel-run-bash-script", + "outputs": [{"name": "env-variables"}], "version": "cel-run-bash-script@sha256=2cb7a267ff559aa219e1f0554002d093cbd9efb18997692171ee99335950d35b"}' + - name: pipeline-for-loop-2 + taskRef: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + name: pipeline-for-loop-2 + params: + - name: loop-item-param-1 + value: $(tasks.cel-run-bash-script.results.env-variables) + - name: cel-run-bash-script-env-variables + value: $(tasks.cel-run-bash-script.results.env-variables)