Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(taps): add argument for nan handling strategy to _flatten_record #2222

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class FlatteningOptions(t.NamedTuple):
max_level: int
flattening_enabled: bool = True
separator: str = DEFAULT_FLATTENING_SEPARATOR
nan_strategy: t.Literal["fail", "allow", "convert_null"] = "fail"


def get_flattening_options(
Expand Down Expand Up @@ -81,13 +82,15 @@ def flatten_schema(
schema: dict,
max_level: int,
separator: str = "__",
nan_strategy: t.Literal["fail", "allow", "convert_null"] = "fail",
) -> dict:
"""Flatten the provided schema up to a depth of max_level.

Args:
schema: The schema definition to flatten.
separator: The string to use when concatenating key names.
max_level: The max recursion level (zero-based, exclusive).
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the provided schema definition.
Expand Down Expand Up @@ -274,6 +277,7 @@ def flatten_schema(
schema_node=new_schema,
max_level=max_level,
separator=separator,
nan_strategy=nan_strategy,
)
return new_schema

Expand All @@ -284,6 +288,7 @@ def _flatten_schema( # noqa: C901, PLR0912
separator: str = "__",
level: int = 0,
max_level: int = 0,
nan_strategy: t.Literal["fail", "allow", "convert_null"] = "fail",
) -> dict:
"""Flatten the provided schema node, recursively up to depth of `max_level`.

Expand All @@ -293,6 +298,7 @@ def _flatten_schema( # noqa: C901, PLR0912
separator: The string to use when concatenating key names.
level: The current recursion level (zero-based).
max_level: The max recursion level (zero-based, exclusive).
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the provided node.
Expand All @@ -319,6 +325,7 @@ def _flatten_schema( # noqa: C901, PLR0912
separator=separator,
level=level + 1,
max_level=max_level,
nan_strategy=nan_strategy,
).items(),
)
elif (
Expand Down Expand Up @@ -364,6 +371,7 @@ def flatten_record(
flattened_schema: dict,
max_level: int,
separator: str = "__",
nan_strategy: t.Literal["fail", "allow", "convert_null"] = "fail",
) -> dict:
"""Flatten a record up to max_level.

Expand All @@ -372,6 +380,7 @@ def flatten_record(
flattened_schema: The already flattened schema.
separator: The string used to separate concatenated key names. Defaults to "__".
max_level: The maximum depth of keys to flatten recursively.
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the record.
Expand All @@ -381,6 +390,7 @@ def flatten_record(
flattened_schema=flattened_schema,
separator=separator,
max_level=max_level,
nan_strategy=nan_strategy,
)


Expand All @@ -392,6 +402,7 @@ def _flatten_record(
separator: str = "__",
level: int = 0,
max_level: int = 0,
nan_strategy: t.Literal["fail", "allow", "convert_null"] = "fail",
) -> dict:
"""This recursive function flattens the record node.

Expand All @@ -405,6 +416,7 @@ def _flatten_record(
separator: The string to use when concatenating key names.
level: The current recursion level (zero-based).
max_level: The max recursion level (zero-based, exclusive).
nan_strategy: Strategy for handling NaNs in json.

Returns:
A flattened version of the provided node.
Expand All @@ -431,13 +443,27 @@ def _flatten_record(
separator=separator,
level=level + 1,
max_level=max_level,
nan_strategy=nan_strategy,
).items(),
)
else:
nan_strategies = {
"fail": False,
"allow": True,
"convert_null": False,
}

ignore_nan = nan_strategy == "convert_null"
items.append(
(
new_key,
json.dumps(v, use_decimal=True, default=str)
json.dumps(
v,
use_decimal=True,
default=str,
allow_nan=nan_strategies[nan_strategy],
ignore_nan=ignore_nan,
)
if _should_jsondump_value(k, v, flattened_schema)
else v,
),
Expand Down
1 change: 1 addition & 0 deletions singer_sdk/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def flatten_record(self, record: dict) -> dict:
flattened_schema=self.transformed_schema,
max_level=self.flattening_options.max_level,
separator=self.flattening_options.separator,
nan_strategy=self.flattening_options.nan_strategy,
)

def flatten_schema(self, raw_schema: dict) -> dict:
Expand Down