chore(refactor): refactor partition generator to take any stream slicer #39
Conversation
Branch …0552/introduce-concurrent-stream-slicer updated from 9e44ed4 to 35290eb (Compare)

/autofix
Looks good to me. I think the changes make sense, and I agree with moving toward consolidating how we generate partitions under the StreamSlicer.
…tition_generator.py Co-authored-by: Brian Lai <[email protected]>
📝 Walkthrough

The pull request introduces significant modifications across several files within the Airbyte CDK framework, focusing on enhancing the handling of streams and partition management. Key changes include the addition of new classes and methods for partition generation, updates to existing classes for improved state management, and refinements in type hints for clarity.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (20)
airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1)
Lines 10-21: Would you consider enhancing the documentation with more details? Wdyt?

The current documentation provides a good overview, but given this is a core abstraction for parallel processing, it might be helpful to add:
- Examples of typical `StreamSlice` structures
- Thread safety considerations for parallel processing
- State management implications
- Best practices for implementing this interface
This would make it easier for other developers to implement this interface correctly. What do you think?
Here's a suggested enhancement to the docstrings:
```diff
 class StreamSlicer(ABC):
     """
     Slices the stream into chunks that can be fetched independently.
     Slices enable state checkpointing and data retrieval parallelization.
+
+    Implementation considerations:
+    - Implementations must be thread-safe as slices may be processed in parallel
+    - Each slice should maintain its own state for proper checkpointing
+    - Slices should be sized appropriately to balance parallelism and overhead
     """

     @abstractmethod
     def stream_slices(self) -> Iterable[StreamSlice]:
         """
         Defines stream slices

         :return: An iterable of stream slices
+
+        Example:
+            def stream_slices(self) -> Iterable[StreamSlice]:
+                return [
+                    {"start_date": "2021-01-01", "end_date": "2021-06-30"},
+                    {"start_date": "2021-07-01", "end_date": "2021-12-31"}
+                ]
         """
         pass
```

airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (2)
Line 5: LGTM! Clean inheritance restructuring.

The changes nicely align with the PR objective of integrating the `StreamSlicer` abstraction with the Concurrent CDK. The multiple inheritance from `ConcurrentStreamSlicer`, `RequestOptionsProvider`, and `ABC` provides a solid foundation for both concurrent and declarative frameworks to work together.

Just to double-check our design: this means any concrete implementation will need to satisfy both the concurrent and declarative interfaces. Wdyt about documenting this requirement in the class docstring to help future implementers? 🤔
Also applies to: 10-12, 15-15
Line range hint 16-25: LGTM! Clean interface definition.

The transformation into a pure interface with clear documentation is great! The docstring nicely explains the purpose of stream slicing and state management.
What do you think about enhancing the docstring to mention that this interface now bridges both concurrent and declarative frameworks? Something like:
""" Slices the stream into a subset of records. Slices enable state checkpointing and data retrieval parallelization. The stream slicer keeps track of the cursor state as a dict of cursor_field -> cursor_value + +This interface bridges both concurrent and declarative frameworks, requiring implementations +to satisfy both ConcurrentStreamSlicer and RequestOptionsProvider contracts. See the stream slicing section of the docs for more information. """airbyte_cdk/utils/slice_hasher.py (1)
Lines 5-11: Consider adding documentation and type safety?

The implementation looks solid! A few suggestions to make it even better:
Would you consider adding a docstring explaining the purpose and usage of this encoder? This would help other developers understand when and how to implement `__json_serializable__` in their classes, wdyt?

We could add a Protocol to define the shape of serializable objects, something like:

```python
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class JSONSerializable(Protocol):
    def __json_serializable__(self) -> Any: ...
```

This would provide better type safety and IDE support. What do you think? 🤔
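A quick usage sketch showing how such a protocol behaves at runtime (`DateRangeSlice` is a hypothetical class for illustration, not something from this PR):

```python
import json
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class JSONSerializable(Protocol):
    def __json_serializable__(self) -> Any: ...

class DateRangeSlice:
    # Hypothetical slice object used only to exercise the protocol.
    def __init__(self, start: str, end: str) -> None:
        self.start, self.end = start, end

    def __json_serializable__(self) -> Any:
        return {"start": self.start, "end": self.end}

slice_obj = DateRangeSlice("2021-01-01", "2021-06-30")
assert isinstance(slice_obj, JSONSerializable)  # works thanks to runtime_checkable
print(json.dumps(slice_obj.__json_serializable__()))
```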
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (4)
Lines 14-27: Consider adding type hints to constants for better clarity.

The constants are well-defined, but adding type hints could make them even more self-documenting. What do you think about adding them like this?
```diff
-_STREAM_NAME = "a_stream_name"
-_JSON_SCHEMA = {"type": "object", "properties": {}}
+_STREAM_NAME: str = "a_stream_name"
+_JSON_SCHEMA: dict[str, object] = {"type": "object", "properties": {}}
```
Lines 29-39: Add class and method docstrings for better documentation.

The test class setup looks great! Would you consider adding docstrings to describe the test class's purpose and the setUp method's responsibilities? This would help other developers understand the test suite better. Wdyt?
```diff
 class StreamSlicerPartitionGeneratorTest(TestCase):
+    """Test suite for DeclarativePartitionFactory implementation."""
+
     def setUp(self) -> None:
+        """Initialize mock objects and test subject for each test."""
```
Lines 40-69: Consider adding error scenario test cases.

The happy path test cases look solid! Would you consider adding some error scenarios to make the test suite more comprehensive? For example:
- What happens when the retriever raises an exception?
- How does it handle empty or None stream slices?
Here's a suggested test case structure:
```python
def test_given_retriever_raises_exception_when_read_then_propagate_error(self) -> None:
    retriever = self._mock_retriever([])
    retriever.read_records.side_effect = ValueError("Simulated error")
    self._retriever_factory.return_value = retriever

    with self.assertRaises(ValueError):
        list(self._partition_factory.create(_A_STREAM_SLICE).read())
```
Lines 70-73: Consider enhancing the mock helper method.

The helper method looks good! To support testing error scenarios, what do you think about adding an optional parameter for side effects? Something like:
```diff
-    def _mock_retriever(self, read_return_value: List[StreamData]) -> Mock:
+    def _mock_retriever(
+        self,
+        read_return_value: List[StreamData],
+        side_effect: Exception | None = None
+    ) -> Mock:
         retriever = Mock(spec=Retriever)
-        retriever.read_records.return_value = iter(read_return_value)
+        if side_effect is not None:
+            retriever.read_records.side_effect = side_effect
+        else:
+            retriever.read_records.return_value = iter(read_return_value)
         return retriever
```

airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1)
Lines 127-132: Consider enhancing the method documentation with examples? 🤔

The new `output_format` method looks good, but would it be helpful to add some example inputs/outputs in the docstring to guide implementers? For instance, showing how different cursor value types (datetime, int, etc.) should be converted to JSON-valid types? Wdyt?

Here's a suggestion for the docstring:
```diff
 def output_format(self, value: Any) -> Any:
     """
     Convert the cursor value type to a JSON valid type.
+
+    Examples:
+        >>> output_format(datetime(2023, 1, 1))
+        "2023-01-01T00:00:00Z"
+        >>> output_format(Decimal("123.45"))
+        123.45
     """
     ...
```

airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
Line range hint 138-157: Consider breaking down the complex nested logic.

The function handles multiple levels of nested conditions for different types of parent stream configurations. What do you think about breaking this into smaller, more focused helper methods? For example:
```python
def _update_parent_stream_cache(stream_config: dict[str, Any], parent_streams: set[str]) -> None:
    """Handle parent stream cache configuration for a single stream"""
    if stream_config.get("incremental_sync", {}).get("parent_stream"):
        _handle_incremental_sync_parent(stream_config, parent_streams)
    elif stream_config.get("retriever", {}).get("partition_router", {}):
        _handle_partition_router_parent(stream_config, parent_streams)

def _handle_partition_router_parent(stream_config: dict[str, Any], parent_streams: set[str]) -> None:
    partition_router = stream_config["retriever"]["partition_router"]
    if isinstance(partition_router, dict):
        _update_parent_configs(partition_router.get("parent_stream_configs", []), parent_streams)
    elif isinstance(partition_router, list):
        for router in partition_router:
            _update_parent_configs(router.get("parent_stream_configs", []), parent_streams)
```

This could make the code more maintainable and easier to test. What do you think? 🤔
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
Line 271: Consider validating the slice parameter before hashing.

The hash computation looks good, but what do you think about adding a validation step for `self._slice`? While `None` is a valid value, we might want to ensure consistent handling across different slice types, wdyt?

```diff
-        self._hash = SliceHasher.hash(self._stream.name, self._slice)
+        slice_value = {} if self._slice is None else self._slice
+        self._hash = SliceHasher.hash(self._stream.name, slice_value)
```

unit_tests/sources/streams/concurrent/test_cursor.py (3)
Line range hint 44-52: Consider adding type hints to helper function parameters?

The helper functions `_partition` and `_record` could benefit from explicit type hints for better code maintainability and IDE support, wdyt?

```diff
-def _partition(
-    _slice: Optional[Mapping[str, Any]], _stream_name: Optional[str] = Mock()
-) -> Partition:
+def _partition(
+    _slice: Optional[Mapping[str, Any]],
+    _stream_name: Optional[str] = Mock()
+) -> Partition:

-def _record(
-    cursor_value: CursorValueType, partition: Optional[Partition] = Mock()
-) -> Record:
+def _record(
+    cursor_value: CursorValueType,
+    partition: Optional[Partition] = Mock()
+) -> Record:
```
Lines 232-241: Would you like to add docstrings to test methods?

The test methods could benefit from docstrings explaining the test scenario and expected behavior. For example:
```python
def test_given_no_state_when_generate_slices_then_create_slice_from_start_to_end(self):
    """
    Test that when no state is provided, the cursor generates a slice from start to end.

    Expected behavior:
    - Should create a single slice
    - Slice should start from the provided start time
    - Slice should end at the current time
    """
```
Line range hint 842-944: Consider organizing complex test scenarios into test classes?

The datetime-based cursor integration tests are quite extensive. Would it make sense to move them into a dedicated test class like `TestDatetimeBasedCursorIntegration`? This could help with:
- Better organization
- Shared setup code
- Clearer test boundaries
- Easier maintenance
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (5)
Lines 33-40: Consider adding a docstring to the `create` method for clarity.

Adding a docstring to the `create` method would enhance readability and help others understand its purpose and parameters. Wdyt?
Lines 44-51: Would it be helpful to include a docstring for the `__init__` method in `DeclarativePartition`?

Providing a docstring here could clarify the initialization process and the roles of the parameters. What do you think?
Lines 66-67: Simplify the return type of the `to_slice` method.

Since `self._stream_slice` is always assigned and cannot be `None`, perhaps we can change the return type from `Optional[Mapping[str, Any]]` to just `Mapping[str, Any]`. Wdyt?
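A condensed illustration of the narrowed signature (only the pieces relevant to the return type are shown; the real `__init__` takes more arguments):

```python
from typing import Any, Mapping

class DeclarativePartition:
    # Condensed sketch: _stream_slice is assigned unconditionally, so it
    # can never be None by the time to_slice() is called.
    def __init__(self, stream_slice: Mapping[str, Any]) -> None:
        self._stream_slice = stream_slice

    def to_slice(self) -> Mapping[str, Any]:  # no Optional needed
        return self._stream_slice
```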
Lines 76-85: Consider adding docstrings to the `StreamSlicerPartitionGenerator` methods.

Including docstrings for the `__init__` and `generate` methods could improve understanding of their functionality and usage. Would this be beneficial?
Lines 1-85: What are your thoughts on adding unit tests for these new classes?

Adding unit tests for `DeclarativePartitionFactory`, `DeclarativePartition`, and `StreamSlicerPartitionGenerator` would help ensure their correct behavior and maintain code quality. Would you like assistance in creating these tests? Wdyt?

airbyte_cdk/sources/streams/concurrent/cursor.py (1)
Lines 93-98: Should the docstring reference `stream_slices` instead of `generate_slices`?

The docstring mentions `generate_slices`, but the method is now named `stream_slices`. Would it be better to update the docstring to reflect the new method name? Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (13)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
- airbyte_cdk/sources/declarative/manifest_declarative_source.py (2 hunks)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1 hunks)
- airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (2 hunks)
- airbyte_cdk/sources/streams/concurrent/adapters.py (3 hunks)
- airbyte_cdk/sources/streams/concurrent/cursor.py (8 hunks)
- airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1 hunks)
- airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1 hunks)
- airbyte_cdk/utils/slice_hasher.py (1 hunks)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2 hunks)
- unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1 hunks)
- unit_tests/sources/streams/concurrent/test_adapters.py (0 hunks)
- unit_tests/sources/streams/concurrent/test_cursor.py (19 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/sources/streams/concurrent/test_adapters.py
🔇 Additional comments (14)
airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1)
Lines 1-21: Clean and well-structured implementation!
The abstract base class is well-designed with proper type hints and follows good practices for abstract class definition.
airbyte_cdk/utils/slice_hasher.py (1)
Lines 1-4: Clean imports and structure!
The imports are well-organized and the type hints are properly used.
airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (2)
Lines 127-132: The changes align well with the StreamSlicer integration! 👍

The addition of `output_format` is a solid foundation for the broader refactoring to support StreamSlicer integration. It provides a clear contract for type conversion that will be essential when generating StreamSlices.
Lines 127-132: Should we integrate `output_format` with existing serialization methods? 🤔

I notice that the `serialize` and `convert_to_state_message` methods handle state conversion but don't use the new `output_format` method. Would it make sense to utilize it there for consistent type conversion? This could help ensure all cursor values are properly JSON-serializable before they reach the state message. Wdyt? A rough sketch of the idea is below.

Let's also check whether there are any existing type conversion issues in the codebase.
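A rough, hypothetical sketch of that wiring (`ExampleStateConverter`, its flat state shape, and the datetime handling are illustrative assumptions, not the CDK's actual implementation):

```python
from datetime import datetime
from typing import Any, MutableMapping

class ExampleStateConverter:
    # Hypothetical converter for illustration only; the real CDK converters
    # subclass AbstractStreamStateConverter and have richer signatures.
    def output_format(self, value: Any) -> Any:
        # Convert cursor value types to JSON-valid types.
        return value.isoformat() if isinstance(value, datetime) else value

    def serialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        # Route every cursor value through output_format before emitting state.
        return {key: self.output_format(value) for key, value in state.items()}

print(ExampleStateConverter().serialize({"updated_at": datetime(2023, 1, 1)}))
# -> {'updated_at': '2023-01-01T00:00:00'}
```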
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
Line range hint 138-138: LGTM! Nice type hint improvement! 👍

The more specific type hint `list[dict[str, Any]]` makes the expected structure much clearer than just `list`. This will help with IDE support and static type checking.
Lines 97-98: Verify removal of None from the return type.

I notice we're changing the return type from `Union[None, MessageRepository]` to just `MessageRepository`. Looking at the constructor, it seems `_message_repository` is always initialized, but would you mind if we add a quick verification script to ensure there are no edge cases where this could be None? 🤔
✅ Verification successful
Changing the return type to `MessageRepository` is safe.

Looking at the code, I can confirm that `_message_repository` is always initialized in the constructor through `self._constructor.get_message_repository()`. The only `None` check I found is in `FileBasedSource`, which is a different class. The change from `Union[None, MessageRepository]` to `MessageRepository` is safe! 🎯
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```bash
#!/bin/bash
# Description: Check for any potential None checks on message_repository in the codebase
# This helps verify if any code relies on the property potentially returning None

# Search for None checks on message_repository
rg -A 2 "message_repository.*(?:is|==|!=)\s*None"

# Search for try-except blocks around message_repository access
ast-grep --pattern 'try {
  $$$
  message_repository
  $$$
}'
```
Length of output: 458
Script:

```bash
#!/bin/bash
# Let's check the constructor and initialization of _message_repository in ManifestDeclarativeSource
ast-grep --pattern 'class ManifestDeclarativeSource {
  $$$
  __init__($$$) {
    $$$
  }
  $$$
}'

# Also check for any direct assignments to _message_repository
rg "_message_repository\s*=" -A 2

# And check the actual file to see the full context
cat airbyte_cdk/sources/declarative/manifest_declarative_source.py
```
Length of output: 24931
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
Lines 46-47: LGTM! Clean import addition.
The new import aligns well with the PR's objective to enhance stream slicing functionality.
Line 311: LGTM! Good optimization.

Nice optimization to return the pre-computed hash value! This change improves performance by avoiding redundant hash computations while maintaining thread safety. 👍
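For readers skimming the thread, a condensed illustration of the pattern being praised (class name and attributes simplified from adapters.py; not the exact CDK code):

```python
from typing import Any, Mapping, Optional

from airbyte_cdk.utils.slice_hasher import SliceHasher

class ExamplePartition:
    # Simplified: the real partition class in adapters.py wraps a stream
    # object and more state. The hash is computed once in __init__...
    def __init__(self, stream_name: str, _slice: Optional[Mapping[str, Any]]) -> None:
        self._stream_name = stream_name
        self._slice = _slice
        self._hash = SliceHasher.hash(self._stream_name, self._slice)

    # ...so __hash__ just returns the cached value, and repeated hashing
    # (dict/set membership, deduplication) costs nothing extra.
    def __hash__(self) -> int:
        return self._hash
```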
unit_tests/sources/streams/concurrent/test_cursor.py (1)
Line range hint 771-785: Should we revisit this behavior?
The test comment mentions "Not sure what is the value of this behavior but I'm simply documenting how it is today". This suggests we might want to:
- Evaluate if this behavior is intentional
- Document the rationale if it is
- Consider fixing it if it isn't
What are your thoughts on this?
Let's check if this behavior is referenced in other tests or documentation:
✅ Verification successful
The behavior seems intentional - shall we add some documentation to explain why? 🤔
Looking at the implementation in airbyte_cdk/sources/streams/concurrent/cursor.py, this behavior appears to be part of the state management design where:
- The cursor tracks the most recent value per partition regardless of boundaries
- This value is used to determine the next sync's starting point in sequential states
- For non-sequential states, it helps maintain the complete history of cursor progression
Instead of revisiting the behavior, perhaps we could update the test's docstring to explain this rationale? Something like:
"""
This behavior ensures we don't lose track of the latest cursor value even if it falls outside
the current slice boundaries, which is important for determining the next sync's starting point
and maintaining cursor progression history.
"""
What do you think about adding this documentation? 🙂
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```bash
#!/bin/bash
# Search for similar test cases or documentation
rg -l "most_recent_cursor_value.*outside.*boundaries" .
```
Length of output: 112
Script:

```bash
#!/bin/bash
# Let's check the actual test and surrounding context
rg -B 5 -A 10 "most_recent_cursor_value.*outside.*boundaries" .
```
Length of output: 1716
Script:

```bash
#!/bin/bash
# Let's also look for any related documentation in docstrings or comments
rg "most_recent_cursor_value" . --type py
```
Length of output: 5457
Script:

```bash
#!/bin/bash
# Check for any related implementation details
ast-grep --pattern 'def most_recent_cursor_value($$$) { $$$ }'
```
Length of output: 64
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
Lines 222-233: The partition generator is correctly instantiated.

The `partition_generator` setup appears accurate, integrating `DeclarativePartitionFactory` and `StreamSlicerPartitionGenerator` effectively. This should enhance partition management within the concurrent streams.
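To make the shape of that wiring concrete, here is a condensed, hypothetical sketch (the real classes in declarative_partition_generator.py take more constructor arguments, such as a retriever factory):

```python
from typing import Any, Iterable, Mapping

StreamSlice = Mapping[str, Any]  # simplified stand-in for the CDK type

class StreamSlicerPartitionGenerator:
    # Simplified: pairs a partition factory with *any* StreamSlicer.
    def __init__(self, partition_factory: Any, stream_slicer: Any) -> None:
        self._partition_factory = partition_factory
        self._stream_slicer = stream_slicer

    def generate(self) -> Iterable[Any]:
        # One partition per slice; the slicer can be a cursor, a partition
        # router, or anything else exposing stream_slices(), which is
        # exactly the flexibility this PR is after.
        for stream_slice in self._stream_slicer.stream_slices():
            yield self._partition_factory.create(stream_slice)
```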
Lines 346-376: Question about setting the cursor to `None` after initialization.

In the `_retriever_factory` method, after setting the initial state on the cursor, we're setting `declarative_stream.retriever.cursor` to `None` (lines 370-372). Could setting the cursor to `None` potentially impact components that might rely on it later? Perhaps we can verify if this is necessary or if there's an alternative approach that avoids any side effects. Wdyt?
Line 6: Imports updated appropriately.

The addition of `Callable` to the imports ensures that type hints are accurately defined throughout the code.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
Lines 3076-3082: Tests validate concurrent cursor slice boundaries correctly.
The assertions confirm that the concurrent cursor's slice boundary fields are set as expected. The test implementation looks solid.
Line range hint 3099-3115: Parameterization covers scenarios with missing partition fields.

The parameterized tests effectively handle cases where `partition_field_start` or `partition_field_end` is `None`. This ensures robustness in handling optional fields. Nicely done!
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
airbyte_cdk/utils/slice_hasher.py (2)
Lines 6-12: Consider adding error handling and documentation?

The implementation looks clean! A couple of suggestions to make it even better:
- What do you think about adding error handling for the `__json_serializable__` call? It could raise unexpected errors.
- Would you consider adding a docstring to explain the purpose and usage of this encoder?
Here's a possible implementation, wdyt? 🤔
```diff
 class SliceEncoder(json.JSONEncoder):
+    """JSON encoder that handles objects implementing the __json_serializable__ method.
+
+    This encoder is used to serialize stream slices that may contain custom objects.
+    """
+
     def default(self, obj: Any) -> Any:
         if hasattr(obj, "__json_serializable__"):
-            return obj.__json_serializable__()
+            try:
+                return obj.__json_serializable__()
+            except Exception as e:
+                raise TypeError(f"Object's __json_serializable__ method failed: {e}")

         # Let the base class default method raise the TypeError
         return super().default(obj)
```
Lines 15-30: Great implementation of stable hashing! A few minor suggestions?

Love how you implemented the stable hashing using SHA-256! 🎉 The code looks solid and follows best practices. A couple of tiny suggestions to make it even better:

- Would you consider adding a docstring to explain the hash generation strategy? It would help others understand why we're using the last 8 bytes.
- What do you think about catching `json.JSONEncodeError` specifically? Like this:

```diff
 class SliceHasher:
+    """Generates stable hash values for stream slices.
+
+    Uses SHA-256 for stable hashing and returns a well-distributed 64-bit integer
+    by taking the last 8 bytes of the hash digest.
+    """
+
     _ENCODING: Final = "utf-8"

     @classmethod
     def hash(cls, stream_name: str, stream_slice: Optional[Mapping[str, Any]] = None) -> int:
         if stream_slice:
             try:
                 s = json.dumps(stream_slice, sort_keys=True, cls=SliceEncoder)
                 hash_input = f"{stream_name}:{s}".encode(cls._ENCODING)
-            except TypeError as e:
+            except (TypeError, json.JSONEncodeError) as e:
                 raise ValueError(f"Failed to serialize stream slice: {e}")
```
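For context, a quick usage sketch of the hasher (the inputs mirror the test below; the concrete integers are whatever the SHA-256-based implementation produces):

```python
from airbyte_cdk.utils.slice_hasher import SliceHasher

# Deterministic across processes and runs, unlike Python's salted built-in hash().
with_slice = SliceHasher.hash("stream", {"partition": 1, "k": "v"})
without_slice = SliceHasher.hash("stream")  # the slice argument is optional

print(with_slice, without_slice)  # two stable 64-bit integers
```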
Lines 173-176: Consider adding documentation for the hash values?

The hardcoded hash values make the test more stable, but they might be confusing for future maintainers. Would you consider adding a comment explaining how these values were derived? Something like:

```diff
 @pytest.mark.parametrize(
     "_slice, expected_hash",
     [
+        # Hash value derived from hash(("stream", '{"partition": 1, "k": "v"}'))
         pytest.param(
             {"partition": 1, "k": "v"},
             1088629586613270006,
             id="test_hash_with_slice",
         ),
+        # Hash value derived from hash("stream")
         pytest.param(None, 5149571505982114308, id="test_hash_no_slice"),
     ],
 )
```

Wdyt? This would make it easier to understand and update these values if the hashing logic changes in the future.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- airbyte_cdk/utils/slice_hasher.py (1 hunks)
- unit_tests/sources/streams/concurrent/test_adapters.py (2 hunks)
🔇 Additional comments (2)
airbyte_cdk/utils/slice_hasher.py (1)
Lines 1-4: LGTM! Clean imports and type hints.
The imports are well-organized and include all necessary modules. The type hints are comprehensive and follow best practices.
unit_tests/sources/streams/concurrent/test_adapters.py (1)
Line 146: LGTM! Good catch on adding the stream name.

The addition of the `stream.name` assignment makes the test setup more complete and consistent with other test methods.
What

Work as part of https://github.com/airbytehq/airbyte-internal-issues/issues/10552

In order to easily use any type of stream slicing that is done in the Declarative CDK as part of the Concurrent CDK, we need the `StreamSlicer` abstraction to be available in the Concurrent CDK.

How

- Have `airbyte_cdk.sources.streams.concurrent.Cursor` implement `StreamSlicer` and update `ConcurrentCursor` to generate `StreamSlice`s
- Replace `CursorPartitionGenerator` with a more generic partition generator that takes any stream slicer

A minimal sketch of the resulting shape is shown below.
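A minimal sketch, under stated assumptions (the slice values and the cursor's body are illustrative; only the `stream_slices` contract is taken from this PR):

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping

StreamSlice = Mapping[str, Any]  # simplified stand-in for the CDK type

class StreamSlicer(ABC):
    """Slices the stream into chunks that can be fetched independently."""

    @abstractmethod
    def stream_slices(self) -> Iterable[StreamSlice]:
        """Defines stream slices."""

class ConcurrentCursor(StreamSlicer):
    # Illustrative only: the real cursor derives slices from its stored
    # state and start/end boundaries rather than a hardcoded list.
    def stream_slices(self) -> Iterable[StreamSlice]:
        yield {"start_date": "2021-01-01", "end_date": "2021-06-30"}
        yield {"start_date": "2021-07-01", "end_date": "2021-12-31"}
```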
Summary by CodeRabbit

Release Notes

New Features
- Added `DeclarativePartitionFactory` and `StreamSlicerPartitionGenerator`.
- `StreamSlicer` class for improved state checkpointing.

Improvements

Bug Fixes

Tests
- Added tests for `DeclarativePartitionFactory` and enhanced scenarios for cursor handling.