Super pipeline KFPv2. #488

Merged: 11 commits, Aug 19, 2024
2 changes: 1 addition & 1 deletion .make.versions
@@ -102,7 +102,7 @@ HEADER_CLEANSER_RAY_VERSION=$(DPK_VERSION)
# Begin versions that the repo depends on.

KFP_v2=2.2.0
KFP_v2_SDK=2.7.0
KFP_v2_SDK=2.8.0
KFP_v1=1.8.5
KFP_v1_SDK=1.8.22
RAY=2.24.0
6 changes: 3 additions & 3 deletions kfp/kfp_ray_components/createRayClusterComponent.yaml
@@ -4,14 +4,14 @@ description: Creates Ray cluster
inputs:
- { name: ray_name, type: String, description: "Ray cluster user name" }
- { name: run_id, type: String, description: "The KFP Run ID" }
- { name: ray_head_options, type: String, default: "", description: "ray head configuration" }
- { name: ray_worker_options, type: String, default: "", description: "ray worker configuration" }
- { name: ray_head_options, type: JsonObject, default: "{}", description: "ray head configuration" }
- { name: ray_worker_options, type: JsonObject, default: "{}", description: "ray worker configuration" }
- { name: server_url, type: String, default: "", description: "url of api server" }
- { name: additional_params, type: String, default: "{}", description: "additional parameters" }

implementation:
container:
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:latest"
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
30 changes: 30 additions & 0 deletions kfp/superworkflows/ray/kfp_v2/README.md
@@ -0,0 +1,30 @@
# Chaining transforms using KFP V2

As with [super pipelines of KFP v1](../../../doc/multi_transform_pipeline.md), we want to offer the option of running a series of transforms one after the other on the data. In KFP v2, however, chaining transforms is easier thanks to the [nested pipelines](https://www.kubeflow.org/docs/components/pipelines/user-guides/components/compose-components-into-pipelines/#pipelines-as-components) that KFP v2 offers.
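
The sketch below is a minimal, repo-independent illustration (the component and pipeline names are made up) of the KFP v2 mechanism the superpipeline relies on: a `@dsl.pipeline` function can be called inside another `@dsl.pipeline` function, so each transform pipeline becomes a nested sub-pipeline of the outer graph.

```python
from kfp import compiler, dsl


@dsl.component
def say_hello(name: str) -> str:
    # Minimal lightweight component used by the inner pipeline.
    return f"Hello, {name}!"


@dsl.pipeline
def inner_pipeline(name: str) -> str:
    # A complete pipeline in its own right; in KFP v2 it can also be used as a component.
    task = say_hello(name=name)
    return task.output


@dsl.pipeline
def outer_pipeline(name: str = "world"):
    # Calling a pipeline inside another pipeline nests it as a sub-graph,
    # which the KFP v2 UI renders as a hierarchical, drill-down view.
    first = inner_pipeline(name=name)
    second = inner_pipeline(name=name)
    second.after(first)  # chain the nested pipelines sequentially


if __name__ == "__main__":
    compiler.Compiler().compile(outer_pipeline, "outer_pipeline.yaml")
```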

One example of chaining the `noop` and `document id` transforms can be found [here](superpipeline_noop_docId_v2.py). When this pipeline runs, it appears as a hierarchical graph with two nested pipelines, one for each transform, as shown in the following screenshots.

`root` Layer
![nested_pipeline](nested_pipeline.png)

`root -> noop-ray-pipeline` Layer
![noop_nested_pipeline](noop_nested.png)

`root -> noop-ray-pipeline -> exit-handler-1` Layer
![noop_layer_pipeline](noop_layer.png)

Another useful feature of KFP v2 is the JSON editor for `dict`-type input parameters, as shown here:
![json_param](json_param.png)
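
As a small, hypothetical illustration (the names below are not from this repo), declaring a pipeline parameter with the `dict` type is what makes the KFP v2 UI offer the JSON editor instead of a plain text box:

```python
from kfp import dsl


@dsl.component
def show_options(options: dict):
    # Inside the component the parameter arrives as a regular Python mapping.
    print(options)


@dsl.pipeline
def dict_param_pipeline(
    # A dict-typed parameter with a default value; the KFP v2 UI presents it as editable JSON.
    ray_head_options: dict = {"cpu": 1, "memory": 4, "image": "example/image:latest"},
):
    show_options(options=ray_head_options)
```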

## Main differences from the KFP v1 superpipeline:
- It is not required to upload the transform pipelines before running the superpipeline.
Review comment (Member): maybe to add that the entire pipeline and the nested pipelines are up to date.

- It creates just one run that includes all the nested transforms and their sub-tasks.
- No need for an additional component such as `executeSubWorkflowComponent.yaml`; all of the implementation is in the same pipeline file.
- One thing missing in this implementation is the option to override parameters.
Review comment (Member): From here it is not clear what do you mean

Reply (Collaborator, Author): Is this sentence better:
In superpipelines of KFP v1 there exists an option to override the common parameters with specific values for each one of the transforms. This option is missing in the KFP v2 superpipelines.


### How to compile the superpipeline
```
export KFPv2=1
export PYTHONPATH=../../../../transforms
python3 superpipeline_noop_docId_v2.py
```
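
Compilation writes `superpipeline_noop_docId_v2.yaml` next to the script (see the `compiler.Compiler().compile(...)` call at the bottom of the file). As one possible way to run it, sketched here with an assumed host URL rather than anything documented in this repo, the compiled package can be submitted with the KFP SDK client:

```python
# Hedged sketch: submit the compiled package to a KFP v2 endpoint.
# The host below assumes a local port-forward to the KFP UI/API; adjust for your deployment.
from kfp.client import Client

client = Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_package(
    "superpipeline_noop_docId_v2.yaml",
    arguments={},  # empty dict -> the pipeline's default parameter values are used
)
print(run.run_id)
```
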
103 changes: 103 additions & 0 deletions kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2.py
@@ -0,0 +1,103 @@
from typing import Any, NamedTuple
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp import dsl

from universal.noop.kfp_ray.noop_wf import noop
from universal.doc_id.kfp_ray.doc_id_wf import doc_id

noop_image = "quay.io/dataprep1/data-prep-kit/noop-ray:latest"
doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id-ray:latest"


def _remove_unused_params(d: dict[str, Any]) -> None:
d.pop("input_path", None)
d.pop("output_path", None)
d.pop("intermediate_path", None)
d.pop("skip", None)
d.pop("name", None)
d.pop("overriding_params", None)
return


@dsl.component
def prepare_params(input_path: str, output_path: str) -> str:
data_s3_config = "{'input_folder': '" + input_path + "', 'output_folder': '" + output_path + "'}"
return data_s3_config


@dsl.pipeline
def super_pipeline(
# the super pipeline parameters
p1_pipeline_runtime_pipeline_id: str = "pipeline_id",
p1_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p1_pipeline_input_path: str = "test/doc_id/input/",
p1_pipeline_output_path: str = "test/super/output/",
p1_pipeline_intermediate_path: str = "test/super/output/tmp",
p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p1_pipeline_data_s3_access_secret: str = "s3-secret",
p1_pipeline_runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"},
p1_pipeline_runtime_actor_options: dict = {'num_cpus': 0.8},
# data access
p1_pipeline_data_max_files: int = -1,
p1_pipeline_data_num_samples: int = -1,
p1_pipeline_data_checkpointing: bool = False,
# noop step parameters
p2_name: str = "noop",
p2_skip: bool = False,
p2_noop_sleep_sec: int = 10,
p2_ray_name: str = "noop-kfp-ray",
p2_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": noop_image},
p2_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "", "image": noop_image},
# Document ID step parameters
p3_name: str = "doc_id",
p3_ray_name: str = "docid-kfp-ray",
p3_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": doc_id_image},
p3_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "", "image": doc_id_image},
# p3_skip: bool = False,
# orchestrator
p3_data_data_sets: str = "",
p3_data_files_to_use: str = "['.parquet']",
# doc id parameters
p3_doc_id_doc_column: str = "contents",
p3_doc_id_hash_column: str = "hash_column",
p3_doc_id_int_column: str = "int_id_column",
):
args = locals()
common_params_prefix = "p1_pipeline_"
transform1_prefix = "p2_"
transform2_prefix = "p3_"
# split the input parameters according to their prefixes.
common_params = {key[len(common_params_prefix) :]: value for key, value in args.items() if key.startswith(common_params_prefix)}
task1_params = {key[len(transform1_prefix) :]: value for key, value in args.items() if key.startswith(transform1_prefix)}
task2_params = {key[len(transform2_prefix) :]: value for key, value in args.items() if key.startswith(transform2_prefix)}

# get the input path, output path of the whole pipeline, and the intermediate path for storing the files between the transforms
input_path = common_params.get("input_path", "")
output_path = common_params.get("output_path", "")
inter_path = common_params.get("intermediate_path", "")

# execute the first transform
pipeline_prms_to_pass = common_params | task1_params
_remove_unused_params(pipeline_prms_to_pass)
# get the data config
data_config = prepare_params(input_path=input_path, output_path=inter_path)
pipeline_prms_to_pass["data_s3_config"] = data_config.output
# call the noop pipeline from noop_wf.py file with the expected parameters
noop_task = noop(**pipeline_prms_to_pass)

# execute the second transform
pipeline_prms_to_pass = common_params | task2_params
_remove_unused_params(pipeline_prms_to_pass)
# get the data config
data_config = prepare_params(input_path=inter_path, output_path=output_path)
pipeline_prms_to_pass["data_s3_config"] = data_config.output
# call the doc_id pipeline from doc_id_wf.py file with the expected parameters
doc_id_task = doc_id(**pipeline_prms_to_pass)
doc_id_task.after(noop_task)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(super_pipeline, __file__.replace(".py", ".yaml"))
25 changes: 12 additions & 13 deletions transforms/universal/doc_id/kfp_ray/doc_id_wf.py
@@ -22,16 +22,16 @@
# the name of the job script
EXEC_SCRIPT_NAME: str = "doc_id_transform_ray.py"
# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func(
worker_options: str,
actor_options: str,
worker_options: dict,
actor_options: dict,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
@@ -40,7 +40,7 @@ def compute_exec_params_func(
data_files_to_use: str,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
runtime_code_location: dict,
doc_id_doc_column: str,
doc_id_hash_column: str,
doc_id_int_column: str,
@@ -52,13 +52,13 @@ def compute_exec_params_func(
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"data_checkpointing": data_checkpointing,
"data_data_sets": data_data_sets,
"data_data_sets": data_data_sets.strip(),
"data_files_to_use": data_files_to_use,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
"runtime_worker_options": str(actor_options),
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"runtime_code_location": str(runtime_code_location),
"doc_id_doc_column": doc_id_doc_column,
"doc_id_hash_column": doc_id_hash_column,
"doc_id_int_column": doc_id_int_column,
@@ -106,9 +106,8 @@ def doc_id(
# Ray cluster
ray_name: str = "doc_id-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
'"image": "' + task_image + '"}',
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/doc_id/input/', 'output_folder': 'test/doc_id/output/'}",
@@ -119,9 +118,9 @@ def doc_id(
data_data_sets: str = "",
data_files_to_use: str = "['.parquet']",
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# doc id parameters
doc_id_doc_column: str = "contents",
doc_id_hash_column: str = "hash_column",
26 changes: 13 additions & 13 deletions transforms/universal/noop/kfp_ray/noop_wf.py
@@ -23,22 +23,22 @@
EXEC_SCRIPT_NAME: str = "noop_transform_ray.py"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func(
worker_options: str,
actor_options: str,
worker_options: dict,
actor_options: dict,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
runtime_code_location: dict,
noop_sleep_sec: int,
) -> dict:
from runtime_utils import KFPUtils
@@ -47,11 +47,11 @@ def compute_exec_params_func(
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
"runtime_worker_options": str(actor_options),
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"runtime_code_location": str(runtime_code_location),
"noop_sleep_sec": noop_sleep_sec,
}

@@ -98,19 +98,19 @@ def noop(
# Ray cluster
ray_name: str = "noop-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
'"image": "' + task_image + '"}',
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/noop/input/', 'output_folder': 'test/noop/output/'}",
data_s3_access_secret: str = "s3-secret",
data_s3_access_secret: str = "s3-minio",
data_max_files: int = -1,
data_num_samples: int = -1,
data_checkpointing: bool = False,
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# noop parameters
noop_sleep_sec: int = 10,
# additional parameters