feat(Low-Code Concurrent CDK): Allow non-incremental substreams and list based partition router streams with parents to be processed by the concurrent cdk #89
Conversation
… to be processed by the concurrent cdk. Non-incremental only
This looks good to me. The only part I don't think I fully understand is the RFR part, and I'd like more clarity on it before I approve.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (outdated, resolved)
📝 Walkthrough
The pull request introduces enhancements to the …
Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Wanna consider adding more context to the comments in the code for clarity? wdyt?
📜 Recent review details
Configuration used: CodeRabbit UI
📒 Files selected for processing (6)
🚧 Files skipped from review as they are similar to previous changes (4)
👮 Files not reviewed due to content moderation or server errors (2)
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1339-1340: Consider adding a docstring to explain the behavior change?

The new condition changes how full-refresh streams are handled when disable_resumable_full_refresh is True. Would you consider adding a docstring to explain this behavior change and its implications? Something like:

```diff
 def _merge_stream_slicers(
     self, model: DeclarativeStreamModel, config: Config
 ) -> Optional[StreamSlicer]:
+    """Merges stream slicers for a declarative stream.
+
+    When disable_resumable_full_refresh is True, returns the stream_slicer directly without wrapping it in a
+    PerPartitionCursor, effectively disabling the resumable full refresh functionality for non-incremental streams.
+    """
```

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
69-76: Consider making disable_resumable_full_refresh configurable

I noticed that disable_resumable_full_refresh is set to True when initializing ModelToComponentFactory. Would it be beneficial to make this parameter configurable to allow flexibility in enabling or disabling resumable full refresh, depending on the use case? WDYT?
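If we went that way, it could be as small as forwarding a constructor argument. A minimal sketch, assuming a simplified constructor shape (the class name and every parameter other than disable_resumable_full_refresh are illustrative, not the actual CDK API; the import path is assumed to mirror the file path shown above):

```python
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)


class ConcurrentDeclarativeSourceSketch:
    """Illustrative only: surfaces the flag instead of hard-coding it to True."""

    def __init__(
        self,
        source_config: Mapping[str, Any],
        config: Optional[Mapping[str, Any]] = None,
        disable_resumable_full_refresh: bool = True,
    ) -> None:
        # Forward the caller's choice to the factory rather than pinning it,
        # so resumable full refresh can be re-enabled per use case.
        self._component_factory = ModelToComponentFactory(
            disable_resumable_full_refresh=disable_resumable_full_refresh,
        )
        self._source_config = source_config
        self._config = config or {}
```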
unit_tests/sources/declarative/test_concurrent_declarative_source.py (3)

588-600: Should we update the comments to reflect the new streams?

In the test_group_streams function, the comment mentions "2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental." With the addition of new streams, would it be clearer to update the comment to accurately describe the current stream grouping? Wdyt?
724-745: Should we generate expected stream names dynamically in test_discover?

In the test_discover function, we're using a hardcoded list of expected stream names. Considering potential future additions, would it be beneficial to derive this list dynamically from the manifest to reduce maintenance overhead? Wdyt?
1665-1666: Could we address the type ignores differently here?

In disable_emitting_sequential_state_messages, we're using # type: ignore comments. Would it be possible to refactor the code to avoid these type ignores, perhaps by adjusting access levels or interfaces? Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (9 hunks)
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
386-386: LGTM: New parameter for controlling resumable full refresh behavior

The addition of the disable_resumable_full_refresh parameter with a default value of False maintains backward compatibility while allowing opt-out of resumable full refresh functionality. The parameter is properly initialized as an instance variable.
Also applies to: 395-395
Line range hint 1339-1347: Verify the impact on existing full-refresh streams

The change to bypass PerPartitionCursor for full-refresh streams when disable_resumable_full_refresh is True could affect existing streams. Let's verify there are no unintended side effects.
✅ Verification successful
Let me gather more information about the usage of disable_resumable_full_refresh to better understand its impact.
Let me gather more specific information about the PerPartitionCursor and its relationship with full-refresh streams.
Based on the evidence gathered, I can now provide a final response:
The change to bypass PerPartitionCursor is safe when disable_resumable_full_refresh is True

The code change is safe because:
- PerPartitionCursor is specifically designed for managing state per partition, and when disable_resumable_full_refresh is True, we don't need state management for full-refresh streams.
- The ChildPartitionResumableFullRefreshCursor is only used for full-refresh sub-streams to track their progress, which isn't needed when resumable full refresh is disabled.
- All test cases involving full-refresh streams are focused on testing the resumable functionality, and there are no tests that depend on PerPartitionCursor for non-resumable full-refresh streams.

The change aligns with the expected behavior: when resumable full refresh is disabled, we want to bypass the state management entirely. What do you think about this assessment? 🤔
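To make the assessed behavior concrete, the decision can be reduced to a stand-alone sketch like the one below (stub types, not the factory's real classes; only the branching logic mirrors what's described above):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamSlicerStub:
    """Stand-in for the partition-router-based StreamSlicer."""
    name: str


@dataclass
class DeclarativeStreamModelStub:
    """Stand-in for DeclarativeStreamModel; only the field we branch on."""
    incremental_sync: Optional[dict]


def merge_stream_slicers_sketch(
    model: DeclarativeStreamModelStub,
    partition_router_slicer: Optional[StreamSlicerStub],
    disable_resumable_full_refresh: bool,
) -> Optional[StreamSlicerStub]:
    # Non-incremental stream with RFR disabled: keep the partition-router slicer
    # (or None) as-is instead of wrapping it in a per-partition / RFR cursor.
    if model.incremental_sync is None and disable_resumable_full_refresh:
        return partition_router_slicer
    # Incremental and resumable-full-refresh handling is out of scope for this sketch.
    raise NotImplementedError
```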
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all full-refresh streams that might be affected by this change
# Look for stream definitions that don't have incremental_sync
rg -A 5 'type:\s*"DeclarativeStream"' | rg -v 'incremental_sync'
# Look for streams using PerPartitionCursor
ast-grep --pattern 'PerPartitionCursor($$$)'
Length of output: 6083
Script:
#!/bin/bash
# Look for tests and usages of disable_resumable_full_refresh
rg -A 5 "disable_resumable_full_refresh"
# Look for any test cases involving full refresh streams
rg -A 10 "test.*full.*refresh" --type py
# Look for any configuration examples of full refresh streams
fd "manifest.yaml" --exec rg -A 5 "full_refresh"
Length of output: 64605
Script:
#!/bin/bash
# Look for the implementation of PerPartitionCursor and its usage with full-refresh
rg -A 10 "class PerPartitionCursor"
# Look for tests involving PerPartitionCursor with full-refresh streams
rg -A 10 "test.*PerPartitionCursor.*full.*refresh"
# Look for the implementation of ChildPartitionResumableFullRefreshCursor
rg -A 10 "class ChildPartitionResumableFullRefreshCursor"
Length of output: 2947
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
204-214: Consider renaming datetime_based_cursor_component_definition for clarity

The variable name datetime_based_cursor_component_definition might be a bit confusing. Should we rename it to incremental_sync_definition to clarify that it can represent custom incremental syncs? WDYT?
267-301: Is setting cursor_field to None appropriate in DefaultStream?

In the initialization of DefaultStream for substreams without incremental sync, cursor_field is set to None. Could this potentially cause issues with state management or downstream processing? Should we consider setting a default value or handling this case differently? WDYT?
unit_tests/sources/declarative/test_concurrent_declarative_source.py (5)
Line range hint 315-335: Is changing party_members_skills_stream to a full-refresh stream intentional?

I noticed that party_members_skills_stream now references base_stream instead of base_incremental_stream, which changes it from an incremental stream to a full-refresh stream. Just wanted to confirm if this is the desired behavior. Wdyt?
352-456: Great addition of arcana_personas_stream and palace_enemies_stream!

The new streams are well-configured with their respective partition routers and schemas. This enhancement should significantly improve data retrieval capabilities.
602-608: Confirming the synchronous streams count and names

The assertion assert len(synchronous_streams) == 2 aligns with our expectations, and the streams are correctly identified as palaces and palace_enemies. Just double-checking that this matches the intended behavior.
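If it helps make the intent explicit, the test could also pin the names alongside the count, e.g. (sketch; synchronous_streams is whatever collection the test already builds):

```python
# Sketch: asserting names as well as count makes a future stream regrouping fail loudly.
assert len(synchronous_streams) == 2
assert {stream.name for stream in synchronous_streams} == {"palaces", "palace_enemies"}
```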
Line range hint 879-893: Is the state assertion in test_read_with_concurrent_and_synchronous_streams accurate?

The test compares palaces_states[0].stream.stream_state.__dict__ to AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__. Should we verify that this assertion correctly reflects the expected state, especially after adding new streams? Wdyt?
1368-1368: Nice work on handling streams with stream state interpolation!

In test_streams_with_stream_state_interpolation_should_be_synchronous, you've correctly ensured that streams using stream state interpolation are processed synchronously. This maintains data consistency across the sync process.
Regression test results for source-asana:
I'll push the catalog fix and address Max's comments, but aside from that, asana doesn't seem to show any issues running on the concurrent candidate version vs. the legacy flow.
…mable to full refresh streams
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
70-70: Consider extracting the resumability logic into a descriptive method, wdyt?

The logic for determining if a stream is resumable is spread across multiple places. We could make it more maintainable by extracting it into a method like _is_stream_resumable(). This would make the code more self-documenting and easier to modify in the future.

```diff
 class DefaultStream(AbstractStream):
+    def _is_stream_resumable(self) -> bool:
+        return bool(self._cursor_field)
+
     def as_airbyte_stream(self) -> AirbyteStream:
         stream = AirbyteStream(
             name=self.name,
             json_schema=dict(self._json_schema),
             supported_sync_modes=[SyncMode.full_refresh],
-            is_resumable=False,
+            is_resumable=self._is_stream_resumable(),
         )
```

unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py (1)
118-118: Consider reducing duplication in test scenarios, wdyt?

I notice that we're repeating the is_resumable: False expectation across multiple test scenarios. We could make the tests more maintainable by extracting common catalog expectations into helper functions or fixtures.

Here's a suggestion for reducing duplication:

```python
def create_expected_catalog(name: str, schema: dict, *, is_resumable: bool = False) -> dict:
    return {
        "streams": [
            {
                "json_schema": schema,
                "name": name,
                "supported_sync_modes": ["full_refresh"],
                "is_resumable": is_resumable,
            }
        ]
    }
```

This would allow us to simplify the test scenarios like:

```python
.set_expected_catalog(
    create_expected_catalog(
        "stream1",
        {
            "type": "object",
            "properties": {
                "id": {"type": ["null", "string"]},
            },
        },
    )
)
```

Also applies to: 163-163, 195-195, 228-228, 239-239, 270-270, 303-303, 336-336
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py (2)
305-305: Consider adding test cases for resumable streams?

I notice we're setting is_resumable: False for all streams, which makes sense for the current PR scope. However, should we also add test cases for resumable streams (those with cursor fields) to ensure the behavior works correctly in both scenarios? wdyt?

Line range hint 1-524: Verify test coverage for concurrent processing scenarios

The test scenarios cover various cases including single stream, multiple streams, primary keys, and error handling. However, I noticed a few scenarios that might be worth considering:
- Mixed resumable and non-resumable streams in the same source
- Streams with cursor fields
- Edge cases around partition handling with resumable/non-resumable combinations
Would you like me to help draft these additional test scenarios? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (5 hunks)
- airbyte_cdk/sources/streams/concurrent/default_stream.py (1 hunks)
- unit_tests/sources/declarative/decoders/test_json_decoder.py (1 hunks)
- unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py (8 hunks)
- unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py (7 hunks)
- unit_tests/sources/streams/concurrent/test_default_stream.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🔇 Additional comments (3)
unit_tests/sources/declarative/decoders/test_json_decoder.py (1)
56-56: Consider maintaining a larger test dataset for memory testing?

I notice we're reducing the test data size from 2M to 2 lines. While this speeds up the test, it might not effectively test memory constraints anymore. The test is marked with @pytest.mark.limit_memory("20 MB"), but with only 2 lines, we might not come close to testing this limit.

Would you consider using a more moderate size (maybe 100K lines) to balance between test duration and effective memory testing? This would help ensure we're still catching potential memory issues while keeping the test runtime reasonable. wdyt? 🤔
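A rough sketch of what a mid-sized variant could look like, assuming pytest-memray's limit_memory marker as in the existing test; generate_jsonl_payload and the final parsing step are illustrative placeholders for however the real test feeds the decoder:

```python
import json

import pytest


def generate_jsonl_payload(num_lines: int) -> bytes:
    """Hypothetical helper: a JSONL body with `num_lines` small records."""
    return b"\n".join(
        json.dumps({"id": i, "name": f"record_{i}"}).encode("utf-8") for i in range(num_lines)
    )


@pytest.mark.limit_memory("20 MB")  # marker provided by pytest-memray, as in the existing test
def test_decoder_memory_with_moderate_dataset():
    # ~100K lines: big enough to exercise the 20 MB budget, small enough to keep CI fast.
    payload = generate_jsonl_payload(100_000)

    # In the real test, the payload would be fed through the JSON/JSONL decoder under
    # test here; parsing line-by-line keeps peak memory bounded.
    parsed = (json.loads(line) for line in payload.splitlines())
    assert sum(1 for _ in parsed) == 100_000
```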
unit_tests/sources/streams/concurrent/test_default_stream.py (1)
77-77: LGTM! Test coverage looks comprehensive
The test cases properly verify the resumability behavior for both cursor-based and non-cursor streams.
Also applies to: 114-114, 151-151, 218-218
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py (1)
265-265: Verify consistent handling of is_resumable across the codebase

The addition of is_resumable: False to all stream definitions in the test scenarios looks good, as it aligns with the PR's objective to support non-incremental substreams. However, should we verify that this matches the actual implementation in the source code? wdyt?
Also applies to: 305-305, 384-384, 396-396, 435-435, 476-476, 517-517
✅ Verification successful
Setting is_resumable: False is correct for concurrent streams

Based on the implementation in airbyte_cdk/sources/streams/concurrent/default_stream.py, concurrent streams are initialized with is_resumable=False by default, and this property is only set to True when a cursor field is defined. This aligns with the core stream's behavior, where resumability is tied to checkpointing capability.

The changes in the test scenarios correctly set is_resumable: False for concurrent streams without cursor fields, maintaining consistency with the implementation.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if is_resumable handling is consistent across the codebase
# Look for is_resumable in source files
echo "Checking source implementation..."
rg -A 5 "is_resumable" --type python
# Look for related test assertions
echo "Checking test assertions..."
rg -A 5 "assert.*is_resumable" --type python
# Look for related configuration
echo "Checking configuration..."
rg -A 5 "is_resumable.*=.*" --type python
Length of output: 423
Script:
#!/bin/bash
# Let's try without specifying the file type since rg should auto-detect it
echo "Checking source implementation..."
rg "is_resumable" -A 5
echo "Checking test assertions..."
rg "assert.*is_resumable" -A 5
echo "Checking configuration..."
rg "is_resumable.*=" -A 5
Length of output: 80868
Script:
#!/bin/bash
# Let's check the implementation details to understand the relationship between concurrent streams and is_resumable
echo "Checking concurrent stream implementation..."
rg -A 10 "class.*ConcurrentStream" --type-list
# Look for is_resumable property definition
echo "Checking is_resumable property definition..."
ast-grep --pattern 'class $_ {
$$$
@property
def is_resumable($_):
$$$
}'
# Look for stream base class implementation
echo "Checking stream base class..."
ast-grep --pattern 'class Stream($_):
$$$
def is_resumable($_):
$$$
}'
Length of output: 872
Script:
#!/bin/bash
# Let's try a simpler approach to find concurrent stream implementations
echo "Checking concurrent stream implementation..."
rg -A 10 "class.*ConcurrentStream"
# Look for is_resumable property definition without curly braces
echo "Checking is_resumable property definition..."
ast-grep --pattern '@property
def is_resumable($_):
$$$'
# Look for stream base class implementation without curly braces
echo "Checking stream base class..."
ast-grep --pattern 'class Stream:
$$$
def is_resumable(self):
$$$'
Length of output: 7950
Script:
#!/bin/bash
# Let's look for the actual implementation of is_resumable in the codebase
echo "Looking for is_resumable implementation..."
rg "def is_resumable" -B 2 -A 4
# Also check for any direct references to is_resumable in the concurrent stream context
echo "Checking is_resumable usage in concurrent streams..."
rg "is_resumable.*=.*" -B 2 -A 2 "airbyte_cdk/sources/streams/concurrent/"
# Let's also check for any documentation or comments about is_resumable
echo "Checking for is_resumable documentation..."
rg "is_resumable" -g "*.md" -g "*.rst" -g "*.py" -C 5
Length of output: 81496
Validated regression testing again on the latest RC: https://github.com/airbytehq/airbyte/actions/runs/12130315383/job/33820422548
LGTM! Thanks for the implementation and the validation on source-asana
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/10552
What this solves
Specific areas to highlight
Summary by CodeRabbit

New Features
- New streams arcana_personas_stream and palace_enemies_stream.
- The as_airbyte_stream method now includes an is_resumable attribute for better stream configuration.

Bug Fixes

Tests
- Updated the is_resumable attribute in multiple test scenarios to ensure consistency.

Documentation