Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix path issues when running superworkflow pipeline sample for kfp v2 #935

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# limitations under the License.
################################################################################

import os
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
Expand All @@ -19,7 +20,7 @@
# empty comment to triigger pre-commit
# Components
# For every sub workflow we need a separate components, that knows about this subworkflow.
component_spec_path = "../../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", "../../../../../kfp/kfp_ray_components/")
run_code_to_parquet_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_code_quality_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_malware_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

import kfp.compiler as compiler
import kfp.components as comp
Expand All @@ -17,7 +18,7 @@

# Components
# path to kfp component specifications files
component_spec_path = "../../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", "../../../../../kfp/kfp_ray_components/")
# For every sub workflow we need a separate components, that knows about this subworkflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
2 changes: 1 addition & 1 deletion examples/kfp-pipelines/superworkflows/ray/kfp_v2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ worflow-clean:: .workflows.clean
.PHONY: workflow-build
workflow-build: workflow-venv
@for file in $(YAML_WF); do \
$(MAKE) $$file; \
$(MAKE) KFP_COMPONENT_SPEC_PATH=${REPOROOT}/kfp/kfp_ray_components/ $$file; \
done

workflow-test::
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from workflow_support.compile_utils.component import ONE_HOUR_SEC, ONE_DAY_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils.component import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

logger = get_logger(__name__)

# Default path for KFP component specification files
DEFAULT_KFP_COMPONENT_SPEC_PATH = "../../../../kfp/kfp_ray_components/"

ONE_HOUR_SEC = 60 * 60
ONE_DAY_SEC = ONE_HOUR_SEC * 24
ONE_WEEK_SEC = ONE_DAY_SEC * 7
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from workflow_support.compile_utils.component import ONE_HOUR_SEC, ONE_DAY_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils.component import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

RUN_NAME = "KFP_RUN_NAME"

# Default path for KFP component specification files
DEFAULT_KFP_COMPONENT_SPEC_PATH = "../../../../kfp/kfp_ray_components/"

ONE_HOUR_SEC = 60 * 60
ONE_DAY_SEC = ONE_HOUR_SEC * 24
ONE_WEEK_SEC = ONE_DAY_SEC * 7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pipeline_parameters:
multi_s3: False
compute_func_name: ""
compute_func_import: ""
component_spec_path: ""

pipeline_common_input_parameters_values:
kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
Expand Down
5 changes: 0 additions & 5 deletions kfp/pipeline_generator/single-pipeline/pipeline_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@
common_input_params_values = pipeline_definitions[PIPELINE_COMMON_INPUT_PARAMETERS_VALUES]
pipeline_transform_input_parameters = pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS]

component_spec_path = pipeline_parameters.get("component_spec_path", "")
if component_spec_path == "":
component_spec_path = "../../../../kfp/kfp_ray_components/"

content = template.render(
transform_image=common_input_params_values["transform_image"],
script_name=pipeline_parameters["script_name"],
kfp_base_image=common_input_params_values["kfp_base_image"],
component_spec_path=component_spec_path,
pipeline_arguments=pipeline_transform_input_parameters["pipeline_arguments"],
pipeline_name=pipeline_parameters[NAME],
pipeline_description=pipeline_parameters["description"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.components as comp
import kfp.dsl as dsl

from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


task_image = "{{ transform_image }}"
Expand All @@ -27,7 +32,7 @@
base_kfp_image = "{{ kfp_base_image }}"

# path to kfp component specifications files
component_spec_path = "{{ component_spec_path }}"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# NOTE: This file is auto generated by Pipeline Generator.
import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# path to kfp component specifications files
component_spec_path = "../../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)
# For every sub workflow we need a separate components, that knows about this subworkflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_ededup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
################################################################################

import yaml

import os

PRE_COMMIT = "../pre-commit-config.yaml"
PIPELINE_TEMPLATE_FILE = "template_superpipeline.py"
Expand Down Expand Up @@ -69,10 +69,6 @@ def get_generic_params(params, prefix="") -> str:
pipeline_tasks = pipeline_definitions[PIPELINE_TASKS]
common_input_params = pipeline_definitions[COMMON_INPUT_PARAMETERS]

component_spec_path = pipeline_metadata.get("component_spec_path", "")
if component_spec_path == "":
component_spec_path = "../../../../../kfp/kfp_ray_components/"

for task in pipeline_tasks:
task_name = task["name"]
task_pipeline_name = task["pipeline_name"]
Expand Down Expand Up @@ -144,7 +140,6 @@ def get_generic_params(params, prefix="") -> str:
superpipeline_name=pipeline_metadata[NAME],
superpipeline_description=pipeline_metadata[DESCRIPTION],
sub_workflows_components=pipeline_definitions[PIPELINE_TASKS],
component_spec_path=component_spec_path,
p1_parameters=pipeline_definitions[PIPELINE_TASKS],
add_p2_parameters=common_input_params,
sub_workflows_parameters=sub_workflows_parameters,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# NOTE: This file is auto generated by Pipeline Generator.

import os
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# path to kfp component specifications files
component_spec_path = "__component_spec_path__"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)
# For every sub workflow we need a separate components, that knows about this subworkflow.
__sub_workflows_components__

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
# NOTE: This file is auto generated by Pipeline Generator.

import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# path to kfp component specifications files
component_spec_path = "{{ component_spec_path }}"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)
# For every sub workflow we need a separate components, that knows about this subworkflow.
{%- for component in sub_workflows_components %}
run_{{ component.name }}_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -28,7 +33,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/license_select/kfp_ray/license_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -28,7 +33,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/malware/kfp_ray/malware_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/proglang_select/kfp_ray/proglang_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)



# compute execution parameters. Here different transforms might need different implementations. As
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


task_image = "quay.io/dataprep1/data-prep-kit/repo_level_order-ray:latest"
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
Loading