
Feature: MERGE/Upsert Support #1534

Closed
wants to merge 54 commits

Conversation

@mattmartin14
Contributor

commented Jan 17, 2025

Hi,

This PR adds basic upsert functionality to pyiceberg. It also addresses issue #402.

An upsert is an operation that combines an update and an insert against a table, in this case an Iceberg table. What makes an upsert unique is that it's wrapped in a single transaction: either the update and the insert both succeed together, or, if one fails, the entire operation fails.

To illustrate how an upsert will function with a simple example, let's assume we have the following PyArrow table as our source (input):

| cust_id | cust_name | age |
|---------|-----------|-----|
| 1       | Fred      | 26  |
| 2       | Amy       | 32  |
| 3       | Kevin     | 25  |

And we have the following Iceberg table as our target that we want to upsert our source input into:

| cust_id | cust_name | age |
|---------|-----------|-----|
| 1       | Fred      | 26  |
| 2       | Amy       | 31  |

In this example, let's assume our primary key (join column) is cust_id; our upsert function will then perform the following operations on the target Iceberg table:

  • Update the row with cust_id = 2; why? Amy's age has changed.
  • Insert the row with cust_id = 3; why? The row does not exist in the target table.

Please note: cust_id = 1 exists in both the source and target tables, but since the non-key columns (cust_name, age) have not changed, we will not update that row; this avoids unnecessary I/O.
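For reference, a minimal sketch of what the end-user call could look like (the method name and signature were still under discussion in this PR; the follow-up PR #1660 ultimately exposed this as an `upsert` function on the `Table` class, and the catalog/table identifiers and the `join_cols` parameter name below are assumptions):

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table identifiers, for illustration only.
catalog = load_catalog("default")
table = catalog.load_table("db.customers")

source = pa.table({
    "cust_id": [1, 2, 3],
    "cust_name": ["Fred", "Amy", "Kevin"],
    "age": [26, 32, 25],
})

# In one transaction: update cust_id = 2 (age changed), insert
# cust_id = 3 (new key), and skip cust_id = 1 (nothing changed).
table.upsert(source, join_cols=["cust_id"])
```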

When I originally submitted this PR, I was using Apache DataFusion as the data processing engine to determine which rows were eligible for an update and which rows needed to be inserted. After some iterations and discussion, the pyiceberg team reached a consensus that, for now, we will not introduce DataFusion into pyiceberg as a dependency; instead, this feature uses the PyArrow compute engine to determine the rows eligible for updates and inserts.
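To sketch the PyArrow-based idea (illustrative only, not the exact code in this PR; the helper name is hypothetical), the source rows can be partitioned into insert and update candidates with PyArrow's join support:

```python
import pyarrow as pa


def split_for_upsert(source: pa.Table, target: pa.Table, key: str):
    """Hypothetical helper: partition source rows by key presence in the target."""
    target_keys = target.select([key])

    # Keys absent from the target become inserts ("when not matched").
    to_insert = source.join(target_keys, keys=key, join_type="left anti")

    # Keys present in the target are update candidates ("when matched");
    # the actual implementation additionally drops rows whose non-key
    # columns are unchanged, to avoid unnecessary I/O.
    to_update = source.join(target_keys, keys=key, join_type="left semi")

    return to_insert, to_update
```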

This feature will initially support the following upsert operations:

  • when matched update all
  • when not matched insert all

Down the road, we could enhance this feature to handle more upsert predicates and improve performance on larger tables. For now, we wanted to get the basic functionality into the hands of developers.

Thanks,
Matt Martin

@kevinjqliu self-requested a review January 17, 2025 18:08
@mattmartin14
Contributor Author

@kevinjqliu - I'm doing this work on behalf of my company, and when I ran my tests I used a standard Python virtual environment (venv); I haven't quite figured out how to get Poetry to work inside my company's firewall. So I'm not sure if those are errors I can address or if someone else can pitch in here.

@Fokko self-requested a review January 17, 2025 19:20
@bitsondatadev
Contributor

commented Jan 17, 2025

> @kevinjqliu - I'm doing this work on behalf of my company, and when I ran my tests I used a standard Python virtual environment (venv); I haven't quite figured out how to get Poetry to work inside my company's firewall. So I'm not sure if those are errors I can address or if someone else can pitch in here.

@mattmartin14, what's going on man!? Thanks for working on this, and most impressively, thanks for the comprehensive description.

Out of curiosity, did you discuss your approach with anyone before putting this together? This is good, but here are a few flags for OSS contributions to lower the upfront back-and-forth:

  1. My main concern is that it looks like you're introducing DataFusion to get this feature out, and adding dependencies is generally avoided until absolutely necessary (i.e., the value of the new dependency serves many use cases or won't cause a lot of maintenance). If you haven't, I recommend informally discussing this with @Fokko or @kevinjqliu to see if DataFusion is already on the roadmap or can be considered. The conversation can likely finish with them, but they may suggest asking on the mailing list to document and discuss it with more folks. There may already be ways of doing this using the existing library without adding another dependency.

  2. As you've noticed, the build is breaking with the Poetry issue, and it sounds like this is happening on a work laptop. I recommend working with your company to allow you to work on OSS code outside of a company device, as I'm familiar with how locked down their systems are, and it will be difficult to contribute from behind the corporate VPN. I did this for contributions where I submitted the code to my repository on my OSS account (we weren't required to have company accounts, so this worked out anyway). This required a review to ensure sensitive information wasn't leaked. Once I submitted the code to my branch, I was authorized to work on that code on my personal laptop (generally off hours, to get things polished for merging).

  3. For Apache projects, you have to add license notifications to each file you add, and these files are missing them:

     !????? /home/runner/work/iceberg-python/iceberg-python/pyiceberg/table/merge_rows_util.py
     !????? /home/runner/work/iceberg-python/iceberg-python/pyiceberg/table/test.py
     !????? /home/runner/work/iceberg-python/iceberg-python/tests/table/test_merge_rows.py
    
  4. There are bits of dead code scattered about; I'm not sure if that's for discussion or a TODO. I would either remove it or comment on it when this PR is ready for review.

  5. > Where I am hitting a wall on performance

     Aside from total disregard for it, performance should be pretty low on the list of concerns. This was a new paradigm for me too when I first submitted to open source: you're not writing code for a corporate team-sized set of eyes, you're writing it for many people to evaluate. I always recommend doing the bare minimum unless the performant version adds only ten or twenty lines of code. Focus on correctness and edge cases first; second, focus on minimalism and clarity. There are 500+ lines of code here that people have to get their heads around. That can be justified, but it can often be broken up to lower the burden on reviewers, which also lowers the chances you'll sneak a bug in unintentionally.

Contributing to OSS has a different focus than internal code, so hopefully these help. This does look well thought out in terms of implementation, but performance should be a second or third pass, in favor of having a code history that everyone in the community can wrap their heads around.

I'd suggest getting these addressed before Fokko and Kevin scan it. I'll be happy to do a quick glance once the tests are running and there's some consensus around DataFusion.

PR number one yeah!

@mattmartin14
Contributor Author

Thanks @bitsondatadev for all this great feedback. I'll get working on your suggestions, push an update next week, and address all your concerns.

@kevinjqliu
Contributor

Thanks @mattmartin14 for the PR! And thanks @bitsondatadev for the tips on working in OSS. I certainly had to learn a lot of these over the years.

A couple of things I think we can address first:

  1. Support for MERGE INTO / Upsert

This has been a much-anticipated and frequently requested feature in the community. Issue #402 has been tracking it, with many eyes on it. I think we still need to figure out the best approach to support this feature.

Like you mentioned in the description, MERGE INTO is a query engine feature. Pyiceberg itself is a client library to support the Iceberg Python ecosystem. Pyiceberg aims to provide the necessary Iceberg building blocks so that other engines/programs can interact with Iceberg tables easily.

As we build out more and more engine-like features, it becomes harder to support complex and data-intensive workloads such as MERGE INTO. We have been able to use pyarrow for query processing, but it has its own limitations. For more compute-intensive workloads, such as the Bucket and Truncate transforms, we were able to leverage Rust (iceberg-rust) to handle the computation.

Looking at #402, I don’t see any concrete plans on how we can support MERGE INTO. I’ve added this as an agenda on the monthly pyiceberg sync and will post the update. Please join us if you have time!

  2. Taking on DataFusion as a dependency

I'm very interested in exploring DataFusion and ways we can leverage it for this project. As I mentioned above, we currently use pyarrow to handle most of the compute, and it'll be interesting to evaluate DataFusion as an alternative. DataFusion has its own ecosystem of an expression API, a DataFrame API, and a runtime, all of which are good complements to pyiceberg. It has integrations on the Rust side as well, something I have started exploring in apache/iceberg-rust#865.

That said, I think we need a wider discussion and alignment on how to integrate with datafusion. It’s a good time to start thinking about it! I’ve added this as another discussion item on the monthly sync.

  3. Performance concerns

Compute intensive workloads are generally a bottleneck in python. I am excited for future pyiceberg <> iceberg-rust integration where we can leverage rust to perform those computations.

> The composite key code builds an overwrite filter, and once that filter gets too lengthy (in my testing, more than 200 rows), the visitor "OR" function in pyiceberg hits a recursion depth error.

This is an interesting observation, and I think I've seen someone else run into this issue before. We'd want to address it separately. It's something we might want to explore using DataFusion's expression API for, to replace our own parser.
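For context, a toy reproduction of that hazard (illustrative only, not code from this PR): chaining `Or` pairwise produces an expression tree one level deep per predicate, which pyiceberg's recursive visitors then have to walk.

```python
from functools import reduce

from pyiceberg.expressions import EqualTo, Or

# ~500 pairwise Ors nest ~500 levels deep: Or(Or(Or(...), ...), ...).
# A recursive expression visitor walking this tree can exceed Python's
# default recursion limit (1000 frames) once other frames are on the stack.
preds = [EqualTo("cust_id", i) for i in range(500)]
deep_filter = reduce(Or, preds)
```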

@mattmartin14
Contributor Author

@kevinjqliu @Fokko @bitsondatadev - the issues should be resolved. I got Poetry working inside my company's firewall; I've also removed the dead code and added the license headers to each file. Please take a look.

@mattmartin14
Contributor Author

Also - I added datafusion to the Poetry TOML file and lock, and it appears that you all need to resolve the conflict here, as it's not letting me.

@mattmartin14
Contributor Author

Also, @kevinjqliu - to address your question on DataFusion: when I looked into this feature, I explored these three options for an Arrow processing engine:

  1. DuckDB
  2. DataFusion
  3. Daft

I ultimately decided that DataFusion would make the most sense, given these things it had going for it:

  • It's already owned by the Apache Software Foundation, so licensing would be a non-issue.
  • It's very lightweight and specifically designed to process and query Arrow tables.
  • It's Rust-based, and if pyiceberg is ultimately going to be migrated to iceberg-rust one day, the integrations would be easier.
  • The iceberg-rust project is already building integrations for it, as seen here.

Hope this helps explain how I arrived at that conclusion. Just using native pyarrow to try to process the data would be a very large uphill battle, as we would effectively have to build our own data processing engine with it (e.g., hash joins, sorting, optimizations). I figured it doesn't make sense to reinvent the wheel; instead, we can use an engine that is already out there (DataFusion) and put it to good use.

I took a look at the attachment you posted for any upcoming meetings of the pyiceberg sync, but did not see any 2025 meetings listed. I'd be glad to attend to discuss this further if needed.

Thanks,
Matt

@Fokko
Contributor

left a comment

Thanks for working on this @mattmartin14. There is some work to be done here, mostly because we pull the table into memory and then perform the operation, which defeats the purpose of Iceberg because we don't use the statistics to optimize the query. I left a few comments.

Review comments on pyiceberg/table/__init__.py (4 threads; outdated, resolved)
@mattmartin14
Copy link
Contributor Author

Hi All,
I've pushed a new update and asked @Fokko for some clarification on a few of his suggestions. In terms of the call we had yesterday (1/28/25), I think it would be great for us all to align on a directional statement about just how much we want pyiceberg to sign up for in managing merge commands.

I think if we tackle the basic merge/upsert pattern (when matched update all, when not matched insert all), that would cover roughly 90% of merge use cases. For anything that requires a more involved upsert using multiple matched predicates, we should direct users to Spark SQL, since that is already baked into the platform; see the sketch below.
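For illustration, the kind of multi-predicate merge that would stay out of scope looks like this via Spark SQL (a sketch; the session setup, table names, and the `is_deleted` flag are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Multiple WHEN MATCHED branches with extra predicates -- beyond the
# basic "update all / insert all" pattern proposed for pyiceberg.
spark.sql("""
    MERGE INTO prod.db.customers AS t
    USING updates AS s
      ON t.cust_id = s.cust_id
    WHEN MATCHED AND s.is_deleted THEN DELETE
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```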

If anyone disagrees with that directional statement, please let me know.

Review comment on pyiceberg/table/__init__.py (outdated, resolved)
@mattmartin14
Contributor Author

@Fokko - I've updated the PR to include the predicate code you provided to pre-filter the Iceberg table, thus avoiding loading it all into memory. I was also able to reuse that same predicate code as the overwrite_filter further down. Thanks; I think this PR is starting to look in good shape. The only outstanding item I see is whether we want to rename "merge_rows" to "merge". I added some comments on that thread and would like to know your thoughts.

Thanks,
Matt

@mattmartin14
Contributor Author

@Fokko - I added some additional smoke tests for situations where the primary key is a string or a date; the filter list code you wrote works fine for ints and strings, but on dates I'm getting a type error such as this:

TypeError: Invalid literal value: datetime.date(2021, 1, 1)

For reference, here is the function, to help jog your memory. Do you know how we can update this function to handle situations where a date is one of the join columns?

```python
import pyarrow as pa

from pyiceberg.expressions import And, BooleanExpression, EqualTo, In, Or


def get_filter_list(df: pa.Table, join_cols: list) -> BooleanExpression:
    # Distinct key combinations present in the source table.
    unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])

    pred = None

    if len(join_cols) == 1:
        # Single-column key: one IN predicate over all distinct values.
        pred = In(join_cols[0], unique_keys[0].to_pylist())
    else:
        # Composite key: OR together an AND of per-column equality
        # predicates for each distinct key row.
        pred = Or(*[
            And(*[
                EqualTo(col, row[col])
                for col in join_cols
            ])
            for row in unique_keys.to_pylist()
        ])

    return pred
```
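One possible workaround (an assumption on my part, not code from this PR): pyiceberg can coerce ISO-8601 strings to DATE values when an expression is bound, so normalizing `datetime.date` keys to strings before building the predicate may sidestep the literal error.

```python
import datetime
from typing import Any


def to_supported_literal(val: Any) -> Any:
    # Hypothetical helper: convert date values to their ISO-8601 string
    # form, which pyiceberg can bind against DATE columns; pass
    # everything else through unchanged.
    if isinstance(val, datetime.date) and not isinstance(val, datetime.datetime):
        return val.isoformat()
    return val


# e.g. inside get_filter_list:
#   In(join_cols[0], [to_supported_literal(v) for v in unique_keys[0].to_pylist()])
#   EqualTo(col, to_supported_literal(row[col]))
```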

@mattmartin14
Contributor Author

commented Feb 12, 2025

Trying another push on the table __init__; found a whitespace issue.

@mattmartin14
Contributor Author

FINALLY!!!!!
victory

@kevinjqliu
Contributor

Woot, thanks! Can you run `git checkout main vendor`? This will revert the unrelated changes to the vendor/ dir.

@mattmartin14
Contributor Author

> Woot, thanks! Can you run `git checkout main vendor`? This will revert the unrelated changes to the vendor/ dir.

I thought I already pushed the fixes for the vendor Facebook and Hive Metastore stuff? Was that not picked up?

@mattmartin14
Contributor Author

@kevinjqliu - I see I pushed the hive_metastore and ttypes changes, but I didn't push your changes to fb303; LMK if you still need me to run a checkout.

@kevinjqliu
Contributor

The changes to vendor/ are still there.

Could you try something like:

git fetch origin main
git checkout origin/main vendor

assuming that origin refers to the apache/iceberg-python repo. You can check this by running `git remote -v`.

@mattmartin14
Contributor Author

> The changes to vendor/ are still there.
>
> Could you try something like:
>
> git fetch origin main
> git checkout origin/main vendor
>
> assuming that origin refers to the apache/iceberg-python repo. You can check this by running `git remote -v`.

Hey @kevinjqliu - I ran `git remote -v` and this is what I got:

github	git@github.com:apache/iceberg-python.git (fetch)
github	git@github.com:apache/iceberg-python.git (push)
origin	https://github.com/StateFarmIns/iceberg-python.git (fetch)
origin	https://github.com/StateFarmIns/iceberg-python.git (push)

I'm not really sure where to go from here. I tried the commands:

git fetch origin main

which worked, followed by:

git checkout origin main/vendor

and got this error:

error: pathspec 'main/vendor' did not match any file(s) known to git

Any thoughts?

@kevinjqliu
Contributor

In this case, the right alias is `github`, since:

github	git@github.com:apache/iceberg-python.git (fetch)

so:

git fetch github main
git checkout github/main vendor

Mind the space before `vendor`.

@mattmartin14
Contributor Author

> In this case, the right alias is `github`, since:
>
> github	git@github.com:apache/iceberg-python.git (fetch)
>
> so:
>
> git fetch github main
> git checkout github/main vendor
>
> Mind the space before `vendor`.

Sorry @kevinjqliu - that did not work; we have the iceberg-python project forked at https://github.com/StateFarmIns/iceberg-python.

When I try to run `git fetch github main`, I get a timeout on my side (firewall problem). Can you let me know what changes you want done? I thought I redid the vendor files based on what you provided earlier, so I'm surprised things are still out of sync.

@kevinjqliu
Contributor

Ah, that's odd. Let's do this the manual way.

For these files:

  • vendor/fb303/FacebookService.py
  • vendor/hive_metastore/ThriftHiveMetastore.py
  • vendor/hive_metastore/ttypes.py

copy/paste the files from https://github.com/apache/iceberg-python, following these.

@mattmartin14
Contributor Author

> Ah, that's odd. Let's do this the manual way.
>
> For these files:
>
> • vendor/fb303/FacebookService.py
> • vendor/hive_metastore/ThriftHiveMetastore.py
> • vendor/hive_metastore/ttypes.py
>
> copy/paste the files from https://github.com/apache/iceberg-python, following these.

@kevinjqliu - I reran those. I'm now getting a hash conflict on the Poetry lock file. Can you help produce the correct file again so I can resolve it?

@Fokko
Contributor

left a comment

Thanks for driving this @mattmartin14 🙌 I believe this looks good; there are some files touched that are not needed.

Review comments (outdated, resolved): vendor/fb303/FacebookService.py, vendor/hive_metastore/ThriftHiveMetastore.py, vendor/hive_metastore/ttypes.py
@mattmartin14
Contributor Author

@Fokko, @kevinjqliu - I think we are good on the vendor files; I just need your help one more time computing the poetry.lock file, since if I do it on my end, it will embed our private State Farm repo on nearly every line.

@mattmartin14
Contributor Author

@Fokko @kevinjqliu - for context on this Poetry lock file issue that keeps coming up:

If I run `poetry lock` locally, it will embed my company's private repo throughout the file on hundreds of lines. I know this open source project is getting constant updates, so this is a tough game of catch-me-if-you-can. So, if you don't mind, I need one of you to rerun the Poetry lock file with datafusion as a dependency and upload it to a separate branch so I can pull it down manually.

As an FYI, I've raised this issue with our internal OSS team, given that in the long run we will want something more sustainable so I can continue to contribute to the project.

But for now, I need your help regenerating the pyproject.toml and poetry.lock files.

Thanks,
Matt

@mattmartin14
Contributor Author

I've tried something a little different to resolve the lock file conflict; I'll see if it works. I added my company's repo with the option priority = "explicit"; it looks like it might have prevented a bunch of insertions.

@kevinjqliu
Contributor

Looks like some changes that we previously removed are now back :/ like the changes in pyproject.toml.

@mattmartin14 do you mind if I push these commits to a new PR using my fork so I can make the necessary changes?

@mattmartin14
Contributor Author

> Looks like some changes that we previously removed are now back :/ like the changes in pyproject.toml.
>
> @mattmartin14 do you mind if I push these commits to a new PR using my fork so I can make the necessary changes?

That's fine, go ahead and do it. Teamwork makes the dream work!

Fokko added a commit that referenced this pull request Feb 13, 2025
Closes #402
This PR adds the `upsert` function to the `Table` class and supports the
following upsert operations:
- when matched update all
- when not matched insert all

This PR is a remake of #1534 due to some infrastructure issues. For
additional context, please refer to that PR.

---------

Co-authored-by: VAA7RQ <[email protected]>
Co-authored-by: VAA7RQ <[email protected]>
Co-authored-by: mattmartin14 <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
@mattmartin14
Contributor Author

The final stage of this PR was moved to the new PR #1660 due to some infrastructure challenges on my end. Closing this one now.
