Skip to content

Commit 899b9d8

Browse files
authored
feat(workflow): support workflow failure (#57)
<!-- Describe what has changed in this PR --> **What changed?** * added WorkflowFailure that wraps the underlying exception * handled WorkflowFailure and respond with the workflow failure event * reason is the class name of the unwrapped exception, i.e. the cause; details are just a structured string representation of the error message with the stack trace. * added cadence-web in integration test for debugging <!-- Tell your future self why you have made these changes --> **Why?** add workflow failure handling. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Integration test <img width="1422" height="601" alt="Screenshot 2025-12-02 at 1 51 34 PM" src="https://github.com/user-attachments/assets/5adb705e-b063-4b8d-b4a3-068ce91aa3df" /> <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Shijie Sheng <[email protected]>
1 parent be9d01b commit 899b9d8

File tree

5 files changed

+103
-13
lines changed

5 files changed

+103
-13
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
import logging
22
from dataclasses import dataclass
3+
import traceback
34
from typing import List
45

56
from cadence._internal.workflow.context import Context
67
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
78
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
89
from cadence._internal.workflow.workflow_intance import WorkflowInstance
9-
from cadence.api.v1.common_pb2 import Payload
10+
from cadence.api.v1.common_pb2 import Failure, Payload
1011
from cadence.api.v1.decision_pb2 import (
1112
CompleteWorkflowExecutionDecisionAttributes,
1213
Decision,
14+
FailWorkflowExecutionDecisionAttributes,
1315
)
1416
from cadence.api.v1.history_pb2 import (
1517
HistoryEvent,
1618
WorkflowExecutionStartedEventAttributes,
1719
)
1820
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
21+
from cadence.error import WorkflowFailure
1922
from cadence.workflow import WorkflowDefinition, WorkflowInfo
2023

2124
logger = logging.getLogger(__name__)
@@ -75,23 +78,29 @@ def process_decision(
7578
decisions = self._decision_manager.collect_pending_decisions()
7679

7780
# complete workflow if it is done
78-
try:
79-
if self._workflow_instance.is_done():
81+
if self._workflow_instance.is_done():
82+
try:
8083
result = self._workflow_instance.get_result()
84+
except WorkflowFailure as e:
85+
decisions.append(
86+
Decision(
87+
fail_workflow_execution_decision_attributes=FailWorkflowExecutionDecisionAttributes(
88+
failure=_failure_from_workflow_failure(e)
89+
)
90+
)
91+
)
92+
# TODO: handle cancellation error
93+
except Exception:
94+
raise
95+
else:
8196
decisions.append(
8297
Decision(
8398
complete_workflow_execution_decision_attributes=CompleteWorkflowExecutionDecisionAttributes(
8499
result=result
85100
)
86101
)
87102
)
88-
return DecisionResult(decisions=decisions)
89-
90-
except Exception:
91-
# TODO: handle CancellationError
92-
# TODO: handle WorkflowError
93-
# TODO: handle unknown error, fail decision task and try again instead of breaking the engine
94-
raise
103+
return DecisionResult(decisions=decisions)
95104

96105
except Exception as e:
97106
# Log decision task failure with full context (matches Java ReplayDecisionTaskHandler)
@@ -221,3 +230,16 @@ def _extract_workflow_input(
221230
return started_attrs.input
222231

223232
raise ValueError("No WorkflowExecutionStarted event found in history")
233+
234+
235+
def _failure_from_workflow_failure(e: WorkflowFailure) -> Failure:
    """Convert a ``WorkflowFailure`` into a ``Failure`` message.

    ``reason`` is the class name of the underlying exception and ``details``
    is a UTF-8 encoded string holding its message and formatted stack trace.

    Args:
        e: The workflow failure to report.

    Returns:
        A ``Failure`` describing the exception behind ``e``.
    """
    # The underlying exception is expected on ``__cause__`` (``raise ... from``).
    # Fall back to the failure itself when it was raised without a cause, so
    # the reported reason is never the meaningless "NoneType".
    cause = e.__cause__ if e.__cause__ is not None else e

    # The explicit (type, value, traceback) form is accepted by every
    # Python 3 version and yields the same text as the one-argument form.
    stacktrace = "".join(
        traceback.format_exception(type(cause), cause, cause.__traceback__)
    )

    details = f"message: {str(cause)}\nstacktrace: {stacktrace}"

    return Failure(
        reason=type(cause).__name__,
        details=details.encode("utf-8"),
    )

cadence/_internal/workflow/workflow_intance.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
from asyncio import Task
1+
from asyncio import CancelledError, InvalidStateError, Task
22
from typing import Any, Optional
33
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
44
from cadence.api.v1.common_pb2 import Payload
55
from cadence.data_converter import DataConverter
6+
from cadence.error import WorkflowFailure
67
from cadence.workflow import WorkflowDefinition
78

89

@@ -33,6 +34,11 @@ def is_done(self) -> bool:
3334
def get_result(self) -> Payload:
    """Return the workflow task's result converted by the data converter.

    Raises:
        RuntimeError: If ``self._task`` is ``None``.
        CancelledError: Propagated unchanged from ``self._task.result()``.
        InvalidStateError: Propagated unchanged from ``self._task.result()``.
        WorkflowFailure: If ``self._task.result()`` raises any other
            ``Exception``; the original exception is chained as ``__cause__``.
    """
    if self._task is None:
        raise RuntimeError("Workflow is not started yet")
    try:
        result = self._task.result()
    except (CancelledError, InvalidStateError):
        # Not treated as a workflow failure: let the caller see these as-is.
        # A bare ``raise`` re-raises without touching the exception.
        raise
    except Exception as e:
        # Wrap while keeping the original exception reachable via __cause__.
        raise WorkflowFailure(f"Workflow failed: {e}") from e
    # TODO: handle result with multiple outputs
    return self._data_converter.to_data([result])

cadence/error.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ def __init__(self, message: str) -> None:
66
super().__init__(message)
77

88

9+
class WorkflowFailure(Exception):
    """Exception signalling a workflow failure, built from a single message."""

    def __init__(self, message: str) -> None:
        Exception.__init__(self, message)
12+
13+
914
class CadenceRpcError(Exception):
1015
def __init__(self, message: str, code: grpc.StatusCode, *args):
1116
super().__init__(message, code, *args)

tests/integration_tests/docker-compose.yml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,19 @@ services:
4141
services-network:
4242
aliases:
4343
- cadence
44+
cadence-web:
45+
image: ubercadence/web:latest
46+
environment:
47+
- "CADENCE_GRPC_PEERS=cadence:7833"
48+
ports:
49+
- "8088:8088"
50+
depends_on:
51+
- cadence
52+
networks:
53+
services-network:
54+
aliases:
55+
- cadence-web
4456
networks:
4557
services-network:
4658
name: services-network
47-
driver: bridge
59+
driver: bridge

tests/integration_tests/workflow/test_workflows.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ async def echo(self, message: str) -> str:
2323
return message
2424

2525

26+
class MockedFailure(Exception):
    """Exception raised on purpose by the failing test workflow."""
28+
29+
30+
@reg.workflow()
class FailureWorkflow:
    """Workflow whose run method always raises ``MockedFailure``."""

    @workflow.run
    async def failure(self, message: str) -> str:
        # Fails unconditionally; ``message`` is never read.
        raise MockedFailure("mocked workflow failure")
35+
36+
2637
async def test_simple_workflow(helper: CadenceHelper):
2738
async with helper.worker(reg) as worker:
2839
execution = await worker.client.start_workflow(
@@ -50,6 +61,40 @@ async def test_simple_workflow(helper: CadenceHelper):
5061
)
5162

5263

64+
async def test_workflow_failure(helper: CadenceHelper):
    """Check that a failing workflow closes with the expected failure.

    Starts ``FailureWorkflow``, fetches the close event from its history and
    checks the recorded failure's ``reason`` and ``details``.
    """
    async with helper.worker(reg) as worker:
        execution = await worker.client.start_workflow(
            "FailureWorkflow",
            "hello world",
            task_list=worker.task_list,
            execution_start_to_close_timeout=timedelta(seconds=10),
        )

        # Ask only for the close event of this execution.
        history_request = GetWorkflowExecutionHistoryRequest(
            domain=DOMAIN_NAME,
            workflow_execution=execution,
            wait_for_new_event=True,
            history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
            skip_archival=True,
        )
        response: GetWorkflowExecutionHistoryResponse = (
            await worker.client.workflow_stub.GetWorkflowExecutionHistory(
                history_request
            )
        )

        # The last returned event is expected to carry the failure attributes.
        close_event = response.history.events[-1]
        failure = close_event.workflow_execution_failed_event_attributes.failure

        # The reason is the class name of the exception raised by the workflow.
        assert "MockedFailure" == failure.reason

        # The details contain the stack trace, including the raising line.
        assert (
            """raise MockedFailure("mocked workflow failure")"""
            in failure.details.decode()
        )
96+
97+
5398
@pytest.mark.skip(reason="Incorrect WorkflowType")
5499
async def test_workflow_fn(helper: CadenceHelper):
55100
async with helper.worker(reg) as worker:

0 commit comments

Comments
 (0)