From 9f02b52c8aaa767ff0a1793ccfe649f910f76e40 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 8 Nov 2024 10:58:19 -0700 Subject: [PATCH 1/7] [edgetest] automated change (#353) Co-authored-by: fdosani <10391670+fdosani@users.noreply.github.com> --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f381dc5..bcacf63 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ maintainers = [ { name="Faisal Dosani", email="faisal.dosani@capitalone.com" } ] license = {text = "Apache Software License"} -dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.1.2,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "fugue<=0.9.1,>=0.8.7", "polars<=1.12.0,>=0.20.4"] +dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.1.3,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "fugue<=0.9.1,>=0.8.7", "polars<=1.12.0,>=0.20.4"] requires-python = ">=3.9.0" classifiers = [ "Intended Audience :: Developers", From c6530adbbb710f2c6570726daed1c590ec32062a Mon Sep 17 00:00:00 2001 From: Chet Bramble Date: Fri, 15 Nov 2024 07:58:03 -0500 Subject: [PATCH 2/7] Fixed comment in count_matching_rows() (#355) --- datacompy/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datacompy/base.py b/datacompy/base.py index d79fa0c..2c829af 100644 --- a/datacompy/base.py +++ b/datacompy/base.py @@ -118,7 +118,7 @@ def all_rows_overlap(self) -> bool: @abstractmethod def count_matching_rows(self) -> int: - """Count the number of matchin grows.""" + """Count the number of matching rows.""" pass @abstractmethod From 09686a9b54840a3d5dd6fba236b7533ac6a0f115 Mon Sep 17 00:00:00 2001 From: rhaffar <141745338+rhaffar@users.noreply.github.com> Date: Tue, 19 Nov 2024 14:08:01 -0500 Subject: [PATCH 3/7] Optimize Snowflake/Snowpark Compare (#354) * non-blocking diffs, multithread intersect * caching to prevent bad snowflake pushdown optimization * update max_diff test --- datacompy/snowflake.py | 187 +++++++++++++++++++++++----------------- tests/test_snowflake.py | 2 +- 2 files changed, 107 insertions(+), 82 deletions(-) diff --git a/datacompy/snowflake.py b/datacompy/snowflake.py index 4441e5a..071e355 100644 --- a/datacompy/snowflake.py +++ b/datacompy/snowflake.py @@ -23,6 +23,7 @@ import logging import os +from concurrent.futures import ThreadPoolExecutor, as_completed from copy import deepcopy from typing import Any, Dict, List, Optional, Union, cast @@ -31,8 +32,9 @@ try: import snowflake.snowpark as sp + from snowflake.connector.errors import ProgrammingError from snowflake.snowpark import Window - from snowflake.snowpark.exceptions import SnowparkSQLException + from snowflake.snowpark.exceptions import SnowparkClientException from snowflake.snowpark.functions import ( abs, col, @@ -45,6 +47,7 @@ trim, when, ) + except ImportError: pass # for non-snowflake users from datacompy.base import BaseCompare @@ -288,8 +291,12 @@ def _dataframe_merge(self, ignore_spaces: bool) -> None: LOG.debug("Duplicate rows found, deduping by order of remaining fields") # setting internal index LOG.info("Adding internal index to dataframes") - df1 = df1.withColumn("__index", monotonically_increasing_id()) - df2 = df2.withColumn("__index", monotonically_increasing_id()) + df1 = df1.withColumn( + "__index", monotonically_increasing_id() + ).cache_result() + df2 = df2.withColumn( + "__index", monotonically_increasing_id() + ).cache_result() # Create order column for uniqueness of match order_column = 
temp_column_name(df1, df2) @@ -305,11 +312,6 @@ def _dataframe_merge(self, ignore_spaces: bool) -> None: ).drop("__index") temp_join_columns.append(order_column) - # drop index - LOG.info("Dropping internal index") - df1 = df1.drop("__index") - df2 = df2.drop("__index") - if ignore_spaces: for column in self.join_columns: if "string" in next( @@ -400,23 +402,15 @@ def _intersect_compare(self, ignore_spaces: bool) -> None: """Run the comparison on the intersect dataframe. This loops through all columns that are shared between df1 and df2, and - creates a column column_match which is True for matches, False - otherwise. + creates a column column_match which is True for matches, False otherwise. + Finally calculates and stores the compare metrics for matching column pairs. """ LOG.debug("Comparing intersection") - max_diff: float - null_diff: int - row_cnt = self.intersect_rows.count() - for column in self.intersect_columns(): - if column in self.join_columns: - match_cnt = row_cnt - col_match = "" - max_diff = 0 - null_diff = 0 - else: - col_1 = column + "_" + self.df1_name - col_2 = column + "_" + self.df2_name - col_match = column + "_MATCH" + for col in self.intersect_columns(): + if col not in self.join_columns: + col_1 = col + "_" + self.df1_name + col_2 = col + "_" + self.df2_name + col_match = col + "_MATCH" self.intersect_rows = columns_equal( self.intersect_rows, col_1, @@ -426,46 +420,87 @@ def _intersect_compare(self, ignore_spaces: bool) -> None: self.abs_tol, ignore_spaces, ) - match_cnt = ( - self.intersect_rows.select(col_match) - .where(col(col_match) == True) # noqa: E712 - .count() - ) - max_diff = calculate_max_diff( - self.intersect_rows, - col_1, - col_2, + row_cnt = self.intersect_rows.count() + + with ThreadPoolExecutor() as executor: + futures = [] + for column in self.intersect_columns(): + future = executor.submit( + self._calculate_column_compare_stats, column, row_cnt ) - null_diff = calculate_null_diff(self.intersect_rows, col_1, col_2) + futures.append(future) + for future in as_completed(futures): + if future.exception(): + raise future.exception() - if row_cnt > 0: - match_rate = float(match_cnt) / row_cnt - else: - match_rate = 0 - LOG.info(f"{column}: {match_cnt} / {row_cnt} ({match_rate:.2%}) match") - - col1_dtype, _ = _get_column_dtypes(self.df1, column, column) - col2_dtype, _ = _get_column_dtypes(self.df2, column, column) - - self.column_stats.append( - { - "column": column, - "match_column": col_match, - "match_cnt": match_cnt, - "unequal_cnt": row_cnt - match_cnt, - "dtype1": str(col1_dtype), - "dtype2": str(col2_dtype), - "all_match": all( - ( - col1_dtype == col2_dtype, - row_cnt == match_cnt, - ) - ), - "max_diff": max_diff, - "null_diff": null_diff, - } + def _calculate_column_compare_stats(self, column: str, row_cnt: int) -> None: + """Populate the column stats for all intersecting column pairs. + + Calculates compare stats by intersecting column pairs. For the non-trivial case + where intersecting columns are not join columns, a match count, max difference, + and null difference must be calculated. 
+ """ + if column in self.join_columns: + match_cnt = row_cnt + col_match = "" + max_diff = 0 + null_diff = 0 + else: + col_1 = column + "_" + self.df1_name + col_2 = column + "_" + self.df2_name + col_match = column + "_MATCH" + + match_cnt = ( + self.intersect_rows.select(col_match) + .where(col(col_match) == True) # noqa: E712 + .count(block=False) ) + max_diff = calculate_max_diff( + self.intersect_rows, + col_1, + col_2, + ) + null_diff = calculate_null_diff(self.intersect_rows, col_1, col_2) + + match_cnt = match_cnt.result() + try: + max_diff = max_diff.result()[0][0] + except (SnowparkClientException, ProgrammingError): + max_diff = 0 + try: + null_diff = null_diff.result() + except (SnowparkClientException, ProgrammingError): + null_diff = 0 + + if row_cnt > 0: + match_rate = float(match_cnt) / row_cnt + else: + match_rate = 0 + LOG.info(f"{column}: {match_cnt} / {row_cnt} ({match_rate:.2%}) match") + + col1_dtype, _ = _get_column_dtypes(self.df1, column, column) + col2_dtype, _ = _get_column_dtypes(self.df2, column, column) + + self.column_stats.append( + { + "column": column, + "match_column": col_match, + "match_cnt": match_cnt, + "unequal_cnt": row_cnt - match_cnt, + "dtype1": str(col1_dtype), + "dtype2": str(col2_dtype), + "all_match": all( + ( + col1_dtype == col2_dtype, + row_cnt == match_cnt, + ) + ), + "max_diff": max_diff, + "null_diff": null_diff, + } + ) + def all_columns_match(self) -> bool: """Whether the columns all match in the dataframes. @@ -994,23 +1029,16 @@ def calculate_max_diff(dataframe: "sp.DataFrame", col_1: str, col_2: str) -> flo max diff """ # Attempting to coalesce maximum diff for non-numeric results in error, if error return 0 max diff. - try: - diff = dataframe.select( - (col(col_1).astype("float") - col(col_2).astype("float")).alias("diff") - ) - abs_diff = diff.select(abs(col("diff")).alias("abs_diff")) - max_diff: float = ( - abs_diff.where(is_null(col("abs_diff")) == False) # noqa: E712 - .agg({"abs_diff": "max"}) - .collect()[0][0] - ) - except SnowparkSQLException: - return None - - if pd.isna(max_diff) or pd.isnull(max_diff) or max_diff is None: - return 0 - else: - return max_diff + diff = dataframe.select( + (col(col_1).astype("float") - col(col_2).astype("float")).alias("diff") + ) + abs_diff = diff.select(abs(col("diff")).alias("abs_diff")) + max_diff: float = ( + abs_diff.where(is_null(col("abs_diff")) == False) # noqa: E712 + .agg({"abs_diff": "max"}) + .collect(block=False) + ) + return max_diff def calculate_null_diff(dataframe: "sp.DataFrame", col_1: str, col_2: str) -> int: @@ -1047,12 +1075,9 @@ def calculate_null_diff(dataframe: "sp.DataFrame", col_1: str, col_2: str) -> in null_diff = nulls_df.where( ((col("col_1_null") == False) & (col("col_2_null") == True)) # noqa: E712 | ((col("col_1_null") == True) & (col("col_2_null") == False)) # noqa: E712 - ).count() + ).count(block=False) - if pd.isna(null_diff) or pd.isnull(null_diff) or null_diff is None: - return 0 - else: - return null_diff + return null_diff def _generate_id_within_group( diff --git a/tests/test_snowflake.py b/tests/test_snowflake.py index 548f604..6380f88 100644 --- a/tests/test_snowflake.py +++ b/tests/test_snowflake.py @@ -1187,7 +1187,7 @@ def test_calculate_max_diff(snowpark_session, column, expected): ) MAX_DIFF_DF = snowpark_session.createDataFrame(pdf) assert np.isclose( - calculate_max_diff(MAX_DIFF_DF, "BASE", column), + calculate_max_diff(MAX_DIFF_DF, "BASE", column).result()[0][0], expected, ) From 148cca9ffdfceb1d96f76d6fdaaf8fab2e4a218a Mon Sep 17 
00:00:00 2001 From: David Saunders Date: Wed, 27 Nov 2024 14:48:31 +0000 Subject: [PATCH 4/7] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20PySpark=20typo=20(#357?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- docs/source/spark_usage.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cf6070d..ad2ad9b 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ Pandas on Spark implementation. The original ``SparkCompare`` implementation dif from all the other native implementations. To align the API better, and keep behaviour consistent we are deprecating the original ``SparkCompare`` into a new module ``LegacySparkCompare`` -Subsequently in ``v0.13.0`` a PySaprk DataFrame class has been introduced (``SparkSQLCompare``) +Subsequently in ``v0.13.0`` a PySpark DataFrame class has been introduced (``SparkSQLCompare``) which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark logic is now under the ``spark`` submodule. diff --git a/docs/source/spark_usage.rst b/docs/source/spark_usage.rst index 7bd942c..269840b 100644 --- a/docs/source/spark_usage.rst +++ b/docs/source/spark_usage.rst @@ -9,7 +9,7 @@ Spark Usage and keep behaviour consistent we are deprecating the original ``SparkCompare`` into a new module ``LegacySparkCompare`` - Subsequently in ``v0.13.0`` a PySaprk DataFrame class has been introduced (``SparkSQLCompare``) + Subsequently in ``v0.13.0`` a PySpark DataFrame class has been introduced (``SparkSQLCompare``) which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark logic is now under the ``spark`` submodule. 
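The core of PATCH 3/7 above (Optimize Snowflake/Snowpark Compare, #354) is that the intersect comparison now caches the indexed dataframes with cache_result(), issues the per-column Snowpark queries as non-blocking jobs (count(block=False) / collect(block=False), resolved later with .result()), and fans the per-column stats work out to a ThreadPoolExecutor, re-raising any worker exception. Below is a stand-alone sketch of just the thread-pool / error-propagation shape; column_stats is a hypothetical stand-in for the real Snowpark work (match count, max diff, null diff), not the datacompy implementation itself.

    # Sketch of the fan-out pattern used in _intersect_compare (PATCH 3/7).
    # column_stats is a hypothetical placeholder for the per-column queries
    # that the real code runs against Snowflake as non-blocking jobs.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def column_stats(column: str, row_cnt: int) -> dict:
        # Placeholder: the real method kicks off block=False Snowpark queries
        # here and later calls .result() on each async job.
        return {"column": column, "match_cnt": row_cnt}

    def compare_columns(columns: list, row_cnt: int) -> list:
        stats = []
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(column_stats, c, row_cnt) for c in columns]
            for future in as_completed(futures):
                # Surface worker failures instead of silently dropping them,
                # mirroring the future.exception() check in the patch.
                if future.exception():
                    raise future.exception()
                stats.append(future.result())
        return stats

    if __name__ == "__main__":
        print(compare_columns(["ACCT_ID", "BALANCE"], row_cnt=100))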
From e0e839ba7f223c18704fa2bb30262947f423d9e0 Mon Sep 17 00:00:00 2001 From: rhaffar <141745338+rhaffar@users.noreply.github.com> Date: Thu, 28 Nov 2024 10:37:11 -0500 Subject: [PATCH 5/7] catch exception, column output change (#358) --- datacompy/snowflake.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/datacompy/snowflake.py b/datacompy/snowflake.py index 071e355..4f9e135 100644 --- a/datacompy/snowflake.py +++ b/datacompy/snowflake.py @@ -32,9 +32,8 @@ try: import snowflake.snowpark as sp - from snowflake.connector.errors import ProgrammingError + from snowflake.connector.errors import DatabaseError, ProgrammingError from snowflake.snowpark import Window - from snowflake.snowpark.exceptions import SnowparkClientException from snowflake.snowpark.functions import ( abs, col, @@ -466,11 +465,11 @@ def _calculate_column_compare_stats(self, column: str, row_cnt: int) -> None: match_cnt = match_cnt.result() try: max_diff = max_diff.result()[0][0] - except (SnowparkClientException, ProgrammingError): + except (ProgrammingError, DatabaseError): max_diff = 0 try: null_diff = null_diff.result() - except (SnowparkClientException, ProgrammingError): + except (ProgrammingError, DatabaseError): null_diff = 0 if row_cnt > 0: @@ -747,8 +746,8 @@ def report( report += render( "column_summary.txt", len(self.intersect_columns()), - len(self.df1_unq_columns()), - len(self.df2_unq_columns()), + f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}", + f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}", self.df1_name, self.df2_name, ) From b84a1c9e29a1b716345be5981aeb4b5360a6fba2 Mon Sep 17 00:00:00 2001 From: Faisal Date: Tue, 10 Dec 2024 16:07:30 -0400 Subject: [PATCH 6/7] fugue optional dependency (#360) * adding in logging and updating deps * updating docs * updating fugue extras --- .github/workflows/test-package.yml | 2 +- README.md | 4 +- datacompy/base.py | 5 ++- datacompy/core.py | 4 +- datacompy/fugue.py | 46 +++++++++++--------- datacompy/logger.py | 61 +++++++++++++++++++++++++++ datacompy/polars.py | 12 ++---- datacompy/snowflake.py | 16 ++++--- datacompy/spark/sql.py | 12 +++--- docs/source/fugue_usage.rst | 24 ++++------- pyproject.toml | 13 +++--- tests/test_fugue/test_duckdb.py | 1 + tests/test_fugue/test_fugue_pandas.py | 3 ++ tests/test_fugue/test_fugue_polars.py | 1 + tests/test_fugue/test_fugue_spark.py | 1 + tests/test_snowflake.py | 1 + 16 files changed, 137 insertions(+), 69 deletions(-) create mode 100644 datacompy/logger.py diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml index dc50777..0414e3e 100644 --- a/.github/workflows/test-package.yml +++ b/.github/workflows/test-package.yml @@ -134,7 +134,7 @@ jobs: - name: Install datacompy run: | python -m pip install --upgrade pip - python -m pip install .[tests,duckdb,polars,dask,ray] + python -m pip install .[tests,fugue] - name: Test with pytest run: | python -m pytest tests/ --ignore=tests/test_snowflake.py diff --git a/README.md b/README.md index ad2ad9b..7c74f3c 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,7 @@ If you would like to use Spark or any other backends please make sure you instal ```shell pip install datacompy[spark] -pip install datacompy[dask] -pip install datacompy[duckdb] -pip install datacompy[ray] +pip install datacompy[fugue] pip install datacompy[snowflake] ``` diff --git a/datacompy/base.py b/datacompy/base.py index 2c829af..d8ee6f9 100644 --- a/datacompy/base.py +++ b/datacompy/base.py @@ -21,13 +21,14 @@ two 
dataframes. """ -import logging from abc import ABC, abstractmethod from typing import Any, Optional from ordered_set import OrderedSet -LOG = logging.getLogger(__name__) +from datacompy.logger import INFO, get_logger + +LOG = get_logger(__name__, INFO) class BaseCompare(ABC): diff --git a/datacompy/core.py b/datacompy/core.py index 7d0da3d..a7cdcca 100644 --- a/datacompy/core.py +++ b/datacompy/core.py @@ -21,7 +21,6 @@ two dataframes. """ -import logging import os from typing import Any, Dict, List, Optional, Union, cast @@ -30,8 +29,9 @@ from ordered_set import OrderedSet from datacompy.base import BaseCompare, temp_column_name +from datacompy.logger import INFO, get_logger -LOG = logging.getLogger(__name__) +LOG = get_logger(__name__, INFO) class Compare(BaseCompare): diff --git a/datacompy/fugue.py b/datacompy/fugue.py index c4c27cb..fff14ba 100644 --- a/datacompy/fugue.py +++ b/datacompy/fugue.py @@ -15,25 +15,33 @@ """Compare two DataFrames that are supported by Fugue.""" -import logging import pickle from collections import defaultdict from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast -import fugue.api as fa import pandas as pd -import pyarrow as pa -from fugue import AnyDataFrame from ordered_set import OrderedSet -from triad import Schema from datacompy.core import Compare, render +from datacompy.logger import INFO, get_logger -LOG = logging.getLogger(__name__) +LOG = get_logger(__name__, INFO) HASH_COL = "__datacompy__hash__" -def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]: +try: + import fugue.api as fa + import pyarrow as pa + from fugue import AnyDataFrame + from triad import Schema +except ImportError: + LOG.warning( + "Please note that you are missing the optional dependency: fugue. " + "If you need to use this functionality it must be installed." + ) + + +def unq_columns(df1: "AnyDataFrame", df2: "AnyDataFrame") -> OrderedSet[str]: """Get columns that are unique to df1. Parameters @@ -54,7 +62,7 @@ def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]: return cast(OrderedSet[str], OrderedSet(col1) - OrderedSet(col2)) -def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]: +def intersect_columns(df1: "AnyDataFrame", df2: "AnyDataFrame") -> OrderedSet[str]: """Get columns that are shared between the two dataframes. Parameters @@ -75,7 +83,7 @@ def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]: return OrderedSet(col1) & OrderedSet(col2) -def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool: +def all_columns_match(df1: "AnyDataFrame", df2: "AnyDataFrame") -> bool: """Whether the columns all match in the dataframes. 
Parameters @@ -95,8 +103,8 @@ def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool: def is_match( - df1: AnyDataFrame, - df2: AnyDataFrame, + df1: "AnyDataFrame", + df2: "AnyDataFrame", join_columns: Union[str, List[str]], abs_tol: float = 0, rel_tol: float = 0, @@ -194,8 +202,8 @@ def is_match( def all_rows_overlap( - df1: AnyDataFrame, - df2: AnyDataFrame, + df1: "AnyDataFrame", + df2: "AnyDataFrame", join_columns: Union[str, List[str]], abs_tol: float = 0, rel_tol: float = 0, @@ -290,8 +298,8 @@ def all_rows_overlap( def count_matching_rows( - df1: AnyDataFrame, - df2: AnyDataFrame, + df1: "AnyDataFrame", + df2: "AnyDataFrame", join_columns: Union[str, List[str]], abs_tol: float = 0, rel_tol: float = 0, @@ -385,8 +393,8 @@ def count_matching_rows( def report( - df1: AnyDataFrame, - df2: AnyDataFrame, + df1: "AnyDataFrame", + df2: "AnyDataFrame", join_columns: Union[str, List[str]], abs_tol: float = 0, rel_tol: float = 0, @@ -638,8 +646,8 @@ def _any(col: str) -> int: def _distributed_compare( - df1: AnyDataFrame, - df2: AnyDataFrame, + df1: "AnyDataFrame", + df2: "AnyDataFrame", join_columns: Union[str, List[str]], return_obj_func: Callable[[Compare], Any], abs_tol: float = 0, diff --git a/datacompy/logger.py b/datacompy/logger.py new file mode 100644 index 0000000..2efcb47 --- /dev/null +++ b/datacompy/logger.py @@ -0,0 +1,61 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2024 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Logging Module. + +Module which sets up the basic logging infrustrcuture for the application. +""" + +import logging +import sys + +# logger formating +BRIEF_FORMAT = "%(levelname)s %(asctime)s - %(name)s: %(message)s" +VERBOSE_FORMAT = ( + "%(levelname)s|%(asctime)s|%(name)s|%(filename)s|" + "%(funcName)s|%(lineno)d: %(message)s" +) +FORMAT_TO_USE = VERBOSE_FORMAT + +# logger levels +DEBUG = logging.DEBUG +INFO = logging.INFO +WARN = logging.WARNING +ERROR = logging.ERROR +CRITICAL = logging.CRITICAL + + +def get_logger(name=None, log_level=logging.DEBUG): + """Set the basic logging features for the application. + + Parameters + ---------- + name : str, optional + The name of the logger. Defaults to ``None`` + log_level : int, optional + The logging level. Defaults to ``logging.INFO`` + + Returns + ------- + logging.Logger + Returns a Logger obejct which is set with the passed in paramters. + Please see the following for more details: + https://docs.python.org/2/library/logging.html + """ + logging.basicConfig(format=FORMAT_TO_USE, stream=sys.stdout, level=log_level) + logging.captureWarnings(True) + logger = logging.getLogger(name) + return logger diff --git a/datacompy/polars.py b/datacompy/polars.py index 6b335c4..d6300b8 100644 --- a/datacompy/polars.py +++ b/datacompy/polars.py @@ -21,23 +21,19 @@ two dataframes. 
""" -import logging import os from copy import deepcopy from typing import Any, Dict, List, Optional, Union, cast import numpy as np +import polars as pl from ordered_set import OrderedSet +from polars.exceptions import ComputeError, InvalidOperationError from datacompy.base import BaseCompare, temp_column_name +from datacompy.logger import INFO, get_logger -try: - import polars as pl - from polars.exceptions import ComputeError, InvalidOperationError -except ImportError: - pass # Let non-Polars people at least enjoy the loveliness of the pandas datacompy functionality - -LOG = logging.getLogger(__name__) +LOG = get_logger(__name__, INFO) STRING_TYPE = ["String", "Utf8"] DATE_TYPE = ["Date", "Datetime"] diff --git a/datacompy/snowflake.py b/datacompy/snowflake.py index 4f9e135..007f002 100644 --- a/datacompy/snowflake.py +++ b/datacompy/snowflake.py @@ -21,7 +21,6 @@ two dataframes. """ -import logging import os from concurrent.futures import ThreadPoolExecutor, as_completed from copy import deepcopy @@ -30,6 +29,12 @@ import pandas as pd from ordered_set import OrderedSet +from datacompy.base import BaseCompare +from datacompy.logger import INFO, get_logger +from datacompy.spark.sql import decimal_comparator + +LOG = get_logger(__name__, INFO) + try: import snowflake.snowpark as sp from snowflake.connector.errors import DatabaseError, ProgrammingError @@ -48,11 +53,10 @@ ) except ImportError: - pass # for non-snowflake users -from datacompy.base import BaseCompare -from datacompy.spark.sql import decimal_comparator - -LOG = logging.getLogger(__name__) + LOG.warning( + "Please note that you are missing the optional dependency: snowflake. " + "If you need to use this functionality it must be installed." + ) NUMERIC_SNOWPARK_TYPES = [ diff --git a/datacompy/spark/sql.py b/datacompy/spark/sql.py index 61a2871..050c5ba 100644 --- a/datacompy/spark/sql.py +++ b/datacompy/spark/sql.py @@ -21,7 +21,6 @@ two dataframes. """ -import logging import os from copy import deepcopy from typing import List, Optional, Tuple, Union @@ -30,6 +29,9 @@ from ordered_set import OrderedSet from datacompy.base import BaseCompare, temp_column_name +from datacompy.logger import INFO, get_logger + +LOG = get_logger(__name__, INFO) try: import pyspark.sql @@ -49,10 +51,10 @@ when, ) except ImportError: - pass # Let non-Spark people at least enjoy the loveliness of the spark sql datacompy functionality - - -LOG = logging.getLogger(__name__) + LOG.warning( + "Please note that you are missing the optional dependency: spark. " + "If you need to use this functionality it must be installed." + ) def decimal_comparator(): diff --git a/docs/source/fugue_usage.rst b/docs/source/fugue_usage.rst index 34791c3..143db59 100644 --- a/docs/source/fugue_usage.rst +++ b/docs/source/fugue_usage.rst @@ -5,6 +5,15 @@ Fugue Detail for data processing on Pandas, DuckDB, Polars, Arrow, Spark, Dask, Ray, and many other backends. DataComPy integrates with Fugue to provide a simple way to compare data across these backends. + +Installation +------------ + +:: + + pip install datacompy[fugue] + + Basic Usage ----------- @@ -90,13 +99,6 @@ to compare a Pandas dataframe with a Spark dataframe: join_columns='acct_id', ) -Notice that in order to use a specific backend, you need to have the corresponding library installed. -For example, if you want compare Ray datasets, you must do - -:: - - pip install datacompy[ray] - How it works ------------ @@ -106,11 +108,3 @@ using the Pandas-based ``Compare``. 
The comparison results are then aggregated t Different from the join operation used in ``SparkCompare``, the Fugue version uses the ``cogroup -> map`` like semantic (not exactly the same, Fugue adopts a coarse version to achieve great performance), which guarantees full data comparison with consistent result compared to Pandas-based ``Compare``. - - -Future releases ---------------- - -We are hoping to pilot Fugue for the community in future releases (0.10+) and gather feedback. With Fugue we get the -benefits of not having to maintain Framework specific code, and also having cross-framework compatibility. We may in -future depending on feedback deprecate ``SparkCompare`` in favour of just using Fugue to manage non-Pandas use cases. diff --git a/pyproject.toml b/pyproject.toml index bcacf63..9481a80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ maintainers = [ { name="Faisal Dosani", email="faisal.dosani@capitalone.com" } ] license = {text = "Apache Software License"} -dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.1.3,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "fugue<=0.9.1,>=0.8.7", "polars<=1.12.0,>=0.20.4"] +dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.1.3,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "polars[pandas]<=1.12.0,>=0.20.4"] requires-python = ">=3.9.0" classifiers = [ "Intended Audience :: Developers", @@ -54,11 +54,9 @@ version = {attr = "datacompy.__version__"} python-tag = "py3" [project.optional-dependencies] -duckdb = ["fugue[duckdb]"] +fugue = ["fugue[duckdb,dask,ray]<=0.9.1,>=0.8.7"] spark = ["pyspark[connect]>=3.1.1; python_version < \"3.11\"", "pyspark[connect]>=3.4; python_version >= \"3.11\""] snowflake = ["snowflake-connector-python", "snowflake-snowpark-python"] -dask = ["fugue[dask]"] -ray = ["fugue[ray]"] docs = ["sphinx", "furo", "myst-parser"] tests = ["pytest", "pytest-cov"] @@ -67,8 +65,8 @@ tests-snowflake = ["snowflake-snowpark-python[localtest]"] qa = ["pre-commit", "ruff==0.5.7", "mypy", "pandas-stubs"] build = ["build", "twine", "wheel"] edgetest = ["edgetest", "edgetest-conda"] -dev_no_snowflake = ["datacompy[duckdb]", "datacompy[spark]", "datacompy[docs]", "datacompy[tests]", "datacompy[tests-spark]", "datacompy[qa]", "datacompy[build]"] -dev = ["datacompy[duckdb]", "datacompy[spark]", "datacompy[snowflake]", "datacompy[docs]", "datacompy[tests]", "datacompy[tests-spark]", "datacompy[tests-snowflake]", "datacompy[qa]", "datacompy[build]"] +dev_no_snowflake = ["datacompy[fugue]", "datacompy[spark]", "datacompy[docs]", "datacompy[tests]", "datacompy[tests-spark]", "datacompy[qa]", "datacompy[build]"] +dev = ["datacompy[fugue]", "datacompy[spark]", "datacompy[snowflake]", "datacompy[docs]", "datacompy[tests]", "datacompy[tests-spark]", "datacompy[tests-snowflake]", "datacompy[qa]", "datacompy[build]"] # Linters, formatters and type checkers [tool.ruff] @@ -143,6 +141,5 @@ upgrade = [ "pandas", "numpy", "ordered-set", - "fugue", - "polars", + "polars[pandas]", ] diff --git a/tests/test_fugue/test_duckdb.py b/tests/test_fugue/test_duckdb.py index 715f101..77b0588 100644 --- a/tests/test_fugue/test_duckdb.py +++ b/tests/test_fugue/test_duckdb.py @@ -26,6 +26,7 @@ from ordered_set import OrderedSet from pytest import raises +pytest.importorskip("fugue") duckdb = pytest.importorskip("duckdb") diff --git a/tests/test_fugue/test_fugue_pandas.py b/tests/test_fugue/test_fugue_pandas.py index 2f12a5c..3c99a54 100644 --- a/tests/test_fugue/test_fugue_pandas.py +++ b/tests/test_fugue/test_fugue_pandas.py @@ -17,6 +17,7 @@ from io import 
StringIO import pandas as pd +import pytest from datacompy import ( Compare, all_columns_match, @@ -31,6 +32,8 @@ from pytest import raises from test_fugue_helpers import _compare_report +pytest.importorskip("fugue") + def test_is_match_native( ref_df, diff --git a/tests/test_fugue/test_fugue_polars.py b/tests/test_fugue/test_fugue_polars.py index 7f56d21..bf58bcd 100644 --- a/tests/test_fugue/test_fugue_polars.py +++ b/tests/test_fugue/test_fugue_polars.py @@ -26,6 +26,7 @@ from ordered_set import OrderedSet from pytest import raises +pytest.importorskip("fugue") pl = pytest.importorskip("polars") diff --git a/tests/test_fugue/test_fugue_spark.py b/tests/test_fugue/test_fugue_spark.py index c74d059..8c1046a 100644 --- a/tests/test_fugue/test_fugue_spark.py +++ b/tests/test_fugue/test_fugue_spark.py @@ -31,6 +31,7 @@ from pytest import raises from test_fugue_helpers import _compare_report +pytest.importorskip("fugue") pyspark = pytest.importorskip("pyspark") if sys.version_info >= (3, 12): diff --git a/tests/test_snowflake.py b/tests/test_snowflake.py index 6380f88..026c807 100644 --- a/tests/test_snowflake.py +++ b/tests/test_snowflake.py @@ -30,6 +30,7 @@ import pytest from pytest import raises +pytest.importorskip("snowflake") pytest.importorskip("pyspark") From f00d0bd6bb23ae5f5c67f3dca72bce6b0bcf4e19 Mon Sep 17 00:00:00 2001 From: Faisal Date: Tue, 17 Dec 2024 12:05:11 -0400 Subject: [PATCH 7/7] fixing polars test and bumping versions (#361) * change exception to include SchemaError * bumping versions * bumping datacompy version --- datacompy/__init__.py | 2 +- pyproject.toml | 2 +- tests/test_polars.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datacompy/__init__.py b/datacompy/__init__.py index 58691ba..9f51808 100644 --- a/datacompy/__init__.py +++ b/datacompy/__init__.py @@ -18,7 +18,7 @@ Then extended to carry that functionality over to Spark Dataframes. """ -__version__ = "0.14.4" +__version__ = "0.15.0" import platform from warnings import warn diff --git a/pyproject.toml b/pyproject.toml index 9481a80..b0cb8fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ maintainers = [ { name="Faisal Dosani", email="faisal.dosani@capitalone.com" } ] license = {text = "Apache Software License"} -dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.1.3,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "polars[pandas]<=1.12.0,>=0.20.4"] +dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.2.0,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "polars[pandas]<=1.17.1,>=0.20.4"] requires-python = ">=3.9.0" classifiers = [ "Intended Audience :: Developers", diff --git a/tests/test_polars.py b/tests/test_polars.py index 9020503..ff89db0 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -38,7 +38,7 @@ generate_id_within_group, temp_column_name, ) -from polars.exceptions import ComputeError, DuplicateError +from polars.exceptions import ComputeError, DuplicateError, SchemaError from polars.testing import assert_series_equal logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) @@ -536,7 +536,7 @@ def test_string_joiner(): def test_float_and_string_with_joins(): df1 = pl.DataFrame([{"a": float("1"), "b": 2}, {"a": float("2"), "b": 2}]) df2 = pl.DataFrame([{"a": 1, "b": 2}, {"a": 2, "b": 2}]) - with raises(ComputeError): + with raises((ComputeError, SchemaError)): PolarsCompare(df1, df2, "a")
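PATCH 6/7 above (fugue optional dependency, #360) applies one pattern across the backend modules: create the module logger with the new datacompy.logger.get_logger first, then wrap the optional imports in try/except ImportError and log a warning instead of failing at import time. A minimal sketch of that pattern follows, using a made-up package name (some_optional_backend) purely for illustration.

    # Minimal sketch of the optional-import guard introduced in PATCH 6/7.
    # "some_optional_backend" is a made-up package name for illustration only.
    import logging
    import sys

    def get_logger(name=None, log_level=logging.INFO):
        # Same idea as the new datacompy/logger.py: basicConfig to stdout,
        # then hand back a named logger.
        logging.basicConfig(
            format="%(levelname)s %(asctime)s - %(name)s: %(message)s",
            stream=sys.stdout,
            level=log_level,
        )
        return logging.getLogger(name)

    LOG = get_logger(__name__, logging.INFO)

    try:
        import some_optional_backend  # noqa: F401  (hypothetical optional dependency)
    except ImportError:
        LOG.warning(
            "Please note that you are missing the optional dependency: some_optional_backend. "
            "If you need to use this functionality it must be installed."
        )

On the test side the same concern is handled at collection time with pytest.importorskip("fugue") and pytest.importorskip("snowflake"), so suites for backends that are not installed are skipped rather than erroring.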