Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable handling of nested fields when injecting request_option in request body_json #201

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f2d644f
feat: add field_path attribute to Requestoption
ChristoGrab Jan 3, 2025
00d2036
task: update combine_mappings to handle nested paths
ChristoGrab Jan 4, 2025
9c98313
feat: add reusable method to inject value into target dict
ChristoGrab Jan 6, 2025
a524f3e
chore: fix test and format
ChristoGrab Jan 6, 2025
fcaf110
feat: update ApiKeyAuthenticator
ChristoGrab Jan 6, 2025
fd2de58
feat: update DatetimeBasedCursor injection logic
ChristoGrab Jan 7, 2025
20cc5d6
chore: update declarative schema/generate RequestOption model
ChristoGrab Jan 7, 2025
7292b57
fix: add type validations and unit tests
ChristoGrab Jan 7, 2025
ebb1791
chore: format
ChristoGrab Jan 7, 2025
5051d0d
refactor: update tests
ChristoGrab Jan 7, 2025
fef4a46
Merge branch 'main' into christo/request-option-field-path
ChristoGrab Jan 7, 2025
9525357
refactor: centralize injection logic in RequestOption
ChristoGrab Jan 9, 2025
6638dc8
chore: add components tests
ChristoGrab Jan 9, 2025
eba438b
refactor: shuffle unit tests
ChristoGrab Jan 9, 2025
5dc29ca
chore: format
ChristoGrab Jan 9, 2025
6fc897f
chore: cleanup and code comments
ChristoGrab Jan 9, 2025
97cff3f
chore: remove redundant check in DatetimeBasedCursor
ChristoGrab Jan 10, 2025
7191128
feat: update remaining components to use injection logic
ChristoGrab Jan 17, 2025
4ef2423
refactor: simplify validations
ChristoGrab Jan 21, 2025
d1fde99
refactor: simplify combine_mappings logic
ChristoGrab Jan 21, 2025
adec90b
Merge branch 'main' into christo/request-option-field-path
ChristoGrab Jan 21, 2025
6e8e13c
chore: update unit tests
ChristoGrab Jan 21, 2025
0fabdd7
chore: add deprecation comment to field_name
ChristoGrab Jan 21, 2025
fdfa92c
refactor: update merge_mappings to handle body_json differently than …
ChristoGrab Jan 22, 2025
a2262a9
chore: fix all the tests
ChristoGrab Jan 22, 2025
e36a06a
Merge branch 'main' into christo/request-option-field-path
ChristoGrab Jan 22, 2025
b8287d6
fix: update component factory
ChristoGrab Jan 24, 2025
fd919b1
Merge branch 'christo/request-option-field-path' of https://github.co…
ChristoGrab Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions airbyte_cdk/sources/declarative/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import base64
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union
from typing import Any, Mapping, MutableMapping, Union

import requests
from cachetools import TTLCache, cached
Expand Down Expand Up @@ -45,11 +45,6 @@ class ApiKeyAuthenticator(DeclarativeAuthenticator):
config: Config
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_name = InterpolatedString.create(
self.request_option.field_name, parameters=parameters
)

@property
def auth_header(self) -> str:
options = self._get_request_options(RequestOptionType.header)
Expand All @@ -60,9 +55,9 @@ def token(self) -> str:
return self.token_provider.get_token()

def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
options: MutableMapping[str, Any] = {}
if self.request_option.inject_into == option_type:
options[self._field_name.eval(self.config)] = self.token
self.request_option.inject_into_request(options, self.token, self.config)
return options

def get_request_params(self) -> Mapping[str, Any]:
Expand Down
18 changes: 14 additions & 4 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2798,25 +2798,35 @@ definitions:
enum: [RequestPath]
RequestOption:
title: Request Option
description: Specifies the key field and where in the request a component's value should be injected.
description: Specifies the key field or path and where in the request a component's value should be injected.
type: object
required:
- type
- field_name
- inject_into
properties:
type:
type: string
enum: [RequestOption]
field_name:
title: Request Option
description: Configures which key should be used in the location that the descriptor is being injected into
title: Field Name
description: Configures which key should be used in the location that the descriptor is being injected into. We hope to eventually deprecate this field in favor of `field_path` for all request_options, but must currently maintain it for backwards compatibility in the Builder.
type: string
examples:
- segment_id
interpolation_context:
- config
- parameters
field_path:
title: Field Path
description: Configures a path to be used for nested structures in JSON body requests (e.g. GraphQL queries)
type: array
items:
type: string
examples:
- ["data", "viewer", "id"]
interpolation_context:
- config
- parameters
inject_into:
title: Inject Into
description: Configures where the descriptor should be set on the HTTP requests. Note that request parameters that are already encoded in the URL path will not be duplicated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,15 @@ def _get_request_options(
options: MutableMapping[str, Any] = {}
if not stream_slice:
return options

if self.start_time_option and self.start_time_option.inject_into == option_type:
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore # field_name is always casted to an interpolated string
self._partition_field_start.eval(self.config)
)
start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
self.start_time_option.inject_into_request(options, start_time_value, self.config)

if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore [union-attr]
self._partition_field_end.eval(self.config)
)
end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
self.end_time_option.inject_into_request(options, end_time_value, self.config)

return options

def should_be_synced(self, record: Record) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ class HttpResponseFilter(BaseModel):
class TypesMap(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]
condition: Optional[str]
condition: Optional[str] = None


class SchemaTypeIdentifier(BaseModel):
Expand Down Expand Up @@ -797,14 +797,11 @@ class DpathFlattenFields(BaseModel):
field_path: List[str] = Field(
...,
description="A path to field that needs to be flattened.",
examples=[
["data"],
["data", "*", "field"],
],
examples=[["data"], ["data", "*", "field"]],
title="Field Path",
)
delete_origin_value: Optional[bool] = Field(
False,
None,
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
Expand Down Expand Up @@ -1173,11 +1170,17 @@ class InjectInto(Enum):

class RequestOption(BaseModel):
type: Literal["RequestOption"]
field_name: str = Field(
...,
description="Configures which key should be used in the location that the descriptor is being injected into",
field_name: Optional[str] = Field(
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
None,
description="Configures which key should be used in the location that the descriptor is being injected into. We hope to eventually deprecate this field in favor of `field_path` for all request_options, but must currently maintain it for backwards compatibility in the Builder.",
examples=["segment_id"],
title="Request Option",
title="Field Name",
)
field_path: Optional[List[str]] = Field(
None,
description="Configures a path to be used for nested structures in JSON body requests (e.g. GraphQL queries)",
examples=[["data", "viewer", "id"]],
title="Field Path",
)
inject_into: InjectInto = Field(
...,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2072,7 +2072,11 @@ def create_request_option(
model: RequestOptionModel, config: Config, **kwargs: Any
) -> RequestOption:
inject_into = RequestOptionType(model.inject_into.value)
return RequestOption(field_name=model.field_name, inject_into=inject_into, parameters={})
field_path: Optional[List[Union[InterpolatedString, str]]] = model.field_path # type: ignore
field_name = model.field_name if model.field_name else None
return RequestOption(
field_name=field_name, field_path=field_path, inject_into=inject_into, parameters={}
)

def create_record_selector(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, List, Mapping, Optional, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
Expand Down Expand Up @@ -100,7 +100,9 @@ def _get_request_option(
):
slice_value = stream_slice.get(self._cursor_field.eval(self.config))
if slice_value:
return {self.request_option.field_name.eval(self.config): slice_value} # type: ignore # field_name is always casted to InterpolatedString
options: MutableMapping[str, Any] = {}
self.request_option.inject_into_request(options, slice_value, self.config)
return options
else:
return {}
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import copy
import logging
from dataclasses import InitVar, dataclass
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, Optional, Union
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath

Expand Down Expand Up @@ -118,7 +118,7 @@ def get_request_body_json(
def _get_request_option(
self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
) -> Mapping[str, Any]:
params = {}
params: MutableMapping[str, Any] = {}
if stream_slice:
for parent_config in self.parent_stream_configs:
if (
Expand All @@ -128,13 +128,7 @@ def _get_request_option(
key = parent_config.partition_field.eval(self.config) # type: ignore # partition_field is always casted to an interpolated string
value = stream_slice.get(key)
if value:
params.update(
{
parent_config.request_option.field_name.eval( # type: ignore [union-attr]
config=self.config
): value
}
)
parent_config.request_option.inject_into_request(params, value, self.config)
return params

def stream_slices(self) -> Iterable[StreamSlice]:
Expand Down
6 changes: 5 additions & 1 deletion airbyte_cdk/sources/declarative/requesters/http_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ def _get_request_options(
Raise a ValueError if there's a key collision
Returned merged mapping otherwise
"""

is_body_json = requester_method.__name__ == "get_request_body_json"

return combine_mappings(
[
requester_method(
Expand All @@ -208,7 +211,8 @@ def _get_request_options(
),
auth_options_method(),
extra_options,
]
],
allow_same_value_merge=is_body_json,
)

def _request_headers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def get_request_body_json(
def _get_request_options(
self, option_type: RequestOptionType, next_page_token: Optional[Mapping[str, Any]]
) -> MutableMapping[str, Any]:
options = {}
options: MutableMapping[str, Any] = {}

token = next_page_token.get("next_page_token") if next_page_token else None
if (
Expand All @@ -196,15 +196,16 @@ def _get_request_options(
and isinstance(self.page_token_option, RequestOption)
and self.page_token_option.inject_into == option_type
):
options[self.page_token_option.field_name.eval(config=self.config)] = token # type: ignore # field_name is always cast to an interpolated string
self.page_token_option.inject_into_request(options, token, self.config)

if (
self.page_size_option
and self.pagination_strategy.get_page_size()
and self.page_size_option.inject_into == option_type
):
options[self.page_size_option.field_name.eval(config=self.config)] = ( # type: ignore [union-attr]
self.pagination_strategy.get_page_size()
) # type: ignore # field_name is always cast to an interpolated string
page_size = self.pagination_strategy.get_page_size()
self.page_size_option.inject_into_request(options, page_size, self.config)

return options


Expand Down
87 changes: 83 additions & 4 deletions airbyte_cdk/sources/declarative/requesters/request_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Mapping, Union
from typing import Any, List, Literal, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


class RequestOptionType(Enum):
Expand All @@ -26,13 +27,91 @@ class RequestOption:
Describes an option to set on a request

Attributes:
field_name (str): Describes the name of the parameter to inject
field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path.
field_path (list(str)): Describes the path to a nested field as a list of field names.
Only valid for body_json injection type, and mutually exclusive with field_name.
inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
"""

field_name: Union[InterpolatedString, str]
inject_into: RequestOptionType
parameters: InitVar[Mapping[str, Any]]
field_name: Optional[Union[InterpolatedString, str]] = None
field_path: Optional[List[Union[InterpolatedString, str]]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
# Validate inputs. We should expect either field_name or field_path, but not both
if self.field_name is None and self.field_path is None:
raise ValueError("RequestOption requires either a field_name or field_path")

if self.field_name is not None and self.field_path is not None:
raise ValueError(
"Only one of field_name or field_path can be provided to RequestOption"
)

# Nested field injection is only supported for body JSON injection
if self.field_path is not None and self.inject_into != RequestOptionType.body_json:
raise ValueError(
"Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
)

# Convert field_name and field_path into InterpolatedString objects if they are strings
if self.field_name is not None:
self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
elif self.field_path is not None:
self.field_path = [
InterpolatedString.create(segment, parameters=parameters)
for segment in self.field_path
]

@property
def _is_field_path(self) -> bool:
"""Returns whether this option is a field path (ie, a nested field)"""
return self.field_path is not None

def inject_into_request(
self,
target: MutableMapping[str, Any],
value: Any,
config: Config,
) -> None:
"""
Inject a request option value into a target request structure using either field_name or field_path.
For non-body-json injection, only top-level field names are supported.
For body-json injection, both field names and nested field paths are supported.

Args:
target: The request structure to inject the value into
value: The value to inject
config: The config object to use for interpolation
"""
if self._is_field_path:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we simplify the logic by having in post_init:

        if self.field_name is not None:
            self.field_path = [InterpolatedString.create(self.field_name, parameters=parameters)]

This way, we would only have one logic to maintain and it would be the field_path one.

Copy link
Collaborator Author

@ChristoGrab ChristoGrab Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a bit skeptical that this could potentially lead to conflicts in the Builder if we started mixing the attributes, but I think I see what you're saying. If we just modify the internal logic here in the CDK to allow field_name, but only use it to update and use field_path instead, we simplify the backend handling while still allowing the Builder to define them as entirely separate fields, correct?

My only hangup is wondering if there isn't still some scenario where by allowing field_path to essentially override field_name, we could accidentally send those updated values back to the frontend? And it would be picked up as a manifest change? I don't yet know enough about the full lifecycle of the back and forth between the CDK and Builder components to know if that's a valid concern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a bit skeptical that this could potentially lead to conflicts in the Builder if we started mixing the attributes

If this is the case, we haven't made a good job at designing our interfaces and we will lack the benefits of easy maintenance. In an ideal world, we should be able to replace the whole CDK implementation by something else and the Connector Builder should continue to work.

we simplify the backend handling

Yes! Instead of knowing that there are two cases, we all rely on the cases where we handle a list and that's it!

we could accidentally send those updated values back to the frontend?

My understand is that the only case we send back manifest information to the frontend is when we do resolve_manifest (or something like that) and it feels like this flow only work with the manifest directly and not with Python implementations so I would expect that we would be fine.

if self.inject_into != RequestOptionType.body_json:
raise ValueError(
"Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types."
)

assert self.field_path is not None # for type checker
current = target
# Convert path segments into strings, evaluating any interpolated segments
# Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"]
*path_parts, final_key = [
str(
segment.eval(config=config)
if isinstance(segment, InterpolatedString)
else segment
)
for segment in self.field_path
]

# Build a nested dictionary structure and set the final value at the deepest level
for part in path_parts:
current = current.setdefault(part, {})
current[final_key] = value
else:
# For non-nested fields, evaluate the field name if it's an interpolated string
key = (
self.field_name.eval(config=config)
if isinstance(self.field_name, InterpolatedString)
else self.field_name
)
target[str(key)] = value
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ def _get_request_options(
options: MutableMapping[str, Any] = {}
if not stream_slice:
return options

if self.start_time_option and self.start_time_option.inject_into == option_type:
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore # field_name is always casted to an interpolated string
self._partition_field_start.eval(self.config)
)
start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
self.start_time_option.inject_into_request(options, start_time_value, self.config)

if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore [union-attr]
self._partition_field_end.eval(self.config)
)
end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
self.end_time_option.inject_into_request(options, end_time_value, self.config)

return options
Loading