From 87184fd976057ced8f53490f25edc0c5f1eb0064 Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Wed, 31 Jul 2024 04:51:05 -0300 Subject: [PATCH] test: Moved kubeflow-pipelines-samples-v2 to GitHub Actions (#11048) Signed-off-by: hbelmiro --- .github/workflows/kfp-samples.yml | 34 ++++ backend/src/v2/test/Makefile | 15 -- .../src/v2/test/requirements-sample-test.txt | 2 + backend/src/v2/test/sample-test.sh | 16 +- backend/src/v2/test/sample_test.py | 164 ------------------ samples/v2/sample_test.py | 83 +++++++++ 6 files changed, 125 insertions(+), 189 deletions(-) create mode 100644 .github/workflows/kfp-samples.yml create mode 100644 backend/src/v2/test/requirements-sample-test.txt delete mode 100644 backend/src/v2/test/sample_test.py create mode 100644 samples/v2/sample_test.py diff --git a/.github/workflows/kfp-samples.yml b/.github/workflows/kfp-samples.yml new file mode 100644 index 00000000000..08cbed5d14a --- /dev/null +++ b/.github/workflows/kfp-samples.yml @@ -0,0 +1,34 @@ +name: KFP Samples + +on: + push: + branches: + - master + pull_request: + paths: + - 'samples/**' + - 'backend/src/v2/**' + - '.github/workflows/kfp-samples.yml' + +jobs: + samples: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.8 + + - name: Create KFP cluster + uses: ./.github/actions/kfp-cluster + + - name: Forward API port + run: ./scripts/deploy/github/forward-port.sh "kubeflow" "ml-pipeline" 8888 8888 + + - name: Run Samples Tests + run: | + ./backend/src/v2/test/sample-test.sh diff --git a/backend/src/v2/test/Makefile b/backend/src/v2/test/Makefile index cd24ec3e00b..caa945b05fc 100644 --- a/backend/src/v2/test/Makefile +++ b/backend/src/v2/test/Makefile @@ -8,21 +8,6 @@ ENV_PATH?=.env include $(ENV_PATH) SHELL = /bin/bash -.PHONY: all -all: sample-test - -.PHONY: sample-test -sample-test: upload -# The -u flag makes python output unbuffered, so that we 
can see real time log. -# Reference: https://stackoverflow.com/a/107717 - export KF_PIPELINES_ENDPOINT=$(HOST) \ - && python -u sample_test.py \ - --samples_config samples/test/config.yaml \ - --context $(GCS_ROOT)/src/context.tar.gz \ - --gcs_root $(GCS_ROOT)/data \ - --gcr_root $(GCR_ROOT) \ - --kfp_package_path "$(KFP_PACKAGE_PATH)" - .PHONY: integration-test integration-test: upload export KF_PIPELINES_ENDPOINT=$(HOST) \ diff --git a/backend/src/v2/test/requirements-sample-test.txt b/backend/src/v2/test/requirements-sample-test.txt new file mode 100644 index 00000000000..3686f9f8439 --- /dev/null +++ b/backend/src/v2/test/requirements-sample-test.txt @@ -0,0 +1,2 @@ +../../../../sdk/python +kfp[kubernetes] diff --git a/backend/src/v2/test/sample-test.sh b/backend/src/v2/test/sample-test.sh index 8d8223e65de..b03b5ea9147 100755 --- a/backend/src/v2/test/sample-test.sh +++ b/backend/src/v2/test/sample-test.sh @@ -15,18 +15,14 @@ # limitations under the License. set -ex -source_root=$(pwd) -DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" > /dev/null && pwd)" -cd "${DIR}" -source "${DIR}/scripts/ci-env.sh" +pushd ./backend/src/v2/test -# Install required packages from commit python3 -m pip install --upgrade pip +python3 -m pip install -r ./requirements-sample-test.txt -# TODO: remove deprecated dependency -python3 -m pip install -r $source_root/sdk/python/requirements-deprecated.txt -python3 -m pip install $source_root/sdk/python +popd -# Run sample test -ENV_PATH=kfp-ci.env make +# The -u flag makes python output unbuffered, so that we can see real time log. 
+# Reference: https://stackoverflow.com/a/107717 +python3 -u ./samples/v2/sample_test.py diff --git a/backend/src/v2/test/sample_test.py b/backend/src/v2/test/sample_test.py deleted file mode 100644 index 6cb4c12a309..00000000000 --- a/backend/src/v2/test/sample_test.py +++ /dev/null @@ -1,164 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# %% -import os -from typing import Dict, List -import json -import yaml -from kubernetes import client as k8s_client -import kfp.deprecated as kfp - -download_gcs_tgz = kfp.components.load_component_from_file( - 'components/download_gcs_tgz.yaml') -run_sample = kfp.components.load_component_from_file( - 'components/run_sample.yaml') -kaniko = kfp.components.load_component_from_file('components/kaniko.yaml') -build_go = kfp.components.load_component_from_file('components/build_go.yaml') - -_MINUTE = 60 # seconds - - -@kfp.dsl.pipeline(name='v2 sample test') -def v2_sample_test( - samples_config: List[Dict] = [ - { # TODO(Bobgy): why is the default value needed to pass argo lint? 
- 'name': 'example', - 'path': 'samples.v2.hello_world_test' - } - ], - context: 'URI' = 'gs://your-bucket/path/to/context.tar.gz', - gcs_root: 'URI' = 'gs://ml-pipeline-test/v2', - image_registry: 'URI' = 'gcr.io/ml-pipeline-test', - kfp_host: 'URI' = 'http://ml-pipeline:8888', - kfp_package_path: - 'URI' = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python' -): - download_src_op = download_gcs_tgz(gcs_path=context).set_cpu_limit( - '0.5').set_memory_limit('500Mi').set_display_name('download_src') - download_src_op.execution_options.caching_strategy.max_cache_staleness = "P0D" - - def build_image(name: str, dockerfile: str) -> kfp.dsl.ContainerOp: - task: kfp.dsl.ContainerOp = kaniko( - context_artifact=download_src_op.outputs['folder'], - destination=f'{image_registry}/{name}', - dockerfile=dockerfile, - ) - # CPU request/limit can be more flexible (request < limit), because being assigned to a node - # with insufficient CPU resource will only slow the task down, but not fail. - task.container.set_cpu_request('1').set_cpu_limit('2') - # Memory request/limit needs to be more rigid (request == limit), because in a node without - # enough memory, the task can hang indefinetely or OOM. - task.container.set_memory_request('4Gi').set_memory_limit('4Gi') - task.set_display_name(f'build-image-{name}') - task.set_retry( - 1, policy='Always' - ) # Always -> retry on both system error and user code failure. 
- return task - - # build v2 go images - build_go_op = build_go( - destination=f'{image_registry}/kfp-', - context=download_src_op.outputs['folder'], - ) - build_go_op.set_retry(1, policy='Always') - build_go_op.container.set_cpu_request('1').set_cpu_limit('2') - build_go_op.container.set_memory_request('4Gi').set_memory_limit('4Gi') - - # build sample test image - build_samples_image_op = build_image( - name='v2-sample-test', - dockerfile='backend/src/v2/test/Dockerfile', - ) - - # run test samples in parallel - with kfp.dsl.ParallelFor(samples_config) as sample: - run_sample_op: kfp.dsl.ContainerOp = run_sample( - name=sample.name, - sample_path=sample.path, - gcs_root=gcs_root, - external_host=kfp_host, - launcher_v2_image=build_go_op.outputs['digest_launcher_v2'], - driver_image=build_go_op.outputs['digest_driver'], - backend_compiler=build_go_op.outputs['backend_compiler'], - ) - run_sample_op.container.image = build_samples_image_op.outputs['digest'] - run_sample_op.set_display_name(f'sample_{sample.name}') - run_sample_op.set_retry(1, policy='Always') - - run_sample_op.container.add_env_variable( - k8s_client.V1EnvVar( - name='KFP_PACKAGE_PATH', value=kfp_package_path)) - - -def main( - context: str, - gcr_root: str, - gcs_root: str, - experiment: str = 'v2_sample_test', - timeout_mins: float = 40, - kfp_package_path: - str = 'git+https://github.com/kubeflow/pipelines#egg=kfp&subdirectory=sdk/python', - samples_config: str = os.path.join('samples', 'test', 'config.yaml'), -): - REPO_ROOT = os.path.join('..', '..', '..', '..') - samples_config_path = os.path.join(REPO_ROOT, samples_config) - samples_config_content = None - with open(samples_config_path, 'r') as stream: - samples_config_content = yaml.safe_load(stream) - - client = kfp.Client() - # TODO(Bobgy): avoid using private fields when getting loaded config - host = client._existing_config.host - client.create_experiment( - name=experiment, - description='An experiment with Kubeflow Pipelines v2 sample 
test runs.' - ) - conf = kfp.dsl.PipelineConf() - conf.set_timeout( - timeout_mins * _MINUTE - ) # add timeout to avoid pipelines stuck in running leak indefinetely - - print('Using KFP package path: {}'.format(kfp_package_path)) - run_result = client.create_run_from_pipeline_func( - v2_sample_test, - { - 'samples_config': samples_config_content, - 'context': context, - 'image_registry': f'{gcr_root}/test', - 'gcs_root': gcs_root, - 'kfp_host': host, - 'kfp_package_path': kfp_package_path, - }, - experiment_name=experiment, - pipeline_conf=conf, - ) - print("Run details page URL:") - print(f"{host}/#/runs/details/{run_result.run_id}") - run_response = run_result.wait_for_run_completion(timeout_mins * _MINUTE) - run = run_response.run - from pprint import pprint - # Hide verbose content - run_response.run.pipeline_spec.workflow_manifest = None - pprint(run_response.run) - print("Run details page URL:") - print(f"{host}/#/runs/details/{run_result.run_id}") - assert run.status == 'Succeeded' - # TODO(Bobgy): print debug info - - -# %% -if __name__ == "__main__": - import fire - fire.Fire(main) diff --git a/samples/v2/sample_test.py b/samples/v2/sample_test.py new file mode 100644 index 00000000000..d34599a3c18 --- /dev/null +++ b/samples/v2/sample_test.py @@ -0,0 +1,83 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os
import unittest
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pprint import pprint
from typing import List

import kfp
from kfp.dsl.graph_component import GraphComponent
import component_with_optional_inputs
import pipeline_with_env
import hello_world
import producer_consumer_param
import pipeline_container_no_input
import two_step_pipeline_containerized

_MINUTE = 60  # seconds
_DEFAULT_TIMEOUT = 5 * _MINUTE


@dataclass
class TestCase:
    """One sample pipeline to execute, plus how long to wait for it."""

    # Pipeline object handed to ``create_run_from_pipeline_func``.
    pipeline_func: GraphComponent
    # Value passed to ``wait_for_run_completion``; expressed in seconds
    # (see ``_MINUTE``).
    timeout: int = _DEFAULT_TIMEOUT


class SampleTest(unittest.TestCase):
    """Runs the v2 sample pipelines against a KFP deployment.

    The API and UI endpoints are read from the ``KFP_API_HOST_AND_PORT`` and
    ``KFP_UI_HOST_AND_PORT`` environment variables, falling back to the
    localhost defaults below.
    """

    _kfp_host_and_port = os.getenv('KFP_API_HOST_AND_PORT', 'http://localhost:8888')
    _kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT', 'http://localhost:8080')
    # Created in setUpClass rather than in the class body, so that importing
    # this module has no side effects and a failure to build the client is
    # reported by unittest as a test error instead of aborting import.
    _client = None

    @classmethod
    def setUpClass(cls):
        """Create the shared KFP client once for all tests in this class."""
        cls._client = kfp.Client(
            host=cls._kfp_host_and_port,
            ui_host=cls._kfp_ui_and_port,
        )

    def test(self):
        """Run every sample pipeline in parallel; report each as a sub-test."""
        test_cases: List[TestCase] = [
            TestCase(pipeline_func=hello_world.pipeline_hello_world),
            TestCase(pipeline_func=producer_consumer_param.producer_consumer_param_pipeline),
            TestCase(pipeline_func=pipeline_container_no_input.pipeline_container_no_input),
            TestCase(pipeline_func=two_step_pipeline_containerized.two_step_pipeline_containerized),
            TestCase(pipeline_func=component_with_optional_inputs.pipeline),
            TestCase(pipeline_func=pipeline_with_env.pipeline_with_env),

            # The following tests are not working. Tracking issue: https://github.com/kubeflow/pipelines/issues/11053
            # TestCase(pipeline_func=pipeline_with_importer.pipeline_with_importer),
            # TestCase(pipeline_func=pipeline_with_volume.pipeline_with_volume),
            # TestCase(pipeline_func=pipeline_with_secret_as_volume.pipeline_secret_volume),
            # TestCase(pipeline_func=pipeline_with_secret_as_env.pipeline_secret_env),
        ]

        with ThreadPoolExecutor() as executor:
            # Remember which case each future belongs to so the sub-test can
            # be labelled once the future completes.
            future_to_case = {
                executor.submit(self.run_test_case, case.pipeline_func, case.timeout): case
                for case in test_cases
            }
            for future in as_completed(future_to_case):
                pipeline_func = future_to_case[future].pipeline_func
                # subTest is entered here, on the main thread: it saves and
                # restores state on this TestCase instance, which must not be
                # done concurrently from the worker threads. Any exception
                # raised in the worker is re-raised by result() inside the
                # sub-test, so one failing sample does not hide the others.
                with self.subTest(pipeline=pipeline_func, msg=pipeline_func.name):
                    future.result()

    def run_test_case(self, pipeline_func: GraphComponent, timeout: int):
        """Submit one pipeline, wait for it, and assert that it succeeded.

        Safe to call from a worker thread: it only reads the shared client and
        UI URL. An unsuccessful run raises ``AssertionError`` to the caller.

        Args:
            pipeline_func: Pipeline to submit via the shared KFP client.
            timeout: Passed unchanged to ``wait_for_run_completion``.
        """
        run_result = self._client.create_run_from_pipeline_func(pipeline_func=pipeline_func)

        run_response = run_result.wait_for_run_completion(timeout)

        pprint(run_response.run_details)
        print("Run details page URL:")
        print(f"{self._kfp_ui_and_port}/#/runs/details/{run_response.run_id}")

        self.assertEqual(run_response.state, "SUCCEEDED")


if __name__ == '__main__':
    unittest.main()