From 76e236594abe68e391c295e19c62efcf1289e1e9 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 15 Oct 2025 19:37:34 +0530 Subject: [PATCH 1/4] UN-2882 [FIX] Fix BigQuery float precision issue in PARSE_JSON for metadata serialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added BigQuery-specific float sanitization with IEEE 754 double precision safe zone - Consolidated duplicate float sanitization logic into shared utilities - Fixed insertion errors caused by floats with >15 significant figures 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../connectors/databases/bigquery/bigquery.py | 62 ++++++++++++++++++- .../unstract/connectors/databases/utils.py | 51 +++++++++++++++ .../shared/infrastructure/database/utils.py | 52 ++-------------- 3 files changed, 115 insertions(+), 50 deletions(-) create mode 100644 unstract/connectors/src/unstract/connectors/databases/utils.py diff --git a/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py b/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py index fef13666d2..6282129769 100644 --- a/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py +++ b/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py @@ -63,6 +63,51 @@ def can_write() -> bool: def can_read() -> bool: return True + @staticmethod + def _sanitize_for_bigquery(data: Any) -> Any: + """BigQuery-specific float sanitization for PARSE_JSON compatibility. + + BigQuery's PARSE_JSON() requires floats that can "round-trip" through + string representation. This method limits total significant figures to 15 + (IEEE 754 double precision safe zone) to ensure clean binary representation. + + Args: + data: The data structure to sanitize (dict, list, or primitive) + + Returns: + Sanitized data compatible with BigQuery's PARSE_JSON + + Example: + >>> BigQuery._sanitize_for_bigquery({"time": 1760509016.282637}) + {'time': 1760509016.28264} # Limited to 15 significant figures + + >>> BigQuery._sanitize_for_bigquery({"cost": 0.001228}) + {'cost': 0.001228} # Unchanged (only 4 significant figures) + """ + import math + + if isinstance(data, float): + # Handle special values that BigQuery can't store in JSON + if math.isnan(data) or math.isinf(data): + return None + if data == 0: + return 0.0 + + # Limit total significant figures to 15 for IEEE 754 compatibility + # BigQuery PARSE_JSON requires values that round-trip cleanly + # For large numbers (like Unix timestamps), this reduces decimal precision + # For small numbers (like costs), full precision is preserved + magnitude = math.floor(math.log10(abs(data))) + 1 + safe_decimals = max(0, 15 - magnitude) + return float(f"{data:.{safe_decimals}f}") + + elif isinstance(data, dict): + return {k: BigQuery._sanitize_for_bigquery(v) for k, v in data.items()} + elif isinstance(data, list): + return [BigQuery._sanitize_for_bigquery(item) for item in data] + else: + return data + def get_engine(self) -> Any: return self.bigquery.Client.from_service_account_info( # type: ignore info=self.json_credentials @@ -202,7 +247,11 @@ def execute_query( if isinstance(value, (dict, list)) and column_type == "JSON": # For JSON objects in JSON columns, convert to string and use PARSE_JSON - json_str = json.dumps(value) if value else None + # Sanitize floats before serialization to ensure clean JSON for PARSE_JSON + sanitized_value = BigQuery._sanitize_for_bigquery(value) + json_str = ( + json.dumps(sanitized_value) if sanitized_value else None + ) if json_str: # Replace @`key` with PARSE_JSON(@`key`) in the SQL query modified_sql = modified_sql.replace( @@ -213,7 +262,11 @@ def execute_query( ) elif isinstance(value, (dict, list)): # For dict/list values in STRING columns, serialize to JSON string - json_str = json.dumps(value) if value else None + # Sanitize floats before serialization to ensure clean JSON + sanitized_value = BigQuery._sanitize_for_bigquery(value) + json_str = ( + json.dumps(sanitized_value) if sanitized_value else None + ) query_parameters.append( self.bigquery.ScalarQueryParameter(key, "STRING", json_str) ) @@ -314,7 +367,10 @@ def get_sql_values_for_query( # Try to parse JSON strings back to objects for BigQuery try: parsed_value = json.loads(value) - sql_values[column] = parsed_value + # Sanitize floats after parsing to prevent precision issues + # json.loads() creates new float objects that may have binary precision problems + sanitized_value = BigQuery._sanitize_for_bigquery(parsed_value) + sql_values[column] = sanitized_value except (TypeError, ValueError, json.JSONDecodeError): # Not a JSON string, keep as string sql_values[column] = f"{value}" diff --git a/unstract/connectors/src/unstract/connectors/databases/utils.py b/unstract/connectors/src/unstract/connectors/databases/utils.py new file mode 100644 index 0000000000..f06c542da5 --- /dev/null +++ b/unstract/connectors/src/unstract/connectors/databases/utils.py @@ -0,0 +1,51 @@ +"""Database Utilities + +Common utilities for database connectors to ensure consistent data handling +across all database types (BigQuery, PostgreSQL, MySQL, Snowflake, etc.). +""" + +import math +from typing import Any + + +def sanitize_floats_for_database(data: Any) -> Any: + """Sanitize special float values (NaN, Inf) for database compatibility. + + This minimal sanitization applies to all databases. It only handles + special float values that no database can store in JSON: + - NaN (Not a Number) → None + - Infinity → None + - -Infinity → None + + Database-specific precision handling (like BigQuery's round-trip requirements) + should be implemented in the respective database connector. + + Args: + data: The data structure to sanitize (dict, list, or primitive) + + Returns: + Sanitized data with NaN/Inf converted to None + + Example: + >>> sanitize_floats_for_database({"value": float("nan")}) + {'value': None} + + >>> sanitize_floats_for_database({"value": float("inf")}) + {'value': None} + + >>> sanitize_floats_for_database({"price": 1760509016.282637}) + {'price': 1760509016.282637} # Unchanged - precision preserved + """ + if isinstance(data, float): + # Only handle special values that no database supports + if math.isnan(data) or math.isinf(data): + return None + # Return unchanged - let database connector handle precision if needed + return data + elif isinstance(data, dict): + return {k: sanitize_floats_for_database(v) for k, v in data.items()} + elif isinstance(data, list): + return [sanitize_floats_for_database(item) for item in data] + else: + # Return other types unchanged (int, str, bool, None, etc.) + return data diff --git a/workers/shared/infrastructure/database/utils.py b/workers/shared/infrastructure/database/utils.py index df9a66bf58..e3d78f93be 100644 --- a/workers/shared/infrastructure/database/utils.py +++ b/workers/shared/infrastructure/database/utils.py @@ -6,7 +6,6 @@ import datetime import json -import math from typing import Any from shared.enums.status_enums import FileProcessingStatus @@ -15,6 +14,7 @@ from unstract.connectors.databases import connectors as db_connectors from unstract.connectors.databases.exceptions import UnstractDBConnectorException from unstract.connectors.databases.unstract_db import UnstractDB +from unstract.connectors.databases.utils import sanitize_floats_for_database from unstract.connectors.exceptions import ConnectorError from ..logging import WorkerLogger @@ -78,46 +78,6 @@ def __init__(self, detail: str): class WorkerDatabaseUtils: """Worker-compatible database utilities following production patterns.""" - @staticmethod - def _sanitize_floats_for_database(data: Any, precision: int = 6) -> Any: - """Recursively sanitize float values for database JSON compatibility. - - BigQuery's PARSE_JSON() requires floats that can "round-trip" through - string representation. This function normalizes floats to ensure they - serialize cleanly for all database types (BigQuery, PostgreSQL, MySQL, etc.). - - Args: - data: The data structure to sanitize (dict, list, or primitive) - precision: Number of decimal places to preserve (default: 6) - - Returns: - Sanitized data with normalized float values - - Example: - >>> _sanitize_floats_for_database({"time": 22.770092, "count": 5}) - {'time': 22.770092, 'count': 5} - """ - if isinstance(data, float): - # Handle special float values that databases don't support in JSON - if math.isnan(data) or math.isinf(data): - return None - # Normalize float representation using string formatting - # This ensures clean binary representation that BigQuery accepts - return float(f"{data:.{precision}f}") - elif isinstance(data, dict): - return { - k: WorkerDatabaseUtils._sanitize_floats_for_database(v, precision) - for k, v in data.items() - } - elif isinstance(data, list): - return [ - WorkerDatabaseUtils._sanitize_floats_for_database(item, precision) - for item in data - ] - else: - # Return other types unchanged (int, str, bool, None, etc.) - return data - @staticmethod def get_sql_values_for_query( conn_cls: Any, @@ -334,9 +294,7 @@ def _add_processing_columns( if metadata and has_metadata_col: try: # Sanitize floats for database JSON compatibility (BigQuery, PostgreSQL, etc.) - sanitized_metadata = WorkerDatabaseUtils._sanitize_floats_for_database( - metadata - ) + sanitized_metadata = sanitize_floats_for_database(metadata) values[TableColumns.METADATA] = json.dumps(sanitized_metadata) except (TypeError, ValueError) as e: logger.error(f"Failed to serialize metadata to JSON: {e}") @@ -404,7 +362,7 @@ def _process_single_column_mode( values[v2_col_name] = wrapped_dict else: # Sanitize floats for database JSON compatibility - sanitized_data = WorkerDatabaseUtils._sanitize_floats_for_database(data) + sanitized_data = sanitize_floats_for_database(data) values[single_column_name] = sanitized_data if has_v2_col: values[v2_col_name] = sanitized_data @@ -416,14 +374,14 @@ def _process_split_column_mode( """Process data for split column mode.""" if isinstance(data, dict): # Sanitize floats for database JSON compatibility - sanitized_data = WorkerDatabaseUtils._sanitize_floats_for_database(data) + sanitized_data = sanitize_floats_for_database(data) values.update(sanitized_data) elif isinstance(data, str): values[single_column_name] = data else: try: # Sanitize floats for database JSON compatibility before serialization - sanitized_data = WorkerDatabaseUtils._sanitize_floats_for_database(data) + sanitized_data = sanitize_floats_for_database(data) values[single_column_name] = json.dumps(sanitized_data) except (TypeError, ValueError) as e: logger.error( From 4b1dceb10e87e8a3aee18f34ed53b3b628dcb835 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 15 Oct 2025 19:56:40 +0530 Subject: [PATCH 2/4] Fix truthiness check for empty values in JSON serialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Changed 'if sanitized_value' to 'if sanitized_value is not None' - Prevents empty dicts {}, empty lists [], and zero values from becoming None - Addresses CodeRabbit AI feedback on PR #1593 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../unstract/connectors/databases/bigquery/bigquery.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py b/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py index 6282129769..983c365543 100644 --- a/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py +++ b/unstract/connectors/src/unstract/connectors/databases/bigquery/bigquery.py @@ -250,7 +250,9 @@ def execute_query( # Sanitize floats before serialization to ensure clean JSON for PARSE_JSON sanitized_value = BigQuery._sanitize_for_bigquery(value) json_str = ( - json.dumps(sanitized_value) if sanitized_value else None + json.dumps(sanitized_value) + if sanitized_value is not None + else None ) if json_str: # Replace @`key` with PARSE_JSON(@`key`) in the SQL query @@ -265,7 +267,9 @@ def execute_query( # Sanitize floats before serialization to ensure clean JSON sanitized_value = BigQuery._sanitize_for_bigquery(value) json_str = ( - json.dumps(sanitized_value) if sanitized_value else None + json.dumps(sanitized_value) + if sanitized_value is not None + else None ) query_parameters.append( self.bigquery.ScalarQueryParameter(key, "STRING", json_str) From 63213f646928930aa0fb5cfc5bcc4610b880077d Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Wed, 15 Oct 2025 17:24:14 +0000 Subject: [PATCH 3/4] CodeRabbit Generated Unit Tests: Add tests for float sanitization and BigQuery precision limits --- .../tests/databases/test_bigquery_db.py | 336 +++++++++++++++++ .../connectors/tests/databases/test_utils.py | 210 +++++++++++ workers/shared/tests/__init__.py | 1 + .../shared/tests/infrastructure/__init__.py | 1 + .../tests/infrastructure/database/__init__.py | 1 + .../infrastructure/database/test_utils.py | 344 ++++++++++++++++++ 6 files changed, 893 insertions(+) create mode 100644 unstract/connectors/tests/databases/test_utils.py create mode 100644 workers/shared/tests/__init__.py create mode 100644 workers/shared/tests/infrastructure/__init__.py create mode 100644 workers/shared/tests/infrastructure/database/__init__.py create mode 100644 workers/shared/tests/infrastructure/database/test_utils.py diff --git a/unstract/connectors/tests/databases/test_bigquery_db.py b/unstract/connectors/tests/databases/test_bigquery_db.py index 5673919beb..81e03c7620 100644 --- a/unstract/connectors/tests/databases/test_bigquery_db.py +++ b/unstract/connectors/tests/databases/test_bigquery_db.py @@ -32,5 +32,341 @@ def test_json_credentials(self): self.assertTrue(len(results) > 0) # add assertion here + +class TestBigQuerySanitization(unittest.TestCase): + """Comprehensive tests for BigQuery float sanitization.""" + + def test_sanitize_nan_returns_none(self): + """Test that NaN values are converted to None.""" + result = BigQuery._sanitize_for_bigquery(float('nan')) + self.assertIsNone(result) + + def test_sanitize_infinity_returns_none(self): + """Test that positive infinity is converted to None.""" + result = BigQuery._sanitize_for_bigquery(float('inf')) + self.assertIsNone(result) + + def test_sanitize_negative_infinity_returns_none(self): + """Test that negative infinity is converted to None.""" + result = BigQuery._sanitize_for_bigquery(float('-inf')) + self.assertIsNone(result) + + def test_sanitize_zero_returns_zero(self): + """Test that zero is preserved.""" + result = BigQuery._sanitize_for_bigquery(0.0) + self.assertEqual(result, 0.0) + + def test_sanitize_negative_zero_returns_zero(self): + """Test that negative zero is handled correctly.""" + result = BigQuery._sanitize_for_bigquery(-0.0) + self.assertEqual(result, 0.0) + + def test_sanitize_large_unix_timestamp(self): + """Test that Unix timestamps are limited to 15 significant figures.""" + # Unix timestamp with high precision + timestamp = 1760509016.282637 + result = BigQuery._sanitize_for_bigquery(timestamp) + + # Should limit to 15 total significant figures + # 1760509016 has 10 digits, so 5 decimal places remain + self.assertAlmostEqual(result, 1760509016.28264, places=5) + + # Verify it's different from original (precision reduced) + self.assertNotEqual(result, timestamp) + + def test_sanitize_small_decimal_preserves_precision(self): + """Test that small numbers retain full precision.""" + small_number = 0.001228 + result = BigQuery._sanitize_for_bigquery(small_number) + + # Small numbers should be unchanged (only 4 significant figures) + self.assertEqual(result, small_number) + + def test_sanitize_medium_float_limits_precision(self): + """Test that medium-sized floats are properly limited.""" + # Number with 16+ significant figures + value = 12345.67890123456789 + result = BigQuery._sanitize_for_bigquery(value) + + # Should limit to 15 significant figures total + # 12345 has 5 digits, so 10 decimal places remain + self.assertAlmostEqual(result, 12345.6789012346, places=10) + + def test_sanitize_very_large_number(self): + """Test that very large numbers are handled correctly.""" + large_value = 9.87654321098765e15 + result = BigQuery._sanitize_for_bigquery(large_value) + + # Should limit to 15 significant figures + self.assertIsInstance(result, float) + self.assertNotEqual(result, float('inf')) + self.assertNotEqual(result, float('nan')) + + def test_sanitize_very_small_number(self): + """Test that very small numbers preserve precision.""" + small_value = 1.23456789e-10 + result = BigQuery._sanitize_for_bigquery(small_value) + + # Should preserve precision for small numbers + self.assertAlmostEqual(result, small_value, places=15) + + def test_sanitize_negative_numbers(self): + """Test that negative numbers are handled correctly.""" + negative = -123.456789012345678 + result = BigQuery._sanitize_for_bigquery(negative) + + # Should limit precision but preserve sign + self.assertLess(result, 0) + self.assertAlmostEqual(result, -123.456789012346, places=12) + + def test_sanitize_dict_with_floats(self): + """Test sanitization of dictionaries containing floats.""" + data = { + "timestamp": 1760509016.282637, + "cost": 0.001228, + "nan_value": float('nan'), + "inf_value": float('inf'), + "normal": 42.0 + } + result = BigQuery._sanitize_for_bigquery(data) + + self.assertIsInstance(result, dict) + self.assertAlmostEqual(result["timestamp"], 1760509016.28264, places=5) + self.assertEqual(result["cost"], 0.001228) + self.assertIsNone(result["nan_value"]) + self.assertIsNone(result["inf_value"]) + self.assertEqual(result["normal"], 42.0) + + def test_sanitize_nested_dict(self): + """Test sanitization of nested dictionaries.""" + data = { + "outer": { + "inner": { + "value": 1234567890.123456789, + "special": float('nan') + } + } + } + result = BigQuery._sanitize_for_bigquery(data) + + self.assertIsInstance(result["outer"]["inner"]["value"], float) + self.assertIsNone(result["outer"]["inner"]["special"]) + + def test_sanitize_list_with_floats(self): + """Test sanitization of lists containing floats.""" + data = [1760509016.282637, 0.001228, float('nan'), float('inf'), 42.0] + result = BigQuery._sanitize_for_bigquery(data) + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 5) + self.assertAlmostEqual(result[0], 1760509016.28264, places=5) + self.assertEqual(result[1], 0.001228) + self.assertIsNone(result[2]) + self.assertIsNone(result[3]) + self.assertEqual(result[4], 42.0) + + def test_sanitize_nested_lists(self): + """Test sanitization of nested lists.""" + data = [[1.234567890123456789, float('nan')], [float('inf'), 0.001]] + result = BigQuery._sanitize_for_bigquery(data) + + self.assertIsInstance(result, list) + self.assertIsInstance(result[0], list) + self.assertIsNone(result[0][1]) + self.assertIsNone(result[1][0]) + + def test_sanitize_mixed_structure(self): + """Test sanitization of mixed dict/list structures.""" + data = { + "items": [ + {"value": 1760509016.282637, "name": "timestamp"}, + {"value": float('nan'), "name": "invalid"} + ], + "summary": { + "total": 99999.999999999999, + "count": 5 + } + } + result = BigQuery._sanitize_for_bigquery(data) + + self.assertIsInstance(result, dict) + self.assertIsInstance(result["items"], list) + self.assertAlmostEqual(result["items"][0]["value"], 1760509016.28264, places=5) + self.assertEqual(result["items"][0]["name"], "timestamp") + self.assertIsNone(result["items"][1]["value"]) + + def test_sanitize_string_passthrough(self): + """Test that strings are passed through unchanged.""" + data = "test string" + result = BigQuery._sanitize_for_bigquery(data) + self.assertEqual(result, data) + + def test_sanitize_int_passthrough(self): + """Test that integers are passed through unchanged.""" + data = 42 + result = BigQuery._sanitize_for_bigquery(data) + self.assertEqual(result, data) + + def test_sanitize_bool_passthrough(self): + """Test that booleans are passed through unchanged.""" + data = True + result = BigQuery._sanitize_for_bigquery(data) + self.assertEqual(result, data) + + def test_sanitize_none_passthrough(self): + """Test that None is passed through unchanged.""" + data = None + result = BigQuery._sanitize_for_bigquery(data) + self.assertIsNone(result) + + def test_sanitize_empty_dict(self): + """Test that empty dictionaries are handled correctly.""" + data = {} + result = BigQuery._sanitize_for_bigquery(data) + self.assertEqual(result, {}) + + def test_sanitize_empty_list(self): + """Test that empty lists are handled correctly.""" + data = [] + result = BigQuery._sanitize_for_bigquery(data) + self.assertEqual(result, []) + + def test_sanitize_complex_real_world_data(self): + """Test sanitization of complex real-world data structures.""" + data = { + "execution_metadata": { + "start_time": 1760509016.282637, + "end_time": 1760509045.891234, + "duration": 29.608597, + "status": "completed" + }, + "metrics": [ + {"name": "accuracy", "value": 0.9876543210123456}, + {"name": "loss", "value": 0.00123456789}, + {"name": "invalid", "value": float('nan')} + ], + "costs": { + "compute": 0.001228, + "storage": 0.000456, + "total": 0.001684 + } + } + result = BigQuery._sanitize_for_bigquery(data) + + # Verify structure is preserved + self.assertIn("execution_metadata", result) + self.assertIn("metrics", result) + self.assertIn("costs", result) + + # Verify timestamps are limited + self.assertAlmostEqual( + result["execution_metadata"]["start_time"], + 1760509016.28264, + places=5 + ) + + # Verify small numbers are preserved + self.assertEqual(result["costs"]["compute"], 0.001228) + + # Verify NaN is converted to None + self.assertIsNone(result["metrics"][2]["value"]) + + def test_sanitize_float_edge_cases(self): + """Test edge cases for float sanitization.""" + test_cases = [ + (1.0, 1.0), # Simple whole number + (0.1, 0.1), # Decimal that can't be exactly represented + (1e-100, 1e-100), # Very small number + (1e100, 1e100), # Very large number (within float range) + (-123.456, -123.456), # Negative decimal + ] + + for input_val, expected in test_cases: + with self.subTest(input=input_val): + result = BigQuery._sanitize_for_bigquery(input_val) + if abs(input_val) < 1e50: # For reasonable numbers + self.assertAlmostEqual(result, expected, places=10) + else: + self.assertIsInstance(result, float) + + def test_sanitize_preserves_dict_keys(self): + """Test that dictionary keys are preserved during sanitization.""" + data = { + "key1": 1.234567890123456789, + "key2": float('nan'), + "key3": "string_value", + "key4": [1, 2, 3] + } + result = BigQuery._sanitize_for_bigquery(data) + + self.assertEqual(set(result.keys()), set(data.keys())) + + def test_sanitize_maintains_list_order(self): + """Test that list order is maintained during sanitization.""" + data = [1.111, 2.222, 3.333, float('nan'), 5.555] + result = BigQuery._sanitize_for_bigquery(data) + + self.assertEqual(len(result), len(data)) + self.assertAlmostEqual(result[0], 1.111, places=3) + self.assertAlmostEqual(result[1], 2.222, places=3) + self.assertAlmostEqual(result[2], 3.333, places=3) + self.assertIsNone(result[3]) + self.assertAlmostEqual(result[4], 5.555, places=3) + + if __name__ == "__main__": unittest.main() + """Comprehensive tests for BigQuery float sanitization.""" + + def test_sanitize_nan_returns_none(self): + """Test that NaN values are converted to None.""" + result = BigQuery._sanitize_for_bigquery(float('nan')) + self.assertIsNone(result) + + def test_sanitize_infinity_returns_none(self): + """Test that positive infinity is converted to None.""" + result = BigQuery._sanitize_for_bigquery(float('inf')) + self.assertIsNone(result) + + def test_sanitize_negative_infinity_returns_none(self): + """Test that negative infinity is converted to None.""" + result = BigQuery._sanitize_for_bigquery(float('-inf')) + self.assertIsNone(result) + + def test_sanitize_zero_returns_zero(self): + """Test that zero is preserved.""" + result = BigQuery._sanitize_for_bigquery(0.0) + self.assertEqual(result, 0.0) + + def test_sanitize_large_unix_timestamp(self): + """Test that Unix timestamps are limited to 15 significant figures.""" + timestamp = 1760509016.282637 + result = BigQuery._sanitize_for_bigquery(timestamp) + self.assertAlmostEqual(result, 1760509016.28264, places=5) + self.assertNotEqual(result, timestamp) + + def test_sanitize_small_decimal_preserves_precision(self): + """Test that small numbers retain full precision.""" + small_number = 0.001228 + result = BigQuery._sanitize_for_bigquery(small_number) + self.assertEqual(result, small_number) + + def test_sanitize_dict_with_floats(self): + """Test sanitization of dictionaries containing floats.""" + data = { + "timestamp": 1760509016.282637, + "cost": 0.001228, + "nan_value": float('nan'), + "inf_value": float('inf'), + "normal": 42.0 + } + result = BigQuery._sanitize_for_bigquery(data) + self.assertIsInstance(result, dict) + self.assertAlmostEqual(result["timestamp"], 1760509016.28264, places=5) + self.assertEqual(result["cost"], 0.001228) + self.assertIsNone(result["nan_value"]) + self.assertIsNone(result["inf_value"]) + self.assertEqual(result["normal"], 42.0) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/unstract/connectors/tests/databases/test_utils.py b/unstract/connectors/tests/databases/test_utils.py new file mode 100644 index 0000000000..3ae1fe7f3b --- /dev/null +++ b/unstract/connectors/tests/databases/test_utils.py @@ -0,0 +1,210 @@ +import unittest + +from unstract.connectors.databases.utils import sanitize_floats_for_database + + +class TestSanitizeFloatsForDatabase(unittest.TestCase): + """Comprehensive tests for database float sanitization utility.""" + + def test_nan_converted_to_none(self): + """Test that NaN values are converted to None.""" + result = sanitize_floats_for_database(float('nan')) + self.assertIsNone(result) + + def test_positive_infinity_converted_to_none(self): + """Test that positive infinity is converted to None.""" + result = sanitize_floats_for_database(float('inf')) + self.assertIsNone(result) + + def test_negative_infinity_converted_to_none(self): + """Test that negative infinity is converted to None.""" + result = sanitize_floats_for_database(float('-inf')) + self.assertIsNone(result) + + def test_normal_float_unchanged(self): + """Test that normal floats are not modified.""" + test_value = 1760509016.282637 + result = sanitize_floats_for_database(test_value) + self.assertEqual(result, test_value) + + def test_zero_unchanged(self): + """Test that zero is preserved.""" + result = sanitize_floats_for_database(0.0) + self.assertEqual(result, 0.0) + + def test_negative_float_unchanged(self): + """Test that negative floats are preserved.""" + test_value = -123.456789 + result = sanitize_floats_for_database(test_value) + self.assertEqual(result, test_value) + + def test_small_float_unchanged(self): + """Test that small floats retain full precision.""" + test_value = 0.001228 + result = sanitize_floats_for_database(test_value) + self.assertEqual(result, test_value) + + def test_large_float_unchanged(self): + """Test that large floats retain full precision.""" + test_value = 9876543210.123456789 + result = sanitize_floats_for_database(test_value) + self.assertEqual(result, test_value) + + def test_dict_with_nan(self): + """Test sanitization of dictionary with NaN values.""" + data = { + "valid": 42.5, + "invalid": float('nan'), + "name": "test" + } + result = sanitize_floats_for_database(data) + + self.assertEqual(result["valid"], 42.5) + self.assertIsNone(result["invalid"]) + self.assertEqual(result["name"], "test") + + def test_dict_with_infinity(self): + """Test sanitization of dictionary with infinity values.""" + data = { + "pos_inf": float('inf'), + "neg_inf": float('-inf'), + "normal": 123.456 + } + result = sanitize_floats_for_database(data) + + self.assertIsNone(result["pos_inf"]) + self.assertIsNone(result["neg_inf"]) + self.assertEqual(result["normal"], 123.456) + + def test_nested_dict(self): + """Test sanitization of nested dictionaries.""" + data = { + "level1": { + "level2": { + "valid": 1.23, + "invalid": float('nan') + }, + "inf": float('inf') + } + } + result = sanitize_floats_for_database(data) + + self.assertEqual(result["level1"]["level2"]["valid"], 1.23) + self.assertIsNone(result["level1"]["level2"]["invalid"]) + self.assertIsNone(result["level1"]["inf"]) + + def test_list_with_special_floats(self): + """Test sanitization of list with special float values.""" + data = [1.23, float('nan'), 4.56, float('inf'), float('-inf')] + result = sanitize_floats_for_database(data) + + self.assertEqual(len(result), 5) + self.assertEqual(result[0], 1.23) + self.assertIsNone(result[1]) + self.assertEqual(result[2], 4.56) + self.assertIsNone(result[3]) + self.assertIsNone(result[4]) + + def test_nested_list(self): + """Test sanitization of nested lists.""" + data = [ + [1.23, float('nan')], + [float('inf'), 4.56] + ] + result = sanitize_floats_for_database(data) + + self.assertEqual(result[0][0], 1.23) + self.assertIsNone(result[0][1]) + self.assertIsNone(result[1][0]) + self.assertEqual(result[1][1], 4.56) + + def test_mixed_dict_and_list(self): + """Test sanitization of mixed dict/list structures.""" + data = { + "items": [ + {"value": 1.23, "valid": True}, + {"value": float('nan'), "valid": False} + ], + "summary": { + "total": 100.0, + "invalid": float('inf') + } + } + result = sanitize_floats_for_database(data) + + self.assertEqual(result["items"][0]["value"], 1.23) + self.assertIsNone(result["items"][1]["value"]) + self.assertEqual(result["summary"]["total"], 100.0) + self.assertIsNone(result["summary"]["invalid"]) + + def test_string_passthrough(self): + """Test that strings pass through unchanged.""" + data = "test string" + result = sanitize_floats_for_database(data) + self.assertEqual(result, data) + + def test_int_passthrough(self): + """Test that integers pass through unchanged.""" + data = 42 + result = sanitize_floats_for_database(data) + self.assertEqual(result, data) + + def test_bool_passthrough(self): + """Test that booleans pass through unchanged.""" + data = True + result = sanitize_floats_for_database(data) + self.assertTrue(result) + + def test_none_passthrough(self): + """Test that None passes through unchanged.""" + data = None + result = sanitize_floats_for_database(data) + self.assertIsNone(result) + + def test_empty_dict(self): + """Test that empty dictionaries are handled.""" + data = {} + result = sanitize_floats_for_database(data) + self.assertEqual(result, {}) + + def test_empty_list(self): + """Test that empty lists are handled.""" + data = [] + result = sanitize_floats_for_database(data) + self.assertEqual(result, []) + + def test_precision_not_modified(self): + """Test that float precision is not modified.""" + timestamp = 1760509016.282637123456789 + result = sanitize_floats_for_database(timestamp) + self.assertEqual(result, timestamp) + + def test_complex_real_world_metadata(self): + """Test sanitization of complex real-world metadata.""" + metadata = { + "execution": { + "start_time": 1760509016.282637, + "end_time": 1760509045.891234, + "duration": 29.608597 + }, + "metrics": { + "success_rate": 0.987654321, + "error_rate": float('nan'), + "timeout_rate": 0.001234 + }, + "resources": { + "cpu_usage": 45.67, + "memory_usage": float('inf'), + "disk_usage": 123.456789 + } + } + result = sanitize_floats_for_database(metadata) + + self.assertEqual(result["execution"]["start_time"], 1760509016.282637) + self.assertEqual(result["metrics"]["success_rate"], 0.987654321) + self.assertIsNone(result["metrics"]["error_rate"]) + self.assertIsNone(result["resources"]["memory_usage"]) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/workers/shared/tests/__init__.py b/workers/shared/tests/__init__.py new file mode 100644 index 0000000000..1f7f0e7b65 --- /dev/null +++ b/workers/shared/tests/__init__.py @@ -0,0 +1 @@ +"""Workers shared tests package.""" \ No newline at end of file diff --git a/workers/shared/tests/infrastructure/__init__.py b/workers/shared/tests/infrastructure/__init__.py new file mode 100644 index 0000000000..ad2345596a --- /dev/null +++ b/workers/shared/tests/infrastructure/__init__.py @@ -0,0 +1 @@ +"""Workers infrastructure tests package.""" \ No newline at end of file diff --git a/workers/shared/tests/infrastructure/database/__init__.py b/workers/shared/tests/infrastructure/database/__init__.py new file mode 100644 index 0000000000..4d1ca915d3 --- /dev/null +++ b/workers/shared/tests/infrastructure/database/__init__.py @@ -0,0 +1 @@ +"""Workers database tests package.""" \ No newline at end of file diff --git a/workers/shared/tests/infrastructure/database/test_utils.py b/workers/shared/tests/infrastructure/database/test_utils.py new file mode 100644 index 0000000000..703a7a579f --- /dev/null +++ b/workers/shared/tests/infrastructure/database/test_utils.py @@ -0,0 +1,344 @@ +import datetime +import json +import unittest + +from shared.enums.status_enums import FileProcessingStatus +from shared.infrastructure.database.utils import ( + ColumnModes, + TableColumns, + WorkerDatabaseUtils, +) + + +class TestWorkerDatabaseUtilsFloatSanitization(unittest.TestCase): + """Test float sanitization integration in WorkerDatabaseUtils.""" + + def test_metadata_with_nan_sanitized(self): + """Test that metadata with NaN values is properly sanitized.""" + metadata = { + "value": float('nan'), + "timestamp": 1760509016.282637 + } + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data={"result": "test"}, + file_path="/test/path", + execution_id="exec123", + metadata=metadata + ) + + metadata_json = json.loads(values[TableColumns.METADATA]) + self.assertIsNone(metadata_json["value"]) + self.assertEqual(metadata_json["timestamp"], 1760509016.282637) + + def test_metadata_with_infinity_sanitized(self): + """Test that metadata with infinity values is sanitized.""" + metadata = { + "pos_inf": float('inf'), + "neg_inf": float('-inf'), + "normal": 42.0 + } + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data={"result": "test"}, + file_path="/test/path", + execution_id="exec123", + metadata=metadata + ) + + metadata_json = json.loads(values[TableColumns.METADATA]) + self.assertIsNone(metadata_json["pos_inf"]) + self.assertIsNone(metadata_json["neg_inf"]) + self.assertEqual(metadata_json["normal"], 42.0) + + def test_single_column_data_with_nan_sanitized(self): + """Test that single column data with NaN is sanitized.""" + data = { + "value": float('nan'), + "count": 5 + } + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data=data, + file_path="/test/path", + execution_id="exec123" + ) + + self.assertIsNone(values["data"]["value"]) + self.assertEqual(values["data"]["count"], 5) + + def test_single_column_data_preserves_float_precision(self): + """Test that single column mode preserves float precision.""" + data = { + "timestamp": 1760509016.282637123456789, + "cost": 0.001228 + } + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data=data, + file_path="/test/path", + execution_id="exec123" + ) + + self.assertEqual(values["data"]["timestamp"], 1760509016.282637123456789) + self.assertEqual(values["data"]["cost"], 0.001228) + + def test_split_column_data_with_special_floats(self): + """Test that split column data handles special floats.""" + data = { + "value": float('inf'), + "normal": 123.456 + } + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.SPLIT_JSON_INTO_COLUMNS, + data=data, + file_path="/test/path", + execution_id="exec123" + ) + + self.assertIsNone(values["value"]) + self.assertEqual(values["normal"], 123.456) + + def test_nested_data_structure_sanitization(self): + """Test sanitization of nested data structures.""" + data = { + "outer": { + "inner": { + "nan_value": float('nan'), + "valid": 42.0 + } + } + } + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data=data, + file_path="/test/path", + execution_id="exec123" + ) + + self.assertIsNone(values["data"]["outer"]["inner"]["nan_value"]) + self.assertEqual(values["data"]["outer"]["inner"]["valid"], 42.0) + + def test_list_data_with_special_floats(self): + """Test sanitization of list data with special floats.""" + data = [1.23, float('nan'), float('inf'), 4.56] + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data=data, + file_path="/test/path", + execution_id="exec123" + ) + + result_list = values["data"] + self.assertEqual(result_list[0], 1.23) + self.assertIsNone(result_list[1]) + self.assertIsNone(result_list[2]) + self.assertEqual(result_list[3], 4.56) + + def test_string_data_wrapped_properly(self): + """Test that string data is wrapped correctly.""" + data = "simple string" + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data=data, + file_path="/test/path", + execution_id="exec123" + ) + + self.assertEqual(values["data"], {"result": "simple string"}) + + def test_none_data_handling(self): + """Test that None data is handled correctly.""" + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data=None, + file_path="/test/path", + execution_id="exec123" + ) + + self.assertNotIn("data", values) + + def test_error_status_set_when_error_provided(self): + """Test that error status is set correctly.""" + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data={"result": "test"}, + file_path="/test/path", + execution_id="exec123", + error="Test error message" + ) + + self.assertEqual(values[TableColumns.ERROR_MESSAGE], "Test error message") + self.assertEqual(values[TableColumns.STATUS], FileProcessingStatus.ERROR) + + def test_success_status_when_no_error(self): + """Test that success status is set when no error.""" + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data={"result": "test"}, + file_path="/test/path", + execution_id="exec123" + ) + + self.assertEqual(values[TableColumns.STATUS], FileProcessingStatus.SUCCESS) + + def test_agent_name_included_when_requested(self): + """Test that agent name is included when requested.""" + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data={"result": "test"}, + file_path="/test/path", + execution_id="exec123", + include_agent=True, + agent_name="TEST_AGENT" + ) + + self.assertEqual(values[TableColumns.CREATED_BY], "TEST_AGENT") + + def test_timestamp_included_when_requested(self): + """Test that timestamp is included when requested.""" + before = datetime.datetime.now() + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data={"result": "test"}, + file_path="/test/path", + execution_id="exec123", + include_timestamp=True + ) + after = datetime.datetime.now() + + self.assertIn(TableColumns.CREATED_AT, values) + timestamp = values[TableColumns.CREATED_AT] + self.assertGreaterEqual(timestamp, before) + self.assertLessEqual(timestamp, after) + + def test_custom_column_names(self): + """Test that custom column names work correctly.""" + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data={"result": "test"}, + file_path="/custom/path", + execution_id="custom_id", + file_path_name="custom_file_path", + execution_id_name="custom_execution_id", + single_column_name="custom_data" + ) + + self.assertEqual(values["custom_file_path"], "/custom/path") + self.assertEqual(values["custom_execution_id"], "custom_id") + self.assertIn("custom_data", values) + + def test_complex_real_world_scenario(self): + """Test complex real-world data processing scenario.""" + metadata = { + "execution_time": 1760509016.282637, + "cpu_usage": 45.67, + "memory_usage": float('nan'), + "retry_count": 3 + } + + data = { + "extracted_data": { + "field1": "value1", + "field2": 123.456789, + "field3": float('inf'), + }, + "confidence_scores": [0.95, 0.87, float('nan'), 0.92] + } + + values = WorkerDatabaseUtils.get_columns_and_values( + column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, + data=data, + file_path="/documents/doc1.pdf", + execution_id="exec-2024-001", + metadata=metadata, + include_agent=True, + include_timestamp=True + ) + + metadata_json = json.loads(values[TableColumns.METADATA]) + self.assertIsNone(metadata_json["memory_usage"]) + self.assertEqual(metadata_json["execution_time"], 1760509016.282637) + + self.assertIsNone(values["data"]["extracted_data"]["field3"]) + self.assertIsNone(values["data"]["confidence_scores"][2]) + + self.assertEqual(values["data"]["extracted_data"]["field1"], "value1") + self.assertEqual(values["file_path"], "/documents/doc1.pdf") + + +class TestCreateSafeErrorJson(unittest.TestCase): + """Test safe error JSON creation.""" + + def test_creates_error_json_with_details(self): + """Test that error JSON contains all required details.""" + error = TypeError("Cannot serialize object") + result = WorkerDatabaseUtils._create_safe_error_json("test_data", error) + + self.assertEqual(result["error"], "JSON serialization failed") + self.assertEqual(result["error_type"], "TypeError") + self.assertEqual(result["error_message"], "Cannot serialize object") + self.assertEqual(result["data_description"], "test_data") + self.assertIn("timestamp", result) + + +class TestDetermineColumnMode(unittest.TestCase): + """Test column mode determination.""" + + def test_single_column_mode_string(self): + """Test single column mode string recognition.""" + mode = WorkerDatabaseUtils._determine_column_mode( + ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN + ) + self.assertEqual(mode, ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN) + + def test_split_column_mode_string(self): + """Test split column mode string recognition.""" + mode = WorkerDatabaseUtils._determine_column_mode( + ColumnModes.SPLIT_JSON_INTO_COLUMNS + ) + self.assertEqual(mode, ColumnModes.SPLIT_JSON_INTO_COLUMNS) + + def test_invalid_mode_defaults_to_single_column(self): + """Test that invalid mode defaults to single column.""" + mode = WorkerDatabaseUtils._determine_column_mode("INVALID_MODE") + self.assertEqual(mode, ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN) + + +class TestHasTableColumn(unittest.TestCase): + """Test table column existence checking.""" + + def test_returns_true_when_column_exists(self): + """Test returns True when column exists.""" + table_info = {"column1": "STRING", "column2": "INT"} + result = WorkerDatabaseUtils._has_table_column(table_info, "column1") + self.assertTrue(result) + + def test_case_insensitive_matching(self): + """Test that column matching is case-insensitive.""" + table_info = {"Column1": "STRING", "COLUMN2": "INT"} + result = WorkerDatabaseUtils._has_table_column(table_info, "column1") + self.assertTrue(result) + + def test_returns_false_when_column_missing(self): + """Test returns False when column doesn't exist.""" + table_info = {"column1": "STRING"} + result = WorkerDatabaseUtils._has_table_column(table_info, "column2") + self.assertFalse(result) + + def test_returns_true_when_table_info_none(self): + """Test returns True when table_info is None.""" + result = WorkerDatabaseUtils._has_table_column(None, "any_column") + self.assertTrue(result) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From 2b51fd13b72f625f2f4ea11b4d953e10bf0d514f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 15 Oct 2025 17:24:41 +0000 Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../tests/databases/test_bigquery_db.py | 40 ++++++------- .../connectors/tests/databases/test_utils.py | 16 ++--- workers/shared/tests/__init__.py | 2 +- .../shared/tests/infrastructure/__init__.py | 2 +- .../tests/infrastructure/database/__init__.py | 2 +- .../infrastructure/database/test_utils.py | 58 +++++++++---------- 6 files changed, 60 insertions(+), 60 deletions(-) diff --git a/unstract/connectors/tests/databases/test_bigquery_db.py b/unstract/connectors/tests/databases/test_bigquery_db.py index 81e03c7620..94a2f36525 100644 --- a/unstract/connectors/tests/databases/test_bigquery_db.py +++ b/unstract/connectors/tests/databases/test_bigquery_db.py @@ -66,11 +66,11 @@ def test_sanitize_large_unix_timestamp(self): # Unix timestamp with high precision timestamp = 1760509016.282637 result = BigQuery._sanitize_for_bigquery(timestamp) - + # Should limit to 15 total significant figures # 1760509016 has 10 digits, so 5 decimal places remain self.assertAlmostEqual(result, 1760509016.28264, places=5) - + # Verify it's different from original (precision reduced) self.assertNotEqual(result, timestamp) @@ -78,7 +78,7 @@ def test_sanitize_small_decimal_preserves_precision(self): """Test that small numbers retain full precision.""" small_number = 0.001228 result = BigQuery._sanitize_for_bigquery(small_number) - + # Small numbers should be unchanged (only 4 significant figures) self.assertEqual(result, small_number) @@ -87,7 +87,7 @@ def test_sanitize_medium_float_limits_precision(self): # Number with 16+ significant figures value = 12345.67890123456789 result = BigQuery._sanitize_for_bigquery(value) - + # Should limit to 15 significant figures total # 12345 has 5 digits, so 10 decimal places remain self.assertAlmostEqual(result, 12345.6789012346, places=10) @@ -96,7 +96,7 @@ def test_sanitize_very_large_number(self): """Test that very large numbers are handled correctly.""" large_value = 9.87654321098765e15 result = BigQuery._sanitize_for_bigquery(large_value) - + # Should limit to 15 significant figures self.assertIsInstance(result, float) self.assertNotEqual(result, float('inf')) @@ -106,7 +106,7 @@ def test_sanitize_very_small_number(self): """Test that very small numbers preserve precision.""" small_value = 1.23456789e-10 result = BigQuery._sanitize_for_bigquery(small_value) - + # Should preserve precision for small numbers self.assertAlmostEqual(result, small_value, places=15) @@ -114,7 +114,7 @@ def test_sanitize_negative_numbers(self): """Test that negative numbers are handled correctly.""" negative = -123.456789012345678 result = BigQuery._sanitize_for_bigquery(negative) - + # Should limit precision but preserve sign self.assertLess(result, 0) self.assertAlmostEqual(result, -123.456789012346, places=12) @@ -129,7 +129,7 @@ def test_sanitize_dict_with_floats(self): "normal": 42.0 } result = BigQuery._sanitize_for_bigquery(data) - + self.assertIsInstance(result, dict) self.assertAlmostEqual(result["timestamp"], 1760509016.28264, places=5) self.assertEqual(result["cost"], 0.001228) @@ -148,7 +148,7 @@ def test_sanitize_nested_dict(self): } } result = BigQuery._sanitize_for_bigquery(data) - + self.assertIsInstance(result["outer"]["inner"]["value"], float) self.assertIsNone(result["outer"]["inner"]["special"]) @@ -156,7 +156,7 @@ def test_sanitize_list_with_floats(self): """Test sanitization of lists containing floats.""" data = [1760509016.282637, 0.001228, float('nan'), float('inf'), 42.0] result = BigQuery._sanitize_for_bigquery(data) - + self.assertIsInstance(result, list) self.assertEqual(len(result), 5) self.assertAlmostEqual(result[0], 1760509016.28264, places=5) @@ -169,7 +169,7 @@ def test_sanitize_nested_lists(self): """Test sanitization of nested lists.""" data = [[1.234567890123456789, float('nan')], [float('inf'), 0.001]] result = BigQuery._sanitize_for_bigquery(data) - + self.assertIsInstance(result, list) self.assertIsInstance(result[0], list) self.assertIsNone(result[0][1]) @@ -188,7 +188,7 @@ def test_sanitize_mixed_structure(self): } } result = BigQuery._sanitize_for_bigquery(data) - + self.assertIsInstance(result, dict) self.assertIsInstance(result["items"], list) self.assertAlmostEqual(result["items"][0]["value"], 1760509016.28264, places=5) @@ -252,22 +252,22 @@ def test_sanitize_complex_real_world_data(self): } } result = BigQuery._sanitize_for_bigquery(data) - + # Verify structure is preserved self.assertIn("execution_metadata", result) self.assertIn("metrics", result) self.assertIn("costs", result) - + # Verify timestamps are limited self.assertAlmostEqual( result["execution_metadata"]["start_time"], 1760509016.28264, places=5 ) - + # Verify small numbers are preserved self.assertEqual(result["costs"]["compute"], 0.001228) - + # Verify NaN is converted to None self.assertIsNone(result["metrics"][2]["value"]) @@ -280,7 +280,7 @@ def test_sanitize_float_edge_cases(self): (1e100, 1e100), # Very large number (within float range) (-123.456, -123.456), # Negative decimal ] - + for input_val, expected in test_cases: with self.subTest(input=input_val): result = BigQuery._sanitize_for_bigquery(input_val) @@ -298,14 +298,14 @@ def test_sanitize_preserves_dict_keys(self): "key4": [1, 2, 3] } result = BigQuery._sanitize_for_bigquery(data) - + self.assertEqual(set(result.keys()), set(data.keys())) def test_sanitize_maintains_list_order(self): """Test that list order is maintained during sanitization.""" data = [1.111, 2.222, 3.333, float('nan'), 5.555] result = BigQuery._sanitize_for_bigquery(data) - + self.assertEqual(len(result), len(data)) self.assertAlmostEqual(result[0], 1.111, places=3) self.assertAlmostEqual(result[1], 2.222, places=3) @@ -369,4 +369,4 @@ def test_sanitize_dict_with_floats(self): self.assertEqual(result["normal"], 42.0) if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() diff --git a/unstract/connectors/tests/databases/test_utils.py b/unstract/connectors/tests/databases/test_utils.py index 3ae1fe7f3b..eb7404467d 100644 --- a/unstract/connectors/tests/databases/test_utils.py +++ b/unstract/connectors/tests/databases/test_utils.py @@ -58,7 +58,7 @@ def test_dict_with_nan(self): "name": "test" } result = sanitize_floats_for_database(data) - + self.assertEqual(result["valid"], 42.5) self.assertIsNone(result["invalid"]) self.assertEqual(result["name"], "test") @@ -71,7 +71,7 @@ def test_dict_with_infinity(self): "normal": 123.456 } result = sanitize_floats_for_database(data) - + self.assertIsNone(result["pos_inf"]) self.assertIsNone(result["neg_inf"]) self.assertEqual(result["normal"], 123.456) @@ -88,7 +88,7 @@ def test_nested_dict(self): } } result = sanitize_floats_for_database(data) - + self.assertEqual(result["level1"]["level2"]["valid"], 1.23) self.assertIsNone(result["level1"]["level2"]["invalid"]) self.assertIsNone(result["level1"]["inf"]) @@ -97,7 +97,7 @@ def test_list_with_special_floats(self): """Test sanitization of list with special float values.""" data = [1.23, float('nan'), 4.56, float('inf'), float('-inf')] result = sanitize_floats_for_database(data) - + self.assertEqual(len(result), 5) self.assertEqual(result[0], 1.23) self.assertIsNone(result[1]) @@ -112,7 +112,7 @@ def test_nested_list(self): [float('inf'), 4.56] ] result = sanitize_floats_for_database(data) - + self.assertEqual(result[0][0], 1.23) self.assertIsNone(result[0][1]) self.assertIsNone(result[1][0]) @@ -131,7 +131,7 @@ def test_mixed_dict_and_list(self): } } result = sanitize_floats_for_database(data) - + self.assertEqual(result["items"][0]["value"], 1.23) self.assertIsNone(result["items"][1]["value"]) self.assertEqual(result["summary"]["total"], 100.0) @@ -199,7 +199,7 @@ def test_complex_real_world_metadata(self): } } result = sanitize_floats_for_database(metadata) - + self.assertEqual(result["execution"]["start_time"], 1760509016.282637) self.assertEqual(result["metrics"]["success_rate"], 0.987654321) self.assertIsNone(result["metrics"]["error_rate"]) @@ -207,4 +207,4 @@ def test_complex_real_world_metadata(self): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() diff --git a/workers/shared/tests/__init__.py b/workers/shared/tests/__init__.py index 1f7f0e7b65..4b724894f3 100644 --- a/workers/shared/tests/__init__.py +++ b/workers/shared/tests/__init__.py @@ -1 +1 @@ -"""Workers shared tests package.""" \ No newline at end of file +"""Workers shared tests package.""" diff --git a/workers/shared/tests/infrastructure/__init__.py b/workers/shared/tests/infrastructure/__init__.py index ad2345596a..2763d07f94 100644 --- a/workers/shared/tests/infrastructure/__init__.py +++ b/workers/shared/tests/infrastructure/__init__.py @@ -1 +1 @@ -"""Workers infrastructure tests package.""" \ No newline at end of file +"""Workers infrastructure tests package.""" diff --git a/workers/shared/tests/infrastructure/database/__init__.py b/workers/shared/tests/infrastructure/database/__init__.py index 4d1ca915d3..f0337cd48b 100644 --- a/workers/shared/tests/infrastructure/database/__init__.py +++ b/workers/shared/tests/infrastructure/database/__init__.py @@ -1 +1 @@ -"""Workers database tests package.""" \ No newline at end of file +"""Workers database tests package.""" diff --git a/workers/shared/tests/infrastructure/database/test_utils.py b/workers/shared/tests/infrastructure/database/test_utils.py index 703a7a579f..6dc511c048 100644 --- a/workers/shared/tests/infrastructure/database/test_utils.py +++ b/workers/shared/tests/infrastructure/database/test_utils.py @@ -19,7 +19,7 @@ def test_metadata_with_nan_sanitized(self): "value": float('nan'), "timestamp": 1760509016.282637 } - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data={"result": "test"}, @@ -27,7 +27,7 @@ def test_metadata_with_nan_sanitized(self): execution_id="exec123", metadata=metadata ) - + metadata_json = json.loads(values[TableColumns.METADATA]) self.assertIsNone(metadata_json["value"]) self.assertEqual(metadata_json["timestamp"], 1760509016.282637) @@ -39,7 +39,7 @@ def test_metadata_with_infinity_sanitized(self): "neg_inf": float('-inf'), "normal": 42.0 } - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data={"result": "test"}, @@ -47,7 +47,7 @@ def test_metadata_with_infinity_sanitized(self): execution_id="exec123", metadata=metadata ) - + metadata_json = json.loads(values[TableColumns.METADATA]) self.assertIsNone(metadata_json["pos_inf"]) self.assertIsNone(metadata_json["neg_inf"]) @@ -59,14 +59,14 @@ def test_single_column_data_with_nan_sanitized(self): "value": float('nan'), "count": 5 } - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data=data, file_path="/test/path", execution_id="exec123" ) - + self.assertIsNone(values["data"]["value"]) self.assertEqual(values["data"]["count"], 5) @@ -76,14 +76,14 @@ def test_single_column_data_preserves_float_precision(self): "timestamp": 1760509016.282637123456789, "cost": 0.001228 } - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data=data, file_path="/test/path", execution_id="exec123" ) - + self.assertEqual(values["data"]["timestamp"], 1760509016.282637123456789) self.assertEqual(values["data"]["cost"], 0.001228) @@ -93,14 +93,14 @@ def test_split_column_data_with_special_floats(self): "value": float('inf'), "normal": 123.456 } - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.SPLIT_JSON_INTO_COLUMNS, data=data, file_path="/test/path", execution_id="exec123" ) - + self.assertIsNone(values["value"]) self.assertEqual(values["normal"], 123.456) @@ -114,28 +114,28 @@ def test_nested_data_structure_sanitization(self): } } } - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data=data, file_path="/test/path", execution_id="exec123" ) - + self.assertIsNone(values["data"]["outer"]["inner"]["nan_value"]) self.assertEqual(values["data"]["outer"]["inner"]["valid"], 42.0) def test_list_data_with_special_floats(self): """Test sanitization of list data with special floats.""" data = [1.23, float('nan'), float('inf'), 4.56] - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data=data, file_path="/test/path", execution_id="exec123" ) - + result_list = values["data"] self.assertEqual(result_list[0], 1.23) self.assertIsNone(result_list[1]) @@ -145,14 +145,14 @@ def test_list_data_with_special_floats(self): def test_string_data_wrapped_properly(self): """Test that string data is wrapped correctly.""" data = "simple string" - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data=data, file_path="/test/path", execution_id="exec123" ) - + self.assertEqual(values["data"], {"result": "simple string"}) def test_none_data_handling(self): @@ -163,7 +163,7 @@ def test_none_data_handling(self): file_path="/test/path", execution_id="exec123" ) - + self.assertNotIn("data", values) def test_error_status_set_when_error_provided(self): @@ -175,7 +175,7 @@ def test_error_status_set_when_error_provided(self): execution_id="exec123", error="Test error message" ) - + self.assertEqual(values[TableColumns.ERROR_MESSAGE], "Test error message") self.assertEqual(values[TableColumns.STATUS], FileProcessingStatus.ERROR) @@ -187,7 +187,7 @@ def test_success_status_when_no_error(self): file_path="/test/path", execution_id="exec123" ) - + self.assertEqual(values[TableColumns.STATUS], FileProcessingStatus.SUCCESS) def test_agent_name_included_when_requested(self): @@ -200,7 +200,7 @@ def test_agent_name_included_when_requested(self): include_agent=True, agent_name="TEST_AGENT" ) - + self.assertEqual(values[TableColumns.CREATED_BY], "TEST_AGENT") def test_timestamp_included_when_requested(self): @@ -214,7 +214,7 @@ def test_timestamp_included_when_requested(self): include_timestamp=True ) after = datetime.datetime.now() - + self.assertIn(TableColumns.CREATED_AT, values) timestamp = values[TableColumns.CREATED_AT] self.assertGreaterEqual(timestamp, before) @@ -231,7 +231,7 @@ def test_custom_column_names(self): execution_id_name="custom_execution_id", single_column_name="custom_data" ) - + self.assertEqual(values["custom_file_path"], "/custom/path") self.assertEqual(values["custom_execution_id"], "custom_id") self.assertIn("custom_data", values) @@ -244,7 +244,7 @@ def test_complex_real_world_scenario(self): "memory_usage": float('nan'), "retry_count": 3 } - + data = { "extracted_data": { "field1": "value1", @@ -253,7 +253,7 @@ def test_complex_real_world_scenario(self): }, "confidence_scores": [0.95, 0.87, float('nan'), 0.92] } - + values = WorkerDatabaseUtils.get_columns_and_values( column_mode_str=ColumnModes.WRITE_JSON_TO_A_SINGLE_COLUMN, data=data, @@ -263,14 +263,14 @@ def test_complex_real_world_scenario(self): include_agent=True, include_timestamp=True ) - + metadata_json = json.loads(values[TableColumns.METADATA]) self.assertIsNone(metadata_json["memory_usage"]) self.assertEqual(metadata_json["execution_time"], 1760509016.282637) - + self.assertIsNone(values["data"]["extracted_data"]["field3"]) self.assertIsNone(values["data"]["confidence_scores"][2]) - + self.assertEqual(values["data"]["extracted_data"]["field1"], "value1") self.assertEqual(values["file_path"], "/documents/doc1.pdf") @@ -282,7 +282,7 @@ def test_creates_error_json_with_details(self): """Test that error JSON contains all required details.""" error = TypeError("Cannot serialize object") result = WorkerDatabaseUtils._create_safe_error_json("test_data", error) - + self.assertEqual(result["error"], "JSON serialization failed") self.assertEqual(result["error_type"], "TypeError") self.assertEqual(result["error_message"], "Cannot serialize object") @@ -341,4 +341,4 @@ def test_returns_true_when_table_info_none(self): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main()