diff --git a/docs/api-reference/expr.md b/docs/api-reference/expr.md index 2382ca0d17..d6b1a1411c 100644 --- a/docs/api-reference/expr.md +++ b/docs/api-reference/expr.md @@ -9,6 +9,7 @@ - all - any - cast + - clip - count - cum_count - cum_max @@ -22,7 +23,6 @@ - fill_nan - fill_null - filter - - clip - is_between - is_close - is_duplicated @@ -42,8 +42,8 @@ - median - min - mode - - null_count - n_unique + - null_count - over - pipe - quantile diff --git a/narwhals/_arrow/dataframe.py b/narwhals/_arrow/dataframe.py index 4de78d2792..69da8726f8 100644 --- a/narwhals/_arrow/dataframe.py +++ b/narwhals/_arrow/dataframe.py @@ -45,6 +45,7 @@ Order, ) from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Version, _LimitedContext @@ -527,7 +528,12 @@ def tail(self, n: int) -> Self: ) return self._with_native(df.slice(abs(n)), validate_column_names=False) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> CompliantLazyFrameAny: if backend is None: return self if backend is Implementation.DUCKDB: @@ -535,9 +541,9 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny from narwhals._duckdb.dataframe import DuckDBLazyFrame - df = self.native # noqa: F841 + _df = self.native return DuckDBLazyFrame( - duckdb.table("df"), validate_backend_version=True, version=self._version + duckdb.table("_df"), validate_backend_version=True, version=self._version ) if backend is Implementation.POLARS: import polars as pl # ignore-banned-import @@ -559,7 +565,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) - if backend.is_ibis(): + if backend is Implementation.IBIS: import ibis # ignore-banned-import from narwhals._ibis.dataframe import IbisLazyFrame @@ -570,6 +576,17 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny version=self._version, ) + if backend.is_spark_like(): + from narwhals._spark_like.dataframe import SparkLikeLazyFrame + + if session is None: + msg = "Spark like backends require `session` to be not None." + raise ValueError(msg) + + return SparkLikeLazyFrame._from_compliant_dataframe( + self, session=session, implementation=backend, version=self._version + ) + raise AssertionError # pragma: no cover def collect( diff --git a/narwhals/_compliant/dataframe.py b/narwhals/_compliant/dataframe.py index 33d0c9a167..a3e34bbebf 100644 --- a/narwhals/_compliant/dataframe.py +++ b/narwhals/_compliant/dataframe.py @@ -50,6 +50,7 @@ from narwhals._compliant.group_by import CompliantGroupBy, DataFrameGroupBy from narwhals._compliant.namespace import EagerNamespace + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Implementation, _LimitedContext @@ -196,7 +197,9 @@ def join_asof( strategy: AsofJoinStrategy, suffix: str, ) -> Self: ... - def lazy(self, backend: _LazyAllowedImpl | None) -> CompliantLazyFrameAny: ... + def lazy( + self, backend: _LazyAllowedImpl | None, *, session: SparkSession | None + ) -> CompliantLazyFrameAny: ... def pivot( self, on: Sequence[str], diff --git a/narwhals/_duckdb/dataframe.py b/narwhals/_duckdb/dataframe.py index 848bd9fe83..9c55d9dd22 100644 --- a/narwhals/_duckdb/dataframe.py +++ b/narwhals/_duckdb/dataframe.py @@ -48,7 +48,7 @@ from narwhals._duckdb.group_by import DuckDBGroupBy from narwhals._duckdb.namespace import DuckDBNamespace from narwhals._duckdb.series import DuckDBInterchangeSeries - from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl + from narwhals._typing import _EagerAllowedImpl from narwhals._utils import _LimitedContext from narwhals.dataframe import LazyFrame from narwhals.dtypes import DType @@ -189,7 +189,7 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self: selection = (name for name in self.columns if name not in columns_to_drop) return self._with_native(self.native.select(*selection)) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> Self: + def lazy(self, backend: None = None, **_: None) -> Self: # The `backend`` argument has no effect but we keep it here for # backwards compatibility because in `narwhals.stable.v1` # function `.from_native()` will return a DataFrame for DuckDB. diff --git a/narwhals/_ibis/dataframe.py b/narwhals/_ibis/dataframe.py index 7d3369ca66..e976d54729 100644 --- a/narwhals/_ibis/dataframe.py +++ b/narwhals/_ibis/dataframe.py @@ -34,7 +34,7 @@ from narwhals._ibis.group_by import IbisGroupBy from narwhals._ibis.namespace import IbisNamespace from narwhals._ibis.series import IbisInterchangeSeries - from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl + from narwhals._typing import _EagerAllowedImpl from narwhals._utils import _LimitedContext from narwhals.dataframe import LazyFrame from narwhals.dtypes import DType @@ -165,7 +165,7 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self: selection = (col for col in self.columns if col not in columns_to_drop) return self._with_native(self.native.select(*selection)) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> Self: + def lazy(self, backend: None = None, **_: None) -> Self: # The `backend`` argument has no effect but we keep it here for # backwards compatibility because in `narwhals.stable.v1` # function `.from_native()` will return a DataFrame for Ibis. diff --git a/narwhals/_pandas_like/dataframe.py b/narwhals/_pandas_like/dataframe.py index a5995b8145..0b19a63562 100644 --- a/narwhals/_pandas_like/dataframe.py +++ b/narwhals/_pandas_like/dataframe.py @@ -47,6 +47,7 @@ from narwhals._pandas_like.expr import PandasLikeExpr from narwhals._pandas_like.group_by import PandasLikeGroupBy from narwhals._pandas_like.namespace import PandasLikeNamespace + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Version, _LimitedContext @@ -772,7 +773,12 @@ def unique( ) # --- lazy-only --- - def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> CompliantLazyFrameAny: pandas_df = self.to_pandas() if backend is None: return self @@ -806,7 +812,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) - if backend.is_ibis(): + if backend is Implementation.IBIS: import ibis # ignore-banned-import from narwhals._ibis.dataframe import IbisLazyFrame @@ -816,6 +822,21 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) + + if backend.is_spark_like(): + from narwhals._spark_like.dataframe import SparkLikeLazyFrame + + if session is None: + msg = "Spark like backends require `session` to be not None." + raise ValueError(msg) + + return SparkLikeLazyFrame( + session.createDataFrame(pandas_df), + version=self._version, + implementation=backend, + validate_backend_version=True, + ) + raise AssertionError # pragma: no cover @property diff --git a/narwhals/_polars/dataframe.py b/narwhals/_polars/dataframe.py index c9e22f48f9..df79180c9a 100644 --- a/narwhals/_polars/dataframe.py +++ b/narwhals/_polars/dataframe.py @@ -40,6 +40,7 @@ from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny from narwhals._polars.expr import PolarsExpr from narwhals._polars.group_by import PolarsGroupBy, PolarsLazyGroupBy + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Version, _LimitedContext @@ -455,7 +456,12 @@ def iter_columns(self) -> Iterator[PolarsSeries]: for series in self.native.iter_columns(): yield PolarsSeries.from_native(series, context=self) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> CompliantLazyFrameAny: if backend is None or backend is Implementation.POLARS: return PolarsLazyFrame.from_native(self.native.lazy(), context=self) if backend is Implementation.DUCKDB: @@ -463,10 +469,9 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny from narwhals._duckdb.dataframe import DuckDBLazyFrame - # NOTE: (F841) is a false positive - df = self.native # noqa: F841 + _df = self.native return DuckDBLazyFrame( - duckdb.table("df"), validate_backend_version=True, version=self._version + duckdb.table("_df"), validate_backend_version=True, version=self._version ) if backend is Implementation.DASK: import dask.dataframe as dd # ignore-banned-import @@ -478,7 +483,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) - if backend.is_ibis(): + if backend is Implementation.IBIS: import ibis # ignore-banned-import from narwhals._ibis.dataframe import IbisLazyFrame @@ -489,6 +494,20 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny version=self._version, ) + if backend.is_spark_like(): + from narwhals._spark_like.dataframe import SparkLikeLazyFrame + + if session is None: + msg = "Spark like backends require `session` to be not None." + raise ValueError(msg) + + return SparkLikeLazyFrame._from_compliant_dataframe( + self, # pyright: ignore[reportArgumentType] + session=session, + implementation=backend, + version=self._version, + ) + raise AssertionError # pragma: no cover @overload diff --git a/narwhals/_spark_like/dataframe.py b/narwhals/_spark_like/dataframe.py index c1ab4ec1fb..eba3001e7b 100644 --- a/narwhals/_spark_like/dataframe.py +++ b/narwhals/_spark_like/dataframe.py @@ -42,6 +42,7 @@ from narwhals._spark_like.expr import SparkLikeExpr from narwhals._spark_like.group_by import SparkLikeLazyGroupBy from narwhals._spark_like.namespace import SparkLikeNamespace + from narwhals._spark_like.utils import SparkSession from narwhals._typing import _EagerAllowedImpl from narwhals._utils import Version, _LimitedContext from narwhals.dataframe import LazyFrame @@ -561,6 +562,36 @@ def with_row_index(self, name: str, order_by: Sequence[str]) -> Self: def sink_parquet(self, file: str | Path | BytesIO) -> None: self.native.write.parquet(file) + @classmethod + def _from_compliant_dataframe( + cls, + frame: CompliantDataFrameAny, + /, + *, + session: SparkSession, + implementation: Implementation, + version: Version, + ) -> SparkLikeLazyFrame: + from importlib.util import find_spec + + impl = implementation + is_spark_v4 = (not impl.is_sqlframe()) and impl._backend_version() >= (4, 0, 0) + if is_spark_v4: # pragma: no cover + # pyspark.sql requires pyarrow to be installed from v4.0.0 + # and since v4.0.0 the input to `createDataFrame` can be a PyArrow Table. + data: Any = frame.to_arrow() + elif find_spec("pandas"): + data = frame.to_pandas() + else: # pragma: no cover + data = tuple(frame.iter_rows(named=True, buffer_size=512)) + + return cls( + session.createDataFrame(data), + version=version, + implementation=implementation, + validate_backend_version=True, + ) + gather_every = not_implemented.deprecated( "`LazyFrame.gather_every` is deprecated and will be removed in a future version." ) diff --git a/narwhals/_typing.py b/narwhals/_typing.py index 07ffb01a64..d8bc1408ae 100644 --- a/narwhals/_typing.py +++ b/narwhals/_typing.py @@ -51,7 +51,6 @@ # - https://github.com/narwhals-dev/narwhals/pull/2971#discussion_r2277137003 # - https://github.com/narwhals-dev/narwhals/pull/3002#issuecomment-3194267667 _LazyFrameCollectImpl: TypeAlias = Literal[_PandasImpl, _PolarsImpl, _ArrowImpl] # noqa: PYI047 -_DataFrameLazyImpl: TypeAlias = Literal[_PolarsImpl, _DaskImpl, _DuckDBImpl, _IbisImpl] # noqa: PYI047 # `str | Implementation` aliases Pandas: TypeAlias = Literal[_Pandas, _PandasImpl] diff --git a/narwhals/_utils.py b/narwhals/_utils.py index 0d7aa2d13d..dee2234d21 100644 --- a/narwhals/_utils.py +++ b/narwhals/_utils.py @@ -79,8 +79,8 @@ from narwhals._typing import ( Backend, IntoBackend, - _DataFrameLazyImpl, _EagerAllowedImpl, + _LazyAllowedImpl, _LazyFrameCollectImpl, ) from narwhals.dataframe import DataFrame, LazyFrame @@ -1648,9 +1648,9 @@ def is_compliant_expr( def is_eager_allowed(impl: Implementation, /) -> TypeIs[_EagerAllowedImpl]: """Return True if `impl` allows eager operations.""" return impl in { - Implementation.PANDAS, - Implementation.MODIN, Implementation.CUDF, + Implementation.MODIN, + Implementation.PANDAS, Implementation.POLARS, Implementation.PYARROW, } @@ -1661,13 +1661,16 @@ def can_lazyframe_collect(impl: Implementation, /) -> TypeIs[_LazyFrameCollectIm return impl in {Implementation.PANDAS, Implementation.POLARS, Implementation.PYARROW} -def can_dataframe_lazy(impl: Implementation, /) -> TypeIs[_DataFrameLazyImpl]: +def is_lazy_allowed(impl: Implementation, /) -> TypeIs[_LazyAllowedImpl]: """Return True if `DataFrame.lazy(impl)` is allowed.""" return impl in { Implementation.DASK, Implementation.DUCKDB, - Implementation.POLARS, Implementation.IBIS, + Implementation.POLARS, + Implementation.PYSPARK, + Implementation.PYSPARK_CONNECT, + Implementation.SQLFRAME, } diff --git a/narwhals/dataframe.py b/narwhals/dataframe.py index 96efa686e7..622f70f36e 100644 --- a/narwhals/dataframe.py +++ b/narwhals/dataframe.py @@ -22,11 +22,10 @@ check_expressions_preserve_length, is_scalar_like, ) -from narwhals._typing import Arrow, Pandas, _DataFrameLazyImpl, _LazyFrameCollectImpl +from narwhals._typing import Arrow, Pandas, _LazyAllowedImpl, _LazyFrameCollectImpl from narwhals._utils import ( Implementation, Version, - can_dataframe_lazy, can_lazyframe_collect, check_columns_exist, flatten, @@ -35,6 +34,7 @@ is_compliant_lazyframe, is_eager_allowed, is_index_selector, + is_lazy_allowed, is_list_of, is_sequence_like, is_slice_none, @@ -72,7 +72,7 @@ from narwhals._compliant import CompliantDataFrame, CompliantLazyFrame from narwhals._compliant.typing import CompliantExprAny, EagerNamespaceAny from narwhals._translate import IntoArrowTable - from narwhals._typing import Dask, DuckDB, EagerAllowed, Ibis, IntoBackend, Polars + from narwhals._typing import EagerAllowed, IntoBackend, LazyAllowed, Polars from narwhals.group_by import GroupBy, LazyGroupBy from narwhals.typing import ( AsofJoinStrategy, @@ -724,7 +724,10 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: return pa_table.__arrow_c_stream__(requested_schema=requested_schema) # type: ignore[no-untyped-call] def lazy( - self, backend: IntoBackend[Polars | DuckDB | Ibis | Dask] | None = None + self, + backend: IntoBackend[LazyAllowed] | None = None, + *, + session: Any | None = None, ) -> LazyFrame[Any]: """Restrict available API methods to lazy-only ones. @@ -736,6 +739,18 @@ def lazy( if you want to ensure that you write dataframe-agnostic code which all has the possibility of running entirely lazily. + Note: + If `backend` is spark-like, then a valid `session` is required. + + For instance: + + ```py + import narwhals as nw + from sqlframe.duckdb import DuckDBSession + + df.lazy(backend=nw.Implementation.SQLFRAME, session=DuckDBSession()) + ``` + Arguments: backend: Which lazy backend collect to. This will be the underlying backend for the resulting Narwhals LazyFrame. If not specified, and the @@ -748,6 +763,7 @@ def lazy( `IBIS` or `POLARS`. - As a string: `"dask"`, `"duckdb"`, `"ibis"` or `"polars"` - Directly as a module `dask.dataframe`, `duckdb`, `ibis` or `polars`. + session: Session to be used if backend is spark-like. Examples: >>> import polars as pl @@ -783,11 +799,11 @@ def lazy( """ lazy = self._compliant_frame.lazy if backend is None: - return self._lazyframe(lazy(None), level="lazy") + return self._lazyframe(lazy(None, session=session), level="lazy") lazy_backend = Implementation.from_backend(backend) - if can_dataframe_lazy(lazy_backend): - return self._lazyframe(lazy(lazy_backend), level="lazy") - msg = f"Not-supported backend.\n\nExpected one of {get_args(_DataFrameLazyImpl)} or `None`, got {lazy_backend}" + if is_lazy_allowed(lazy_backend): + return self._lazyframe(lazy(lazy_backend, session=session), level="lazy") + msg = f"Not-supported backend.\n\nExpected one of {get_args(_LazyAllowedImpl)} or `None`, got {lazy_backend}" raise ValueError(msg) def to_native(self) -> DataFrameT: diff --git a/narwhals/stable/v1/__init__.py b/narwhals/stable/v1/__init__.py index 6d3fe30367..4b9f7ef487 100644 --- a/narwhals/stable/v1/__init__.py +++ b/narwhals/stable/v1/__init__.py @@ -72,11 +72,9 @@ from narwhals._typing import ( Arrow, Backend, - Dask, - DuckDB, EagerAllowed, - Ibis, IntoBackend, + LazyAllowed, Pandas, Polars, ) @@ -193,9 +191,12 @@ def get_column(self, name: str) -> Series: return super().get_column(name) # type: ignore[return-value] def lazy( - self, backend: IntoBackend[Polars | DuckDB | Ibis | Dask] | None = None + self, + backend: IntoBackend[LazyAllowed] | None = None, + *, + session: Any | None = None, ) -> LazyFrame[Any]: - return _stableify(super().lazy(backend=backend)) + return _stableify(super().lazy(backend=backend, session=session)) @overload # type: ignore[override] def to_dict(self, *, as_series: Literal[True] = ...) -> dict[str, Series[Any]]: ... diff --git a/narwhals/stable/v2/__init__.py b/narwhals/stable/v2/__init__.py index fe8f7a71b0..e1963a4480 100644 --- a/narwhals/stable/v2/__init__.py +++ b/narwhals/stable/v2/__init__.py @@ -67,11 +67,9 @@ from narwhals._typing import ( Arrow, Backend, - Dask, - DuckDB, EagerAllowed, - Ibis, IntoBackend, + LazyAllowed, Pandas, Polars, ) @@ -187,9 +185,12 @@ def get_column(self, name: str) -> Series: return super().get_column(name) # type: ignore[return-value] def lazy( - self, backend: IntoBackend[Polars | DuckDB | Ibis | Dask] | None = None + self, + backend: IntoBackend[LazyAllowed] | None = None, + *, + session: Any | None = None, ) -> LazyFrame[Any]: - return _stableify(super().lazy(backend=backend)) + return _stableify(super().lazy(backend=backend, session=session)) @overload # type: ignore[override] def to_dict(self, *, as_series: Literal[True] = ...) -> dict[str, Series[Any]]: ... diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 46eff6dfef..9c5cc83804 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os +import re from typing import TYPE_CHECKING, Any import pytest @@ -7,13 +9,14 @@ import narwhals as nw from narwhals._utils import Implementation from narwhals.dependencies import get_cudf, get_modin +from tests.utils import assert_equal_data, pyspark_session, sqlframe_session if TYPE_CHECKING: - from narwhals._typing import Dask, DuckDB, Ibis, Polars + from narwhals._typing import LazyAllowed, SparkLike from tests.utils import ConstructorEager -data = {"a": [1, 2, 3]} +data = {"a": [1, 2, 3], "b": ["x", "y", "z"]} def test_lazy_to_default(constructor_eager: ConstructorEager) -> None: @@ -54,26 +57,56 @@ def test_lazy_to_default(constructor_eager: ConstructorEager) -> None: Implementation.DUCKDB, Implementation.DASK, Implementation.IBIS, + Implementation.PYSPARK, + Implementation.SQLFRAME, "polars", "duckdb", "dask", "ibis", + "pyspark", + "sqlframe", ], ) -def test_lazy_backend( - constructor_eager: ConstructorEager, backend: Polars | DuckDB | Ibis | Dask -) -> None: - implementation = Implementation.from_backend(backend) - pytest.importorskip(implementation.name.lower()) +def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None: + impl = Implementation.from_backend(backend) + pytest.importorskip(impl.name.lower()) + + is_spark_connect = os.environ.get("SPARK_CONNECT", None) + if is_spark_connect is not None and impl.is_pyspark(): # pragma: no cover + # Workaround for impl.name.lower() being "pyspark[connect]" for + # Implementation.PYSPARK_CONNECT, which is never installed. + impl = Implementation.PYSPARK_CONNECT + df = nw.from_native(constructor_eager(data), eager_only=True) - result = df.lazy(backend=backend) + session: Any + if impl.is_sqlframe(): + session = sqlframe_session() + elif impl.is_pyspark() or impl.is_pyspark_connect(): # pragma: no cover + session = pyspark_session() + else: + session = None + + result = df.lazy(backend=backend, session=session) assert isinstance(result, nw.LazyFrame) - assert result.implementation == implementation + assert result.implementation == impl + assert_equal_data(df.sort("a"), data) + + +@pytest.mark.parametrize("backend", ["pyspark", "sqlframe"]) +def test_lazy_spark_like_requires_session( + constructor_eager: ConstructorEager, backend: SparkLike +) -> None: + impl = Implementation.from_backend(backend) + pytest.importorskip(impl.name.lower()) + + df = nw.from_native(constructor_eager(data), eager_only=True) + + err_msg = re.escape("Spark like backends require `session` to be not None.") + with pytest.raises(ValueError, match=err_msg): + df.lazy(backend=backend, session=None) def test_lazy_backend_invalid(constructor_eager: ConstructorEager) -> None: df = nw.from_native(constructor_eager(data), eager_only=True) with pytest.raises(ValueError, match="Not-supported backend"): df.lazy(backend=Implementation.PANDAS) # type: ignore[arg-type] - with pytest.raises(ValueError, match="Not-supported backend"): - df.lazy(backend="pyspark") # type: ignore[arg-type]