2 changes: 2 additions & 0 deletions dev-requirements.txt
@@ -2,7 +2,9 @@ mypy>=1.2.0
mypy-extensions>=0.4.3
mypy-protobuf>=2.9
tox>=4.3.0
pip>=23.0.0
coverage>=5.3
pytest
wheel
# used in unit test only
opentelemetry-sdk
86 changes: 86 additions & 0 deletions examples/workflow-async/README.md
@@ -0,0 +1,86 @@
# Dapr Workflow Async Examples (Python)

These examples mirror `examples/workflow/` but author orchestrators with `async def` using the
async workflow APIs. Activities can be either sync or async functions.

## Prerequisites

- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
- [Install Python 3.10+](https://www.python.org/downloads/)


## How to run

- Install Dapr CLI: `brew install dapr/tap/dapr-cli` or `choco install dapr-cli`
- Initialize Dapr: `dapr init`
- Install requirements:
```bash
cd examples/workflow-async
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

Or, using the faster `uv`:
```bash
uv venv .venv
source .venv/bin/activate
uv pip install -r requirements.txt
```
- Run any example with dapr:
  - `dapr run --app-id wf_async_simple -- .venv/bin/python simple.py`
  - `dapr run --app-id wf_task_chain -- .venv/bin/python task_chaining.py`
  - `dapr run --app-id wf_async_child -- .venv/bin/python child_workflow.py`
  - `dapr run --app-id wf_async_fafi -- .venv/bin/python fan_out_fan_in.py`
  - `dapr run --app-id wf_async_gather -- .venv/bin/python fan_out_fan_in_with_gather.py`
  - `dapr run --app-id wf_async_approval -- .venv/bin/python human_approval.py`
  - `dapr run --app-id wf_ctx_interceptors -- .venv/bin/python context_interceptors_example.py`
  - `dapr run --app-id wf_async_http -- .venv/bin/python async_http_activity.py`

## Examples

- **simple.py**: Comprehensive example showing activities, child workflows, retry policies, and external events
- **task_chaining.py**: Sequential activity calls where each result feeds into the next
- **child_workflow.py**: Parent workflow calling a child workflow
- **fan_out_fan_in.py**: Parallel activity execution pattern
- **fan_out_fan_in_with_gather.py**: Parallel execution using `ctx.when_all()`
- **human_approval.py**: Workflow waiting for external event to proceed
- **context_interceptors_example.py**: Context propagation using interceptors (tenant, request ID, etc.)
- **async_http_activity.py**: Async activities performing I/O-bound operations (HTTP requests with aiohttp)
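
Conceptually, the fan-out/fan-in pattern used in `fan_out_fan_in_with_gather.py` resembles plain asyncio gathering. A minimal, Dapr-free sketch (hypothetical names; no durability or replay semantics, which the real workflow runtime provides):

```python
import asyncio


async def square(n: int) -> int:
    # Stand-in for an activity call; real activities run via the Dapr runtime.
    await asyncio.sleep(0)  # simulate I/O
    return n * n


async def fan_out_fan_in(inputs: list[int]) -> int:
    # Fan-out: schedule all tasks; fan-in: await them together.
    results = await asyncio.gather(*(square(n) for n in inputs))
    return sum(results)


print(asyncio.run(fan_out_fan_in([1, 2, 3])))  # → 14
```

In the real examples, `ctx.call_activity(...)` replaces the bare coroutine calls and `ctx.when_all(...)` replaces `asyncio.gather`, so the runtime can checkpoint and replay each step.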

Notes:
- Orchestrators use `await ctx.call_activity(...)`, `await ctx.create_timer(...)`, `await ctx.when_all/when_any(...)`, etc.
- No event loop is started manually; the Durable Task worker drives the async orchestrators.
- You can also launch instances using `DaprWorkflowClient` as in the non-async examples.
- The interceptors example demonstrates how to propagate context (tenant, request ID) across workflow and activity boundaries using the wrapper pattern to avoid contextvar loss.
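
The contextvar-loss problem that the interceptors example addresses can be illustrated without Dapr. A minimal sketch of the wrapper pattern (hypothetical helper names, plain stdlib): the wrapper captures the value when the work is scheduled and restores it inside the executing context:

```python
import asyncio
import contextvars

# Context value we want to propagate across a scheduling boundary
tenant_id = contextvars.ContextVar('tenant_id', default='unknown')


def capture_tenant(fn):
    """Wrap an async callable so it sees the tenant active at wrap time."""
    captured = tenant_id.get()  # capture when the work is scheduled

    async def wrapper(*args, **kwargs):
        token = tenant_id.set(captured)  # restore inside the executing context
        try:
            return await fn(*args, **kwargs)
        finally:
            tenant_id.reset(token)

    return wrapper


async def activity() -> str:
    return tenant_id.get()


async def main() -> str:
    tenant_id.set('acme')
    wrapped = capture_tenant(activity)
    tenant_id.set('unknown')  # simulate the value being lost before execution
    return await wrapped()


print(asyncio.run(main()))  # → acme
```

The real interceptors carry the captured values through workflow and activity invocations; the sketch only shows why capturing at scheduling time matters.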

## Async Activities

Activities can be either synchronous or asynchronous functions. Async activities are useful for I/O-bound operations like HTTP requests, database queries, or file operations:

```python
import aiohttp

from dapr.ext.workflow import WorkflowActivityContext, WorkflowRuntime

wfr = WorkflowRuntime()


# Synchronous activity
@wfr.activity
def sync_activity(ctx: WorkflowActivityContext, data: str) -> str:
    return data.upper()


# Asynchronous activity
@wfr.activity
async def async_activity(ctx: WorkflowActivityContext, data: str) -> str:
    # Perform async I/O operations
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/{data}") as response:
            result = await response.json()
            return result
```

Both sync and async activities are registered the same way using the `@wfr.activity` decorator. Orchestrators call them identically regardless of whether they're sync or async; the SDK handles the execution automatically.
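
One way a runtime can treat the two kinds uniformly (a sketch of the general technique, not the SDK's actual implementation) is to detect coroutine functions and await them, while pushing plain functions to a worker thread so they don't block the event loop:

```python
import asyncio
import inspect


async def run_activity(fn, *args):
    # Async activities are awaited directly; sync ones run on a thread.
    if inspect.iscoroutinefunction(fn):
        return await fn(*args)
    return await asyncio.to_thread(fn, *args)


def sync_upper(data: str) -> str:
    return data.upper()


async def async_upper(data: str) -> str:
    await asyncio.sleep(0)
    return data.upper()


async def demo() -> tuple:
    # Both call sites look the same to the caller.
    return await run_activity(sync_upper, 'hi'), await run_activity(async_upper, 'hi')


print(asyncio.run(demo()))  # → ('HI', 'HI')
```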

**When to use async activities:**
- HTTP requests or API calls
- Database queries
- File I/O operations
- Any I/O-bound work that benefits from async/await

See `async_http_activity.py` for a complete example.
153 changes: 153 additions & 0 deletions examples/workflow-async/async_http_activity.py
@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.ext.workflow import (  # noqa: E402
    AsyncWorkflowContext,
    DaprWorkflowClient,
    WorkflowActivityContext,
    WorkflowRuntime,
    WorkflowStatus,
)

"""Example demonstrating async activities with HTTP requests.

This example shows how to use async activities to perform I/O-bound operations
like HTTP requests without blocking the worker thread pool.
"""


wfr = WorkflowRuntime()


@wfr.activity(name='fetch_url')
async def fetch_url(ctx: WorkflowActivityContext, url: str) -> dict:
    """Async activity that fetches data from a URL.

    This demonstrates using aiohttp for non-blocking HTTP requests.
    In production, you would handle errors, timeouts, and retries.
    """
    try:
        import aiohttp
    except ImportError:
        # Fallback if aiohttp is not installed
        return {
            'url': url,
            'status': 'error',
            'message': 'aiohttp not installed. Install with: pip install aiohttp',
        }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                status = response.status
                if status == 200:
                    # For JSON responses
                    try:
                        data = await response.json()
                        return {'url': url, 'status': status, 'data': data}
                    except Exception:
                        # For text responses
                        text = await response.text()
                        return {
                            'url': url,
                            'status': status,
                            'length': len(text),
                            'preview': text[:100],
                        }
                else:
                    return {'url': url, 'status': status, 'error': 'HTTP error'}
    except Exception as e:
        return {'url': url, 'status': 'error', 'message': str(e)}


@wfr.activity(name='process_data')
def process_data(ctx: WorkflowActivityContext, data: dict) -> dict:
    """Sync activity that processes fetched data.

    This shows that sync and async activities can coexist in the same workflow.
    """
    return {
        'processed': True,
        'url_count': len([k for k in data if k.startswith('url_')]),
        'summary': f'Processed {len(data)} items',
    }


@wfr.async_workflow(name='fetch_multiple_urls_async')
async def fetch_multiple_urls(ctx: AsyncWorkflowContext, urls: list[str]) -> dict:
    """Orchestrator that fetches multiple URLs in parallel using async activities.

    This demonstrates:
    - Calling async activities from async workflows
    - Fan-out/fan-in pattern with async activities
    - Mixing async and sync activities
    """
    # Fan-out: Schedule all URL fetches in parallel
    fetch_tasks = [ctx.call_activity(fetch_url, input=url) for url in urls]

    # Fan-in: Wait for all to complete
    results = await ctx.when_all(fetch_tasks)

    # Create a dictionary of results
    data = {f'url_{i}': result for i, result in enumerate(results)}

    # Process the aggregated data with a sync activity
    summary = await ctx.call_activity(process_data, input=data)

    return {'results': data, 'summary': summary}


def main():
    """Run the example workflow."""
    # Example URLs to fetch (using httpbin.org for testing)
    test_urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/uuid',
        'https://httpbin.org/user-agent',
    ]

    wfr.start()
    client = DaprWorkflowClient()

    try:
        instance_id = 'async_http_activity_example'
        print(f'Starting workflow {instance_id}...')

        # Schedule the workflow
        client.schedule_new_workflow(
            workflow=fetch_multiple_urls, instance_id=instance_id, input=test_urls
        )

        # Wait for completion
        wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)

        print(f'\nWorkflow status: {wf_state.runtime_status}')

        if wf_state.runtime_status == WorkflowStatus.COMPLETED:
            print(f'Workflow output: {wf_state.serialized_output}')
            print('\n✓ Workflow completed successfully!')
        else:
            print('✗ Workflow did not complete successfully')
            return 1

    finally:
        wfr.shutdown()

    return 0


if __name__ == '__main__':
    import sys

    sys.exit(main())
56 changes: 56 additions & 0 deletions examples/workflow-async/child_workflow.py
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-

"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.ext.workflow import (
    AsyncWorkflowContext,
    DaprWorkflowClient,
    WorkflowRuntime,
    WorkflowStatus,
)

wfr = WorkflowRuntime()


@wfr.async_workflow(name='child_async')
async def child(ctx: AsyncWorkflowContext, n: int) -> int:
    return n * 2


@wfr.async_workflow(name='parent_async')
async def parent(ctx: AsyncWorkflowContext, n: int) -> int:
    r = await ctx.call_child_workflow(child, input=n)
    print(f'Child workflow returned {r}')
    return r + 1


def main():
    with wfr:
        # the context manager starts the workflow runtime on __enter__ and shuts it down on __exit__
        client = DaprWorkflowClient()
        instance_id = 'parent_async_instance'
        client.schedule_new_workflow(workflow=parent, input=5, instance_id=instance_id)
        wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)

        # simple test
        if wf_state.runtime_status != WorkflowStatus.COMPLETED:
            print('Workflow failed with status ', wf_state.runtime_status)
            exit(1)
        if wf_state.serialized_output != '11':
            print('Workflow result is incorrect!')
            exit(1)


if __name__ == '__main__':
    main()