
Polars Based Compute Engine #7067

Closed
asheeshgarg opened this issue Mar 10, 2023 · 14 comments

Comments

@asheeshgarg

Feature Request / Improvement

@nazq @rdblue

Polars provides good support on top of columnar datasets using lazy and DataFrame operations. It would be great to have a Polars-based compute engine natively supported on top of Iceberg.

I have tested that, for an Iceberg store backed by Parquet files, we can directly load a Polars DataFrame using the PyIceberg/Java Iceberg APIs.

Query engine

None

@Fokko
Contributor

Fokko commented Mar 10, 2023

Hey @asheeshgarg, thanks for raising this!

Integrating with Polars would be awesome.

Currently, we can load data into an Arrow table and convert that into any Arrow-backed format (including Polars). As a short-term fix, next to to_arrow() and to_pandas() we can add a method called to_polars().

In the longer term: unfortunately, we don't yet support pushing down the predicate from a PyArrow dataset all the way down to Iceberg. Once that is done, we can also easily add this to Polars. (Disclaimer: I'm more familiar with PyArrow at the moment, hence the PyArrow concepts.) I do think Polars and Iceberg would be an awesome combination, since Polars is also lazy by design, so you would only open the Parquet files you actually need for your query.

@asheeshgarg
Author

asheeshgarg commented Mar 10, 2023

@Fokko thanks for the input. For now, what I'm doing is using the PyIceberg API to create a scan, pushing whatever filter expressions I can into it to reduce the set of data files, then creating a lazy DataFrame with Polars and running the remaining filters lazily as DataFrame expressions for the query.

@Fokko
Contributor

Fokko commented Mar 10, 2023

@asheeshgarg Ah nice, that works, but it has some caveats you need to be aware of. Iceberg tracks columns by IDs instead of names. For example, if you rename a column, we do this on the table schema. When we read the files and encounter a file that has the old column name, we update the name based on the column's ID. There are also things like deletes. This makes it quite an effort to implement Iceberg for engines like Polars as well (mostly because there is no Rust implementation yet).

With the upcoming 0.4.0 version we'll get even more performance, because now we also have metrics evaluation (skipping Parquet files based on their upper and lower bounds) and positional deletes. I suggested creating a Polars DataFrame from an Arrow table because then you get things like projection and deletes for free :)

@asheeshgarg
Author

@Fokko sure, schema evolution is definitely a challenge. Interesting to learn about the 0.4.0 support for partition pruning using metadata. Is there a PR where we can see this?
Also, it would be great to have Rust support for the platform.

@chitralverma

chitralverma commented Apr 5, 2023

Hi all, Polars contributor here. I did the integration for DeltaIO recently :)
I was looking to do this integration over the weekend, and it will be a quick addition because py-iceberg already allows a table to be converted to a PyArrow table, which can be fed to Polars' eager read API. No need to rely on to_pandas, which may incur additional overhead.

However, it would be great to support the lazy scan API as well, because most internal optimisations take place there. This would require an Iceberg table to be converted lazily into a PyArrow dataset via a new .to_arrow_dataset() method. I can open a PR for this if it makes sense to the contributors/members of Iceberg.

Once this addition is in place, I can open a PR to support iceberg on Polars side.

@Fokko
Contributor

Fokko commented Apr 5, 2023

Thanks @chitralverma for chiming in here.

> I was looking to do this integration over the weekend. It will be a quick addition because py-iceberg already allows a table to be converted to a PyArrow table, which can be fed to Polars' eager read API. No need to rely on to_pandas, which may incur additional overhead.

That sounds like a great first step. The important part is that we push down the predicate from Polars into PyIceberg. Iceberg is designed to work with large tables, and not being able to prune files would result in very poor performance.

> However, it would be great to support the lazy scan API as well, because most internal optimisations take place there.

I fully agree. I think that would be a great second step, but it would probably be a bit more complex. Our Arrow integration isn't yet what it ideally would be, but we're working on this (it will probably take some time). It would require that, when an action is performed on a dataset, Polars calls PyIceberg to do the planning (and all the Iceberg optimizations).

I'm happy to help, but I'm less familiar with Polars, so it would be awesome if you could work on the integration on that side 🚀

@chitralverma

> I'm happy to help, but I'm less familiar with Polars, so it would be awesome if you could work on the integration on that side 🚀

Sure, sounds good! Let me get a little familiar with the Iceberg codebase and then get back on this.

Once the iceberg <=> arrow link works as expected, adding this on the Polars side will be easy.

@Fokko
Contributor

Fokko commented Aug 3, 2023

@bitsondatadev mentioned Polars integration.

Looking at it a second time, I think we can implement this by just constructing a LazyFrame, similar to: https://github.com/pola-rs/polars/blob/main/py-polars/polars/io/pyarrow_dataset/anonymous_scan.py

We would instantiate a StaticTable to fetch the schema, and then do the actual scan in the LazyFrame.

@bitsondatadev
Collaborator

@Fokko FYI, I also asked on an issue I found on the Polars GitHub whether they prefer a Rust or a Python implementation.

@Fokko
Contributor

Fokko commented Aug 4, 2023

Took a stab at it, and it seems to work fine: https://www.loom.com/share/2b2dfbbada6e4fac88d0d0070e31f99f

@chitralverma

chitralverma commented Aug 4, 2023

@Fokko I saw the video and the approach; I had checked this out before.

The problem is that iceberg_table.scan().to_arrow() actually returns a PyArrow Table, not a PyArrow Dataset. Creating a PyArrow Table effectively causes materialisation, which is therefore not lazy.

See this behaviour here.

I guess the changes need to be made on the PyIceberg side to return a PyArrow Dataset (lazy) rather than a PyArrow Table (eager). This PyArrow Dataset can then easily be fed to Polars.

If you look, Delta also does the same.

@Fokko
Contributor

Fokko commented Aug 4, 2023

I agree with you there, but that happens after the filtering, so PyIceberg will already have pruned the unrelated files and filtered out the unrelated data.

The problem with to_pyarrow_dataset is that Iceberg has much more sophisticated pruning, which can happen at different levels and cannot be expressed in Arrow fragments. We're looking into adding Substrait integration for PyIceberg, where we could express this, but that is further out.

With Iceberg's hidden partitioning, we don't have to do things like:

    Use the `pyarrow_options` parameter to read only certain partitions.

    >>> pl.scan_delta(  # doctest: +SKIP
    ...     table_path,
    ...     pyarrow_options={"partitions": [("year", "=", "2021")]},
    ... )

Which I think is very user-unfriendly, because if you don't pass the partitions, Polars will also read too much data.

Since pl.LazyFrame._scan_python_function(...) will only be called on an action, and it passes in the filter predicate, I think we're fine. Or am I missing something?

@asheeshgarg
Author

asheeshgarg commented Aug 4, 2023

Another current bottleneck: once you materialize to Arrow, the size explodes a lot, because Parquet's dictionary encoding is not kept (apache/arrow#20110).
So the memory requirements are not well suited, and to_arrow will run into this.

@Fokko
Contributor

Fokko commented Oct 2, 2023

Polars has support for Iceberg: pola-rs/polars#10375 🚀

Fokko closed this as completed Oct 2, 2023