Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/api-reference/expr.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- all
- any
- cast
- clip
- count
- cum_count
- cum_max
Expand All @@ -22,7 +23,6 @@
- fill_nan
- fill_null
- filter
- clip
- is_between
- is_close
- is_duplicated
Expand All @@ -42,8 +42,8 @@
- median
- min
- mode
- null_count
- n_unique
- null_count
- over
- pipe
- quantile
Expand Down
25 changes: 21 additions & 4 deletions narwhals/_arrow/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
Order,
)
from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny
from narwhals._spark_like.utils import SparkSession
from narwhals._translate import IntoArrowTable
from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl
from narwhals._utils import Version, _LimitedContext
Expand Down Expand Up @@ -527,17 +528,22 @@ def tail(self, n: int) -> Self:
)
return self._with_native(df.slice(abs(n)), validate_column_names=False)

def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny:
def lazy(
self,
backend: _LazyAllowedImpl | None = None,
*,
session: SparkSession | None = None,
) -> CompliantLazyFrameAny:
if backend is None:
return self
if backend is Implementation.DUCKDB:
import duckdb # ignore-banned-import

from narwhals._duckdb.dataframe import DuckDBLazyFrame

df = self.native # noqa: F841
_df = self.native
return DuckDBLazyFrame(
duckdb.table("df"), validate_backend_version=True, version=self._version
duckdb.table("_df"), validate_backend_version=True, version=self._version
)
if backend is Implementation.POLARS:
import polars as pl # ignore-banned-import
Expand All @@ -559,7 +565,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny
validate_backend_version=True,
version=self._version,
)
if backend.is_ibis():
if backend is Implementation.IBIS:
import ibis # ignore-banned-import

from narwhals._ibis.dataframe import IbisLazyFrame
Expand All @@ -570,6 +576,17 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny
version=self._version,
)

if backend.is_spark_like():
from narwhals._spark_like.dataframe import SparkLikeLazyFrame

if session is None:
msg = "Spark like backends require `session` to be not None."
raise ValueError(msg)

return SparkLikeLazyFrame._from_compliant_dataframe(
self, session=session, implementation=backend, version=self._version
)

raise AssertionError # pragma: no cover

def collect(
Expand Down
5 changes: 4 additions & 1 deletion narwhals/_compliant/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

from narwhals._compliant.group_by import CompliantGroupBy, DataFrameGroupBy
from narwhals._compliant.namespace import EagerNamespace
from narwhals._spark_like.utils import SparkSession
from narwhals._translate import IntoArrowTable
from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl
from narwhals._utils import Implementation, _LimitedContext
Expand Down Expand Up @@ -196,7 +197,9 @@ def join_asof(
strategy: AsofJoinStrategy,
suffix: str,
) -> Self: ...
def lazy(self, backend: _LazyAllowedImpl | None) -> CompliantLazyFrameAny: ...
def lazy(
self, backend: _LazyAllowedImpl | None, *, session: SparkSession | None
) -> CompliantLazyFrameAny: ...
def pivot(
self,
on: Sequence[str],
Expand Down
4 changes: 2 additions & 2 deletions narwhals/_duckdb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from narwhals._duckdb.group_by import DuckDBGroupBy
from narwhals._duckdb.namespace import DuckDBNamespace
from narwhals._duckdb.series import DuckDBInterchangeSeries
from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl
from narwhals._typing import _EagerAllowedImpl
from narwhals._utils import _LimitedContext
from narwhals.dataframe import LazyFrame
from narwhals.dtypes import DType
Expand Down Expand Up @@ -189,7 +189,7 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
selection = (name for name in self.columns if name not in columns_to_drop)
return self._with_native(self.native.select(*selection))

def lazy(self, backend: _LazyAllowedImpl | None = None) -> Self:
def lazy(self, backend: None = None, **_: None) -> Self:
# The `backend` argument has no effect but we keep it here for
# backwards compatibility because in `narwhals.stable.v1`
# function `.from_native()` will return a DataFrame for DuckDB.
Expand Down
4 changes: 2 additions & 2 deletions narwhals/_ibis/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from narwhals._ibis.group_by import IbisGroupBy
from narwhals._ibis.namespace import IbisNamespace
from narwhals._ibis.series import IbisInterchangeSeries
from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl
from narwhals._typing import _EagerAllowedImpl
from narwhals._utils import _LimitedContext
from narwhals.dataframe import LazyFrame
from narwhals.dtypes import DType
Expand Down Expand Up @@ -165,7 +165,7 @@ def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
selection = (col for col in self.columns if col not in columns_to_drop)
return self._with_native(self.native.select(*selection))

def lazy(self, backend: _LazyAllowedImpl | None = None) -> Self:
def lazy(self, backend: None = None, **_: None) -> Self:
# The `backend` argument has no effect but we keep it here for
# backwards compatibility because in `narwhals.stable.v1`
# function `.from_native()` will return a DataFrame for Ibis.
Expand Down
25 changes: 23 additions & 2 deletions narwhals/_pandas_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from narwhals._pandas_like.expr import PandasLikeExpr
from narwhals._pandas_like.group_by import PandasLikeGroupBy
from narwhals._pandas_like.namespace import PandasLikeNamespace
from narwhals._spark_like.utils import SparkSession
from narwhals._translate import IntoArrowTable
from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl
from narwhals._utils import Version, _LimitedContext
Expand Down Expand Up @@ -772,7 +773,12 @@ def unique(
)

# --- lazy-only ---
def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny:
def lazy(
self,
backend: _LazyAllowedImpl | None = None,
*,
session: SparkSession | None = None,
) -> CompliantLazyFrameAny:
pandas_df = self.to_pandas()
if backend is None:
return self
Expand Down Expand Up @@ -806,7 +812,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny
validate_backend_version=True,
version=self._version,
)
if backend.is_ibis():
if backend is Implementation.IBIS:
import ibis # ignore-banned-import

from narwhals._ibis.dataframe import IbisLazyFrame
Expand All @@ -816,6 +822,21 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny
validate_backend_version=True,
version=self._version,
)

if backend.is_spark_like():
from narwhals._spark_like.dataframe import SparkLikeLazyFrame

if session is None:
msg = "Spark like backends require `session` to be not None."
raise ValueError(msg)

return SparkLikeLazyFrame(
session.createDataFrame(pandas_df),
version=self._version,
implementation=backend,
validate_backend_version=True,
)

raise AssertionError # pragma: no cover

@property
Expand Down
29 changes: 24 additions & 5 deletions narwhals/_polars/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny
from narwhals._polars.expr import PolarsExpr
from narwhals._polars.group_by import PolarsGroupBy, PolarsLazyGroupBy
from narwhals._spark_like.utils import SparkSession
from narwhals._translate import IntoArrowTable
from narwhals._typing import _EagerAllowedImpl, _LazyAllowedImpl
from narwhals._utils import Version, _LimitedContext
Expand Down Expand Up @@ -455,18 +456,22 @@ def iter_columns(self) -> Iterator[PolarsSeries]:
for series in self.native.iter_columns():
yield PolarsSeries.from_native(series, context=self)

def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny:
def lazy(
self,
backend: _LazyAllowedImpl | None = None,
*,
session: SparkSession | None = None,
) -> CompliantLazyFrameAny:
if backend is None or backend is Implementation.POLARS:
return PolarsLazyFrame.from_native(self.native.lazy(), context=self)
if backend is Implementation.DUCKDB:
import duckdb # ignore-banned-import

from narwhals._duckdb.dataframe import DuckDBLazyFrame

# NOTE: (F841) is a false positive
df = self.native # noqa: F841
_df = self.native
return DuckDBLazyFrame(
duckdb.table("df"), validate_backend_version=True, version=self._version
duckdb.table("_df"), validate_backend_version=True, version=self._version
)
if backend is Implementation.DASK:
import dask.dataframe as dd # ignore-banned-import
Expand All @@ -478,7 +483,7 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny
validate_backend_version=True,
version=self._version,
)
if backend.is_ibis():
if backend is Implementation.IBIS:
import ibis # ignore-banned-import

from narwhals._ibis.dataframe import IbisLazyFrame
Expand All @@ -489,6 +494,20 @@ def lazy(self, backend: _LazyAllowedImpl | None = None) -> CompliantLazyFrameAny
version=self._version,
)

if backend.is_spark_like():
from narwhals._spark_like.dataframe import SparkLikeLazyFrame

if session is None:
msg = "Spark like backends require `session` to be not None."
raise ValueError(msg)

return SparkLikeLazyFrame._from_compliant_dataframe(
self, # pyright: ignore[reportArgumentType]
session=session,
implementation=backend,
version=self._version,
)

raise AssertionError # pragma: no cover

@overload
Expand Down
31 changes: 31 additions & 0 deletions narwhals/_spark_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from narwhals._spark_like.expr import SparkLikeExpr
from narwhals._spark_like.group_by import SparkLikeLazyGroupBy
from narwhals._spark_like.namespace import SparkLikeNamespace
from narwhals._spark_like.utils import SparkSession
from narwhals._typing import _EagerAllowedImpl
from narwhals._utils import Version, _LimitedContext
from narwhals.dataframe import LazyFrame
Expand Down Expand Up @@ -561,6 +562,36 @@ def with_row_index(self, name: str, order_by: Sequence[str]) -> Self:
def sink_parquet(self, file: str | Path | BytesIO) -> None:
self.native.write.parquet(file)

@classmethod
def _from_compliant_dataframe(
    cls,
    frame: CompliantDataFrameAny,
    /,
    *,
    session: SparkSession,
    implementation: Implementation,
    version: Version,
) -> SparkLikeLazyFrame:
    """Build a spark-like lazy frame from an eager compliant dataframe.

    The eager `frame` is exported to an in-memory representation that is
    handed to `session.createDataFrame`; the resulting native frame is then
    wrapped in `cls`.

    Arguments:
        frame: Eager compliant dataframe to convert (positional-only).
        session: Session whose `createDataFrame` builds the native frame.
        implementation: Spark-like backend to tag the new frame with.
        version: Narwhals API version forwarded to the constructor.

    Returns:
        A new lazy frame wrapping the dataframe created by `session`.
    """
    from importlib.util import find_spec

    payload: Any
    if (
        not implementation.is_sqlframe()
        and implementation._backend_version() >= (4, 0, 0)
    ):  # pragma: no cover
        # pyspark.sql requires pyarrow to be installed from v4.0.0
        # and since v4.0.0 the input to `createDataFrame` can be a PyArrow Table.
        payload = frame.to_arrow()
    elif find_spec("pandas"):
        # pandas is importable, so go through a pandas dataframe.
        payload = frame.to_pandas()
    else:  # pragma: no cover
        # Last resort: materialise every row as a named record.
        payload = tuple(frame.iter_rows(named=True, buffer_size=512))

    native_frame = session.createDataFrame(payload)
    return cls(
        native_frame,
        version=version,
        implementation=implementation,
        validate_backend_version=True,
    )

gather_every = not_implemented.deprecated(
"`LazyFrame.gather_every` is deprecated and will be removed in a future version."
)
Expand Down
1 change: 0 additions & 1 deletion narwhals/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
# - https://github.com/narwhals-dev/narwhals/pull/2971#discussion_r2277137003
# - https://github.com/narwhals-dev/narwhals/pull/3002#issuecomment-3194267667
_LazyFrameCollectImpl: TypeAlias = Literal[_PandasImpl, _PolarsImpl, _ArrowImpl] # noqa: PYI047
_DataFrameLazyImpl: TypeAlias = Literal[_PolarsImpl, _DaskImpl, _DuckDBImpl, _IbisImpl] # noqa: PYI047

Comment on lines 53 to 54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Farewell questionably-named alias 🥳

# `str | Implementation` aliases
Pandas: TypeAlias = Literal[_Pandas, _PandasImpl]
Expand Down
13 changes: 8 additions & 5 deletions narwhals/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@
from narwhals._typing import (
Backend,
IntoBackend,
_DataFrameLazyImpl,
_EagerAllowedImpl,
_LazyAllowedImpl,
_LazyFrameCollectImpl,
)
from narwhals.dataframe import DataFrame, LazyFrame
Expand Down Expand Up @@ -1648,9 +1648,9 @@ def is_compliant_expr(
def is_eager_allowed(impl: Implementation, /) -> TypeIs[_EagerAllowedImpl]:
"""Return True if `impl` allows eager operations."""
return impl in {
Implementation.PANDAS,
Implementation.MODIN,
Implementation.CUDF,
Implementation.MODIN,
Implementation.PANDAS,
Implementation.POLARS,
Implementation.PYARROW,
}
Expand All @@ -1661,13 +1661,16 @@ def can_lazyframe_collect(impl: Implementation, /) -> TypeIs[_LazyFrameCollectIm
return impl in {Implementation.PANDAS, Implementation.POLARS, Implementation.PYARROW}


def can_dataframe_lazy(impl: Implementation, /) -> TypeIs[_DataFrameLazyImpl]:
def is_lazy_allowed(impl: Implementation, /) -> TypeIs[_LazyAllowedImpl]:
"""Return True if `DataFrame.lazy(impl)` is allowed."""
return impl in {
Implementation.DASK,
Implementation.DUCKDB,
Implementation.POLARS,
Implementation.IBIS,
Implementation.POLARS,
Implementation.PYSPARK,
Implementation.PYSPARK_CONNECT,
Implementation.SQLFRAME,
}


Expand Down
Loading
Loading