Skip to content

Commit ce64e6d

Browse files
lchu6fishbone
andauthored
[workflow] add metadata put in workflow (#19195)
## Why are these changes needed? Add metadata to workflow. Currently there is no option for user to attach any metadata to a step or workflow run, and workflow running metrics (except status) are not captured nor checkpointed. We are adding various of metadata including: 1. step-level user metadata. can be set with `step.options(metadata={})` 2. step-level pre-run metadata. this captures pre-run metadata such as step_start_time, more metrics can be added later. 3. step-level post-run metadata. this captures post-run metadata such as step_end_time, more metrics can be added later. 4. workflow-level user metadata. can be set with `workflow.run(metadata={})` 5. workflow-level pre-run metadata. this captures pre-run metadata such as workflow_start_time, more metrics can be added later. 6. workflow-level post-run metadata. this captures post-run metadata such as workflow_end_time, more metrics can be added later. ## Related issue number #17090 Co-authored-by: Yi Cheng <[email protected]>
1 parent 1b179ad commit ce64e6d

File tree

9 files changed

+295
-10
lines changed

9 files changed

+295
-10
lines changed

python/ray/workflow/api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ def step(*args, **kwargs):
9999
name = kwargs.pop("name", None)
100100
if name is not None:
101101
step_options["name"] = name
102+
metadata = kwargs.pop("metadata", None)
103+
if metadata is not None:
104+
step_options["metadata"] = metadata
102105
if len(kwargs) != 0:
103106
step_options["ray_options"] = kwargs
104107
return make_step_decorator(step_options)

python/ray/workflow/common.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ class WorkflowData:
128128
ray_options: Dict[str, Any]
129129
# name of the step
130130
name: str
131+
# meta data to store
132+
user_metadata: Dict[str, Any]
131133

132134
def to_metadata(self) -> Dict[str, Any]:
133135
f = self.func_body
@@ -139,6 +141,7 @@ def to_metadata(self) -> Dict[str, Any]:
139141
"workflow_refs": [wr.step_id for wr in self.inputs.workflow_refs],
140142
"catch_exceptions": self.catch_exceptions,
141143
"ray_options": self.ray_options,
144+
"user_metadata": self.user_metadata
142145
}
143146
return metadata
144147

@@ -261,7 +264,9 @@ def __reduce__(self):
261264
"remote, or stored in Ray objects.")
262265

263266
@PublicAPI(stability="beta")
264-
def run(self, workflow_id: Optional[str] = None) -> Any:
267+
def run(self,
268+
workflow_id: Optional[str] = None,
269+
metadata: Optional[Dict[str, Any]] = None) -> Any:
265270
"""Run a workflow.
266271
267272
If the workflow with the given id already exists, it will be resumed.
@@ -288,11 +293,18 @@ def run(self, workflow_id: Optional[str] = None) -> Any:
288293
Args:
289294
workflow_id: A unique identifier that can be used to resume the
290295
workflow. If not specified, a random id will be generated.
296+
metadata: The metadata to add to the workflow. It has to be able
297+
to serialize to json.
298+
299+
Returns:
300+
The running result.
291301
"""
292-
return ray.get(self.run_async(workflow_id))
302+
return ray.get(self.run_async(workflow_id, metadata))
293303

294304
@PublicAPI(stability="beta")
295-
def run_async(self, workflow_id: Optional[str] = None) -> ObjectRef:
305+
def run_async(self,
306+
workflow_id: Optional[str] = None,
307+
metadata: Optional[Dict[str, Any]] = None) -> ObjectRef:
296308
"""Run a workflow asynchronously.
297309
298310
If the workflow with the given id already exists, it will be resumed.
@@ -319,8 +331,14 @@ def run_async(self, workflow_id: Optional[str] = None) -> ObjectRef:
319331
Args:
320332
workflow_id: A unique identifier that can be used to resume the
321333
workflow. If not specified, a random id will be generated.
334+
metadata: The metadata to add to the workflow. It has to be able
335+
to serialize to json.
336+
337+
Returns:
338+
The running result as ray.ObjectRef.
339+
322340
"""
323341
# TODO(suquark): avoid cyclic importing
324342
from ray.workflow.execution import run
325343
self._step_id = None
326-
return run(self, workflow_id)
344+
return run(self, workflow_id, metadata)

python/ray/workflow/execution.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import asyncio
2+
import json
23
import logging
34
import time
4-
from typing import Set, List, Tuple, Optional, TYPE_CHECKING
5+
from typing import Set, List, Tuple, Optional, TYPE_CHECKING, Dict
56
import uuid
67

78
import ray
8-
99
from ray.workflow import workflow_context
1010
from ray.workflow import workflow_storage
1111
from ray.workflow.common import (Workflow, WorkflowStatus, WorkflowMetaData,
@@ -23,9 +23,21 @@
2323

2424

2525
def run(entry_workflow: Workflow,
26-
workflow_id: Optional[str] = None) -> ray.ObjectRef:
26+
workflow_id: Optional[str] = None,
27+
metadata: Optional[Dict] = None) -> ray.ObjectRef:
2728
"""Run a workflow asynchronously.
2829
"""
30+
if metadata is not None:
31+
if not isinstance(metadata, dict):
32+
raise ValueError("metadata must be a dict.")
33+
for k, v in metadata.items():
34+
try:
35+
json.dumps(v)
36+
except TypeError as e:
37+
raise ValueError("metadata values must be JSON serializable, "
38+
"however '{}' has a value whose {}.".format(
39+
k, e))
40+
2941
store = get_global_storage()
3042
assert ray.is_initialized()
3143
if workflow_id is None:
@@ -40,6 +52,7 @@ def run(entry_workflow: Workflow,
4052
store.storage_url):
4153
# checkpoint the workflow
4254
ws = workflow_storage.get_workflow_storage(workflow_id)
55+
ws.save_workflow_user_metadata(metadata)
4356

4457
wf_exists = True
4558
try:

python/ray/workflow/step_executor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
import asyncio
23
from dataclasses import dataclass
34
import logging
@@ -179,6 +180,9 @@ async def _write_step_inputs(wf_storage: workflow_storage.WorkflowStorage,
179180
# TODO (Alex): Handle the json case better?
180181
wf_storage._put(
181182
wf_storage._key_step_input_metadata(step_id), metadata, True),
183+
wf_storage._put(
184+
wf_storage._key_step_user_metadata(step_id), inputs.user_metadata,
185+
True),
182186
serialization.dump_to_storage(
183187
wf_storage._key_step_function_body(step_id), inputs.func_body,
184188
workflow_id, storage),
@@ -326,9 +330,13 @@ def _workflow_step_executor(step_type: StepType, func: Callable,
326330
args, kwargs = _resolve_step_inputs(baked_inputs)
327331
store = workflow_storage.get_workflow_storage()
328332
try:
333+
step_prerun_metadata = {"start_time": time.time()}
334+
store.save_step_prerun_metadata(step_id, step_prerun_metadata)
329335
persisted_output, volatile_output = _wrap_run(
330336
func, step_type, step_id, catch_exceptions, max_retries, *args,
331337
**kwargs)
338+
step_postrun_metadata = {"end_time": time.time()}
339+
store.save_step_postrun_metadata(step_id, step_postrun_metadata)
332340
except Exception as e:
333341
commit_step(store, step_id, None, e)
334342
raise e

python/ray/workflow/step_function.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import functools
2-
from typing import Callable
2+
import json
3+
from typing import Callable, Dict, Any
34

45
from ray._private import signature
56
from ray.workflow import serialization_context
@@ -16,18 +17,30 @@ def __init__(self,
1617
max_retries=3,
1718
catch_exceptions=False,
1819
name=None,
20+
metadata=None,
1921
ray_options=None):
2022
if not isinstance(max_retries, int) or max_retries < 1:
2123
raise ValueError("max_retries should be greater or equal to 1.")
2224
if ray_options is not None and not isinstance(ray_options, dict):
2325
raise ValueError("ray_options must be a dict.")
26+
if metadata is not None:
27+
if not isinstance(metadata, dict):
28+
raise ValueError("metadata must be a dict.")
29+
for k, v in metadata.items():
30+
try:
31+
json.dumps(v)
32+
except TypeError as e:
33+
raise ValueError(
34+
"metadata values must be JSON serializable, "
35+
"however '{}' has a value whose {}.".format(k, e))
2436

2537
self._func = func
2638
self._max_retries = max_retries
2739
self._catch_exceptions = catch_exceptions
2840
self._ray_options = ray_options or {}
2941
self._func_signature = signature.extract_signature(func)
3042
self._name = name or ""
43+
self._user_metadata = metadata or {}
3144

3245
# Override signature and docstring
3346
@functools.wraps(func)
@@ -48,7 +61,7 @@ def prepare_inputs():
4861
catch_exceptions=self._catch_exceptions,
4962
ray_options=self._ray_options,
5063
name=self._name,
51-
)
64+
user_metadata=self._user_metadata)
5265
return Workflow(workflow_data, prepare_inputs)
5366

5467
self.step = _build_workflow
@@ -64,6 +77,7 @@ def options(self,
6477
max_retries: int = 3,
6578
catch_exceptions: bool = False,
6679
name: str = None,
80+
metadata: Dict[str, Any] = None,
6781
**ray_options) -> "WorkflowStepFunction":
6882
"""This function set how the step function is going to be executed.
6983
@@ -79,11 +93,12 @@ def options(self,
7993
generate the step_id of the step. The name will be used
8094
directly as the step id if possible, otherwise deduplicated by
8195
appending .N suffixes.
96+
metadata: metadata to add to the step.
8297
**ray_options: All parameters in this fields will be passed
8398
to ray remote function options.
8499
85100
Returns:
86101
The step function itself.
87102
"""
88103
return WorkflowStepFunction(self._func, max_retries, catch_exceptions,
89-
name, ray_options)
104+
name, metadata, ray_options)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import asyncio
2+
3+
from ray import workflow
4+
from ray.tests.conftest import * # noqa
5+
from ray.workflow import workflow_storage
6+
from ray.workflow.storage import get_global_storage
7+
8+
import pytest
9+
10+
11+
def get_metadata(paths, is_json=True):
12+
store = get_global_storage()
13+
key = store.make_key(*paths)
14+
return asyncio.get_event_loop().run_until_complete(store.get(key, is_json))
15+
16+
17+
def test_step_user_metadata(workflow_start_regular):
18+
19+
metadata = {"k1": "v1"}
20+
step_name = "simple_step"
21+
workflow_id = "simple"
22+
23+
@workflow.step(name=step_name, metadata=metadata)
24+
def simple():
25+
return 0
26+
27+
simple.step().run(workflow_id)
28+
29+
checkpointed_metadata = get_metadata(
30+
[workflow_id, "steps", step_name, workflow_storage.STEP_USER_METADATA])
31+
assert metadata == checkpointed_metadata
32+
33+
34+
def test_step_runtime_metadata(workflow_start_regular):
35+
36+
step_name = "simple_step"
37+
workflow_id = "simple"
38+
39+
@workflow.step(name=step_name)
40+
def simple():
41+
return 0
42+
43+
simple.step().run(workflow_id)
44+
45+
prerun_meta = get_metadata([
46+
workflow_id, "steps", step_name, workflow_storage.STEP_PRERUN_METADATA
47+
])
48+
postrun_meta = get_metadata([
49+
workflow_id, "steps", step_name, workflow_storage.STEP_POSTRUN_METADATA
50+
])
51+
assert "start_time" in prerun_meta
52+
assert "end_time" in postrun_meta
53+
54+
55+
def test_workflow_user_metadata(workflow_start_regular):
56+
57+
metadata = {"k1": "v1"}
58+
workflow_id = "simple"
59+
60+
@workflow.step
61+
def simple():
62+
return 0
63+
64+
simple.step().run(workflow_id, metadata=metadata)
65+
66+
checkpointed_metadata = get_metadata(
67+
[workflow_id, workflow_storage.WORKFLOW_USER_METADATA])
68+
assert metadata == checkpointed_metadata
69+
70+
71+
def test_workflow_runtime_metadata(workflow_start_regular):
72+
73+
workflow_id = "simple"
74+
75+
@workflow.step
76+
def simple():
77+
return 0
78+
79+
simple.step().run(workflow_id)
80+
81+
prerun_meta = get_metadata(
82+
[workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA])
83+
postrun_meta = get_metadata(
84+
[workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA])
85+
assert "start_time" in prerun_meta
86+
assert "end_time" in postrun_meta
87+
88+
89+
def test_all_metadata(workflow_start_regular):
90+
91+
user_step_metadata = {"k1": "v1"}
92+
user_run_metadata = {"k2": "v2"}
93+
step_name = "simple_step"
94+
workflow_id = "simple"
95+
96+
@workflow.step
97+
def simple():
98+
return 0
99+
100+
simple.options(
101+
name=step_name, metadata=user_step_metadata).step().run(
102+
workflow_id, metadata=user_run_metadata)
103+
104+
checkpointed_user_step_metadata = get_metadata(
105+
[workflow_id, "steps", step_name, workflow_storage.STEP_USER_METADATA])
106+
checkpointed_user_run_metadata = get_metadata(
107+
[workflow_id, workflow_storage.WORKFLOW_USER_METADATA])
108+
checkpointed_pre_step_meta = get_metadata([
109+
workflow_id, "steps", step_name, workflow_storage.STEP_PRERUN_METADATA
110+
])
111+
checkpointed_post_step_meta = get_metadata([
112+
workflow_id, "steps", step_name, workflow_storage.STEP_POSTRUN_METADATA
113+
])
114+
checkpointed_pre_run_meta = get_metadata(
115+
[workflow_id, workflow_storage.WORKFLOW_PRERUN_METADATA])
116+
checkpointed_post_run_meta = get_metadata(
117+
[workflow_id, workflow_storage.WORKFLOW_POSTRUN_METADATA])
118+
assert user_step_metadata == checkpointed_user_step_metadata
119+
assert user_run_metadata == checkpointed_user_run_metadata
120+
assert "start_time" in checkpointed_pre_step_meta
121+
assert "start_time" in checkpointed_pre_run_meta
122+
assert "end_time" in checkpointed_post_step_meta
123+
assert "end_time" in checkpointed_post_run_meta
124+
125+
126+
if __name__ == "__main__":
127+
import sys
128+
sys.exit(pytest.main(["-v", __file__]))

python/ray/workflow/virtual_actor_class.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def step(method_name, method, *args, **kwargs):
217217
catch_exceptions=False,
218218
ray_options={},
219219
name=None,
220+
user_metadata=None,
220221
)
221222
wf = Workflow(workflow_data)
222223
return wf

python/ray/workflow/workflow_access.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import time
23
from typing import Any, Dict, List, Tuple, Optional, TYPE_CHECKING
34

45
from dataclasses import dataclass
@@ -169,6 +170,8 @@ def run_or_resume(self, workflow_id: str, ignore_existing: bool = False
169170
raise RuntimeError(f"The output of workflow[id={workflow_id}] "
170171
"already exists.")
171172
wf_store = workflow_storage.WorkflowStorage(workflow_id, self._store)
173+
workflow_prerun_metadata = {"start_time": time.time()}
174+
wf_store.save_workflow_prerun_metadata(workflow_prerun_metadata)
172175
step_id = wf_store.get_entrypoint_step_id()
173176
try:
174177
current_output = self._workflow_outputs[workflow_id].output
@@ -229,6 +232,8 @@ def update_step_status(self, workflow_id: str, step_id: str,
229232
wf_store.save_workflow_meta(
230233
common.WorkflowMetaData(common.WorkflowStatus.SUCCESSFUL))
231234
self._step_status.pop(workflow_id)
235+
workflow_postrun_metadata = {"end_time": time.time()}
236+
wf_store.save_workflow_postrun_metadata(workflow_postrun_metadata)
232237

233238
def cancel_workflow(self, workflow_id: str) -> None:
234239
self._step_status.pop(workflow_id)

0 commit comments

Comments
 (0)