Skip to content

Commit

Permalink
Merge branch 'main' into 50395-1-2
Browse files Browse the repository at this point in the history
* main:
  fix(airbyte-cdk): unable to create custom retriever (airbytehq#198)
  feat(low-code): added keys replace transformation (airbytehq#183)
  feat: add `min` macros (airbytehq#203)
  fix(low-code cdk pagination): Fix the offset strategy so that it resets back to 0 when a stream is an incremental data feed (airbytehq#202)
  • Loading branch information
rpopov committed Jan 8, 2025
2 parents 1065a20 + e78eaff commit b2dab7b
Show file tree
Hide file tree
Showing 14 changed files with 370 additions and 12 deletions.
45 changes: 45 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1785,6 +1786,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
Expand Down Expand Up @@ -1883,6 +1885,49 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Schema of the KeysReplace record transformation: replaces the `old` substring
# with `new` in record keys.
KeysReplace:
  title: Keys Replace
  description: A transformation that replaces symbols in keys.
  type: object
  # All three properties below must be present in a manifest entry.
  required:
    - type
    - old
    - new
  properties:
    # Discriminator: the only accepted value is the literal "KeysReplace".
    type:
      type: string
      enum: [KeysReplace]
    # Text to look for in keys; may be a literal or an interpolated expression.
    old:
      type: string
      title: Old value
      description: Old value to replace.
      examples:
        - " "
        - "{{ record.id }}"
        - "{{ config['id'] }}"
        - "{{ stream_slice['id'] }}"
      # Variables declared as available when the value is interpolated.
      interpolation_context:
        - config
        - record
        - stream_state
        - stream_slice
    # Text substituted for `old`; may be a literal or an interpolated expression.
    new:
      type: string
      title: New value
      description: New value to set.
      examples:
        - "_"
        - "{{ record.id }}"
        - "{{ config['id'] }}"
        - "{{ stream_slice['id'] }}"
      # Variables declared as available when the value is interpolated.
      interpolation_context:
        - config
        - record
        - stream_state
        - stream_slice
    # Optional free-form object; any additional properties are allowed.
    $parameters:
      type: object
      additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down
21 changes: 21 additions & 0 deletions airbyte_cdk/sources/declarative/interpolation/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,26 @@ def max(*args: typing.Any) -> typing.Any:
return builtins.max(*args)


def min(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
    """
    Returns smallest object of an iterable, or two or more arguments.

        min(iterable, *[, default=obj, key=func]) -> value
        min(arg1, arg2, *args, *[, key=func]) -> value

    Usage:
    `"{{ min(2,3) }}"`

    With a single iterable argument, return its smallest item. The
    default keyword-only argument specifies an object to return if
    the provided iterable is empty.
    With two or more arguments, return the smallest argument.

    :param args: args to compare
    :param kwargs: optional keyword-only arguments (`default`, `key`) forwarded
        unchanged to the builtin `min`
    :return: smallest argument
    """
    # Forward keyword arguments as well, so the `default=` / `key=` forms described
    # above actually work instead of raising TypeError on the unexpected keyword.
    return builtins.min(*args, **kwargs)


def day_delta(num_days: int, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str:
"""
Returns datetime of now() + num_days
Expand Down Expand Up @@ -147,6 +167,7 @@ def format_datetime(
today_utc,
timestamp,
max,
min,
day_delta,
duration,
format_datetime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,23 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
    # NOTE: documented with comments on purpose; a class docstring would be picked up
    # as the model's schema description.

    # Discriminator: only the literal "KeysReplace" is accepted.
    type: Literal["KeysReplace"]

    # Required text to search for in keys (literal or interpolated expression).
    old: str = Field(
        ...,
        title="Old value",
        description="Old value to replace.",
        examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
    )

    # Required text substituted for `old` (literal or interpolated expression).
    new: str = Field(
        ...,
        title="New value",
        description="New value to set.",
        examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
    )

    # Optional free-form mapping, supplied in manifests under the "$parameters" key.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
Expand Down Expand Up @@ -1701,6 +1718,7 @@ class Config:
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down Expand Up @@ -1875,6 +1893,7 @@ class DynamicSchemaLoader(BaseModel):
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtPayload as JwtPayloadModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysReplace as KeysReplaceModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToLower as KeysToLowerModel,
)
Expand Down Expand Up @@ -417,6 +420,9 @@
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
KeysReplaceTransformation,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
Expand Down Expand Up @@ -509,6 +515,7 @@ def _init_mappings(self) -> None:
GzipParserModel: self.create_gzip_parser,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
KeysReplaceModel: self.create_keys_replace_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -630,6 +637,13 @@ def create_keys_to_snake_transformation(
) -> KeysToSnakeCaseTransformation:
return KeysToSnakeCaseTransformation()

def create_keys_replace_transformation(
    self, model: KeysReplaceModel, config: Config, **kwargs: Any
) -> KeysReplaceTransformation:
    """
    Build a KeysReplaceTransformation from its declarative model.

    :param model: parsed KeysReplace model providing `old`, `new` and optional parameters
    :param config: accepted for signature parity with the other factory methods; not used here
    :param kwargs: extra keyword arguments; not used here
    :return: transformation configured with the model's `old` and `new` values
    """
    # A missing (or empty) parameters mapping is normalised to a fresh empty dict.
    component_parameters = model.parameters or {}
    return KeysReplaceTransformation(
        old=model.old,
        new=model.new,
        parameters=component_parameters,
    )

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
Expand Down Expand Up @@ -1560,7 +1574,12 @@ def create_exponential_backoff_strategy(
)

def create_http_requester(
self, model: HttpRequesterModel, decoder: Decoder, config: Config, *, name: str
self,
model: HttpRequesterModel,
config: Config,
decoder: Decoder = JsonDecoder(parameters={}),
*,
name: str,
) -> HttpRequester:
authenticator = (
self._create_component_from_model(
Expand Down Expand Up @@ -1976,9 +1995,9 @@ def create_record_selector(
config: Config,
*,
name: str,
transformations: List[RecordTransformation],
decoder: Optional[Decoder] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation] | None = None,
decoder: Decoder | None = None,
client_side_incremental_sync: Dict[str, Any] | None = None,
**kwargs: Any,
) -> RecordSelector:
assert model.schema_normalization is not None # for mypy
Expand Down Expand Up @@ -2008,7 +2027,7 @@ def create_record_selector(
name=name,
config=config,
record_filter=record_filter,
transformations=transformations,
transformations=transformations or [],
schema_normalization=schema_normalization,
parameters=model.parameters or {},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def next_page_token(
return self._offset

def reset(self, reset_value: Optional[Any] = 0) -> None:
if not isinstance(reset_value, int):
if reset_value is None:
self._offset = 0
elif not isinstance(reset_value, int):
raise ValueError(
f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ def next_page_token(
return self._delegate.next_page_token(response, last_page_size, last_record)

def reset(self, reset_value: Optional[Any] = None) -> None:
    """
    Reset the wrapped pagination strategy.

    :param reset_value: value to restart pagination from. When it is falsy (for example
        `None` or `0`), the delegate's `reset()` is called without arguments so that the
        delegate applies its own default.
    """
    # Delegate exactly once: forwarding `reset_value` unconditionally before this branch
    # would reset the delegate twice and hand it an explicit None.
    if reset_value:
        self._delegate.reset(reset_value)
    else:
        self._delegate.reset()

def get_page_size(self) -> Optional[int]:
    """Return whatever page size the wrapped strategy reports."""
    delegate_page_size = self._delegate.get_page_size()
    return delegate_page_size
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Dict, Mapping, Optional

from airbyte_cdk import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysReplaceTransformation(RecordTransformation):
    """
    Record transformation that renames keys by substring replacement.

    Every occurrence of `old` inside a key is replaced with `new`. Values that are
    dictionaries are processed recursively; any other value (lists included) is kept
    as is.

    Example usage:
    - type: KeysReplace
      old: " "
      new: "_"
    Result:
      from: {"created time": ..., "customer id": ..., "user id": ...}
      to: {"created_time": ..., "customer_id": ..., "user_id": ...}
    """

    old: str
    new: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap `old` and `new` as interpolated strings bound to `parameters`."""
        self._old = InterpolatedString.create(self.old, parameters=parameters)
        self._new = InterpolatedString.create(self.new, parameters=parameters)

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Rename the keys of `record` in place.

        :param record: record to rewrite; it is emptied and refilled with the renamed keys
        :param config: connector config used for interpolation; an empty dict when omitted
        :param stream_state: stream state exposed to interpolation
        :param stream_slice: stream slice exposed to interpolation
        """
        effective_config = {} if config is None else config
        interpolation_context = {
            "record": record,
            "stream_state": stream_state,
            "stream_slice": stream_slice,
        }
        # Both values are evaluated once per call, before any key is touched.
        search = str(self._old.eval(effective_config, **interpolation_context))
        replacement = str(self._new.eval(effective_config, **interpolation_context))

        def _rename_keys(node: Dict[str, Any]) -> Dict[str, Any]:
            # Builds a new dict; only dict values are descended into.
            return {
                name.replace(search, replacement): (
                    _rename_keys(child) if isinstance(child, dict) else child
                )
                for name, child in node.items()
            }

        renamed = _rename_keys(record)
        # Mutate the caller's object rather than rebinding, since nothing is returned.
        record.clear()
        record.update(renamed)
3 changes: 3 additions & 0 deletions unit_tests/sources/declarative/interpolation/test_jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def test_to_string(test_name, input_value, expected_output):
id="test_timestamp_from_rfc3339",
),
pytest.param("{{ max(1,2) }}", 2, id="test_max"),
pytest.param("{{ min(1,2) }}", 1, id="test_min"),
],
)
def test_macros(s, expected_value):
Expand Down Expand Up @@ -291,6 +292,8 @@ def test_undeclared_variables(template_string, expected_error, expected_value):
),
pytest.param("{{ max(2, 3) }}", 3, id="test_max_with_arguments"),
pytest.param("{{ max([2, 3]) }}", 3, id="test_max_with_list"),
pytest.param("{{ min(2, 3) }}", 2, id="test_min_with_arguments"),
pytest.param("{{ min([2, 3]) }}", 2, id="test_min_with_list"),
pytest.param("{{ day_delta(1) }}", "2021-09-02T00:00:00.000000+0000", id="test_day_delta"),
pytest.param(
"{{ day_delta(-1) }}", "2021-08-31T00:00:00.000000+0000", id="test_day_delta_negative"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
("test_now_utc", "now_utc", True),
("test_today_utc", "today_utc", True),
("test_max", "max", True),
("test_min", "min", True),
("test_day_delta", "day_delta", True),
("test_format_datetime", "format_datetime", True),
("test_duration", "duration", True),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,42 @@ def test_create_custom_schema_loader():
assert isinstance(component, MyCustomSchemaLoader)


class MyCustomRetriever(SimpleRetriever):
    """SimpleRetriever subclass with no overrides, used as a custom retriever in tests."""


def test_create_custom_retriever():
    """A CustomRetriever entry must be built as the class named by `class_name`."""
    record_selector = {
        "type": "RecordSelector",
        "extractor": {
            "type": "DpathExtractor",
            "field_path": [],
        },
        "$parameters": {"name": ""},
    }
    requester = {
        "type": "HttpRequester",
        "name": "list",
        "url_base": "orange.com",
        "path": "/v1/api",
        "$parameters": {"name": ""},
    }
    custom_retriever = {
        "type": "CustomRetriever",
        "class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.MyCustomRetriever",
        "record_selector": record_selector,
        "requester": requester,
    }
    manifest = {"type": "DeclarativeStream", "retriever": custom_retriever}

    built_stream = factory.create_component(
        model_type=DeclarativeStreamModel,
        component_definition=manifest,
        config=input_config,
    )

    assert isinstance(built_stream, DeclarativeStream)
    assert isinstance(built_stream.retriever, MyCustomRetriever)


@freezegun.freeze_time("2021-01-01 00:00:00")
@pytest.mark.parametrize(
"config, manifest, expected",
Expand Down
Loading

0 comments on commit b2dab7b

Please sign in to comment.