Add Fugue support (Phase 1) #201

Merged
merged 4 commits into develop on May 12, 2023

Conversation

@goodwanghan (Contributor) commented May 8, 2023

Fugue is an abstraction layer for distributed and local computing frameworks such as Spark, Dask, Ray, DuckDB, and Polars. For datacompy, Fugue can elegantly scale the core Compare class to different distributed backends.

In Phase 1, we only implemented the is_match function. In Phase 2, we will enable report.

Note that is_match compares unordered data, meaning df1 and df2 can still match even if their rows are not in the same order. In distributed systems the concept of row order doesn't natively exist, which is why is_match doesn't require any ordering.

Here are a few examples of using is_match:

import fugue.api as fa
from datacompy import is_match

# pdf* are pandas DataFrames; spark_df* are Spark DataFrames; ray_df* are Ray Datasets

is_match(pdf1, pdf2, join_columns="a")  # defaults to the Compare class
is_match(pdf1, pdf2, join_columns="a", parallelism=1)  # force Fugue, but the backend is still pandas

is_match(spark_df1, spark_df2, join_columns="a")  # compare Spark DataFrames using Spark
is_match(spark_df1, pdf2, join_columns="a")  # compare a Spark DataFrame with a pandas DataFrame using Spark
is_match(pdf1, spark_df2, join_columns="a")  # compare a pandas DataFrame with a Spark DataFrame using Spark

with fa.engine_context(spark_session):
    is_match(pdf1, pdf2, join_columns="a")  # force Spark to compare the dataframes

is_match(ray_df1, ray_df2, join_columns="a")  # compare Ray Datasets using Ray
is_match(ray_df1, pdf2, join_columns="a")  # compare a Ray Dataset with a pandas DataFrame using Ray
is_match(pdf1, ray_df2, join_columns="a")  # compare a pandas DataFrame with a Ray Dataset using Ray
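
As a concrete illustration of the order-insensitivity described above, here is a minimal sketch (with made-up data; df2 holds the same rows as df1 in reverse order):

import pandas as pd
from datacompy import is_match

df1 = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
df2 = df1.iloc[::-1].reset_index(drop=True)  # same rows, reversed order

assert is_match(df1, df2, join_columns="a")  # matches despite the different ordering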

@CLAassistant commented May 8, 2023

CLA assistant check: all committers have signed the CLA.

@goodwanghan changed the title from "Add Fugue support" to "Add Fugue support (Phase 1)" on May 8, 2023
@goodwanghan (Contributor, Author)

@kvnkho @fdosani

@fdosani (Member) commented May 8, 2023

@goodwanghan Thank you for the PR. I'll try and take a look at this shortly. I might have some questions and want to have a discussion to get your opinion on a few things once I've dived in a bit.

@ak-gupta @NikhilJArora just in case you are interested in checking this out too. :)

@goodwanghan (Contributor, Author)

Sounds good @fdosani. If you want to test Dask and Ray, please use fugue 0.8.4.dev2; for other backends, just use the official releases.

@fdosani (Member) commented May 8, 2023

@goodwanghan

Just poking around the code base. If I do something like the following — basically comparing two dataframes where one is just sorted backwards — it returns different results, which from reading the code makes sense.

import pandas as pd
import numpy as np
from datacompy import is_match


df1 = pd.DataFrame(np.random.randint(0, 100, size=(10000, 2)), columns=["b", "c"])
df1.reset_index(inplace=True)
df1.columns = ["a", "b", "c"]
df2 = df1.copy()
df2["b"] = df2["b"] + 0.1  # within abs_tol below, but note this also casts "b" to float
df2.sort_index(ascending=False, inplace=True)  # reverse the row order

In [55]: is_match(df1, df2, join_columns="a", parallelism=1, abs_tol=0.2)
Out[55]: False

In [56]: is_match(df1, df2, join_columns="a", abs_tol=0.2)
Out[56]: True

I fully admit that, being new to Fugue, I might not totally understand the implementation, so please excuse my ignorance or dumb questions.

So the splitting into groups happens here. Let's say I have 3 "buckets"; those 3 may or may not have the corresponding rows from the other dataframe.

Based on what @kvnkho mentioned at PyData, these buckets are what run on, say, Spark in isolation, correct?
I'm wondering if there is a way to join things before bucketing (this might not be simple or scalable), since we really want to join things to compare rather than just comparing rows.

@goodwanghan (Contributor, Author)

is_match(df1, df2, join_columns="a", parallelism=1, abs_tol=0.2)

Ah, this is because they don't match on schema: df1.b is an integer while df2.b is a float. Fugue has stricter schema comparison.
But of course, if you want to loosen this, we can add special logic to handle it.

If you add

df1["b"] = df1["b"].astype(float)

you will see the compare succeed.
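
Putting it together with your repro above — a minimal sketch of both code paths agreeing once the dtypes line up:

df1["b"] = df1["b"].astype(float)  # align schemas: both sides now carry a float "b"

is_match(df1, df2, join_columns="a", parallelism=1, abs_tol=0.2)  # now True via the Fugue path
is_match(df1, df2, join_columns="a", abs_tol=0.2)                 # True as before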


@fdosani (Member) commented May 8, 2023

is_match(df1, df2, join_columns="a", parallelism=1, abs_tol=0.2)

Ah, this is because they don't match on schema: df1.b is an integer while df2.b is a float. Fugue has stricter schema comparison. But of course, if you want to loosen this, we can add special logic to handle it.

If you add

df1["b"] = df1["b"].astype(float)

you will see the compare succeed.

That is great. OK, so maybe my understanding was off here. The _comp that happens does run on the entire dataset then, correct? I thought pieces were being segmented off, and that if things were not ordered properly there would be a chance the join wouldn't take place.

@goodwanghan (Contributor, Author)

That is great. OK, so maybe my understanding was off here. The _comp that happens does run on the entire dataset then, correct? I thought pieces were being segmented off, and that if things were not ordered properly there would be a chance the join wouldn't take place.

The _comp function runs on each partition, which is handled by a worker.
Each partition will contain multiple groups of data defined by join_columns.

For example:

df1:

a  b
x  1
x  2
y  1
y  2
z  1

df2:

a  b
x  1
x  2
y  1
y  2
z  1
z  2

If join_columns is a, then the data could be partitioned as group 1 (containing x and y) and group 2 (containing z).
Then on the remote worker, for each partition, the sub-dataframes are reconstructed and compared using _comp.
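
A rough illustration of the bucketing idea in plain pandas (hypothetical code, not Fugue's actual internals): rows are assigned to buckets by hashing the join-column value, so all rows sharing a key land in the same bucket regardless of their original order.

import pandas as pd

df1 = pd.DataFrame({"a": ["x", "x", "y", "y", "z"], "b": [1, 2, 1, 2, 1]})

num_buckets = 2
bucket_ids = df1["a"].map(lambda v: hash(v) % num_buckets)  # same key -> same bucket
for bucket_id, sub in df1.groupby(bucket_ids):
    print(f"bucket {bucket_id}:")
    print(sub)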

@goodwanghan (Contributor, Author)


So the splitting into groups happens here. Let's say I have 3 "buckets"; those 3 may or may not have the corresponding rows from the other dataframe.

Based on what @kvnkho mentioned at PyData, these buckets are what run on, say, Spark in isolation, correct? I'm wondering if there is a way to join things before bucketing (this might not be simple or scalable), since we really want to join things to compare rather than just comparing rows.

In this case, the bucket count is 2.

You asked about join: we didn't directly use a join here; instead we use map -> union -> groupmap, which is similar to a join (see the sketch below).
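
As a pandas analogy of that flow (an illustrative sketch, not Fugue's actual API): tag each side, union the frames, then group by the join column so every group carries the matching rows from both sides.

import pandas as pd

df1 = pd.DataFrame({"a": ["x", "y"], "b": [1, 2]})
df2 = pd.DataFrame({"a": ["y", "x"], "b": [2, 1]})

tagged = pd.concat([df1.assign(_side="left"), df2.assign(_side="right")])  # map + union
for key, group in tagged.groupby("a"):  # groupmap
    left = group[group["_side"] == "left"].drop(columns="_side")
    right = group[group["_side"] == "right"].drop(columns="_side")
    # each (left, right) pair can now be compared independently on a worker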

@fdosani (Member) commented May 8, 2023

You asked about join: we didn't directly use a join here; instead we use map -> union -> groupmap, which is similar to a join.

Got it! Yup, this is making sense now. Thanks for the explanation.

Are you OK if I push a couple of small tweaks? Nothing logic-wise; this all makes sense now. More just naming, organization, and docs.

I guess maybe the other thing we should discuss is the strict vs. loose comparison we discussed above. In core.py it happens here in columns_equal.

@goodwanghan (Contributor, Author)

Got it! Yup, this is making sense now. Thanks for the explanation.

Are you OK if I push a couple of small tweaks? Nothing logic-wise; this all makes sense now. More just naming, organization, and docs.

I guess maybe the other thing we should discuss is the strict vs. loose comparison we discussed above. In core.py it happens here in columns_equal.

Please feel free to make changes. Yeah, we can also implement columns_equal; that is straightforward (see the sketch below).
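
For reference, a small sketch of the loose behavior columns_equal already provides in core.py (assuming it is importable from the package top level like the other helpers): an integer column and a float column can still compare equal element-wise within a tolerance.

import pandas as pd
from datacompy import columns_equal

s1 = pd.Series([1, 2, 3])        # integer dtype
s2 = pd.Series([1.1, 2.1, 3.1])  # float dtype

print(columns_equal(s1, s2, abs_tol=0.2))  # element-wise True despite the dtype mismatch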

@fdosani (Member) commented May 8, 2023

is_match(df1, df2, join_columns="a", parallelism=1, abs_tol=0.2)

Ah, this is because they don't match on schema: df1.b is an integer while df2.b is a float. Fugue has stricter schema comparison. But of course, if you want to loosen this, we can add special logic to handle it.

If you add

df1["b"] = df1["b"].astype(float)

you will see the compare succeed.

Sorry, just one thing about the strict schema you noted. Shouldn't it work as is, without the astype?
When Compare is called it should run the internal logic in columns_equal, as it does with just native pandas dataframes. I guess I'm missing what Fugue is doing in the background with the schema and types.

Actually, never mind. I missed this line. 🤦

@goodwanghan (Contributor, Author)

I did a test on 100 GB of data. It can finish in 7 minutes with 256 CPUs.

The test was to load two Spark dataframes from the same file: is_match returns True, but SparkCompare reported a difference.

@fdosani self-requested a review May 11, 2023
@fdosani merged commit 5ea71cd into capitalone:develop May 12, 2023
@fdosani mentioned this pull request Jun 1, 2023
rhaffar pushed a commit to rhaffar/datacompy that referenced this pull request Sep 12, 2024
* Add Fugue support

* add polars and duckdb

* fixing docstrings and cleanup

* adding in strict_schema and check for hash_cols in both dfs

---------

Co-authored-by: fdosani <[email protected]>