Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concurrent): do not raise when record does not have cursor value in concurrent cursor #96

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions airbyte_cdk/sources/streams/concurrent/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,13 @@ def observe(self, record: Record) -> None:
most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
record.associated_slice
)
cursor_value = self._extract_cursor_value(record)
try:
cursor_value = self._extract_cursor_value(record)

if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
except ValueError:
self._log_for_record_without_cursor_value()

def _extract_cursor_value(self, record: Record) -> Any:
return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
Expand Down Expand Up @@ -459,10 +462,13 @@ def should_be_synced(self, record: Record) -> bool:
try:
record_cursor_value: CursorValueType = self._extract_cursor_value(record) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
except ValueError:
if not self._should_be_synced_logger_triggered:
LOGGER.warning(
f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record. The incremental sync will assume it needs to be synced"
)
self._should_be_synced_logger_triggered = True
self._log_for_record_without_cursor_value()
return True
return self.start <= record_cursor_value <= self._end_provider()

def _log_for_record_without_cursor_value(self) -> None:
if not self._should_be_synced_logger_triggered:
LOGGER.warning(
f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
)
self._should_be_synced_logger_triggered = True
14 changes: 14 additions & 0 deletions unit_tests/sources/streams/concurrent/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor:
_NO_LOOKBACK_WINDOW,
)

def test_given_no_cursor_value_when_observe_then_do_not_raise(self) -> None:
cursor = self._cursor_with_slice_boundary_fields()
partition = _partition(_NO_SLICE)

cursor.observe(
Record(
data={"record_with_A_CURSOR_FIELD_KEY": "any value"},
associated_slice=partition.to_slice(),
stream_name=_A_STREAM_NAME,
)
)

# did not raise

def test_given_boundary_fields_when_close_partition_then_emit_state(self) -> None:
cursor = self._cursor_with_slice_boundary_fields()
cursor.close_partition(
Expand Down
Loading