-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(Low-Code Concurrent CDK): Refactor the low-code AsyncRetriever to use an underlying StreamSlicer #170
fix(Low-Code Concurrent CDK): Refactor the low-code AsyncRetriever to use an underlying StreamSlicer #170
Conversation
…llow a more standard low-code pattern
📝 Walkthrough📝 WalkthroughWalkthroughThe changes in this pull request introduce the Changes
Possibly related PRs
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (1)
51-56
: Exception handling infetch_records
method is appropriateRaising an
AirbyteTracedException
when the job orchestrator is not initialized is a good way to alert developers of improper usage. Have you considered adding guidance on how to properly initializestream_slices()
before callingfetch_records()
? Wdyt?unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3305-3396
: Consider expanding tests to cover behavior ofAsyncRetriever
While the current test verifies the instantiation and structure of the
AsyncRetriever
components, would it make sense to include tests that assert the actual behavior, such as making API calls or handling responses? This could help catch potential integration issues early. Wdyt?unit_tests/sources/declarative/async_job/test_integration.py (1)
85-94
: Consider making the job tracker limit configurable?Currently using
_NO_LIMIT
(10000) for JobTracker. Would it make sense to make this configurable through test parameters to allow testing different scenarios? wdyt?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
2152-2160
: Consider implementing parent stream bulk detection?The
has_bulk_parent
is currently hardcoded toFalse
. Should we implement the detection logic now to avoid technical debt? This could prevent potential issues if bulk parent streams are added later. wdyt?
2156-2157
: Consider adding validation for job limit configuration?The JobTracker is created with a hard limit of 1 job. When implementing the configurable limit mentioned in the FIXME comment, should we add validation to ensure the limit is positive and reasonable? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)airbyte_cdk/sources/declarative/partition_routers/__init__.py
(1 hunks)airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
(1 hunks)airbyte_cdk/sources/declarative/retrievers/async_retriever.py
(5 hunks)unit_tests/sources/declarative/async_job/test_integration.py
(2 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(7 hunks)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (1)
39-49
: Great use of dictionary merging in stream_slices
method
The merging of partition data with the completed partition information using the |
operator is a clean and efficient approach. This ensures all relevant data is included in the StreamSlice
. Nice work!
airbyte_cdk/sources/declarative/partition_routers/__init__.py (1)
5-19
: Addition of AsyncJobPartitionRouter
is correctly integrated
Including AsyncJobPartitionRouter
in the imports and the __all__
list ensures it is properly exposed for use. Good job on keeping the module's exports up to date!
airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1)
Line range hint 34-96
: Refactor of AsyncRetriever
enhances simplicity and clarity
Simplifying the AsyncRetriever
by utilizing AsyncJobPartitionRouter
directly in the stream_slicer
streamlines the code and reduces unnecessary complexity. This makes the codebase more maintainable. Nice work!
unit_tests/sources/declarative/async_job/test_integration.py (1)
23-25
: LGTM!
The import is correctly placed and follows Python import conventions.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
311-313
: LGTM!
The import is correctly placed and follows Python import conventions.
2167-2172
: LGTM!
The AsyncRetriever instantiation looks good with the new AsyncJobPartitionRouter integration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm fine with those changes but I'd like to see the tests to confirm
Yup definitely. I'm adding a couple more unit tests today and i tested this locally on Sendgrid, but I plan to regression test this against live traffic on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (3)
20-20
: Consider adding a docstring for the _NO_LIMIT constant?Adding a brief docstring explaining the purpose and significance of this value would help future maintainers understand its role in the tests. wdyt?
23-48
: Consider adding error scenario test cases?The happy path is well covered, but we might want to add tests for error scenarios. Some suggestions:
- Invalid configuration
- Failed job status
- Empty stream slices
Would you like me to help draft these additional test cases?
23-82
: Consider adding test documentation and parameterization?A few suggestions to improve test maintainability:
- Add docstrings describing the test scenarios and their purpose
- Consider using @pytest.mark.parametrize for testing different partition values and configurations
- Add comments explaining the expected behavior in each assertion block
This would make it easier for other developers to understand and maintain these tests. Thoughts?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2238-2239
: Consider improving the comment about concurrent job limitations.The FIXME comment could be more descriptive about why we're currently limiting to 1 concurrent job and what conditions need to be met to make it configurable. This would help future contributors understand the constraints and requirements.
What about updating it to something like this?
- JobTracker(1), - # FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1 + JobTracker(1), + # TODO: Make the number of concurrent jobs configurable per connector. + # Currently limited to 1 for safety, but some connectors like source-salesforce + # successfully use 5 concurrent jobs. This requires: + # 1. Connector-specific configuration + # 2. API-specific rate limiting handling + # 3. Testing with various load patterns
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
(1 hunks)unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
326-328
: LGTM!
Clean import statement for the new AsyncJobPartitionRouter
.
2249-2254
: LGTM!
The AsyncRetriever
instantiation looks good, using the new AsyncJobPartitionRouter
as the stream slicer.
2234-2242
: Consider making the number of concurrent jobs configurable.
The current implementation hardcodes the JobTracker
to 1 concurrent job, but as noted in the existing comment, some connectors like source-salesforce
successfully use 5 concurrent jobs in production.
This limitation could impact performance for connectors that can handle multiple concurrent jobs. Additionally, the comment about bulk parent streams suggests there might be more work needed in this area.
Let's verify the concurrent job usage in other connectors:
unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py
Outdated
Show resolved
Hide resolved
live test results: https://github.com/airbytehq/airbyte/actions/runs/12360671390 results analysis:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks @brianjlai
What is the issue
Something uncovered while using the latest version of the Concurrent CDK on
source-sendgrid
was that because it relied on the assumption all Retrievers had an underlyingstream_slicer
defined. This however was not the way theAsyncRetriever
was implemented.Implementation Details
To fix this, I've refactored the
AsyncRetriever
to follow a more established pattern exhibited by our existingRetrievers
. Now the retriever will always have an underlyingstream_slicer
that can be invoked to generate the partitions within the concurrent framework. This allows us to continue to use theStreamSlicerPartitionGenerator
.This change was also designed to be non-breaking because we are not changing the developer facing interface. Instead we use the existing
AsyncRetriever
fields to construct theAsyncJobPartitionGenerator
which is effectively an implementation detail.Note:
This will not completely give us the ability to run streams using the
AsyncRetriever
in the concurrent framework. This just fixed the first issue identified in #168. I will work on a follow up PR that addresses the second issue. But I wanted to separate the PRs so I can release this refactor which should be a no-op for existing connectors likesource-sendgrid
and we should see no change in behavior. And because of that, I have not ungated the async report streamsconcurrent_declarative_source.py
since we need to address part 2.todo: add unit tests to AsyncJobPartitionRouter
Summary by CodeRabbit
Summary by CodeRabbit
New Features
AsyncJobPartitionRouter
for improved management of asynchronous job handling.AsyncRetriever
component for managing asynchronous job operations.Bug Fixes
AsyncRetriever
to enhance performance and reduce complexity.Tests
AsyncRetriever
andAsyncJobPartitionRouter
components to ensure proper functionality and integration.AsyncJobPartitionRouter
to validate its behavior with different partitioning scenarios.