From 3a348e9e52b385bb1bb457777013a344042404f4 Mon Sep 17 00:00:00 2001 From: sarath-mec Date: Sun, 30 Mar 2025 14:50:52 -0500 Subject: [PATCH 01/11] Fix #70 Add transform_columns --- reladiff/table_segment.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/reladiff/table_segment.py b/reladiff/table_segment.py index f64247c..ce6d6e4 100644 --- a/reladiff/table_segment.py +++ b/reladiff/table_segment.py @@ -1,9 +1,10 @@ import time -from typing import List, Tuple +from typing import List, Tuple, Dict import logging from itertools import product from runtype import dataclass +from dataclasses import field from .utils import safezip, Vector from sqeleton.utils import ArithString, split_space @@ -116,6 +117,7 @@ class TableSegment: key_columns: Tuple[str, ...] update_column: str = None extra_columns: Tuple[str, ...] = () + transform_columns: Dict[str, str] = field(default_factory=dict) # Restrict the segment min_key: Vector = None @@ -155,7 +157,7 @@ def _with_raw_schema(self, raw_schema: dict, refine: bool = True, allow_empty_ta if is_empty_table and not allow_empty_table: raise EmptyTable(f"Table {self.table_path} is empty. 
Use --allow-empty-tables to disable this protection.", self) - res = self.new(_schema=create_schema(self.database, self.table_path, schema, self.case_sensitive)) + res = self.new(_schema=create_schema(self.database, self.table_path, schema, self.case_sensitive), transform_rules = self.transform_rules) return EmptyTableSegment(res) if is_empty_table else res @@ -167,7 +169,7 @@ def with_schema(self, refine: bool = True, allow_empty_table: bool = False) -> " return self._with_raw_schema( self.database.query_table_schema(self.table_path), refine=refine, allow_empty_table=allow_empty_table ) - + def _cast_col_value(self, col, value): """Cast the value to the right type, based on the type of the column @@ -250,7 +252,14 @@ def relevant_columns(self) -> List[str]: @property def _relevant_columns_repr(self) -> List[Expr]: - return [NormalizeAsString(this[c]) for c in self.relevant_columns] + expressions = [] + for c in self.relevant_columns: #smks-fix + if c in self.transform_rules: + transform_expr = self.transform_rules[c] + expressions.append(NormalizeAsString(Code(transform_expr.format(column=this[c])), self._schema[c])) + else: + expressions.append(NormalizeAsString(this[c], self._schema[c])) + return expressions def count(self) -> int: """Count how many rows are in the segment, in one pass.""" From 888801211c9483ba2dee1981975309c2a909cf1b Mon Sep 17 00:00:00 2001 From: sarath-mec Date: Sun, 30 Mar 2025 23:31:58 -0500 Subject: [PATCH 02/11] Fix #70 Updated transform_columns documentation --- reladiff/__init__.py | 5 +++++ reladiff/table_segment.py | 11 ++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/reladiff/__init__.py b/reladiff/__init__.py index 8671b5c..488d6a6 100644 --- a/reladiff/__init__.py +++ b/reladiff/__init__.py @@ -128,6 +128,11 @@ def diff_tables( If different values are needed per table, it's possible to omit them here, and instead set them directly when creating each :class:`TableSegment`. 
+ Note: + Column transformations using SQL expressions can be configured using the `transform_columns` attribute when creating + the :class:`TableSegment` instances for `table1` and `table2`. As transformations are typically specific to either + the source or target database, this parameter is not overridden directly in `diff_tables`. + Note: It is recommended to call .close() on the returned object when done, to release thread-pool. Alternatively, you may use it as a context manager. diff --git a/reladiff/table_segment.py b/reladiff/table_segment.py index ce6d6e4..ab9c7f1 100644 --- a/reladiff/table_segment.py +++ b/reladiff/table_segment.py @@ -99,6 +99,11 @@ class TableSegment: update_column (str, optional): Name of updated column, which signals that rows changed. Usually updated_at or last_update. Used by `min_update` and `max_update`. extra_columns (Tuple[str, ...], optional): Extra columns to compare + transform_columns (Dict[str, str], optional): A dictionary mapping column names to SQL transformation expressions. + These expressions are applied directly to the specified columns within the + comparison query, *before* the data is hashed or compared. Useful for + on-the-fly normalization (e.g., type casting, timezone conversions) without + requiring intermediate views or staging tables. Defaults to an empty dict. min_key (:data:`Vector`, optional): Lowest key value, used to restrict the segment max_key (:data:`Vector`, optional): Highest key value, used to restrict the segment min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment @@ -157,7 +162,7 @@ def _with_raw_schema(self, raw_schema: dict, refine: bool = True, allow_empty_ta if is_empty_table and not allow_empty_table: raise EmptyTable(f"Table {self.table_path} is empty. 
Use --allow-empty-tables to disable this protection.", self) - res = self.new(_schema=create_schema(self.database, self.table_path, schema, self.case_sensitive), transform_rules = self.transform_rules) + res = self.new(_schema=create_schema(self.database, self.table_path, schema, self.case_sensitive), transform_columns = self.transform_columns) return EmptyTableSegment(res) if is_empty_table else res @@ -254,8 +259,8 @@ def relevant_columns(self) -> List[str]: def _relevant_columns_repr(self) -> List[Expr]: expressions = [] for c in self.relevant_columns: #smks-fix - if c in self.transform_rules: - transform_expr = self.transform_rules[c] + if c in self.transform_columns: + transform_expr = self.transform_columns[c] expressions.append(NormalizeAsString(Code(transform_expr.format(column=this[c])), self._schema[c])) else: expressions.append(NormalizeAsString(this[c], self._schema[c])) From 44d22ec4e10fb060f0f3be56756349c42330d177 Mon Sep 17 00:00:00 2001 From: sarath-mec Date: Mon, 31 Mar 2025 00:35:50 -0500 Subject: [PATCH 03/11] Fix #70 Added transform_columns logic to Join Differ --- reladiff/joindiff_tables.py | 21 ++++++++++++++++++--- reladiff/table_segment.py | 2 +- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/reladiff/joindiff_tables.py b/reladiff/joindiff_tables.py index aa164ef..6d17a0e 100644 --- a/reladiff/joindiff_tables.py +++ b/reladiff/joindiff_tables.py @@ -314,10 +314,25 @@ def _create_outer_join(self, table1, table2): a = table1.make_select() b = table2.make_select() - is_diff_cols = {f"is_diff_{c1}": bool_to_int(a[c1].is_distinct_from(b[c2])) for c1, c2 in safezip(cols1, cols2)} + def get_expr(table_alias, col_name, transform_dict): + if col_name in transform_dict: + return Code(transform_dict[col_name]) + else: + return table_alias[col_name] + + # Get transformed expressions for both tables + is_diff_cols = {} + a_cols = {} + b_cols = {} + for c1, c2 in safezip(cols1, cols2): + ovrrd_c1 = table1.transform_columns.get(c1) + 
ovrrd_c2 = table2.transform_columns.get(c2) + expr_a = NormalizeAsString(Code(ovrrd_c1) if ovrrd_c1 else a[c1], table1._schema[c1]) + expr_b = NormalizeAsString(Code(ovrrd_c2) if ovrrd_c2 else b[c2], table2._schema[c2]) + is_diff_cols[f"is_diff_{c1}"] = bool_to_int(expr_a.is_distinct_from(expr_b)) + a_cols[f"{c1}_a"] = expr_a + b_cols[f"{c2}_b"] = expr_b - a_cols = {f"{c}_a": NormalizeAsString(a[c]) for c in cols1} - b_cols = {f"{c}_b": NormalizeAsString(b[c]) for c in cols2} # Order columns as col1_a, col1_b, col2_a, col2_b, etc. cols = {k: v for k, v in chain(*zip(a_cols.items(), b_cols.items()))} diff --git a/reladiff/table_segment.py b/reladiff/table_segment.py index ab9c7f1..419d969 100644 --- a/reladiff/table_segment.py +++ b/reladiff/table_segment.py @@ -258,7 +258,7 @@ def relevant_columns(self) -> List[str]: @property def _relevant_columns_repr(self) -> List[Expr]: expressions = [] - for c in self.relevant_columns: #smks-fix + for c in self.relevant_columns: if c in self.transform_columns: transform_expr = self.transform_columns[c] expressions.append(NormalizeAsString(Code(transform_expr.format(column=this[c])), self._schema[c])) From 2039968011c6204ce4ddb8dadc4d4c5062bbab21 Mon Sep 17 00:00:00 2001 From: sarath-mec Date: Mon, 31 Mar 2025 00:37:27 -0500 Subject: [PATCH 04/11] Fix #70 Removed unnecessary function --- reladiff/joindiff_tables.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/reladiff/joindiff_tables.py b/reladiff/joindiff_tables.py index 6d17a0e..ab17700 100644 --- a/reladiff/joindiff_tables.py +++ b/reladiff/joindiff_tables.py @@ -314,13 +314,8 @@ def _create_outer_join(self, table1, table2): a = table1.make_select() b = table2.make_select() - def get_expr(table_alias, col_name, transform_dict): - if col_name in transform_dict: - return Code(transform_dict[col_name]) - else: - return table_alias[col_name] - # Get transformed expressions for both tables + # Displayed output value also transformed to be similar with 
Hashdiffer is_diff_cols = {} a_cols = {} b_cols = {} From db0b10bacf1a1dbb9998bd0e98bd765ebf2f41b7 Mon Sep 17 00:00:00 2001 From: sarath-mec Date: Wed, 2 Apr 2025 13:03:17 -0500 Subject: [PATCH 05/11] Overriding NormalizeAsString to have is_distinct_from --- reladiff/joindiff_tables.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/reladiff/joindiff_tables.py b/reladiff/joindiff_tables.py index ab17700..a92dd60 100644 --- a/reladiff/joindiff_tables.py +++ b/reladiff/joindiff_tables.py @@ -29,6 +29,8 @@ ) from sqeleton.queries.ast_classes import Concat, Count, Expr, Random, TablePath, Code, ITable from sqeleton.queries.extras import NormalizeAsString +from sqeleton.queries.ast_classes import LazyOps, ExprNode +from typing import Optional, Any from .info_tree import InfoTree @@ -43,6 +45,12 @@ TABLE_WRITE_LIMIT = 1000 +@dataclass +class OverrideNormalizeAsString(NormalizeAsString, LazyOps, ExprNode): + expr: ExprNode + expr_type: Optional[Any] = None # Match type hint of NormalizeAsString + type = str + def merge_dicts(dicts): i = iter(dicts) @@ -322,8 +330,8 @@ def _create_outer_join(self, table1, table2): for c1, c2 in safezip(cols1, cols2): ovrrd_c1 = table1.transform_columns.get(c1) ovrrd_c2 = table2.transform_columns.get(c2) - expr_a = NormalizeAsString(Code(ovrrd_c1) if ovrrd_c1 else a[c1], table1._schema[c1]) - expr_b = NormalizeAsString(Code(ovrrd_c2) if ovrrd_c2 else b[c2], table2._schema[c2]) + expr_a = OverrideNormalizeAsString(Code(ovrrd_c1) if ovrrd_c1 else a[c1], table1._schema[c1]) + expr_b = OverrideNormalizeAsString(Code(ovrrd_c2) if ovrrd_c2 else b[c2], table2._schema[c2]) is_diff_cols[f"is_diff_{c1}"] = bool_to_int(expr_a.is_distinct_from(expr_b)) a_cols[f"{c1}_a"] = expr_a b_cols[f"{c2}_b"] = expr_b From 67e1fd275c9ba43d50a08ef23a37bcf7aadc6252 Mon Sep 17 00:00:00 2001 From: sarath-mec Date: Wed, 2 Apr 2025 17:56:28 -0500 Subject: [PATCH 06/11] Adding transform_columns to EmptyTableSegment.__getattr__ --- 
reladiff/table_segment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reladiff/table_segment.py b/reladiff/table_segment.py index 419d969..2bf7ce9 100644 --- a/reladiff/table_segment.py +++ b/reladiff/table_segment.py @@ -344,7 +344,7 @@ def count_and_checksum(self) -> Tuple[int, int]: return (0, None) def __getattr__(self, attr): - assert attr in ("database", "key_columns", "key_types", "relevant_columns", "_schema") + assert attr in ("database", "key_columns", "key_types", "relevant_columns", "_schema", "transform_columns") return getattr(self._table_segment, attr) @property From 28031be0cf56302708630b4dc8e16a2423756803 Mon Sep 17 00:00:00 2001 From: sarath-mec Date: Sat, 5 Apr 2025 03:56:01 -0500 Subject: [PATCH 07/11] Optimizing Code by removing OverrideNormalizeAsString --- reladiff/joindiff_tables.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/reladiff/joindiff_tables.py b/reladiff/joindiff_tables.py index a92dd60..fe24647 100644 --- a/reladiff/joindiff_tables.py +++ b/reladiff/joindiff_tables.py @@ -29,8 +29,7 @@ ) from sqeleton.queries.ast_classes import Concat, Count, Expr, Random, TablePath, Code, ITable from sqeleton.queries.extras import NormalizeAsString -from sqeleton.queries.ast_classes import LazyOps, ExprNode -from typing import Optional, Any +from sqeleton.queries.ast_classes import IsDistinctFrom from .info_tree import InfoTree @@ -45,12 +44,6 @@ TABLE_WRITE_LIMIT = 1000 -@dataclass -class OverrideNormalizeAsString(NormalizeAsString, LazyOps, ExprNode): - expr: ExprNode - expr_type: Optional[Any] = None # Match type hint of NormalizeAsString - type = str - def merge_dicts(dicts): i = iter(dicts) @@ -319,8 +312,11 @@ def _create_outer_join(self, table1, table2): if len(cols1) != len(cols2): raise ValueError("The provided columns are of a different count") - a = table1.make_select() - b = table2.make_select() + a = table1.make_select().alias("tbl_a") + b = table2.make_select().alias("tbl_b") + 
+ # Create a compiler for transform_cols + compiler = Compiler(db, _is_root=False).add_table_context(a, b) # Get transformed expressions for both tables # Displayed output value also transformed to be similar with Hashdiffer @@ -328,13 +324,17 @@ def _create_outer_join(self, table1, table2): a_cols = {} b_cols = {} for c1, c2 in safezip(cols1, cols2): - ovrrd_c1 = table1.transform_columns.get(c1) - ovrrd_c2 = table2.transform_columns.get(c2) - expr_a = OverrideNormalizeAsString(Code(ovrrd_c1) if ovrrd_c1 else a[c1], table1._schema[c1]) - expr_b = OverrideNormalizeAsString(Code(ovrrd_c2) if ovrrd_c2 else b[c2], table2._schema[c2]) - is_diff_cols[f"is_diff_{c1}"] = bool_to_int(expr_a.is_distinct_from(expr_b)) - a_cols[f"{c1}_a"] = expr_a - b_cols[f"{c2}_b"] = expr_b + transform_expr_a = table1.transform_columns.get(c1) + transform_expr_b = table2.transform_columns.get(c2) + + # Compile the transformation expression to have aliasing + expr_a = Code(transform_expr_a.replace(c1, compiler.compile(a[c1]))) if transform_expr_a else a[c1] + expr_b = Code(transform_expr_b.replace(c2, compiler.compile(b[c2]))) if transform_expr_b else b[c2] + + # Normalize only needed for select #70 + is_diff_cols[f"is_diff_{c1}"] = bool_to_int(IsDistinctFrom(expr_a, expr_b)) + a_cols[f"{c1}_a"] = NormalizeAsString(expr_a, table1._schema[c1]) + b_cols[f"{c2}_b"] = NormalizeAsString(expr_b, table2._schema[c2]) # Order columns as col1_a, col1_b, col2_a, col2_b, etc. 
cols = {k: v for k, v in chain(*zip(a_cols.items(), b_cols.items()))} From 61040e694d6360cdf52a980f0a0ef948606a8bca Mon Sep 17 00:00:00 2001 From: Sarath Sasidharan Date: Wed, 11 Jun 2025 16:17:10 -0500 Subject: [PATCH 08/11] Created a new TableSegment method _get_column_transforms to support both Hash Diff and JoinDiffer. In HashDiff, added logic to honor transform_columns for Key Columns as well (Tested Well in my Hashdiff usecase). JoinDiff already considers transform_columns for key columns --- reladiff/joindiff_tables.py | 8 ++------ reladiff/table_segment.py | 25 +++++++++++++++++-------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/reladiff/joindiff_tables.py b/reladiff/joindiff_tables.py index fe24647..565471c 100644 --- a/reladiff/joindiff_tables.py +++ b/reladiff/joindiff_tables.py @@ -29,7 +29,6 @@ ) from sqeleton.queries.ast_classes import Concat, Count, Expr, Random, TablePath, Code, ITable from sqeleton.queries.extras import NormalizeAsString -from sqeleton.queries.ast_classes import IsDistinctFrom from .info_tree import InfoTree @@ -324,12 +323,9 @@ def _create_outer_join(self, table1, table2): a_cols = {} b_cols = {} for c1, c2 in safezip(cols1, cols2): - transform_expr_a = table1.transform_columns.get(c1) - transform_expr_b = table2.transform_columns.get(c2) - # Compile the transformation expression to have aliasing - expr_a = Code(transform_expr_a.replace(c1, compiler.compile(a[c1]))) if transform_expr_a else a[c1] - expr_b = Code(transform_expr_b.replace(c2, compiler.compile(b[c2]))) if transform_expr_b else b[c2] + expr_a = table1._get_column_transforms(c1, compiler.compile(a[c1])) or a[c1] + expr_b = table2._get_column_transforms(c2, compiler.compile(b[c2])) or b[c2] # Normalize only needed for select #70 is_diff_cols[f"is_diff_{c1}"] = bool_to_int(IsDistinctFrom(expr_a, expr_b)) diff --git a/reladiff/table_segment.py b/reladiff/table_segment.py index 2bf7ce9..a44b135 100644 --- a/reladiff/table_segment.py +++ 
b/reladiff/table_segment.py @@ -12,6 +12,7 @@ from sqeleton.abcs.database_types import String_UUID from sqeleton.schema import Schema, create_schema from sqeleton.queries import Count, Checksum, SKIP, table, this, Expr, min_, max_, Code +from sqeleton.queries.ast_classes import BinBoolOp from sqeleton.queries.extras import ApplyFuncAndNormalizeAsString, NormalizeAsString logger = logging.getLogger("table_segment") @@ -186,15 +187,27 @@ def _cast_col_value(self, col, value): return str(value) return value + def _get_column_transforms(self, col_name: str, aliased_col=None) -> Expr: + """Get the Column Expression from the Transform Rules, if the column is present + For hashdiff - aliased_col will be None + For joindiff - aliased_col will be the aliased column name + """ + transform_expr = self.transform_columns.get(col_name) + + if aliased_col: + return Code(transform_expr.replace(col_name, aliased_col)) if transform_expr else None + + return Code(transform_expr) if transform_expr else this[col_name] + def _make_key_range(self): if self.min_key is not None: for mn, k in safezip(self.min_key, self.key_columns): mn = self._cast_col_value(k, mn) - yield mn <= this[k] + yield BinBoolOp(">=", [self._get_column_transforms(k), mn]) if self.max_key is not None: for k, mx in safezip(self.key_columns, self.max_key): mx = self._cast_col_value(k, mx) - yield this[k] < mx + yield BinBoolOp("<", [self._get_column_transforms(k), mx]) def _make_update_range(self): if self.min_update is not None: @@ -259,11 +272,7 @@ def relevant_columns(self) -> List[str]: def _relevant_columns_repr(self) -> List[Expr]: expressions = [] for c in self.relevant_columns: - if c in self.transform_columns: - transform_expr = self.transform_columns[c] - expressions.append(NormalizeAsString(Code(transform_expr.format(column=this[c])), self._schema[c])) - else: - expressions.append(NormalizeAsString(this[c], self._schema[c])) + expressions.append(NormalizeAsString(self._get_column_transforms(c), 
self._schema[c])) return expressions def count(self) -> int: @@ -291,7 +300,7 @@ def query_key_range(self) -> Tuple[tuple, tuple]: """Query database for minimum and maximum key. This is used for setting the initial bounds.""" # Normalizes the result (needed for UUIDs) after the min/max computation select = self.make_select().select( - ApplyFuncAndNormalizeAsString(this[k], f) for k in self.key_columns for f in (min_, max_) + ApplyFuncAndNormalizeAsString(self._get_column_transforms(k), f) for k in self.key_columns for f in (min_, max_) ) result = tuple(self.database.query(select, tuple)) From 0e563eecbfbc3ae02af0e5167bd14fad47b8b251 Mon Sep 17 00:00:00 2001 From: Sarath Sasidharan Date: Wed, 11 Jun 2025 17:13:26 -0500 Subject: [PATCH 09/11] Adding IsDistinctFrom --- reladiff/joindiff_tables.py | 1 + 1 file changed, 1 insertion(+) diff --git a/reladiff/joindiff_tables.py b/reladiff/joindiff_tables.py index 565471c..5e65be4 100644 --- a/reladiff/joindiff_tables.py +++ b/reladiff/joindiff_tables.py @@ -29,6 +29,7 @@ ) from sqeleton.queries.ast_classes import Concat, Count, Expr, Random, TablePath, Code, ITable from sqeleton.queries.extras import NormalizeAsString +from sqeleton.queries.ast_classes import IsDistinctFrom from .info_tree import InfoTree From 39609703a1d94eacc43a06e436f7981c86600405 Mon Sep 17 00:00:00 2001 From: Sarath Sasidharan Date: Wed, 11 Jun 2025 17:18:39 -0500 Subject: [PATCH 10/11] Adding transform_columns in EmptyTableSegment to avoid assert `attr in ("database", "key_columns", "key_types", "relevant_columns", "_schema", "transform_columns")` --- reladiff/table_segment.py | 1 + 1 file changed, 1 insertion(+) diff --git a/reladiff/table_segment.py b/reladiff/table_segment.py index a44b135..7c72034 100644 --- a/reladiff/table_segment.py +++ b/reladiff/table_segment.py @@ -335,6 +335,7 @@ def key_types(self): @dataclass class EmptyTableSegment: _table_segment: TableSegment + transform_columns: Dict[str, str] = field(default_factory=dict) def 
approximate_size(self): return 0 From 37bb41aa5e8bc9c156fb6e74b8054a7230411b14 Mon Sep 17 00:00:00 2001 From: Sarath Sasidharan Date: Wed, 11 Jun 2025 17:27:31 -0500 Subject: [PATCH 11/11] _get_column_transforms added to avoid attr error --- reladiff/table_segment.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/reladiff/table_segment.py b/reladiff/table_segment.py index 7c72034..f92c4d9 100644 --- a/reladiff/table_segment.py +++ b/reladiff/table_segment.py @@ -335,7 +335,6 @@ def key_types(self): @dataclass class EmptyTableSegment: _table_segment: TableSegment - transform_columns: Dict[str, str] = field(default_factory=dict) def approximate_size(self): return 0 @@ -354,7 +353,7 @@ def count_and_checksum(self) -> Tuple[int, int]: return (0, None) def __getattr__(self, attr): - assert attr in ("database", "key_columns", "key_types", "relevant_columns", "_schema", "transform_columns") + assert attr in ("database", "key_columns", "key_types", "relevant_columns", "_schema", "transform_columns", "_get_column_transforms") return getattr(self._table_segment, attr) @property