-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(concurrent cursor): Ensure than when start and state are provided, sequential state value… #52
Conversation
… is initial state if first slice is not closed
/autofix
|
📝 WalkthroughWalkthroughThe changes in this pull request involve modifications to the Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant DateTimeStreamStateConverter
participant StateManager
User->>DateTimeStreamStateConverter: Request state conversion
DateTimeStreamStateConverter->>DateTimeStreamStateConverter: Convert state
DateTimeStreamStateConverter->>StateManager: Update state with MOST_RECENT_RECORD_KEY
StateManager-->>DateTimeStreamStateConverter: Return updated state
DateTimeStreamStateConverter-->>User: Return converted state
Would you like to add any specific details to the sequence diagram, or is there something else you want to explore further? Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py (1)
85-89
: Implementation looks good! A few thoughts to consider...The addition of
MOST_RECENT_RECORD_KEY
aligns well with fixing the state reversion issue. Would you consider adding a docstring to explain the significance of this new key and how it prevents the state from reverting? Something like:""" Creates a slice representing records from prior syncs with MOST_RECENT_RECORD_KEY to track the latest processed record's timestamp and prevent state reversion. """wdyt? 🤔
unit_tests/sources/streams/concurrent/test_cursor.py (2)
67-69
: Consider adding a type hint for better code clarity?Would you consider adding a type hint for the
is_sequential_state
parameter? This would improve code clarity and IDE support, wdyt?def _cursor_with_slice_boundary_fields( - self, is_sequential_state: bool = True + self, is_sequential_state: bool = True ) -> ConcurrentCursor:
728-760
: Great test! A few suggestions to make it even better?The test case is well-structured and follows good testing practices. A couple of suggestions to consider:
- The comment on line 746 could be more descriptive about the failure scenario being simulated. Maybe something like:
- # simulate the case where at least the first slice fails but others succeed + # Simulate a partial sync scenario where the first slice fails (0-40) but the last slice succeeds (40-50). + # This verifies that even with partial success, we maintain the initial state for sequential states.
- Would you consider adding assertions to verify that no intermediate state updates occurred? This would make the test more robust by ensuring the state remains unchanged throughout the process, wdyt?
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
Line range hint
3020-3032
: Consider consolidating the initialization ofmost_recent_cursor_value
.In both the
if
andelse
blocks,most_recent_cursor_value
is assigned using similar logic. Would it simplify the code to setmost_recent_cursor_value
after the conditional block to avoid duplication? Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py
(1 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(3 hunks)unit_tests/sources/streams/concurrent/test_cursor.py
(3 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py (1)
85-89
: Let's verify edge case handling
The implementation looks solid, but let's ensure it handles edge cases correctly:
- When
start
is None and there's no previous state - When
start
is provided but is earlier than the previous state - When transitioning from sequential to concurrent state
✅ Verification successful
Let me analyze the test coverage for these edge cases with a different approach. I'll search for relevant test files and implementation details first.
I found the test file location. Let me check its contents to verify the edge case coverage we're concerned about.
Edge cases are well covered by existing tests
After reviewing the test file, I can confirm that all our edge cases are thoroughly tested:
-
When
start
is None and there's no previous state:- Covered by test cases with IDs "epoch-converter-no-state-no-start-start-is-zero-value" and "isomillis-converter-no-state-no-start-start-is-zero-value"
-
When
start
is provided but is earlier than the previous state:- Covered by test cases with IDs "epoch-converter-state-before-start-start-is-start" and "isomillis-converter-state-before-start-start-is-start"
-
When transitioning from sequential to concurrent state:
- Thoroughly covered by
test_convert_from_sequential_state
with various scenarios including empty states and states with values
- Thoroughly covered by
The implementation handles these edge cases correctly, with proper test coverage for both epoch and ISO milliseconds formats. Looking good! 👍
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for test cases covering these scenarios
ast-grep --pattern 'def test_$_(self):
$$$
start = None
$$$
'
ast-grep --pattern 'def test_$_(self):
$$$
assert most_recent_cursor_value
$$$
'
Length of output: 158
Script:
#!/bin/bash
# Let's search for test files related to datetime state converter
fd "test.*datetime.*state.*\.py$"
# Let's also look for the actual implementation to understand the edge cases
rg -l "class DatetimeStreamStateConverter" --type py
# And examine any test cases that might be testing state conversion
rg "test.*convert.*state" --type py
Length of output: 950
Script:
#!/bin/bash
# Let's examine the test file content to check edge case coverage
cat unit_tests/sources/streams/concurrent/test_datetime_state_converter.py
Length of output: 15547
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3266-3266
: Verify the correctness of most_recent_cursor_value
.
Is expected_start
the correct value for most_recent_cursor_value
in this test case? Could there be scenarios where a different value might be more appropriate? Please verify to ensure it aligns with the intended behavior. Wdyt?
… is initial state if first slice is not closed
What
In an attempt to make source-klaviyo concurrent, we have seen the state revert to the start of the stream.
Most reasonable hypothesis: there is a bug in our state migration logic from single value state to partitioned state that was introduced when we added the
most_recent_cursor_value
and the connector still hadn’t sync the first slice. Note that I’ll provide a fix for that shortly.Why this is the most reasonable hypothesis: I can reproduce locally and will provide a fix for the case I can reproduce
Why I’m doubting just a smidge: It would mean that the first slice in October 2023 was still not processed. This is surprising because this is the first slice being produced and hence the first slice being allocated to a thread. Sure after that, the thread pool can decide never to pick this thread back up but it would be unlucky to say the least. This would also mean that all the previous attempt didn’t close a since slice as the issue mentioned above was present in other versions and the state was never updated in the previous attempts). I’ve tested locally that the first slice is closed with at least one record emitted and this case is behaving fine.
For more information: https://airbytehq-team.slack.com/archives/C07V1RTCRV2/p1731595468942439?thread_ts=1730912193.745319&cid=C07V1RTCRV2
How
By ensuring that if there is a
start
that is different from theend
when creating a cursor, that we set themost_recent_cursor_value
to the same value asend
Summary by CodeRabbit
New Features
Bug Fixes
Tests