
SparkSQLCompare and spark submodule #310

Merged: @fdosani merged 18 commits into develop from vanilla-spark on Jun 20, 2024
Conversation

@fdosani (Member) commented Jun 5, 2024

In this PR I've:

  • bumped the version to 0.13.0 (breaking changes)
  • added a new Compare class called SparkSQLCompare
    • the Pandas-on-Spark API, while simpler, did not seem to be performant
    • the new logic is much faster and should yield almost identical results
    • supports pandas 2.0+, unlike SparkPandasCompare
  • renamed SparkCompare -> SparkPandasCompare
  • reorganized all the Spark logic into a spark submodule
  • cleaned up some typing
  • updated the docs

Some benchmark numbers to follow
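For context, here is a minimal usage sketch of the new class, mirroring the sample code @rhaffar shares later in this thread; it assumes an existing SparkSession named spark, two pyspark.sql.DataFrames df_1 and df_2, and an acct_id join column:

import datacompy.spark.sql as sp

# Compare two Spark DataFrames on a join column and print the text report.
compare = sp.SparkSQLCompare(spark, df_1, df_2, join_columns=["acct_id"])
print(compare.report())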

@fdosani (Member, Author) commented Jun 5, 2024

single machine: 16 CPUs, 64 GB RAM

100 Million rows x 10 cols:

----------------------------------------------- benchmark: 1 tests -----------------------------------------------
Name (time in s)          Min       Max      Mean   StdDev    Median     IQR  Outliers     OPS  Rounds  Iterations
------------------------------------------------------------------------------------------------------------------
test_vanilla         133.7600  384.2520  161.6279  78.2540  137.0472  4.0239       1;1  0.0062      10           1
------------------------------------------------------------------------------------------------------------------

500 Million rows x 10 cols:

Does not run: OOM

Distributed: 20 executors, 8 cores, 32GB RAM

100 Million rows x 10 cols:

--------------------------------------------- benchmark: 1 tests ---------------------------------------------
Name (time in s)         Min      Max     Mean   StdDev   Median     IQR  Outliers     OPS  Rounds  Iterations
--------------------------------------------------------------------------------------------------------------
test_vanilla         37.7553  91.5924  47.2185  16.2070  41.5550  6.0568       1;1  0.0212      10           1
--------------------------------------------------------------------------------------------------------------

500 Million rows x 10 cols:

----------------------------------------------- benchmark: 1 tests -----------------------------------------------
Name (time in s)          Min       Max      Mean   StdDev    Median     IQR  Outliers     OPS  Rounds  Iterations
------------------------------------------------------------------------------------------------------------------
test_vanilla         110.8866  239.6311  130.9814  38.4822  118.5674  7.5951       1;1  0.0076      10           1
------------------------------------------------------------------------------------------------------------------
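The tables above are pytest-benchmark output. For reference, here is a hedged sketch of how a benchmark like test_vanilla might be written; the spark, df_1, and df_2 fixtures and the acct_id join column are assumptions, since the actual benchmark script is not included in this PR:

from datacompy.spark.sql import SparkSQLCompare

def test_vanilla(benchmark, spark, df_1, df_2):
    # spark, df_1, and df_2 are assumed pytest fixtures providing a session
    # and the two dataframes under comparison.
    def run_compare():
        compare = SparkSQLCompare(spark, df_1, df_2, join_columns=["acct_id"])
        return compare.report()

    # pedantic() pins rounds/iterations, matching the Rounds=10, Iterations=1
    # columns in the tables above.
    benchmark.pedantic(run_compare, rounds=10, iterations=1)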

@fdosani marked this pull request as ready for review on June 5, 2024 at 23:17
@rhaffar (Contributor) left a comment

Looks good Faisal.

One thing to mention: I do seem to get several of the following warning when performing a Spark SQL compare: No Partition Defined for Window operation! It looks like you'd need to add a partition to the Window operations in the dataframe merge to silence these.

One other consideration: when re-running a compare, I get a few WARN CacheManager: Asked to cache already cached data. messages. Not a meaningful warning since it's just trying to re-cache the dataframes from the first compare, but it may be worth forcefully silencing this one.
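If forcefully silencing the CacheManager message is the route taken, one blunt option (an assumption on my part, not something in this PR) is to raise the JVM log level before re-running a compare, assuming a SparkSession handle named spark; note this hides all Spark WARN-level logs, not just this one:

# Coarse workaround: suppress Spark WARN-level JVM logs (including the
# "Asked to cache already cached data" message) for the active session.
spark.sparkContext.setLogLevel("ERROR")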

(Review threads on datacompy/spark/sql.py, resolved)
@fdosani (Member, Author) commented Jun 13, 2024

> One thing to mention: I do seem to get several of the following warning when performing a Spark SQL compare: No Partition Defined for Window operation! It looks like you'd need to add a partition to the Window operations in the dataframe merge to silence these.

I think this is coming from the following lines:

df1 = df1.withColumn(
    "__index",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)
df2 = df2.withColumn(
    "__index",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)

I need to create an index column, but the increasing number needs to happen across the whole dataframe, so there isn't really anything to partition on. This only happens when there are dupes.

I think I can just use monotonically_increasing_id without any windowing. It should still give the same effect.
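Concretely, something along these lines, matching the withColumn("__index", monotonically_increasing_id()) variant benchmarked later in this thread (a sketch of the change, not the exact merged diff):

from pyspark.sql.functions import monotonically_increasing_id

# Non-windowed variant: IDs are unique and increasing but not consecutive,
# and no global ordering into a single partition is required.
df1 = df1.withColumn("__index", monotonically_increasing_id())
df2 = df2.withColumn("__index", monotonically_increasing_id())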

@fdosani requested a review from rhaffar on June 14, 2024 at 12:21
@rhaffar (Contributor) commented Jun 14, 2024

> (quoting @fdosani's comment above about replacing the windowed __index with monotonically_increasing_id)

Re-testing locally with a small dataframe, what used to be a ~6-7 second run is now ~25 seconds due to this specific change. I'm not sure how this scales with larger dataframes, or running locally vs. on a cluster, but it may be worth checking this doesn't have a meaningful effect on performance.

@fdosani (Member, Author) commented Jun 14, 2024

> (quoting the exchange above)
>
> Re-testing locally with a small dataframe, what used to be a ~6-7 second run is now ~25 seconds due to this specific change. I'm not sure how this scales with larger dataframes, or running locally vs. on a cluster, but it may be worth checking this doesn't have a meaningful effect on performance.

@rhaffar Any chance you have some sample code I can use to recreate this locally?

On a 10K-row dataset I get:

  • New: 286 ms ± 65.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
  • Old: 586 ms ± 128 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@rhaffar (Contributor) commented Jun 14, 2024

> @rhaffar Any chance you have some sample code I can use to recreate this locally?

Sure, I can't share ipynb files so I'll just copy-paste the code block. Frankly, this was tested with dataframes that are only a few rows long and was run locally, so I suppose it's not super meaningful for performance testing. If you're getting improved performance with the updated code on a larger dataset, then I think that's a much better indication of the performance changes.

import datetime

from pyspark.sql import Row, SparkSession

import datacompy.spark.sql as sp

spark = SparkSession.builder.appName("datacompy_spark").getOrCreate()

data1 = [
    Row(acct_id=10000001234, dollar_amt=123.45, name='George Maharis', float_fld=14530.1555,
        date_fld=datetime.date(2017, 1, 1)),
    Row(acct_id=10000001235, dollar_amt=0.45, name='Michael Bluth', float_fld=1.0,
        date_fld=datetime.date(2017, 1, 1)),
    Row(acct_id=10000001236, dollar_amt=1345.0, name='George Bluth', float_fld=None,
        date_fld=datetime.date(2017, 1, 1)),
    Row(acct_id=10000001237, dollar_amt=123456.0, name='Bob Loblaw', float_fld=345.12,
        date_fld=datetime.date(2017, 1, 1)),
    Row(acct_id=10000001239, dollar_amt=1.05, name='Lucille Bluth', float_fld=None,
        date_fld=datetime.date(2017, 1, 1))
]

data2 = [
    Row(acct_id=10000001234, dollar_amt=123.4, name='George Michael Bluth', float_fld=14530.155),
    Row(acct_id=10000001235, dollar_amt=0.45, name='Michael Bluth', float_fld=None),
    Row(acct_id=None, dollar_amt=1345.0, name='George Bluth', float_fld=1.0),
    Row(acct_id=10000001237, dollar_amt=123456.0, name='Robert Loblaw', float_fld=345.12),
    Row(acct_id=10000001238, dollar_amt=1.05, name='Loose Seal Bluth', float_fld=111.0),
    Row(acct_id=10000001238, dollar_amt=1.05, name='Loose Seal Bluth', float_fld=111.0)
]

df_1 = spark.createDataFrame(data1)
df_2 = spark.createDataFrame(data2)

compare = sp.SparkSQLCompare(spark, df_1, df_2, join_columns=['ACCT_ID'])
#compare.matches(ignore_extra_columns=False)
print(compare.report())

@fdosani (Member, Author) commented Jun 14, 2024

@rhaffar thanks for flagging. Running some benchmarks with dupes in there. Not sure if this might be an issue only when running on very small data, since it all gets moved to one partition vs. multiple. With larger datasets, moving everything over to one partition would cause OOM errors. Let's see with 10M or 100M rows and distributed.
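As a side note, here is a small standalone sketch (my own illustration, not code from this PR) showing that monotonically_increasing_id keeps rows in their original partitions while still producing unique, increasing IDs, which is why it avoids the single-partition shuffle of the windowed approach:

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, spark_partition_id

spark = SparkSession.builder.master("local[4]").appName("mono_id_demo").getOrCreate()

# 8 rows spread over 4 partitions; the generated IDs are unique and increasing
# within each partition but not consecutive across partitions.
df = spark.range(0, 8, numPartitions=4)
df.withColumn("__index", monotonically_increasing_id()) \
  .withColumn("partition", spark_partition_id()) \
  .show()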

@rhaffar (Contributor) left a comment

LGTM

@fdosani (Member, Author) commented Jun 14, 2024

> LGTM

Thanks! I looked at this on distributed for 10M, 50M and 100M rows:

10M

withColumn("__index", monotonically_increasing_id())

--------------------------------------------- benchmark: 1 tests ---------------------------------------------
Name (time in s)         Min      Max     Mean   StdDev   Median     IQR  Outliers     OPS  Rounds  Iterations
--------------------------------------------------------------------------------------------------------------
test_vanilla         17.3825  75.0905  24.8319  17.8866  18.2787  1.5836       1;2  0.0403      10           1
--------------------------------------------------------------------------------------------------------------

 
with row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,

---------------------------------------------- benchmark: 1 tests ---------------------------------------------
Name (time in s)         Min       Max     Mean   StdDev   Median     IQR  Outliers     OPS  Rounds  Iterations
---------------------------------------------------------------------------------------------------------------
test_vanilla         47.3446  159.5515  60.6921  34.8873  47.9401  6.9533       1;1  0.0165      10           1
---------------------------------------------------------------------------------------------------------------

50M

with row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,

----------------------------------------------- benchmark: 1 tests ----------------------------------------------
Name (time in s)          Min       Max      Mean  StdDev    Median     IQR  Outliers     OPS  Rounds  Iterations
-----------------------------------------------------------------------------------------------------------------
test_vanilla         142.3333  142.3333  142.3333  0.0000  142.3333  0.0000       0;0  0.0070       1           1
-----------------------------------------------------------------------------------------------------------------

withColumn("__index", monotonically_increasing_id())

----------------------------------------------- benchmark: 1 tests ----------------------------------------------
Name (time in s)          Min       Max      Mean  StdDev    Median     IQR  Outliers     OPS  Rounds  Iterations
-----------------------------------------------------------------------------------------------------------------
test_vanilla         117.6224  117.6224  117.6224  0.0000  117.6224  0.0000       0;0  0.0085       1           1
-----------------------------------------------------------------------------------------------------------------

100M

row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,

------------------------------------------------- benchmark: 1 tests -------------------------------------------------
Name (time in s)          Min         Max      Mean    StdDev    Median      IQR  Outliers     OPS  Rounds  Iterations
----------------------------------------------------------------------------------------------------------------------
test_vanilla         243.9418  1,128.8603  342.0786  276.5795  255.0061  10.0884       1;2  0.0029      10           1
----------------------------------------------------------------------------------------------------------------------

withColumn("__index", monotonically_increasing_id())

---------------------------------------------- benchmark: 1 tests ----------------------------------------------
Name (time in s)         Min       Max     Mean   StdDev   Median      IQR  Outliers     OPS  Rounds  Iterations
----------------------------------------------------------------------------------------------------------------
test_vanilla         45.8924  124.4062  64.0970  27.0061  50.3175  13.0589       2;2  0.0156      10           1
----------------------------------------------------------------------------------------------------------------

The hypothesis seems to make sense: large data benefits from withColumn("__index", monotonically_increasing_id()), as opposed to smaller data.

@satniks commented Jun 20, 2024

@fdosani, I tried this pull request against a remote Databricks Spark cluster using the databricks-connect library. With this library I get a pyspark.sql.connect.dataframe.DataFrame, and when I pass it to SparkSQLCompare() I get the following error:

TypeError: df1 must be a pyspark.sql.DataFrame

pyspark.sql.connect.dataframe.DataFrame worked earlier with the legacy SparkCompare.

I am checking how to convert a pyspark.sql.connect.dataframe.DataFrame to a pyspark.sql.DataFrame.

@fdosani (Member, Author) commented Jun 20, 2024

> (quoting @satniks's report above of TypeError: df1 must be a pyspark.sql.DataFrame when passing a Spark Connect DataFrame)

So I don't know much about Spark Connect, but it seems to let remote clients run DataFrame commands, etc. I think you won't be able to get a normal pyspark.sql.DataFrame since you are using a remote cluster.

For your typical workflow, are you using Spark Connect, or was this just to test things out?

@satniks commented Jun 20, 2024

> For your typical workflow, are you using Spark Connect, or was this just to test things out?

We always use Databricks Spark via Spark Connect from servers outside Databricks, so I need to find a workaround for this conversion. I am looking at apache/spark#46129 to see if such a conversion is possible.

@fdosani (Member, Author) commented Jun 20, 2024

@satniks can you try this branch quickly? https://github.com/capitalone/datacompy/tree/spark-connect-test

@satniks commented Jun 20, 2024

> @satniks can you try this branch quickly? https://github.com/capitalone/datacompy/tree/spark-connect-test

Wow, thanks @fdosani! It worked and showed the expected differences. It also fixed the performance issues I reported earlier with version 0.12 on Databricks Spark.

Now waiting for version 0.13 :-)
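For anyone hitting the same TypeError, here is a hedged sketch of how an input check can accept both classic and Spark Connect DataFrames, in the spirit of the "allow pyspark.sql.connect.dataframe.DataFrame" commit referenced below; this is an illustration with a made-up helper name (_check_dataframe), not the exact code merged in the branch:

from pyspark.sql import DataFrame

ACCEPTED_TYPES = [DataFrame]
try:
    # Spark Connect ships with pyspark >= 3.4 and needs the optional connect
    # extras (grpcio, pandas, pyarrow); fall back quietly if unavailable.
    from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
    ACCEPTED_TYPES.append(ConnectDataFrame)
except ImportError:
    pass

def _check_dataframe(df) -> None:
    # Raise the same TypeError reported above when neither type matches.
    if not isinstance(df, tuple(ACCEPTED_TYPES)):
        raise TypeError("df1 must be a pyspark.sql.DataFrame")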

@jdawang (Contributor) left a comment

Overall LGTM, left some non-blocking comments. Will come back when you get the unit tests passing. Sorry for the late review.

# Clean up temp columns for duplicate row matching
if self._any_dupes:
    outer_join = outer_join.drop(
        *[

Review comment:

I think the unpacking here may be unnecessary as we're creating a list just to unpack it. Could be simplified to .drop(col1, col2)
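A quick self-contained illustration of this suggestion; the local session and the __index/__merge column names are made up for the demo:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("drop_demo").getOrCreate()
df = spark.createDataFrame([(1, 10, 0)], ["acct_id", "__index", "__merge"])

# Equivalent results; the second form skips building a list just to unpack it.
df.drop(*["__index", "__merge"]).show()
df.drop("__index", "__merge").show()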

ignore_extra_columns : bool
    Ignores any columns in one dataframe and not in the other.
"""
if not ignore_extra_columns and not self.all_columns_match():

Review comment:

For future, but I think this can be simplified to a return ignore_extra_columns or/and...
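As a self-contained illustration of folding the branch into a single boolean return; the class and method names here are stand-ins, not the PR's actual code:

class _Toy:
    def __init__(self, columns_match: bool):
        self._columns_match = columns_match

    def all_columns_match(self) -> bool:
        return self._columns_match

    def column_check(self, ignore_extra_columns: bool) -> bool:
        # Instead of: if not ignore_extra_columns and not self.all_columns_match(): return False
        return ignore_extra_columns or self.all_columns_match()

assert _Toy(False).column_check(ignore_extra_columns=True) is True
assert _Toy(False).column_check(ignore_extra_columns=False) is False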

Tuple(str, str)
    Tuple of base and compare datatype
"""
base_dtype = [d[1] for d in dataframe.dtypes if d[0] == col_1][0]

Review comment:

Not blocking: I think this can be simplified to next(d[1] for d in dataframe.dtypes if d[0] == col_1)
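A small self-contained comparison of the two forms, using a made-up dtypes list in place of a real DataFrame:

dtypes = [("acct_id", "bigint"), ("dollar_amt", "double"), ("name", "string")]
col_1 = "dollar_amt"

# List comprehension plus [0] materializes the whole list first.
base_dtype = [d[1] for d in dtypes if d[0] == col_1][0]

# next() over a generator stops at the first match.
base_dtype = next(d[1] for d in dtypes if d[0] == col_1)

assert base_dtype == "double"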

@jdawang self-requested a review on June 20, 2024 at 17:03
@fdosani merged commit 537cd7d into develop on Jun 20, 2024
30 checks passed
@fdosani deleted the vanilla-spark branch on June 20, 2024 at 18:29
@fdosani mentioned this pull request on Jun 20, 2024
rhaffar pushed a commit to rhaffar/datacompy that referenced this pull request Sep 11, 2024
* [WIP] vanilla spark

* [WIP] fixing tests and logic

* [WIP] __index cleanup

* updating pyspark.sql logic and fixing tests

* restructuring spark logic into submodule and typing

* remove pandas 2 restriction for spark sql

* fix for sql call

* updating docs

* updating benchmarks with pyspark dataframe

* relative imports and linting

* relative imports and linting

* feedback from review, switch to monotonic and simplify checks

* allow pyspark.sql.connect.dataframe.DataFrame

* checking version for spark connect

* typo fix

* adding import

* adding connect extras

* adding connect extras
rhaffar pushed a commit to rhaffar/datacompy that referenced this pull request Sep 12, 2024 (same commit message as above)