feat: add IncrementingCountCursor #346

Open
wants to merge 20 commits into
base: main

@artem1205 artem1205 commented Feb 18, 2025

What

add IncrementingCountCursor

How

Warning

A known and expected issue with this stub: running a check on a stream that declares IncrementingCountCursor hits a bug, because check is run without ConcurrentCursor.

Summary by CodeRabbit

  • New Features

    • Introduced support for an incremental synchronization option using an incrementing count mechanism, expanding data extraction capabilities alongside existing datetime-based approaches.
    • Enhanced schema definitions and stream processing configurations to incorporate the new incremental count strategy.
    • Added a new cursor type, IncrementingCountCursor, for improved incremental synchronization.
    • Added a new enumeration value for concurrency-compatible states, allowing for more flexible state representation.
  • Tests

    • Added unit tests to verify the correct conversion between sequential and concurrent state formats for the new synchronization method.
    • Implemented tests for the new IncrementingCountStreamStateConverter to ensure expected functionality.
    • Created tests for the new incremental counting stream and its cursor properties.

@github-actions github-actions bot added the enhancement New feature or request label Feb 18, 2025
Signed-off-by: Artem Inzhyyants <[email protected]>
@artem1205 artem1205 self-assigned this Feb 19, 2025

artem1205 commented Feb 21, 2025

UPD:

Found an interesting bug: ConcurrentCursor is used only during read, so running the check command on a stream with IncrementingCountCursor hits a bug.

We need to implement IncrementingCountCursor to make check pass.

Signed-off-by: Artem Inzhyyants <[email protected]>
…g_count_cursor

# Conflicts:
#	airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Signed-off-by: Artem Inzhyyants <[email protected]>
@artem1205 artem1205 marked this pull request as ready for review February 24, 2025 17:12
Signed-off-by: Artem Inzhyyants <[email protected]>

coderabbitai bot commented Feb 24, 2025

Walkthrough

This pull request introduces a new cursor type, IncrementingCountCursorModel, enhancing the incremental synchronization capabilities within the ConcurrentDeclarativeSource class. The changes include updates to the stream grouping logic to accommodate the new cursor type alongside existing datetime-based cursors. New schema definitions are established in YAML and Python, and a new state converter is implemented to manage incremental counts. Additionally, unit tests are added to validate the functionality of these state conversion methods.

Changes

Files: Change Summary

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py: Updated _group_streams method to support IncrementingCountCursorModel and modified _is_datetime_incremental_without_partition_routing to recognize both datetime-based and incrementing count cursors.
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml and airbyte_cdk/sources/declarative/models/…/declarative_component_schema.py: Added schema definition and new class IncrementingCountCursor with fields (cursor_field, start_value, start_value_option, parameters), and updated the incremental_sync field in DeclarativeStream to include this new option.
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: Imported IncrementingCountCursorModel and related state converter; added new methods (create_concurrent_cursor_from_incrementing_count_cursor and static create_incrementing_count_cursor); and updated mapping initialization to handle the new cursor type.
  • airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py: Added a new enum value integer to the ConcurrencyCompatibleStateType class to support state conversions for the new cursor type.
  • airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py: Introduced the IncrementingCountStreamStateConverter class, providing methods for converting and managing stream states for incremental count cursors, including state message conversion, parsing, incrementing, and output formatting.
  • unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py: Added tests for verifying the conversion from sequential state to the concurrent state and vice versa using the IncrementingCountStreamStateConverter.

Sequence Diagram(s)

sequenceDiagram
    participant Caller as Caller
    participant Factory as ModelToComponentFactory
    participant StateManager as ConnectorStateManager
    participant Converter as IncrementingCountStreamStateConverter
    participant Cursor as ConcurrentCursor

    Caller->>Factory: Call create_declarative_stream()
    Factory->>StateManager: Retrieve stream state
    StateManager-->>Factory: Return stream state
    Factory->>Factory: Check incremental_sync type (IncrementingCountCursorModel)
    Factory->>Converter: Initialize IncrementingCountStreamStateConverter
    Converter-->>Factory: Provide state conversion details
    Factory->>Cursor: Create concurrent cursor with parsed parameters
    Cursor-->>Caller: Return constructed ConcurrentCursor

Suggested reviewers

  • maxi297: Would you like to review the integration of the new cursor type and the corresponding changes in the schema and state conversion logic, wdyt?
  • brianjlai: Could you take a look at the modifications in the concurrent source and the new state converter implementations, wdyt?

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 075ac16 and bc8c583.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (10)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (10)

7-7: Added import for math module

The math module is imported to support the new IncrementingCountCursor functionality, specifically to use math.inf as the end provider value. This is a good addition for handling unbounded incremental counters.


47-49: New import for IncrementingCountStreamStateConverter

Added to support the new incremental counting cursor functionality. This state converter will handle the conversion between sequential and concurrent state formats for the incrementing count cursor.


237-246: New cursor type: incremental_counting_cursor

This defines a new cursor type that uses an incrementing count field ("id") for tracking sync progress. The cursor starts at value 0 and injects the cursor value into the "since_id" request parameter.
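For illustration, the cursor described above could look roughly like the following manifest fragment, written here as a Python dict. The field names cursor_field, start_value, and start_value_option come from this PR's schema; the RequestOption layout and the build_request_params helper are assumptions made for this sketch, not code from the PR.

```python
from typing import Any, Dict

# Manifest fragment for the cursor, expressed as a plain dict (illustrative only).
incremental_counting_cursor: Dict[str, Any] = {
    "type": "IncrementingCountCursor",
    "cursor_field": "id",
    "start_value": 0,
    "start_value_option": {
        "type": "RequestOption",
        "inject_into": "request_parameter",
        "field_name": "since_id",
    },
}


def build_request_params(cursor: Dict[str, Any], cursor_value: int) -> Dict[str, Any]:
    """Hypothetical helper: place the cursor value where start_value_option says."""
    option = cursor.get("start_value_option")
    if not option or option["inject_into"] != "request_parameter":
        return {}
    return {option["field_name"]: cursor_value}


# First sync: the configured start_value is sent as the since_id parameter.
print(build_request_params(incremental_counting_cursor, incremental_counting_cursor["start_value"]))
```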

A few questions about this implementation:

  1. Does this cursor type handle backward compatibility with existing state formats?
  2. Is there a reason why there's no "end_time_option" equivalent like in the datetime cursor?

Given the PR objectives mention issues when running check with this cursor, should we add a specific test for the check command?


255-261: New base stream definition for incremental counting

This creates a base stream definition that uses the new incremental_counting_cursor. It follows the same pattern as the existing base_incremental_stream which is good for consistency. This will make it easier to create multiple streams that use this cursor type.


551-579: Added incremental_counting_stream definition

This creates a concrete stream implementation using the new cursor type. The schema and other configurations look appropriate. The path is set to "/party_members" which matches the existing party_members stream - is this intentional or should it have its own unique endpoint?

Since this stream shares the same path as the party_members stream but with a different cursor type, how will they behave when used together? Will they potentially fetch the same data but track progress differently?


589-589: Added the new stream to the manifest

The new stream is correctly added to the list of streams in the manifest, making it available for testing. This ensures it will be included in the discover and read operations.


712-714: Updated test_group_streams to include the new stream

The assertions have been updated to account for 8 streams instead of 7 and to check that the 8th stream is the new incremental_counting_stream. This ensures the stream is correctly grouped for concurrent processing.

Also applies to: 723-724, 739-740


813-826: Added testing for the IncrementingCountCursor

Good test coverage for the new cursor type. The test verifies:

  1. The cursor is an instance of ConcurrentCursor
  2. The state converter is the correct type (IncrementingCountStreamStateConverter)
  3. Key properties like stream name, cursor field, and start value are correct
  4. The end provider returns math.inf (infinity)

One observation: unlike the datetime-based cursors, there's no verification of slice boundary fields, slice range, lookback window, or cursor granularity. Are these concepts not applicable to the incrementing count cursor, or should they be tested?

Could you clarify whether slice boundaries and lookback windows are relevant for incrementing count cursors? If they are, consider adding assertions for them.
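On the end provider returning math.inf: a small sketch of why infinity works as an upper bound for an unbounded counter. The names end_provider and is_within_sync_range are hypothetical and exist only for this example.

```python
import math


def end_provider() -> float:
    """Upper bound for an unbounded counter: every int compares below it."""
    return math.inf


def is_within_sync_range(cursor_value: int, start: int) -> bool:
    # Hypothetical check: a record is in range when start <= value < end.
    return start <= cursor_value < end_provider()
```

Because any Python int compares as smaller than math.inf, no record with a valid integer cursor value is excluded by the upper bound.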


879-879: Updated expected_stream_names in test_discover

The new stream is correctly added to the expected stream names in the test_discover function, ensuring that it's properly included in the discover command results.


47-826: Regarding the known issue mentioned in PR objectives

The PR objectives mention a known issue: when running a check with the IncrementingCountCursor, it is executed without the ConcurrentCursor, which may lead to unexpected behavior. While the implementation looks solid, I don't see a specific test that addresses this issue.

Consider adding a specific test case that:

  1. Runs the check command with a catalog that only includes the incremental_counting_stream
  2. Verifies the behavior when the stream is executed without the ConcurrentCursor
  3. Potentially implements the solution mentioned in the PR comments about using convert_from_sequential_state

This would help ensure the known issue is properly addressed or at least well-understood through testing.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (14)
airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1)

7-7: Remove unused import.

The Callable type is imported but not used in this file. Consider removing it to keep imports clean.

-from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple
+from typing import TYPE_CHECKING, Any, List, MutableMapping, Optional, Tuple
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

420-426: Consider implementing proper type checking.

The comment suggests that IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor. Would you like to implement proper type checking using the commented code to make this more explicit and type-safe? This could help prevent potential issues in the future.

unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py (2)

7-18: Consider adding more test cases.

The test covers the happy path, but would you like to add tests for:

  1. Edge cases (e.g., zero values, negative values)
  2. Error scenarios (e.g., invalid state format)
  3. Different initial state values

20-29: Consider testing state type validation.

The test verifies the conversion but doesn't check if the converter properly validates the state type. Would you like to add a test case that ensures invalid state types are rejected? wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

780-817: IncrementingCountCursor Schema Definition Review

This new block defines the IncrementingCountCursor very clearly with all the essential fields:

  • type correctly constrained to IncrementingCountCursor.
  • cursor_field is well documented with guidance on maintaining top-level placement.
  • start_value allows both string and integer types, which seems flexible.
  • start_value_option reuses the existing RequestOption definition, ensuring consistency.

A couple of friendly questions to consider:

  1. Since one of the PR objectives mentions that when running a check with IncrementingCountCursor the operation is executed without a ConcurrentCursor, would it be useful to include any additional metadata or hints in this schema (or via documentation) to signal such limitations?
  2. Would adding an example usage or a note about potential behaviors during check operations benefit future developers who interact with this schema?

Wdyt?

airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py (6)

25-63: The state conversion logic looks good, but could use more error handling.

The implementation handles both compatible and incompatible states, but might benefit from additional validation.

Consider adding validation for:

  1. Type checking of start parameter
  2. Ensuring cursor_field exists in stream_state
  3. Validating state structure before conversion

What do you think about adding these validations? wdyt?


77-93: The sync start calculation looks good but could use more documentation.

The implementation handles edge cases well, but the logic could be clearer with additional comments.

Consider adding docstring explaining:

  1. The purpose of prev_sync_low_water_mark
  2. Why we compare with sync_start
  3. The implications of returning different values

Would you like me to suggest a docstring format? wdyt?
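One possible shape for such a docstring, shown on a standalone version of the sync start calculation. This is a sketch that assumes a plain dict state and a module-level ZERO_VALUE; the PR's method lives on the converter class and takes a CursorField instead of a key string.

```python
from typing import Any, Mapping, Optional

ZERO_VALUE = 0  # assumed default when no start value is configured


def get_sync_start(
    stream_state: Mapping[str, Any],
    cursor_field_key: str,
    start: Optional[int],
) -> int:
    """Return the cursor value the next sync should resume from.

    sync_start is the configured start value (or ZERO_VALUE when none is given).
    prev_sync_low_water_mark is the cursor value saved by the previous sync, if any.
    The saved value wins only when it is at or beyond sync_start, so a start
    value raised in the configuration is never undercut by older state.
    """
    sync_start = start if start is not None else ZERO_VALUE
    prev_sync_low_water_mark: Optional[int] = stream_state.get(cursor_field_key)
    if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start:
        return prev_sync_low_water_mark
    return sync_start
```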


15-20: Add return type hints for clarity?

The implementation is correct, but would you consider adding return type hints to make the interface clearer? Something like:

-    def _from_state_message(self, value: Any) -> Any:
+    def _from_state_message(self, value: Any) -> int:
         return value

-    def _to_state_message(self, value: Any) -> Any:
+    def _to_state_message(self, value: Any) -> int:
         return value

77-93: Consider validating for negative values?

The sync start calculation looks good! Would you consider adding validation to ensure we don't get negative values? Something like:

     def _get_sync_start(
         self,
         cursor_field: CursorField,
         stream_state: MutableMapping[str, Any],
         start: Optional[int],
     ) -> int:
         sync_start = start if start is not None else self.zero_value
+        if sync_start < 0:
+            raise ValueError(f"Negative values are not allowed for sync_start: {sync_start}")
         prev_sync_low_water_mark: Optional[int] = (
             stream_state[cursor_field.cursor_field_key]
             if cursor_field.cursor_field_key in stream_state
             else None
         )
         if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start:
+            if prev_sync_low_water_mark < 0:
+                raise ValueError(f"Negative values are not allowed for prev_sync_low_water_mark: {prev_sync_low_water_mark}")
             return prev_sync_low_water_mark
         else:
             return sync_start

25-63: The state conversion logic looks solid, but could use more documentation.

The implementation correctly handles both compatible and incompatible states, creating appropriate slices for prior syncs. However, would it be helpful to add more examples in the docstring showing both cases? For instance, showing what an incompatible state looks like before and after conversion, wdyt?
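As an example of the incompatible case, the sketch below wraps a sequential state such as {"id": 10} in a concurrent layout. The key names ("state_type", "slices", "start", "end", "legacy") and the simplified signature are assumptions for illustration; the converter in this PR may use different keys and extra fields.

```python
from typing import Any, Dict, Mapping, Optional


def convert_from_sequential_state(
    cursor_field_key: str,
    stream_state: Mapping[str, Any],
    start: Optional[int] = None,
) -> Dict[str, Any]:
    """Sketch: wrap a sequential state such as {"id": 10} in a concurrent layout."""
    if stream_state.get("state_type") == "integer":
        # Already in the concurrent layout; pass it through unchanged.
        return dict(stream_state)

    saved = stream_state.get(cursor_field_key)
    sync_start = max(saved, start or 0) if saved is not None else (start or 0)
    # One slice records the range already covered by prior syncs.
    slices = (
        [{"start": start if start is not None else sync_start, "end": sync_start}]
        if saved is not None
        else []
    )
    return {"state_type": "integer", "slices": slices, "legacy": dict(stream_state)}
```

Under these assumptions, {"id": 10} with a configured start of 0 becomes a state with a single slice from 0 to 10, while an empty state yields no slices at all.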


77-93: The sync start calculation looks good but could benefit from validation.

The logic correctly handles previous state and start values. However, should we add validation to ensure the cursor values are non-negative integers? This could prevent potential issues with negative counts, wdyt?

 def _get_sync_start(
     self,
     cursor_field: CursorField,
     stream_state: MutableMapping[str, Any],
     start: Optional[int],
 ) -> int:
     sync_start = start if start is not None else self.zero_value
+    if sync_start < 0:
+        raise ValueError("Cursor values must be non-negative integers")
     prev_sync_low_water_mark: Optional[int] = (
         stream_state[cursor_field.cursor_field_key]
         if cursor_field.cursor_field_key in stream_state
         else None
     )
+    if prev_sync_low_water_mark is not None and prev_sync_low_water_mark < 0:
+        raise ValueError("Previous cursor value must be a non-negative integer")
     if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start:
         return prev_sync_low_water_mark
     else:
         return sync_start
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

593-593: Consider raising an explicit error for runtime usage
Currently, create_incrementing_count_cursor is mapped here, but the docstring says there’s no runtime usage. Would you like to raise an exception if it’s unexpectedly called, to avoid confusion? Wdyt?


1696-1721: RequestOptionsProvider usage
The code reuses DatetimeBasedRequestOptionsProvider with an incrementing count model. Would you consider introducing a specialized provider for numeric increments to reduce confusion? Wdyt?

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

1511-1531: LGTM! The IncrementingCountCursor class is well-structured.

The class follows best practices with:

  • Clear field definitions with descriptive help text
  • Proper type hints and validation
  • Flexible configuration options for start values

One suggestion though - would it be helpful to add an example in the docstring showing how to use this cursor with a real-world scenario? For instance, showing how to configure it for paginated APIs that use record counts, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 665dc1f and 710698e.

📒 Files selected for processing (7)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6 hunks)
  • airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1 hunks)
  • airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py (1 hunks)
  • unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py (1 hunks)
👮 Files not reviewed due to content moderation or server errors (2)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (12)
airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1)

14-15: LGTM! Added integer state type.

The addition of the integer state type to ConcurrencyCompatibleStateType is well-aligned with supporting the new IncrementingCountCursor feature.
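A rough sketch of how the new enum value could be used to decide whether a state is already concurrency-compatible. The enum member strings and the is_state_message_compatible function below are assumptions written for this example, not copied from the CDK.

```python
from enum import Enum
from typing import Any, Mapping


class ConcurrencyCompatibleStateType(Enum):
    date_range = "date-range"
    integer = "integer"  # value string is an assumption for this sketch


def is_state_message_compatible(state: Mapping[str, Any]) -> bool:
    """True when the state already declares a concurrency-compatible state_type."""
    allowed = {state_type.value for state_type in ConcurrencyCompatibleStateType}
    return bool(state) and state.get("state_type") in allowed
```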

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)

1511-1531: LGTM! The IncrementingCountCursor class is well-structured.

The class follows the established patterns and includes comprehensive field descriptions. The cursor field is properly marked as required, and the optional fields have sensible defaults.

A few suggestions to consider:

  1. Would it make sense to add more examples for the start_value field to demonstrate both integer and string use cases? wdyt?
  2. Should we add validation to ensure start_value is a non-negative number when provided? wdyt?

1973-1979: LGTM! The incremental_sync field is properly updated.

The integration of IncrementingCountCursor into the DeclarativeStream class is clean and maintains backward compatibility.


1973-1979: LGTM! The integration with DeclarativeStream looks good.

The IncrementingCountCursor is properly added as an option in the incremental_sync field alongside existing cursor types.

airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py (5)

14-24: LGTM! The base implementation is clean and follows best practices.

The class properly extends AbstractStreamStateConverter and implements the required methods. The infinity provider is a nice touch for handling unbounded ranges.


64-76: LGTM! The value manipulation methods are simple and effective.

The implementation of parse_value, zero_value, increment, and output_format is straightforward and follows the expected behavior for integer-based cursors.
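For readers without the diff at hand, integer-based versions of these four helpers might look like the sketch below. IntegerCursorValueHelpers is a hypothetical stand-in class, and the method bodies are assumptions about the expected behavior rather than the PR's code.

```python
from typing import Any


class IntegerCursorValueHelpers:
    """Hypothetical stand-in for the converter's integer value helpers."""

    zero_value: int = 0

    @staticmethod
    def parse_value(value: Any) -> int:
        # Accept ints and numeric strings coming from a state message.
        return int(value)

    @staticmethod
    def increment(value: int) -> int:
        # The smallest step between two integer cursor values is 1.
        return value + 1

    @staticmethod
    def output_format(value: int) -> int:
        # Integers are emitted as-is; no string formatting is needed.
        return value
```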


25-63: LGTM! The state conversion logic is robust.

The implementation correctly handles both compatible and incompatible state messages. The slices creation for representing prior syncs is a nice touch!


14-24: LGTM! The base class implementation looks good.

The class correctly implements the required methods from AbstractStreamStateConverter with proper type hints. The infinity end provider is a nice touch for unbounded streams.


64-76: LGTM! The cursor manipulation methods are clean and straightforward.

Simple and effective implementations for parsing, incrementing, and formatting integer values.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

248-250: Imports look good
These new imports for the IncrementingCountCursorModel appear correct and necessary. All good here!


503-504: Confirmed the converter import
Nice addition of IncrementingCountStreamStateConverter; it cleanly ties into your concurrency approach.


1199-1262: Potential type mismatch for startValue
When you set start=interpolated_start_value, the interpolation could yield a string. Would you like to parse it into an integer before passing it along, to avoid any off-by-one or type errors? Wdyt?
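A minimal sketch of such parsing, assuming the interpolated value arrives as an int, a numeric string, or None. The coerce_start_value name is hypothetical, and defaulting to 0 mirrors the fallback used when start_value is not set.

```python
from typing import Optional, Union


def coerce_start_value(raw: Optional[Union[int, str]]) -> int:
    """Turn an evaluated start value into an int, defaulting to 0 when unset."""
    if raw is None or raw == "":
        return 0
    if isinstance(raw, bool):
        # bool is a subclass of int in Python, so reject it explicitly.
        raise ValueError("start_value must be an integer, not a boolean")
    try:
        return int(raw)
    except (TypeError, ValueError) as error:
        raise ValueError(f"start_value must be an integer, got {raw!r}") from error
```

Failing early with a clear message keeps a non-numeric start value from reaching the cursor as a string.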

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2210-2225: 🛠️ Refactor suggestion

Consider raising NotImplementedError in create_incrementing_count_cursor.

Since create_incrementing_count_cursor returns a DatetimeBasedCursor and is not intended for runtime use, would you consider raising NotImplementedError to prevent unexpected behavior if the code is ever called at runtime? Wdyt?

 @staticmethod
 def create_incrementing_count_cursor(
     model: IncrementingCountCursorModel, config: Config, **kwargs: Any
 ) -> DatetimeBasedCursor:
-    # This should not actually get used anywhere at runtime, but needed to add this to pass checks since
-    # we still parse models into components. The issue is that there's no runtime implementation of a
-    # IncrementingCountCursor.
-    # A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor.
-    return DatetimeBasedCursor(
-        cursor_field=model.cursor_field,
-        datetime_format="%Y-%m-%d",
-        start_datetime="2024-12-12",
-        config=config,
-        parameters={},
-    )
+    raise NotImplementedError(
+        "IncrementingCountCursor is only supported with ConcurrentCursor. "
+        "This method exists only for model parsing and should not be called at runtime."
+    )
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1199-1262: Consider adding error handling for invalid cursor field.

The create_concurrent_cursor_from_incrementing_count_cursor implementation looks good, but would you consider adding validation for the cursor field to ensure it exists in the stream's schema? This could help catch configuration errors early. Wdyt?

 def create_concurrent_cursor_from_incrementing_count_cursor(
     self,
     model_type: Type[BaseModel],
     component_definition: ComponentDefinition,
     stream_name: str,
     stream_namespace: Optional[str],
     config: Config,
     message_repository: Optional[MessageRepository] = None,
     **kwargs: Any,
 ) -> ConcurrentCursor:
     stream_state = (
         self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
         if "stream_state" not in kwargs
         else kwargs["stream_state"]
     )

     component_type = component_definition.get("type")
     if component_definition.get("type") != model_type.__name__:
         raise ValueError(
             f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
         )

     incrementing_count_cursor_model = model_type.parse_obj(component_definition)

     if not isinstance(incrementing_count_cursor_model, IncrementingCountCursorModel):
         raise ValueError(
             f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}"
         )

+    if not incrementing_count_cursor_model.cursor_field:
+        raise ValueError("cursor_field must be specified for IncrementingCountCursor")
+
     interpolated_start_value = (
         InterpolatedString.create(
             incrementing_count_cursor_model.start_value,  # type: ignore
             parameters=incrementing_count_cursor_model.parameters or {},
         )
         if incrementing_count_cursor_model.start_value
         else 0
     )

1696-1721: Consider adding validation for start_value_option.

The create_declarative_stream method's handling of IncrementingCountCursorModel looks good, but would you consider adding validation to ensure start_value_option is valid when provided? This could help catch configuration errors early. Wdyt?

 elif model.incremental_sync and isinstance(
     model.incremental_sync, IncrementingCountCursorModel
 ):
     cursor_model: IncrementingCountCursorModel = model.incremental_sync  # type: ignore

+    if cursor_model.start_value_option and not isinstance(cursor_model.start_value_option, RequestOptionModel):
+        raise ValueError("start_value_option must be a RequestOption when provided")
+
     start_time_option = (
         self._create_component_from_model(
             cursor_model.start_value_option,  # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor
             config,
             parameters=cursor_model.parameters or {},
         )
         if cursor_model.start_value_option  # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor
         else None
     )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 710698e and 3b25d18.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

248-250: LGTM! Import of new cursor model.

The import of IncrementingCountCursor from the declarative component schema is correctly added.

@artem1205 artem1205 requested a review from maxi297 February 24, 2025 18:48
Copy link
Contributor

@maxi297 maxi297 left a comment

Should we have tests that ensure a manifest model produces the expected behavior?

# the default DatetimeBasedRequestOptionsProvider() sets them to start_time/end_time
partition_field_start = "start"

request_options_provider = DatetimeBasedRequestOptionsProvider(
Copy link
Contributor

The naming here is weird. Should we generify the DatetimeBasedRequestOptionsProvider to be a StartEndRequestOptionsProvider or something like that? Would this allow us to have only one method for building both DatetimeBasedCursor and IncrementingCountCursor? The comments above refer to DatetimeBasedCursor which makes me think there is duplication

slices = [
    {
        self.START_KEY: start if start is not None else sync_start,
        self.END_KEY: sync_start,  # this may not be relevant anymore
Copy link
Contributor

Why wouldn't it be relevant anymore? Can we just remove it if it's the case?

Copy link
Contributor Author

This method is used only in ConcurrentCursor._get_concurrent_state:

def _get_concurrent_state(
    self, state: MutableMapping[str, Any]
) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
    if self._connector_state_converter.is_state_message_compatible(state):
        return (
            self._start or self._connector_state_converter.zero_value,
            self._connector_state_converter.deserialize(state),
        )
    return self._connector_state_converter.convert_from_sequential_state(
        self._cursor_field, state, self._start
    )

Since we don't have any "old-style" states for this cursor (is_state_message_compatible -> True), we may never call convert_from_sequential_state for it.
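
To make the branching concrete, here is a minimal, self-contained sketch of the same routing logic. `StubStateConverter` is a hypothetical stand-in, not the CDK converter: the "state_type" check and the shape returned by its `convert_from_sequential_state` are assumptions chosen only to show which branch runs.

```python
from typing import Any, Dict, Optional, Tuple


class StubStateConverter:
    """Hypothetical converter that only mirrors the method names used above."""

    zero_value = 0

    def is_state_message_compatible(self, state: Dict[str, Any]) -> bool:
        # Assumption: concurrent-format state carries a "state_type" marker.
        return "state_type" in state

    def deserialize(self, state: Dict[str, Any]) -> Dict[str, Any]:
        return dict(state)

    def convert_from_sequential_state(
        self, cursor_field: str, state: Dict[str, Any], start: Optional[int]
    ) -> Tuple[int, Dict[str, Any]]:
        sync_start = start if start is not None else self.zero_value
        # Assumption: wrap the old-style value so the caller can see it was converted.
        return sync_start, {"state_type": "converted", "legacy": dict(state)}


def get_concurrent_state(
    converter: StubStateConverter,
    cursor_field: str,
    state: Dict[str, Any],
    start: Optional[int],
) -> Tuple[int, Dict[str, Any]]:
    """Same two-branch routing as _get_concurrent_state, outside the class."""
    if converter.is_state_message_compatible(state):
        return (start or converter.zero_value, converter.deserialize(state))
    return converter.convert_from_sequential_state(cursor_field, state, start)
```

With this stub, a state that already has the marker goes through `deserialize`, and only an unmarked state reaches `convert_from_sequential_state`.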

Copy link
Contributor

I think we might use the convert_from_sequential_state if we can find a way to have slice_range. For now, I'm fine with this.

Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

780-817: The schema definition for IncrementingCountCursor looks well-structured.

This is a good addition that follows the same design pattern as DatetimeBasedCursor. The cursor_field, start_value and other properties are well-defined with appropriate descriptions and examples.

However, I notice that the start_value property accepts both string and integer. Since this is for an incrementing count, should we maybe restrict it to integers only? Or at least document when a string might be appropriate? wdyt?
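
One way to keep the string-or-integer flexibility while still guaranteeing an integer downstream is to coerce the value after interpolation. The following is a sketch under that assumption; `coerce_start_value` is a hypothetical helper, not part of the CDK, and it assumes any templated string has already been rendered to its final text.

```python
from typing import Optional, Union


def coerce_start_value(value: Optional[Union[int, str]]) -> int:
    """Return an integer start value, defaulting to 0 when nothing is configured."""
    if value is None or value == "":
        return 0
    if isinstance(value, bool):
        # bool is a subclass of int, but True/False is almost certainly a config mistake.
        raise ValueError(f"start_value must be an integer, got {value!r}")
    if isinstance(value, int):
        return value
    try:
        return int(value.strip())
    except ValueError as exc:
        raise ValueError(f"start_value must be an integer, got {value!r}") from exc
```

A string such as "42" (for example, the rendered result of a config template) is accepted, while a non-numeric string fails with a clear message.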

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

421-425: Informative comments about the current implementation state.

The comments provide helpful context about why there's no isinstance check for IncrementingCountCursor. It clarifies that it's currently hardcoded as DatetimeBasedCursor, which helps future maintainers understand the implementation.

Would it be valuable to add a TODO comment with a ticket reference to track the future work of creating a proper Declarative IncrementingCountCursor? This would make it easier to find and address this technical debt later. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b25d18 and cf170d5.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

1359-1359: Properly registered the new cursor in DeclarativeStream's incremental_sync property.

Good job adding the reference to the new IncrementingCountCursor in the list of available incremental_sync options.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)

34-36: Added import for the new IncrementingCountCursorModel.

Clean import addition that follows the existing pattern of importing model classes.


228-229: Method name change better reflects its expanded purpose.

Good renaming from _is_datetime_incremental_without_partition_routing to _is_concurrent_cursor_incremental_without_partition_routing to accurately reflect that it now handles both datetime-based and incrementing count cursors.

Also applies to: 406-407


414-417: Updated condition now includes both cursor types.

Good update to make the condition more inclusive, accepting both DatetimeBasedCursorModel and IncrementingCountCursorModel.


260-272:

❓ Verification inconclusive

Proper handling of IncrementingCountCursor type.

The new conditional branch appropriately detects and processes the IncrementingCountCursorModel type by calling the appropriate factory method. This looks good, but I noticed the PR objectives mention a known issue where "when running a check with the declared IncrementingCountCursor, it is executed without the ConcurrentCursor."

Does this implementation address that issue? If not, should we add some validation to ensure ConcurrentCursor is always used with IncrementingCountCursor? wdyt?


🏁 Script executed:

#!/bin/bash
# Look for validation related to ConcurrentCursor usage with IncrementingCountCursor

# Check if there's any validation in model_to_component_factory.py
rg -A 5 "create_concurrent_cursor_from_incrementing_count_cursor" --type py

Length of output: 1662


ConcurrentCursor validation suggestion

Hi there, I can confirm that the code now correctly routes the IncrementingCountCursorModel to the dedicated factory method via the call to create_concurrent_cursor_from_incrementing_count_cursor. However, I didn’t find any extra validation ensuring that the returned cursor is indeed a ConcurrentCursor—a point hinted at by the known issue note. Would it make sense to include an explicit check or validation (for example, an assertion inside the factory method or right after its call) to guarantee that a ConcurrentCursor is always used when dealing with an IncrementingCountCursorModel? wdyt?
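
A minimal sketch of such an explicit check, assuming it would sit right after the factory call. `ConcurrentCursorStub`, `OtherCursorStub`, and `require_concurrent_cursor` are hypothetical names used only for illustration.

```python
class ConcurrentCursorStub:
    """Hypothetical stand-in for ConcurrentCursor."""


class OtherCursorStub:
    """Hypothetical stand-in for any non-concurrent cursor."""


def require_concurrent_cursor(cursor: object, stream_name: str) -> ConcurrentCursorStub:
    """Return the cursor unchanged, or raise if it is not a concurrent cursor."""
    if not isinstance(cursor, ConcurrentCursorStub):
        raise TypeError(
            f"Stream '{stream_name}' with IncrementingCountCursor expected a "
            f"ConcurrentCursor, got {type(cursor).__name__}"
        )
    return cursor
```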

@artem1205 artem1205 requested a review from maxi297 February 26, 2025 18:25
Copy link
Contributor

@maxi297 maxi297 left a comment

Can we have tests that ensure a manifest model produces the expected behavior?

cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

connector_state_converter = IncrementingCountStreamStateConverter(
    is_sequential_state=True,  # ConcurrentPerPartitionCursor only works with sequential state
Copy link
Contributor

This is not concurrent per partition, though. I think the real reason to keep it as sequential state is that, since slice_range is never defined for IncrementingCountCursor, there is no value in having partitioned state.

Copy link
Contributor

@lmossman lmossman left a comment

Had one small suggestion on adjusting the description of a field, otherwise the YAML schema changes make sense to me!

Signed-off-by: Artem Inzhyyants <[email protected]>
@artem1205
Copy link
Contributor Author

artem1205 commented Feb 27, 2025

I think we might use the convert_from_sequential_state if we can find a way to have slice_range. For now, I'm fine with this.

This is interesting. I see only one way to get this: if the API can provide the latest record, e.g. by making a request with

limit=1
ORDER BY DESC

then we can get start_index and end_index and slice the range equally.

But I have no idea how to implement it.
P.S. Zendesk Chat does not have such an option.
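
The "slice it equally" step can be sketched as below, assuming start_index and end_index are already known (for example, from the limit=1 descending request described above). `split_id_range` is a hypothetical helper, not something in the CDK, and the bounds are treated as inclusive.

```python
from typing import List, Tuple


def split_id_range(start_index: int, end_index: int, slice_count: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [start_index, end_index] into near-equal slices."""
    if slice_count < 1:
        raise ValueError("slice_count must be at least 1")
    if end_index < start_index:
        return []
    total = end_index - start_index + 1
    size = -(-total // slice_count)  # ceiling division without importing math
    slices = []
    low = start_index
    while low <= end_index:
        high = min(low + size - 1, end_index)
        slices.append((low, high))
        low = high + 1
    return slices


print(split_id_range(0, 99, 4))  # [(0, 24), (25, 49), (50, 74), (75, 99)]
```

Each tuple could then serve as one partition's start and end, which is the kind of slice_range the earlier comment said is currently missing for this cursor.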

Signed-off-by: Artem Inzhyyants <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

780-817: Review on the new IncrementingCountCursor schema

You've introduced the new IncrementingCountCursor definition, which looks well structured and in line with our schema design. The properties are clear and consistent with similar cursor definitions:

  • cursor_field: The description is comprehensive, explaining that the field must be at the top level and that records are expected in ascending order. This clarity is great for users.
  • start_value: Allowing both string and integer types increases flexibility. Is this dual-type intentionally designed to support cases where the start value might come as a numeric or a templated string? Just checking if that meets the broader use cases.
  • start_value_option: The reference to #/definitions/RequestOption follows our established pattern, which is excellent.

One thought—since the PR objectives mention a known issue where the IncrementingCountCursor is used during a check without the ConcurrentCursor, would it be beneficial to include a short note in the description of this schema to warn users of potential unexpected behavior during check operations? wdyt?

Overall, the changes are clear and consistent.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cf170d5 and 263e662.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Analyze (python)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (7)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (7)

7-7: New imports for the IncrementingCountCursor implementation.

The import of math is likely needed for math.inf used in line 822, and the IncrementingCountStreamStateConverter is required for the new cursor implementation. These imports align well with the functionality being added.

Also applies to: 47-49


237-246: New incremental_counting_cursor definition.

The definition appropriately configures:

  • Cursor type as IncrementingCountCursor
  • Cursor field set to "id"
  • Start value of 0
  • Request parameter for "since_id"

Looks good and aligns with incremental ID-based synchronization patterns.


255-261: New base_incremental_counting_stream definition.

This creates a proper base stream that leverages the new counting cursor for incremental sync.


551-579: Implementation of incremental_counting_stream.

The stream properly extends the base stream and defines appropriate schema. One potential concern: this stream uses the same path ("/party_members") as the existing party_members_stream. Is this intentional or could it lead to confusion since they're different streams fetching from the same endpoint? wdyt?


589-589: Added the new stream to the manifest's streams list.

Good - ensures the stream is available for testing.


810-823: Test assertions for the new IncrementingCountCursor.

These assertions verify the core properties of the cursor:

  • Proper state converter type (IncrementingCountStreamStateConverter)
  • Correct stream name, cursor field, and start value
  • End provider correctly set to infinity

While these tests verify the cursor is correctly created, they don't specifically test the functionality during actual read operations. Would it make sense to add a test case that verifies the cursor behavior during a sync operation (similar to the date-based tests)? wdyt?
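
As a rough illustration of what a read-time test might assert, here is a tiny simulation of incrementing-count behavior. It is a sketch under assumptions, not the CDK's read path: `read_since` is a hypothetical helper, records are plain dicts, and the filter is assumed to be strictly greater than the stored cursor value.

```python
from typing import Any, Dict, List, Tuple


def read_since(
    records: List[Dict[str, Any]], cursor_field: str, since_value: int
) -> Tuple[List[Dict[str, Any]], int]:
    """Return records newer than since_value and the cursor value to store next."""
    # Assumption: only records strictly above the stored cursor value are emitted.
    emitted = [r for r in records if r[cursor_field] > since_value]
    new_state = max((r[cursor_field] for r in emitted), default=since_value)
    return emitted, new_state
```

A sync-level test could follow the same shape: read once from the start value, read again from the resulting state, and check that the second read emits only records added in between.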


810-823: Address the known issue mentioned in PR description.

Per the PR objectives, there's a known issue when running a check with the declared IncrementingCountCursor - it's executed without the ConcurrentCursor, leading to unexpected behavior. This code implements the cursor but doesn't address the check-related issue.

Could we add a targeted test for this known issue? Or is there a plan to address it in a follow-up PR?

Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
@artem1205 artem1205 requested a review from maxi297 February 28, 2025 10:39
Labels
enhancement New feature or request
4 participants