Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,51 @@ def can_write() -> bool:
def can_read() -> bool:
return True

@staticmethod
def _sanitize_for_bigquery(data: Any) -> Any:
"""BigQuery-specific float sanitization for PARSE_JSON compatibility.

BigQuery's PARSE_JSON() requires floats that can "round-trip" through
string representation. This method limits total significant figures to 15
(IEEE 754 double precision safe zone) to ensure clean binary representation.

Args:
data: The data structure to sanitize (dict, list, or primitive)

Returns:
Sanitized data compatible with BigQuery's PARSE_JSON

Example:
>>> BigQuery._sanitize_for_bigquery({"time": 1760509016.282637})
{'time': 1760509016.28264} # Limited to 15 significant figures

>>> BigQuery._sanitize_for_bigquery({"cost": 0.001228})
{'cost': 0.001228} # Unchanged (only 4 significant figures)
"""
import math

if isinstance(data, float):
# Handle special values that BigQuery can't store in JSON
if math.isnan(data) or math.isinf(data):
return None
if data == 0:
return 0.0

# Limit total significant figures to 15 for IEEE 754 compatibility
# BigQuery PARSE_JSON requires values that round-trip cleanly
# For large numbers (like Unix timestamps), this reduces decimal precision
# For small numbers (like costs), full precision is preserved
magnitude = math.floor(math.log10(abs(data))) + 1
safe_decimals = max(0, 15 - magnitude)
return float(f"{data:.{safe_decimals}f}")

elif isinstance(data, dict):
return {k: BigQuery._sanitize_for_bigquery(v) for k, v in data.items()}
elif isinstance(data, list):
return [BigQuery._sanitize_for_bigquery(item) for item in data]
else:
return data

def get_engine(self) -> Any:
return self.bigquery.Client.from_service_account_info( # type: ignore
info=self.json_credentials
Expand Down Expand Up @@ -202,7 +247,13 @@ def execute_query(

if isinstance(value, (dict, list)) and column_type == "JSON":
# For JSON objects in JSON columns, convert to string and use PARSE_JSON
json_str = json.dumps(value) if value else None
# Sanitize floats before serialization to ensure clean JSON for PARSE_JSON
sanitized_value = BigQuery._sanitize_for_bigquery(value)
json_str = (
json.dumps(sanitized_value)
if sanitized_value is not None
else None
)
if json_str:
# Replace @`key` with PARSE_JSON(@`key`) in the SQL query
modified_sql = modified_sql.replace(
Expand All @@ -213,7 +264,13 @@ def execute_query(
)
elif isinstance(value, (dict, list)):
# For dict/list values in STRING columns, serialize to JSON string
json_str = json.dumps(value) if value else None
# Sanitize floats before serialization to ensure clean JSON
sanitized_value = BigQuery._sanitize_for_bigquery(value)
json_str = (
json.dumps(sanitized_value)
if sanitized_value is not None
else None
)
query_parameters.append(
self.bigquery.ScalarQueryParameter(key, "STRING", json_str)
)
Expand Down Expand Up @@ -314,7 +371,10 @@ def get_sql_values_for_query(
# Try to parse JSON strings back to objects for BigQuery
try:
parsed_value = json.loads(value)
sql_values[column] = parsed_value
# Sanitize floats after parsing to prevent precision issues
# json.loads() creates new float objects that may have binary precision problems
sanitized_value = BigQuery._sanitize_for_bigquery(parsed_value)
sql_values[column] = sanitized_value
except (TypeError, ValueError, json.JSONDecodeError):
# Not a JSON string, keep as string
sql_values[column] = f"{value}"
Expand Down
51 changes: 51 additions & 0 deletions unstract/connectors/src/unstract/connectors/databases/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Database Utilities

Common utilities for database connectors to ensure consistent data handling
across all database types (BigQuery, PostgreSQL, MySQL, Snowflake, etc.).
"""

import math
from typing import Any


def sanitize_floats_for_database(data: Any) -> Any:
"""Sanitize special float values (NaN, Inf) for database compatibility.

This minimal sanitization applies to all databases. It only handles
special float values that no database can store in JSON:
- NaN (Not a Number) → None
- Infinity → None
- -Infinity → None

Database-specific precision handling (like BigQuery's round-trip requirements)
should be implemented in the respective database connector.

Args:
data: The data structure to sanitize (dict, list, or primitive)

Returns:
Sanitized data with NaN/Inf converted to None

Example:
>>> sanitize_floats_for_database({"value": float("nan")})
{'value': None}

>>> sanitize_floats_for_database({"value": float("inf")})
{'value': None}

>>> sanitize_floats_for_database({"price": 1760509016.282637})
{'price': 1760509016.282637} # Unchanged - precision preserved
"""
if isinstance(data, float):
# Only handle special values that no database supports
if math.isnan(data) or math.isinf(data):
return None
# Return unchanged - let database connector handle precision if needed
return data
elif isinstance(data, dict):
return {k: sanitize_floats_for_database(v) for k, v in data.items()}
elif isinstance(data, list):
return [sanitize_floats_for_database(item) for item in data]
else:
# Return other types unchanged (int, str, bool, None, etc.)
return data
Loading