Skip to content

Commit 464fbb3

Browse files
UN-2882 [FIX] Fix BigQuery float precision issue in PARSE_JSON for metadata serialization (#1593)
* UN-2882 [FIX] Fix BigQuery float precision issue in PARSE_JSON for metadata serialization - Added BigQuery-specific float sanitization with IEEE 754 double precision safe zone - Consolidated duplicate float sanitization logic into shared utilities - Fixed insertion errors caused by floats with >15 significant figures 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> * Fix truthiness check for empty values in JSON serialization - Changed 'if sanitized_value' to 'if sanitized_value is not None' - Prevents empty dicts {}, empty lists [], and zero values from becoming None - Addresses CodeRabbit AI feedback on PR #1593 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> --------- Co-authored-by: Claude <[email protected]>
1 parent 7ca14cf commit 464fbb3

File tree

3 files changed

+119
-50
lines changed

3 files changed

+119
-50
lines changed

unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,51 @@ def can_write() -> bool:
6363
def can_read() -> bool:
6464
return True
6565

66+
@staticmethod
67+
def _sanitize_for_bigquery(data: Any) -> Any:
68+
"""BigQuery-specific float sanitization for PARSE_JSON compatibility.
69+
70+
BigQuery's PARSE_JSON() requires floats that can "round-trip" through
71+
string representation. This method limits total significant figures to 15
72+
(IEEE 754 double precision safe zone) to ensure clean binary representation.
73+
74+
Args:
75+
data: The data structure to sanitize (dict, list, or primitive)
76+
77+
Returns:
78+
Sanitized data compatible with BigQuery's PARSE_JSON
79+
80+
Example:
81+
>>> BigQuery._sanitize_for_bigquery({"time": 1760509016.282637})
82+
{'time': 1760509016.28264} # Limited to 15 significant figures
83+
84+
>>> BigQuery._sanitize_for_bigquery({"cost": 0.001228})
85+
{'cost': 0.001228} # Unchanged (only 4 significant figures)
86+
"""
87+
import math
88+
89+
if isinstance(data, float):
90+
# Handle special values that BigQuery can't store in JSON
91+
if math.isnan(data) or math.isinf(data):
92+
return None
93+
if data == 0:
94+
return 0.0
95+
96+
# Limit total significant figures to 15 for IEEE 754 compatibility
97+
# BigQuery PARSE_JSON requires values that round-trip cleanly
98+
# For large numbers (like Unix timestamps), this reduces decimal precision
99+
# For small numbers (like costs), full precision is preserved
100+
magnitude = math.floor(math.log10(abs(data))) + 1
101+
safe_decimals = max(0, 15 - magnitude)
102+
return float(f"{data:.{safe_decimals}f}")
103+
104+
elif isinstance(data, dict):
105+
return {k: BigQuery._sanitize_for_bigquery(v) for k, v in data.items()}
106+
elif isinstance(data, list):
107+
return [BigQuery._sanitize_for_bigquery(item) for item in data]
108+
else:
109+
return data
110+
66111
def get_engine(self) -> Any:
67112
return self.bigquery.Client.from_service_account_info( # type: ignore
68113
info=self.json_credentials
@@ -202,7 +247,13 @@ def execute_query(
202247

203248
if isinstance(value, (dict, list)) and column_type == "JSON":
204249
# For JSON objects in JSON columns, convert to string and use PARSE_JSON
205-
json_str = json.dumps(value) if value else None
250+
# Sanitize floats before serialization to ensure clean JSON for PARSE_JSON
251+
sanitized_value = BigQuery._sanitize_for_bigquery(value)
252+
json_str = (
253+
json.dumps(sanitized_value)
254+
if sanitized_value is not None
255+
else None
256+
)
206257
if json_str:
207258
# Replace @`key` with PARSE_JSON(@`key`) in the SQL query
208259
modified_sql = modified_sql.replace(
@@ -213,7 +264,13 @@ def execute_query(
213264
)
214265
elif isinstance(value, (dict, list)):
215266
# For dict/list values in STRING columns, serialize to JSON string
216-
json_str = json.dumps(value) if value else None
267+
# Sanitize floats before serialization to ensure clean JSON
268+
sanitized_value = BigQuery._sanitize_for_bigquery(value)
269+
json_str = (
270+
json.dumps(sanitized_value)
271+
if sanitized_value is not None
272+
else None
273+
)
217274
query_parameters.append(
218275
self.bigquery.ScalarQueryParameter(key, "STRING", json_str)
219276
)
@@ -314,7 +371,10 @@ def get_sql_values_for_query(
314371
# Try to parse JSON strings back to objects for BigQuery
315372
try:
316373
parsed_value = json.loads(value)
317-
sql_values[column] = parsed_value
374+
# Sanitize floats after parsing to prevent precision issues
375+
# json.loads() creates new float objects that may have binary precision problems
376+
sanitized_value = BigQuery._sanitize_for_bigquery(parsed_value)
377+
sql_values[column] = sanitized_value
318378
except (TypeError, ValueError, json.JSONDecodeError):
319379
# Not a JSON string, keep as string
320380
sql_values[column] = f"{value}"
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Database Utilities
2+
3+
Common utilities for database connectors to ensure consistent data handling
4+
across all database types (BigQuery, PostgreSQL, MySQL, Snowflake, etc.).
5+
"""
6+
7+
import math
8+
from typing import Any
9+
10+
11+
def sanitize_floats_for_database(data: Any) -> Any:
12+
"""Sanitize special float values (NaN, Inf) for database compatibility.
13+
14+
This minimal sanitization applies to all databases. It only handles
15+
special float values that no database can store in JSON:
16+
- NaN (Not a Number) → None
17+
- Infinity → None
18+
- -Infinity → None
19+
20+
Database-specific precision handling (like BigQuery's round-trip requirements)
21+
should be implemented in the respective database connector.
22+
23+
Args:
24+
data: The data structure to sanitize (dict, list, or primitive)
25+
26+
Returns:
27+
Sanitized data with NaN/Inf converted to None
28+
29+
Example:
30+
>>> sanitize_floats_for_database({"value": float("nan")})
31+
{'value': None}
32+
33+
>>> sanitize_floats_for_database({"value": float("inf")})
34+
{'value': None}
35+
36+
>>> sanitize_floats_for_database({"price": 1760509016.282637})
37+
{'price': 1760509016.282637} # Unchanged - precision preserved
38+
"""
39+
if isinstance(data, float):
40+
# Only handle special values that no database supports
41+
if math.isnan(data) or math.isinf(data):
42+
return None
43+
# Return unchanged - let database connector handle precision if needed
44+
return data
45+
elif isinstance(data, dict):
46+
return {k: sanitize_floats_for_database(v) for k, v in data.items()}
47+
elif isinstance(data, list):
48+
return [sanitize_floats_for_database(item) for item in data]
49+
else:
50+
# Return other types unchanged (int, str, bool, None, etc.)
51+
return data

workers/shared/infrastructure/database/utils.py

Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import datetime
88
import json
9-
import math
109
from typing import Any
1110

1211
from shared.enums.status_enums import FileProcessingStatus
@@ -15,6 +14,7 @@
1514
from unstract.connectors.databases import connectors as db_connectors
1615
from unstract.connectors.databases.exceptions import UnstractDBConnectorException
1716
from unstract.connectors.databases.unstract_db import UnstractDB
17+
from unstract.connectors.databases.utils import sanitize_floats_for_database
1818
from unstract.connectors.exceptions import ConnectorError
1919

2020
from ..logging import WorkerLogger
@@ -78,46 +78,6 @@ def __init__(self, detail: str):
7878
class WorkerDatabaseUtils:
7979
"""Worker-compatible database utilities following production patterns."""
8080

81-
@staticmethod
82-
def _sanitize_floats_for_database(data: Any, precision: int = 6) -> Any:
83-
"""Recursively sanitize float values for database JSON compatibility.
84-
85-
BigQuery's PARSE_JSON() requires floats that can "round-trip" through
86-
string representation. This function normalizes floats to ensure they
87-
serialize cleanly for all database types (BigQuery, PostgreSQL, MySQL, etc.).
88-
89-
Args:
90-
data: The data structure to sanitize (dict, list, or primitive)
91-
precision: Number of decimal places to preserve (default: 6)
92-
93-
Returns:
94-
Sanitized data with normalized float values
95-
96-
Example:
97-
>>> _sanitize_floats_for_database({"time": 22.770092, "count": 5})
98-
{'time': 22.770092, 'count': 5}
99-
"""
100-
if isinstance(data, float):
101-
# Handle special float values that databases don't support in JSON
102-
if math.isnan(data) or math.isinf(data):
103-
return None
104-
# Normalize float representation using string formatting
105-
# This ensures clean binary representation that BigQuery accepts
106-
return float(f"{data:.{precision}f}")
107-
elif isinstance(data, dict):
108-
return {
109-
k: WorkerDatabaseUtils._sanitize_floats_for_database(v, precision)
110-
for k, v in data.items()
111-
}
112-
elif isinstance(data, list):
113-
return [
114-
WorkerDatabaseUtils._sanitize_floats_for_database(item, precision)
115-
for item in data
116-
]
117-
else:
118-
# Return other types unchanged (int, str, bool, None, etc.)
119-
return data
120-
12181
@staticmethod
12282
def get_sql_values_for_query(
12383
conn_cls: Any,
@@ -334,9 +294,7 @@ def _add_processing_columns(
334294
if metadata and has_metadata_col:
335295
try:
336296
# Sanitize floats for database JSON compatibility (BigQuery, PostgreSQL, etc.)
337-
sanitized_metadata = WorkerDatabaseUtils._sanitize_floats_for_database(
338-
metadata
339-
)
297+
sanitized_metadata = sanitize_floats_for_database(metadata)
340298
values[TableColumns.METADATA] = json.dumps(sanitized_metadata)
341299
except (TypeError, ValueError) as e:
342300
logger.error(f"Failed to serialize metadata to JSON: {e}")
@@ -404,7 +362,7 @@ def _process_single_column_mode(
404362
values[v2_col_name] = wrapped_dict
405363
else:
406364
# Sanitize floats for database JSON compatibility
407-
sanitized_data = WorkerDatabaseUtils._sanitize_floats_for_database(data)
365+
sanitized_data = sanitize_floats_for_database(data)
408366
values[single_column_name] = sanitized_data
409367
if has_v2_col:
410368
values[v2_col_name] = sanitized_data
@@ -416,14 +374,14 @@ def _process_split_column_mode(
416374
"""Process data for split column mode."""
417375
if isinstance(data, dict):
418376
# Sanitize floats for database JSON compatibility
419-
sanitized_data = WorkerDatabaseUtils._sanitize_floats_for_database(data)
377+
sanitized_data = sanitize_floats_for_database(data)
420378
values.update(sanitized_data)
421379
elif isinstance(data, str):
422380
values[single_column_name] = data
423381
else:
424382
try:
425383
# Sanitize floats for database JSON compatibility before serialization
426-
sanitized_data = WorkerDatabaseUtils._sanitize_floats_for_database(data)
384+
sanitized_data = sanitize_floats_for_database(data)
427385
values[single_column_name] = json.dumps(sanitized_data)
428386
except (TypeError, ValueError) as e:
429387
logger.error(

0 commit comments

Comments
 (0)