Support parallel-for operations, such as data parallel training and model parallel training #3102

Open · wants to merge 5 commits into main
89 changes: 89 additions & 0 deletions elyra/kfp/bootstrapper.py
@@ -50,6 +50,79 @@
operation_name = None # global used in formatted logging


def set_dist_train_config(rank, nranks, step_name, port=9888):
"""
Set distributed training environment variables for general use.
For TensorFlow: TF_CONFIG is configured.
For PyTorch: MASTER_ADDR and MASTER_PORT are configured.
For general use cases: NRANKS and RANK are configured.

TODO: this function is Argo-specific; Tekton support should be added.
"""
from kubernetes import client, config

wf_id = os.getenv("WORKFLOW_ID")
ns = os.getenv("KFP_NAMESPACE")
if not wf_id or not ns:
raise ValueError("WORKFLOW_ID and KFP_NAMESPACE env must be set in the workflow pod!")

config.load_incluster_config()
api = client.CustomObjectsApi()

worker_started = 0
while worker_started != nranks:
resource = api.get_namespaced_custom_object(
group="argoproj.io",
version="v1alpha1",
name=wf_id,
namespace=ns,
plural="workflows",
)
if not resource.get("status"):
time.sleep(2)
continue
if not resource["status"].get("nodes"):
time.sleep(2)
continue

nodes = resource["status"]["nodes"]
workers_spec = []
for nk in nodes:
node_info = nodes[nk]
OpUtil.log_operation_info(
"kfpdist: searching for {}, curr node: {}, templateName: {}, type: {}".format(
step_name, nk, node_info["templateName"], node_info["type"]
)
)
if node_info["templateName"] == step_name and node_info["type"] == "Pod":
podid = node_info["id"]
for input_param in node_info["inputs"]["parameters"]:
if input_param["name"].find("loop-item") >= 0:
# FIXME: argo parameter with "loop-item" is the rank.
curr_rank = int(input_param["value"])
break
v1 = client.CoreV1Api()
podinfo = v1.read_namespaced_pod(podid, ns)
if podinfo.status.pod_ip:
workers_spec.append((curr_rank, "%s:%d" % (podinfo.status.pod_ip, port)))
worker_started = len(workers_spec)
time.sleep(2)

workers_spec.sort(key=lambda item: item[0])
workers_spec_list = [i[1] for i in workers_spec]
# Set the TF_CONFIG env var for TensorFlow distributed training.
os.environ["TF_CONFIG"] = json.dumps(
{"cluster": {"worker": workers_spec_list}, "task": {"type": "worker", "index": rank}}
)
OpUtil.log_operation_info("Setting TF_CONFIG: %s" % os.environ["TF_CONFIG"])
os.environ["MASTER_ADDR"] = workers_spec[0][1].split(":")[0]
os.environ["MASTER_PORT"] = workers_spec[0][1].split(":")[1]
OpUtil.log_operation_info(
"Setting MASTER_ADDR: {}, MASTER_PORT: {}".format(os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"])
)
OpUtil.log_operation_info("Setting RANK: {}, NRANKS: {}".format(os.environ["RANK"], os.environ["NRANKS"]))


class FileOpBase(ABC):
"""Abstract base class for file-based operations"""

@@ -724,6 +797,22 @@ def main():
)
# Setup packages and gather arguments
input_params = OpUtil.parse_arguments(sys.argv[1:])

if os.getenv("RANK"):
op_name = os.getenv("ELYRA_OP_NAME")
if not op_name:
raise ValueError(
"env ELYRA_OP_NAME is not set. Please check whether the Elyra version matches bootstrapper.py"
)

# FIXME: the operation name is sanitized by KFP, so apply the same replacement here to match.
op_name = op_name.replace("_", "-")
rank = int(os.getenv("RANK"))
nranks = os.getenv("NRANKS")
if not nranks:
raise ValueError("RANK env var is set but no NRANKS env var was found!")
set_dist_train_config(rank, int(nranks), op_name, port=9888)

OpUtil.log_operation_info("starting operation")
t0 = time.time()
OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))
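For context, the sketch below shows how a notebook or script launched by this bootstrapper might consume the variables that set_dist_train_config exports. It is not part of this PR; the choice of MultiWorkerMirroredStrategy and the gloo backend are assumptions, and only TF_CONFIG, MASTER_ADDR, MASTER_PORT, RANK, and NRANKS actually come from this change.

import json
import os

import tensorflow as tf
import torch.distributed as dist

# TF_CONFIG as written by set_dist_train_config, e.g. for nranks=2, rank=0:
# {"cluster": {"worker": ["10.0.0.4:9888", "10.0.0.5:9888"]},
#  "task": {"type": "worker", "index": 0}}
tf_config = json.loads(os.environ["TF_CONFIG"])
print("workers:", tf_config["cluster"]["worker"])

# TensorFlow reads TF_CONFIG from the environment on its own.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# PyTorch picks up MASTER_ADDR/MASTER_PORT via the "env://" init method; rank and
# world size are passed explicitly because the bootstrapper exports NRANKS rather
# than the WORLD_SIZE name PyTorch would otherwise look up.
dist.init_process_group(
    backend="gloo",
    init_method="env://",
    rank=int(os.environ["RANK"]),
    world_size=int(os.environ["NRANKS"]),
)
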
4 changes: 4 additions & 0 deletions elyra/pipeline/kfp/processor_kfp.py
@@ -794,6 +794,7 @@ def _generate_workflow_tasks(
}

component_definition = generic_component_template.render(
op_name=sanitize_label_value(operation.name),
container_image=operation.runtime_image,
task_parameters=task_parameters,
command_args=self._compose_container_command_args(
@@ -847,6 +848,7 @@ def _generate_workflow_tasks(
if operation.gpu_vendor:
gpu_vendor = operation.gpu_vendor
workflow_task["task_modifiers"]["gpu_limit"] = {"size": operation.gpu, "vendor": gpu_vendor}
workflow_task["task_modifiers"]["parallel_count"] = operation.parallel_count

if is_crio_runtime:
# Attach empty dir volume
@@ -880,6 +882,8 @@
)
# Pipeline node name
workflow_task["task_modifiers"]["pod_labels"]["elyra/node-name"] = sanitize_label_value(operation.name)
# Original operation name for runtime lookups
workflow_task["task_modifiers"]["env_variables"]["ELYRA_OP_NAME"] = operation.name

# Add non-identifying metadata
if workflow_task["task_modifiers"].get("pod_annotations") is None:
6 changes: 6 additions & 0 deletions elyra/pipeline/pipeline.py
@@ -249,6 +249,7 @@ def __init__(
gpu: number of gpus requested to run the operation
parameters: a list of names of pipeline parameters that should be passed to this operation
gpu_vendor: gpu resource type, e.g. nvidia.com/gpu, amd.com/gpu, etc.
parallel_count: number of parallel replicas to launch for ParallelFor steps.
Entries for other (non-built-in) component types are a function of the respective component.

:param elyra_props: dictionary of property key:value pairs that are owned by Elyra
@@ -276,6 +277,7 @@
self._component_props["memory"] = component_props.get("memory")
self._component_props["gpu"] = component_props.get("gpu")
self._component_props["gpu_vendor"] = component_props.get("gpu_vendor")
self._component_props["parallel_count"] = component_props.get("parallel_count")
self._component_props["parameters"] = component_props.get(PIPELINE_PARAMETERS, [])

if not elyra_props:
@@ -332,6 +334,10 @@ def parameters(self) -> Optional[List[str]]:
def gpu_vendor(self) -> Optional[str]:
return self._component_props.get("gpu_vendor")

@property
def parallel_count(self) -> int:
return self._component_props.get("parallel_count") or 1

def __eq__(self, other: GenericOperation) -> bool:
if isinstance(self, other.__class__):
return super().__eq__(other)
7 changes: 7 additions & 0 deletions elyra/templates/components/generic_properties_template.jinja2
@@ -61,6 +61,13 @@
"ui:placeholder": "nvidia.com/gpu"
}
},
"parallel_count": {
"type": "integer",
"title": "ParallelCount",
"description": "Each component can be run as parallel step, set this >1 to do parallelfor-like operation.",
"minimum": 1,
"default": 1
},
"pipeline_parameters": {
"type": "array",
"title": "Pipeline Parameters",
@@ -1,4 +1,4 @@
name: Run a file
name: {{ op_name }}
description: Run a Jupyter notebook or Python/R script
{% if task_parameters %}
inputs:
52 changes: 52 additions & 0 deletions elyra/templates/kubeflow/v1/python_dsl_template.jinja2
@@ -33,6 +33,16 @@ def generated_pipeline(
{% for workflow_task in workflow_tasks.values() %}
{% set task_name = "task_" + workflow_task.escaped_task_id %}
# Task for node '{{ workflow_task.name }}'
{% set parallel_indent = 0 %}
{% if 'parallel_count' in workflow_task.task_modifiers and workflow_task.task_modifiers.parallel_count is not none %}
{% if workflow_task.task_modifiers.parallel_count > 1 %}
{% set parallel_indent = 4 %}
parallel_count = {{workflow_task.task_modifiers.parallel_count}}
with kfp.dsl.ParallelFor(list(range(parallel_count))) as rank:
{% endif %}
{% endif %}

{% filter indent(width=parallel_indent) %}
{{ task_name }} = factory_{{ workflow_task.component_definition_hash | python_safe }}(
{% for task_input_name, task_input_spec in workflow_task.task_inputs.items() %}
{% if task_input_spec.task_output_reference %}
@@ -73,6 +83,46 @@
{% for env_var_name, env_var_value in workflow_task.task_modifiers.env_variables.items() %}
{{ task_name }}.add_env_variable(V1EnvVar(name="{{ env_var_name }}", value="{{ env_var_value | string_delimiter_safe }}"))
{% endfor %}
{% if 'parallel_count' in workflow_task.task_modifiers and workflow_task.task_modifiers.parallel_count is not none %}
{% if workflow_task.task_modifiers.parallel_count > 1 %}
{{ task_name }}.add_env_variable(V1EnvVar(name="NRANKS", value=str(parallel_count)))
{{ task_name }}.add_env_variable(V1EnvVar(name="RANK", value=str(rank)))
{% endif %}
{% endif %}
{% if workflow_engine == "argo" %}
{{ task_name }}.add_env_variable(V1EnvVar(
name="WORKFLOW_ID",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(
api_version="v1", field_path="metadata.labels['workflows.argoproj.io/workflow']"
)
),
))
{{ task_name }}.add_env_variable(V1EnvVar(
name="KFP_NAMESPACE",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.namespace")
),
))
{{ task_name }}.add_env_variable(V1EnvVar(
name="KFP_POD_NAME",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.name")
),
))
{{ task_name }}.add_env_variable(V1EnvVar(
name="KFP_POD_UID",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.uid")
),
))
{{ task_name }}.add_env_variable(V1EnvVar(
name="KFP_RUN_ID",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.labels['pipeline/runid']")
),
))
{% endif %}
{% endif %}
{% if workflow_task.task_modifiers.set_run_name %}
{% if workflow_engine == "tekton" %}
@@ -163,6 +213,8 @@ def generated_pipeline(
{{ task_name }}.after(task_{{ upstream_workflow_task_id | python_safe }})
{% endfor %}
{% endif %}
{% endfilter %}

{% endfor %}

if __name__ == "__main__":
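To make the template change concrete, here is a hedged sketch of the KFP v1 DSL the block above would effectively render for a node with parallel_count = 2; the pipeline name, image, and command are illustrative, not Elyra's actual output:

import kfp.dsl as dsl
from kubernetes.client import V1EnvVar

@dsl.pipeline(name="parallel-for-demo")
def demo_pipeline():
    parallel_count = 2
    # Each loop item becomes one worker's RANK; NRANKS carries the replica count.
    with dsl.ParallelFor(list(range(parallel_count))) as rank:
        task = dsl.ContainerOp(
            name="worker",
            image="python:3.10",
            command=["python", "-c", "import os; print(os.environ['RANK'])"],
        )
        task.add_env_variable(V1EnvVar(name="NRANKS", value=str(parallel_count)))
        task.add_env_variable(V1EnvVar(name="RANK", value=str(rank)))

Each loop iteration produces one Argo pod whose rank the bootstrapper then matches against the pod's "loop-item" input parameter.
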
15 changes: 15 additions & 0 deletions elyra/tests/kfp/test_bootstrapper.py
@@ -192,6 +192,21 @@ def test_main_method(monkeypatch, s3_setup, tmpdir):
main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)


def test_main_method_with_parallel_count(monkeypatch, s3_setup, tmpdir):
argument_dict = {
"cos-endpoint": "http://" + MINIO_HOST_PORT,
"cos-bucket": "test-bucket",
"cos-directory": "test-directory",
"cos-dependencies-archive": "test-archive.tgz",
"filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
"inputs": "test-file.txt;test,file.txt",
"outputs": "test-file/test-file-copy.txt;test-file/test,file/test,file-copy.txt",
"user-volume-path": None,
"parallel_count": 2,
}
main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)


def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir):
argument_dict = {
"cos-endpoint": "http://" + MINIO_HOST_PORT,
14 changes: 5 additions & 9 deletions elyra/tests/pipeline/kfp/test_processor_kfp.py
@@ -735,7 +735,7 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_one_generic_node_pipeline_te

# Verify component definition information (see generic_component_definition_template.jinja2)
# - property 'name'
assert node_template["name"] == "run-a-file"
assert node_template["name"] == sanitize_label_value(op.name)
# - property 'implementation.container.command'
assert node_template["container"]["command"] == ["sh", "-c"]
# - property 'implementation.container.args'
@@ -1416,11 +1416,9 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_generic_components_data_exch
assert len(compiled_spec["spec"]["templates"]) >= 3
template_specs = {}
for node_template in compiled_spec["spec"]["templates"]:
if node_template["name"] == compiled_spec["spec"]["entrypoint"] or not node_template["name"].startswith(
"run-a-file"
):
if node_template["name"] == compiled_spec["spec"]["entrypoint"]:
continue
template_specs[node_template["name"]] = node_template
template_specs[sanitize_label_value(node_template["name"])] = node_template

# Iterate through sorted operations and verify that their inputs
# and outputs are properly represented in their respective template
@@ -1430,10 +1428,8 @@
if not op.is_generic:
# ignore custom nodes
continue
if template_index == 1:
template_name = "run-a-file"
else:
template_name = f"run-a-file-{template_index}"
template_name = sanitize_label_value(op.name)
template_name = template_name.replace("_", "-")  # Kubernetes performs this replacement
template_index = template_index + 1
# compare outputs
if len(op.outputs) > 0:
4 changes: 4 additions & 0 deletions packages/pipeline-editor/src/FileSubmissionDialog.tsx
@@ -113,6 +113,10 @@ export const FileSubmissionDialog: React.FC<IProps> = ({
</div>
</div>
<br />

<label htmlFor="parallel_count">Parallel Count:</label>
<input id="parallel_count" type="number" name="parallel_count" min="1" defaultValue="1" />

<input
type="checkbox"
className="elyra-Dialog-checkbox"
@@ -37,6 +37,7 @@ Object {
"output-1.csv",
"output-2.csv",
],
"parallel_count": 1,
"runtime_image": "continuumio/anaconda3@sha256:a2816acd3acda208d92e0bf6c11eb41fda9009ea20f24e123dbf84bb4bd4c4b8",
},
"label": "",
@@ -94,6 +95,7 @@ Object {
"kubernetes_tolerations": Array [],
"mounted_volumes": Array [],
"outputs": Array [],
"parallel_count": 1,
"runtime_image": "continuumio/anaconda3@sha256:a2816acd3acda208d92e0bf6c11eb41fda9009ea20f24e123dbf84bb4bd4c4b8",
},
"label": "",
@@ -153,6 +155,7 @@ Object {
"kubernetes_tolerations": Array [],
"mounted_volumes": Array [],
"outputs": Array [],
"parallel_count": 1,
"runtime_image": "continuumio/anaconda3@sha256:a2816acd3acda208d92e0bf6c11eb41fda9009ea20f24e123dbf84bb4bd4c4b8",
},
"label": "",
@@ -213,6 +216,7 @@ Object {
"input-1.csv",
"input-2.csv",
],
"parallel_count": 1,
"runtime_image": "continuumio/anaconda3@sha256:a2816acd3acda208d92e0bf6c11eb41fda9009ea20f24e123dbf84bb4bd4c4b8",
},
"label": "",
@@ -273,6 +277,7 @@ Object {
"output-3.csv",
"output-4.csv",
],
"parallel_count": 1,
"runtime_image": "continuumio/anaconda3@sha256:a2816acd3acda208d92e0bf6c11eb41fda9009ea20f24e123dbf84bb4bd4c4b8",
},
"label": "",
@@ -37,6 +37,7 @@ Object {
"kubernetes_tolerations": Array [],
"mounted_volumes": Array [],
"outputs": Array [],
"parallel_count": 1,
},
"label": "",
"ui_data": Object {