diff --git a/.make.versions b/.make.versions
index 64a2e2a45..9ca89e930 100644
--- a/.make.versions
+++ b/.make.versions
@@ -102,7 +102,7 @@ HEADER_CLEANSER_RAY_VERSION=$(DPK_VERSION)
 # Begin versions that the repo depends on.
 
 KFP_v2=2.2.0
-KFP_v2_SDK=2.7.0
+KFP_v2_SDK=2.8.0
 KFP_v1=1.8.5
 KFP_v1_SDK=1.8.22
 RAY=2.24.0
diff --git a/kfp/kfp_ray_components/createRayClusterComponent.yaml b/kfp/kfp_ray_components/createRayClusterComponent.yaml
index da9070ec8..30b0b66d8 100644
--- a/kfp/kfp_ray_components/createRayClusterComponent.yaml
+++ b/kfp/kfp_ray_components/createRayClusterComponent.yaml
@@ -4,8 +4,8 @@ description: Creates Ray cluster
 inputs:
   - { name: ray_name, type: String, description: "Ray cluster user name" }
   - { name: run_id, type: String, description: "The KFP Run ID" }
-  - { name: ray_head_options, type: String, default: "", description: "ray head configuration" }
-  - { name: ray_worker_options, type: String, default: "", description: "ray worker configuration" }
+  - { name: ray_head_options, type: JsonObject, default: "{}", description: "ray head configuration" }
+  - { name: ray_worker_options, type: JsonObject, default: "{}", description: "ray worker configuration" }
   - { name: server_url, type: String, default: "", description: "url of api server" }
   - { name: additional_params, type: String, default: "{}", description: "additional parameters" }
 
diff --git a/kfp/superworkflows/ray/kfp_v2/Makefile b/kfp/superworkflows/ray/kfp_v2/Makefile
new file mode 100644
index 000000000..e3014cdc6
--- /dev/null
+++ b/kfp/superworkflows/ray/kfp_v2/Makefile
@@ -0,0 +1,29 @@
+REPOROOT=${CURDIR}/../../../..
+WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
+include $(REPOROOT)/transforms/.make.workflows
+
+PYTHON_WF := $(shell find ./ -name "*_wf.py")
+YAML_WF=$(patsubst %.py, %.yaml, ${PYTHON_WF})
+
+workflow-venv:: .check_python_version ${WORKFLOW_VENV_ACTIVATE}
+
+workflow-clean:: .workflows.clean
+
+.PHONY: workflow-build
+workflow-build: workflow-venv
+	@for file in $(YAML_WF); do \
+		$(MAKE) $$file; \
+	done
+
+workflow-test::
+
+.PHONY: workflow-upload
+workflow-upload: workflow-build
+	@for file in $(YAML_WF); do \
+		$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
+	done
+
+.PHONY: clean
+clean:
+	rm -rf *.back || true
+	rm -rf $(REPOROOT)/transforms/venv
diff --git a/kfp/superworkflows/ray/kfp_v2/README.md b/kfp/superworkflows/ray/kfp_v2/README.md
new file mode 100644
index 000000000..13f9a8369
--- /dev/null
+++ b/kfp/superworkflows/ray/kfp_v2/README.md
@@ -0,0 +1,32 @@
+# Chaining transforms using KFP V2
+
+As with [super pipelines in KFP v1](../../../doc/multi_transform_pipeline.md), we want to offer the option of running a series of transforms one after the other on the data. In KFP v2, however, chaining transforms is easier thanks to the [nested pipelines](https://www.kubeflow.org/docs/components/pipelines/user-guides/components/compose-components-into-pipelines/#pipelines-as-components) that KFP v2 offers.
+
+An example that chains the `noop` and `document id` transforms can be found [here](superpipeline_noop_docId_v2_wf.py). When this pipeline runs, it appears as a hierarchical graph with two nested pipelines, one for each transform, as shown in the following screenshots.
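The hierarchy visible in the screenshots below comes from KFP v2's "pipelines as components" feature: an inner `@dsl.pipeline` function is invoked from an outer one exactly as if it were a component. The following is a minimal, self-contained sketch of that mechanism only; every name in it (`step_a`, `step_b`, `inner_pipeline`, `outer_pipeline`) is hypothetical and is not part of this change.

```python
# Minimal, hypothetical sketch of KFP v2 "pipelines as components".
# step_a, step_b, inner_pipeline and outer_pipeline are illustrative names only.
from kfp import compiler, dsl


@dsl.component
def step_a(msg: str) -> str:
    return msg + " -> step_a"


@dsl.component
def step_b(msg: str) -> str:
    return msg + " -> step_b"


@dsl.pipeline
def inner_pipeline(msg: str) -> str:
    # a complete pipeline in its own right, exposing a single output
    return step_a(msg=msg).output


@dsl.pipeline
def outer_pipeline(msg: str = "hello"):
    # the inner pipeline is called exactly like a component,
    # which is what produces a nested sub-pipeline in the KFP UI
    first = inner_pipeline(msg=msg)
    step_b(msg=first.output)


if __name__ == "__main__":
    compiler.Compiler().compile(outer_pipeline, "outer_pipeline.yaml")
```

The superpipeline added in this change follows the same pattern, except that it orders the `noop` and `doc_id` stages explicitly with `.after()` rather than through a data dependency.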
+
+`root` Layer
+![nested_pipeline](nested_pipeline.png)
+
+`root -> noop-ray-pipeline` Layer
+![noop_nested_pipeline](noop_nested.png)
+
+`root -> noop-ray-pipeline -> exit-handler-1` Layer
+![noop_layer_pipeline](noop_layer.png)
+
+Another useful feature of KFP v2 is the `Json` editor for `dict`-type input parameters, as shown here:
+![json_param](json_param.png)
+
+## Main differences from the KFP v1 superpipeline:
+- There is no need to upload the transform pipelines before running the superpipeline; compiling the superpipeline picks up the up-to-date versions of the transforms.
+- It creates just one run that includes all the nested transforms and their sub-tasks.
+- No additional component such as `executeSubWorkflowComponent.yaml` is needed; the whole implementation lives in the same pipeline file.
+- KFP v1 superpipelines offer an option to override the common parameters with specific values for each of the transforms. This option is not available in KFP v2 superpipelines.
+
+### How to compile the superpipeline
+```
+cd kfp/superworkflows/ray/kfp_v2/
+make clean
+export KFPv2=1
+export PYTHONPATH=../../../../transforms
+make workflow-build
+```
diff --git a/kfp/superworkflows/ray/kfp_v2/json_param.png b/kfp/superworkflows/ray/kfp_v2/json_param.png
new file mode 100644
index 000000000..9d2300aa6
Binary files /dev/null and b/kfp/superworkflows/ray/kfp_v2/json_param.png differ
diff --git a/kfp/superworkflows/ray/kfp_v2/nested_pipeline.png b/kfp/superworkflows/ray/kfp_v2/nested_pipeline.png
new file mode 100644
index 000000000..5f0b9e98c
Binary files /dev/null and b/kfp/superworkflows/ray/kfp_v2/nested_pipeline.png differ
diff --git a/kfp/superworkflows/ray/kfp_v2/noop_layer.png b/kfp/superworkflows/ray/kfp_v2/noop_layer.png
new file mode 100644
index 000000000..35be49ff4
Binary files /dev/null and b/kfp/superworkflows/ray/kfp_v2/noop_layer.png differ
diff --git a/kfp/superworkflows/ray/kfp_v2/noop_nested.png b/kfp/superworkflows/ray/kfp_v2/noop_nested.png
new file mode 100644
index 000000000..e846861c9
Binary files /dev/null and b/kfp/superworkflows/ray/kfp_v2/noop_nested.png differ
diff --git a/kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py b/kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py
new file mode 100755
index 000000000..48a3597dc
--- /dev/null
+++ b/kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py
@@ -0,0 +1,103 @@
+from typing import Any, NamedTuple
+import kfp.compiler as compiler
+import kfp.components as comp
+import kfp.dsl as dsl
+from kfp import dsl
+
+from universal.noop.kfp_ray.noop_wf import noop
+from universal.doc_id.kfp_ray.doc_id_wf import doc_id
+
+noop_image = "quay.io/dataprep1/data-prep-kit/noop-ray:latest"
+doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id-ray:latest"
+
+
+def _remove_unused_params(d: dict[str, Any]) -> None:
+    d.pop("input_path", None)
+    d.pop("output_path", None)
+    d.pop("intermediate_path", None)
+    d.pop("skip", None)
+    d.pop("name", None)
+    d.pop("overriding_params", None)
+    return
+
+
+@dsl.component
+def prepare_params(input_path: str, output_path: str) -> str:
+    data_s3_config = "{'input_folder': '" + input_path + "', 'output_folder': '" + output_path + "'}"
+    return data_s3_config
+
+
+@dsl.pipeline
+def super_pipeline(
+    # the super pipeline parameters
+    p1_pipeline_runtime_pipeline_id: str = "pipeline_id",
+    p1_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
+    p1_pipeline_input_path: str = "test/doc_id/input/",
+    p1_pipeline_output_path: str = "test/super/output/",
+    p1_pipeline_intermediate_path: str = "test/super/output/tmp",
+    p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
+    p1_pipeline_data_s3_access_secret: str = "s3-secret",
+    p1_pipeline_runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"},
+    p1_pipeline_runtime_actor_options: dict = {'num_cpus': 0.8},
+    # data access
+    p1_pipeline_data_max_files: int = -1,
+    p1_pipeline_data_num_samples: int = -1,
+    p1_pipeline_data_checkpointing: bool = False,
+    # noop step parameters
+    p2_name: str = "noop",
+    p2_skip: bool = False,
+    p2_noop_sleep_sec: int = 10,
+    p2_ray_name: str = "noop-kfp-ray",
+    p2_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": noop_image},
+    p2_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "", "image": noop_image},
+    # Document ID step parameters
+    p3_name: str = "doc_id",
+    p3_ray_name: str = "docid-kfp-ray",
+    p3_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "", "image": doc_id_image},
+    p3_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "", "image": doc_id_image},
+    # p3_skip: bool = False,
+    # orchestrator
+    p3_data_data_sets: str = "",
+    p3_data_files_to_use: str = "['.parquet']",
+    # doc id parameters
+    p3_doc_id_doc_column: str = "contents",
+    p3_doc_id_hash_column: str = "hash_column",
+    p3_doc_id_int_column: str = "int_id_column",
+):
+    args = locals()
+    common_params_prefix = "p1_pipeline_"
+    transform1_prefix = "p2_"
+    transform2_prefix = "p3_"
+    # split the input parameters according to their prefixes.
+    common_params = {key[len(common_params_prefix) :]: value for key, value in args.items() if key.startswith(common_params_prefix)}
+    task1_params = {key[len(transform1_prefix) :]: value for key, value in args.items() if key.startswith(transform1_prefix)}
+    task2_params = {key[len(transform2_prefix) :]: value for key, value in args.items() if key.startswith(transform2_prefix)}
+
+    # get the input path, output path of the whole pipeline, and the intermediate path for storing the files between the transforms
+    input_path = common_params.get("input_path", "")
+    output_path = common_params.get("output_path", "")
+    inter_path = common_params.get("intermediate_path", "")
+
+    # execute the first transform
+    pipeline_prms_to_pass = common_params | task1_params
+    _remove_unused_params(pipeline_prms_to_pass)
+    # get the data config
+    data_config = prepare_params(input_path=input_path, output_path=inter_path)
+    pipeline_prms_to_pass["data_s3_config"] = data_config.output
+    # call the noop pipeline from noop_wf.py file with the expected parameters
+    noop_task = noop(**pipeline_prms_to_pass)
+
+    # execute the second transform
+    pipeline_prms_to_pass = common_params | task2_params
+    _remove_unused_params(pipeline_prms_to_pass)
+    # get the data config
+    data_config = prepare_params(input_path=inter_path, output_path=output_path)
+    pipeline_prms_to_pass["data_s3_config"] = data_config.output
+    # call the doc_id pipeline from doc_id_wf.py file with the expected parameters
+    doc_id_task = doc_id(**pipeline_prms_to_pass)
+    doc_id_task.after(noop_task)
+
+
+if __name__ == "__main__":
+    # Compiling the pipeline
+    compiler.Compiler().compile(super_pipeline, __file__.replace(".py", ".yaml"))
diff --git a/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py b/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
index 04e4a7827..7cc12fd60 100644
--- a/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
+++ b/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
@@ -33,15 +33,15 @@
 # compute execution parameters. Here different transforms might need different implementations. As
 # a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, data_files_to_use: str, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, code2parquet_supported_langs_file: str, code2parquet_domain: str, code2parquet_snapshot: str, @@ -54,11 +54,11 @@ def compute_exec_params_func( "data_max_files": data_max_files, "data_num_samples": data_num_samples, "data_files_to_use": data_files_to_use, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "code2parquet_supported_langs_file": code2parquet_supported_langs_file, "code2parquet_domain": code2parquet_domain, "code2parquet_snapshot": code2parquet_snapshot, @@ -108,9 +108,8 @@ def compute_exec_params_func( def code2parquet( ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/code2parquet/input', 'output_folder': 'test/code2parquet/output/'}", @@ -119,9 +118,9 @@ def code2parquet( data_num_samples: int = -1, data_files_to_use: str = "['.zip']", # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # code to parquet code2parquet_supported_langs_file: str = "test/code2parquet/languages/lang_extensions.json", code2parquet_detect_programming_lang: bool = True, diff --git a/transforms/code/code_quality/kfp_ray/code_quality_wf.py b/transforms/code/code_quality/kfp_ray/code_quality_wf.py index 9cd867e84..afefde191 100644 --- a/transforms/code/code_quality/kfp_ray/code_quality_wf.py +++ b/transforms/code/code_quality/kfp_ray/code_quality_wf.py @@ -32,14 +32,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, cq_contents_column_name: str, cq_language_column_name: str, cq_tokenizer: str, @@ -51,11 +51,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "cq_contents_column_name": cq_contents_column_name, "cq_language_column_name": cq_language_column_name, "cq_tokenizer": cq_tokenizer, @@ -107,11 +107,8 @@ def code_quality( # Ray cluster ray_name: str = "code_quality-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, \ - "image": "' - + task_image - + '" }', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/code_quality/input/', 'output_folder': 'test/code_quality/output/'}", @@ -119,9 +116,9 @@ def code_quality( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "runtime_pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # code quality parameters cq_contents_column_name: str = "contents", cq_language_column_name: str = "language", diff --git a/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py b/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py index ce74496ee..f16d90528 100644 --- a/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py +++ b/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py @@ -32,14 +32,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, header_cleanser_contents_column_name: str, header_cleanser_license: bool, header_cleanser_copyright: bool, @@ -50,11 +50,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "header_cleanser_contents_column_name": header_cleanser_contents_column_name, "header_cleanser_license": header_cleanser_license, "header_cleanser_copyright": header_cleanser_copyright, @@ -105,11 +105,8 @@ def header_cleanser( # Ray cluster ray_name: str = "header_cleanser-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, \ - "image": "' - + task_image - + '" }', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/header_cleanser/input/', 'output_folder': 'test/header_cleanser/output/'}", @@ -117,9 +114,9 @@ def header_cleanser( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "runtime_pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # header cleanser parameters header_cleanser_contents_column_name: str = "contents", header_cleanser_license: bool = True, diff --git a/transforms/code/malware/kfp_ray/malware_wf.py b/transforms/code/malware/kfp_ray/malware_wf.py index d341060e2..b31fa14c4 100644 --- a/transforms/code/malware/kfp_ray/malware_wf.py +++ b/transforms/code/malware/kfp_ray/malware_wf.py @@ -32,14 +32,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, malware_input_column: str, malware_output_column: str, ) -> dict: @@ -49,11 +49,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "malware_input_column": malware_input_column, "malware_output_column": malware_output_column, } @@ -99,9 +99,8 @@ def compute_exec_params_func( def malware( ray_name: str = "malware-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/malware/input', 'output_folder': 'test/malware/output'}", @@ -109,9 +108,9 @@ def malware( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # mallware malware_input_column: str = "contents", malware_output_column: str = "virus_detection", diff --git a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py index 8f95aac5a..eea17222e 100644 --- a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py +++ b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py @@ -33,14 +33,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, proglang_select_allowed_langs_file: str, proglang_select_language_column: str, ) -> dict: @@ -50,11 +50,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "proglang_select_allowed_langs_file": proglang_select_allowed_langs_file, "proglang_select_language_column": proglang_select_language_column, } @@ -101,9 +101,8 @@ def compute_exec_params_func( def lang_select( ray_name: str = "proglang-match-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/proglang_select/input/', 'output_folder': 'test/proglang_select/output/'}", @@ -111,9 +110,9 @@ def lang_select( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # Proglang match parameters proglang_select_allowed_langs_file: str = "test/proglang_select/languages/allowed-code-languages.txt", proglang_select_language_column: str = "language", diff --git a/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py b/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py index b1f3f3c95..995e44a3b 100644 --- a/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py +++ b/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, doc_chunk_chunking_type: str, doc_chunk_content_column_name: str, doc_chunk_output_chunk_column_name: str, @@ -49,11 +49,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "doc_chunk_chunking_type": doc_chunk_chunking_type, "doc_chunk_content_column_name": doc_chunk_content_column_name, "doc_chunk_output_chunk_column_name": doc_chunk_output_chunk_column_name, @@ -101,9 +101,8 @@ def doc_chunk( # Ray cluster ray_name: str = "doc-json-chunk-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, ' '"image": "' + task_image + '"}', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "[{'input_folder': 'test/doc_chunk/input/', 'output_folder': 'test/doc_chunk/output/'}]", @@ -111,9 +110,9 @@ def doc_chunk( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # doc_chunk parameters doc_chunk_chunking_type: str = "dl_json", doc_chunk_content_column_name: str = "contents", diff --git a/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py b/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py index 1e4c317dd..77a40c150 100644 --- a/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py +++ b/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, doc_chunk_chunking_type: str, doc_chunk_content_column_name: str, doc_chunk_output_chunk_column_name: str, @@ -49,11 +49,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "doc_chunk_chunking_type": doc_chunk_chunking_type, "doc_chunk_content_column_name": doc_chunk_content_column_name, "doc_chunk_output_chunk_column_name": doc_chunk_output_chunk_column_name, @@ -102,9 +102,8 @@ def doc_chunk( # Ray cluster ray_name: str = "doc-json-chunk-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/doc_chunk/input/', 'output_folder': 'test/doc_chunk/output/'}", @@ -112,9 +111,9 @@ def doc_chunk( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # doc_chunk parameters doc_chunk_chunking_type: str = "dl_json", doc_chunk_content_column_name: str = "contents", diff --git a/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py b/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py index 224768246..e9e825748 100644 --- a/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py +++ b/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different tranforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, docq_text_lang: str, docq_doc_content_column: str, docq_bad_word_filepath: str, @@ -50,11 +50,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "docq_text_lang": docq_text_lang, "docq_doc_content_column": docq_doc_content_column, "docq_bad_word_filepath": docq_bad_word_filepath, @@ -101,10 +101,8 @@ def compute_exec_params_func( def doc_quality( # Ray cluster ray_name: str = "doc_quality-kfp-ray", # name of Ray cluster - ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "", ' - '"image": "' + task_image + '", "image_pull_policy": "Always" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image_pull_secret": "", "image": "' + task_image + '", "image_pull_policy": "Always" }', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image, "image_pull_policy": "Always"}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image, "image_pull_policy": "Always"}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "[{'input_folder': 'test/doc_quality/input/', 'output_folder': 'test/doc_quality/output/'}]", @@ -112,9 +110,9 @@ def doc_quality( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # doc_quality parameters docq_text_lang: str = "en", docq_doc_content_column: str = "contents", diff --git a/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py b/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py index 98c26cf5d..b76732a58 100644 --- a/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py +++ b/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py @@ -32,14 +32,14 @@ # compute execution parameters. Here different tranforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, docq_text_lang: str, docq_doc_content_column: str, docq_bad_word_filepath: str, @@ -50,11 +50,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "docq_text_lang": docq_text_lang, "docq_doc_content_column": docq_doc_content_column, "docq_bad_word_filepath": docq_bad_word_filepath, @@ -101,9 +101,8 @@ def compute_exec_params_func( def doc_quality( # Ray cluster ray_name: str = "doc_quality-kfp-ray", # name of Ray cluster - ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "", ' '"image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image_pull_secret": "", "image": "' + task_image + '" }', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image, "image_pull_policy": "", "image_pull_secret": ""}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image, "image_pull_policy": ""}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/doc_quality/input/', 'output_folder': 'test/doc_quality/output/'}", @@ -111,9 +110,9 @@ def doc_quality( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # doc_quality parameters docq_text_lang: str = "en", docq_doc_content_column: str = "contents", diff --git a/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py b/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py index 752c8e335..a7416875c 100644 --- a/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py +++ b/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different tranforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, lang_id_model_credential: str, lang_id_model_kind: str, lang_id_model_url: str, @@ -52,11 +52,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "lang_id_model_credential": lang_id_model_credential, "lang_id_model_kind": lang_id_model_kind, "lang_id_model_url": lang_id_model_url, @@ -107,9 +107,8 @@ def lang_id( # Ray cluster ray_name: str = "lang_id-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, ' '"image": "' + task_image + '"}', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "[{'input_folder': 'test/lang_id/input/', 'output_folder': 'test/lang_id/output/'}]", @@ -117,9 +116,9 @@ def lang_id( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # lang_id parameters lang_id_model_credential: str = "PUT YOUR OWN HUGGINGFACE CREDENTIAL", lang_id_model_kind: str = "fasttext", diff --git a/transforms/language/lang_id/kfp_ray/lang_id_wf.py b/transforms/language/lang_id/kfp_ray/lang_id_wf.py index 0d9d175bb..9f02da7af 100644 --- a/transforms/language/lang_id/kfp_ray/lang_id_wf.py +++ b/transforms/language/lang_id/kfp_ray/lang_id_wf.py @@ -32,14 +32,14 @@ # compute execution parameters. Here different tranforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, lang_id_model_credential: str, lang_id_model_kind: str, lang_id_model_url: str, @@ -53,11 +53,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "lang_id_model_credential": lang_id_model_credential, "lang_id_model_kind": lang_id_model_kind, "lang_id_model_url": lang_id_model_url, @@ -108,9 +108,8 @@ def lang_id( # Ray cluster ray_name: str = "lang_id-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '"}', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/lang_id/input/', 'output_folder': 'test/lang_id/output/'}", @@ -118,9 +117,9 @@ def lang_id( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # lang_id parameters lang_id_model_credential: str = "PUT YOUR OWN HUGGINGFACE CREDENTIAL", lang_id_model_kind: str = "fasttext", diff --git a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py index 24d9f12d3..00ec76aad 100644 --- a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py +++ b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, pdf2parquet_do_table_structure: bool, pdf2parquet_do_ocr: bool, ) -> dict: @@ -48,11 +48,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "pdf2parquet_do_table_structure": pdf2parquet_do_table_structure, "pdf2parquet_do_ocr": pdf2parquet_do_ocr, } @@ -99,9 +99,8 @@ def pdf2parquet( # Ray cluster ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, ' '"image": "' + task_image + '"}', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "[{'input_folder': 'test/pdf2parquet/input/', 'output_folder': 'test/pdf2parquet/output/'}]", @@ -109,9 +108,9 @@ def pdf2parquet( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # pdf2parquet parameters pdf2parquet_do_table_structure: bool = True, pdf2parquet_do_ocr: bool = False, diff --git a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py index 74c5e1588..6d7ef2808 100644 --- a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py +++ b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py @@ -31,15 +31,15 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, data_files_to_use: str, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, pdf2parquet_do_table_structure: bool, pdf2parquet_do_ocr: bool, ) -> dict: @@ -50,11 +50,11 @@ def compute_exec_params_func( "data_max_files": data_max_files, "data_num_samples": data_num_samples, "data_files_to_use": data_files_to_use, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "pdf2parquet_do_table_structure": pdf2parquet_do_table_structure, "pdf2parquet_do_ocr": pdf2parquet_do_ocr, } @@ -102,9 +102,8 @@ def pdf2parquet( # Ray cluster ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 4, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 5, "memory": 12, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/pdf2parquet/input/', 'output_folder': 'test/pdf2parquet/output/'}", @@ -113,9 +112,9 @@ def pdf2parquet( data_num_samples: int = -1, data_files_to_use: str = "['.pdf']", # orchestrator - runtime_actor_options: str = "{'num_cpus': 4}", + runtime_actor_options: dict = {'num_cpus': 4}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # pdf2parquet parameters pdf2parquet_do_table_structure: bool = True, pdf2parquet_do_ocr: bool = False, diff --git a/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py b/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py index 6800b378d..bad66f21e 100644 --- a/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py +++ b/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, text_encoder_model_name: str, text_encoder_content_column_name: str, text_encoder_output_embeddings_column_name: str, @@ -49,11 +49,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "text_encoder_model_name": text_encoder_model_name, "text_encoder_content_column_name": text_encoder_content_column_name, "text_encoder_output_embeddings_column_name": text_encoder_output_embeddings_column_name, @@ -101,9 +101,8 @@ def text_encoder( # Ray cluster ray_name: str = "text-encoder-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, ' '"image": "' + task_image + '"}', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "[{'input_folder': 'test/text_encoder/input/', 'output_folder': 'test/text_encoder/output/'}]", @@ -111,9 +110,9 @@ def text_encoder( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # text_encoder parameters text_encoder_model_name: str = "BAAI/bge-small-en-v1.5", text_encoder_content_column_name: str = "contents", diff --git a/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py b/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py index f2a3eb8bd..269ce4366 100644 --- a/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py +++ b/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, text_encoder_model_name: str, text_encoder_content_column_name: str, text_encoder_output_embeddings_column_name: str, @@ -49,11 +49,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "text_encoder_model_name": text_encoder_model_name, "text_encoder_content_column_name": text_encoder_content_column_name, "text_encoder_output_embeddings_column_name": text_encoder_output_embeddings_column_name, @@ -102,9 +102,8 @@ def text_encoder( # Ray cluster ray_name: str = "text-encoder-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/text_encoder/input/', 'output_folder': 'test/text_encoder/output/'}", @@ -112,9 +111,9 @@ def text_encoder( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # text_encoder parameters text_encoder_model_name: str = "BAAI/bge-small-en-v1.5", text_encoder_content_column_name: str = "contents", diff --git a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py index e9fdc65ec..b0af45fc7 100644 --- a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py +++ b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py @@ -30,8 +30,8 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, @@ -40,7 +40,7 @@ def compute_exec_params_func( data_files_to_use: str, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, doc_id_doc_column: str, doc_id_hash_column: str, doc_id_int_column: str, @@ -52,13 +52,13 @@ def compute_exec_params_func( "data_max_files": data_max_files, "data_num_samples": data_num_samples, "data_checkpointing": data_checkpointing, - "data_data_sets": data_data_sets, + "data_data_sets": data_data_sets.strip(), "data_files_to_use": data_files_to_use, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "doc_id_doc_column": doc_id_doc_column, "doc_id_hash_column": doc_id_hash_column, "doc_id_int_column": doc_id_int_column, @@ -106,9 +106,8 @@ def doc_id( # Ray cluster ray_name: str = "doc_id-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/doc_id/input/', 'output_folder': 'test/doc_id/output/'}", @@ -119,9 +118,9 @@ def doc_id( data_data_sets: str = "", data_files_to_use: str = "['.parquet']", # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # doc id parameters doc_id_doc_column: str = "contents", doc_id_hash_column: str = "hash_column", diff --git a/transforms/universal/ededup/kfp_ray/ededup_wf.py b/transforms/universal/ededup/kfp_ray/ededup_wf.py index 7658a627c..845fd86da 100644 --- a/transforms/universal/ededup/kfp_ray/ededup_wf.py +++ b/transforms/universal/ededup/kfp_ray/ededup_wf.py @@ -73,9 +73,8 @@ def ededup( # Ray cluster ray_name: str = "ededup-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str 
= "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access. checkpointing is not supported by dedup data_s3_config: str = "{'input_folder': 'test/ededup/input/', 'output_folder': 'test/ededup/output'}", @@ -83,9 +82,9 @@ def ededup( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {"num_cpus": 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # ededup ededup_hash_cpu: float = 0.5, ededup_doc_column: str = "contents", diff --git a/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py b/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py index f325a34a4..2786d68f0 100644 --- a/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py +++ b/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py @@ -14,14 +14,14 @@ def ededup_compute_execution_params( - worker_options: str, # ray worker configuration - actor_options: str, # actor's resource requirements + worker_options: dict, # ray worker configuration + actor_options: dict, # actor's resource requirements data_s3_config: str, # s3 configuration data_max_files: int, # max files to process data_num_samples: int, # num samples to process runtime_pipeline_id: str, # pipeline id runtime_job_id: str, # job id - runtime_code_location: str, # code location + runtime_code_location: dict, # code location doc_column: str, # key for accessing data hash_cpu: float, # number of CPUs per hash n_samples: int, # number of samples for parameters computation @@ -53,14 +53,14 @@ def ededup_compute_execution_params( EXECUTION_OF_KB_DOC = 0.00025 # Get cluster parameters - w_options = KFPUtils.load_from_json(worker_options.replace("'", '"')) + w_options = worker_options cluster_cpu = w_options["replicas"] * w_options["cpu"] cluster_memory = w_options["replicas"] * w_options["memory"] print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}") cluster_cpu *= 0.85 cluster_memory *= 0.85 # get actor requirements - a_options = KFPUtils.load_from_json(actor_options.replace("'", '"')) + a_options = actor_options actor_cpu = a_options["num_cpus"] print(f"actor required cpu {actor_cpu}") # get credentials @@ -114,10 +114,10 @@ def ededup_compute_execution_params( "data_max_files": data_max_files, "data_num_samples": data_num_samples, "runtime_num_workers": n_workers, - "runtime_worker_options": actor_options, + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "ededup_doc_column": doc_column, "ededup_hash_cpu": hash_cpu, "ededup_num_hashes": n_hashes, diff --git a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py index 5160cd9f9..da29c9fa5 100644 --- a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py +++ b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py @@ -73,9 +73,8 @@ def fdedup( # Ray cluster ray_name: str = "fdedup-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 
2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access. checkpointing is not supported by dedup data_s3_config: str = "{'input_folder': 'test/fdedup/input/', 'output_folder': 'test/fdedup/output/'}", @@ -83,9 +82,9 @@ def fdedup( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {"num_cpus": 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # columns used fdedup_doc_column: str = "contents", fdedup_id_column: str = "int_id_column", diff --git a/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py b/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py index 4b7e74e44..0691f3129 100644 --- a/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py +++ b/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py @@ -12,14 +12,14 @@ def fdedup_compute_execution_params( - worker_options: str, # ray worker configuration - actor_options: str, # actor's resource requirements + worker_options: dict, # ray worker configuration + actor_options: dict, # actor's resource requirements data_s3_config: str, # s3 configuration data_max_files: int, # max files to process data_num_samples: int, # num samples to process runtime_pipeline_id: str, # pipeline id runtime_job_id: str, # job id - runtime_code_location: str, # code location + runtime_code_location: dict, # code location doc_column: str, # document column name id_column: str, # integer document id column name cluster_column: str, # cluster column name @@ -137,14 +137,14 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float: ) print(f"Fuzzy parameters: num buckets {num_buckets}, bucket length {length_bucket}") # Get cluster parameters - w_options = KFPUtils.load_from_json(worker_options.replace("'", '"')) + w_options = worker_options cluster_cpu = w_options["replicas"] * w_options["cpu"] cluster_memory = w_options["replicas"] * w_options["memory"] print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}") cluster_cpu *= 0.85 cluster_memory *= 0.85 # get actor requirements - a_options = KFPUtils.load_from_json(actor_options.replace("'", '"')) + a_options = actor_options, actor_cpu = a_options["num_cpus"] print(f"actor required cpu {actor_cpu}") # get credentials @@ -209,10 +209,10 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float: "data_max_files": data_max_files, "data_num_samples": data_num_samples, "runtime_num_workers": n_workers, - "runtime_worker_options": actor_options, + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "fdedup_doc_column": doc_column, "fdedup_id_column": id_column, "fdedup_cluster_column": cluster_column, diff --git a/transforms/universal/filter/kfp_ray/filter_wf.py 
b/transforms/universal/filter/kfp_ray/filter_wf.py index c52a29401..bc37440e7 100644 --- a/transforms/universal/filter/kfp_ray/filter_wf.py +++ b/transforms/universal/filter/kfp_ray/filter_wf.py @@ -32,14 +32,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, filter_criteria_list: str, filter_logical_operator: str, filter_columns_to_drop: str, @@ -50,11 +50,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "filter_criteria_list": filter_criteria_list, "filter_logical_operator": filter_logical_operator, "filter_columns_to_drop": filter_columns_to_drop, @@ -102,9 +102,8 @@ def filtering( # Ray cluster ray_name: str = "filter-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/filter/input/', 'output_folder': 'test/filter/output/'}", @@ -112,9 +111,9 @@ def filtering( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # filtering parameters filter_criteria_list: str = "['docq_total_words > 100 AND docq_total_words < 200', 'ibmkenlm_docq_perplex_score < 230']", filter_logical_operator: str = "AND", diff --git a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py index cbeb03655..90b585718 100644 --- a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, noop_sleep_sec: int, ) -> dict: from runtime_utils import KFPUtils @@ -47,11 +47,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "noop_sleep_sec": noop_sleep_sec, } @@ -97,9 +97,8 @@ def noop( # Ray cluster ray_name: str = "noop-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, ' '"image": "' + task_image + '"}', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "[{'input_folder': 'test/noop/input/', 'output_folder': 'test/noop/output/'}]", @@ -107,9 +106,9 @@ def noop( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # noop parameters noop_sleep_sec: int = 10, # additional parameters diff --git a/transforms/universal/noop/kfp_ray/noop_wf.py b/transforms/universal/noop/kfp_ray/noop_wf.py index 5635cbe0d..dfa044017 100644 --- a/transforms/universal/noop/kfp_ray/noop_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_wf.py @@ -31,14 +31,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, noop_sleep_sec: int, ) -> dict: from runtime_utils import KFPUtils @@ -47,11 +47,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "noop_sleep_sec": noop_sleep_sec, } @@ -98,19 +98,19 @@ def noop( # Ray cluster ray_name: str = "noop-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/noop/input/', 'output_folder': 'test/noop/output/'}", - data_s3_access_secret: str = "s3-secret", + data_s3_access_secret: str = "s3-minio", data_max_files: int = -1, data_num_samples: int = -1, + data_checkpointing: bool = False, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # noop parameters noop_sleep_sec: int = 10, # additional parameters diff --git a/transforms/universal/profiler/kfp_ray/profiler_wf.py b/transforms/universal/profiler/kfp_ray/profiler_wf.py index 6cc213e60..362a3ebee 100644 --- a/transforms/universal/profiler/kfp_ray/profiler_wf.py +++ b/transforms/universal/profiler/kfp_ray/profiler_wf.py @@ -73,9 +73,8 @@ def profiler( # Ray cluster ray_name: str = "profiler-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access. 
checkpointing is not supported by dedup data_s3_config: str = "{'input_folder': 'test/profiler/input/', 'output_folder': 'test/profiler/output'}", @@ -83,9 +82,9 @@ def profiler( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {"num_cpus": 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # profiler profiler_aggregator_cpu: float = 0.5, profiler_doc_column: str = "contents", diff --git a/transforms/universal/profiler/kfp_ray/src/profiler_compute_execution_params.py b/transforms/universal/profiler/kfp_ray/src/profiler_compute_execution_params.py index 74cd6456a..666734eda 100644 --- a/transforms/universal/profiler/kfp_ray/src/profiler_compute_execution_params.py +++ b/transforms/universal/profiler/kfp_ray/src/profiler_compute_execution_params.py @@ -12,14 +12,14 @@ def profiler_compute_execution_params( - worker_options: str, # ray worker configuration - actor_options: str, # actor's resource requirements + worker_options: dict, # ray worker configuration + actor_options: dict, # actor's resource requirements data_s3_config: str, # s3 configuration data_max_files: int, # max files to process data_num_samples: int, # num samples to process runtime_pipeline_id: str, # pipeline id runtime_job_id: str, # job id - runtime_code_location: str, # code location + runtime_code_location: dict, # code location doc_column: str, # key for accessing data aggregator_cpu: float, # number of CPUs per aggregatotor n_samples: int, # number of samples for parameters computation @@ -51,14 +51,14 @@ def profiler_compute_execution_params( EXECUTION_OF_KB_DOC = 0.00025 # Get cluster parameters - w_options = KFPUtils.load_from_json(worker_options.replace("'", '"')) + w_options = worker_options cluster_cpu = w_options["replicas"] * w_options["cpu"] cluster_memory = w_options["replicas"] * w_options["memory"] print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}") cluster_cpu *= 0.85 cluster_memory *= 0.85 # get actor requirements - a_options = KFPUtils.load_from_json(actor_options.replace("'", '"')) + a_options = actor_options actor_cpu = a_options["num_cpus"] print(f"actor required cpu {actor_cpu}") # get credentials @@ -112,10 +112,10 @@ def profiler_compute_execution_params( "data_max_files": data_max_files, "data_num_samples": data_num_samples, "runtime_num_workers": n_workers, - "runtime_worker_options": actor_options, + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "profiler_aggregator_cpu": aggregator_cpu, "profiler_num_aggregators": n_aggregators, "profiler_doc_column": doc_column, diff --git a/transforms/universal/resize/kfp_ray/resize_wf.py b/transforms/universal/resize/kfp_ray/resize_wf.py index 3c861b11e..bc425e305 100644 --- a/transforms/universal/resize/kfp_ray/resize_wf.py +++ b/transforms/universal/resize/kfp_ray/resize_wf.py @@ -30,8 +30,8 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, @@ -40,7 +40,7 @@ def compute_exec_params_func( data_files_to_use: str, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, resize_max_rows_per_table: int, resize_max_mbytes_per_table: int, resize_size_type: str, @@ -54,11 +54,11 @@ def compute_exec_params_func( "data_checkpointing": data_checkpointing, "data_data_sets": data_data_sets, "data_files_to_use": data_files_to_use, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "resize_max_rows_per_table": resize_max_rows_per_table, "resize_max_mbytes_per_table": resize_max_mbytes_per_table, "resize_size_type": resize_size_type, @@ -106,9 +106,8 @@ def resize( # Ray cluster ray_name: str = "resize-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/resize/input/', 'output_folder': 'test/resize/output/'}", @@ -119,9 +118,9 @@ def resize( data_data_sets: str = "", data_files_to_use: str = "['.parquet']", # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # doc id parameters resize_max_rows_per_table: int = 20, resize_max_mbytes_per_table: int = -1, diff --git a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py index 530f57dd2..bb958e69d 100644 --- a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py +++ b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py @@ -33,14 +33,14 @@ # compute execution parameters. Here different transforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. 
def compute_exec_params_func( - worker_options: str, - actor_options: str, + worker_options: dict, + actor_options: dict, data_s3_config: str, data_max_files: int, data_num_samples: int, runtime_pipeline_id: str, runtime_job_id: str, - runtime_code_location: str, + runtime_code_location: dict, tkn_tokenizer: str, tkn_tokenizer_args: str, tkn_doc_id_column: str, @@ -54,11 +54,11 @@ def compute_exec_params_func( "data_s3_config": data_s3_config, "data_max_files": data_max_files, "data_num_samples": data_num_samples, - "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options), - "runtime_worker_options": actor_options, + "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)), + "runtime_worker_options": str(actor_options), "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, - "runtime_code_location": runtime_code_location, + "runtime_code_location": str(runtime_code_location), "tkn_tokenizer": tkn_tokenizer, "tkn_tokenizer_args": tkn_tokenizer_args, "tkn_doc_id_column": tkn_doc_id_column, @@ -111,9 +111,8 @@ def tokenization( # Ray cluster ray_name: str = "tkn-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed - ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }', - ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, ' - '"image": "' + task_image + '"}', + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/tokenization/ds01/input/', 'output_folder': 'test/tokenization/ds01/output/'}", @@ -121,9 +120,9 @@ def tokenization( data_max_files: int = -1, data_num_samples: int = -1, # orchestrator - runtime_actor_options: str = "{'num_cpus': 0.8}", + runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # tokenizer parameters tkn_tokenizer: str = "hf-internal-testing/llama-tokenizer", tkn_doc_id_column: str = "document_id",
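Note (commentary, not part of the patch above): the common thread in these hunks is that `ray_head_options`, `ray_worker_options`, `runtime_actor_options`, and `runtime_code_location` become `dict` parameters, so the compute-parameter helpers index them directly instead of parsing JSON strings, and stringify them only when forwarding values to the transform runtime. Below is a minimal, self-contained sketch of that pattern; the function name and the worker-count formula are illustrative simplifications, not the repo's `KFPUtils.default_compute_execution_params`.

```python
# Illustrative sketch only (not part of this patch). It mirrors the
# cluster-capacity math visible in the ededup/profiler/fdedup
# *_compute_execution_params hunks: reserve ~15% CPU headroom, then divide by
# the per-actor CPU requirement. The real helpers apply additional constraints.
from typing import Any


def sketch_compute_num_workers(worker_options: dict[str, Any], actor_options: dict[str, Any]) -> int:
    # Read fields directly from the dict-typed options (no JSON parsing needed).
    cluster_cpu = worker_options["replicas"] * worker_options["cpu"] * 0.85
    actor_cpu = actor_options["num_cpus"]
    return max(1, int(cluster_cpu / actor_cpu))


if __name__ == "__main__":
    # Defaults taken from the updated pipelines above (image omitted here).
    worker_options = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4}
    actor_options = {"num_cpus": 0.8}
    print(sketch_compute_num_workers(worker_options, actor_options))  # 4 for these values
    # When forwarded to a transform, the dict is stringified, as in the updated
    # exec-params dicts: str(actor_options) -> "{'num_cpus': 0.8}"
    print(str(actor_options))
```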