Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code concurrent): Concurrent execution for streams without partition routers nor cursor #61

Conversation

maxi297
Copy link
Contributor

@maxi297 maxi297 commented Nov 15, 2024

Resolves airbytehq/airbyte-internal-issues#10550

Summary by CodeRabbit

  • New Features

    • Enhanced handling of concurrent streams with improved cursor management.
    • Simplified cursor creation logic for datetime incremental streams.
    • Added validation for primary key handling in stream configurations.
  • Bug Fixes

    • Adjustments made to ensure accurate state tracking for concurrent and synchronous streams.
    • Improved error handling for cursor creation with invalid parameters.
  • Tests

    • Updated test suite to validate new cursor logic and ensure robustness in handling various stream configurations, including expanded scenarios for concurrent state handling.
    • New tests added for primary key handling to cover valid and invalid input cases.

@maxi297 maxi297 marked this pull request as ready for review November 20, 2024 15:39
Copy link
Contributor

coderabbitai bot commented Nov 20, 2024

📝 Walkthrough
📝 Walkthrough

Walkthrough

The changes in this pull request involve modifications to the ConcurrentDeclarativeSource and ModelToComponentFactory classes, focusing on enhancing the handling of concurrent streams and cursor management. Key updates include the introduction of a new private method for datetime incremental checks, adjustments to method return types, and refinements in the test suites to validate the new behaviors. The updates aim to streamline the logic for managing streams and cursors, ensuring that functionality remains intact while improving code clarity and maintainability.

Changes

File Path Change Summary
airbyte_cdk/sources/declarative/concurrent_declarative_source.py - Added method _is_datetime_incremental_without_partition_routing.
- Updated _group_streams method to utilize the new method and refined cursor field handling.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py - Changed return type of create_concurrent_cursor_from_datetime_based_cursor from tuple to ConcurrentCursor.
- Removed Tuple import from typing module.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py - Removed stream_state_converter variable from tests.
- Updated tests to validate properties of concurrent_cursor.
unit_tests/sources/declarative/test_concurrent_declarative_source.py - Adjusted assertions and expected states in tests for concurrent streams.
- Refined disable_emitting_sequential_state_messages function.
airbyte_cdk/sources/streams/concurrent/helpers.py - Enhanced logic in get_primary_key_from_stream to handle primary keys more robustly.
unit_tests/sources/streams/concurrent/test_helpers.py - Added unit tests for get_primary_key_from_stream to validate behavior under various scenarios.

Possibly related PRs

Suggested reviewers

  • aaronsteers

Wdyt?


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (10)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

197-203: Consider renaming is_without_partition_router_nor_cursor for clarity.

The variable name is_without_partition_router_nor_cursor uses a double negative, which can be a bit hard to read and understand. Would it make sense to rename it to something like has_no_partition_router_and_no_cursor or is_non_partitioned_non_cursor_stream to improve readability? WDYT?


271-285: Could we simplify the boolean expression for better readability?

The return statement in _is_datetime_incremental_without_partition_routing combines multiple conditions:

return (
    bool(datetime_based_cursor_component_definition)
    and datetime_based_cursor_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__
    and self._stream_supports_concurrent_partition_processing(declarative_stream=declarative_stream)
    and hasattr(declarative_stream.retriever, "stream_slicer")
    and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
)

Would separating these conditions into intermediate variables or adding comments enhance readability and maintainability? For example:

has_datetime_cursor_def = bool(datetime_based_cursor_component_definition)
is_datetime_cursor_type = datetime_based_cursor_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__
supports_concurrent_processing = self._stream_supports_concurrent_partition_processing(declarative_stream=declarative_stream)
has_stream_slicer = hasattr(declarative_stream.retriever, "stream_slicer")
is_datetime_based_cursor = isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)

return (
    has_datetime_cursor_def
    and is_datetime_cursor_type
    and supports_concurrent_processing
    and has_stream_slicer
    and is_datetime_based_cursor
)

This might make the logic clearer for future maintenance. WDYT?

unit_tests/sources/declarative/test_concurrent_declarative_source.py (3)

482-489: LGTM! Consider enhancing test readability?

The test correctly validates the concurrent stream grouping. What do you think about extracting the stream names into constants at the top of the file to make the test more maintainable? wdyt?

+PARTY_MEMBERS_STREAM = "party_members"
+PALACES_STREAM = "palaces"
+LOCATIONS_STREAM = "locations"

 assert len(concurrent_streams) == 3
 concurrent_stream_0, concurrent_stream_1, concurrent_stream_2 = concurrent_streams
 assert isinstance(concurrent_stream_0, DefaultStream)
-assert concurrent_stream_0.name == "party_members"
+assert concurrent_stream_0.name == PARTY_MEMBERS_STREAM

1277-1278: LGTM! Consider adding descriptive assertions?

The test correctly validates stream categorization. What do you think about adding assertions for the specific streams in each category to make the test more descriptive? wdyt?

 assert len(source._concurrent_streams) == 1
 assert len(source._synchronous_streams) == 3
+# Verify specific streams in each category
+assert source._concurrent_streams[0].name == "palaces"
+assert {stream.name for stream in source._synchronous_streams} == {
+    "party_members",
+    "locations",
+    "party_members_skills"
+}

1573-1575: Consider using a test helper for type safety?

The function disables sequential state messages for testing, but relies on type ignores. What do you think about creating a test-specific interface or helper method to handle this more cleanly? wdyt?

+from typing import Protocol
+
+class TestCursor(Protocol):
+    def disable_sequential_state(self) -> None:
+        ...
+
+class ConcurrentCursor:
+    def disable_sequential_state(self) -> None:
+        self._connector_state_converter._is_sequential_state = False
+
 def disable_emitting_sequential_state_messages(source: ConcurrentDeclarativeSource) -> None:
-    for concurrent_stream in source._concurrent_streams:  # type: ignore
-        if isinstance(concurrent_stream.cursor, ConcurrentCursor):
-            concurrent_stream.cursor._connector_state_converter._is_sequential_state = False  # type: ignore
+    for stream in source._concurrent_streams:
+        if hasattr(stream.cursor, 'disable_sequential_state'):
+            stream.cursor.disable_sequential_state()
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

Line range hint 775-791: Consider adding validation for empty cursor fields?

The cursor field and slice boundary fields setup looks good, but what do you think about adding validation to ensure cursor_field is not empty after evaluation? This could prevent runtime issues, wdyt?

 cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
+if not cursor_field:
+    raise ValueError("cursor_field cannot be empty after evaluation")

Line range hint 792-814: Consider adding validation for cursor granularity format

The datetime format and cursor granularity setup is solid, but should we validate the cursor_granularity format before parsing? Invalid duration strings could cause runtime errors, wdyt?

+def _is_valid_duration_format(duration_str: str) -> bool:
+    try:
+        parse_duration(duration_str)
+        return True
+    except Exception:
+        return False
+
 if datetime_based_cursor_model.cursor_granularity:
+    if not _is_valid_duration_format(datetime_based_cursor_model.cursor_granularity):
+        raise ValueError(f"Invalid cursor_granularity format: {datetime_based_cursor_model.cursor_granularity}")
     cursor_granularity = parse_duration(datetime_based_cursor_model.cursor_granularity)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)

Line range hint 3058-3089: Consider splitting this test into smaller, focused test cases?

The test covers multiple aspects of the concurrent cursor creation. To improve maintainability and make failures more obvious, we could split it into separate test cases for:

  • Basic cursor creation
  • State management
  • Boundary fields
  • Datetime handling

What do you think about breaking this into smaller, more focused test cases? This would make it easier to maintain and debug in the future. wdyt?


3090-3090: Consider moving stream state converter assertions to a dedicated test case?

This assertion block validates important properties of the stream state converter. Given its significance, would you consider moving it to a dedicated test case that focuses on stream state conversion specifically? This would give us more room to test edge cases and error scenarios. wdyt?


Line range hint 3191-3248: Consider adding more explicit assertions for datetime handling?

While the test covers the special case of min/max datetime formats, we could make the test more robust by:

  1. Adding assertions for invalid datetime format combinations
  2. Explicitly testing the format inheritance behavior
  3. Adding assertions for timezone handling

Would you like me to suggest some additional test cases? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 72117aa and a48323f.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (5 hunks)
🔇 Additional comments (4)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)

756-756: LGTM! State message assertion updated correctly

The assertion now properly checks for the no cursor state message, which aligns with the PR objective of handling streams without cursors.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

Line range hint 755-774: LGTM! Clean method signature with proper type hints.

The input validation logic is thorough and handles edge cases well.


886-899: LGTM! Clean ConcurrentCursor instantiation.

The instantiation is well-structured with all required parameters properly passed.


Line range hint 858-885: Verify step length calculation logic

The step length calculation has a potential edge case where evaluated_step could be empty but not None. Let's verify this behavior:

Copy link
Contributor

@pnilan pnilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Still need to test with an manifest-only connector.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

197-203: Consider improving readability of the stream classification logic?

The boolean expression could be more readable. What do you think about:

  1. Renaming to has_no_partition_router_and_no_cursor to make the negative logic more explicit
  2. Extracting the partition router check into a separate variable for clarity
-                is_without_partition_router_or_cursor = not bool(
-                    datetime_based_cursor_component_definition
-                ) and not (
-                    name_to_stream_mapping[declarative_stream.name]
-                    .get("retriever", {})
-                    .get("partition_router")
-                )
+                has_partition_router = bool(
+                    name_to_stream_mapping[declarative_stream.name]
+                    .get("retriever", {})
+                    .get("partition_router")
+                )
+                has_no_partition_router_and_no_cursor = not bool(
+                    datetime_based_cursor_component_definition
+                ) and not has_partition_router

WDYT? 🤔


257-262: Consider extracting cursor field key logic?

The cursor field access checks are good, but could be more maintainable. What if we extracted this into a helper method? This would also make it easier to update when the FIXME is addressed.

+                    def _get_cursor_field_key(cursor: Cursor) -> Optional[str]:
+                        """Extract cursor field key with safety checks."""
+                        if not hasattr(cursor, "cursor_field"):
+                            return None
+                        if not hasattr(cursor.cursor_field, "cursor_field_key"):
+                            return None
+                        return cursor.cursor_field.cursor_field_key
+
                     concurrent_streams.append(
                         DefaultStream(
                             ...
-                            cursor_field=cursor.cursor_field.cursor_field_key
-                            if hasattr(cursor, "cursor_field")
-                            and hasattr(
-                                cursor.cursor_field, "cursor_field_key"
-                            )  # FIXME this will need to be updated once we do the per partition
-                            else None,
+                            cursor_field=_get_cursor_field_key(cursor),  # FIXME: Update for per partition

Would this make the code more maintainable? 🤔


274-288: Consider breaking down the complex boolean expression?

The method has good validation checks, but the long boolean expression could be more readable. What about breaking it down into smaller, well-named checks?

     def _is_datetime_incremental_without_partition_routing(
         self,
         datetime_based_cursor_component_definition: Mapping[str, Any],
         declarative_stream: DeclarativeStream,
     ) -> bool:
+        def _is_datetime_based_cursor(definition: Mapping[str, Any]) -> bool:
+            return bool(definition) and definition.get("type", "") == DatetimeBasedCursorModel.__name__
+
+        def _has_compatible_retriever(stream: DeclarativeStream) -> bool:
+            return (
+                hasattr(stream.retriever, "stream_slicer")
+                and isinstance(stream.retriever.stream_slicer, DatetimeBasedCursor)
+            )
+
         return (
-            bool(datetime_based_cursor_component_definition)
-            and datetime_based_cursor_component_definition.get("type", "")
-            == DatetimeBasedCursorModel.__name__
+            _is_datetime_based_cursor(datetime_based_cursor_component_definition)
             and self._stream_supports_concurrent_partition_processing(
                 declarative_stream=declarative_stream
             )
-            and hasattr(declarative_stream.retriever, "stream_slicer")
-            and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
+            and _has_compatible_retriever(declarative_stream)
         )

This makes each check's purpose more explicit. What do you think? 🤔

unit_tests/sources/declarative/test_concurrent_declarative_source.py (2)

1277-1280: Consider adding more descriptive comments for stream categorization.

The comments could be more specific about why certain streams are categorized as concurrent vs synchronous, wdyt?

-    # 1 stream with parent stream
-    assert len(source._concurrent_streams) == 1
-    # 1 full refresh stream, 2 incremental stream with interpolation on state
-    assert len(source._synchronous_streams) == 3
+    # Only streams without state interpolation can be concurrent
+    assert len(source._concurrent_streams) == 1
+    # Streams with state interpolation or parent streams must be synchronous
+    assert len(source._synchronous_streams) == 3  # 1 full refresh + 2 incremental with state interpolation

1575-1577: Consider improving type safety in the cursor disabling logic.

The current implementation uses type ignores. Would it be cleaner to add proper type checking or create a test-specific interface, wdyt?

-    for concurrent_stream in source._concurrent_streams:  # type: ignore  # This is the easiest way to disable behavior from the test
-        if isinstance(concurrent_stream.cursor, ConcurrentCursor):
-            concurrent_stream.cursor._connector_state_converter._is_sequential_state = False  # type: ignore  # see above
+    from typing import cast
+    for stream in source._concurrent_streams:
+        concurrent_stream = cast(DefaultStream, stream)
+        if isinstance(concurrent_stream.cursor, ConcurrentCursor):
+            concurrent_stream.cursor._connector_state_converter._is_sequential_state = False
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between a48323f and 3acbd4c.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (5 hunks)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

209-211: Verify all concurrent processing paths are covered

The conditions for concurrent processing look good, but let's verify we're not missing any edge cases.

✅ Verification successful

Concurrent processing paths look well-covered

Based on the search results, the concurrent processing implementation is thorough and handles all the necessary paths:

  • Main concurrent processing check in concurrent_declarative_source.py
  • Partition processing in partition_reader.py with proper error handling and sentinel patterns
  • Job orchestration in job_orchestrator.py covering completed, running, and error states

The conditions for enabling concurrent processing are comprehensive, covering both non-partitioned streams and datetime incremental streams without partition routing. No edge cases appear to be missing.

Let's verify the cursor implementations to be thorough:


Concurrent processing paths are well-covered and properly implemented

Based on the comprehensive code analysis:

  1. The cursor implementations properly handle state management for both concurrent and non-concurrent scenarios:

    • DeclarativeCursor as the base abstract class
    • DatetimeBasedCursor for datetime-based incremental syncs
    • PerPartitionCursor for managing state per partition
    • GlobalSubstreamCursor for global state management
    • ConcurrentCursor specifically for concurrent processing
  2. The conditions in concurrent_declarative_source.py correctly identify streams that can be processed concurrently:

    • Streams without partition routers
    • Datetime incremental streams without partition routing
    • Proper state management through various cursor implementations
  3. Stream state handling is robust with appropriate safeguards for concurrent processing, as evidenced by the extensive test coverage and state management implementations.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any other potential concurrent processing paths we might have missed
rg -l "concurrent" | xargs rg "def.*process" -A 5

# Look for any other cursor implementations that might need similar handling
ast-grep --pattern 'class $_ implements Cursor {
  $$$
}'

Length of output: 6265


Script:

# Look for cursor-related configurations and implementations
rg "cursor" -l | xargs rg "class.*Cursor" -B 2 -A 5

# Check for any stream state handling that might affect concurrent processing
rg "stream_state" -B 2 -A 2

Length of output: 119207

unit_tests/sources/declarative/test_concurrent_declarative_source.py (3)

481-494: Consider updating the test description to match the new assertions.

The test description mentions "2 incremental, 1 full refresh as concurrent and 1 substream as synchronous" but the assertions now check for 3 concurrent streams. Should we update the comment to reflect this change?


Line range hint 541-556: LGTM! The cursor assertions are comprehensive.

The test now properly validates the locations stream cursor configuration including:

  • Stream name
  • Cursor field
  • Time boundaries
  • Slice configuration
  • State handling

756-756: LGTM! State blob format update.

The change from a completion state to a no cursor state message aligns with the new state management approach.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

208-214: Consider using safer dictionary access and more descriptive naming

The nested dictionary access could raise KeyError if keys don't exist. Also, the variable name could be more descriptive. WDYT about:

-is_without_partition_router_or_cursor = not bool(
-    incremental_sync_component_definition
-) and not (
-    name_to_stream_mapping[declarative_stream.name]
-    .get("retriever", {})
-    .get("partition_router")
-)
+is_simple_stream_without_cursor = not bool(
+    incremental_sync_component_definition
+) and not (
+    name_to_stream_mapping[declarative_stream.name]
+    .get("retriever", {})
+    .get("partition_router")
+)

264-269: Track the FIXME comment for per partition updates

The code safely handles cursor field access, but there's a FIXME comment about per partition updates that needs to be addressed.

Would you like me to help create a GitHub issue to track this TODO item for per partition updates?

unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)

1670-1672: LGTM! Consider adding docstring

The helper method is well-implemented. Would you like to add a docstring explaining its purpose and the reason for type ignores? WDYT?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 3acbd4c and f2cb524.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

226-240: LGTM! Clean cursor creation logic

The cursor creation logic is well-structured and includes all necessary parameters.


317-329: LGTM! Clear and comprehensive check

The method effectively combines multiple conditions to determine if a stream is datetime incremental without partition routing.

unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)

1372-1375: LGTM! Clear test assertions

The assertions and comments accurately reflect the expected stream categorization.

@maxi297 maxi297 changed the title [ISSUE #10550] have streams without partition routers nor cursor run … feat(low-code concurrent): [ISSUE #10550] have streams without partition routers nor cursor run … Dec 3, 2024
@github-actions github-actions bot added the enhancement New feature or request label Dec 3, 2024
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit from the rebase, otherwise seems like it should work with the other substream changes

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

268-268: Would you like to create an issue for the FIXME?

The comment indicates future work needed for per-partition updates. Would you like me to help create a GitHub issue to track this technical debt?

unit_tests/sources/streams/concurrent/test_helpers.py (1)

6-18: Great test coverage for the core scenarios! Would you consider adding a few more edge cases? 🤔

The current tests cover the main scenarios well. To make the test suite even more robust, what do you think about adding tests for:

  • None input
  • Single string input
  • Empty list input
  • Invalid type input (e.g., dict)

Here's a suggested structure, wdyt?

def test_given_none_when_get_primary_key_from_stream_then_return_empty_list():
    assert get_primary_key_from_stream(None) == []

def test_given_string_when_get_primary_key_from_stream_then_return_single_item_list():
    assert get_primary_key_from_stream("id") == ["id"]

def test_given_empty_list_when_get_primary_key_from_stream_then_return_empty_list():
    assert get_primary_key_from_stream([]) == []

def test_given_invalid_type_when_get_primary_key_from_stream_then_raise_error():
    with pytest.raises(ValueError):
        get_primary_key_from_stream({"id": "value"})
airbyte_cdk/sources/streams/concurrent/helpers.py (2)

19-22: How about making the transformation a bit more concise? 🤔

The logic is solid, but we could make the map operation slightly more pythonic. Also, would you consider making the error message more descriptive? Here's a suggestion:

 if are_all_elements_str:
     return stream_primary_key  # type: ignore # We verified all items in the list are strings
 elif are_all_elements_list_of_size_one:
-    return list(map(lambda x: x[0], stream_primary_key))
+    return [item[0] for item in stream_primary_key]
 else:
-    raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}")
+    raise ValueError(
+        f"Unsupported primary key format. Expected either a list of strings or a list of single-item lists, but got {stream_primary_key}"
+    )

16-22: Changes align well with PR objectives! 🎯

The enhanced primary key handling supports concurrent stream execution by ensuring robust stream identification. The backward compatibility is maintained while adding support for flattened composite keys, which is crucial for concurrent processing.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 7d0b457 and 6bea39c.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
  • airbyte_cdk/sources/streams/concurrent/helpers.py (1 hunks)
  • unit_tests/sources/streams/concurrent/test_helpers.py (1 hunks)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

317-331: LGTM! Nice extraction of complex conditions 👍

The new method improves readability by consolidating multiple conditions into a well-named function. The conditions are logically structured and the method name clearly describes its purpose.


208-214: Consider simplifying the condition check?

As mentioned in a previous review, we could simplify this by using the already retrieved values:

is_without_partition_router_or_cursor = not bool(incremental_sync_component_definition) and not bool(partition_router_component_definition)

This would avoid the nested dictionary access and make the code more concise. WDYT?


264-269: Good addition of the safety check! 👍

The additional hasattr check for cursor_field_key helps prevent potential AttributeError.

airbyte_cdk/sources/streams/concurrent/helpers.py (1)

16-17: Clean and efficient type checking! 👍

The variable names are very descriptive and the use of all() is perfect for this case. The separation into two distinct checks makes the logic very clear.

Copy link
Contributor

@pnilan pnilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two minor comments and then good to go.

airbyte_cdk/sources/streams/concurrent/helpers.py Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

260-265: Consider tracking the FIXME with an issue

The null safety checks look good! However, the FIXME comment about per-partition updates might get lost. Would you like me to help create a GitHub issue to track this technical debt? This would help ensure it's not forgotten when implementing per-partition updates.


312-326: Would you consider adding a docstring?

The new method nicely encapsulates the complex logic for determining datetime incremental streams without partition routing. However, it might be helpful to add a docstring explaining:

  1. The purpose of each condition
  2. What makes a stream eligible for this classification
  3. Any edge cases to be aware of

What do you think? This would make it easier for future maintainers to understand the requirements.

Here's a suggested docstring:

 def _is_datetime_incremental_without_partition_routing(
     self,
     declarative_stream: DeclarativeStream,
     incremental_sync_component_definition: Mapping[str, Any],
 ) -> bool:
+    """
+    Determines if a stream uses datetime-based incremental syncs without partition routing.
+    
+    A stream is considered datetime incremental without partition routing when:
+    1. It has an incremental sync component of type DatetimeBasedCursor
+    2. It supports concurrent partition processing
+    3. Its retriever has a stream_slicer attribute
+    4. The stream_slicer is an instance of DatetimeBasedCursor
+    
+    Args:
+        declarative_stream: The stream to check
+        incremental_sync_component_definition: The incremental sync component configuration
+    
+    Returns:
+        bool: True if the stream meets all criteria, False otherwise
+    """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between afaa4cd and 6ca33fd.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

213-215: Nice refactoring of the stream grouping logic!

The simplified boolean expression makes the code more maintainable and follows the DRY principle. This addresses brianjlai's previous feedback perfectly.

Also applies to: 270-272

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re-reviewing the new parts. the new retriever check and changes seems sane to me. :shipit:

@maxi297 maxi297 merged commit ac6cf92 into main Dec 4, 2024
20 of 22 checks passed
@maxi297 maxi297 deleted the issue-10550/streams-without-partition-routers-nor-cursor-to-concurrent branch December 4, 2024 13:13
@aaronsteers aaronsteers changed the title feat(low-code concurrent): [ISSUE #10550] have streams without partition routers nor cursor run … feat(low-code concurrent): have streams without partition routers nor cursor concurrently Dec 4, 2024
@aaronsteers aaronsteers changed the title feat(low-code concurrent): have streams without partition routers nor cursor concurrently feat(low-code concurrent): Concurrent execution for streams without partition routers nor cursor Dec 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants