
feat: Allow spark-like backends in .lazy(backend=...) #3032

Merged
FBruzzesi merged 14 commits into main from
feat/lazy-into-spark-like
Aug 27, 2025

Conversation

@FBruzzesi
Member

@FBruzzesi FBruzzesi commented Aug 23, 2025

What type of PR is this? (check all applicable)

  • 💾 Refactor
  • ✨ Feature
  • 🐛 Bug Fix
  • 🔧 Optimization
  • 📝 Documentation
  • ✅ Test
  • 🐳 Other

Related issues

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

If you have comments or can explain your changes, please do so below

TODO:

  • some listed inline
  • coverage

@FBruzzesi FBruzzesi added the enhancement (New feature or request) and spark-like labels Aug 23, 2025
@FBruzzesi FBruzzesi requested a review from dangotbanned August 23, 2025 15:46
import pyarrow as pa
import sqlframe.base.types as sqlframe_types

from narwhals._spark_like.utils import SparkSession
Member

If we are annotating session: SparkSession - it needs to be defined as a structural type and not imported from _spark_like.utils into the narwhals level

I'm replying from my phone, so haven't checked what that type actually is yet

Member

@dangotbanned dangotbanned Aug 23, 2025

Right so this currently might produce confusing results.

SparkSession is from sqlframe

from sqlframe.base.session import _BaseSession as Session

SparkSession = Session[Any, Any, Any, Any, Any, Any, Any]

If someone has ...

  • Only sqlframe installed, this works with sqlframe.base.session._BaseSession and subclasses
  • Both sqlframe + pyspark installed, doesn't work with pyspark.sql.SparkSession or pyspark.sql.connect.session.SparkSession
  • Only pyspark installed, works with anything as the symbol shows as Unknown
  • Neither installed, works with anything as the symbol shows as Unknown

As someone who hasn't got pyspark installed 😭 but has got sqlframe, this is what I see in conftest.py which has a similar case

(two screenshots of editor hover tooltips in conftest.py omitted)
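One way to avoid importing the sqlframe-based alias at the narwhals level would be a structural type. This is a minimal, self-contained sketch (the names are hypothetical, not the narwhals implementation): any object exposing `createDataFrame` satisfies the protocol, regardless of which of pyspark/sqlframe is installed.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SparkSession(Protocol):
    """Structural stand-in for a spark-like session (sketch)."""

    def createDataFrame(self, *args: Any, **kwds: Any) -> Any: ...  # noqa: N802


class _FakeSession:
    # Satisfies the protocol purely by shape; no spark-like import needed.
    def createDataFrame(self, *args: Any, **kwds: Any) -> Any:
        return (args, kwds)


assert isinstance(_FakeSession(), SparkSession)
assert not isinstance(object(), SparkSession)
```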

Member Author

I see, yet I am not sure what the fix for this is.

In scan_{csv,parquet} we have **kwargs: Any. I think having session might be more useful/explicit. Depending on how complex it is to support the typing, let's see if it's worth having. I imagine that a protocol might be required. Is this the right assumption 🧐?

Member

The easiest option is to type it as session: Any | None = None on the public stuff and then use SparkSession internally

I agree it is useful to have the parameter 🙂

Member Author

@FBruzzesi FBruzzesi Aug 25, 2025

@dangotbanned I am going to shoot myself in the foot with this comment, so please, pretty please, consider it as a follow-up if this is correct. Wouldn't the following be the proper typing:

if TYPE_CHECKING:
    from pyspark.sql import SparkSession as PySparkSession
    from pyspark.sql.connect.session import SparkSession as PySparkConnectSession
    from sqlframe.base.session import _BaseSession

    from narwhals._typing import Dask, DuckDB, EagerAllowed, Ibis, IntoBackend, Polars, PySpark, PySparkConnect, SQLFrame
    
    SQLFrameSession = _BaseSession[Any, Any, Any, Any, Any, Any, Any]
    
@overload
def lazy(
    self,
    backend: IntoBackend[Polars | DuckDB | Ibis | Dask],
    *,
    session: None = None,
) -> LazyFrame[Any]: ...

@overload
def lazy(
    self,
    backend: IntoBackend[PySpark],
    *,
    session: PySparkSession,
) -> LazyFrame[Any]: ...

@overload
def lazy(
    self,
    backend: IntoBackend[PySparkConnect],
    *,
    session: PySparkConnectSession,
) -> LazyFrame[Any]: ...

@overload
def lazy(
    self,
    backend: IntoBackend[SQLFrame],
    *,
    session: SQLFrameSession,
) -> LazyFrame[Any]: ...

Member

am going to shoot myself in the foot with this comment,

😂

Wouldn't the following be the proper typing:

There are a couple of issues

>>> mypy
Found 84 errors in 35 files (checked 429 source files)

>>> pyright
78 errors, 0 warnings, 0 informations

Part of it is what I mentioned in (#3032 (comment)) - which we can totally solve later (#3016 (comment))

But there are also overloads missing (e.g. df.lazy())

A future problem would be that using IntoBackend (a union with ModuleType) would create incompatible overlapping overloads if we ever want to refine LazyFrame[Any].
I very much do! 😄 (#3016 (comment))

Member

my suggestion was definitely partial in terms of @overload coverage. I will take a look later today, but I am also happy to keep it outside this PR

Note

TL;DR: It'll probably double the size of the PR

@FBruzzesi I 100% want @overloads here, but since we need to have them on all of:

  • nw.DataFrame.lazy
  • nw.stable.v1.DataFrame.lazy
  • nw.stable.v2.DataFrame.lazy

And backend, session have defaults - I'd expect the diff is gonna grow a lot


This is somewhat of a starting point:

class SparkSession(Protocol):
    def createDataFrame(self, *args: Any, **kwds: Any) -> Any: ...  # noqa: N802

class DataFrame(BaseFrame[DataFrameT]):
    @overload
    def lazy(self) -> LazyFrame[Any]: ...
    @overload
    def lazy(self, backend: Polars | Dask | DuckDB | Ibis) -> LazyFrame[Any]: ...
    @overload
    def lazy(
        self, backend: Polars | Dask | DuckDB | Ibis, *, session: None = ...
    ) -> LazyFrame[Any]: ...
    @overload
    def lazy(self, backend: SparkLike, *, session: SparkSession) -> LazyFrame[Any]: ...
    @overload
    def lazy(
        self, backend: IntoBackend[LazyAllowed], *, session: SparkSession | None
    ) -> LazyFrame[Any]: ...

    def lazy(
        self,
        backend: IntoBackend[LazyAllowed] | None = None,
        *,
        session: SparkSession | None = None,
    ) -> LazyFrame[Any]:

But still has loads of issues 😭

It doesn't catch this:

with pytest.raises(ValueError, match=err_msg):
    df.lazy(backend=backend, session=None)

But it would catch this:

df.lazy(backend=backend)

This test required adding the last overload, but really (IntoBackend[LazyAllowed], session: SparkSession | None) shouldn't be allowed from a typing perspective

df = nw.from_native(constructor_eager(data), eager_only=True)
session: Any
if impl.is_sqlframe():
    session = sqlframe_session()
elif impl.is_pyspark() or impl.is_pyspark_connect():  # pragma: no cover
    session = pyspark_session()
else:
    session = None
result = df.lazy(backend=backend, session=session)


And everything that follows inside the method now needs updating to use this new SparkSession protocol for session

lazy = self._compliant_frame.lazy
if backend is None:
    return self._lazyframe(lazy(None, session=session), level="lazy")
lazy_backend = Implementation.from_backend(backend)
if is_lazy_allowed(lazy_backend):
    return self._lazyframe(lazy(lazy_backend, session=session), level="lazy")

@dangotbanned

This comment was marked as resolved.

@FBruzzesi FBruzzesi marked this pull request as ready for review August 25, 2025 08:42
@FBruzzesi
Member Author

FBruzzesi commented Aug 25, 2025

Thanks @dangotbanned - I had a long day already but let me try to reply to #3032 (comment)

Isn't what you are proposing the same as using session.builder.getOrCreate? This will guarantee that the session is the same if one already exists:

from sqlframe.duckdb import DuckDBSession

s1 = DuckDBSession.builder.getOrCreate()
s2 = DuckDBSession.builder.getOrCreate()

s1 is s2
# True

I am not sure what the "omit" part in

I only have sqlframe installed, but AFAICT doesn't that mean we can allow omitting session if one already exists?

means. Where should we omit it?

My understanding is that after we create it once, the same will be used afterwards - in that case, I would argue that DuckDBSession.builder.getOrCreate() is a much more explicit syntax than the one we are using to create the session.

Also I am not following on why we would want to raise an exception:

If this general idea is okay, then we could request a public API for it? E.g. get_or_raise

@dangotbanned
Member

#3032 (comment)

Isn't what you are proposing the same of using the session.builder.getOrCreate?
This will guarantee that the session is the same if one already exists

It seems I did quite a bad job explaining in (#3032 (comment)) 😅

The difference between getOrCreate and what I called get_or_raise - is that the latter would never create a session.

Looking back, what I suggested might be a more confusing public API:

import narwhals as nw
from sqlframe.duckdb import DuckDBSession

data = {"a": [1, 2, 3], "b": ["x", "y", "z"]}

df = nw.DataFrame.from_dict(data, backend="pandas")
df.lazy("sqlframe")  # error
df.lazy("sqlframe", session=DuckDBSession())  # ok
df.lazy("sqlframe")  # ok

However, here I was thinking it would be nice to resolve SparkSession | None -> SparkSession

Examples

if backend.is_spark_like():
    from narwhals._spark_like.dataframe import SparkLikeLazyFrame

    if session is None:
        msg = "Spark like backends require `session` to be not None."
        raise ValueError(msg)

(the same check is repeated at two more call sites)

But maybe we could just do this instead?

def ensure_session(session: SparkSession | None) -> SparkSession:
    if session:
        return session
    msg = "Spark like backends require `session` to be not None."
    raise ValueError(msg)

I know it isn't part of the PR now, but (#3032 (comment)) would be introducing a session parameter in a lot more places.
Maybe the case for not requiring it then would be stronger? Happy to drop the idea for now 😄

@FBruzzesi
Member Author

FBruzzesi commented Aug 26, 2025

Thanks for clarifying @dangotbanned (#3032 (comment))! I somehow thought it was only related to our test suite. But now I get it.

Honestly I don't have enough experience with spark-like sessions to know whether, while the session stays the same, some settings can be changed (e.g. spark.sql.session.timeZone).

For SQLFrame I was about to ask: "which session should we even check?", but then I got very surprised to see the following:

from sqlframe.standalone import StandaloneSession
from sqlframe.duckdb import DuckDBSession
from sqlframe.spark import SparkSession

StandaloneSession() is DuckDBSession() is SparkSession()
True

Honestly I am not sure if this is good or bad or even expected 🙈


For now I would hold back: for me it falls into one of those cases in which a user can branch based on the implementation. For example:

# Assume I have some lazyframe
some_lazy_frame = ...

nw_lf = nw.from_native(some_lazy_frame)

# Now I receive a pandas object
pd_frame = ...

nw_pd = nw.from_native(pd_frame, eager_only=True)

# and we want to align to the lazy frame:
nw_pd.lazy(
    backend=nw_lf.implementation,
    session=getattr(some_lazy_frame, "sparkSession", None)
)

(based on pyspark.sql.DataFrame.sparkSession)


If we were to have a function to align the backend of multiple dataframes (see: #2193), then we can definitely pick up the current session ourselves

Member

@dangotbanned dangotbanned left a comment

Thanks @FBruzzesi!

I guess I'll call it here on the review 😄

I've left myself a TODO, which I'll try to investigate tomorrow


Comment on lines 53 to 54
_LazyFrameCollectImpl: TypeAlias = Literal[_PandasImpl, _PolarsImpl, _ArrowImpl] # noqa: PYI047
_DataFrameLazyImpl: TypeAlias = Literal[_PolarsImpl, _DaskImpl, _DuckDBImpl, _IbisImpl] # noqa: PYI047

Member

Farewell questionably-named alias 🥳

@FBruzzesi
Member Author

Thanks for the detailed review @dangotbanned

I solved all the suggestions in eda88aa

Regarding #3032 (comment), my suggestion was definitely partial in terms of @overload coverage. I will take a look later today, but I am also happy to keep it outside this PR

Comment on lines +71 to +79
def test_lazy(constructor_eager: ConstructorEager, backend: LazyAllowed) -> None:
impl = Implementation.from_backend(backend)
pytest.importorskip(impl.name.lower())

is_spark_connect = os.environ.get("SPARK_CONNECT", None)
if is_spark_connect is not None and impl.is_pyspark(): # pragma: no cover
# Workaround for impl.name.lower() being "pyspark[connect]" for
# Implementation.PYSPARK_CONNECT, which is never installed.
impl = Implementation.PYSPARK_CONNECT
Member

Note to self, we need a import_or_skip_backend test util
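A stdlib-only sketch of the resolution such a helper would need (the mapping and function names are hypothetical): the backend label "pyspark[connect]" is an extra of pyspark, so the importable module name differs from the label, which is exactly the workaround hard-coded in the test above.

```python
import importlib.util

# Hypothetical label -> module mapping; extend as needed.
_IMPORT_NAMES = {"pyspark[connect]": "pyspark"}


def backend_import_name(backend: str) -> str:
    """Resolve a backend label to the module name to import (or skip on)."""
    return _IMPORT_NAMES.get(backend, backend)


def backend_available(backend: str) -> bool:
    return importlib.util.find_spec(backend_import_name(backend)) is not None
```

In a test suite this would typically be wrapped with pytest.importorskip(backend_import_name(backend)).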

Member

Actually, this might be better as part of a lazy_backend and/or lazy_implementation fixture

Member

@dangotbanned dangotbanned left a comment

Thanks @FBruzzesi!

I'm happy with where things are at, we can tackle (#3032 (comment)) after (#3016)

(gif: hungry narwhal)

I think the CI failures were unrelated - but I trust your judgement 🥳

@FBruzzesi FBruzzesi merged commit 2183035 into main Aug 27, 2025
31 of 33 checks passed
@FBruzzesi FBruzzesi deleted the feat/lazy-into-spark-like branch August 27, 2025 18:18
dangotbanned added a commit that referenced this pull request Aug 27, 2025
@dangotbanned dangotbanned mentioned this pull request Sep 4, 2025
14 tasks

Labels

enhancement (New feature or request), spark-like

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support DataFrame().lazy({'pyspark','sqlframe','pyspark[connect]'})

2 participants