Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: no longer load full table into ram in write by using concurrent write #2289

Draft
wants to merge 38 commits into
base: main
Choose a base branch
from

Conversation

aersam
Copy link
Contributor

@aersam aersam commented Mar 15, 2024

Description

This is a followup of #2265

It additionally uses streams/channels to concurrently write at the cost of more memory consumption. Default is keeping one recordbatch in RAM only, so it's opt-in.

I tested this with a local file and it went from 700s to 200s if I work with 10 concurrent streams. Of course memory consumption goes up, but given that we currently load the whole table in RAM, it's OK :)

This adds a depenency on async-channel as I need a multi-consumer channel.

Related Issue(s)

Fixes #2255

@aersam
Copy link
Contributor Author

aersam commented Mar 21, 2024

Let's get the other one in first

I hope it's ok to prioritize this one from my side to not have to keep both branches up-to-date

@ion-elgreco
Copy link
Collaborator

Let's get the other one in first

I hope it's ok to prioritize this one from my side to not have to keep both branches up-to-date

That's ok!

@ion-elgreco
Copy link
Collaborator

ion-elgreco commented Mar 28, 2024

@aersam I think we should max out the concurrent streams for python users.

In most use cases we are passing a recordBatchReader where the recordBatches are already in memory before constructing the reader, in that case you won't see any memory difference. And it wouldn't be different than the prior behavior since the reader was always collected.

I also have one suggestion on the python side, I think it's better if we simplify it and just provide a parameter called parallelize, which is always set to True. If users want to control the amount of concurrent streams, they should set an env_var which we then can parse in python_lib, if it's not set and parallelize = True, then we take the max possible streams.

@aersam
Copy link
Contributor Author

aersam commented Mar 29, 2024

How about parallelize:bool|int on python side? 🙂

@ion-elgreco
Copy link
Collaborator

@aersam that also works! :)

@aersam
Copy link
Contributor Author

aersam commented Apr 15, 2024

I finally had the time to update this branch with the new parallel parameter in python. Hope it's looking good now!

@ion-elgreco
Copy link
Collaborator

@aersam btw, did you have any profiling numbers on speed ups/memory trade offs when parallel is True. Would be nice to share those in the release notes later on

@aersam
Copy link
Contributor Author

aersam commented Apr 15, 2024

I only did some manual test on my own data, but could probably write some benchmark in python, using duckdb or polars as source. Would it make sense to add this to the code somehow?

@ion-elgreco
Copy link
Collaborator

@aersam here you could add it, and even maybe reuse some of the benchmarks there: https://github.com/delta-io/delta-rs/tree/main/crates/benchmarks

@aersam
Copy link
Contributor Author

aersam commented Apr 17, 2024

I did some very basic benchmarking, but the results were not as I hoped :) While RAM consumption is significantly lower, the speed is not good enough yet. I think maybe the channel must be bigger, I'll do some more testing

I did my test quick and dirty using python, I can share the code if you want. Basically it's this:

import duckdb
from deltalake.writer import write_deltalake
from uuid import uuid4

with duckdb.connect() as con: # get your 42.parquet here: https://duckdb.org/2024/03/26/42-parquet-a-zip-bomb-for-the-big-data-age.html
    con.execute("select b, random() as a from read_parquet('42.parquet') limit 300000000")
    reader = con.fetch_record_batch()
    write_deltalake(f"_test/{uuid4()}", reader, schema=reader.schema, mode="overwrite", engine="rust") 

@aersam
Copy link
Contributor Author

aersam commented Apr 19, 2024

Pretty sure the non-async write causes issues. But object_store 0.10 will change a lot there, so maybe better to wait for that

@ion-elgreco
Copy link
Collaborator

Pretty sure the non-async write causes issues. But object_store 0.10 will change a lot there, so maybe better to wait for that

Yes let's see how effective these changes are with new upload trait

@ion-elgreco
Copy link
Collaborator

@aersam fyi, ObjectStore just got bumped in the repo to 0.10

@ion-elgreco
Copy link
Collaborator

@aersam hey, do you think you have time to resolve the merge conflicts?

@aersam
Copy link
Contributor Author

aersam commented Jul 24, 2024

@aersam hey, do you think you have time to resolve the merge conflicts?

I'm sorry, I had a bit a shift in priorities, so it will take time to do so. Especially since there were quite some changes in the writer as I see

@ion-elgreco
Copy link
Collaborator

@aersam hey, do you think you have time to resolve the merge conflicts?

I'm sorry, I had a bit a shift in priorities, so it will take time to do so. Especially since there were quite some changes in the writer as I see

No worries! Just ping me once it's ready for another review round

@rtyler rtyler self-assigned this Nov 3, 2024
@rtyler rtyler marked this pull request as draft November 3, 2024 02:04
@rtyler rtyler added this to the Rust v1.0.0 milestone Nov 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/python Issues for the Python package binding/rust Issues for the Rust crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Do not load full source into RAM on write_to_deltalake
3 participants