Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add PoC for state delegating retriever #318

Open
wants to merge 28 commits into
base: main
Choose a base branch
from

Conversation

lazebnyi
Copy link
Contributor

@lazebnyi lazebnyi commented Feb 6, 2025

What

Fixed: https://github.com/airbytehq/airbyte-internal-issues/issues/11589

This is PoC for state delegation retriever.
Note: Some class, methods, variable names and etc. could be changed in the final PR.

Summary by CodeRabbit

  • New Features

    • Introduced a flexible data retrieval mechanism that unifies full and incremental fetching based on current data state.
    • Updated stream configurations to enable selection between different data retrieval strategies.
    • Added a new component, StateDelegatingRetriever, to enhance data retrieval options.
  • Tests

    • Added comprehensive tests to verify that the new data retrieval functionality performs correctly under various scenarios.

@github-actions github-actions bot added the enhancement New feature or request label Feb 6, 2025
@lazebnyi
Copy link
Contributor Author

lazebnyi commented Feb 6, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Contributor

coderabbitai bot commented Feb 6, 2025

📝 Walkthrough

Walkthrough

This pull request introduces the new component StateDelegatingRetriever to the Airbyte declarative framework. The changes add its definition in both the YAML schema and the Python models, update the DeclarativeStream to support the new retriever, and integrate a new factory method for creating instances. Module exports and corresponding tests have also been updated to cover both full and incremental data retrieval scenarios based on the internal state.

Changes

File(s) Summary
airbyte_cdk/.../declarative_component_schema.yaml
airbyte_cdk/.../models/declarative_component_schema.py
Added the StateDelegatingRetriever definition and updated the DeclarativeStream to include it.
airbyte_cdk/.../parsers/model_to_component_factory.py Added the create_state_delegating_retriever method to the factory for instantiating the new component.
airbyte_cdk/.../retrievers/__init__.py
airbyte_cdk/.../retrievers/state_delegating_retriever.py
Updated module exports and introduced the new StateDelegatingRetriever class with delegation logic based on stream state.
unit_tests/.../test_state_delegating_retriever.py Introduced unit tests to verify both full and incremental data retrieval paths using the new retriever.
airbyte_cdk/.../concurrent_declarative_source.py Modified _group_streams method to include logic for handling StateDelegatingRetriever.

Sequence Diagram(s)

sequenceDiagram
    participant DS as DeclarativeStream
    participant F as ModelToComponentFactory
    participant SDR as StateDelegatingRetriever
    participant IR as IncrementalDataRetriever
    participant FR as FullDataRetriever
    participant C as Cursor

    DS->>F: create_state_delegating_retriever(model, config)
    F-->>SDR: instantiate StateDelegatingRetriever
    DS->>SDR: request data retrieval
    SDR->>C: get current state
    alt State exists
      SDR->>IR: delegate to incremental_data_retriever
    else No state
      SDR->>FR: delegate to full_data_retriever
    end
Loading

Possibly related PRs

Suggested reviewers

  • maxi297
  • brianjlai

Would you like to consider any additional reviewers for this PR, wdyt?

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1657-1681: Update type hints to include StateDelegatingRetriever.

The pipeline failures indicate type compatibility issues in _build_stream_slicer_from_partition_router and _build_resumable_cursor_from_paginator. Let's update their type hints to include StateDelegatingRetriever, wdyt?

     def _build_stream_slicer_from_partition_router(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         config: Config,
     ) -> Optional[PartitionRouter]:

     def _build_resumable_cursor_from_paginator(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         stream_slicer: Optional[StreamSlicer],
     ) -> Optional[StreamSlicer]:

Also applies to: 1682-1692

🧹 Nitpick comments (7)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

1838-1843: Update the retriever field documentation in DeclarativeStream

Now that StateDelegatingRetriever is included in the retriever field, should we update the description to reflect this addition? Perhaps we can mention that it supports state delegation between incremental and full data retrieval. Wdyt?


2101-2110: Clarify field descriptions in StateDelegatingRetriever

To improve clarity, could we update the description and title of incremental_data_retriever and full_data_retriever to reflect their specific roles? For example, specifying that one is used when state is present and the other when not. Wdyt?

Here's a suggestion:

For incremental_data_retriever:

 description="Component used to coordinate how records are extracted across stream slices and request pages.",
+description="Retriever used when state is present (incremental sync).",
 title="Retriever",

For full_data_retriever:

 description="Component used to coordinate how records are extracted across stream slices and request pages.",
+description="Retriever used when no state is present (full sync).",
 title="Retriever",
airbyte_cdk/sources/declarative/retrievers/__init__.py (1)

15-21: Consider sorting the __all__ list alphabetically

For better readability and consistency, could we sort the __all__ list alphabetically? Wdyt?

Here's a suggested change:

 __all__ = [
-    "Retriever",
-    "SimpleRetriever",
-    "SimpleRetrieverTestReadDecorator",
-    "AsyncRetriever",
-    "StateDelegatingRetriever",
+    "AsyncRetriever",
+    "Retriever",
+    "SimpleRetriever",
+    "SimpleRetrieverTestReadDecorator",
+    "StateDelegatingRetriever",
 ]
unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (3)

37-41: Consider enhancing the schema definition for better test coverage.

The schema is currently empty. Would you consider adding some fields to match the test data structure (e.g., id, name, updated_at)? This would help ensure schema validation is properly tested, wdyt?

 "schema": {
     "$schema": "http://json-schema.org/schema#",
-    "properties": {},
+    "properties": {
+        "id": {"type": "integer"},
+        "name": {"type": "string"},
+        "updated_at": {"type": "string", "format": "date"}
+    },
     "type": "object",
 },

125-140: Would you like to add type hints to improve code clarity?

The get_records function could benefit from explicit type hints for better code maintainability and IDE support, wdyt?

 def get_records(
     source: ConcurrentDeclarativeSource,
     config: dict,
     catalog: ConfiguredAirbyteCatalog,
-    state: list = None,
+    state: Optional[List[AirbyteStateMessage]] = None,
- ) -> list:
+ ) -> List[Dict[str, Any]]:

175-181: Consider adding more assertions for thorough testing.

The test verifies the retrieved records but could benefit from additional assertions. Would you consider adding checks for:

  1. The number of records returned
  2. The state after retrieval
  3. The HTTP request parameters
 assert expected_full == full_records
+assert len(full_records) == 2
+assert all(record["updated_at"] <= "2024-07-15" for record in full_records)

 incremental_records = get_records(source, _CONFIG, configured_catalog, state)
 expected_incremental = [
     {"id": 3, "name": "item_3", "updated_at": "2024-02-01"},
     {"id": 4, "name": "item_4", "updated_at": "2024-02-01"},
 ]
 assert expected_incremental == incremental_records
+assert len(incremental_records) == 2
+assert all(record["updated_at"] >= "2024-07-13" for record in incremental_records)

Also applies to: 193-198

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

2949-2976: Review of the new StateDelegatingRetriever component:

  1. Component Description
    The description is currently set to "Test state condition retriever." This appears to be a placeholder. Would you consider updating it to better reflect the component’s purpose—e.g., "A retriever that delegates between incremental and full data retrieval based on current state"—to make its intent clearer?
    wdyt?

  2. Enum Value Formatting
    The type property’s enum is defined as [ StateDelegatingRetriever ] with extra whitespace. Removing the surrounding spaces to read "StateDelegatingRetriever" would improve consistency and reduce potential parsing issues. Would you be open to that change?

  3. Property Titles for Clarity
    Both the incremental_data_retriever and full_data_retriever properties use the title "Retriever." Do you think it might be clearer to differentiate them—perhaps renaming them to "Incremental Data Retriever" and "Full Data Retriever" respectively—to avoid ambiguity?

Overall, the structural definition looks consistent with the established schema patterns. Let me know if you’d like suggestions for a diff code block to incorporate these refinements!

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e38f914 and 1f01589.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/retrievers/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
  • unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"

airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py

[error] 25-25: Function is missing a type annotation


[error] 38-38: Function is missing a type annotation


[error] 47-47: Function is missing a return type annotation


[error] 55-55: Function is missing a return type annotation


[error] 59-59: Function is missing a type annotation for one or more arguments

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2099-2112: The StateDelegatingRetriever class looks good

The introduction of the StateDelegatingRetriever class seems appropriate and aligns with the overall design. Nice work!

airbyte_cdk/sources/declarative/retrievers/__init__.py (1)

11-13: Importing StateDelegatingRetriever

The addition of StateDelegatingRetriever to the imports looks good. This makes the class accessible for use elsewhere.

unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py (1)

163-164: The test data seems to have inconsistent dates.

In the incremental data retrieval test, the updated_at dates (2024-02-01) are earlier than the state's cursor value (2024-07-13). This might not effectively test the incremental sync logic. Should we update these dates to be more recent than the cursor, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

350-352: LGTM!

The import and model registration for StateDelegatingRetriever are correctly added.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1648-1649: Update method signatures to support StateDelegatingRetriever.

The pipeline failures indicate that _build_stream_slicer_from_partition_router and _build_resumable_cursor_from_paginator methods need to be updated to handle the new StateDelegatingRetriever type. Should we update their type hints to include StateDelegatingRetriever, wdyt?

     def _build_stream_slicer_from_partition_router(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         config: Config,
     ) -> Optional[PartitionRouter]:

     def _build_resumable_cursor_from_paginator(
         self,
-        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+        model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
         stream_slicer: Optional[StreamSlicer],
     ) -> Optional[StreamSlicer]:

Also applies to: 1673-1674

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2105-2118: Consider adding validation to ensure retrievers are compatible?

The implementation looks good! However, to prevent potential runtime issues, what do you think about adding validation to ensure both retrievers are compatible (e.g., same schema, similar configuration)? We could use Pydantic's validator decorator, wdyt?

 class StateDelegatingRetriever(BaseModel):
     type: Literal["StateDelegatingRetriever"]
     incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
         description="Component used to coordinate how records are extracted across stream slices and request pages.",
         title="Retriever",
     )
     full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
         description="Component used to coordinate how records are extracted across stream slices and request pages.",
         title="Retriever",
     )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+    @validator("full_data_retriever")
+    def validate_retrievers_compatibility(cls, v, values):
+        if "incremental_data_retriever" in values:
+            # Add compatibility checks here
+            pass
+        return v
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1f01589 and a0e5d92.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Analyze (python)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1844-1850: LGTM! Clean addition of StateDelegatingRetriever to the retriever field.

The update to include StateDelegatingRetriever in the Union type is well-documented and maintains consistency with the existing pattern.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

350-352: LGTM!

The import is correctly placed and follows the existing pattern.


604-604: LGTM!

The constructor mapping is correctly added and follows the existing pattern.


2551-2595: ⚠️ Potential issue

Fix type safety issue with cursor parameter.

The pipeline failure indicates a type mismatch for the cursor parameter. The StateDelegatingRetriever expects a non-optional DeclarativeCursor, but we're passing Optional[DeclarativeCursor]. We should add a type check and raise an error if the cursor is None, wdyt?

     def create_state_delegating_retriever(
         self,
         model: StateDelegatingRetrieverModel,
         config: Config,
         *,
         name: str,
         primary_key: Optional[Union[str, List[str], List[List[str]]]],
         stream_slicer: Optional[StreamSlicer],
         request_options_provider: Optional[RequestOptionsProvider] = None,
         stop_condition_on_cursor: bool = False,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
     ) -> StateDelegatingRetriever:
         cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
+        if cursor is None:
+            raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")

         full_data_retriever = self._create_component_from_model(

Likely invalid or redundant comment.

🧰 Tools
🪛 GitHub Actions: Linters

[error] 2593-2593: Argument "cursor" to "StateDelegatingRetriever" has incompatible type "DeclarativeCursor | None"; expected "DeclarativeCursor"

@lazebnyi
Copy link
Contributor Author

lazebnyi commented Feb 6, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 3 commits February 6, 2025 16:56
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1686-1686: Update method parameter types to include StateDelegatingRetrieverModel

It seems that model.retriever could be a StateDelegatingRetrieverModel, which isn't currently accepted by _build_stream_slicer_from_partition_router. Should we update the type annotation to include it? What do you think?

Here's a suggested change:

 def _build_stream_slicer_from_partition_router(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     config: Config,
 ) -> Optional[PartitionRouter]:
🧰 Tools
🪛 GitHub Actions: Linters

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


1770-1770: Include StateDelegatingRetrieverModel in _build_resumable_cursor_from_paginator

Since model.retriever might be a StateDelegatingRetrieverModel, should we adjust the type annotation of _build_resumable_cursor_from_paginator to accept it? Wdyt?

Suggested change:

 def _build_resumable_cursor_from_paginator(
     self,
-    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
+    model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel, StateDelegatingRetrieverModel],
     stream_slicer: Optional[StreamSlicer],
 ) -> Optional[StreamSlicer]:
🧰 Tools
🪛 GitHub Actions: Linters

[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)

43-46: Consider adding 'retriever' to the __setattr__ internal names

In the __setattr__ method, we currently exclude 'retriever' from the internal attributes. Since 'retriever' is included in the __getattr__ method's internal names, should we also add it to __setattr__ for consistency? Wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a0e5d92 and e46a88a.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py

[error] 59-59: Incompatible return value type (got "Mapping[str, Any]", expected "MutableMapping[str, Any]")

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1686-1686: Argument 1 to "_build_stream_slicer_from_partition_router" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"


[error] 1770-1770: Argument 1 to "_build_resumable_cursor_from_paginator" of "ModelToComponentFactory" has incompatible type "AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever"; expected "AsyncRetriever | CustomRetriever | SimpleRetriever"

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)

49-55: ⚠️ Potential issue

Consider adding null check for cursor in retriever property.

The retriever property accesses self.cursor.get_stream_state() without checking if self.cursor is None. Should we add a null check to prevent potential AttributeError? Wdyt?

 @property
 def retriever(self) -> Retriever:
-    return (
-        self.incremental_data_retriever
-        if self.cursor.get_stream_state()
-        else self.full_data_retriever
-    )
+    if self.cursor and self.cursor.get_stream_state():
+        return self.incremental_data_retriever
+    return self.full_data_retriever

57-65: ⚠️ Potential issue

Ensure state property returns MutableMapping.

The state property returns self.cursor.get_stream_state(), which may be an immutable Mapping. Since the return type is Mapping[str, Any], should we convert it to a dict to ensure mutability? Wdyt?

 @property
 def state(self) -> Mapping[str, Any]:
-    return self.cursor.get_stream_state() if self.cursor else {}
+    return dict(self.cursor.get_stream_state()) if self.cursor else {}
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)

28-39: Consider adding a docstring to explain the attribute delegation logic.

The __getattr__ implementation is clean and handles attribute delegation well. Would you consider adding a docstring to explain the delegation behavior and list of special attributes that are not delegated? This would help future maintainers understand the design intent better.

 def __getattr__(self, name: str) -> Any:
+    """Delegate attribute access to the active retriever.
+    
+    Special attributes (full_data_retriever, incremental_data_retriever, cursor, retriever, state)
+    are accessed directly from self, while all other attributes are delegated to the active retriever.
+    """
     # Avoid delegation for these internal names.
     if name in {

41-47: Consider adding a docstring to explain the attribute assignment logic.

Similar to __getattr__, would you consider adding a docstring to explain the attribute assignment behavior? This would maintain consistency in documentation.

 def __setattr__(self, name: str, value: Any) -> None:
+    """Delegate attribute assignment to the active retriever.
+    
+    Special attributes (full_data_retriever, incremental_data_retriever, cursor, state)
+    are set directly on self, while all other attributes are delegated to the active retriever.
+    """
     # For the internal attributes, set them directly on self.
     if name in {"full_data_retriever", "incremental_data_retriever", "cursor", "state"}:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2551-2596: Consider adding error handling for retriever creation.

The create method looks good overall, but there are a few suggestions:

  1. The cursor type check is good, but should we also validate that both retrievers are not None?
  2. Consider adding error handling around the retriever creation calls in case they fail.
 def create_state_delegating_retriever(
     self,
     model: StateDelegatingRetrieverModel,
     config: Config,
     *,
     name: str,
     primary_key: Optional[Union[str, List[str], List[List[str]]]],
     stream_slicer: Optional[StreamSlicer],
     request_options_provider: Optional[RequestOptionsProvider] = None,
     stop_condition_on_cursor: bool = False,
     client_side_incremental_sync: Optional[Dict[str, Any]] = None,
     transformations: List[RecordTransformation],
 ) -> StateDelegatingRetriever:
     if not isinstance(stream_slicer, DeclarativeCursor):
         raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")
+    if not model.full_data_retriever or not model.incremental_data_retriever:
+        raise ValueError("Both full_data_retriever and incremental_data_retriever must be provided")
 
-    full_data_retriever = self._create_component_from_model(
-        model=model.full_data_retriever,
-        config=config,
-        name=name,
-        primary_key=primary_key,
-        stream_slicer=stream_slicer,
-        request_options_provider=request_options_provider,
-        stop_condition_on_cursor=stop_condition_on_cursor,
-        client_side_incremental_sync=client_side_incremental_sync,
-        transformations=transformations,
-    )
+    try:
+        full_data_retriever = self._create_component_from_model(
+            model=model.full_data_retriever,
+            config=config,
+            name=name,
+            primary_key=primary_key,
+            stream_slicer=stream_slicer,
+            request_options_provider=request_options_provider,
+            stop_condition_on_cursor=stop_condition_on_cursor,
+            client_side_incremental_sync=client_side_incremental_sync,
+            transformations=transformations,
+        )
+    except Exception as e:
+        raise ValueError(f"Failed to create full_data_retriever: {str(e)}")
 
-    incremental_data_retriever = self._create_component_from_model(
-        model=model.incremental_data_retriever,
-        config=config,
-        name=name,
-        primary_key=primary_key,
-        stream_slicer=stream_slicer,
-        request_options_provider=request_options_provider,
-        stop_condition_on_cursor=stop_condition_on_cursor,
-        client_side_incremental_sync=client_side_incremental_sync,
-        transformations=transformations,
-    )
+    try:
+        incremental_data_retriever = self._create_component_from_model(
+            model=model.incremental_data_retriever,
+            config=config,
+            name=name,
+            primary_key=primary_key,
+            stream_slicer=stream_slicer,
+            request_options_provider=request_options_provider,
+            stop_condition_on_cursor=stop_condition_on_cursor,
+            client_side_incremental_sync=client_side_incremental_sync,
+            transformations=transformations,
+        )
+    except Exception as e:
+        raise ValueError(f"Failed to create incremental_data_retriever: {str(e)}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e46a88a and 1e71e63.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[warning] 1-1: File would be reformatted. Please ensure the code adheres to the formatting standards.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1)

1-26: LGTM! The class structure and attributes are well-defined.

The experimental nature is properly marked with a deprecation warning, and the dataclass attributes are correctly typed. The class structure follows good design principles by delegating to specialized retrievers based on state.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2207-2220: Consider enhancing the documentation for better clarity.

The class looks well-structured, but the documentation could be more descriptive. Would you consider adding:

  1. A brief description of when to use this retriever vs. others?
  2. Examples of how the two retrievers work together?
  3. Documentation for the parameters field similar to other classes?

Here's a suggested diff, wdyt?

 class StateDelegatingRetriever(BaseModel):
     type: Literal["StateDelegatingRetriever"]
     incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
-        description="Component used to coordinate how records are extracted across stream slices and request pages.",
+        description="Component used to extract records incrementally based on state.",
         title="Retriever",
     )
     full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
         ...,
-        description="Component used to coordinate how records are extracted across stream slices and request pages.",
+        description="Component used to extract all records when no state is present or when a full refresh is needed.",
         title="Retriever",
     )
-    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+    parameters: Optional[Dict[str, Any]] = Field(
+        None,
+        alias="$parameters",
+        description="Additional parameters to be used for further customization of behavior.",
+    )
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

1683-1683: Consider adding/updating docstrings for the modified methods.

The method signatures have been updated correctly to support the new retriever type, but the docstrings haven't been updated to reflect these changes. Would you like to add parameter descriptions for StateDelegatingRetrieverModel in the docstrings, wdyt?

Also applies to: 1707-1711, 1759-1759


1775-1801: Consider enhancing error messages with more context.

The error messages are clear but could be more helpful by including:

  1. The actual type of incremental_sync when it's missing
  2. Suggestions for how to fix the issue

For example:

-            raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")
+            raise ValueError(
+                "StateDelegatingRetriever requires 'incremental_sync' to be enabled. "
+                "Please add an incremental_sync configuration with a DeclarativeCursor implementation."
+            )

What do you think about making the error messages more actionable, wdyt?


2583-2628: Consider additional validation for retrievers.

The cursor validation is good, but we might want to add a few more checks:

  1. Should we validate that both retrievers are properly configured for their respective roles?
  2. Should we ensure that both retrievers are compatible with the cursor type?

Also, would you consider adding debug logging for state transitions between retrievers, wdyt?

❌ Verification inconclusive

🏁 Script executed:

#!/bin/bash
# Check if there are any existing implementations that might provide insights
rg -A 5 "StateDelegatingRetriever.*__init__" 

Length of output: 46


Enhance Validation and Logging for Retriever Configuration

Hi there, I noticed that while the cursor validation is handled well (ensuring it’s a DeclarativeCursor), there isn’t any explicit check that the full‐data and incremental retrievers are fully configured for their intended roles or compatible with the cursor type. Would you consider:

  • Adding additional validations—perhaps within _create_component_from_model or directly in the StateDelegatingRetriever constructor—to ensure each retriever’s configuration meets necessary preconditions?
  • Introducing debug logging around state transitions between the retrievers for easier diagnostics?

These changes might help prevent subtle configuration issues at runtime. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b78cc6e and 53b2980.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[warning] 1-1: File would be reformatted. Please ensure the code adheres to the formatting standards.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1946-1952: LGTM! Clean integration of StateDelegatingRetriever.

The update to the retriever field in DeclarativeStream maintains backward compatibility while adding support for the new StateDelegatingRetriever. The field description accurately describes its purpose.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

351-353: LGTM! Clean import additions.

The imports for both the model and implementation classes are properly placed in their respective sections.

Also applies to: 440-440

@lazebnyi
Copy link
Contributor Author

lazebnyi commented Feb 12, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1599-1633: Consider extracting cursor creation logic to a separate method.

The cursor creation logic is complex and could benefit from being extracted into a separate method for better readability and maintainability.

What do you think about extracting this into a method like _create_cursor_from_incremental_sync? Here's a suggested implementation:

def _create_cursor_from_incremental_sync(
    self,
    incremental_sync_model: Any,
    config: Config,
    stream_slicer: Optional[PartitionRouter],
) -> Optional[StreamSlicer]:
    cursor_component = self._create_component_from_model(
        model=incremental_sync_model, config=config
    )
    is_global_cursor = (
        hasattr(incremental_sync_model, "global_substream_cursor")
        and incremental_sync_model.global_substream_cursor
    )

    if is_global_cursor:
        return GlobalSubstreamCursor(
            stream_cursor=cursor_component, partition_router=stream_slicer
        )
    return PerPartitionWithGlobalCursor(
        cursor_factory=CursorFactory(
            lambda: self._create_component_from_model(
                model=incremental_sync_model, config=config
            ),
        ),
        partition_router=stream_slicer,
        stream_cursor=cursor_component,
    )

2599-2644: LGTM! Well-structured implementation of create_state_delegating_retriever.

The implementation correctly:

  1. Validates that stream_slicer is a DeclarativeCursor
  2. Creates full_data_retriever and incremental_data_retriever
  3. Returns StateDelegatingRetriever with proper initialization

A few suggestions to consider:

  1. Add docstring to describe the method's purpose and parameters
  2. Consider adding validation for model.full_data_retriever and model.incremental_data_retriever to ensure they're not None

What do you think about adding these improvements? Here's a suggested docstring:

def create_state_delegating_retriever(
    self,
    model: StateDelegatingRetrieverModel,
    config: Config,
    *,
    name: str,
    primary_key: Optional[Union[str, List[str], List[List[str]]]],
    stream_slicer: Optional[StreamSlicer],
    request_options_provider: Optional[RequestOptionsProvider] = None,
    stop_condition_on_cursor: bool = False,
    client_side_incremental_sync: Optional[Dict[str, Any]] = None,
    transformations: List[RecordTransformation],
) -> StateDelegatingRetriever:
    """Creates a StateDelegatingRetriever that switches between full and incremental data retrieval based on state.
    
    Args:
        model: The StateDelegatingRetrieverModel containing configuration
        config: The connector configuration
        name: The name of the stream
        primary_key: The primary key of the stream
        stream_slicer: The stream slicer to use for partitioning data
        request_options_provider: Optional provider for request options
        stop_condition_on_cursor: Whether to stop on cursor condition
        client_side_incremental_sync: Optional client-side incremental sync config
        transformations: List of record transformations to apply
        
    Returns:
        StateDelegatingRetriever: The configured retriever
        
    Raises:
        ValueError: If stream_slicer is not a DeclarativeCursor
    """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 53b2980 and 14138ed.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1762-1762: Returning Any from function declared to return 'StreamSlicer | None' [no-any-return]


[error] 1796-1796: Item 'None' of 'CustomIncrementalSync | DatetimeBasedCursor | None' has no attribute 'type' [union-attr]

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

351-353: LGTM! Added import for StateDelegatingRetrieverModel.

The import is correctly placed with other model imports.


440-440: LGTM! Added StateDelegatingRetriever to imports.

The import is correctly placed with other retriever imports.


615-615: LGTM! Added StateDelegatingRetrieverModel to factory mappings.

The mapping is correctly added to the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary.


1683-1688: LGTM! Updated method signature to include StateDelegatingRetrieverModel.

The type annotation is correctly updated to include the new model type.

@lazebnyi
Copy link
Contributor Author

lazebnyi commented Feb 12, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1788-1809: Consider adding error messages for invalid retriever configurations.

The validation logic is good, but the error messages could be more descriptive to help users understand and fix issues. For example, we could explain why AsyncRetriever only supports DatetimeBasedCursor.

What do you think about enhancing the error messages like this:

-            raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")
+            raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled because it needs to determine when to delegate between full and incremental sync.")

-            raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet.")
+            raise ValueError("AsyncRetriever currently only supports DatetimeBasedCursor because it requires support for unordered slices (e.g., reports for January and February where February's report can complete first).")

1681-1689: Consider refactoring the cursor building logic.

The cursor building logic is becoming complex with multiple types and conditions. Consider extracting the cursor creation logic into separate methods for better maintainability.

What do you think about splitting this into:

  1. _build_concurrent_cursor
  2. _build_global_cursor
  3. _build_per_partition_cursor

This would make the code more modular and easier to test. Here's a suggested structure:

def _build_incremental_cursor(self, model: DeclarativeStreamModel, stream_slicer: Optional[PartitionRouter], config: Config) -> Optional[StreamSlicer]:
    if not model.incremental_sync:
        return None
        
    if model.retriever.type == "AsyncRetriever":
        return self._build_concurrent_cursor(model, stream_slicer, config)
        
    cursor_component = self._create_component_from_model(model=model.incremental_sync, config=config)
    
    if not stream_slicer:
        return cursor_component
        
    if self._is_global_cursor(model.incremental_sync):
        return self._build_global_cursor(cursor_component, stream_slicer)
    
    return self._build_per_partition_cursor(model.incremental_sync, cursor_component, stream_slicer, config)

Also applies to: 1711-1752

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 14138ed and 66001f1.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[warning] 1-1: File would be reformatted. Please ensure the code adheres to the formatting standards.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

351-353: LGTM! Import of StateDelegatingRetrieverModel.

The import is correctly placed with other model imports and follows the established pattern.


615-616: LGTM! Factory mapping registration.

The StateDelegatingRetrieverModel is properly registered in the PYDANTIC_MODEL_TO_CONSTRUCTOR mapping.


2601-2646: Verify the StateDelegatingRetriever implementation.

The implementation looks good but there are a few things to verify:

  1. The cursor type check is strict - should we allow custom cursor implementations that inherit from DeclarativeCursor?
  2. Both retrievers receive the same parameters - is this intentional or should they have separate configurations?

Let's check if there are any custom cursor implementations in the codebase:

❌ Verification inconclusive

🏁 Script executed:

#!/bin/bash
# Search for classes that inherit from DeclarativeCursor
ast-grep --pattern 'class $_ ($$$, DeclarativeCursor, $$$)'

Length of output: 61


Cursor Type Check and Retriever Configuration Clarification

  • Our search for custom implementations inheriting from DeclarativeCursor returned no results, which suggests that no custom cursor implementations exist at the moment. Should we update the type check to allow subclasses of DeclarativeCursor, or is the strict type check acceptable given the current codebase?
  • Both the full and incremental retrievers receive identical parameters via the _create_component_from_model call. This symmetry appears intentional based on the model, but it might be worth discussing whether separate configurations might offer more flexibility for future changes.

Could you please manually verify if any custom cursor implementations exist in the repository or if there’s a need to differentiate the configurations for the two retrievers? wdyt?

@lazebnyi
Copy link
Contributor Author

lazebnyi commented Feb 12, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a4b044 and 1c38282.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (15 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 837-837: Argument 2 to 'LegacyToPerPartitionStateMigration' has incompatible type 'CustomIncrementalSync | DatetimeBasedCursor | None'; expected 'CustomIncrementalSync | DatetimeBasedCursor'


[error] 840-840: Argument 4 to 'LegacyToPerPartitionStateMigration' has incompatible type 'dict[str, Any] | None'; expected 'Mapping[str, Any]'


[error] 870-870: Missing named argument '$parameters' for 'BearerAuthenticator'


[error] 1151-1151: Argument 2 to 'ClampingEndProvider' has incompatible type 'Callable[[], datetime]'; expected 'Callable[[], CursorValueType]'


[error] 1169-1169: Argument 2 to 'ClampingEndProvider' has incompatible type 'Callable[[], datetime]'; expected 'Callable[[], CursorValueType]'


[error] 1177-1177: Argument 2 to 'ClampingEndProvider' has incompatible type 'Callable[[], datetime]'; expected 'Callable[[], CursorValueType]'


[error] 1195-1195: Argument 'start' to 'ConcurrentCursor' has incompatible type 'datetime'; expected 'CursorValueType | None'


[error] 1197-1197: Argument 'end_provider' to 'ConcurrentCursor' has incompatible type 'Callable[[], datetime]'; expected 'Callable[[], CursorValueType]'


[error] 1772-1772: Returning Any from function declared to return 'StreamSlicer | None'


[error] 1811-1811: Item 'CustomRetriever' of 'AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever' has no attribute 'partition_router'


[error] 1811-1811: Item 'StateDelegatingRetriever' of 'AsyncRetriever | CustomRetriever | SimpleRetriever | StateDelegatingRetriever' has no attribute 'partition_router'


[error] 2470-2470: Argument 1 to '_create_component_from_model' of 'ModelToComponentFactory' has incompatible type 'CustomSchemaNormalization | None'; expected 'BaseModel'

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2622-2667: LGTM: StateDelegatingRetriever implementation.

The implementation of create_state_delegating_retriever looks good. It properly validates the cursor requirement and creates both full and incremental data retrievers.

@lazebnyi
Copy link
Contributor Author

lazebnyi commented Feb 12, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2605-2650: LGTM! The implementation looks solid.

The method correctly:

  • Validates that stream_slicer is a DeclarativeCursor
  • Creates both full and incremental data retrievers
  • Passes all necessary parameters to the retrievers
  • Returns a properly configured StateDelegatingRetriever

One suggestion though - should we add error handling for the case when either retriever creation fails, wdyt?

     def create_state_delegating_retriever(
         self,
         model: StateDelegatingRetrieverModel,
         config: Config,
         *,
         name: str,
         primary_key: Optional[Union[str, List[str], List[List[str]]]],
         stream_slicer: Optional[StreamSlicer],
         request_options_provider: Optional[RequestOptionsProvider] = None,
         stop_condition_on_cursor: bool = False,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
     ) -> StateDelegatingRetriever:
         if not isinstance(stream_slicer, DeclarativeCursor):
             raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")
 
+        try:
             full_data_retriever = self._create_component_from_model(
                 model=model.full_data_retriever,
                 config=config,
                 name=name,
                 primary_key=primary_key,
                 stream_slicer=stream_slicer,
                 request_options_provider=request_options_provider,
                 stop_condition_on_cursor=stop_condition_on_cursor,
                 client_side_incremental_sync=client_side_incremental_sync,
                 transformations=transformations,
             )
 
             incremental_data_retriever = self._create_component_from_model(
                 model=model.incremental_data_retriever,
                 config=config,
                 name=name,
                 primary_key=primary_key,
                 stream_slicer=stream_slicer,
                 request_options_provider=request_options_provider,
                 stop_condition_on_cursor=stop_condition_on_cursor,
                 client_side_incremental_sync=client_side_incremental_sync,
                 transformations=transformations,
             )
+        except Exception as e:
+            raise ValueError(f"Failed to create retrievers for StateDelegatingRetriever: {str(e)}")
 
         return StateDelegatingRetriever(
             full_data_retriever=full_data_retriever,
             incremental_data_retriever=incremental_data_retriever,
             cursor=stream_slicer,
         )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1c38282 and 666c4fa.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

1789-1791: LGTM! Good validation check.

The validation ensures that StateDelegatingRetriever is properly configured with incremental sync enabled.


1811-1815: LGTM! Clear error message for unsupported feature.

The error message clearly explains that per partition state is not yet supported for AsyncRetriever.


1683-1688: LGTM! Clean type signature update.

The signature update properly includes StateDelegatingRetrieverModel in the Union type.

@lazebnyi lazebnyi changed the title feat(low-code): add PoC for state delegating retriever feat(low-code cdk): add PoC for state delegating retriever Feb 12, 2025
@lazebnyi
Copy link
Contributor Author

lazebnyi commented Feb 14, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)

24-27: Consider adding docstrings to explain the purpose of each field.

The fields are well-typed but could benefit from documentation explaining their roles. Wdyt?

 class StateDelegatingRetriever:
+    """A retriever that delegates between full and incremental data retrievers based on state.
+    
+    Args:
+        full_data_retriever: Retriever used when no state is present
+        incremental_data_retriever: Retriever used when state exists
+        cursor: Cursor for tracking state
+    """
     full_data_retriever: Retriever
     incremental_data_retriever: Retriever
     cursor: DeclarativeCursor
     _started_with_state: bool = field(init=False, repr=False, default=False)

59-60: Consider handling potential None cursor case.

The state property could be more defensive in its implementation. Wdyt about this approach?

-    return self.cursor.get_stream_state() if self.cursor else {}
+    if not self.cursor:
+        return {}
+    try:
+        return self.cursor.get_stream_state() or {}
+    except Exception as e:
+        logger.warning(f"Failed to get stream state: {e}")
+        return {}
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2221-2242: Consider enhancing field descriptions.

The schema fields could benefit from more detailed descriptions to help users understand their purpose. Wdyt?

     ignore_first_request_options_provider: Optional[bool] = Field(
         False,
-        description="If set to true, slice request options will be ignored when sending requests.",
+        description="If set to true, slice request options will be ignored when sending requests. This is useful when the first request should not include any partition-specific parameters.",
     )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 666c4fa and 8417712.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (11 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12 hunks)
  • airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py

[warning] 1-1: File would be reformatted. Please ensure it adheres to the formatting standards.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

[warning] 1-1: File would be reformatted. Please ensure it adheres to the formatting standards.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py (2)

18-21: LGTM! Good use of experimental warning.

The experimental warning is well-placed and clearly communicates the status of this component to users.


51-56:

❓ Verification inconclusive

Verify state-based retriever selection logic.

The retriever selection is based on _started_with_state rather than checking the current state. This could lead to using the wrong retriever if the state changes during execution. Should we consider checking the current state instead? Wdyt?


🏁 Script executed:

#!/bin/bash
# Search for any tests that verify the retriever selection behavior changes correctly when state changes
rg -A 5 "test.*retriever.*selection.*state.*changes" unit_tests/

Length of output: 66


Confirm retriever selection behavior regarding dynamic state changes

I couldn’t locate any tests verifying how the retriever selection responds if the state changes during execution. Since the method bases its decision solely on the _started_with_state flag rather than checking the refreshed current state, this might lead to an outdated selection if the state later changes. Would you consider either updating the logic to query the current state dynamically or, at the very least, adding tests that cover the scenario where the state evolves post-initialization? Wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

202-203: LGTM! Appropriate integration of StateDelegatingRetriever.

The StateDelegatingRetriever is correctly added to the type checks for determining concurrent compatibility.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1956-1958: LGTM! Good type union definition.

The retriever field properly includes StateDelegatingRetriever in its type union, maintaining type safety.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2207-2224: The implementation looks good! Would you consider enhancing the field documentation?

The class structure follows the established pattern, but the field descriptions could be more detailed to help users understand their purpose better. For example:

  • What scenarios would require ignore_first_request_options_provider to be true?
  • What's the difference between incremental_data_retriever and full_data_retriever in terms of their roles?

wdyt about adding more context to help users choose the appropriate configuration?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

3103-3133: New Component "StateDelegatingRetriever" Implementation

This new component is nicely structured and follows the pattern seen in other retriever definitions. It correctly requires the fields type, incremental_data_retriever, and full_data_retriever and introduces the optional ignore_first_request_options_provider property to control slice request options.

Would you consider expanding the description from "Test state condition retriever." to something more descriptive—perhaps outlining its PoC purpose and intended use cases—to help future maintainers and users? Also, the enum value is currently written as [ StateDelegatingRetriever ] with extra spaces; would you be open to standardizing it (for example, as [ "StateDelegatingRetriever" ] or without extra spaces) to match existing patterns? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a05c391 and f0159de.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (12 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1946-1952: LGTM! The retriever field type has been extended.

The addition of StateDelegatingRetriever to the Union type is consistent with the existing pattern.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant