-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/context manager #26 #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Check out this pull request on See visual diffs & provide feedback on Jupyter Notebooks. Powered by ReviewNB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @javidsegura, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant usability improvement by making the WorkflowEngine an asynchronous context manager. This change streamlines workflow setup and teardown, making the code cleaner and less prone to resource leaks. All existing examples, documentation, and tests have been updated to reflect this new, idiomatic Python usage, ensuring consistency and ease of adoption.
Highlights
- Feature: Async Context Manager: The
WorkflowEngineclass now implements the asynchronous context manager protocol (__aenter__and__aexit__), allowing it to be used withasync withstatements. This simplifies resource management by automatically handling the shutdown of the workflow engine and its backend upon exiting theasync withblock. - Simplified Resource Management: Explicit calls to
await flow.shutdown()are no longer necessary when using theWorkflowEnginewithin anasync withblock. The__aexit__method ensures proper cleanup, even in the presence of exceptions, leading to cleaner and more robust code. - Documentation and Examples Update: All relevant documentation files (
README.md,docs/*.md,examples/*.py,examples/tutorials/*.ipynb) have been updated to reflect the newasync withusage pattern and remove manual shutdown calls, providing up-to-date guidance for users. - Test Suite Refinement: The integration and unit tests have been refactored to utilize the
async withcontext manager forWorkflowEngineinstances, ensuring proper setup and teardown for test cases and removing redundantshutdowncalls, which improves test reliability and readability. - Version Update: A new version file
src/radical/asyncflow/VERSIONhas been added, setting the project version to0.1.0, indicating a new feature release.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a context manager for WorkflowEngine, which is a great enhancement for ensuring resources are properly managed. The implementation is on the right track, but there's a critical issue in the __aexit__ method where the shutdown coroutine is not awaited.
Additionally, there are several inconsistencies in how the new context manager is used across the documentation and tests. Many examples use an incorrect instantiation pattern (WorkflowEngine(...) instead of await WorkflowEngine.create(...)), which will cause runtime errors. I've provided suggestions to fix these issues and recommended a small enhancement to the WorkflowEngine.create method to better support testing scenarios.
| async def __aexit__(self, exc_type, exc_val, exc_tb): | ||
| self.shutdown(self.skip_execution_backend) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The shutdown method is a coroutine and must be awaited. The current implementation calls it without await, which means the shutdown process won't be executed correctly, potentially leading to resource leaks and tasks not being properly terminated.
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| self.shutdown(self.skip_execution_backend) | |
| await self.shutdown(self.skip_execution_backend) |
tests/unit/test_termination.py
Outdated
| async with WorkflowEngine(backend=backend) as flow: | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WorkflowEngine is being instantiated incorrectly. You should use the await WorkflowEngine.create(...) factory method inside the async with statement to ensure the engine is properly initialized.
| async with WorkflowEngine(backend=backend) as flow: | |
| pass | |
| async with await WorkflowEngine.create(backend=backend) as flow: | |
| pass |
| async with await WorkflowEngine.create(backend=backend) as flow: | ||
| yield flow | ||
| await asyncio.sleep(0) # allow any pending tasks to finish |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change alters the test's behavior. The original test explicitly called flow.shutdown(skip_execution_backend=True). The new context manager will call shutdown without skipping the backend shutdown, because skip_execution_backend is not passed to WorkflowEngine.create.
This is related to my other suggestion to add skip_execution_backend to the create method. Once that is done, you can update this to:
async with await WorkflowEngine.create(backend=backend, skip_execution_backend=True) as flow:
to restore the original test behavior.
| async with WorkflowEngine(backend=backend) as flow: | ||
| yield flow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The WorkflowEngine is being instantiated incorrectly in this fixture. It should use async with await WorkflowEngine.create(...) to ensure proper initialization and resource management. This same issue exists in test_real_workflow_scenario in this file.
| async with WorkflowEngine(backend=backend) as flow: | |
| yield flow | |
| async with await WorkflowEngine.create(backend=backend) as flow: | |
| yield flow |
docs/index.md
Outdated
| # Create backend and workflow | ||
| backend = await ConcurrentExecutionBackend(ThreadPoolExecutor(max_workers=3)) | ||
| flow = await WorkflowEngine.create(backend=backend) | ||
| async with WorkflowEngine(backend=backend) as flow: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method to ensure all its async components are started correctly. Using WorkflowEngine(...) directly will result in a non-functional engine.
| async with WorkflowEngine(backend=backend) as flow: | |
| async with await WorkflowEngine.create(backend=backend) as flow: |
docs/exec_backends.md
Outdated
| # HPC backend configuration | ||
| backend = RadicalExecutionBackend({'resource': 'local.localhost'}) # (1)! | ||
| flow = WorkflowEngine(backend=backend) | ||
| async with WorkflowEngine(backend=backend) as flow: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method. The previous code was also incorrect. Using WorkflowEngine(...) directly will result in a non-functional engine as it skips essential async initialization.
| async with WorkflowEngine(backend=backend) as flow: | |
| async with await WorkflowEngine.create(backend=backend) as flow: |
| print(t2_result) | ||
| # shutdown the execution backend | ||
| await flow.shutdown() | ||
| async with WorkflowEngine(backend=backend) as flow: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method to ensure all its async components are started correctly. Using WorkflowEngine(...) directly will result in a non-functional engine because it bypasses necessary initialization steps.
| async with WorkflowEngine(backend=backend) as flow: | |
| async with await WorkflowEngine.create(backend=backend) as flow: |
| ```python | ||
| backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) | ||
| flow = await WorkflowEngine.create(backend=backend) | ||
| async with WorkflowEngine.create(backend=backend) as flow: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docs/async_workflows.md
Outdated
| @flow.function_task | ||
| async def task1(*args): | ||
| return time.time() | ||
| async with WorkflowEngine(backend=backend) as flow: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The WorkflowEngine should be instantiated using the await WorkflowEngine.create(...) factory method to ensure all its async components are started correctly. Using WorkflowEngine(...) directly will result in a non-functional engine.
| async with WorkflowEngine(backend=backend) as flow: | |
| async with await WorkflowEngine.create(backend=backend) as flow: |
Solved issue #26 .
Integrated feedback for updating tests + docs-