From 9d092931b1ec6bfb8d7d2b3a712d96e4fe2268df Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Sat, 23 Aug 2025 17:39:03 +0200 Subject: [PATCH 01/10] feat: .lazy into spark-like backends --- docs/api-reference/expr.md | 4 +- narwhals/_arrow/dataframe.py | 43 ++++++++++++++-- narwhals/_compliant/dataframe.py | 5 +- narwhals/_duckdb/dataframe.py | 8 ++- narwhals/_ibis/dataframe.py | 8 ++- narwhals/_pandas_like/dataframe.py | 29 ++++++++++- narwhals/_polars/dataframe.py | 44 +++++++++++++++- narwhals/_typing.py | 1 - narwhals/_utils.py | 13 +++-- narwhals/dataframe.py | 29 ++++++++--- narwhals/schema.py | 40 ++++++++++++++ narwhals/stable/v1/__init__.py | 12 +++-- narwhals/stable/v2/__init__.py | 12 +++-- tests/frame/lazy_test.py | 83 +++++++++++++++++++++++++++--- 14 files changed, 288 insertions(+), 43 deletions(-) diff --git a/docs/api-reference/expr.md b/docs/api-reference/expr.md index 7e182ac000..e8d3a58bdd 100644 --- a/docs/api-reference/expr.md +++ b/docs/api-reference/expr.md @@ -9,6 +9,7 @@ - all - any - cast + - clip - count - cum_count - cum_max @@ -21,7 +22,6 @@ - exp - fill_null - filter - - clip - is_between - is_close - is_duplicated @@ -41,8 +41,8 @@ - median - min - mode - - null_count - n_unique + - null_count - over - pipe - quantile diff --git a/narwhals/_arrow/dataframe.py b/narwhals/_arrow/dataframe.py index 4de78d2792..a07a7d964b 100644 --- a/narwhals/_arrow/dataframe.py +++ b/narwhals/_arrow/dataframe.py @@ -45,6 +45,7 @@ Order, ) from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Version, _LimitedContext @@ -527,7 +528,12 @@ def tail(self, n: int) -> Self: ) return self._with_native(df.slice(abs(n)), validate_column_names=False) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> CompliantLazyFrameAny: if backend is None: return self if backend is Implementation.DUCKDB: @@ -535,9 +541,9 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny from narwhals._duckdb.dataframe import DuckDBLazyFrame - df = self.native # noqa: F841 + _df = self.native return DuckDBLazyFrame( - duckdb.table("df"), validate_backend_version=True, version=self._version + duckdb.table("_df"), validate_backend_version=True, version=self._version ) if backend is Implementation.POLARS: import polars as pl # ignore-banned-import @@ -559,7 +565,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) - if backend.is_ibis(): + if backend is Implementation.IBIS: import ibis # ignore-banned-import from narwhals._ibis.dataframe import IbisLazyFrame @@ -570,6 +576,35 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny version=self._version, ) + if backend.is_spark_like(): + from narwhals._spark_like.dataframe import SparkLikeLazyFrame + from narwhals.schema import Schema + + if session is None: + msg = "Spark like backends require `session` to be not None." + raise ValueError(msg) + spark_like_schema = Schema(self.schema)._to_spark_like( + backend=backend, session=session + ) + + can_create_from_arrow = backend in { + Implementation.PYSPARK, + Implementation.PYSPARK_CONNECT, + } and backend._backend_version() >= (4, 0, 0) + + data = ( + self.native + if can_create_from_arrow + else tuple(self.iter_rows(named=True, buffer_size=512)) + ) + + return SparkLikeLazyFrame( + session.createDataFrame(data, schema=spark_like_schema), # type: ignore[arg-type] + version=self._version, + implementation=backend, + validate_backend_version=True, + ) + raise AssertionError # pragma: no cover def collect( diff --git a/narwhals/_compliant/dataframe.py b/narwhals/_compliant/dataframe.py index 485e5f4e6a..5c01459d3c 100644 --- a/narwhals/_compliant/dataframe.py +++ b/narwhals/_compliant/dataframe.py @@ -50,6 +50,7 @@ from narwhals._compliant.group_by import CompliantGroupBy, DataFrameGroupBy from narwhals._compliant.namespace import EagerNamespace + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Implementation, _LimitedContext @@ -199,7 +200,9 @@ def join_asof( strategy: AsofJoinStrategy, suffix: str, ) -> Self: ... - def lazy(self, backend: _LazyAllowedImpl | None) -> CompliantLazyFrameAny: ... + def lazy( + self, backend: _LazyAllowedImpl | None, *, session: SparkSession | None + ) -> CompliantLazyFrameAny: ... def pivot( self, on: Sequence[str], diff --git a/narwhals/_duckdb/dataframe.py b/narwhals/_duckdb/dataframe.py index 848bd9fe83..b8cc159f26 100644 --- a/narwhals/_duckdb/dataframe.py +++ b/narwhals/_duckdb/dataframe.py @@ -48,6 +48,7 @@ from narwhals._duckdb.group_by import DuckDBGroupBy from narwhals._duckdb.namespace import DuckDBNamespace from narwhals._duckdb.series import DuckDBInterchangeSeries + from narwhals._spark_like.utils import SparkSession from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import _LimitedContext from narwhals.dataframe import LazyFrame @@ -189,7 +190,12 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self: selection = (name for name in self.columns if name not in columns_to_drop) return self._with_native(self.native.select(*selection)) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> Self: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> Self: # The `backend`` argument has no effect but we keep it here for # backwards compatibility because in `narwhals.stable.v1` # function `.from_native()` will return a DataFrame for DuckDB. diff --git a/narwhals/_ibis/dataframe.py b/narwhals/_ibis/dataframe.py index 7d3369ca66..2a5daddd5d 100644 --- a/narwhals/_ibis/dataframe.py +++ b/narwhals/_ibis/dataframe.py @@ -34,6 +34,7 @@ from narwhals._ibis.group_by import IbisGroupBy from narwhals._ibis.namespace import IbisNamespace from narwhals._ibis.series import IbisInterchangeSeries + from narwhals._spark_like.utils import SparkSession from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import _LimitedContext from narwhals.dataframe import LazyFrame @@ -165,7 +166,12 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self: selection = (col for col in self.columns if col not in columns_to_drop) return self._with_native(self.native.select(*selection)) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> Self: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> Self: # The `backend`` argument has no effect but we keep it here for # backwards compatibility because in `narwhals.stable.v1` # function `.from_native()` will return a DataFrame for Ibis. diff --git a/narwhals/_pandas_like/dataframe.py b/narwhals/_pandas_like/dataframe.py index a5995b8145..d4d173a562 100644 --- a/narwhals/_pandas_like/dataframe.py +++ b/narwhals/_pandas_like/dataframe.py @@ -47,6 +47,7 @@ from narwhals._pandas_like.expr import PandasLikeExpr from narwhals._pandas_like.group_by import PandasLikeGroupBy from narwhals._pandas_like.namespace import PandasLikeNamespace + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Version, _LimitedContext @@ -772,7 +773,12 @@ def unique( ) # --- lazy-only --- - def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> CompliantLazyFrameAny: pandas_df = self.to_pandas() if backend is None: return self @@ -806,7 +812,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) - if backend.is_ibis(): + if backend is Implementation.IBIS: import ibis # ignore-banned-import from narwhals._ibis.dataframe import IbisLazyFrame @@ -816,6 +822,25 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) + + if backend.is_spark_like(): + from narwhals._spark_like.dataframe import SparkLikeLazyFrame + from narwhals.schema import Schema + + if session is None: + msg = "Spark like backends require `session` to be not None." + raise ValueError(msg) + + spark_like_schema = Schema(self.schema)._to_spark_like( + backend=backend, session=session + ) + return SparkLikeLazyFrame( + session.createDataFrame(pandas_df, schema=spark_like_schema), + version=self._version, + implementation=backend, + validate_backend_version=True, + ) + raise AssertionError # pragma: no cover @property diff --git a/narwhals/_polars/dataframe.py b/narwhals/_polars/dataframe.py index c9e22f48f9..01db88daac 100644 --- a/narwhals/_polars/dataframe.py +++ b/narwhals/_polars/dataframe.py @@ -40,6 +40,7 @@ from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny from narwhals._polars.expr import PolarsExpr from narwhals._polars.group_by import PolarsGroupBy, PolarsLazyGroupBy + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl from narwhals._utils import Version, _LimitedContext @@ -455,7 +456,12 @@ def iter_columns(self) -> Iterator[PolarsSeries]: for series in self.native.iter_columns(): yield PolarsSeries.from_native(series, context=self) - def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny: + def lazy( + self, + backend: _LazyAllowedImpl | None = None, + *, + session: SparkSession | None = None, + ) -> CompliantLazyFrameAny: if backend is None or backend is Implementation.POLARS: return PolarsLazyFrame.from_native(self.native.lazy(), context=self) if backend is Implementation.DUCKDB: @@ -478,7 +484,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny validate_backend_version=True, version=self._version, ) - if backend.is_ibis(): + if backend is Implementation.IBIS: import ibis # ignore-banned-import from narwhals._ibis.dataframe import IbisLazyFrame @@ -489,6 +495,40 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny version=self._version, ) + if backend.is_spark_like(): + from importlib.util import find_spec + + from narwhals._spark_like.dataframe import SparkLikeLazyFrame + from narwhals.schema import Schema + + if session is None: + msg = "Spark like backends require `session` to be not None." + raise ValueError(msg) + + # pyspark.sql requires pyarrow to be installed from v4.0.0 + can_create_from_arrow = backend in { + Implementation.PYSPARK, + Implementation.PYSPARK_CONNECT, + } and backend._backend_version() >= (4, 0, 0) + + data: Any + if can_create_from_arrow: + data = self.to_arrow() + elif find_spec("pandas") is not None: + data = self.to_pandas() + else: + data = self.iter_rows(named=True) + + spark_like_schema = Schema(self.schema)._to_spark_like( + backend=backend, session=session + ) + return SparkLikeLazyFrame( + session.createDataFrame(data, schema=spark_like_schema), + version=self._version, + implementation=backend, + validate_backend_version=True, + ) + raise AssertionError # pragma: no cover @overload diff --git a/narwhals/_typing.py b/narwhals/_typing.py index 07ffb01a64..d8bc1408ae 100644 --- a/narwhals/_typing.py +++ b/narwhals/_typing.py @@ -51,7 +51,6 @@ # - https://github.com/narwhals-dev/narwhals/pull/2971#discussion_r2277137003 # - https://github.com/narwhals-dev/narwhals/pull/3002#issuecomment-3194267667 _LazyFrameCollectImpl: TypeAlias = Literal[_PandasImpl, _PolarsImpl, _ArrowImpl] # noqa: PYI047 -_DataFrameLazyImpl: TypeAlias = Literal[_PolarsImpl, _DaskImpl, _DuckDBImpl, _IbisImpl] # noqa: PYI047 # `str | Implementation` aliases Pandas: TypeAlias = Literal[_Pandas, _PandasImpl] diff --git a/narwhals/_utils.py b/narwhals/_utils.py index 0d7aa2d13d..17ffdf02b7 100644 --- a/narwhals/_utils.py +++ b/narwhals/_utils.py @@ -79,8 +79,8 @@ from narwhals._typing import ( Backend, IntoBackend, - _DataFrameLazyImpl, _EagerAllowedImpl, + _LazyAllowedImpl, _LazyFrameCollectImpl, ) from narwhals.dataframe import DataFrame, LazyFrame @@ -1648,9 +1648,9 @@ def is_compliant_expr( def is_eager_allowed(impl: Implementation, /) -> TypeIs[_EagerAllowedImpl]: """Return True if `impl` allows eager operations.""" return impl in { - Implementation.PANDAS, - Implementation.MODIN, Implementation.CUDF, + Implementation.MODIN, + Implementation.PANDAS, Implementation.POLARS, Implementation.PYARROW, } @@ -1661,13 +1661,16 @@ def can_lazyframe_collect(impl: Implementation, /) -> TypeIs[_LazyFrameCollectIm return impl in {Implementation.PANDAS, Implementation.POLARS, Implementation.PYARROW} -def can_dataframe_lazy(impl: Implementation, /) -> TypeIs[_DataFrameLazyImpl]: +def can_dataframe_lazy(impl: Implementation, /) -> TypeIs[_LazyAllowedImpl]: """Return True if `DataFrame.lazy(impl)` is allowed.""" return impl in { Implementation.DASK, Implementation.DUCKDB, - Implementation.POLARS, Implementation.IBIS, + Implementation.POLARS, + Implementation.PYSPARK, + Implementation.PYSPARK_CONNECT, + Implementation.SQLFRAME, } diff --git a/narwhals/dataframe.py b/narwhals/dataframe.py index 52be7570e4..8e3d4a1bc9 100644 --- a/narwhals/dataframe.py +++ b/narwhals/dataframe.py @@ -22,7 +22,7 @@ check_expressions_preserve_length, is_scalar_like, ) -from narwhals._typing import Arrow, Pandas, _DataFrameLazyImpl, _LazyFrameCollectImpl +from narwhals._typing import Arrow, Pandas, _LazyAllowedImpl, _LazyFrameCollectImpl from narwhals._utils import ( Implementation, Version, @@ -71,8 +71,9 @@ from narwhals._compliant import CompliantDataFrame, CompliantLazyFrame from narwhals._compliant.typing import CompliantExprAny, EagerNamespaceAny + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable - from narwhals._typing import Dask, DuckDB, EagerAllowed, Ibis, IntoBackend, Polars + from narwhals._typing import EagerAllowed, IntoBackend, LazyAllowed, Polars from narwhals.group_by import GroupBy, LazyGroupBy from narwhals.typing import ( AsofJoinStrategy, @@ -724,7 +725,10 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: return pa_table.__arrow_c_stream__(requested_schema=requested_schema) # type: ignore[no-untyped-call] def lazy( - self, backend: IntoBackend[Polars | DuckDB | Ibis | Dask] | None = None + self, + backend: IntoBackend[LazyAllowed] | None = None, + *, + session: SparkSession | None = None, ) -> LazyFrame[Any]: """Restrict available API methods to lazy-only ones. @@ -736,6 +740,18 @@ def lazy( if you want to ensure that you write dataframe-agnostic code which all has the possibility of running entirely lazily. + Note: + Spark like backends require a session object to be passed. + + For instance: + + ```py + import narwhals as nw + from sqlframe.duckdb import DuckDBSession + + df.lazy(backend=nw.Implementation.SQLFRAME, session=DuckDBSession()) + ``` + Arguments: backend: Which lazy backend collect to. This will be the underlying backend for the resulting Narwhals LazyFrame. If not specified, and the @@ -748,6 +764,7 @@ def lazy( `IBIS` or `POLARS`. - As a string: `"dask"`, `"duckdb"`, `"ibis"` or `"polars"` - Directly as a module `dask.dataframe`, `duckdb`, `ibis` or `polars`. + session: Spark-like session to be used if backend is spark-like. Examples: >>> import polars as pl @@ -783,11 +800,11 @@ def lazy( """ lazy = self._compliant_frame.lazy if backend is None: - return self._lazyframe(lazy(None), level="lazy") + return self._lazyframe(lazy(None, session=session), level="lazy") lazy_backend = Implementation.from_backend(backend) if can_dataframe_lazy(lazy_backend): - return self._lazyframe(lazy(lazy_backend), level="lazy") - msg = f"Not-supported backend.\n\nExpected one of {get_args(_DataFrameLazyImpl)} or `None`, got {lazy_backend}" + return self._lazyframe(lazy(lazy_backend, session=session), level="lazy") + msg = f"Not-supported backend.\n\nExpected one of {get_args(_LazyAllowedImpl)} or `None`, got {lazy_backend}" raise ValueError(msg) def to_native(self) -> DataFrameT: diff --git a/narwhals/schema.py b/narwhals/schema.py index a34379983d..11db2244e2 100644 --- a/narwhals/schema.py +++ b/narwhals/schema.py @@ -18,7 +18,10 @@ import polars as pl import pyarrow as pa + import sqlframe.base.types as sqlframe_types + from narwhals._spark_like.utils import SparkSession + from narwhals._typing import _SparkLikeImpl from narwhals.dtypes import DType from narwhals.typing import DTypeBackend @@ -171,3 +174,40 @@ def to_polars(self) -> pl.Schema: if pl_version >= (1, 0, 0) else cast("pl.Schema", dict(schema)) ) + + def to_pyspark(self, *, session: SparkSession) -> sqlframe_types.StructType: + return self._to_spark_like(backend=Implementation.PYSPARK, session=session) + + def to_pyspark_connect(self, *, session: SparkSession) -> sqlframe_types.StructType: + return self._to_spark_like( + backend=Implementation.PYSPARK_CONNECT, session=session + ) + + def to_sqlframe(self, *, session: SparkSession) -> sqlframe_types.StructType: + return self._to_spark_like(backend=Implementation.SQLFRAME, session=session) + + def _to_spark_like( + self, *, backend: _SparkLikeImpl, session: SparkSession + ) -> sqlframe_types.StructType: + from narwhals._spark_like.utils import ( + import_native_dtypes, + narwhals_to_native_dtype as narwhals_to_spark_like_dtype, + ) + + version = self._version + spark_dtypes = import_native_dtypes(backend) + StructType = spark_dtypes.StructType # noqa: N806 + StructField = spark_dtypes.StructField # noqa: N806 + + _narwhals_to_spark_like_dtype = partial( + narwhals_to_spark_like_dtype, + version=version, + spark_types=spark_dtypes, + session=session, + ) + return StructType( # type: ignore[no-any-return] + [ + StructField(name, _narwhals_to_spark_like_dtype(nw_dtype), True) + for name, nw_dtype in self.items() + ] + ) diff --git a/narwhals/stable/v1/__init__.py b/narwhals/stable/v1/__init__.py index 6d3fe30367..9341ecf8fd 100644 --- a/narwhals/stable/v1/__init__.py +++ b/narwhals/stable/v1/__init__.py @@ -68,15 +68,14 @@ from typing_extensions import ParamSpec, Self + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import ( Arrow, Backend, - Dask, - DuckDB, EagerAllowed, - Ibis, IntoBackend, + LazyAllowed, Pandas, Polars, ) @@ -193,9 +192,12 @@ def get_column(self, name: str) -> Series: return super().get_column(name) # type: ignore[return-value] def lazy( - self, backend: IntoBackend[Polars | DuckDB | Ibis | Dask] | None = None + self, + backend: IntoBackend[LazyAllowed] | None = None, + *, + session: SparkSession | None = None, ) -> LazyFrame[Any]: - return _stableify(super().lazy(backend=backend)) + return _stableify(super().lazy(backend=backend, session=session)) @overload # type: ignore[override] def to_dict(self, *, as_series: Literal[True] = ...) -> dict[str, Series[Any]]: ... diff --git a/narwhals/stable/v2/__init__.py b/narwhals/stable/v2/__init__.py index fe8f7a71b0..ef531d5c7d 100644 --- a/narwhals/stable/v2/__init__.py +++ b/narwhals/stable/v2/__init__.py @@ -63,15 +63,14 @@ from typing_extensions import ParamSpec, Self + from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import ( Arrow, Backend, - Dask, - DuckDB, EagerAllowed, - Ibis, IntoBackend, + LazyAllowed, Pandas, Polars, ) @@ -187,9 +186,12 @@ def get_column(self, name: str) -> Series: return super().get_column(name) # type: ignore[return-value] def lazy( - self, backend: IntoBackend[Polars | DuckDB | Ibis | Dask] | None = None + self, + backend: IntoBackend[LazyAllowed] | None = None, + *, + session: SparkSession | None = None, ) -> LazyFrame[Any]: - return _stableify(super().lazy(backend=backend)) + return _stableify(super().lazy(backend=backend, session=session)) @overload # type: ignore[override] def to_dict(self, *, as_series: Literal[True] = ...) -> dict[str, Series[Any]]: ... diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 46eff6dfef..8eecddb13a 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -1,19 +1,23 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +import os +import re +from typing import TYPE_CHECKING, Any, cast import pytest import narwhals as nw from narwhals._utils import Implementation from narwhals.dependencies import get_cudf, get_modin +from tests.utils import assert_equal_data if TYPE_CHECKING: + from narwhals._spark_like.utils import SparkSession from narwhals._typing import Dask, DuckDB, Ibis, Polars from tests.utils import ConstructorEager -data = {"a": [1, 2, 3]} +data = {"a": [1, 2, 3], "b": ["x", "y", "z"]} def test_lazy_to_default(constructor_eager: ConstructorEager) -> None: @@ -60,20 +64,83 @@ def test_lazy_to_default(constructor_eager: ConstructorEager) -> None: "ibis", ], ) -def test_lazy_backend( +def test_lazy_backend_non_spark_like( constructor_eager: ConstructorEager, backend: Polars | DuckDB | Ibis | Dask ) -> None: - implementation = Implementation.from_backend(backend) - pytest.importorskip(implementation.name.lower()) + impl = Implementation.from_backend(backend) + pytest.importorskip(impl.name.lower()) df = nw.from_native(constructor_eager(data), eager_only=True) result = df.lazy(backend=backend) assert isinstance(result, nw.LazyFrame) - assert result.implementation == implementation + assert result.implementation == impl + if ( + impl.is_duckdb() + and df.implementation.is_pandas() + and df.implementation._backend_version() >= (3, 0, 0) + ): + # Reason: https://github.com/duckdb/duckdb/issues/18297 + # > duckdb.duckdb.NotImplementedException: Not implemented Error: Data type 'str' not recognized + return + + assert_equal_data(df.sort("a"), data) + + +@pytest.mark.parametrize( + "backend", + [ + Implementation.PYSPARK, + Implementation.PYSPARK_CONNECT, + Implementation.SQLFRAME, + "pyspark", + "pyspark[connect]", + "sqlframe", + ], +) +def test_lazy_backend_spark_like( + constructor_eager: ConstructorEager, backend: Polars | DuckDB | Ibis | Dask +) -> None: + impl = Implementation.from_backend(backend) + pytest.importorskip(impl.name.lower()) + + session: SparkSession + if impl.is_sqlframe(): + from sqlframe.duckdb import DuckDBSession + + session = DuckDBSession() + else: + if is_spark_connect := os.environ.get("SPARK_CONNECT", None): + from pyspark.sql.connect.session import SparkSession as PySparkSession + else: + from pyspark.sql import SparkSession as PySparkSession + + builder = cast("PySparkSession.Builder", PySparkSession.builder).appName( + "unit-tests" + ) + session = ( # pyright: ignore[reportAssignmentType] + ( + builder.remote(f"sc://localhost:{os.environ.get('SPARK_PORT', '15002')}") + if is_spark_connect + else builder.master("local[1]").config("spark.ui.enabled", "false") + ) + .config("spark.default.parallelism", "1") + .config("spark.sql.shuffle.partitions", "2") + # common timezone for all tests environments + .config("spark.sql.session.timeZone", "UTC") + .getOrCreate() + ) + + df = nw.from_native(constructor_eager(data), eager_only=True) + result = df.lazy(backend=backend, session=session) + assert isinstance(result, nw.LazyFrame) + assert result.implementation == impl + assert_equal_data(df.sort("a"), data) + + err_msg = re.escape("Spark like backends require `session` to be not None.") + with pytest.raises(ValueError, match=err_msg): + result = df.lazy(backend=backend, session=None) def test_lazy_backend_invalid(constructor_eager: ConstructorEager) -> None: df = nw.from_native(constructor_eager(data), eager_only=True) with pytest.raises(ValueError, match="Not-supported backend"): df.lazy(backend=Implementation.PANDAS) # type: ignore[arg-type] - with pytest.raises(ValueError, match="Not-supported backend"): - df.lazy(backend="pyspark") # type: ignore[arg-type] From 2c47cbfa25877f942ff7ddbea38f7feb2f6ac59b Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Sat, 23 Aug 2025 19:20:41 +0200 Subject: [PATCH 02/10] simplify: remove schema conversion and let spark handle it --- narwhals/_arrow/dataframe.py | 14 ++++++----- narwhals/_pandas_like/dataframe.py | 6 +---- narwhals/_polars/dataframe.py | 20 +++++++-------- narwhals/schema.py | 40 ------------------------------ 4 files changed, 18 insertions(+), 62 deletions(-) diff --git a/narwhals/_arrow/dataframe.py b/narwhals/_arrow/dataframe.py index a07a7d964b..f53a335767 100644 --- a/narwhals/_arrow/dataframe.py +++ b/narwhals/_arrow/dataframe.py @@ -577,29 +577,31 @@ def lazy( ) if backend.is_spark_like(): + from importlib.util import find_spec + from narwhals._spark_like.dataframe import SparkLikeLazyFrame - from narwhals.schema import Schema if session is None: msg = "Spark like backends require `session` to be not None." raise ValueError(msg) - spark_like_schema = Schema(self.schema)._to_spark_like( - backend=backend, session=session - ) + # pyspark.sql requires pyarrow to be installed from v4.0.0 can_create_from_arrow = backend in { Implementation.PYSPARK, Implementation.PYSPARK_CONNECT, } and backend._backend_version() >= (4, 0, 0) + is_pandas_installed = find_spec("pandas") is not None - data = ( + data: Any = ( self.native if can_create_from_arrow + else self.to_pandas() + if is_pandas_installed else tuple(self.iter_rows(named=True, buffer_size=512)) ) return SparkLikeLazyFrame( - session.createDataFrame(data, schema=spark_like_schema), # type: ignore[arg-type] + session.createDataFrame(data), # type: ignore[arg-type] version=self._version, implementation=backend, validate_backend_version=True, diff --git a/narwhals/_pandas_like/dataframe.py b/narwhals/_pandas_like/dataframe.py index d4d173a562..0b19a63562 100644 --- a/narwhals/_pandas_like/dataframe.py +++ b/narwhals/_pandas_like/dataframe.py @@ -825,17 +825,13 @@ def lazy( if backend.is_spark_like(): from narwhals._spark_like.dataframe import SparkLikeLazyFrame - from narwhals.schema import Schema if session is None: msg = "Spark like backends require `session` to be not None." raise ValueError(msg) - spark_like_schema = Schema(self.schema)._to_spark_like( - backend=backend, session=session - ) return SparkLikeLazyFrame( - session.createDataFrame(pandas_df, schema=spark_like_schema), + session.createDataFrame(pandas_df), version=self._version, implementation=backend, validate_backend_version=True, diff --git a/narwhals/_polars/dataframe.py b/narwhals/_polars/dataframe.py index 01db88daac..9817838d53 100644 --- a/narwhals/_polars/dataframe.py +++ b/narwhals/_polars/dataframe.py @@ -499,7 +499,6 @@ def lazy( from importlib.util import find_spec from narwhals._spark_like.dataframe import SparkLikeLazyFrame - from narwhals.schema import Schema if session is None: msg = "Spark like backends require `session` to be not None." @@ -511,19 +510,18 @@ def lazy( Implementation.PYSPARK_CONNECT, } and backend._backend_version() >= (4, 0, 0) - data: Any - if can_create_from_arrow: - data = self.to_arrow() - elif find_spec("pandas") is not None: - data = self.to_pandas() - else: - data = self.iter_rows(named=True) + is_pandas_installed = find_spec("pandas") is not None - spark_like_schema = Schema(self.schema)._to_spark_like( - backend=backend, session=session + data: Any = ( + self.to_arrow() + if can_create_from_arrow + else self.to_pandas() + if is_pandas_installed + else tuple(self.iter_rows(named=True)) ) + return SparkLikeLazyFrame( - session.createDataFrame(data, schema=spark_like_schema), + session.createDataFrame(data), version=self._version, implementation=backend, validate_backend_version=True, diff --git a/narwhals/schema.py b/narwhals/schema.py index 11db2244e2..a34379983d 100644 --- a/narwhals/schema.py +++ b/narwhals/schema.py @@ -18,10 +18,7 @@ import polars as pl import pyarrow as pa - import sqlframe.base.types as sqlframe_types - from narwhals._spark_like.utils import SparkSession - from narwhals._typing import _SparkLikeImpl from narwhals.dtypes import DType from narwhals.typing import DTypeBackend @@ -174,40 +171,3 @@ def to_polars(self) -> pl.Schema: if pl_version >= (1, 0, 0) else cast("pl.Schema", dict(schema)) ) - - def to_pyspark(self, *, session: SparkSession) -> sqlframe_types.StructType: - return self._to_spark_like(backend=Implementation.PYSPARK, session=session) - - def to_pyspark_connect(self, *, session: SparkSession) -> sqlframe_types.StructType: - return self._to_spark_like( - backend=Implementation.PYSPARK_CONNECT, session=session - ) - - def to_sqlframe(self, *, session: SparkSession) -> sqlframe_types.StructType: - return self._to_spark_like(backend=Implementation.SQLFRAME, session=session) - - def _to_spark_like( - self, *, backend: _SparkLikeImpl, session: SparkSession - ) -> sqlframe_types.StructType: - from narwhals._spark_like.utils import ( - import_native_dtypes, - narwhals_to_native_dtype as narwhals_to_spark_like_dtype, - ) - - version = self._version - spark_dtypes = import_native_dtypes(backend) - StructType = spark_dtypes.StructType # noqa: N806 - StructField = spark_dtypes.StructField # noqa: N806 - - _narwhals_to_spark_like_dtype = partial( - narwhals_to_spark_like_dtype, - version=version, - spark_types=spark_dtypes, - session=session, - ) - return StructType( # type: ignore[no-any-return] - [ - StructField(name, _narwhals_to_spark_like_dtype(nw_dtype), True) - for name, nw_dtype in self.items() - ] - ) From 747cbd802a2ca560b4641e16c00373b6b98f979e Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Sat, 23 Aug 2025 19:20:58 +0200 Subject: [PATCH 03/10] simplify test --- tests/frame/lazy_test.py | 62 +++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 8eecddb13a..54ec8e3c85 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -13,7 +13,7 @@ if TYPE_CHECKING: from narwhals._spark_like.utils import SparkSession - from narwhals._typing import Dask, DuckDB, Ibis, Polars + from narwhals._typing import LazyAllowed from tests.utils import ConstructorEager @@ -58,21 +58,26 @@ def test_lazy_to_default(constructor_eager: ConstructorEager) -> None: Implementation.DUCKDB, Implementation.DASK, Implementation.IBIS, + Implementation.PYSPARK, + Implementation.SQLFRAME, "polars", "duckdb", "dask", "ibis", + "pyspark", + "sqlframe", ], ) -def test_lazy_backend_non_spark_like( - constructor_eager: ConstructorEager, backend: Polars | DuckDB | Ibis | Dask -) -> None: +def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None: impl = Implementation.from_backend(backend) pytest.importorskip(impl.name.lower()) + + if ( + is_spark_connect := os.environ.get("SPARK_CONNECT", None) + ) is not None and impl.is_pyspark(): + impl = Implementation.PYSPARK_CONNECT + df = nw.from_native(constructor_eager(data), eager_only=True) - result = df.lazy(backend=backend) - assert isinstance(result, nw.LazyFrame) - assert result.implementation == impl if ( impl.is_duckdb() and df.implementation.is_pandas() @@ -82,33 +87,13 @@ def test_lazy_backend_non_spark_like( # > duckdb.duckdb.NotImplementedException: Not implemented Error: Data type 'str' not recognized return - assert_equal_data(df.sort("a"), data) - - -@pytest.mark.parametrize( - "backend", - [ - Implementation.PYSPARK, - Implementation.PYSPARK_CONNECT, - Implementation.SQLFRAME, - "pyspark", - "pyspark[connect]", - "sqlframe", - ], -) -def test_lazy_backend_spark_like( - constructor_eager: ConstructorEager, backend: Polars | DuckDB | Ibis | Dask -) -> None: - impl = Implementation.from_backend(backend) - pytest.importorskip(impl.name.lower()) - - session: SparkSession - if impl.is_sqlframe(): + session: SparkSession | None + if impl is Implementation.SQLFRAME: from sqlframe.duckdb import DuckDBSession session = DuckDBSession() - else: - if is_spark_connect := os.environ.get("SPARK_CONNECT", None): + elif impl in {Implementation.PYSPARK, Implementation.PYSPARK_CONNECT}: + if is_spark_connect: from pyspark.sql.connect.session import SparkSession as PySparkSession else: from pyspark.sql import SparkSession as PySparkSession @@ -128,16 +113,27 @@ def test_lazy_backend_spark_like( .config("spark.sql.session.timeZone", "UTC") .getOrCreate() ) + else: + session = None - df = nw.from_native(constructor_eager(data), eager_only=True) result = df.lazy(backend=backend, session=session) assert isinstance(result, nw.LazyFrame) assert result.implementation == impl assert_equal_data(df.sort("a"), data) + +@pytest.mark.parametrize("backend", ["pyspark", "sqlframe"]) +def test_lazy_spark_like_requires_session( + constructor_eager: ConstructorEager, backend: LazyAllowed +) -> None: + impl = Implementation.from_backend(backend) + pytest.importorskip(impl.name.lower()) + + df = nw.from_native(constructor_eager(data), eager_only=True) + err_msg = re.escape("Spark like backends require `session` to be not None.") with pytest.raises(ValueError, match=err_msg): - result = df.lazy(backend=backend, session=None) + df.lazy(backend=backend, session=None) def test_lazy_backend_invalid(constructor_eager: ConstructorEager) -> None: From 4a26eff5063f17ba28318c955c007f55156dfea3 Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Sat, 23 Aug 2025 19:31:55 +0200 Subject: [PATCH 04/10] pragma no cover test branches --- narwhals/_arrow/dataframe.py | 2 +- tests/frame/lazy_test.py | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/narwhals/_arrow/dataframe.py b/narwhals/_arrow/dataframe.py index f53a335767..8007330034 100644 --- a/narwhals/_arrow/dataframe.py +++ b/narwhals/_arrow/dataframe.py @@ -601,7 +601,7 @@ def lazy( ) return SparkLikeLazyFrame( - session.createDataFrame(data), # type: ignore[arg-type] + session.createDataFrame(data), version=self._version, implementation=backend, validate_backend_version=True, diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 54ec8e3c85..3bbc8057e3 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -72,9 +72,8 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None impl = Implementation.from_backend(backend) pytest.importorskip(impl.name.lower()) - if ( - is_spark_connect := os.environ.get("SPARK_CONNECT", None) - ) is not None and impl.is_pyspark(): + is_spark_connect = os.environ.get("SPARK_CONNECT", None) + if is_spark_connect is not None and impl.is_pyspark(): # pragma: no cover impl = Implementation.PYSPARK_CONNECT df = nw.from_native(constructor_eager(data), eager_only=True) @@ -82,7 +81,7 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None impl.is_duckdb() and df.implementation.is_pandas() and df.implementation._backend_version() >= (3, 0, 0) - ): + ): # pragma: no cover # Reason: https://github.com/duckdb/duckdb/issues/18297 # > duckdb.duckdb.NotImplementedException: Not implemented Error: Data type 'str' not recognized return @@ -92,7 +91,10 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None from sqlframe.duckdb import DuckDBSession session = DuckDBSession() - elif impl in {Implementation.PYSPARK, Implementation.PYSPARK_CONNECT}: + elif impl in { + Implementation.PYSPARK, + Implementation.PYSPARK_CONNECT, + }: # pragma: no cover if is_spark_connect: from pyspark.sql.connect.session import SparkSession as PySparkSession else: From 97dae1f9b375c8240554af935e9c34f562d9766c Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Sat, 23 Aug 2025 20:13:19 +0200 Subject: [PATCH 05/10] factor out from_compliant_dataframe --- narwhals/_arrow/dataframe.py | 24 ++------------------- narwhals/_polars/dataframe.py | 31 ++++++-------------------- narwhals/_spark_like/dataframe.py | 36 +++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 47 deletions(-) diff --git a/narwhals/_arrow/dataframe.py b/narwhals/_arrow/dataframe.py index 8007330034..69da8726f8 100644 --- a/narwhals/_arrow/dataframe.py +++ b/narwhals/_arrow/dataframe.py @@ -577,34 +577,14 @@ def lazy( ) if backend.is_spark_like(): - from importlib.util import find_spec - from narwhals._spark_like.dataframe import SparkLikeLazyFrame if session is None: msg = "Spark like backends require `session` to be not None." raise ValueError(msg) - # pyspark.sql requires pyarrow to be installed from v4.0.0 - can_create_from_arrow = backend in { - Implementation.PYSPARK, - Implementation.PYSPARK_CONNECT, - } and backend._backend_version() >= (4, 0, 0) - is_pandas_installed = find_spec("pandas") is not None - - data: Any = ( - self.native - if can_create_from_arrow - else self.to_pandas() - if is_pandas_installed - else tuple(self.iter_rows(named=True, buffer_size=512)) - ) - - return SparkLikeLazyFrame( - session.createDataFrame(data), - version=self._version, - implementation=backend, - validate_backend_version=True, + return SparkLikeLazyFrame._from_compliant_dataframe( + self, session=session, implementation=backend, version=self._version ) raise AssertionError # pragma: no cover diff --git a/narwhals/_polars/dataframe.py b/narwhals/_polars/dataframe.py index 9817838d53..df79180c9a 100644 --- a/narwhals/_polars/dataframe.py +++ b/narwhals/_polars/dataframe.py @@ -469,10 +469,9 @@ def lazy( from narwhals._duckdb.dataframe import DuckDBLazyFrame - # NOTE: (F841) is a false positive - df = self.native # noqa: F841 + _df = self.native return DuckDBLazyFrame( - duckdb.table("df"), validate_backend_version=True, version=self._version + duckdb.table("_df"), validate_backend_version=True, version=self._version ) if backend is Implementation.DASK: import dask.dataframe as dd # ignore-banned-import @@ -496,35 +495,17 @@ def lazy( ) if backend.is_spark_like(): - from importlib.util import find_spec - from narwhals._spark_like.dataframe import SparkLikeLazyFrame if session is None: msg = "Spark like backends require `session` to be not None." raise ValueError(msg) - # pyspark.sql requires pyarrow to be installed from v4.0.0 - can_create_from_arrow = backend in { - Implementation.PYSPARK, - Implementation.PYSPARK_CONNECT, - } and backend._backend_version() >= (4, 0, 0) - - is_pandas_installed = find_spec("pandas") is not None - - data: Any = ( - self.to_arrow() - if can_create_from_arrow - else self.to_pandas() - if is_pandas_installed - else tuple(self.iter_rows(named=True)) - ) - - return SparkLikeLazyFrame( - session.createDataFrame(data), - version=self._version, + return SparkLikeLazyFrame._from_compliant_dataframe( + self, # pyright: ignore[reportArgumentType] + session=session, implementation=backend, - validate_backend_version=True, + version=self._version, ) raise AssertionError # pragma: no cover diff --git a/narwhals/_spark_like/dataframe.py b/narwhals/_spark_like/dataframe.py index c1ab4ec1fb..a1bd756c74 100644 --- a/narwhals/_spark_like/dataframe.py +++ b/narwhals/_spark_like/dataframe.py @@ -42,6 +42,7 @@ from narwhals._spark_like.expr import SparkLikeExpr from narwhals._spark_like.group_by import SparkLikeLazyGroupBy from narwhals._spark_like.namespace import SparkLikeNamespace + from narwhals._spark_like.utils import SparkSession from narwhals._typing import _EagerAllowedImpl from narwhals._utils import Version, _LimitedContext from narwhals.dataframe import LazyFrame @@ -561,6 +562,41 @@ def with_row_index(self, name: str, order_by: Sequence[str]) -> Self: def sink_parquet(self, file: str | Path | BytesIO) -> None: self.native.write.parquet(file) + @classmethod + def _from_compliant_dataframe( + cls, + compliant_frame: CompliantDataFrameAny, + /, + *, + session: SparkSession, + implementation: Implementation, + version: Version, + ) -> SparkLikeLazyFrame: + from importlib.util import find_spec + + # pyspark.sql requires pyarrow to be installed from v4.0.0 + can_create_from_arrow = implementation in { + Implementation.PYSPARK, + Implementation.PYSPARK_CONNECT, + } and implementation._backend_version() >= (4, 0, 0) + + is_pandas_installed = find_spec("pandas") is not None + + data: Any = ( + compliant_frame.to_arrow() + if can_create_from_arrow + else compliant_frame.to_pandas() + if is_pandas_installed + else tuple(compliant_frame.iter_rows(named=True, buffer_size=512)) + ) + + return cls( + session.createDataFrame(data), + version=version, + implementation=implementation, + validate_backend_version=True, + ) + gather_every = not_implemented.deprecated( "`LazyFrame.gather_every` is deprecated and will be removed in a future version." ) From c4333f1c4a1b5219fa69e49f9bc2d5449624f691 Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Sun, 24 Aug 2025 00:16:22 +0200 Subject: [PATCH 06/10] simplify and 'session: Any' at top level --- narwhals/dataframe.py | 7 +++---- tests/frame/lazy_test.py | 38 ++++++++------------------------------ 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/narwhals/dataframe.py b/narwhals/dataframe.py index 8e3d4a1bc9..aaf900ea60 100644 --- a/narwhals/dataframe.py +++ b/narwhals/dataframe.py @@ -71,7 +71,6 @@ from narwhals._compliant import CompliantDataFrame, CompliantLazyFrame from narwhals._compliant.typing import CompliantExprAny, EagerNamespaceAny - from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import EagerAllowed, IntoBackend, LazyAllowed, Polars from narwhals.group_by import GroupBy, LazyGroupBy @@ -728,7 +727,7 @@ def lazy( self, backend: IntoBackend[LazyAllowed] | None = None, *, - session: SparkSession | None = None, + session: Any | None = None, ) -> LazyFrame[Any]: """Restrict available API methods to lazy-only ones. @@ -741,7 +740,7 @@ def lazy( the possibility of running entirely lazily. Note: - Spark like backends require a session object to be passed. + If `backend` is spark-like, then a valid `session` is required. For instance: @@ -764,7 +763,7 @@ def lazy( `IBIS` or `POLARS`. - As a string: `"dask"`, `"duckdb"`, `"ibis"` or `"polars"` - Directly as a module `dask.dataframe`, `duckdb`, `ibis` or `polars`. - session: Spark-like session to be used if backend is spark-like. + session: Session to be used if backend is spark-like. Examples: >>> import polars as pl diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 3bbc8057e3..4af449b003 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -2,14 +2,14 @@ import os import re -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any import pytest import narwhals as nw from narwhals._utils import Implementation from narwhals.dependencies import get_cudf, get_modin -from tests.utils import assert_equal_data +from tests.utils import assert_equal_data, pyspark_session, sqlframe_session if TYPE_CHECKING: from narwhals._spark_like.utils import SparkSession @@ -74,6 +74,8 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None is_spark_connect = os.environ.get("SPARK_CONNECT", None) if is_spark_connect is not None and impl.is_pyspark(): # pragma: no cover + # Workaround for impl.name.lower() being "pyspark[connect]" for + # Implementation.PYSPARK_CONNECT, which is never installed. impl = Implementation.PYSPARK_CONNECT df = nw.from_native(constructor_eager(data), eager_only=True) @@ -87,34 +89,10 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None return session: SparkSession | None - if impl is Implementation.SQLFRAME: - from sqlframe.duckdb import DuckDBSession - - session = DuckDBSession() - elif impl in { - Implementation.PYSPARK, - Implementation.PYSPARK_CONNECT, - }: # pragma: no cover - if is_spark_connect: - from pyspark.sql.connect.session import SparkSession as PySparkSession - else: - from pyspark.sql import SparkSession as PySparkSession - - builder = cast("PySparkSession.Builder", PySparkSession.builder).appName( - "unit-tests" - ) - session = ( # pyright: ignore[reportAssignmentType] - ( - builder.remote(f"sc://localhost:{os.environ.get('SPARK_PORT', '15002')}") - if is_spark_connect - else builder.master("local[1]").config("spark.ui.enabled", "false") - ) - .config("spark.default.parallelism", "1") - .config("spark.sql.shuffle.partitions", "2") - # common timezone for all tests environments - .config("spark.sql.session.timeZone", "UTC") - .getOrCreate() - ) + if impl.is_sqlframe(): + session = sqlframe_session() + elif impl.is_pyspark() or impl.is_pyspark_connect(): # pragma: no cover + session = pyspark_session() # pyright: ignore[reportAssignmentType] else: session = None From eda88aa66271d42e1e3f14df282195a7fa21179c Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Wed, 27 Aug 2025 08:53:45 +0200 Subject: [PATCH 07/10] adjust for feedback --- narwhals/_duckdb/dataframe.py | 10 ++-------- narwhals/_ibis/dataframe.py | 10 ++-------- narwhals/_spark_like/dataframe.py | 27 +++++++++++---------------- narwhals/_utils.py | 2 +- narwhals/dataframe.py | 4 ++-- tests/frame/lazy_test.py | 18 ++++++++++++------ 6 files changed, 30 insertions(+), 41 deletions(-) diff --git a/narwhals/_duckdb/dataframe.py b/narwhals/_duckdb/dataframe.py index b8cc159f26..9c55d9dd22 100644 --- a/narwhals/_duckdb/dataframe.py +++ b/narwhals/_duckdb/dataframe.py @@ -48,8 +48,7 @@ from narwhals._duckdb.group_by import DuckDBGroupBy from narwhals._duckdb.namespace import DuckDBNamespace from narwhals._duckdb.series import DuckDBInterchangeSeries - from narwhals._spark_like.utils import SparkSession - from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl + from narwhals._typing import _EagerAllowedImpl from narwhals._utils import _LimitedContext from narwhals.dataframe import LazyFrame from narwhals.dtypes import DType @@ -190,12 +189,7 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self: selection = (name for name in self.columns if name not in columns_to_drop) return self._with_native(self.native.select(*selection)) - def lazy( - self, - backend: _LazyAllowedImpl | None = None, - *, - session: SparkSession | None = None, - ) -> Self: + def lazy(self, backend: None = None, **_: None) -> Self: # The `backend`` argument has no effect but we keep it here for # backwards compatibility because in `narwhals.stable.v1` # function `.from_native()` will return a DataFrame for DuckDB. diff --git a/narwhals/_ibis/dataframe.py b/narwhals/_ibis/dataframe.py index 2a5daddd5d..e976d54729 100644 --- a/narwhals/_ibis/dataframe.py +++ b/narwhals/_ibis/dataframe.py @@ -34,8 +34,7 @@ from narwhals._ibis.group_by import IbisGroupBy from narwhals._ibis.namespace import IbisNamespace from narwhals._ibis.series import IbisInterchangeSeries - from narwhals._spark_like.utils import SparkSession - from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl + from narwhals._typing import _EagerAllowedImpl from narwhals._utils import _LimitedContext from narwhals.dataframe import LazyFrame from narwhals.dtypes import DType @@ -166,12 +165,7 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self: selection = (col for col in self.columns if col not in columns_to_drop) return self._with_native(self.native.select(*selection)) - def lazy( - self, - backend: _LazyAllowedImpl | None = None, - *, - session: SparkSession | None = None, - ) -> Self: + def lazy(self, backend: None = None, **_: None) -> Self: # The `backend`` argument has no effect but we keep it here for # backwards compatibility because in `narwhals.stable.v1` # function `.from_native()` will return a DataFrame for Ibis. diff --git a/narwhals/_spark_like/dataframe.py b/narwhals/_spark_like/dataframe.py index a1bd756c74..eba3001e7b 100644 --- a/narwhals/_spark_like/dataframe.py +++ b/narwhals/_spark_like/dataframe.py @@ -565,7 +565,7 @@ def sink_parquet(self, file: str | Path | BytesIO) -> None: @classmethod def _from_compliant_dataframe( cls, - compliant_frame: CompliantDataFrameAny, + frame: CompliantDataFrameAny, /, *, session: SparkSession, @@ -574,21 +574,16 @@ def _from_compliant_dataframe( ) -> SparkLikeLazyFrame: from importlib.util import find_spec - # pyspark.sql requires pyarrow to be installed from v4.0.0 - can_create_from_arrow = implementation in { - Implementation.PYSPARK, - Implementation.PYSPARK_CONNECT, - } and implementation._backend_version() >= (4, 0, 0) - - is_pandas_installed = find_spec("pandas") is not None - - data: Any = ( - compliant_frame.to_arrow() - if can_create_from_arrow - else compliant_frame.to_pandas() - if is_pandas_installed - else tuple(compliant_frame.iter_rows(named=True, buffer_size=512)) - ) + impl = implementation + is_spark_v4 = (not impl.is_sqlframe()) and impl._backend_version() >= (4, 0, 0) + if is_spark_v4: # pragma: no cover + # pyspark.sql requires pyarrow to be installed from v4.0.0 + # and since v4.0.0 the input to `createDataFrame` can be a PyArrow Table. + data: Any = frame.to_arrow() + elif find_spec("pandas"): + data = frame.to_pandas() + else: # pragma: no cover + data = tuple(frame.iter_rows(named=True, buffer_size=512)) return cls( session.createDataFrame(data), diff --git a/narwhals/_utils.py b/narwhals/_utils.py index 17ffdf02b7..dee2234d21 100644 --- a/narwhals/_utils.py +++ b/narwhals/_utils.py @@ -1661,7 +1661,7 @@ def can_lazyframe_collect(impl: Implementation, /) -> TypeIs[_LazyFrameCollectIm return impl in {Implementation.PANDAS, Implementation.POLARS, Implementation.PYARROW} -def can_dataframe_lazy(impl: Implementation, /) -> TypeIs[_LazyAllowedImpl]: +def is_lazy_allowed(impl: Implementation, /) -> TypeIs[_LazyAllowedImpl]: """Return True if `DataFrame.lazy(impl)` is allowed.""" return impl in { Implementation.DASK, diff --git a/narwhals/dataframe.py b/narwhals/dataframe.py index a6c0c95597..622f70f36e 100644 --- a/narwhals/dataframe.py +++ b/narwhals/dataframe.py @@ -26,7 +26,6 @@ from narwhals._utils import ( Implementation, Version, - can_dataframe_lazy, can_lazyframe_collect, check_columns_exist, flatten, @@ -35,6 +34,7 @@ is_compliant_lazyframe, is_eager_allowed, is_index_selector, + is_lazy_allowed, is_list_of, is_sequence_like, is_slice_none, @@ -801,7 +801,7 @@ def lazy( if backend is None: return self._lazyframe(lazy(None, session=session), level="lazy") lazy_backend = Implementation.from_backend(backend) - if can_dataframe_lazy(lazy_backend): + if is_lazy_allowed(lazy_backend): return self._lazyframe(lazy(lazy_backend, session=session), level="lazy") msg = f"Not-supported backend.\n\nExpected one of {get_args(_LazyAllowedImpl)} or `None`, got {lazy_backend}" raise ValueError(msg) diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 4af449b003..1c4788a0db 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -13,7 +13,7 @@ if TYPE_CHECKING: from narwhals._spark_like.utils import SparkSession - from narwhals._typing import LazyAllowed + from narwhals._typing import LazyAllowed, SparkLike from tests.utils import ConstructorEager @@ -68,7 +68,11 @@ def test_lazy_to_default(constructor_eager: ConstructorEager) -> None: "sqlframe", ], ) -def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None: +def test_lazy( + request: pytest.FixtureRequest, + constructor_eager: ConstructorEager, + backend: LazyAllowed, +) -> None: impl = Implementation.from_backend(backend) pytest.importorskip(impl.name.lower()) @@ -84,9 +88,11 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None and df.implementation.is_pandas() and df.implementation._backend_version() >= (3, 0, 0) ): # pragma: no cover - # Reason: https://github.com/duckdb/duckdb/issues/18297 - # > duckdb.duckdb.NotImplementedException: Not implemented Error: Data type 'str' not recognized - return + reason = ( + "https://github.com/duckdb/duckdb/issues/18297\n" + "> duckdb.duckdb.NotImplementedException: Not implemented Error: Data type 'str' not recognized" + ) + request.applymarker(pytest.mark.xfail(reason=reason)) session: SparkSession | None if impl.is_sqlframe(): @@ -104,7 +110,7 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None @pytest.mark.parametrize("backend", ["pyspark", "sqlframe"]) def test_lazy_spark_like_requires_session( - constructor_eager: ConstructorEager, backend: LazyAllowed + constructor_eager: ConstructorEager, backend: SparkLike ) -> None: impl = Implementation.from_backend(backend) pytest.importorskip(impl.name.lower()) From 6042a4868b75848d1b5787492498a029670aca57 Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Wed, 27 Aug 2025 08:58:43 +0200 Subject: [PATCH 08/10] issue not solved but test passing? --- tests/frame/lazy_test.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 1c4788a0db..5a500fc81a 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -68,11 +68,7 @@ def test_lazy_to_default(constructor_eager: ConstructorEager) -> None: "sqlframe", ], ) -def test_lazy( - request: pytest.FixtureRequest, - constructor_eager: ConstructorEager, - backend: LazyAllowed, -) -> None: +def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None: impl = Implementation.from_backend(backend) pytest.importorskip(impl.name.lower()) @@ -83,17 +79,6 @@ def test_lazy( impl = Implementation.PYSPARK_CONNECT df = nw.from_native(constructor_eager(data), eager_only=True) - if ( - impl.is_duckdb() - and df.implementation.is_pandas() - and df.implementation._backend_version() >= (3, 0, 0) - ): # pragma: no cover - reason = ( - "https://github.com/duckdb/duckdb/issues/18297\n" - "> duckdb.duckdb.NotImplementedException: Not implemented Error: Data type 'str' not recognized" - ) - request.applymarker(pytest.mark.xfail(reason=reason)) - session: SparkSession | None if impl.is_sqlframe(): session = sqlframe_session() From 13c5a9a2987e9cc5694b57eb54738510d0599f47 Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Wed, 27 Aug 2025 08:47:11 +0000 Subject: [PATCH 09/10] refactor(typing): Remove `SparkSession` from `v1`, `v2` also --- narwhals/stable/v1/__init__.py | 3 +-- narwhals/stable/v2/__init__.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/narwhals/stable/v1/__init__.py b/narwhals/stable/v1/__init__.py index 9341ecf8fd..4b9f7ef487 100644 --- a/narwhals/stable/v1/__init__.py +++ b/narwhals/stable/v1/__init__.py @@ -68,7 +68,6 @@ from typing_extensions import ParamSpec, Self - from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import ( Arrow, @@ -195,7 +194,7 @@ def lazy( self, backend: IntoBackend[LazyAllowed] | None = None, *, - session: SparkSession | None = None, + session: Any | None = None, ) -> LazyFrame[Any]: return _stableify(super().lazy(backend=backend, session=session)) diff --git a/narwhals/stable/v2/__init__.py b/narwhals/stable/v2/__init__.py index ef531d5c7d..e1963a4480 100644 --- a/narwhals/stable/v2/__init__.py +++ b/narwhals/stable/v2/__init__.py @@ -63,7 +63,6 @@ from typing_extensions import ParamSpec, Self - from narwhals._spark_like.utils import SparkSession from narwhals._translate import IntoArrowTable from narwhals._typing import ( Arrow, @@ -189,7 +188,7 @@ def lazy( self, backend: IntoBackend[LazyAllowed] | None = None, *, - session: SparkSession | None = None, + session: Any | None = None, ) -> LazyFrame[Any]: return _stableify(super().lazy(backend=backend, session=session)) From 281246352bb47cfb5cecb6d87823d5c2eabf08b1 Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Wed, 27 Aug 2025 09:01:43 +0000 Subject: [PATCH 10/10] test(typing): Use `Any` to handle `Unknown` --- tests/frame/lazy_test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/frame/lazy_test.py b/tests/frame/lazy_test.py index 5a500fc81a..9c5cc83804 100644 --- a/tests/frame/lazy_test.py +++ b/tests/frame/lazy_test.py @@ -12,7 +12,6 @@ from tests.utils import assert_equal_data, pyspark_session, sqlframe_session if TYPE_CHECKING: - from narwhals._spark_like.utils import SparkSession from narwhals._typing import LazyAllowed, SparkLike from tests.utils import ConstructorEager @@ -79,11 +78,11 @@ def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None impl = Implementation.PYSPARK_CONNECT df = nw.from_native(constructor_eager(data), eager_only=True) - session: SparkSession | None + session: Any if impl.is_sqlframe(): session = sqlframe_session() elif impl.is_pyspark() or impl.is_pyspark_connect(): # pragma: no cover - session = pyspark_session() # pyright: ignore[reportAssignmentType] + session = pyspark_session() else: session = None