
feat: LazyFrame.collect with backend and **kwargs#1734

Merged
MarcoGorelli merged 25 commits into main from feat/collect-kwargs on Feb 2, 2025

Conversation


@FBruzzesi FBruzzesi commented Jan 6, 2025

What type of PR is this? (check all applicable)

  • 💾 Refactor
  • ✨ Feature
  • 🐛 Bug Fix
  • 🔧 Optimization
  • 📝 Documentation
  • ✅ Test
  • 🐳 Other

Related issues

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

If you have comments or can explain your changes, please do so below

This is a proposal for #1479.

This is getting more relevant now due to DuckDB support, as we need to decide how we could collect a DuckDB table.

For polars and dask, collect kwargs would follow native collect and compute respectively. For DuckDB we could come up with our own and document it properly. Specifically, I would suggest letting the user decide which dataframe backend to collect to (return_type?), with Arrow as default.

@FBruzzesi FBruzzesi added the enhancement New feature or request label Jan 6, 2025

MarcoGorelli commented Jan 6, 2025

Nice, I like the look of this

it may help to simplify

narwhals/tests/utils.py, lines 77 to 80 in 8c9525a:

if result.implementation is Implementation.POLARS and os.environ.get(
    "NARWHALS_POLARS_GPU", False
):  # pragma: no cover
    result = result.to_native().collect(engine="gpu")

to just result.collect(polars_kwargs=dict(engine="gpu"))?


I would suggest to let the user decide to which dataframe backend to collect to (return_type?), with Arrow as default.

agree, I think arrow's a good default for duckdb (also, as far as I can tell, collecting into Polars from duckdb requires pyarrow anyway, suggesting they first collect into pyarrow regardless?). to check my understanding then, this would be compatible with the collect in #1725, and we can add extra kwargs later?

@FBruzzesi (Member Author)

to just result.collect(polars_kwargs=dict(engine="gpu"))?

Yes exactly!

agree, I think arrow's a good default for duckdb (also, as far as I can tell, collecting into Polars from duckdb requires pyarrow anyway, suggesting they first collect into pyarrow regardless?).

🤔 now that you mention it (and completely unrelated to this PR), we could do the same for pyspark: see Ritchie's answer on StackOverflow

to check my understanding then, this would be compatible with the collect in #1725, and we can add extra kwargs later?

Yes indeed as long as we intend to have the default to be PyArrow

@MarcoGorelli (Member)

we could do the same for pyspark: see SO Ritchie answer

yes, nice! 🙌

@MarcoGorelli (Member)

I think this looks good, just going to give the chance to others to weigh in


FBruzzesi commented Jan 6, 2025

@MarcoGorelli I just added duckdb_kwargs to specify DuckDB return_type in the last commit. Can revert it if we want to sleep on it

from narwhals.utils import Implementation

return PolarsDataFrame(
df=self._native_frame.pl(),
FBruzzesi (Member Author):

Unrelated, but... should we change this in PolarsDataFrame:

- df: pl.DataFrame,
+ native_dataframe: pl.DataFrame,

Comment on lines 3616 to 3618
polars_kwargs: dict[str, Any] | None = None,
dask_kwargs: dict[str, Any] | None = None,
duckdb_kwargs: dict[str, str] | None = None,
FBruzzesi (Member Author):

These could all be TypedDicts 👀


EdAbati commented Jan 9, 2025

Niiiice! 🙌🙌

A couple of questions/ideas:

  • wouldn't return_type eventually become an argument for every Lazy backend? e.g. one may want to always collect to polars regardless of which Lazy dataframe they are starting from.
    What do you think about changing the signature to:

    def collect(self, return_type, ...) -> ...
  • still not 100% sold on the idea of having one specific kwargs per Lazy backend; I think it would give us a bit less flexibility. The signature would change if we add more lazy backends (e.g. pyspark). Or I think it'd become a problem if:

    • we remove a lazy backend, for example moving it to a separate integration library
    • people want to create their own Lazy backends and want to pass args to their collect (probably a niche use case though)

    What do you think?

    Also I wouldn't find particularly ugly something like this (but that's personal preference :D ):

     lazy_df = df.lazy().select(nw_v1.all().sum())
     if lazy_df.implementation == Implementation.POLARS:
         eager_df = lazy_df.collect(no_optimization=True)
     elif lazy_df.implementation == Implementation.DASK:
         eager_df = lazy_df.collect(optimize_graph=False)
     elif lazy_df.implementation == Implementation.DUCKDB:
         eager_df = lazy_df.collect(return_type="pyarrow")


FBruzzesi commented Jan 9, 2025

Hey @EdAbati, thanks for your feedback! That's exactly the purpose of an RFC 👌

  • wouldn't return_type eventually become an argument for every Lazy backend? e.g. one may want to always collect to polars regardless of which Lazy dataframe they are starting from.

Considering the dataframe ecosystem as of today:

  • Polars would always collect to polars
  • Dask would always collect to pandas
  • DuckDB (and maybe Ibis) could collect to polars, arrow and pandas
  • Spark natively to pandas (but I see the appeal of collecting to others)

Not sure what we aim to support in the future 😉 but I would try not to overthink it too soon here!

  • still not 100% sold on the idea of having one specific kwargs per Lazy backend, I think it would give us a bit less flexibility. The signature would change if we add more lazy backends (e.g. pyspark). Or I think it'd become a problem if:

    • we remove a lazy backend, for example moving it to a separate integration library

Those are definitely fair concerns to think about! Thanks for pointing them out!

  • people want to create their own Lazy backends and want to pass args to their collect (probably a niche use case though)

That's where they can branch out; we would just need to add an additional **kwargs to be passed to .collect whenever the implementation is outside what we cover with the dedicated arguments. In code:

def collect(
    self: Self,
    *,
    polars_kwargs: dict[str, Any] | None = None,
    dask_kwargs: dict[str, Any] | None = None,
    duckdb_kwargs: dict[str, str] | None = None,
    **kwargs: Any,
):
    from narwhals.utils import Implementation

    if self.implementation is Implementation.POLARS and polars_kwargs is not None:
        kwargs_ = polars_kwargs
    elif ...:
        ...
    else:
        kwargs_ = kwargs

    return self._dataframe(
        self._compliant_frame.collect(**kwargs_),
        level="full",
    )

What do you think?
Also I wouldn't find particularly ugly something like this (but that's personal preference :D ):

 lazy_df = df.lazy().select(nw_v1.all().sum())
 if lazy_df.implementation == Implementation.POLARS:
     eager_df = lazy_df.collect(no_optimization=True)
 elif lazy_df.implementation == Implementation.DASK:
     eager_df = lazy_df.collect(optimize_graph=False)
 elif lazy_df.implementation == Implementation.DUCKDB:
     eager_df = lazy_df.collect(return_type="pyarrow")

I would be ok deferring the responsibility to the users. But coming back to personal preference: if I were to use an external library and found that I have to do a lot of branching myself, I wouldn't say it is particularly ergonomic 😅


EdAbati commented Jan 11, 2025

Not sure what we aim to support in the future 😉 but I would try not to overthink it too soon here!

Ah! I was under the impression that there was a request to make LazyFrames able to collect to any backend, similarly to what the methods to_pandas(), to_arrow() etc. do for eager frames. Maybe I misunderstood then :D

Also FYI PySpark 4.0.0 will support toArrow

@MarcoGorelli (Member)

there is indeed a request to be able to collect to specific backends (e.g. someone via work asked to be able to collect a duckdb-backed lazyframe into a polars-backed dataframe), but I think this would still be backend-specific - e.g. not all lazy backends would necessarily have a way to collect to polars

as in, we might not have a way to do lazy_df.collect(eager_backend='polars') and know that it will work for all lazy backends... but if we know that, say, duckdb supports it, we can do lazy_df.collect(duckdb_kwargs={'eager_backend': 'polars'})

Having said that, should we:

  • use eager_backend instead of return_type? return_type kind of sounds like it affects the return type of the function, although that is always nw.DataFrame
  • should we document a couple of common kwargs for polars / dask backends? e.g. streaming, engine='gpu'. then for duckdb_kwargs / pyspark_kwargs we can document 'eager_backend'

if lazy_df.implementation == Implementation.POLARS:

small note, but this can now be done as if lazy_df.implementation.is_polars()


MarcoGorelli commented Jan 16, 2025

Been thinking about this a bit more, and in the read functions we have **kwargs, perhaps we should be consistent with that? To avoid branching, users could make a collect_kwargs dictionary which maps implementations to kwargs and then do

lf.collect(**collect_kwargs[lf.implementation])

We could also have eager_backend as an argument in collect, and lazy_backend in lazy?
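The dictionary-dispatch idea above can be sketched as follows; the `Implementation` enum and `collect` function here are minimal stand-ins for the narwhals API, not its actual definitions:

```python
from enum import Enum, auto


class Implementation(Enum):
    # Stand-in for narwhals.Implementation; members are illustrative
    POLARS = auto()
    DASK = auto()
    DUCKDB = auto()


# One kwargs mapping per lazy backend, resolved once at the call site,
# with no if/elif branching
collect_kwargs = {
    Implementation.POLARS: {"no_optimization": True},
    Implementation.DASK: {"optimize_graph": False},
    Implementation.DUCKDB: {"eager_backend": "pyarrow"},
}


def collect(implementation, **kwargs):
    # Stand-in for LazyFrame.collect: just echo what would be forwarded
    return kwargs


forwarded = collect(Implementation.DASK, **collect_kwargs[Implementation.DASK])
```

The point of the pattern is that the backend-specific branching lives in one dictionary literal rather than spread across call sites.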


MarcoGorelli commented Jan 28, 2025

TBH, the more I think about this, the more I'd be in favour of just:

    def collect(self: Self, backend: ModuleType | Implementation | str | None, **kwargs: Any) -> DataFrame[Any]: ...
    def lazy(self: Self, backend: ModuleType | Implementation | str | None) -> LazyFrame[Any]: ...

Because then:

  • as a user, I could do
df = (
    lf.group_by('a', 'b').agg(nw.all().mean().name.suffix('_mean'))
    .collect(engine='gpu')  # alternative is `.collect(polars_kwargs={'engine': 'gpu'})`
    .with_columns(pl.col('a').rolling_mean(2))
    .to_native()
)
  • as a tool-builder wanting to avoid extensive if/then statement, I could do:
collect_kwargs = {
    nw.dependencies.get_polars(): {'engine': 'gpu'},
    nw.dependencies.get_dask(): {'optimize_graph': False},
    nw.dependencies.get_pyarrow(): {'backend': 'pyarrow'},
    my_fancy_extension_module: {'my_fancy_kwarg': my_fancy_value},
}
lazy_df = df.lazy().select(nw.all().sum()).collect(**collect_kwargs[nw.get_native_namespace(df)])
  • it's possible to round-trip lazy-eager-lazy, like
lf = lf.group_by('foo').agg(nw.selectors.numeric().mean())
implementation = lf.implementation
df = lf.collect()
train_df, val_df = train_test_split(df)
train_lf = train_df.lazy(implementation)
val_lf = val_df.lazy(implementation)

I think we'd be pretty safe to use backend, because Polars uses engine and I can't see why they'd add both

Regarding ModuleType | Implementation | str | None, I think we can accept all of:

  • module (e.g. polars, duckdb, ...)
  • implementation (e.g. nw.Implementation.POLARS)
  • str (e.g. 'polars')
  • None: use the default eager/lazy backend for the given module (we can document sensible defaults)

We should probably accept all of the above in from_dict and similar functions too, and perhaps deprecate native_namespace, which is annoyingly long
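A minimal sketch of how accepting `ModuleType | Implementation | str | None` could be normalized; the enum and `parse_backend` helper below are illustrative stand-ins, not narwhals' actual implementation:

```python
from enum import Enum
from types import ModuleType


class Implementation(Enum):
    # Stand-in for narwhals.Implementation; values mirror module names
    POLARS = "polars"
    PANDAS = "pandas"
    PYARROW = "pyarrow"


def parse_backend(backend, default):
    """Normalize ModuleType | Implementation | str | None to an Implementation."""
    if backend is None:
        return default  # documented per-backend default
    if isinstance(backend, Implementation):
        return backend
    if isinstance(backend, ModuleType):
        return Implementation(backend.__name__)  # e.g. the polars module itself
    return Implementation(backend)  # a string such as "polars"


fake_polars = ModuleType("polars")  # placeholder for `import polars`
resolved = parse_backend(fake_polars, default=Implementation.PYARROW)
```

Matching a module by its `__name__` is one possible design choice here; the real implementation may dispatch differently.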

@FBruzzesi (Member Author)

Thanks @MarcoGorelli, we can do some brainstorming in the community call tomorrow maybe.

  • as a tool-builder wanting to avoid extensive if/then statement, I could do:
collect_kwargs = {
    nw.dependencies.get_polars(): {'engine': 'gpu'},
    nw.dependencies.get_dask(): {'optimize_graph': False},
    nw.dependencies.get_pyarrow(): {'backend': 'pyarrow'},
    my_fancy_extension_module: {'my_fancy_kwarg': my_fancy_value},
}
lazy_df = df.lazy().select(nw.all().sum()).collect(**collect_kwargs[nw.get_native_namespace(df)])

I am ok with this - it might not be the most ergonomic way, but it definitely lowers the effort a lot on our side (we would just need to pass kwargs along).

I think we'd be pretty safe to use backend, because Polars uses engine and I can't see why they'd add both

Regarding ModuleType | Implementation | str | None, I think we can accept all of:

  • module (e.g. polars, duckdb, ...)
  • implementation (e.g. nw.Implementation.POLARS)
  • str (e.g. 'polars')
  • None: use the default eager/lazy backend for the given module (we can document sensible defaults)

Am I correctly assuming this refers to which custom collect backend we want to allow for duckdb/pyspark? Or would you like it to be more general?

Since polars and dask have a native collect backend, I would rather keep that, but I might be missing some use cases.
For pyspark and duckdb I am totally open to customizing their available kwargs to allow for backend (with pyarrow as default?)

@MarcoGorelli (Member)

I was thinking something like:

  • backend not specified: the default one is used, and we document what that means. This would be:
    • polars.LazyFrame -> polars.DataFrame
    • dask.DataFrame -> pandas.DataFrame
    • duckdb.PyRelation -> pyarrow.Table
    • pyspark -> probably pyarrow.Table?
  • backend specified: we collect into the desired eager implementation.

So:

  • if someone calls .collect('pandas') on a polars.LazyFrame-backed narwhals.LazyFrame, then we give them a pandas-backed narwhals.DataFrame
  • if someone calls .collect() on a polars.LazyFrame-backed narwhals.LazyFrame, then we give them a polars.DataFrame-backed narwhals.DataFrame
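The resolution rule described above can be sketched like this; the defaults table mirrors the bullet list, and `resolve_eager_backend` is a hypothetical helper, not narwhals API:

```python
# Illustrative defaults for backend=None, per lazy implementation:
# polars -> polars, dask -> pandas, duckdb/pyspark -> pyarrow
DEFAULT_EAGER_BACKEND = {
    "polars": "polars",
    "dask": "pandas",
    "duckdb": "pyarrow",
    "pyspark": "pyarrow",
}


def resolve_eager_backend(lazy_backend, requested=None):
    # backend specified -> collect into it; otherwise use the documented default
    if requested is not None:
        return requested
    return DEFAULT_EAGER_BACKEND[lazy_backend]
```

So `.collect('pandas')` on a polars-backed frame resolves to pandas, while a bare `.collect()` resolves to the lazy backend's own default.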

@FBruzzesi (Member Author)

I am almost finished with the reimplementation. I have just one question left for now.
Since PandasLikeDataFrame also implements a .collect() method, should we return self for backend=None, or default to pandas?

@MarcoGorelli (Member)

I'd say self, so that from_native(df).lazy().collect() round-trips when starting from cuDF, right?

Comment on lines 140 to 143
return ArrowDataFrame(
    native_dataframe=pa.Table.from_batches(
        self._native_frame._collect_as_arrow()
    ),
FBruzzesi (Member Author):

I am defaulting to pandas instead of pyarrow because otherwise some tests would fail - specifically those that result in an empty dataframe, since pyarrow's from_batches raises because RecordBatches cannot be empty. Maybe once pyspark 4.0 is out this could be better integrated?

MarcoGorelli (Member):

this would technically be a breaking change - can we catch the exception and return an empty dataframe?

FBruzzesi (Member Author) commented Feb 1, 2025:

Sure, I will take a look. Notice that changing to pyarrow now is also a breaking change 🙈

MarcoGorelli (Member):

oh right we currently collect into pandas?

🙈

MarcoGorelli (Member):

we've documented pyspark support as "work-in-progress" so we may not need to feel too guilty about changing this 😄

FBruzzesi (Member Author):

yes correct 🙃

Comment on lines +155 to +156
df=pl.from_arrow( # type: ignore[arg-type]
pa.Table.from_batches(self._native_frame._collect_as_arrow())
FBruzzesi (Member Author):

Ritchie's answer on StackOverflow

@FBruzzesi (Member Author)

@MarcoGorelli I was expecting worse from CI after such overhaul 😂

@FBruzzesi FBruzzesi changed the title from "RFC, feat: LazyFrame.collect kwargs" to "feat: LazyFrame.collect with backend and **kwargs" Jan 30, 2025
@FBruzzesi FBruzzesi added the high priority Your PR will be reviewed very quickly if you address this label Jan 30, 2025
@FBruzzesi FBruzzesi mentioned this pull request Jan 31, 2025
Comment on lines +125 to +136
mapping = {
"pandas": Implementation.PANDAS,
"modin": Implementation.MODIN,
"cudf": Implementation.CUDF,
"pyarrow": Implementation.PYARROW,
"pyspark": Implementation.PYSPARK,
"polars": Implementation.POLARS,
"dask": Implementation.DASK,
"duckdb": Implementation.DUCKDB,
"ibis": Implementation.IBIS,
}
return mapping.get(backend_name, Implementation.UNKNOWN)
Member:

I was going to suggest this, after adding something similar in https://github.com/vega/altair/blob/94220be0115e8b13d2ebc686552edf68fd841a54/altair/datasets/_reader.py#L493-L510

Great to see it in narhwals 🎉

MarcoGorelli (Member):

😄 nice, you're one step ahead

minor comment but I couldn't help spotting the "narhwals" typo in there (line 509) 🙈

Member:

ooh well spotted, thanks @MarcoGorelli

MarcoGorelli (Member) left a review:

Amazing, thanks so much @FBruzzesi ! I pushed a little commit to simplify backend parsing, to default to PyArrow for PySpark, and to handle the (hopefully rare) empty input cases

I noticed something here regarding collecting to different backends: going from pyspark to pandas loses the timezone-awareness of the dtype, but for pyarrow it's preserved:

(Pdb) p self._native_frame
DataFrame[b: timestamp]
(Pdb) p self._native_frame.toPandas()
                    b
0 2020-01-01 12:34:56
(Pdb) p self._native_frame._collect_as_arrow()
[pyarrow.RecordBatch
b: timestamp[us, tz=UTC]
----
b: [2020-01-01 12:34:56.000000Z]]

This helped spot that in to_datetime we're not parsing timezone-naive formats as timezone-naive

To me this kinda confirms that PyArrow is probably a better default


FBruzzesi commented Feb 2, 2025

Thanks @MarcoGorelli for all the additional improvements! I am very excited to finally ship this one and enable collect kwargs ✨

@MarcoGorelli (Member)

thanks all! let's ship it then

@MarcoGorelli MarcoGorelli merged commit 8ca9422 into main Feb 2, 2025
23 checks passed
@MarcoGorelli MarcoGorelli deleted the feat/collect-kwargs branch February 2, 2025 13:29



Successfully merging this pull request may close these issues.

API: collect for lazy-only libraries

4 participants