1 change: 1 addition & 0 deletions .github/workflows/docs.yml
@@ -9,6 +9,7 @@ on:
permissions:
  contents: write

+
jobs:
  deploy:
    runs-on: ubuntu-latest
32 changes: 15 additions & 17 deletions README.md
@@ -52,23 +52,21 @@ from concurrent.futures import ThreadPoolExecutor
async def main():
    # Create backend and workflow
    backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-    flow = await WorkflowEngine.create(backend=backend)
-
-    @flow.executable_task
-    async def task1():
-        return "/bin/echo 5"
-
-    @flow.function_task
-    async def task2(t1_result):
-        return int(t1_result.strip()) * 2 * 2
-
-    # create the workflow
-    t1_fut = task1()
-    t2_result = await task2(t1_fut)  # t2 depends on t1 (waits for it)
-
-    print(t2_result)
-    # shutdown the execution backend
-    await flow.shutdown()
+    async with WorkflowEngine(backend=backend) as flow:


medium

The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method to ensure all its async components are started correctly. Using WorkflowEngine(...) directly will result in a non-functional engine because it bypasses necessary initialization steps.

Suggested change
-    async with WorkflowEngine(backend=backend) as flow:
+    async with await WorkflowEngine.create(backend=backend) as flow:
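To make the reviewer's point concrete, here is a minimal, hypothetical sketch of the pattern the suggestion relies on: an async `create()` factory that finishes initialization before the engine is used, plus `__aenter__`/`__aexit__` so `async with` can drive setup and teardown. Nothing below is taken from the library's actual implementation:

```python
import asyncio

class Engine:
    """Hypothetical stand-in for WorkflowEngine, for illustration only."""

    def __init__(self, backend):
        # __init__ cannot await, so async initialization must live elsewhere
        self.backend = backend
        self._started = False

    @classmethod
    async def create(cls, backend):
        # Async factory: hand back a fully initialized instance
        self = cls(backend)
        await asyncio.sleep(0)  # placeholder for starting async components
        self._started = True
        return self

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.shutdown()

    async def shutdown(self):
        self._started = False

async def main():
    # The factory must be awaited first; `async with Engine(backend=None)`
    # would enter an engine whose async components never started.
    async with await Engine.create(backend=None) as engine:
        assert engine._started

asyncio.run(main())
```

Under a design like this, `async with WorkflowEngine(backend=backend)` enters cleanly but silently skips everything `create()` does, which is why the review describes the result as non-functional rather than an immediate error.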


+        @flow.executable_task
+        async def task1():
+            return "/bin/echo 5"
+
+        @flow.function_task
+        async def task2(t1_result):
+            return int(t1_result.strip()) * 2 * 2
+
+        # create the workflow
+        t1_fut = task1()
+        t2_result = await task2(t1_fut)  # t2 depends on t1 (waits for it)
+
+        print(t2_result)

if __name__ == "__main__":
    asyncio.run(main())
41 changes: 19 additions & 22 deletions docs/async_workflows.md
@@ -36,35 +36,32 @@ from concurrent.futures import ThreadPoolExecutor
from radical.asyncflow import WorkflowEngine

backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-flow = await WorkflowEngine.create(backend=backend)

async def main():
-    @flow.function_task
-    async def task1(*args):
-        return time.time()
+    async with WorkflowEngine(backend=backend) as flow:


medium

The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method to ensure all its async components are started correctly. Using WorkflowEngine(...) directly will result in a non-functional engine.

Suggested change
-    async with WorkflowEngine(backend=backend) as flow:
+    async with await WorkflowEngine.create(backend=backend) as flow:

+        @flow.function_task
+        async def task1(*args):
+            return time.time()

-    @flow.function_task
-    async def task2(*args):
-        return time.time()
+        @flow.function_task
+        async def task2(*args):
+            return time.time()

-    @flow.function_task
-    async def task3(*args):
-        return time.time()
+        @flow.function_task
+        async def task3(*args):
+            return time.time()

-    async def run_wf(wf_id):
-        print(f'Starting workflow {wf_id} at {time.time()}')
-        t3 = task3(task1(), task2())
-        await t3  # Blocking operation so the entire workflow will block
-        print(f'Workflow {wf_id} completed at {time.time()}')
+        async def run_wf(wf_id):
+            print(f'Starting workflow {wf_id} at {time.time()}')
+            t3 = task3(task1(), task2())
+            await t3  # Blocking operation so the entire workflow will block
+            print(f'Workflow {wf_id} completed at {time.time()}')

-    start_time = time.time()
-    await asyncio.gather(*[run_wf(i) for i in range(5)])
-    end_time = time.time()
+        start_time = time.time()
+        await asyncio.gather(*[run_wf(i) for i in range(5)])
+        end_time = time.time()

-    print(f'\nTotal time running asynchronously is: {end_time - start_time}')
-
-    # We are in an async context, so we have to use await
-    await flow.shutdown()
+        print(f'\nTotal time running asynchronously is: {end_time - start_time}')

asyncio.run(main())
```
5 changes: 1 addition & 4 deletions docs/basic.md
@@ -35,7 +35,7 @@ We initialize the workflow engine with a `ConcurrentExecutionBackend` using Pyth

```python
backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-flow = await WorkflowEngine.create(backend=backend)
+async with WorkflowEngine.create(backend=backend) as flow:


medium

The WorkflowEngine.create method is a coroutine and must be awaited. The await keyword is missing here, which will cause a TypeError at runtime.

Suggested change
-async with WorkflowEngine.create(backend=backend) as flow:
+async with await WorkflowEngine.create(backend=backend) as flow:

```
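A small, self-contained reproduction of the failure mode described in the comment above, independent of the library (on Pythons before 3.11 the error surfaces as `AttributeError: __aenter__` rather than `TypeError`):

```python
import asyncio

class Thing:
    @classmethod
    async def create(cls):
        return cls()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False

async def main():
    try:
        # Missing await: Thing.create() is a coroutine object, which does not
        # implement the async context manager protocol.
        async with Thing.create():
            pass
    except (TypeError, AttributeError) as exc:
        print(f'fails as predicted: {exc!r}')

    # Awaiting the factory first yields the instance, so this works.
    async with await Thing.create():
        print('works')

asyncio.run(main())
```

(Python will additionally warn that the stray, never-awaited coroutine leaked.)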

---
@@ -90,9 +90,6 @@ async def main():
    end_time = time.time()
    print(f"\nWorkflow completed in: {end_time - start_time:.2f} seconds")

-    # Shutdown the workflow engine
-    await flow.shutdown()
-
asyncio.run(main())
```

20 changes: 0 additions & 20 deletions docs/best_practice.md
@@ -91,20 +91,6 @@ result = await task_c(task_a(), task_b())

---

-## Use `await flow.shutdown()`
-
-Always shut down the flow explicitly when finished:
-- Releases resources (e.g., thread pools, processes).
-- Ensures a clean exit.
-
-At the end of your async main:
-
-```python
-await flow.shutdown()
-```
-
----
-
## Logging & Debugging

Enable detailed logs to diagnose issues:
@@ -116,12 +102,6 @@ Logs show task dependencies, execution order, errors.
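The project's own logging example sits in the collapsed lines above; as a hedged sketch, if the package logs through Python's standard `logging` module, verbose output could be enabled roughly like this (the `radical.asyncflow` logger name is an assumption, not confirmed from this diff):

```python
import logging

# Assumption: the library uses stdlib logging under a "radical.asyncflow"
# logger; adjust the name if the package documents a different mechanism.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
)
logging.getLogger('radical.asyncflow').setLevel(logging.DEBUG)
```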

---

-## Clean Shutdown
-
-- Use `try`/`finally` in your async main to ensure `flow.shutdown()` is always called, even on exceptions.
-
----
-
!!! success

    - Define tasks clearly and concisely.
6 changes: 1 addition & 5 deletions docs/composite_workflow.md
@@ -39,7 +39,7 @@ from radical.asyncflow import WorkflowEngine
from concurrent.futures import ThreadPoolExecutor

backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-asyncflow = await WorkflowEngine.create(backend=backend)
+async with WorkflowEngine.create(backend=backend) as flow:


medium

The WorkflowEngine.create method is a coroutine and must be awaited. The await keyword is missing, which will lead to a TypeError.

Suggested change
-async with WorkflowEngine.create(backend=backend) as flow:
+async with await WorkflowEngine.create(backend=backend) as flow:

```

### Define Tasks
@@ -116,7 +116,6 @@ await asyncio.gather( # (1)!
end_time = time.time()
print(f"\nTotal time running asynchronously is: {end_time - start_time:.2f}s")

-await asyncflow.shutdown() # (2)!
```

1. Run all composite workflow blocks concurrently
@@ -311,8 +310,5 @@ await block3
```


-!!! warning
-    Do not forget to `await asyncflow.shutdown()` when you are done — otherwise, resources may remain allocated.
-
!!! tip
    You can replace `ConcurrentExecutionBackend` with `RadicalExecutionBackend` if you want to run on an HPC cluster instead of local threads/processes.
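A sketch of that swap under stated assumptions: it reuses the `local.localhost` resource configuration shown elsewhere in this PR, the `use_hpc` flag and `hello` task are illustrative, and the `RadicalExecutionBackend` import path is assumed to match the other backends:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from radical.asyncflow import (ConcurrentExecutionBackend,
                               RadicalExecutionBackend, WorkflowEngine)

async def main(use_hpc: bool = False):
    if use_hpc:
        # HPC execution; 'local.localhost' targets the local machine for testing
        backend = await RadicalExecutionBackend({'resource': 'local.localhost'})
    else:
        # Local threads
        backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())

    # Factory form recommended by the review comments in this PR
    async with await WorkflowEngine.create(backend=backend) as flow:

        @flow.executable_task
        async def hello():
            return '/bin/echo hello'

        print(await hello())

asyncio.run(main())
```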
5 changes: 1 addition & 4 deletions docs/exec_backends.md
@@ -51,7 +51,7 @@ from radical.asyncflow import WorkflowEngine

# HPC backend configuration
backend = RadicalExecutionBackend({'resource': 'local.localhost'}) # (1)!
-flow = WorkflowEngine(backend=backend)
+async with WorkflowEngine(backend=backend) as flow:


medium

The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method. The previous code was also incorrect. Using WorkflowEngine(...) directly will result in a non-functional engine as it skips essential async initialization.

Suggested change
-async with WorkflowEngine(backend=backend) as flow:
+async with await WorkflowEngine.create(backend=backend) as flow:

```

1. Configure for HPC execution - can target supercomputers, GPU clusters, local resources
@@ -118,8 +118,6 @@ await asyncio.gather(*[run_wf(i) for i in range(5)]) # (1)!
end_time = time.time()
print(f'\nTotal time running asynchronously is: {end_time - start_time}')

-# Proper cleanup of HPC resources
-await flow.shutdown()
```

1. All workflows execute concurrently across available HPC nodes
@@ -181,7 +179,5 @@ backend = RadicalExecutionBackend({
```

!!! warning
-    **Resource Management**: Always call `await flow.shutdown()` to properly release HPC resources and prevent job queue issues.

## Real-World HPC Use Cases

4 changes: 1 addition & 3 deletions docs/index.md
@@ -30,7 +30,7 @@ from radical.asyncflow import ConcurrentExecutionBackend
async def run():
    # Create backend and workflow
    backend = await ConcurrentExecutionBackend(ThreadPoolExecutor(max_workers=3))
-    flow = await WorkflowEngine.create(backend=backend)
+    async with WorkflowEngine(backend=backend) as flow:


medium

The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method to ensure all its async components are started correctly. Using WorkflowEngine(...) directly will result in a non-functional engine.

Suggested change
-    async with WorkflowEngine(backend=backend) as flow:
+    async with await WorkflowEngine.create(backend=backend) as flow:


    @flow.executable_task
    async def task1():
@@ -44,8 +44,6 @@ async def run():
    t1_fut = task1()
    t2_result = await task2(t1_fut)  # t2 depends on t1 (waits for it)

-    # shutdown the execution backend
-    await flow.shutdown()

if __name__ == "__main__":
    asyncio.run(run())
92 changes: 45 additions & 47 deletions examples/async_block_of_blocks.py
@@ -8,53 +8,51 @@
async def main():

    backend = await RadicalExecutionBackend({'resource': 'local.localhost'})
-    flow = await WorkflowEngine.create(backend=backend)
-
-    @flow.executable_task
-    async def task1(*args):
-        return '/bin/echo "I got executed at" && /bin/date'
-
-    @flow.executable_task
-    async def task2(*args):
-        return '/bin/echo "I got executed at" && /bin/date'
-
-    @flow.block
-    async def block1(*args):
-        print(f'block1 started at {time.time()}')
-        t1 = task1()
-        t2 = task2(t1)
-        await t2
-
-    @flow.block
-    async def block2(*args):
-        print(f'block2 started at {time.time()}')
-        t3 = task1()
-        t4 = task2(t3)
-        await t4
-
-    @flow.block
-    async def block1_of_blocks(*args):
-        print(f'block of blocks-1 started at {time.time()}')
-        b1 = block1()
-        b2 = block2(b1)
-        await b2
-
-    @flow.block
-    async def block2_of_blocks(*args):
-        print(f'block of blocks-2 started at {time.time()}')
-        b1 = block1()
-        b2 = block2(b1)
-        await b2
-
-    async def run_block_of_blocks(i):
-        bob1 = block1_of_blocks()
-        bob2 = block2_of_blocks(bob1)
-        await bob2
-        print(f'Block of blocks-{i} is finished')
-
-    await asyncio.gather(*[run_block_of_blocks(i) for i in range(2)])
-
-    await flow.shutdown()
+    async with await WorkflowEngine.create(backend=backend) as flow:
+
+        @flow.executable_task
+        async def task1(*args):
+            return '/bin/echo "I got executed at" && /bin/date'
+
+        @flow.executable_task
+        async def task2(*args):
+            return '/bin/echo "I got executed at" && /bin/date'
+
+        @flow.block
+        async def block1(*args):
+            print(f'block1 started at {time.time()}')
+            t1 = task1()
+            t2 = task2(t1)
+            await t2
+
+        @flow.block
+        async def block2(*args):
+            print(f'block2 started at {time.time()}')
+            t3 = task1()
+            t4 = task2(t3)
+            await t4
+
+        @flow.block
+        async def block1_of_blocks(*args):
+            print(f'block of blocks-1 started at {time.time()}')
+            b1 = block1()
+            b2 = block2(b1)
+            await b2
+
+        @flow.block
+        async def block2_of_blocks(*args):
+            print(f'block of blocks-2 started at {time.time()}')
+            b1 = block1()
+            b2 = block2(b1)
+            await b2
+
+        async def run_block_of_blocks(i):
+            bob1 = block1_of_blocks()
+            bob2 = block2_of_blocks(bob1)
+            await bob2
+            print(f'Block of blocks-{i} is finished')
+
+        await asyncio.gather(*[run_block_of_blocks(i) for i in range(2)])


if __name__ == '__main__':