From 63ebaf90fca15f2c3b6414ea36882a2ab55d8f2a Mon Sep 17 00:00:00 2001 From: lchu Date: Thu, 7 Oct 2021 15:31:12 -0400 Subject: [PATCH 01/19] add metadata put in workflow --- python/ray/workflow/api.py | 3 +++ python/ray/workflow/common.py | 13 +++++++++---- python/ray/workflow/execution.py | 7 +++++-- python/ray/workflow/step_executor.py | 10 ++++++++++ python/ray/workflow/step_function.py | 11 +++++++++-- python/ray/workflow/virtual_actor_class.py | 1 + python/ray/workflow/workflow_storage.py | 16 ++++++++++++++++ 7 files changed, 53 insertions(+), 8 deletions(-) diff --git a/python/ray/workflow/api.py b/python/ray/workflow/api.py index 32c7670aba0d..14989712c3b6 100644 --- a/python/ray/workflow/api.py +++ b/python/ray/workflow/api.py @@ -99,6 +99,9 @@ def step(*args, **kwargs): name = kwargs.pop("name", None) if name is not None: step_options["name"] = name + metadata = kwargs.pop("metadata", None) + if metadata is not None: + step_options["metadata"] = metadata if len(kwargs) != 0: step_options["ray_options"] = kwargs return make_step_decorator(step_options) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index 169b318ed5d7..5437b062bc01 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -128,6 +128,8 @@ class WorkflowData: ray_options: Dict[str, Any] # name of the step name: str + # meta data to store + metadata: Dict def to_metadata(self) -> Dict[str, Any]: f = self.func_body @@ -139,6 +141,7 @@ def to_metadata(self) -> Dict[str, Any]: "workflow_refs": [wr.step_id for wr in self.inputs.workflow_refs], "catch_exceptions": self.catch_exceptions, "ray_options": self.ray_options, + "metadata": self.metadata } return metadata @@ -261,7 +264,7 @@ def __reduce__(self): "remote, or stored in Ray objects.") @PublicAPI(stability="beta") - def run(self, workflow_id: Optional[str] = None) -> Any: + def run(self, workflow_id: Optional[str] = None, metadata: Optional[Dict] = None) -> Any: """Run a workflow. If the workflow with the given id already exists, it will be resumed. @@ -288,11 +291,12 @@ def run(self, workflow_id: Optional[str] = None) -> Any: Args: workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated. + metadata: metadata to add to the workflow. """ - return ray.get(self.run_async(workflow_id)) + return ray.get(self.run_async(workflow_id, metadata)) @PublicAPI(stability="beta") - def run_async(self, workflow_id: Optional[str] = None) -> ObjectRef: + def run_async(self, workflow_id: Optional[str] = None, metadata: Optional[Dict] = None) -> ObjectRef: """Run a workflow asynchronously. If the workflow with the given id already exists, it will be resumed. @@ -319,8 +323,9 @@ def run_async(self, workflow_id: Optional[str] = None) -> ObjectRef: Args: workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated. + metadata: metadata to add to the workflow. """ # TODO(suquark): avoid cyclic importing from ray.workflow.execution import run self._step_id = None - return run(self, workflow_id) + return run(self, workflow_id, metadata) diff --git a/python/ray/workflow/execution.py b/python/ray/workflow/execution.py index b660c65fa8a9..6cf51e7efac1 100644 --- a/python/ray/workflow/execution.py +++ b/python/ray/workflow/execution.py @@ -1,7 +1,7 @@ import asyncio import logging import time -from typing import Set, List, Tuple, Optional, TYPE_CHECKING +from typing import Set, List, Tuple, Optional, TYPE_CHECKING, Dict import uuid import ray @@ -23,7 +23,8 @@ def run(entry_workflow: Workflow, - workflow_id: Optional[str] = None) -> ray.ObjectRef: + workflow_id: Optional[str] = None, + metadata: Optional[Dict] = None) -> ray.ObjectRef: """Run a workflow asynchronously. """ store = get_global_storage() @@ -40,6 +41,8 @@ def run(entry_workflow: Workflow, store.storage_url): # checkpoint the workflow ws = workflow_storage.get_workflow_storage(workflow_id) + asyncio.get_event_loop().run_until_complete(ws._put( + ws._key_workflow_user_metadata(), metadata, True)) wf_exists = True try: diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index b5416b5a4021..d3f7c2564857 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -1,3 +1,5 @@ +import time +import datetime import asyncio from dataclasses import dataclass import logging @@ -179,6 +181,8 @@ async def _write_step_inputs(wf_storage: workflow_storage.WorkflowStorage, # TODO (Alex): Handle the json case better? wf_storage._put( wf_storage._key_step_input_metadata(step_id), metadata, True), + wf_storage._put( + wf_storage._key_step_user_metadata(step_id), inputs.metadata, True), serialization.dump_to_storage( wf_storage._key_step_function_body(step_id), inputs.func_body, workflow_id, storage), @@ -326,6 +330,9 @@ def _workflow_step_executor(step_type: StepType, func: Callable, args, kwargs = _resolve_step_inputs(baked_inputs) store = workflow_storage.get_workflow_storage() try: + step_start_metadata = {'start_time': time.time()} + asyncio.get_event_loop().run_until_complete(store._put( + store._key_pre_step_metadata(step_id), step_start_metadata, True)) persisted_output, volatile_output = _wrap_run( func, step_type, step_id, catch_exceptions, max_retries, *args, **kwargs) @@ -365,6 +372,9 @@ def _workflow_step_executor(step_type: StepType, func: Callable, # advance the progress of the workflow store.advance_progress(step_id) _record_step_status(step_id, WorkflowStatus.SUCCESSFUL) + step_end_metadata = {'end_time': time.time()} + asyncio.get_event_loop().run_until_complete(store._put( + store._key_post_step_metadata(step_id), step_end_metadata, True)) logger.info(get_step_status_info(WorkflowStatus.SUCCESSFUL)) if isinstance(volatile_output, Workflow): # This is the case where a step method is called in the virtual actor. diff --git a/python/ray/workflow/step_function.py b/python/ray/workflow/step_function.py index e2a4467f189b..0d3251d204f6 100644 --- a/python/ray/workflow/step_function.py +++ b/python/ray/workflow/step_function.py @@ -1,5 +1,5 @@ import functools -from typing import Callable +from typing import Callable, Dict, Any from ray._private import signature from ray.workflow import serialization_context @@ -16,11 +16,14 @@ def __init__(self, max_retries=3, catch_exceptions=False, name=None, + metadata=None, ray_options=None): if not isinstance(max_retries, int) or max_retries < 1: raise ValueError("max_retries should be greater or equal to 1.") if ray_options is not None and not isinstance(ray_options, dict): raise ValueError("ray_options must be a dict.") + if metadata is not None and not isinstance(metadata, dict): + raise ValueError("metadata must be a dict.") self._func = func self._max_retries = max_retries @@ -28,6 +31,7 @@ def __init__(self, self._ray_options = ray_options or {} self._func_signature = signature.extract_signature(func) self._name = name or "" + self._metadata = metadata or {} # Override signature and docstring @functools.wraps(func) @@ -48,6 +52,7 @@ def prepare_inputs(): catch_exceptions=self._catch_exceptions, ray_options=self._ray_options, name=self._name, + metadata=self._metadata ) return Workflow(workflow_data, prepare_inputs) @@ -64,6 +69,7 @@ def options(self, max_retries: int = 3, catch_exceptions: bool = False, name: str = None, + metadata: Dict = None, **ray_options) -> "WorkflowStepFunction": """This function set how the step function is going to be executed. @@ -79,6 +85,7 @@ def options(self, generate the step_id of the step. The name will be used directly as the step id if possible, otherwise deduplicated by appending .N suffixes. + metadata: metadata to add to the step. **ray_options: All parameters in this fields will be passed to ray remote function options. @@ -86,4 +93,4 @@ def options(self, The step function itself. """ return WorkflowStepFunction(self._func, max_retries, catch_exceptions, - name, ray_options) + name, metadata, ray_options) diff --git a/python/ray/workflow/virtual_actor_class.py b/python/ray/workflow/virtual_actor_class.py index 3f0f4368ecdc..329b7599c0d1 100644 --- a/python/ray/workflow/virtual_actor_class.py +++ b/python/ray/workflow/virtual_actor_class.py @@ -217,6 +217,7 @@ def step(method_name, method, *args, **kwargs): catch_exceptions=False, ray_options={}, name=None, + metadata=None ) wf = Workflow(workflow_data) return wf diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index 5a188cca1a3f..62ed9c57dabd 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -29,7 +29,10 @@ OBJECTS_DIR = "objects" STEPS_DIR = "steps" STEP_INPUTS_METADATA = "inputs.json" +STEP_USER_METADATA = "user_metadata.json" +PRE_STEP_METADATA = "pre_step_metadata.json" STEP_OUTPUTS_METADATA = "outputs.json" +POST_STEP_METADATA = "post_step_metadata.json" STEP_ARGS = "args.pkl" STEP_OUTPUT = "output.pkl" STEP_EXCEPTION = "exception.pkl" @@ -37,6 +40,7 @@ CLASS_BODY = "class_body.pkl" WORKFLOW_META = "workflow_meta.json" WORKFLOW_PROGRESS = "progress.json" +WORKFLOW_USER_METADATA = "user_metadata.json" # Without this counter, we're going to scan all steps to get the number of # steps with a given name. This can be very expensive if there are too # many duplicates. @@ -517,6 +521,12 @@ def _key_workflow_progress(self): def _key_step_input_metadata(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_INPUTS_METADATA] + def _key_step_user_metadata(self, step_id): + return [self._workflow_id, STEPS_DIR, step_id, STEP_USER_METADATA] + + def _key_pre_step_metadata(self, step_id): + return [self._workflow_id, STEPS_DIR, step_id, PRE_STEP_METADATA] + def _key_step_output(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_OUTPUT] @@ -526,6 +536,9 @@ def _key_step_exception(self, step_id): def _key_step_output_metadata(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_OUTPUTS_METADATA] + def _key_post_step_metadata(self, step_id): + return [self._workflow_id, STEPS_DIR, step_id, POST_STEP_METADATA] + def _key_step_function_body(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_FUNC_BODY] @@ -544,6 +557,9 @@ def _key_class_body(self): def _key_workflow_metadata(self): return [self._workflow_id, WORKFLOW_META] + def _key_workflow_user_metadata(self): + return [self._workflow_id, WORKFLOW_USER_METADATA] + def _key_num_steps_with_name(self, name): return [self._workflow_id, DUPLICATE_NAME_COUNTER, name] From 263a5cd3b4b54b3b9c91fbb11847380dd110bda1 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 12:54:28 -0400 Subject: [PATCH 02/19] change type hint for step user metadata --- python/ray/workflow/common.py | 4 ++-- python/ray/workflow/step_function.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index 5437b062bc01..1d99c91169fb 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -129,7 +129,7 @@ class WorkflowData: # name of the step name: str # meta data to store - metadata: Dict + metadata: Dict[str, Any] def to_metadata(self) -> Dict[str, Any]: f = self.func_body @@ -141,7 +141,7 @@ def to_metadata(self) -> Dict[str, Any]: "workflow_refs": [wr.step_id for wr in self.inputs.workflow_refs], "catch_exceptions": self.catch_exceptions, "ray_options": self.ray_options, - "metadata": self.metadata + "user_metadata": self.metadata } return metadata diff --git a/python/ray/workflow/step_function.py b/python/ray/workflow/step_function.py index 0d3251d204f6..9098f963cfb9 100644 --- a/python/ray/workflow/step_function.py +++ b/python/ray/workflow/step_function.py @@ -69,7 +69,7 @@ def options(self, max_retries: int = 3, catch_exceptions: bool = False, name: str = None, - metadata: Dict = None, + metadata: Dict[str, Any] = None, **ray_options) -> "WorkflowStepFunction": """This function set how the step function is going to be executed. From d06ab0ecea44c1c7d3420163148fac50e2808eef Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 12:54:49 -0400 Subject: [PATCH 03/19] put prerun and postrun meta into workflow_storage --- python/ray/workflow/step_executor.py | 8 ++--- python/ray/workflow/workflow_storage.py | 43 +++++++++++++++++++++---- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index d3f7c2564857..ec66339e3b3c 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -330,12 +330,11 @@ def _workflow_step_executor(step_type: StepType, func: Callable, args, kwargs = _resolve_step_inputs(baked_inputs) store = workflow_storage.get_workflow_storage() try: - step_start_metadata = {'start_time': time.time()} - asyncio.get_event_loop().run_until_complete(store._put( - store._key_pre_step_metadata(step_id), step_start_metadata, True)) + store.save_step_prerun_metadata(step_id) persisted_output, volatile_output = _wrap_run( func, step_type, step_id, catch_exceptions, max_retries, *args, **kwargs) + store.save_step_postrun_metadata(step_id) except Exception as e: commit_step(store, step_id, None, e) raise e @@ -372,9 +371,6 @@ def _workflow_step_executor(step_type: StepType, func: Callable, # advance the progress of the workflow store.advance_progress(step_id) _record_step_status(step_id, WorkflowStatus.SUCCESSFUL) - step_end_metadata = {'end_time': time.time()} - asyncio.get_event_loop().run_until_complete(store._put( - store._key_post_step_metadata(step_id), step_end_metadata, True)) logger.info(get_step_status_info(WorkflowStatus.SUCCESSFUL)) if isinstance(volatile_output, Workflow): # This is the case where a step method is called in the virtual actor. diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index 62ed9c57dabd..dfb3866b5479 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -4,6 +4,7 @@ """ import asyncio +import time from typing import Dict, List, Optional, Any, Callable, Tuple, Union from dataclasses import dataclass import logging @@ -30,9 +31,9 @@ STEPS_DIR = "steps" STEP_INPUTS_METADATA = "inputs.json" STEP_USER_METADATA = "user_metadata.json" -PRE_STEP_METADATA = "pre_step_metadata.json" +STEP_PRERUN_METADATA = "prerun_metadata.json" STEP_OUTPUTS_METADATA = "outputs.json" -POST_STEP_METADATA = "post_step_metadata.json" +STEP_POSTRUN_METADATA = "postrun_metadata.json" STEP_ARGS = "args.pkl" STEP_OUTPUT = "output.pkl" STEP_EXCEPTION = "exception.pkl" @@ -376,6 +377,36 @@ def save_actor_class_body(self, cls: type) -> None: """ asyncio_run(self._put(self._key_class_body(), cls)) + def save_step_prerun_metadata(self, step_id: StepID): + """Save pre-run metadata for the current step. + + Args: + step_id: ID of the workflow step. + + Raises: + DataSaveError: if we fail to save the pre-run metadata. + """ + + metadata = { + "start_time": time.time(), + } + asyncio_run(self._put(self._key_step_prerun_metadata(step_id), metadata, True)) + + def save_step_postrun_metadata(self, step_id: StepID): + """Save post-run metadata for the current step. + + Args: + step_id: ID of the workflow step. + + Raises: + DataSaveError: if we fail to save the post-run metadata. + """ + + metadata = { + "end_time": time.time(), + } + asyncio_run(self._put(self._key_step_postrun_metadata(step_id), metadata, True)) + def save_workflow_meta(self, metadata: WorkflowMetaData) -> None: """Save the metadata of the current workflow. @@ -524,8 +555,8 @@ def _key_step_input_metadata(self, step_id): def _key_step_user_metadata(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_USER_METADATA] - def _key_pre_step_metadata(self, step_id): - return [self._workflow_id, STEPS_DIR, step_id, PRE_STEP_METADATA] + def _key_step_prerun_metadata(self, step_id): + return [self._workflow_id, STEPS_DIR, step_id, STEP_PRERUN_METADATA] def _key_step_output(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_OUTPUT] @@ -536,8 +567,8 @@ def _key_step_exception(self, step_id): def _key_step_output_metadata(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_OUTPUTS_METADATA] - def _key_post_step_metadata(self, step_id): - return [self._workflow_id, STEPS_DIR, step_id, POST_STEP_METADATA] + def _key_step_postrun_metadata(self, step_id): + return [self._workflow_id, STEPS_DIR, step_id, STEP_POSTRUN_METADATA] def _key_step_function_body(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_FUNC_BODY] From d618fb418943ae53afa68ebf2ca3ffe518b9d0a5 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 13:24:15 -0400 Subject: [PATCH 04/19] add workflow level metadata put --- python/ray/workflow/execution.py | 5 ++- python/ray/workflow/workflow_storage.py | 50 +++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/python/ray/workflow/execution.py b/python/ray/workflow/execution.py index 6cf51e7efac1..3199494978e7 100644 --- a/python/ray/workflow/execution.py +++ b/python/ray/workflow/execution.py @@ -41,8 +41,8 @@ def run(entry_workflow: Workflow, store.storage_url): # checkpoint the workflow ws = workflow_storage.get_workflow_storage(workflow_id) - asyncio.get_event_loop().run_until_complete(ws._put( - ws._key_workflow_user_metadata(), metadata, True)) + ws.save_workflow_user_metadata(metadata) + ws.save_workflow_prerun_metadata() wf_exists = True try: @@ -65,6 +65,7 @@ def run(entry_workflow: Workflow, result: "WorkflowExecutionResult" = ray.get( workflow_manager.run_or_resume.remote(workflow_id, ignore_existing)) + ws.save_workflow_postrun_metadata() if entry_workflow.data.step_type == StepType.FUNCTION: return flatten_workflow_output(workflow_id, result.persisted_output) diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index dfb3866b5479..a6fbb731dc14 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -40,8 +40,10 @@ STEP_FUNC_BODY = "func_body.pkl" CLASS_BODY = "class_body.pkl" WORKFLOW_META = "workflow_meta.json" -WORKFLOW_PROGRESS = "progress.json" WORKFLOW_USER_METADATA = "user_metadata.json" +WORKFLOW_PRERUN_METADATA = "prerun_metadata.json" +WORKFLOW_POSTRUN_METADATA = "postrun_metadata.json" +WORKFLOW_PROGRESS = "progress.json" # Without this counter, we're going to scan all steps to get the number of # steps with a given name. This can be very expensive if there are too # many duplicates. @@ -378,7 +380,7 @@ def save_actor_class_body(self, cls: type) -> None: asyncio_run(self._put(self._key_class_body(), cls)) def save_step_prerun_metadata(self, step_id: StepID): - """Save pre-run metadata for the current step. + """Save pre-run metadata of the current step. Args: step_id: ID of the workflow step. @@ -393,7 +395,7 @@ def save_step_prerun_metadata(self, step_id: StepID): asyncio_run(self._put(self._key_step_prerun_metadata(step_id), metadata, True)) def save_step_postrun_metadata(self, step_id: StepID): - """Save post-run metadata for the current step. + """Save post-run metadata of the current step. Args: step_id: ID of the workflow step. @@ -407,6 +409,42 @@ def save_step_postrun_metadata(self, step_id: StepID): } asyncio_run(self._put(self._key_step_postrun_metadata(step_id), metadata, True)) + def save_workflow_user_metadata(self, metadata): + """Save user metadata of the current workflow. + + Args: + metadata: user metadata of the current workflow. + + Raises: + DataSaveError: if we fail to save the pre-run metadata. + """ + + asyncio_run(self._put(self._key_workflow_user_metadata(), metadata, True)) + + def save_workflow_prerun_metadata(self): + """Save pre-run metadata of the current workflow. + + Raises: + DataSaveError: if we fail to save the pre-run metadata. + """ + + metadata = { + "start_time": time.time(), + } + asyncio_run(self._put(self._key_workflow_prerun_metadata(), metadata, True)) + + def save_workflow_postrun_metadata(self): + """Save post-run metadata of the current workflow. + + Raises: + DataSaveError: if we fail to save the post-run metadata. + """ + + metadata = { + "end_time": time.time(), + } + asyncio_run(self._put(self._key_workflow_postrun_metadata(), metadata, True)) + def save_workflow_meta(self, metadata: WorkflowMetaData) -> None: """Save the metadata of the current workflow. @@ -591,6 +629,12 @@ def _key_workflow_metadata(self): def _key_workflow_user_metadata(self): return [self._workflow_id, WORKFLOW_USER_METADATA] + def _key_workflow_prerun_metadata(self): + return [self._workflow_id, WORKFLOW_PRERUN_METADATA] + + def _key_workflow_postrun_metadata(self): + return [self._workflow_id, WORKFLOW_POSTRUN_METADATA] + def _key_num_steps_with_name(self, name): return [self._workflow_id, DUPLICATE_NAME_COUNTER, name] From f685598b585524acabab8248609f1f84b9f2492d Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 13:25:59 -0400 Subject: [PATCH 05/19] fix type hint --- python/ray/workflow/common.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index 1d99c91169fb..2cd140a18f2f 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -264,7 +264,7 @@ def __reduce__(self): "remote, or stored in Ray objects.") @PublicAPI(stability="beta") - def run(self, workflow_id: Optional[str] = None, metadata: Optional[Dict] = None) -> Any: + def run(self, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Any: """Run a workflow. If the workflow with the given id already exists, it will be resumed. @@ -294,9 +294,9 @@ def run(self, workflow_id: Optional[str] = None, metadata: Optional[Dict] = None metadata: metadata to add to the workflow. """ return ray.get(self.run_async(workflow_id, metadata)) - + @PublicAPI(stability="beta") - def run_async(self, workflow_id: Optional[str] = None, metadata: Optional[Dict] = None) -> ObjectRef: + def run_async(self, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> ObjectRef: """Run a workflow asynchronously. If the workflow with the given id already exists, it will be resumed. From 94b4734ce8435156a73d6adad73d5a038d02a948 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 13:28:55 -0400 Subject: [PATCH 06/19] re-order code additions --- python/ray/workflow/common.py | 2 +- python/ray/workflow/workflow_storage.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index 2cd140a18f2f..e94df0558f19 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -294,7 +294,7 @@ def run(self, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, An metadata: metadata to add to the workflow. """ return ray.get(self.run_async(workflow_id, metadata)) - + @PublicAPI(stability="beta") def run_async(self, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> ObjectRef: """Run a workflow asynchronously. diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index a6fbb731dc14..878b9086aa90 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -32,8 +32,8 @@ STEP_INPUTS_METADATA = "inputs.json" STEP_USER_METADATA = "user_metadata.json" STEP_PRERUN_METADATA = "prerun_metadata.json" -STEP_OUTPUTS_METADATA = "outputs.json" STEP_POSTRUN_METADATA = "postrun_metadata.json" +STEP_OUTPUTS_METADATA = "outputs.json" STEP_ARGS = "args.pkl" STEP_OUTPUT = "output.pkl" STEP_EXCEPTION = "exception.pkl" @@ -596,6 +596,9 @@ def _key_step_user_metadata(self, step_id): def _key_step_prerun_metadata(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_PRERUN_METADATA] + def _key_step_postrun_metadata(self, step_id): + return [self._workflow_id, STEPS_DIR, step_id, STEP_POSTRUN_METADATA] + def _key_step_output(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_OUTPUT] @@ -605,9 +608,6 @@ def _key_step_exception(self, step_id): def _key_step_output_metadata(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_OUTPUTS_METADATA] - def _key_step_postrun_metadata(self, step_id): - return [self._workflow_id, STEPS_DIR, step_id, STEP_POSTRUN_METADATA] - def _key_step_function_body(self, step_id): return [self._workflow_id, STEPS_DIR, step_id, STEP_FUNC_BODY] From 7ad2b4c11f5fd93264420a1871207ab79a2ed281 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 14:17:05 -0400 Subject: [PATCH 07/19] change the location of where workflow metadata recorded --- python/ray/workflow/execution.py | 2 -- python/ray/workflow/workflow_access.py | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/workflow/execution.py b/python/ray/workflow/execution.py index 3199494978e7..9efe2719cbad 100644 --- a/python/ray/workflow/execution.py +++ b/python/ray/workflow/execution.py @@ -42,7 +42,6 @@ def run(entry_workflow: Workflow, # checkpoint the workflow ws = workflow_storage.get_workflow_storage(workflow_id) ws.save_workflow_user_metadata(metadata) - ws.save_workflow_prerun_metadata() wf_exists = True try: @@ -65,7 +64,6 @@ def run(entry_workflow: Workflow, result: "WorkflowExecutionResult" = ray.get( workflow_manager.run_or_resume.remote(workflow_id, ignore_existing)) - ws.save_workflow_postrun_metadata() if entry_workflow.data.step_type == StepType.FUNCTION: return flatten_workflow_output(workflow_id, result.persisted_output) diff --git a/python/ray/workflow/workflow_access.py b/python/ray/workflow/workflow_access.py index c1b5d78d253a..6b1e3c10e77d 100644 --- a/python/ray/workflow/workflow_access.py +++ b/python/ray/workflow/workflow_access.py @@ -169,6 +169,7 @@ def run_or_resume(self, workflow_id: str, ignore_existing: bool = False raise RuntimeError(f"The output of workflow[id={workflow_id}] " "already exists.") wf_store = workflow_storage.WorkflowStorage(workflow_id, self._store) + wf_store.save_workflow_prerun_metadata() step_id = wf_store.get_entrypoint_step_id() try: current_output = self._workflow_outputs[workflow_id].output @@ -229,6 +230,7 @@ def update_step_status(self, workflow_id: str, step_id: str, wf_store.save_workflow_meta( common.WorkflowMetaData(common.WorkflowStatus.SUCCESSFUL)) self._step_status.pop(workflow_id) + wf_store.save_workflow_postrun_metadata() def cancel_workflow(self, workflow_id: str) -> None: self._step_status.pop(workflow_id) From 882df4959211a08fe7145fa3d8785916c6b7306a Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 14:53:36 -0400 Subject: [PATCH 08/19] move application logic out of workflow storage api --- python/ray/workflow/common.py | 4 ++-- python/ray/workflow/step_executor.py | 6 +++-- python/ray/workflow/step_function.py | 4 ++-- python/ray/workflow/workflow_access.py | 7 ++++-- python/ray/workflow/workflow_storage.py | 31 +++++++++++-------------- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index e94df0558f19..b931aff6239d 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -129,7 +129,7 @@ class WorkflowData: # name of the step name: str # meta data to store - metadata: Dict[str, Any] + user_metadata: Dict[str, Any] def to_metadata(self) -> Dict[str, Any]: f = self.func_body @@ -141,7 +141,7 @@ def to_metadata(self) -> Dict[str, Any]: "workflow_refs": [wr.step_id for wr in self.inputs.workflow_refs], "catch_exceptions": self.catch_exceptions, "ray_options": self.ray_options, - "user_metadata": self.metadata + "user_metadata": self.user_metadata } return metadata diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index ec66339e3b3c..81096792621c 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -330,11 +330,13 @@ def _workflow_step_executor(step_type: StepType, func: Callable, args, kwargs = _resolve_step_inputs(baked_inputs) store = workflow_storage.get_workflow_storage() try: - store.save_step_prerun_metadata(step_id) + step_prerun_metadata = {"start_time": time.time()} + store.save_step_prerun_metadata(step_id, step_prerun_metadata) persisted_output, volatile_output = _wrap_run( func, step_type, step_id, catch_exceptions, max_retries, *args, **kwargs) - store.save_step_postrun_metadata(step_id) + step_postrun_metadata = {"end_time": time.time()} + store.save_step_postrun_metadata(step_id, step_postrun_metadata) except Exception as e: commit_step(store, step_id, None, e) raise e diff --git a/python/ray/workflow/step_function.py b/python/ray/workflow/step_function.py index 9098f963cfb9..3bbd1d462be7 100644 --- a/python/ray/workflow/step_function.py +++ b/python/ray/workflow/step_function.py @@ -31,7 +31,7 @@ def __init__(self, self._ray_options = ray_options or {} self._func_signature = signature.extract_signature(func) self._name = name or "" - self._metadata = metadata or {} + self._user_metadata = metadata or {} # Override signature and docstring @functools.wraps(func) @@ -52,7 +52,7 @@ def prepare_inputs(): catch_exceptions=self._catch_exceptions, ray_options=self._ray_options, name=self._name, - metadata=self._metadata + user_metadata=self._user_metadata ) return Workflow(workflow_data, prepare_inputs) diff --git a/python/ray/workflow/workflow_access.py b/python/ray/workflow/workflow_access.py index 6b1e3c10e77d..c6591041c786 100644 --- a/python/ray/workflow/workflow_access.py +++ b/python/ray/workflow/workflow_access.py @@ -1,4 +1,5 @@ import logging +import time from typing import Any, Dict, List, Tuple, Optional, TYPE_CHECKING from dataclasses import dataclass @@ -169,7 +170,8 @@ def run_or_resume(self, workflow_id: str, ignore_existing: bool = False raise RuntimeError(f"The output of workflow[id={workflow_id}] " "already exists.") wf_store = workflow_storage.WorkflowStorage(workflow_id, self._store) - wf_store.save_workflow_prerun_metadata() + workflow_prerun_metadata = {"start_time": time.time()} + wf_store.save_workflow_prerun_metadata(workflow_prerun_metadata) step_id = wf_store.get_entrypoint_step_id() try: current_output = self._workflow_outputs[workflow_id].output @@ -230,7 +232,8 @@ def update_step_status(self, workflow_id: str, step_id: str, wf_store.save_workflow_meta( common.WorkflowMetaData(common.WorkflowStatus.SUCCESSFUL)) self._step_status.pop(workflow_id) - wf_store.save_workflow_postrun_metadata() + workflow_postrun_metadata = {"end_time": time.time()} + wf_store.save_workflow_postrun_metadata(workflow_postrun_metadata) def cancel_workflow(self, workflow_id: str) -> None: self._step_status.pop(workflow_id) diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index 878b9086aa90..949c35a7dbe7 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -4,7 +4,6 @@ """ import asyncio -import time from typing import Dict, List, Optional, Any, Callable, Tuple, Union from dataclasses import dataclass import logging @@ -379,37 +378,33 @@ def save_actor_class_body(self, cls: type) -> None: """ asyncio_run(self._put(self._key_class_body(), cls)) - def save_step_prerun_metadata(self, step_id: StepID): + def save_step_prerun_metadata(self, step_id: StepID, metadata: Dict[str, Any]): """Save pre-run metadata of the current step. Args: step_id: ID of the workflow step. + metadata: pre-run metadata of the current step. Raises: DataSaveError: if we fail to save the pre-run metadata. """ - metadata = { - "start_time": time.time(), - } asyncio_run(self._put(self._key_step_prerun_metadata(step_id), metadata, True)) - def save_step_postrun_metadata(self, step_id: StepID): + def save_step_postrun_metadata(self, step_id: StepID, metadata: Dict[str, Any]): """Save post-run metadata of the current step. Args: step_id: ID of the workflow step. + metadata: post-run metadata of the current step. Raises: DataSaveError: if we fail to save the post-run metadata. """ - metadata = { - "end_time": time.time(), - } asyncio_run(self._put(self._key_step_postrun_metadata(step_id), metadata, True)) - def save_workflow_user_metadata(self, metadata): + def save_workflow_user_metadata(self, metadata: Dict[str, Any]): """Save user metadata of the current workflow. Args: @@ -421,28 +416,28 @@ def save_workflow_user_metadata(self, metadata): asyncio_run(self._put(self._key_workflow_user_metadata(), metadata, True)) - def save_workflow_prerun_metadata(self): + def save_workflow_prerun_metadata(self, metadata: Dict[str, Any]): """Save pre-run metadata of the current workflow. + Args: + metadata: pre-run metadata of the current workflow. + Raises: DataSaveError: if we fail to save the pre-run metadata. """ - metadata = { - "start_time": time.time(), - } asyncio_run(self._put(self._key_workflow_prerun_metadata(), metadata, True)) - def save_workflow_postrun_metadata(self): + def save_workflow_postrun_metadata(self, metadata: Dict[str, Any]): """Save post-run metadata of the current workflow. + Args: + metadata: post-run metadata of the current workflow. + Raises: DataSaveError: if we fail to save the post-run metadata. """ - metadata = { - "end_time": time.time(), - } asyncio_run(self._put(self._key_workflow_postrun_metadata(), metadata, True)) def save_workflow_meta(self, metadata: WorkflowMetaData) -> None: From 5aa1b18cce99fe1af333cc0cb4895523ab8356bb Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 14:56:32 -0400 Subject: [PATCH 09/19] change checkpoint files naming --- python/ray/workflow/workflow_storage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index 949c35a7dbe7..496adc77c1c3 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -30,8 +30,8 @@ STEPS_DIR = "steps" STEP_INPUTS_METADATA = "inputs.json" STEP_USER_METADATA = "user_metadata.json" -STEP_PRERUN_METADATA = "prerun_metadata.json" -STEP_POSTRUN_METADATA = "postrun_metadata.json" +STEP_PRERUN_METADATA = "pre_step_metadata.json" +STEP_POSTRUN_METADATA = "pos_step_metadata.json" STEP_OUTPUTS_METADATA = "outputs.json" STEP_ARGS = "args.pkl" STEP_OUTPUT = "output.pkl" @@ -40,8 +40,8 @@ CLASS_BODY = "class_body.pkl" WORKFLOW_META = "workflow_meta.json" WORKFLOW_USER_METADATA = "user_metadata.json" -WORKFLOW_PRERUN_METADATA = "prerun_metadata.json" -WORKFLOW_POSTRUN_METADATA = "postrun_metadata.json" +WORKFLOW_PRERUN_METADATA = "pre_run_metadata.json" +WORKFLOW_POSTRUN_METADATA = "post_run_metadata.json" WORKFLOW_PROGRESS = "progress.json" # Without this counter, we're going to scan all steps to get the number of # steps with a given name. This can be very expensive if there are too From bdc54bdc1b1bdcef2488ddea62167d2dd08d615c Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 16:31:03 -0400 Subject: [PATCH 10/19] fix attribute naming change --- python/ray/workflow/step_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index 81096792621c..de32aa38d796 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -182,7 +182,7 @@ async def _write_step_inputs(wf_storage: workflow_storage.WorkflowStorage, wf_storage._put( wf_storage._key_step_input_metadata(step_id), metadata, True), wf_storage._put( - wf_storage._key_step_user_metadata(step_id), inputs.metadata, True), + wf_storage._key_step_user_metadata(step_id), inputs.user_metadata, True), serialization.dump_to_storage( wf_storage._key_step_function_body(step_id), inputs.func_body, workflow_id, storage), From 148bcffa50334c00cbc12c94d02850328de674a2 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 16:31:32 -0400 Subject: [PATCH 11/19] fix type --- python/ray/workflow/workflow_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index 496adc77c1c3..b5ff3ee012ae 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -411,7 +411,7 @@ def save_workflow_user_metadata(self, metadata: Dict[str, Any]): metadata: user metadata of the current workflow. Raises: - DataSaveError: if we fail to save the pre-run metadata. + DataSaveError: if we fail to save the user metadata. """ asyncio_run(self._put(self._key_workflow_user_metadata(), metadata, True)) From a0cb8bea82ae07513b3eb55c56a695ee6311bd91 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 19:08:48 -0400 Subject: [PATCH 12/19] change metadata file names --- python/ray/workflow/workflow_storage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index b5ff3ee012ae..edc9e96581a6 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -29,9 +29,9 @@ OBJECTS_DIR = "objects" STEPS_DIR = "steps" STEP_INPUTS_METADATA = "inputs.json" -STEP_USER_METADATA = "user_metadata.json" +STEP_USER_METADATA = "user_step_metadata.json" STEP_PRERUN_METADATA = "pre_step_metadata.json" -STEP_POSTRUN_METADATA = "pos_step_metadata.json" +STEP_POSTRUN_METADATA = "post_step_metadata.json" STEP_OUTPUTS_METADATA = "outputs.json" STEP_ARGS = "args.pkl" STEP_OUTPUT = "output.pkl" @@ -39,7 +39,7 @@ STEP_FUNC_BODY = "func_body.pkl" CLASS_BODY = "class_body.pkl" WORKFLOW_META = "workflow_meta.json" -WORKFLOW_USER_METADATA = "user_metadata.json" +WORKFLOW_USER_METADATA = "user_run_metadata.json" WORKFLOW_PRERUN_METADATA = "pre_run_metadata.json" WORKFLOW_POSTRUN_METADATA = "post_run_metadata.json" WORKFLOW_PROGRESS = "progress.json" From bbeeb5ed16fa8f57b40c11d62a822550a9483376 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 20:25:31 -0400 Subject: [PATCH 13/19] add tests for workflow metadata --- .../ray/workflow/tests/test_metadata_put.py | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 python/ray/workflow/tests/test_metadata_put.py diff --git a/python/ray/workflow/tests/test_metadata_put.py b/python/ray/workflow/tests/test_metadata_put.py new file mode 100644 index 000000000000..23214c941dd1 --- /dev/null +++ b/python/ray/workflow/tests/test_metadata_put.py @@ -0,0 +1,108 @@ +import asyncio + +from ray import workflow +from ray.tests.conftest import * # noqa +from ray.workflow import workflow_storage +from ray.workflow.storage import get_global_storage + + +def get_metadata(paths, is_json=True): + store = get_global_storage() + key = store.make_key(*paths) + return asyncio.get_event_loop().run_until_complete(store.get(key, is_json)) + + +def test_step_user_metadata(workflow_start_regular): + + metadata = {"k1": "v1"} + step_name = "simple_step" + workflow_id = "simple" + + @workflow.step(name=step_name, metadata=metadata) + def simple(): + return 0 + + simple.step().run(workflow_id) + + checkpointed_metadata = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_USER_METADATA]) + assert metadata == checkpointed_metadata + + +def test_step_runtime_metadata(workflow_start_regular): + + step_name = "simple_step" + workflow_id = "simple" + + @workflow.step(name=step_name) + def simple(): + return 0 + + simple.step().run(workflow_id) + + prerun_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_PRERUN_METADATA]) + postrun_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_POSTRUN_METADATA]) + assert "start_time" in prerun_meta + assert "end_time" in postrun_meta + + +def test_workflow_user_metadata(workflow_start_regular): + + metadata = {"k1": "v1"} + workflow_id = "simple" + + @workflow.step + def simple(): + return 0 + + simple.step().run(workflow_id, metadata=metadata) + + checkpointed_metadata = get_metadata([workflow_id, workflow_storage.WORKFLOW_USER_METADATA]) + assert metadata == checkpointed_metadata + + +def test_workflow_runtime_metadata(workflow_start_regular): + + workflow_id = "simple" + + @workflow.step + def simple(): + return 0 + + simple.step().run(workflow_id) + + prerun_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA]) + postrun_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA]) + assert "start_time" in prerun_meta + assert "end_time" in postrun_meta + + +def test_all_metadata(workflow_start_regular): + + user_step_metadata = {"k1": "v1"} + user_run_metadata = {"k2": "v2"} + step_name = "simple_step" + workflow_id = "simple" + + @workflow.step + def simple(): + return 0 + + simple.options(name=step_name, metadata=user_step_metadata).step().run(workflow_id, metadata=user_run_metadata) + + checkpointed_user_step_metadata = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_USER_METADATA]) + checkpointed_user_run_metadata = get_metadata([workflow_id, workflow_storage.WORKFLOW_USER_METADATA]) + checkpointed_pre_step_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_PRERUN_METADATA]) + checkpointed_post_step_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_POSTRUN_METADATA]) + checkpointed_pre_run_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA]) + checkpointed_post_run_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA]) + assert user_step_metadata == checkpointed_user_step_metadata + assert user_run_metadata == checkpointed_user_run_metadata + assert "start_time" in checkpointed_pre_step_meta + assert "start_time" in checkpointed_pre_run_meta + assert "end_time" in checkpointed_post_step_meta + assert "end_time" in checkpointed_post_run_meta + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__])) From 6ae3aa5c94dea8983ef69167295a87f59bd8cf86 Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 22:57:34 -0400 Subject: [PATCH 14/19] change metadata name --- python/ray/workflow/virtual_actor_class.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/workflow/virtual_actor_class.py b/python/ray/workflow/virtual_actor_class.py index 329b7599c0d1..c7c03d2bc301 100644 --- a/python/ray/workflow/virtual_actor_class.py +++ b/python/ray/workflow/virtual_actor_class.py @@ -217,7 +217,7 @@ def step(method_name, method, *args, **kwargs): catch_exceptions=False, ray_options={}, name=None, - metadata=None + user_metadata=None, ) wf = Workflow(workflow_data) return wf From c496afa343ad71727b73156be3cf21544e6ebb4e Mon Sep 17 00:00:00 2001 From: lchu Date: Mon, 11 Oct 2021 23:59:23 -0400 Subject: [PATCH 15/19] add json serialization check for user metadata --- python/ray/workflow/execution.py | 12 +++++++++++- python/ray/workflow/step_function.py | 12 ++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/python/ray/workflow/execution.py b/python/ray/workflow/execution.py index 9efe2719cbad..650edb708021 100644 --- a/python/ray/workflow/execution.py +++ b/python/ray/workflow/execution.py @@ -1,11 +1,11 @@ import asyncio +import json import logging import time from typing import Set, List, Tuple, Optional, TYPE_CHECKING, Dict import uuid import ray - from ray.workflow import workflow_context from ray.workflow import workflow_storage from ray.workflow.common import (Workflow, WorkflowStatus, WorkflowMetaData, @@ -27,6 +27,16 @@ def run(entry_workflow: Workflow, metadata: Optional[Dict] = None) -> ray.ObjectRef: """Run a workflow asynchronously. """ + if metadata is not None: + if not isinstance(metadata, dict): + raise ValueError("metadata must be a dict.") + for k, v in metadata.items(): + try: + json.dumps(v) + except TypeError as e: + raise ValueError("metadata values must be JSON serializable, " + "however '{}' has a value whose {}.".format(k, e)) + store = get_global_storage() assert ray.is_initialized() if workflow_id is None: diff --git a/python/ray/workflow/step_function.py b/python/ray/workflow/step_function.py index 3bbd1d462be7..c886b66d5637 100644 --- a/python/ray/workflow/step_function.py +++ b/python/ray/workflow/step_function.py @@ -1,4 +1,5 @@ import functools +import json from typing import Callable, Dict, Any from ray._private import signature @@ -22,8 +23,15 @@ def __init__(self, raise ValueError("max_retries should be greater or equal to 1.") if ray_options is not None and not isinstance(ray_options, dict): raise ValueError("ray_options must be a dict.") - if metadata is not None and not isinstance(metadata, dict): - raise ValueError("metadata must be a dict.") + if metadata is not None: + if not isinstance(metadata, dict): + raise ValueError("metadata must be a dict.") + for k, v in metadata.items(): + try: + json.dumps(v) + except TypeError as e: + raise ValueError("metadata values must be JSON serializable, " + "however '{}' has a value whose {}.".format(k, e)) self._func = func self._max_retries = max_retries From 4dfc0ed0bc7d3ff943fb660b478ccd390b4332ac Mon Sep 17 00:00:00 2001 From: lchu Date: Tue, 12 Oct 2021 00:51:09 -0400 Subject: [PATCH 16/19] lint --- python/ray/workflow/common.py | 8 +++- python/ray/workflow/execution.py | 3 +- python/ray/workflow/step_executor.py | 4 +- python/ray/workflow/step_function.py | 8 ++-- .../ray/workflow/tests/test_metadata_put.py | 48 +++++++++++++------ python/ray/workflow/workflow_storage.py | 22 ++++++--- 6 files changed, 63 insertions(+), 30 deletions(-) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index b931aff6239d..41789dc2dc92 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -264,7 +264,9 @@ def __reduce__(self): "remote, or stored in Ray objects.") @PublicAPI(stability="beta") - def run(self, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Any: + def run(self, + workflow_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None) -> Any: """Run a workflow. If the workflow with the given id already exists, it will be resumed. @@ -296,7 +298,9 @@ def run(self, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, An return ray.get(self.run_async(workflow_id, metadata)) @PublicAPI(stability="beta") - def run_async(self, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> ObjectRef: + def run_async(self, + workflow_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None) -> ObjectRef: """Run a workflow asynchronously. If the workflow with the given id already exists, it will be resumed. diff --git a/python/ray/workflow/execution.py b/python/ray/workflow/execution.py index 650edb708021..58168cb049b0 100644 --- a/python/ray/workflow/execution.py +++ b/python/ray/workflow/execution.py @@ -35,7 +35,8 @@ def run(entry_workflow: Workflow, json.dumps(v) except TypeError as e: raise ValueError("metadata values must be JSON serializable, " - "however '{}' has a value whose {}.".format(k, e)) + "however '{}' has a value whose {}.".format( + k, e)) store = get_global_storage() assert ray.is_initialized() diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index de32aa38d796..42b22a3080df 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -1,5 +1,4 @@ import time -import datetime import asyncio from dataclasses import dataclass import logging @@ -182,7 +181,8 @@ async def _write_step_inputs(wf_storage: workflow_storage.WorkflowStorage, wf_storage._put( wf_storage._key_step_input_metadata(step_id), metadata, True), wf_storage._put( - wf_storage._key_step_user_metadata(step_id), inputs.user_metadata, True), + wf_storage._key_step_user_metadata(step_id), inputs.user_metadata, + True), serialization.dump_to_storage( wf_storage._key_step_function_body(step_id), inputs.func_body, workflow_id, storage), diff --git a/python/ray/workflow/step_function.py b/python/ray/workflow/step_function.py index c886b66d5637..671b804aa6f3 100644 --- a/python/ray/workflow/step_function.py +++ b/python/ray/workflow/step_function.py @@ -30,8 +30,9 @@ def __init__(self, try: json.dumps(v) except TypeError as e: - raise ValueError("metadata values must be JSON serializable, " - "however '{}' has a value whose {}.".format(k, e)) + raise ValueError( + "metadata values must be JSON serializable, " + "however '{}' has a value whose {}.".format(k, e)) self._func = func self._max_retries = max_retries @@ -60,8 +61,7 @@ def prepare_inputs(): catch_exceptions=self._catch_exceptions, ray_options=self._ray_options, name=self._name, - user_metadata=self._user_metadata - ) + user_metadata=self._user_metadata) return Workflow(workflow_data, prepare_inputs) self.step = _build_workflow diff --git a/python/ray/workflow/tests/test_metadata_put.py b/python/ray/workflow/tests/test_metadata_put.py index 23214c941dd1..6d7653858186 100644 --- a/python/ray/workflow/tests/test_metadata_put.py +++ b/python/ray/workflow/tests/test_metadata_put.py @@ -5,6 +5,8 @@ from ray.workflow import workflow_storage from ray.workflow.storage import get_global_storage +import pytest + def get_metadata(paths, is_json=True): store = get_global_storage() @@ -24,7 +26,8 @@ def simple(): simple.step().run(workflow_id) - checkpointed_metadata = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_USER_METADATA]) + checkpointed_metadata = get_metadata( + [workflow_id, "steps", step_name, workflow_storage.STEP_USER_METADATA]) assert metadata == checkpointed_metadata @@ -39,8 +42,12 @@ def simple(): simple.step().run(workflow_id) - prerun_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_PRERUN_METADATA]) - postrun_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_POSTRUN_METADATA]) + prerun_meta = get_metadata([ + workflow_id, "steps", step_name, workflow_storage.STEP_PRERUN_METADATA + ]) + postrun_meta = get_metadata([ + workflow_id, "steps", step_name, workflow_storage.STEP_POSTRUN_METADATA + ]) assert "start_time" in prerun_meta assert "end_time" in postrun_meta @@ -56,7 +63,8 @@ def simple(): simple.step().run(workflow_id, metadata=metadata) - checkpointed_metadata = get_metadata([workflow_id, workflow_storage.WORKFLOW_USER_METADATA]) + checkpointed_metadata = get_metadata( + [workflow_id, workflow_storage.WORKFLOW_USER_METADATA]) assert metadata == checkpointed_metadata @@ -70,8 +78,10 @@ def simple(): simple.step().run(workflow_id) - prerun_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA]) - postrun_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA]) + prerun_meta = get_metadata( + [workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA]) + postrun_meta = get_metadata( + [workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA]) assert "start_time" in prerun_meta assert "end_time" in postrun_meta @@ -87,14 +97,24 @@ def test_all_metadata(workflow_start_regular): def simple(): return 0 - simple.options(name=step_name, metadata=user_step_metadata).step().run(workflow_id, metadata=user_run_metadata) - - checkpointed_user_step_metadata = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_USER_METADATA]) - checkpointed_user_run_metadata = get_metadata([workflow_id, workflow_storage.WORKFLOW_USER_METADATA]) - checkpointed_pre_step_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_PRERUN_METADATA]) - checkpointed_post_step_meta = get_metadata([workflow_id, 'steps', step_name, workflow_storage.STEP_POSTRUN_METADATA]) - checkpointed_pre_run_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA]) - checkpointed_post_run_meta = get_metadata([workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA]) + simple.options( + name=step_name, metadata=user_step_metadata).step().run( + workflow_id, metadata=user_run_metadata) + + checkpointed_user_step_metadata = get_metadata( + [workflow_id, "steps", step_name, workflow_storage.STEP_USER_METADATA]) + checkpointed_user_run_metadata = get_metadata( + [workflow_id, workflow_storage.WORKFLOW_USER_METADATA]) + checkpointed_pre_step_meta = get_metadata([ + workflow_id, "steps", step_name, workflow_storage.STEP_PRERUN_METADATA + ]) + checkpointed_post_step_meta = get_metadata([ + workflow_id, "steps", step_name, workflow_storage.STEP_POSTRUN_METADATA + ]) + checkpointed_pre_run_meta = get_metadata( + [workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA]) + checkpointed_post_run_meta = get_metadata( + [workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA]) assert user_step_metadata == checkpointed_user_step_metadata assert user_run_metadata == checkpointed_user_run_metadata assert "start_time" in checkpointed_pre_step_meta diff --git a/python/ray/workflow/workflow_storage.py b/python/ray/workflow/workflow_storage.py index edc9e96581a6..a702d72b9ba5 100644 --- a/python/ray/workflow/workflow_storage.py +++ b/python/ray/workflow/workflow_storage.py @@ -378,7 +378,8 @@ def save_actor_class_body(self, cls: type) -> None: """ asyncio_run(self._put(self._key_class_body(), cls)) - def save_step_prerun_metadata(self, step_id: StepID, metadata: Dict[str, Any]): + def save_step_prerun_metadata(self, step_id: StepID, + metadata: Dict[str, Any]): """Save pre-run metadata of the current step. Args: @@ -389,9 +390,11 @@ def save_step_prerun_metadata(self, step_id: StepID, metadata: Dict[str, Any]): DataSaveError: if we fail to save the pre-run metadata. """ - asyncio_run(self._put(self._key_step_prerun_metadata(step_id), metadata, True)) + asyncio_run( + self._put(self._key_step_prerun_metadata(step_id), metadata, True)) - def save_step_postrun_metadata(self, step_id: StepID, metadata: Dict[str, Any]): + def save_step_postrun_metadata(self, step_id: StepID, + metadata: Dict[str, Any]): """Save post-run metadata of the current step. Args: @@ -402,7 +405,9 @@ def save_step_postrun_metadata(self, step_id: StepID, metadata: Dict[str, Any]): DataSaveError: if we fail to save the post-run metadata. """ - asyncio_run(self._put(self._key_step_postrun_metadata(step_id), metadata, True)) + asyncio_run( + self._put( + self._key_step_postrun_metadata(step_id), metadata, True)) def save_workflow_user_metadata(self, metadata: Dict[str, Any]): """Save user metadata of the current workflow. @@ -414,7 +419,8 @@ def save_workflow_user_metadata(self, metadata: Dict[str, Any]): DataSaveError: if we fail to save the user metadata. """ - asyncio_run(self._put(self._key_workflow_user_metadata(), metadata, True)) + asyncio_run( + self._put(self._key_workflow_user_metadata(), metadata, True)) def save_workflow_prerun_metadata(self, metadata: Dict[str, Any]): """Save pre-run metadata of the current workflow. @@ -426,7 +432,8 @@ def save_workflow_prerun_metadata(self, metadata: Dict[str, Any]): DataSaveError: if we fail to save the pre-run metadata. """ - asyncio_run(self._put(self._key_workflow_prerun_metadata(), metadata, True)) + asyncio_run( + self._put(self._key_workflow_prerun_metadata(), metadata, True)) def save_workflow_postrun_metadata(self, metadata: Dict[str, Any]): """Save post-run metadata of the current workflow. @@ -438,7 +445,8 @@ def save_workflow_postrun_metadata(self, metadata: Dict[str, Any]): DataSaveError: if we fail to save the post-run metadata. """ - asyncio_run(self._put(self._key_workflow_postrun_metadata(), metadata, True)) + asyncio_run( + self._put(self._key_workflow_postrun_metadata(), metadata, True)) def save_workflow_meta(self, metadata: WorkflowMetaData) -> None: """Save the metadata of the current workflow. From 7c0fe882dd7b8992f7891bc36ce261ff8e77c428 Mon Sep 17 00:00:00 2001 From: lchu Date: Tue, 12 Oct 2021 13:45:29 -0400 Subject: [PATCH 17/19] =?UTF-8?q?=E2=80=98empty=E2=80=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From db34a29add35e1807800eca7e1e5238b73e41991 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Tue, 12 Oct 2021 18:04:46 +0000 Subject: [PATCH 18/19] try to fix --- python/ray/workflow/common.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index 41789dc2dc92..f99b43718fc8 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -294,6 +294,10 @@ def run(self, workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated. metadata: metadata to add to the workflow. + + Returns: + The running result. + """ return ray.get(self.run_async(workflow_id, metadata)) @@ -328,6 +332,10 @@ def run_async(self, workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated. metadata: metadata to add to the workflow. + + Returns: + The running result as ray.ObjectRef. + """ # TODO(suquark): avoid cyclic importing from ray.workflow.execution import run From bfb67ee462f3242ce6612b0027fb544394058a2e Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Tue, 12 Oct 2021 23:25:13 +0000 Subject: [PATCH 19/19] up --- python/ray/workflow/common.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index f99b43718fc8..eb648e2b1cc6 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -293,11 +293,11 @@ def run(self, Args: workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated. - metadata: metadata to add to the workflow. + metadata: The metadata to add to the workflow. It has to be able + to serialize to json. Returns: The running result. - """ return ray.get(self.run_async(workflow_id, metadata)) @@ -331,7 +331,8 @@ def run_async(self, Args: workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated. - metadata: metadata to add to the workflow. + metadata: The metadata to add to the workflow. It has to be able + to serialize to json. Returns: The running result as ray.ObjectRef.