Skip to content

Commit 469c2e4

Browse files
committed
fixes, lint, remove grpc changes
Signed-off-by: Filinto Duran <[email protected]>
1 parent e67276a commit 469c2e4

File tree

69 files changed

+7964
-76
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+7964
-76
lines changed

dapr/conf/__init__.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ def __init__(self):
2424
default_value = getattr(global_settings, setting)
2525
env_variable = os.environ.get(setting)
2626
if env_variable:
27-
val = (
28-
type(default_value)(env_variable) if default_value is not None else env_variable
29-
)
27+
val = self._coerce_env_value(default_value, env_variable)
3028
setattr(self, setting, val)
3129
else:
3230
setattr(self, setting, default_value)
@@ -36,5 +34,27 @@ def __getattr__(self, name):
3634
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
3735
return getattr(self, name)
3836

37+
@staticmethod
38+
def _coerce_env_value(default_value, env_variable: str):
39+
if default_value is None:
40+
return env_variable
41+
# Handle booleans explicitly to avoid bool('false') == True
42+
if isinstance(default_value, bool):
43+
s = env_variable.strip().lower()
44+
if s in ('1', 'true', 't', 'yes', 'y', 'on'):
45+
return True
46+
if s in ('0', 'false', 'f', 'no', 'n', 'off'):
47+
return False
48+
# Fallback: non-empty -> True for backward-compat
49+
return bool(s)
50+
# Integers
51+
if isinstance(default_value, int) and not isinstance(default_value, bool):
52+
return int(env_variable)
53+
# Floats
54+
if isinstance(default_value, float):
55+
return float(env_variable)
56+
# Other types: try to cast as before
57+
return type(default_value)(env_variable)
58+
3959

4060
settings = Settings()

dapr/conf/global_settings.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,23 @@
3434

3535
DAPR_HTTP_TIMEOUT_SECONDS = 60
3636

37+
# gRPC keepalive (disabled by default; enable via env to help with idle debugging sessions)
38+
DAPR_GRPC_KEEPALIVE_ENABLED: bool = False
39+
DAPR_GRPC_KEEPALIVE_TIME_MS: int = 120000 # send keepalive pings every 120s
40+
DAPR_GRPC_KEEPALIVE_TIMEOUT_MS: int = (
41+
20000 # wait 20s for ack before considering the connection dead
42+
)
43+
DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS: bool = False # allow pings when there are no active calls
44+
45+
# gRPC retries (disabled by default; enable via env to apply channel service config)
46+
DAPR_GRPC_RETRY_ENABLED: bool = False
47+
DAPR_GRPC_RETRY_MAX_ATTEMPTS: int = 4
48+
DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS: int = 100
49+
DAPR_GRPC_RETRY_MAX_BACKOFF_MS: int = 1000
50+
DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER: float = 2.0
51+
# Comma-separated list of status codes, e.g., 'UNAVAILABLE,DEADLINE_EXCEEDED'
52+
DAPR_GRPC_RETRY_CODES: str = 'UNAVAILABLE,DEADLINE_EXCEEDED'
53+
3754
# ----- Conversation API settings ------
3855

3956
# Configuration for handling large enums to avoid massive JSON schemas that can exceed LLM token limits

dev-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ mypy>=1.2.0
22
mypy-extensions>=0.4.3
33
mypy-protobuf>=2.9
44
tox>=4.3.0
5+
pip>=23.0.0
56
coverage>=5.3
7+
pytest
68
wheel
79
# used in unit test only
810
opentelemetry-sdk

examples/workflow-async/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Dapr Workflow Async Examples (Python)
2+
3+
These examples mirror `examples/workflow/` but author orchestrators with `async def` using the
4+
async workflow APIs. Activities remain regular functions unless noted.
5+
6+
How to run:
7+
- Ensure a Dapr sidecar is running locally. If needed, set `DURABLETASK_GRPC_ENDPOINT`, or
8+
`DURABLETASK_GRPC_HOST/PORT`.
9+
- Install requirements: `pip install -r requirements.txt`
10+
- Run any example: `python simple.py`
11+
12+
Notes:
13+
- Orchestrators use `await ctx.activity(...)`, `await ctx.sleep(...)`, `await ctx.when_all/when_any(...)`, etc.
14+
- No event loop is started manually; the Durable Task worker drives the async orchestrators.
15+
- You can also launch instances using `DaprWorkflowClient` as in the non-async examples.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2025 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
from dapr.ext.workflow import (
17+
AsyncWorkflowContext,
18+
DaprWorkflowClient,
19+
WorkflowRuntime,
20+
)
21+
22+
wfr = WorkflowRuntime()
23+
24+
25+
@wfr.async_workflow(name='child_async')
26+
async def child(ctx: AsyncWorkflowContext, n: int) -> int:
27+
return n * 2
28+
29+
30+
@wfr.async_workflow(name='parent_async')
31+
async def parent(ctx: AsyncWorkflowContext, n: int) -> int:
32+
r = await ctx.call_child_workflow(child, input=n)
33+
print(f'Child workflow returned {r}')
34+
return r + 1
35+
36+
37+
def main():
38+
wfr.start()
39+
client = DaprWorkflowClient()
40+
instance_id = 'parent_async_instance'
41+
client.schedule_new_workflow(workflow=parent, input=5, instance_id=instance_id)
42+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
43+
wfr.shutdown()
44+
45+
46+
if __name__ == '__main__':
47+
main()
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Copyright 2025 The Dapr Authors
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the specific language governing permissions and
12+
limitations under the License.
13+
"""
14+
15+
from dapr.ext.workflow import (
16+
AsyncWorkflowContext,
17+
DaprWorkflowClient,
18+
WorkflowActivityContext,
19+
WorkflowRuntime,
20+
)
21+
22+
wfr = WorkflowRuntime()
23+
24+
25+
@wfr.activity(name='square')
26+
def square(ctx: WorkflowActivityContext, x: int) -> int:
27+
return x * x
28+
29+
30+
@wfr.async_workflow(name='fan_out_fan_in_async')
31+
async def orchestrator(ctx: AsyncWorkflowContext):
32+
tasks = [ctx.call_activity(square, input=i) for i in range(1, 6)]
33+
results = await ctx.when_all(tasks)
34+
total = sum(results)
35+
return total
36+
37+
38+
def main():
39+
wfr.start()
40+
client = DaprWorkflowClient()
41+
instance_id = 'fofi_async'
42+
client.schedule_new_workflow(workflow=orchestrator, instance_id=instance_id)
43+
wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
44+
print(f'Workflow state: {wf_state}')
45+
wfr.shutdown()
46+
47+
48+
if __name__ == '__main__':
49+
main()
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2025 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
from dapr.ext.workflow import AsyncWorkflowContext, DaprWorkflowClient, WorkflowRuntime
17+
18+
wfr = WorkflowRuntime()
19+
20+
21+
@wfr.async_workflow(name='human_approval_async')
22+
async def orchestrator(ctx: AsyncWorkflowContext, request_id: str):
23+
decision = await ctx.when_any(
24+
[
25+
ctx.wait_for_external_event(f'approve:{request_id}'),
26+
ctx.wait_for_external_event(f'reject:{request_id}'),
27+
ctx.create_timer(300.0),
28+
]
29+
)
30+
if isinstance(decision, dict) and decision.get('approved'):
31+
return 'APPROVED'
32+
if isinstance(decision, dict) and decision.get('rejected'):
33+
return 'REJECTED'
34+
return 'TIMEOUT'
35+
36+
37+
def main():
38+
wfr.start()
39+
client = DaprWorkflowClient()
40+
instance_id = 'human_approval_async_1'
41+
client.schedule_new_workflow(workflow=orchestrator, input='REQ-1', instance_id=instance_id)
42+
# In a real scenario, raise approve/reject event from another service.
43+
wfr.shutdown()
44+
45+
46+
if __name__ == '__main__':
47+
main()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
dapr-ext-workflow-dev>=1.15.0.dev
2+
dapr-dev>=1.15.0.dev

examples/workflow-async/simple.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Copyright 2025 The Dapr Authors
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the specific language governing permissions and
12+
limitations under the License.
13+
"""
14+
15+
from datetime import timedelta
16+
from time import sleep
17+
18+
from dapr.ext.workflow import (
19+
AsyncWorkflowContext,
20+
DaprWorkflowClient,
21+
RetryPolicy,
22+
WorkflowActivityContext,
23+
WorkflowRuntime,
24+
)
25+
26+
counter = 0
27+
retry_count = 0
28+
child_orchestrator_string = ''
29+
instance_id = 'asyncExampleInstanceID'
30+
child_instance_id = 'asyncChildInstanceID'
31+
workflow_name = 'async_hello_world_wf'
32+
child_workflow_name = 'async_child_wf'
33+
input_data = 'Hi Async Counter!'
34+
event_name = 'event1'
35+
event_data = 'eventData'
36+
37+
retry_policy = RetryPolicy(
38+
first_retry_interval=timedelta(seconds=1),
39+
max_number_of_attempts=3,
40+
backoff_coefficient=2,
41+
max_retry_interval=timedelta(seconds=10),
42+
retry_timeout=timedelta(seconds=100),
43+
)
44+
45+
wfr = WorkflowRuntime()
46+
47+
48+
@wfr.async_workflow(name=workflow_name)
49+
async def hello_world_wf(ctx: AsyncWorkflowContext, wf_input):
50+
# activities
51+
result_1 = await ctx.call_activity(hello_act, input=1)
52+
print(f'Activity 1 returned {result_1}')
53+
result_2 = await ctx.call_activity(hello_act, input=10)
54+
print(f'Activity 2 returned {result_2}')
55+
result_3 = await ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
56+
print(f'Activity 3 returned {result_3}')
57+
result_4 = await ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
58+
print(f'Child workflow returned {result_4}')
59+
60+
# Event vs timeout using when_any
61+
first = await ctx.when_any(
62+
[
63+
ctx.wait_for_external_event(event_name),
64+
ctx.create_timer(timedelta(seconds=30)),
65+
]
66+
)
67+
68+
# Proceed only if event won
69+
if isinstance(first, dict) and 'event' in first:
70+
await ctx.call_activity(hello_act, input=100)
71+
await ctx.call_activity(hello_act, input=1000)
72+
return 'Completed'
73+
return 'Timeout'
74+
75+
76+
@wfr.activity(name='async_hello_act')
77+
def hello_act(ctx: WorkflowActivityContext, wf_input):
78+
global counter
79+
counter += wf_input
80+
return f'Activity returned {wf_input}'
81+
82+
83+
@wfr.activity(name='async_hello_retryable_act')
84+
def hello_retryable_act(ctx: WorkflowActivityContext):
85+
global retry_count
86+
if (retry_count % 2) == 0:
87+
retry_count += 1
88+
raise ValueError('Retryable Error')
89+
retry_count += 1
90+
return f'Activity returned {retry_count}'
91+
92+
93+
@wfr.async_workflow(name=child_workflow_name)
94+
async def child_retryable_wf(ctx: AsyncWorkflowContext):
95+
# Call activity with retry and simulate retryable workflow failure until certain state
96+
child_activity_result = await ctx.call_activity(
97+
act_for_child_wf, input='x', retry_policy=retry_policy
98+
)
99+
print(f'Child activity returned {child_activity_result}')
100+
# In a real sample, you might check state and raise to trigger retry
101+
return 'ok'
102+
103+
104+
@wfr.activity(name='async_act_for_child_wf')
105+
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
106+
global child_orchestrator_string
107+
child_orchestrator_string += inp
108+
109+
110+
def main():
111+
wfr.start()
112+
wf_client = DaprWorkflowClient()
113+
114+
wf_client.schedule_new_workflow(
115+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
116+
)
117+
118+
wf_client.wait_for_workflow_start(instance_id)
119+
120+
# Let initial activities run
121+
sleep(5)
122+
123+
# Raise event to continue
124+
wf_client.raise_workflow_event(
125+
instance_id=instance_id, event_name=event_name, data={'ok': True}
126+
)
127+
128+
# Wait for completion
129+
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
130+
print(f'Workflow status: {state.runtime_status.name}')
131+
132+
wfr.shutdown()
133+
134+
135+
if __name__ == '__main__':
136+
main()

0 commit comments

Comments
 (0)