diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index aa3cea705..7de937782 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -3,7 +3,7 @@ # import logging -from typing import Any, Callable, Generic, Iterator, List, Mapping, Optional, Tuple, Union +from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple from airbyte_cdk.models import ( AirbyteCatalog, @@ -28,15 +28,11 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DatetimeBasedCursor as DatetimeBasedCursorModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - DeclarativeStream as DeclarativeStreamModel, -) from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( - ComponentDefinition, ModelToComponentFactory, ) from airbyte_cdk.sources.declarative.requesters import HttpRequester -from airbyte_cdk.sources.declarative.retrievers import Retriever, SimpleRetriever +from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( DeclarativePartitionFactory, StreamSlicerPartitionGenerator, @@ -52,7 +48,6 @@ from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream -from airbyte_cdk.sources.types import Config, StreamState class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): @@ -194,10 +189,11 @@ def _group_streams( # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, # so we need to treat them as synchronous - if ( - isinstance(declarative_stream, DeclarativeStream) - and name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] + if isinstance(declarative_stream, DeclarativeStream) and ( + name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] == "SimpleRetriever" + or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] + == "AsyncRetriever" ): incremental_sync_component_definition = name_to_stream_mapping[ declarative_stream.name @@ -234,15 +230,27 @@ def _group_streams( stream_state=stream_state, ) + retriever = declarative_stream.retriever + + # This is an optimization so that we don't invoke any cursor or state management flows within the + # low-code framework because state management is handled through the ConcurrentCursor. + if declarative_stream and isinstance(retriever, SimpleRetriever): + # Also a temporary hack. In the legacy Stream implementation, as part of the read, + # set_initial_state() is called to instantiate incoming state on the cursor. Although we no + # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components + # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator + # still rely on a DatetimeBasedCursor that is properly initialized with state. + if retriever.cursor: + retriever.cursor.set_initial_state(stream_state=stream_state) + # We zero it out here, but since this is a cursor reference, the state is still properly + # instantiated for the other components that reference it + retriever.cursor = None + partition_generator = StreamSlicerPartitionGenerator( DeclarativePartitionFactory( declarative_stream.name, declarative_stream.get_json_schema(), - self._retriever_factory( - name_to_stream_mapping[declarative_stream.name], - config, - stream_state, - ), + retriever, self.message_repository, ), cursor, @@ -272,11 +280,7 @@ def _group_streams( DeclarativePartitionFactory( declarative_stream.name, declarative_stream.get_json_schema(), - self._retriever_factory( - name_to_stream_mapping[declarative_stream.name], - config, - {}, - ), + declarative_stream.retriever, self.message_repository, ), declarative_stream.retriever.stream_slicer, @@ -415,34 +419,3 @@ def _remove_concurrent_streams_from_catalog( if stream.stream.name not in concurrent_stream_names ] ) - - def _retriever_factory( - self, stream_config: ComponentDefinition, source_config: Config, stream_state: StreamState - ) -> Callable[[], Retriever]: - def _factory_method() -> Retriever: - declarative_stream: DeclarativeStream = self._constructor.create_component( - DeclarativeStreamModel, - stream_config, - source_config, - emit_connector_builder_messages=self._emit_connector_builder_messages, - ) - - # This is an optimization so that we don't invoke any cursor or state management flows within the - # low-code framework because state management is handled through the ConcurrentCursor. - if ( - declarative_stream - and declarative_stream.retriever - and isinstance(declarative_stream.retriever, SimpleRetriever) - ): - # Also a temporary hack. In the legacy Stream implementation, as part of the read, set_initial_state() is - # called to instantiate incoming state on the cursor. Although we no longer rely on the legacy low-code cursor - # for concurrent checkpointing, low-code components like StopConditionPaginationStrategyDecorator and - # ClientSideIncrementalRecordFilterDecorator still rely on a DatetimeBasedCursor that is properly initialized - # with state. - if declarative_stream.retriever.cursor: - declarative_stream.retriever.cursor.set_initial_state(stream_state=stream_state) - declarative_stream.retriever.cursor = None - - return declarative_stream.retriever - - return _factory_method diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py b/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py index e9476447a..59255c75b 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py @@ -112,27 +112,39 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: ) if isinstance(self.url_base, str): self.url_base = InterpolatedString(string=self.url_base, parameters=parameters) - self._token: Optional[Any] = self.pagination_strategy.initial_token + + def get_initial_token(self) -> Optional[Any]: + """ + Return the page token that should be used for the first request of a stream + + WARNING: get_initial_token() should not be used by streams that use RFR that perform checkpointing + of state using page numbers. Because paginators are stateless + """ + return self.pagination_strategy.initial_token def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any] = None, ) -> Optional[Mapping[str, Any]]: - self._token = self.pagination_strategy.next_page_token( - response, last_page_size, last_record + next_page_token = self.pagination_strategy.next_page_token( + response=response, + last_page_size=last_page_size, + last_record=last_record, + last_page_token_value=last_page_token_value, ) - if self._token: - return {"next_page_token": self._token} + if next_page_token: + return {"next_page_token": next_page_token} else: return None - def path(self) -> Optional[str]: - if ( - self._token - and self.page_token_option - and isinstance(self.page_token_option, RequestPath) - ): + def path(self, next_page_token: Optional[Mapping[str, Any]]) -> Optional[str]: + token = next_page_token.get("next_page_token") if next_page_token else None + if token and self.page_token_option and isinstance(self.page_token_option, RequestPath): # Replace url base to only return the path - return str(self._token).replace(self.url_base.eval(self.config), "") # type: ignore # url_base is casted to a InterpolatedString in __post_init__ + return str(token).replace(self.url_base.eval(self.config), "") # type: ignore # url_base is casted to a InterpolatedString in __post_init__ else: return None @@ -143,7 +155,7 @@ def get_request_params( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: - return self._get_request_options(RequestOptionType.request_parameter) + return self._get_request_options(RequestOptionType.request_parameter, next_page_token) def get_request_headers( self, @@ -152,7 +164,7 @@ def get_request_headers( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, str]: - return self._get_request_options(RequestOptionType.header) + return self._get_request_options(RequestOptionType.header, next_page_token) def get_request_body_data( self, @@ -161,7 +173,7 @@ def get_request_body_data( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.body_data) + return self._get_request_options(RequestOptionType.body_data, next_page_token) def get_request_body_json( self, @@ -170,25 +182,21 @@ def get_request_body_json( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.body_json) - - def reset(self, reset_value: Optional[Any] = None) -> None: - if reset_value: - self.pagination_strategy.reset(reset_value=reset_value) - else: - self.pagination_strategy.reset() - self._token = self.pagination_strategy.initial_token + return self._get_request_options(RequestOptionType.body_json, next_page_token) - def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping[str, Any]: + def _get_request_options( + self, option_type: RequestOptionType, next_page_token: Optional[Mapping[str, Any]] + ) -> MutableMapping[str, Any]: options = {} + token = next_page_token.get("next_page_token") if next_page_token else None if ( self.page_token_option - and self._token is not None + and token is not None and isinstance(self.page_token_option, RequestOption) and self.page_token_option.inject_into == option_type ): - options[self.page_token_option.field_name.eval(config=self.config)] = self._token # type: ignore # field_name is always cast to an interpolated string + options[self.page_token_option.field_name.eval(config=self.config)] = token # type: ignore # field_name is always cast to an interpolated string if ( self.page_size_option and self.pagination_strategy.get_page_size() @@ -204,6 +212,9 @@ class PaginatorTestReadDecorator(Paginator): """ In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of pages that are queried throughout a read command. + + WARNING: This decorator is not currently thread-safe like the rest of the low-code framework because it has + an internal state to track the current number of pages counted so that it can exit early during a test read """ _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1 @@ -217,17 +228,27 @@ def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> No self._decorated = decorated self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL + def get_initial_token(self) -> Optional[Any]: + self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL + return self._decorated.get_initial_token() + def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any] = None, ) -> Optional[Mapping[str, Any]]: if self._page_count >= self._maximum_number_of_pages: return None self._page_count += 1 - return self._decorated.next_page_token(response, last_page_size, last_record) + return self._decorated.next_page_token( + response, last_page_size, last_record, last_page_token_value + ) - def path(self) -> Optional[str]: - return self._decorated.path() + def path(self, next_page_token: Optional[Mapping[str, Any]]) -> Optional[str]: + return self._decorated.path(next_page_token) def get_request_params( self, @@ -272,7 +293,3 @@ def get_request_body_json( return self._decorated.get_request_body_json( stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token ) - - def reset(self, reset_value: Optional[Any] = None) -> None: - self._decorated.reset() - self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index cb0592793..7de91f5e9 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -19,7 +19,7 @@ class NoPagination(Paginator): parameters: InitVar[Mapping[str, Any]] - def path(self) -> Optional[str]: + def path(self, next_page_token: Optional[Mapping[str, Any]]) -> Optional[str]: return None def get_request_params( @@ -58,11 +58,14 @@ def get_request_body_json( ) -> Mapping[str, Any]: return {} + def get_initial_token(self) -> Optional[Any]: + return None + def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] - ) -> Mapping[str, Any]: + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any], + ) -> Optional[Mapping[str, Any]]: return {} - - def reset(self, reset_value: Optional[Any] = None) -> None: - # No state to reset - pass diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index d47124628..8b1fea69b 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -24,14 +24,18 @@ class Paginator(ABC, RequestOptionsProvider): """ @abstractmethod - def reset(self, reset_value: Optional[Any] = None) -> None: + def get_initial_token(self) -> Optional[Any]: """ - Reset the pagination's inner state + Get the page token that should be included in the request to get the first page of records """ @abstractmethod def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any], ) -> Optional[Mapping[str, Any]]: """ Returns the next_page_token to use to fetch the next page of records. @@ -39,12 +43,13 @@ def next_page_token( :param response: the response to process :param last_page_size: the number of records read from the response :param last_record: the last record extracted from the response + :param last_page_token_value: The current value of the page token made on the last request :return: A mapping {"next_page_token": } for the next page from the input response object. Returning None means there are no more pages to read in this response. """ pass @abstractmethod - def path(self) -> Optional[str]: + def path(self, next_page_token: Optional[Mapping[str, Any]]) -> Optional[str]: """ Returns the URL path to hit to fetch the next page of records diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index beebf9e83..e35c84c7c 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -43,7 +43,6 @@ class CursorPaginationStrategy(PaginationStrategy): ) def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._initial_cursor = None if isinstance(self.cursor_value, str): self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters) else: @@ -57,10 +56,19 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: @property def initial_token(self) -> Optional[Any]: - return self._initial_cursor + """ + CursorPaginationStrategy does not have an initial value because the next cursor is typically included + in the response of the first request. For Resumable Full Refresh streams that checkpoint the page + cursor, the next cursor should be read from the state or stream slice object. + """ + return None def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any] = None, ) -> Optional[Any]: decoded_response = next(self.decoder.decode(response)) @@ -87,8 +95,5 @@ def next_page_token( ) return token if token else None - def reset(self, reset_value: Optional[Any] = None) -> None: - self._initial_cursor = reset_value - def get_page_size(self) -> Optional[int]: return self.page_size diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py index 7b17c81b4..512d8143c 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py @@ -52,7 +52,6 @@ class OffsetIncrement(PaginationStrategy): inject_on_first_request: bool = False def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._offset = 0 page_size = str(self.page_size) if isinstance(self.page_size, int) else self.page_size if page_size: self._page_size: Optional[InterpolatedString] = InterpolatedString( @@ -64,11 +63,15 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: @property def initial_token(self) -> Optional[Any]: if self.inject_on_first_request: - return self._offset + return 0 return None def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any] = None, ) -> Optional[Any]: decoded_response = next(self.decoder.decode(response)) @@ -78,19 +81,16 @@ def next_page_token( and last_page_size < self._page_size.eval(self.config, response=decoded_response) ) or last_page_size == 0: return None - else: - self._offset += last_page_size - return self._offset - - def reset(self, reset_value: Optional[Any] = 0) -> None: - if reset_value is None: - self._offset = 0 - elif not isinstance(reset_value, int): + elif last_page_token_value is None: + # If the OffsetIncrement strategy does not inject on the first request, the incoming last_page_token_value + # will be None. For this case, we assume that None was the first page and progress to the next offset + return 0 + last_page_size + elif not isinstance(last_page_token_value, int): raise ValueError( - f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer" + f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer" ) else: - self._offset = reset_value + return last_page_token_value + last_page_size def get_page_size(self) -> Optional[int]: if self._page_size: diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py index 2227fffec..2e1643b56 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py @@ -31,7 +31,6 @@ class PageIncrement(PaginationStrategy): inject_on_first_request: bool = False def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._page = self.start_from_page if isinstance(self.page_size, int) or (self.page_size is None): self._page_size = self.page_size else: @@ -43,28 +42,30 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: @property def initial_token(self) -> Optional[Any]: if self.inject_on_first_request: - return self._page + return self.start_from_page return None def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any], ) -> Optional[Any]: # Stop paginating when there are fewer records than the page size or the current page has no records if (self._page_size and last_page_size < self._page_size) or last_page_size == 0: return None - else: - self._page += 1 - return self._page - - def reset(self, reset_value: Optional[Any] = None) -> None: - if reset_value is None: - self._page = self.start_from_page - elif not isinstance(reset_value, int): + elif last_page_token_value is None: + # If the PageIncrement strategy does not inject on the first request, the incoming last_page_token_value + # may be None. When this is the case, we assume we've already requested the first page specified by + # start_from_page and must now get the next page + return self.start_from_page + 1 + elif not isinstance(last_page_token_value, int): raise ValueError( - f"Reset value {reset_value} for PageIncrement pagination strategy was not an integer" + f"Last page token value {last_page_token_value} for PageIncrement pagination strategy was not an integer" ) else: - self._page = reset_value + return last_page_token_value + 1 def get_page_size(self) -> Optional[int]: return self._page_size diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py index a55dcb131..dae02ba13 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py @@ -26,22 +26,21 @@ def initial_token(self) -> Optional[Any]: @abstractmethod def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any], ) -> Optional[Any]: """ :param response: response to process :param last_page_size: the number of records read from the response :param last_record: the last record extracted from the response + :param last_page_token_value: The current value of the page token made on the last request :return: next page token. Returns None if there are no more pages to fetch """ pass - @abstractmethod - def reset(self, reset_value: Optional[Any] = None) -> None: - """ - Reset the pagination's inner state - """ - @abstractmethod def get_page_size(self) -> Optional[int]: """ diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py index a3c977f18..7c89ba552 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py @@ -44,19 +44,19 @@ def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStop self._stop_condition = stop_condition def next_page_token( - self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any] = None, ) -> Optional[Any]: - # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure will return records in - # descending order. In terms of performance/memory, we return the records lazily + # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure + # will return records in descending order. In terms of performance/memory, we return the records lazily if last_record and self._stop_condition.is_met(last_record): return None - return self._delegate.next_page_token(response, last_page_size, last_record) - - def reset(self, reset_value: Optional[Any] = None) -> None: - if reset_value: - self._delegate.reset(reset_value) - else: - self._delegate.reset() + return self._delegate.next_page_token( + response, last_page_size, last_record, last_page_token_value + ) def get_page_size(self) -> Optional[int]: return self._delegate.get_page_size() diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 7a44d4ea4..1b8860289 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -7,10 +7,7 @@ from typing_extensions import deprecated from airbyte_cdk.models import FailureType -from airbyte_cdk.sources.declarative.async_job.job_orchestrator import ( - AsyncJobOrchestrator, - AsyncPartition, -) +from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import ( AsyncJobPartitionRouter, diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index cc7040595..d167a84bc 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -6,18 +6,7 @@ from dataclasses import InitVar, dataclass, field from functools import partial from itertools import islice -from typing import ( - Any, - Callable, - Iterable, - List, - Mapping, - MutableMapping, - Optional, - Set, - Tuple, - Union, -) +from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union import requests @@ -90,9 +79,6 @@ class SimpleRetriever(Retriever): def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._paginator = self.paginator or NoPagination(parameters=parameters) - self._last_response: Optional[requests.Response] = None - self._last_page_size: int = 0 - self._last_record: Optional[Record] = None self._parameters = parameters self._name = ( InterpolatedString(self._name, parameters=parameters) @@ -100,10 +86,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: else self._name ) - # This mapping is used during a resumable full refresh syncs to indicate whether a partition has started syncing - # records. Partitions serve as the key and map to True if they already began processing records - self._partition_started: MutableMapping[Any, bool] = dict() - @property # type: ignore def name(self) -> str: """ @@ -251,17 +233,13 @@ def _request_body_json( raise ValueError("Request body json cannot be a string") return body_json - def _paginator_path( - self, - ) -> Optional[str]: + def _paginator_path(self, next_page_token: Optional[Mapping[str, Any]] = None) -> Optional[str]: """ If the paginator points to a path, follow it, else return nothing so the requester is used. - :param stream_state: - :param stream_slice: :param next_page_token: :return: """ - return self._paginator.path() + return self._paginator.path(next_page_token=next_page_token) def _parse_response( self, @@ -272,22 +250,15 @@ def _parse_response( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Iterable[Record]: if not response: - self._last_response = None yield from [] else: - self._last_response = response - record_generator = self.record_selector.select_records( + yield from self.record_selector.select_records( response=response, stream_state=stream_state, records_schema=records_schema, stream_slice=stream_slice, next_page_token=next_page_token, ) - self._last_page_size = 0 - for record in record_generator: - self._last_page_size += 1 - self._last_record = record - yield record @property # type: ignore def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: @@ -299,7 +270,13 @@ def primary_key(self, value: str) -> None: if not isinstance(value, property): self._primary_key = value - def _next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + def _next_page_token( + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any], + ) -> Optional[Mapping[str, Any]]: """ Specifies a pagination strategy. @@ -307,7 +284,12 @@ def _next_page_token(self, response: requests.Response) -> Optional[Mapping[str, :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response. """ - return self._paginator.next_page_token(response, self._last_page_size, self._last_record) + return self._paginator.next_page_token( + response=response, + last_page_size=last_page_size, + last_record=last_record, + last_page_token_value=last_page_token_value, + ) def _fetch_next_page( self, @@ -316,7 +298,7 @@ def _fetch_next_page( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[requests.Response]: return self.requester.send_request( - path=self._paginator_path(), + path=self._paginator_path(next_page_token=next_page_token), stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, @@ -345,20 +327,37 @@ def _fetch_next_page( # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well. def _read_pages( self, - records_generator_fn: Callable[[Optional[requests.Response]], Iterable[StreamData]], + records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], stream_state: Mapping[str, Any], stream_slice: StreamSlice, - ) -> Iterable[StreamData]: + ) -> Iterable[Record]: pagination_complete = False - next_page_token = None + initial_token = self._paginator.get_initial_token() + next_page_token: Optional[Mapping[str, Any]] = ( + {"next_page_token": initial_token} if initial_token else None + ) while not pagination_complete: response = self._fetch_next_page(stream_state, stream_slice, next_page_token) - yield from records_generator_fn(response) + + last_page_size = 0 + last_record: Optional[Record] = None + for record in records_generator_fn(response): + last_page_size += 1 + last_record = record + yield record if not response: pagination_complete = True else: - next_page_token = self._next_page_token(response) + last_page_token_value = ( + next_page_token.get("next_page_token") if next_page_token else None + ) + next_page_token = self._next_page_token( + response=response, + last_page_size=last_page_size, + last_record=last_record, + last_page_token_value=last_page_token_value, + ) if not next_page_token: pagination_complete = True @@ -367,19 +366,38 @@ def _read_pages( def _read_single_page( self, - records_generator_fn: Callable[[Optional[requests.Response]], Iterable[StreamData]], + records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], stream_state: Mapping[str, Any], stream_slice: StreamSlice, ) -> Iterable[StreamData]: - response = self._fetch_next_page(stream_state, stream_slice) - yield from records_generator_fn(response) + initial_token = stream_state.get("next_page_token") + if initial_token is None: + initial_token = self._paginator.get_initial_token() + next_page_token: Optional[Mapping[str, Any]] = ( + {"next_page_token": initial_token} if initial_token else None + ) + + response = self._fetch_next_page(stream_state, stream_slice, next_page_token) + + last_page_size = 0 + last_record: Optional[Record] = None + for record in records_generator_fn(response): + last_page_size += 1 + last_record = record + yield record if not response: - next_page_token: Mapping[str, Any] = {FULL_REFRESH_SYNC_COMPLETE_KEY: True} + next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True} else: - next_page_token = self._next_page_token(response) or { - FULL_REFRESH_SYNC_COMPLETE_KEY: True - } + last_page_token_value = ( + next_page_token.get("next_page_token") if next_page_token else None + ) + next_page_token = self._next_page_token( + response=response, + last_page_size=last_page_size, + last_record=last_record, + last_page_token_value=last_page_token_value, + ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True} if self.cursor: self.cursor.close_slice( @@ -414,25 +432,14 @@ def read_records( if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): stream_state = self.state - # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to fetch more records - # The platform deletes stream state for full refresh streams before starting a new job, so we don't need to worry about - # this value existing for the initial attempt + # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to + # fetch more records. The platform deletes stream state for full refresh streams before starting a + # new job, so we don't need to worry about this value existing for the initial attempt if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): return - cursor_value = stream_state.get("next_page_token") - - # The first attempt to read a page for the current partition should reset the paginator to the current - # cursor state which is initially assigned to the incoming state from the platform - partition_key = self._to_partition_key(_slice.partition) - if partition_key not in self._partition_started: - self._partition_started[partition_key] = True - self._paginator.reset(reset_value=cursor_value) yield from self._read_single_page(record_generator, stream_state, _slice) else: - # Fixing paginator types has a long tail of dependencies - self._paginator.reset() - for stream_data in self._read_pages(record_generator, self.state, _slice): current_record = self._extract_record(stream_data, _slice) if self.cursor and current_record: @@ -518,7 +525,7 @@ def _parse_records( stream_state: Mapping[str, Any], records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice], - ) -> Iterable[StreamData]: + ) -> Iterable[Record]: yield from self._parse_response( response, stream_slice=stream_slice, @@ -562,7 +569,7 @@ def _fetch_next_page( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[requests.Response]: return self.requester.send_request( - path=self._paginator_path(), + path=self._paginator_path(next_page_token=next_page_token), stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, diff --git a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py index 31f6377f7..91ce28e7a 100644 --- a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py +++ b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py @@ -1,6 +1,6 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -from typing import Any, Callable, Iterable, Mapping, Optional +from typing import Any, Iterable, Mapping, Optional from airbyte_cdk.sources.declarative.retrievers import Retriever from airbyte_cdk.sources.message import MessageRepository @@ -16,7 +16,7 @@ def __init__( self, stream_name: str, json_schema: Mapping[str, Any], - retriever_factory: Callable[[], Retriever], + retriever: Retriever, message_repository: MessageRepository, ) -> None: """ @@ -26,14 +26,14 @@ def __init__( """ self._stream_name = stream_name self._json_schema = json_schema - self._retriever_factory = retriever_factory + self._retriever = retriever self._message_repository = message_repository def create(self, stream_slice: StreamSlice) -> Partition: return DeclarativePartition( self._stream_name, self._json_schema, - self._retriever_factory(), + self._retriever, self._message_repository, stream_slice, ) diff --git a/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py b/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py index 83d46918d..997920687 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import ( CursorPaginationStrategy, ) +from airbyte_cdk.sources.types import Record @pytest.mark.parametrize( @@ -79,7 +80,7 @@ def test_cursor_pagination_strategy(template_string, stop_condition, expected_to "characters": {}, } response._content = json.dumps(response_body).encode("utf-8") - last_record = {"id": 1, "more_records": True} + last_record = Record(data={"id": 1, "more_records": True}, stream_name="stream_name") token = strategy.next_page_token(response, 1, last_record) assert expected_token == token @@ -111,18 +112,3 @@ def test_last_record_is_node_if_no_records(): response = requests.Response() next_page_token = strategy.next_page_token(response, 0, None) assert next_page_token is None - - -def test_reset_with_initial_token(): - strategy = CursorPaginationStrategy( - page_size=10, - cursor_value="{{ response.next_page }}", - config={}, - parameters={}, - ) - - assert strategy.initial_token is None - - strategy.reset("https://for-all-mankind.nasa.com/api/v1/astronauts") - - assert strategy.initial_token == "https://for-all-mankind.nasa.com/api/v1/astronauts" diff --git a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py index fcf631f7f..cbe185a37 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py @@ -9,7 +9,6 @@ import requests from airbyte_cdk.sources.declarative.decoders import JsonDecoder, XmlDecoder -from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import ( DefaultPaginator, @@ -23,11 +22,11 @@ from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import ( OffsetIncrement, ) -from airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition import ( - CursorStopCondition, - StopConditionPaginationStrategyDecorator, +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import ( + PageIncrement, ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath +from airbyte_cdk.sources.declarative.types import Record @pytest.mark.parametrize( @@ -213,12 +212,12 @@ def test_default_paginator_with_cursor( json.dumps(response_body).encode("utf-8") if decoder == JsonDecoder else response_body ) - actual_next_page_token = paginator.next_page_token(response, 2, last_record) - actual_next_path = paginator.path() - actual_request_params = paginator.get_request_params() - actual_headers = paginator.get_request_headers() - actual_body_data = paginator.get_request_body_data() - actual_body_json = paginator.get_request_body_json() + actual_next_page_token = paginator.next_page_token(response, 2, last_record, None) + actual_next_path = paginator.path(actual_next_page_token) + actual_request_params = paginator.get_request_params(next_page_token=actual_next_page_token) + actual_headers = paginator.get_request_headers(next_page_token=actual_next_page_token) + actual_body_data = paginator.get_request_body_data(next_page_token=actual_next_page_token) + actual_body_json = paginator.get_request_body_json(next_page_token=actual_next_page_token) assert actual_next_page_token == expected_next_page_token assert actual_next_path == expected_updated_path assert actual_request_params == expected_request_params @@ -286,8 +285,8 @@ def test_paginator_request_param_interpolation( response_body = {"next": "https://airbyte.io/next_url"} response._content = json.dumps(response_body).encode("utf-8") last_record = {"id": 1} - paginator.next_page_token(response, 2, last_record) - actual_request_params = paginator.get_request_params() + next_page_token = paginator.next_page_token(response, 2, last_record, None) + actual_request_params = paginator.get_request_params(next_page_token=next_page_token) assert actual_request_params == expected_request_params @@ -319,18 +318,7 @@ def test_page_size_option_cannot_be_set_if_strategy_has_no_limit(): pass -@pytest.mark.parametrize( - "inject_on_first_request", - [ - (True), - (False), - ], - ids=[ - "test_reset_inject_on_first_request", - "test_reset_no_inject_on_first_request", - ], -) -def test_reset(inject_on_first_request): +def test_initial_token_with_offset_pagination(): page_size_request_option = RequestOption( inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={} ) @@ -339,9 +327,7 @@ def test_reset(inject_on_first_request): ) url_base = "https://airbyte.io" config = {} - strategy = OffsetIncrement( - config={}, page_size=2, inject_on_first_request=inject_on_first_request, parameters={} - ) + strategy = OffsetIncrement(config={}, page_size=2, parameters={}, inject_on_first_request=True) paginator = DefaultPaginator( strategy, config, @@ -350,52 +336,50 @@ def test_reset(inject_on_first_request): page_size_option=page_size_request_option, page_token_option=page_token_request_option, ) - initial_request_parameters = paginator.get_request_params() - response = requests.Response() - response._content = json.dumps({}).encode("utf-8") - paginator.next_page_token(response, 2, {"a key": "a value"}) - request_parameters_for_second_request = paginator.get_request_params() - paginator.reset() - request_parameters_after_reset = paginator.get_request_params() - assert initial_request_parameters == request_parameters_after_reset - assert request_parameters_for_second_request != request_parameters_after_reset + initial_token = paginator.get_initial_token() + next_page_token = {"next_page_token": initial_token} + initial_request_parameters = paginator.get_request_params(next_page_token=next_page_token) -def test_data_feed_paginator_with_stop_page_condition(): - config = {} - - cursor = DatetimeBasedCursor( - cursor_field="updated_at", - datetime_format="%Y-%m-%d", - start_datetime="2024-01-01", - config=config, - parameters={}, - ) + assert initial_request_parameters == {"limit": 2, "offset": 0} - wrapped_strategy = StopConditionPaginationStrategyDecorator( - _delegate=OffsetIncrement( - config={}, page_size=2, inject_on_first_request=False, parameters={} - ), - stop_condition=CursorStopCondition(cursor=cursor), - ) - paginator = DefaultPaginator( - pagination_strategy=wrapped_strategy, - config=config, - url_base="https://airbyte.io", - parameters={}, - page_size_option=RequestOption( - inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={} +@pytest.mark.parametrize( + "pagination_strategy,last_page_size,expected_next_page_token,expected_second_next_page_token", + [ + pytest.param( + OffsetIncrement(config={}, page_size=10, parameters={}, inject_on_first_request=True), + 10, + {"next_page_token": 10}, + {"next_page_token": 20}, ), - page_token_option=RequestOption( - inject_into=RequestOptionType.request_parameter, field_name="offset", parameters={} + pytest.param( + PageIncrement( + config={}, + page_size=5, + start_from_page=0, + parameters={}, + inject_on_first_request=True, + ), + 5, + {"next_page_token": 1}, + {"next_page_token": 2}, ), - ) + ], +) +def test_no_inject_on_first_request_offset_pagination( + pagination_strategy, last_page_size, expected_next_page_token, expected_second_next_page_token +): + """ + Validate that the stateless next_page_token() works when the first page does not inject the value + """ - paginator.reset() + response = requests.Response() + response.headers = {"A_HEADER": "HEADER_VALUE"} + response._content = {} + last_record = Record(data={}, stream_name="test") -def test_initial_token_with_offset_pagination(): page_size_request_option = RequestOption( inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={} ) @@ -404,18 +388,23 @@ def test_initial_token_with_offset_pagination(): ) url_base = "https://airbyte.io" config = {} - strategy = OffsetIncrement(config={}, page_size=2, parameters={}, inject_on_first_request=True) paginator = DefaultPaginator( - strategy, + pagination_strategy, config, url_base, parameters={}, page_size_option=page_size_request_option, page_token_option=page_token_request_option, ) - initial_request_parameters = paginator.get_request_params() - assert initial_request_parameters == {"limit": 2, "offset": 0} + actual_next_page_token = paginator.next_page_token(response, last_page_size, last_record, None) + assert actual_next_page_token == expected_next_page_token + + last_page_token_value = actual_next_page_token["next_page_token"] + actual_next_page_token = paginator.next_page_token( + response, last_page_size, last_record, last_page_token_value + ) + assert actual_next_page_token == expected_second_next_page_token def test_limit_page_fetched(): diff --git a/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py b/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py index 92bcc55a2..21beed576 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py @@ -9,5 +9,5 @@ def test(): paginator = NoPagination(parameters={}) - next_page_token = paginator.next_page_token(requests.Response(), 0, []) + next_page_token = paginator.next_page_token(requests.Response(), 0, [], None) assert next_page_token == {} diff --git a/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py b/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py index 692b80272..4cd827e88 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py @@ -14,36 +14,46 @@ @pytest.mark.parametrize( - "page_size, parameters, last_page_size, last_record, expected_next_page_token, expected_offset", + "page_size, parameters, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset", [ - pytest.param("2", {}, 2, {"id": 1}, 2, 2, id="test_same_page_size"), - pytest.param(2, {}, 2, {"id": 1}, 2, 2, id="test_same_page_size"), + pytest.param("2", {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"), + pytest.param(2, {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"), pytest.param( "{{ parameters['page_size'] }}", {"page_size": 3}, 2, {"id": 1}, + 3, None, 0, id="test_larger_page_size", ), - pytest.param(None, {}, 0, [], None, 0, id="test_stop_if_no_records"), + pytest.param(None, {}, 0, [], 3, None, 0, id="test_stop_if_no_records"), pytest.param( "{{ response['page_metadata']['limit'] }}", {}, 2, {"id": 1}, + 3, None, 0, id="test_page_size_from_response", ), + pytest.param( + 2, {}, 2, {"id": 1}, None, 2, 2, id="test_get_second_page_with_first_page_not_injected" + ), ], ) def test_offset_increment_paginator_strategy( - page_size, parameters, last_page_size, last_record, expected_next_page_token, expected_offset + page_size, + parameters, + last_page_size, + last_record, + last_page_token_value, + expected_next_page_token, + expected_offset, ): paginator_strategy = OffsetIncrement(page_size=page_size, parameters=parameters, config={}) - assert paginator_strategy._offset == 0 response = requests.Response() @@ -51,12 +61,16 @@ def test_offset_increment_paginator_strategy( response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}} response._content = json.dumps(response_body).encode("utf-8") - next_page_token = paginator_strategy.next_page_token(response, last_page_size, last_record) + next_page_token = paginator_strategy.next_page_token( + response, last_page_size, last_record, last_page_token_value + ) assert expected_next_page_token == next_page_token - assert expected_offset == paginator_strategy._offset - paginator_strategy.reset() - assert 0 == paginator_strategy._offset + # Validate that the PaginationStrategy is stateless and calling next_page_token() again returns the same result + next_page_token = paginator_strategy.next_page_token( + response, last_page_size, last_record, last_page_token_value + ) + assert expected_next_page_token == next_page_token def test_offset_increment_paginator_strategy_rises(): @@ -85,24 +99,3 @@ def test_offset_increment_paginator_strategy_initial_token( ) assert paginator_strategy.initial_token == expected_initial_token - - -@pytest.mark.parametrize( - "reset_value, expected_initial_token, expected_error", - [ - pytest.param(25, 25, None, id="test_reset_with_offset_value"), - pytest.param(None, 0, None, id="test_reset_with_default"), - pytest.param("Nope", None, ValueError, id="test_reset_with_invalid_value"), - ], -) -def test_offset_increment_reset(reset_value, expected_initial_token, expected_error): - paginator_strategy = OffsetIncrement( - page_size=20, parameters={}, config={}, inject_on_first_request=True - ) - - if expected_error: - with pytest.raises(expected_error): - paginator_strategy.reset(reset_value=reset_value) - else: - paginator_strategy.reset(reset_value=reset_value) - assert paginator_strategy.initial_token == expected_initial_token diff --git a/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py b/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py index 56564f925..32af20b50 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py @@ -14,26 +14,45 @@ @pytest.mark.parametrize( - "page_size, start_from, last_page_size, last_record, expected_next_page_token, expected_offset", + "page_size, start_from, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset", [ - pytest.param(2, 1, 2, {"id": 1}, 2, 2, id="test_same_page_size_start_from_0"), - pytest.param(3, 1, 2, {"id": 1}, None, 1, id="test_larger_page_size_start_from_0"), - pytest.param(2, 0, 2, {"id": 1}, 1, 1, id="test_same_page_size_start_from_1"), - pytest.param(3, 0, 2, {"id": 1}, None, 0, id="test_larger_page_size_start_from_0"), - pytest.param(None, 0, 0, None, None, 0, id="test_no_page_size"), - pytest.param("2", 0, 2, {"id": 1}, 1, 1, id="test_page_size_from_string"), + pytest.param(2, 1, 2, {"id": 1}, 3, 4, 2, id="test_same_page_size_start_from_1"), + pytest.param(3, 1, 2, {"id": 1}, 3, None, 1, id="test_larger_page_size_start_from_1"), + pytest.param(2, 0, 2, {"id": 1}, 3, 4, 1, id="test_same_page_size_start_from_0"), + pytest.param(3, 0, 2, {"id": 1}, 3, None, 0, id="test_larger_page_size_start_from_0"), + pytest.param(None, 0, 0, None, 2, None, 0, id="test_no_page_size"), + pytest.param("2", 0, 2, {"id": 1}, 3, 4, 1, id="test_page_size_from_string"), pytest.param( - "{{ config['value'] }}", 0, 2, {"id": 1}, 1, 1, id="test_page_size_from_config" + "{{ config['value'] }}", 0, 2, {"id": 1}, 3, 4, 1, id="test_page_size_from_config" + ), + pytest.param( + 2, 0, 2, {"id": 1}, None, 1, 2, id="test_start_from_not_injected_returns_second_page" + ), + pytest.param( + 2, + 10, + 2, + {"id": 1}, + None, + 11, + 2, + id="test_non_default_start_from_not_injected_returns_next_page", ), ], ) def test_page_increment_paginator_strategy( - page_size, start_from, last_page_size, last_record, expected_next_page_token, expected_offset + page_size, + start_from, + last_page_size, + last_record, + last_page_token_value, + expected_next_page_token, + expected_offset, ): paginator_strategy = PageIncrement( page_size=page_size, parameters={}, start_from_page=start_from, config={"value": 2} ) - assert paginator_strategy._page == start_from + assert paginator_strategy.start_from_page == start_from response = requests.Response() @@ -41,12 +60,16 @@ def test_page_increment_paginator_strategy( response_body = {"next": "https://airbyte.io/next_url"} response._content = json.dumps(response_body).encode("utf-8") - next_page_token = paginator_strategy.next_page_token(response, last_page_size, last_record) + next_page_token = paginator_strategy.next_page_token( + response, last_page_size, last_record, last_page_token_value + ) assert expected_next_page_token == next_page_token - assert expected_offset == paginator_strategy._page - paginator_strategy.reset() - assert start_from == paginator_strategy._page + # Validate that the PaginationStrategy is stateless and calling next_page_token() again returns the same result + next_page_token = paginator_strategy.next_page_token( + response, last_page_size, last_record, last_page_token_value + ) + assert expected_next_page_token == next_page_token @pytest.mark.parametrize( @@ -82,24 +105,3 @@ def test_page_increment_paginator_strategy_initial_token( ) assert paginator_strategy.initial_token == expected_initial_token - - -@pytest.mark.parametrize( - "reset_value, expected_initial_token, expected_error", - [ - pytest.param(25, 25, None, id="test_reset_with_offset_value"), - pytest.param(None, 0, None, id="test_reset_with_default"), - pytest.param("Nope", None, ValueError, id="test_reset_with_invalid_value"), - ], -) -def test_offset_increment_reset(reset_value, expected_initial_token, expected_error): - paginator_strategy = PageIncrement( - page_size=100, parameters={}, config={}, inject_on_first_request=True - ) - - if expected_error: - with pytest.raises(expected_error): - paginator_strategy.reset(reset_value=reset_value) - else: - paginator_strategy.reset(reset_value=reset_value) - assert paginator_strategy.initial_token == expected_initial_token diff --git a/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py b/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py index 88da84589..5561f92ab 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py @@ -86,7 +86,9 @@ def test_given_stop_condition_is_not_met_when_next_page_token_then_delegate( next_page_token = decorator.next_page_token(ANY_RESPONSE, 2, last_record) assert next_page_token == mocked_pagination_strategy.next_page_token.return_value - mocked_pagination_strategy.next_page_token.assert_called_once_with(ANY_RESPONSE, 2, last_record) + mocked_pagination_strategy.next_page_token.assert_called_once_with( + ANY_RESPONSE, 2, last_record, None + ) mocked_stop_condition.is_met.assert_has_calls([call(last_record)]) @@ -100,15 +102,9 @@ def test_given_no_records_when_next_page_token_then_delegate( next_page_token = decorator.next_page_token(ANY_RESPONSE, 0, NO_RECORD) assert next_page_token == mocked_pagination_strategy.next_page_token.return_value - mocked_pagination_strategy.next_page_token.assert_called_once_with(ANY_RESPONSE, 0, NO_RECORD) - - -def test_when_reset_then_delegate(mocked_pagination_strategy, mocked_stop_condition): - decorator = StopConditionPaginationStrategyDecorator( - mocked_pagination_strategy, mocked_stop_condition + mocked_pagination_strategy.next_page_token.assert_called_once_with( + ANY_RESPONSE, 0, NO_RECORD, None ) - decorator.reset() - mocked_pagination_strategy.reset.assert_called_once_with() def test_when_get_page_size_then_delegate(mocked_pagination_strategy, mocked_stop_condition): diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index b54527c13..5878c758f 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -3,6 +3,8 @@ # import json +from functools import partial +from typing import Any, Iterable, Mapping, Optional from unittest.mock import MagicMock, Mock, patch import pytest @@ -11,6 +13,8 @@ from airbyte_cdk import YamlDeclarativeSource from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth +from airbyte_cdk.sources.declarative.decoders import JsonDecoder +from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordSelector from airbyte_cdk.sources.declarative.incremental import ( DatetimeBasedCursor, DeclarativeCursor, @@ -22,7 +26,10 @@ ) from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator -from airbyte_cdk.sources.declarative.requesters.paginators.strategies import PageIncrement +from airbyte_cdk.sources.declarative.requesters.paginators.strategies import ( + CursorPaginationStrategy, + PageIncrement, +) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import ( @@ -30,6 +37,7 @@ SimpleRetrieverTestReadDecorator, ) from airbyte_cdk.sources.types import Record, StreamSlice +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer A_SLICE_STATE = {"slice_state": "slice state value"} A_STREAM_SLICE = StreamSlice(cursor_slice={"stream slice": "slice value"}, partition={}) @@ -51,10 +59,11 @@ def test_simple_retriever_full(mock_http_stream): requester.get_request_params.return_value = request_params paginator = MagicMock() + paginator.get_initial_token.return_value = None next_page_token = {"cursor": "cursor_value"} paginator.path.return_value = None paginator.next_page_token.return_value = next_page_token - paginator.get_requesyyt_headers.return_value = {} + paginator.get_request_headers.return_value = {} record_selector = MagicMock() record_selector.select_records.return_value = records @@ -66,6 +75,10 @@ def test_simple_retriever_full(mock_http_stream): response = requests.Response() response.status_code = 200 + last_page_size = 2 + last_record = Record(data={"id": "1a"}, stream_name="stream_name") + last_page_token_value = 0 + underlying_state = {"date": "2021-01-01"} cursor.get_stream_state.return_value = underlying_state @@ -102,19 +115,13 @@ def test_simple_retriever_full(mock_http_stream): assert retriever.primary_key == primary_key assert retriever.state == underlying_state - assert retriever._next_page_token(response) == next_page_token + assert ( + retriever._next_page_token(response, last_page_size, last_record, last_page_token_value) + == next_page_token + ) assert retriever._request_params(None, None, None) == {} assert retriever.stream_slices() == stream_slices - assert retriever._last_response is None - assert retriever._last_record is None - assert list(retriever._parse_response(response, stream_state={}, records_schema={})) == records - assert retriever._last_response == response - assert retriever._last_page_size == 2 - - [r for r in retriever.read_records(SyncMode.full_refresh)] - paginator.reset.assert_called() - @patch.object(SimpleRetriever, "_read_pages", return_value=iter([*request_response_logs, *records])) def test_simple_retriever_with_request_response_logs(mock_http_stream): @@ -144,7 +151,6 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream): ) actual_messages = [r for r in retriever.read_records(SyncMode.full_refresh)] - paginator.reset.assert_called() assert isinstance(actual_messages[0], AirbyteLogMessage) assert isinstance(actual_messages[1], AirbyteLogMessage) @@ -209,7 +215,6 @@ def test_simple_retriever_resumable_full_refresh_cursor_page_increment( url_base="https://airbyte.io", parameters={}, ) - paginator.reset = Mock(wraps=paginator.reset) stream_slicer = ResumableFullRefreshCursor(parameters={}) if initial_state: @@ -243,8 +248,6 @@ def test_simple_retriever_resumable_full_refresh_cursor_page_increment( assert actual_records == expected_records[5:] assert retriever.state == {"__ab_full_refresh_sync_complete": True} - paginator.reset.assert_called_once_with(reset_value=expected_reset_value) - @pytest.mark.parametrize( "initial_state, expected_reset_value, expected_next_page", @@ -331,7 +334,6 @@ def test_simple_retriever_resumable_full_refresh_cursor_reset_cursor_pagination( "https://for-all-mankind.nasa.com/api/v1/astronauts?next_page=gordo_stevens", json=response_body_2, ) - stream.retriever.paginator.reset = Mock(wraps=stream.retriever.paginator.reset) stream_slicer = ResumableFullRefreshCursor(parameters={}) if initial_state: stream_slicer.set_initial_state(initial_state) @@ -360,8 +362,6 @@ def test_simple_retriever_resumable_full_refresh_cursor_reset_cursor_pagination( assert actual_records == expected_records[5:] assert stream.retriever.state == {"__ab_full_refresh_sync_complete": True} - stream.retriever.paginator.reset.assert_called_once_with(reset_value=expected_reset_value) - def test_simple_retriever_resumable_full_refresh_cursor_reset_skip_completed_stream(): expected_records = [ @@ -391,7 +391,7 @@ def test_simple_retriever_resumable_full_refresh_cursor_reset_skip_completed_str url_base="https://airbyte.io", parameters={}, ) - paginator.reset = Mock(wraps=paginator.reset) + paginator.get_initial_token = Mock(wraps=paginator.get_initial_token) stream_slicer = ResumableFullRefreshCursor(parameters={}) stream_slicer.set_initial_state({"__ab_full_refresh_sync_complete": True}) @@ -416,7 +416,7 @@ def test_simple_retriever_resumable_full_refresh_cursor_reset_skip_completed_str assert len(actual_records) == 0 assert retriever.state == {"__ab_full_refresh_sync_complete": True} - paginator.reset.assert_not_called() + paginator.get_initial_token.assert_not_called() @pytest.mark.parametrize( @@ -614,8 +614,6 @@ def test_request_body_data( paginator.get_request_body_data.return_value = paginator_body_data requester = MagicMock(use_cache=False) - # stream_slicer = MagicMock() - # stream_slicer.get_request_body_data.return_value = request_options_provider_body_data request_option_provider = MagicMock() request_option_provider.get_request_body_data.return_value = request_options_provider_body_data @@ -667,7 +665,7 @@ def test_path(test_name, requester_path, paginator_path, expected_path): config={}, ) - actual_path = retriever._paginator_path() + actual_path = retriever._paginator_path(next_page_token=None) assert actual_path == expected_path @@ -847,3 +845,213 @@ def test_emit_log_request_response_messages(mocker): requester.send_request.call_args_list[0][1]["log_formatter"](response) == format_http_message_mock.return_value ) + + +def test_retriever_last_page_size_for_page_increment(): + requester = MagicMock() + requester.send_request.return_value = MagicMock() + + paginator = DefaultPaginator( + config={}, + pagination_strategy=PageIncrement(config={}, page_size=5, parameters={}), + url_base="https://airbyte.io", + parameters={}, + ) + + retriever = SimpleRetriever( + name="employees", + primary_key=primary_key, + requester=requester, + paginator=paginator, + record_selector=MagicMock(), + stream_slicer=SinglePartitionRouter(parameters={}), + parameters={}, + config={}, + ) + + expected_records = [ + Record(data={"id": "1a", "name": "Cross Product Sales"}, stream_name="departments"), + Record(data={"id": "2b", "name": "Foreign Exchange"}, stream_name="departments"), + Record(data={"id": "3c", "name": "Wealth Management"}, stream_name="departments"), + Record(data={"id": "4d", "name": "Investment Banking Division"}, stream_name="departments"), + ] + + def mock_parse_records(response: Optional[requests.Response]) -> Iterable[Record]: + yield from expected_records + + actual_records = list( + retriever._read_pages( + records_generator_fn=mock_parse_records, + stream_state={}, + stream_slice=StreamSlice(cursor_slice={}, partition={}), + ) + ) + assert actual_records == expected_records + + +def test_retriever_last_record_for_page_increment(): + requester = MagicMock() + requester.send_request.return_value = MagicMock() + + paginator = DefaultPaginator( + config={}, + pagination_strategy=CursorPaginationStrategy( + cursor_value="{{ last_record['id'] }}", + stop_condition="{{ last_record['last_record'] }}", + config={}, + parameters={}, + ), + url_base="https://airbyte.io", + parameters={}, + ) + + retriever = SimpleRetriever( + name="employees", + primary_key=primary_key, + requester=requester, + paginator=paginator, + record_selector=MagicMock(), + stream_slicer=SinglePartitionRouter(parameters={}), + parameters={}, + config={}, + ) + + expected_records = [ + Record(data={"id": "a", "name": "Cross Product Sales"}, stream_name="departments"), + Record(data={"id": "b", "name": "Foreign Exchange"}, stream_name="departments"), + Record(data={"id": "c", "name": "Wealth Management"}, stream_name="departments"), + Record( + data={"id": "d", "name": "Investment Banking Division", "last_record": True}, + stream_name="departments", + ), + ] + + def mock_parse_records(response: Optional[requests.Response]) -> Iterable[Record]: + yield from expected_records + + actual_records = list( + retriever._read_pages( + records_generator_fn=mock_parse_records, + stream_state={}, + stream_slice=StreamSlice(cursor_slice={}, partition={}), + ) + ) + assert actual_records == expected_records + + +def test_retriever_is_stateless(): + """ + Special test case to verify that retrieving the pages for a given slice does not affect an internal + state of the component. Specifically, because this test don't call any type of reset so invoking the + _read_pages() method twice will fail if there is an internal state (and is therefore not stateless) + because the page count will not be reset. + """ + + page_response_1 = requests.Response() + page_response_1.status_code = 200 + page_response_1._content = json.dumps( + { + "employees": [ + {"id": "0", "first_name": "eric", "last_name": "tao"}, + {"id": "1", "first_name": "rishi", "last_name": "ramdani"}, + {"id": "2", "first_name": "harper", "last_name": "stern"}, + {"id": "3", "first_name": "erobertric", "last_name": "spearing"}, + {"id": "4", "first_name": "yasmin", "last_name": "kara-hanani"}, + ] + } + ).encode("utf-8") + + page_response_2 = requests.Response() + page_response_2.status_code = 200 + page_response_2._content = json.dumps( + { + "employees": [ + {"id": "5", "first_name": "daria", "last_name": "greenock"}, + {"id": "6", "first_name": "venetia", "last_name": "berens"}, + {"id": "7", "first_name": "kenny", "last_name": "killbane"}, + ] + } + ).encode("utf-8") + + def mock_send_request( + next_page_token: Optional[Mapping[str, Any]] = None, **kwargs + ) -> Optional[requests.Response]: + page_number = next_page_token.get("next_page_token") if next_page_token else None + if page_number is None: + return page_response_1 + elif page_number == 1: + return page_response_2 + else: + raise ValueError(f"Requested an invalid page number {page_number}") + + requester = MagicMock() + requester.send_request.side_effect = mock_send_request + + decoder = JsonDecoder(parameters={}) + extractor = DpathExtractor( + field_path=["employees"], decoder=decoder, config=config, parameters={} + ) + record_selector = RecordSelector( + name="employees", + extractor=extractor, + record_filter=None, + transformations=[], + config=config, + parameters={}, + schema_normalization=TypeTransformer(TransformConfig.DefaultSchemaNormalization), + ) + + paginator = DefaultPaginator( + config={}, + pagination_strategy=PageIncrement(config={}, page_size=5, parameters={}), + url_base="https://airbyte.io", + parameters={}, + ) + + retriever = SimpleRetriever( + name="employees", + primary_key=primary_key, + requester=requester, + paginator=paginator, + record_selector=record_selector, + stream_slicer=SinglePartitionRouter(parameters={}), + parameters={}, + config={}, + ) + + _slice = StreamSlice(cursor_slice={}, partition={}) + + record_generator = partial( + retriever._parse_records, + stream_state=retriever.state or {}, + stream_slice=_slice, + records_schema={}, + ) + + # We call _read_pages() because the existing read_records() used to modify and reset state whereas + # _read_pages() did not invoke any methods to reset state + actual_records = list( + retriever._read_pages( + records_generator_fn=record_generator, stream_state={}, stream_slice=_slice + ) + ) + assert len(actual_records) == 8 + assert actual_records[0] == Record( + data={"id": "0", "first_name": "eric", "last_name": "tao"}, stream_name="employees" + ) + assert actual_records[7] == Record( + data={"id": "7", "first_name": "kenny", "last_name": "killbane"}, stream_name="employees" + ) + + actual_records = list( + retriever._read_pages( + records_generator_fn=record_generator, stream_state={}, stream_slice=_slice + ) + ) + assert len(actual_records) == 8 + assert actual_records[2] == Record( + data={"id": "2", "first_name": "harper", "last_name": "stern"}, stream_name="employees" + ) + assert actual_records[5] == Record( + data={"id": "5", "first_name": "daria", "last_name": "greenock"}, stream_name="employees" + ) diff --git a/unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py b/unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py index 1a4f07212..3ced03a69 100644 --- a/unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py +++ b/unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py @@ -1,3 +1,5 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + from typing import List from unittest import TestCase from unittest.mock import Mock @@ -27,31 +29,32 @@ class StreamSlicerPartitionGeneratorTest(TestCase): - def setUp(self) -> None: - self._retriever_factory = Mock() - self._message_repository = Mock(spec=MessageRepository) - self._partition_factory = DeclarativePartitionFactory( + def test_given_multiple_slices_partition_generator_uses_the_same_retriever(self) -> None: + retriever = self._mock_retriever([]) + message_repository = Mock(spec=MessageRepository) + partition_factory = DeclarativePartitionFactory( _STREAM_NAME, _JSON_SCHEMA, - self._retriever_factory, - self._message_repository, + retriever, + message_repository, ) - def test_given_multiple_slices_when_read_then_read_from_different_retrievers(self) -> None: - first_retriever = self._mock_retriever([]) - second_retriever = self._mock_retriever([]) - self._retriever_factory.side_effect = [first_retriever, second_retriever] - - list(self._partition_factory.create(_A_STREAM_SLICE).read()) - list(self._partition_factory.create(_ANOTHER_STREAM_SLICE).read()) + list(partition_factory.create(_A_STREAM_SLICE).read()) + list(partition_factory.create(_ANOTHER_STREAM_SLICE).read()) - first_retriever.read_records.assert_called_once() - second_retriever.read_records.assert_called_once() + assert retriever.read_records.call_count == 2 def test_given_a_mapping_when_read_then_yield_record(self) -> None: retriever = self._mock_retriever([_A_RECORD]) - self._retriever_factory.return_value = retriever - partition = self._partition_factory.create(_A_STREAM_SLICE) + message_repository = Mock(spec=MessageRepository) + partition_factory = DeclarativePartitionFactory( + _STREAM_NAME, + _JSON_SCHEMA, + retriever, + message_repository, + ) + + partition = partition_factory.create(_A_STREAM_SLICE) records = list(partition.read()) @@ -61,13 +64,20 @@ def test_given_a_mapping_when_read_then_yield_record(self) -> None: def test_given_not_a_record_when_read_then_send_to_message_repository(self) -> None: retriever = self._mock_retriever([_AIRBYTE_LOG_MESSAGE]) - self._retriever_factory.return_value = retriever + message_repository = Mock(spec=MessageRepository) + partition_factory = DeclarativePartitionFactory( + _STREAM_NAME, + _JSON_SCHEMA, + retriever, + message_repository, + ) - list(self._partition_factory.create(_A_STREAM_SLICE).read()) + list(partition_factory.create(_A_STREAM_SLICE).read()) - self._message_repository.emit_message.assert_called_once_with(_AIRBYTE_LOG_MESSAGE) + message_repository.emit_message.assert_called_once_with(_AIRBYTE_LOG_MESSAGE) - def _mock_retriever(self, read_return_value: List[StreamData]) -> Mock: + @staticmethod + def _mock_retriever(read_return_value: List[StreamData]) -> Mock: retriever = Mock(spec=Retriever) retriever.read_records.return_value = iter(read_return_value) return retriever diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 18f5a97f8..3b5dd50c9 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -651,13 +651,15 @@ def test_group_streams(): concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) # 1 full refresh stream, 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental - assert len(concurrent_streams) == 5 + # 1 async job stream + assert len(concurrent_streams) == 6 ( concurrent_stream_0, concurrent_stream_1, concurrent_stream_2, concurrent_stream_3, concurrent_stream_4, + concurrent_stream_5, ) = concurrent_streams assert isinstance(concurrent_stream_0, DefaultStream) assert concurrent_stream_0.name == "party_members" @@ -669,13 +671,13 @@ def test_group_streams(): assert concurrent_stream_3.name == "party_members_skills" assert isinstance(concurrent_stream_4, DefaultStream) assert concurrent_stream_4.name == "arcana_personas" + assert isinstance(concurrent_stream_5, DefaultStream) + assert concurrent_stream_5.name == "async_job_stream" # 1 substream w/ incremental, 1 stream with async retriever - assert len(synchronous_streams) == 2 + assert len(synchronous_streams) == 1 assert isinstance(synchronous_streams[0], DeclarativeStream) assert synchronous_streams[0].name == "palace_enemies" - assert isinstance(synchronous_streams[1], DeclarativeStream) - assert synchronous_streams[1].name == "async_job_stream" @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @@ -1456,10 +1458,10 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous(): ) concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - # 1 full refresh stream, 2 with parent stream without incremental dependency - assert len(concurrent_streams) == 3 - # 2 incremental stream with interpolation on state (locations and party_members), 1 incremental with parent stream (palace_enemies), 1 stream with async retriever - assert len(synchronous_streams) == 4 + # 1 full refresh stream, 2 with parent stream without incremental dependency, 1 stream with async retriever + assert len(concurrent_streams) == 4 + # 2 incremental stream with interpolation on state (locations and party_members), 1 incremental with parent stream (palace_enemies) + assert len(synchronous_streams) == 3 def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurrent(): diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index a132757a6..b3c9ab4bb 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -1278,7 +1278,7 @@ def _create_page(response_body): ) * 10, [{"ABC": 0}, {"AED": 1}], - [call({}, {})], + [call({}, {}, None)], ), ( "test_read_manifest_with_added_fields", @@ -1365,7 +1365,7 @@ def _create_page(response_body): {"ABC": 0, "added_field_key": "added_field_value"}, {"AED": 1, "added_field_key": "added_field_value"}, ], - [call({}, {})], + [call({}, {}, None)], ), ( "test_read_manifest_with_flatten_fields", @@ -1449,7 +1449,7 @@ def _create_page(response_body): {"ABC": 0, "id": 1}, {"AED": 1, "id": 2}, ], - [call({}, {})], + [call({}, {}, None)], ), ( "test_read_with_pagination_no_partitions", @@ -1535,7 +1535,14 @@ def _create_page(response_body): ) * 10, [{"ABC": 0}, {"AED": 1}, {"USD": 2}], - [call({}, {}), call({"next_page_token": "next"}, {"next_page_token": "next"})], + [ + call({}, {}, None), + call( + {"next_page_token": "next"}, + {"next_page_token": "next"}, + {"next_page_token": "next"}, + ), + ], ), ( "test_no_pagination_with_partition_router",