diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index b40709078..4f103b224 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -5,7 +5,18 @@ import functools import logging from abc import ABC, abstractmethod -from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple +from typing import ( + Any, + Callable, + Iterable, + List, + Mapping, + MutableMapping, + Optional, + Protocol, + Tuple, + Union, +) from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message import MessageRepository @@ -175,7 +186,9 @@ def __init__( self.start, self._concurrent_state = self._get_concurrent_state(stream_state) self._lookback_window = lookback_window self._slice_range = slice_range - self._most_recent_cursor_value_per_partition: MutableMapping[StreamSlice, Any] = {} + self._most_recent_cursor_value_per_partition: MutableMapping[ + Union[StreamSlice, Mapping[str, Any], None], Any + ] = {} self._has_closed_at_least_one_slice = False self._cursor_granularity = cursor_granularity # Flag to track if the logger has been triggered (per stream) @@ -216,10 +229,13 @@ def observe(self, record: Record) -> None: most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get( record.associated_slice ) - cursor_value = self._extract_cursor_value(record) + try: + cursor_value = self._extract_cursor_value(record) - if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value: - self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value + if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value: + self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value + except ValueError: + self._log_for_record_without_cursor_value() def _extract_cursor_value(self, record: Record) -> Any: return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record)) @@ -459,10 +475,13 @@ def should_be_synced(self, record: Record) -> bool: try: record_cursor_value: CursorValueType = self._extract_cursor_value(record) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ except ValueError: - if not self._should_be_synced_logger_triggered: - LOGGER.warning( - f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record. The incremental sync will assume it needs to be synced" - ) - self._should_be_synced_logger_triggered = True + self._log_for_record_without_cursor_value() return True return self.start <= record_cursor_value <= self._end_provider() + + def _log_for_record_without_cursor_value(self) -> None: + if not self._should_be_synced_logger_triggered: + LOGGER.warning( + f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced" + ) + self._should_be_synced_logger_triggered = True diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index ea9b29ca3..008b5f780 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -101,6 +101,20 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor: _NO_LOOKBACK_WINDOW, ) + def test_given_no_cursor_value_when_observe_then_do_not_raise(self) -> None: + cursor = self._cursor_with_slice_boundary_fields() + partition = _partition(_NO_SLICE) + + cursor.observe( + Record( + data={"record_with_A_CURSOR_FIELD_KEY": "any value"}, + associated_slice=partition.to_slice(), + stream_name=_A_STREAM_NAME, + ) + ) + + # did not raise + def test_given_boundary_fields_when_close_partition_then_emit_state(self) -> None: cursor = self._cursor_with_slice_boundary_fields() cursor.close_partition(