
feat: Add support for spark connect#2417

Merged
MarcoGorelli merged 22 commits into main from feat/spark-connect
Apr 28, 2025
Conversation

@FBruzzesi
Member

@FBruzzesi FBruzzesi commented Apr 22, 2025

What type of PR is this? (check all applicable)

  • 💾 Refactor
  • ✨ Feature
  • 🐛 Bug Fix
  • 🔧 Optimization
  • 📝 Documentation
  • ✅ Test
  • 🐳 Other

Related issues

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

If you have comments or can explain your changes, please do so below

Opening as draft as:

  • datetime tests are failing (infer format and timezone aware)
  • cast to struct type is failing (even though the conversion internally seems fine - needs more investigation)
  • some naming/choices might need discussion (see code comments)
  • very unsure if the CI will work as expected

@FBruzzesi FBruzzesi added enhancement New feature or request pyspark-connect labels Apr 22, 2025
@dangotbanned dangotbanned added the pyspark Issue is related to pyspark backend label Apr 22, 2025
@dangotbanned
Member

Just adding the original tag as well to be careful 🙂 (#2417 (comment))

Comment on lines 182 to 188
elif (
    self._implementation is Implementation.PYSPARK_CONNECT
    and self._backend_version < (4,)
):
    import pyarrow as pa  # ignore-banned-import

    return pa.Table.from_pandas(self.native.toPandas())
Member Author

This...got me a laugh and a tear!


  def head(self, n: int) -> Self:
-     return self._with_native(self.native.limit(num=n))
+     return self._with_native(self.native.limit(n))
Member Author

No argument named num 🤷🏼‍♀️

@FBruzzesi
Member Author

Just adding the original tag as well to be careful 🙂 (#2417 (comment))

Thanks @dangotbanned - as I might need a few iterations before getting the CI right (just to make it start), I didn't want to trigger everything all the time 🙈
But yes, before making a final call on this PR, we should definitely run all the jobs

@dangotbanned
Member

dangotbanned commented Apr 22, 2025

#2417 (comment)

No worries @FBruzzesi - trust your judgement here 😄

Comment on lines 68 to 73
- name: Download Spark
  run: |
    curl -sLO https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz
    tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz
    echo "SPARK_HOME=$PWD/spark-${SPARK_VERSION}-bin-hadoop3" >> $GITHUB_ENV
    echo "$PWD/spark-${SPARK_VERSION}-bin-hadoop3/bin" >> $GITHUB_PATH
Member Author

So apparently this is not a good idea.

Searching for start-connect-server.sh across all public GitHub Actions (YAML files in the .github folder), here is what we get: search.

That does not look great to me, since all the repos are apache/spark forks 🥲 with one exception, which however uses that string to skip the Spark Connect test.

I will sleep on this and keep looking for alternatives 🤔

Member Author

Exciting news! It seems to work with the latest commit (05e0bcc). It just takes a while to set up

@dangotbanned
Member

Just wanna throw these into the mix again 😄

So far this PR hasn't seemed to add much branching within methods.
@FBruzzesi if you find there's more of a need for that later - then there could be more of a benefit to splitting out the classes

@FBruzzesi FBruzzesi marked this pull request as ready for review April 25, 2025 07:05
  order_by_cols = [self._F.asc_nulls_last(_input)]

- window = self._Window().orderBy(order_by_cols)
+ window = self._Window().partitionBy(self._F.lit(1)).orderBy(order_by_cols)
Member Author

Left over from #2429

.drop(index_col_name)
)

return cast("PySparkDataFrame", frame)
Member Author

@dangotbanned forgive me for my crazy life 😂

Member

Hopefully fixed in (f297ac4)

Member Author

Nice thanks! That's one more trick that I need to add to my belt!

Member

@dangotbanned dangotbanned Apr 27, 2025

Same trick as the rest of spark_like typing 😉

You need to pick one of the imports for TYPE_CHECKING, which goes first. Then the real code goes afterwards.
A type checker will view the non-TYPE_CHECKING branch as unreachable - since it treats TYPE_CHECKING as always True - whereas at runtime it is always False

@property
def _F(self):  # type: ignore[no-untyped-def] # noqa: ANN202, N802
    if TYPE_CHECKING:
        from sqlframe.base import functions

        return functions
    else:
        return import_functions(self._implementation)

@property
def _native_dtypes(self):  # type: ignore[no-untyped-def] # noqa: ANN202
    if TYPE_CHECKING:
        from sqlframe.base import types

        return types
    else:
        return import_native_dtypes(self._implementation)

@property
def _Window(self) -> type[Window]:  # noqa: N802
    if TYPE_CHECKING:
        from sqlframe.base.window import Window

        return Window
    else:
        return import_window(self._implementation)

def import_functions(implementation: Implementation, /) -> ModuleType:
    if implementation is Implementation.PYSPARK:
        from pyspark.sql import functions

        return functions
    from sqlframe.base.session import _BaseSession

    return import_module(f"sqlframe.{_BaseSession().execution_dialect_name}.functions")

def import_native_dtypes(implementation: Implementation, /) -> ModuleType:
    if implementation is Implementation.PYSPARK:
        from pyspark.sql import types

        return types
    from sqlframe.base.session import _BaseSession

    return import_module(f"sqlframe.{_BaseSession().execution_dialect_name}.types")

def import_window(implementation: Implementation, /) -> type[Any]:
    if implementation is Implementation.PYSPARK:
        from pyspark.sql import Window

        return Window
    from sqlframe.base.session import _BaseSession

    return import_module(
        f"sqlframe.{_BaseSession().execution_dialect_name}.window"
    ).Window
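To see why the trick works outside this codebase, here is a minimal, hypothetical sketch using only the standard library (the `Backend` class and `_json` property are made up for illustration; they are not part of narwhals):

```python
from importlib import import_module
from typing import TYPE_CHECKING

class Backend:
    @property
    def _json(self):  # hypothetical property, mirroring the `_F` pattern
        if TYPE_CHECKING:
            import json

            # A type checker analyzes this branch, so `backend._json`
            # is typed as the `json` module.
            return json
        else:
            # At runtime TYPE_CHECKING is False, so the dynamic import runs.
            return import_module("json")

backend = Backend()
print(backend._json.dumps({"ok": True}))
```

The type checker gets a concrete module to resolve attributes against, while the runtime path stays fully dynamic.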

@FBruzzesi FBruzzesi requested a review from EdAbati April 25, 2025 07:11
Member

@MarcoGorelli MarcoGorelli left a comment

thanks @FBruzzesi !

Member

@dangotbanned dangotbanned left a comment

Thanks @FBruzzesi!
I was summoned (#2417 (comment)) for typing - but spotted a possible performance regression that would impact other spark-like backends

Comment on lines +153 to +157
for key, value in nw_schema.items():
    try:
        native_dtype = narwhals_to_native_dtype(value, self._version)
    except Exception as exc:  # noqa: BLE001,PERF203
        native_spark_dtype = native_schema[key].dataType  # type: ignore[index]
Member

@dangotbanned dangotbanned Apr 26, 2025

@FBruzzesi Could you address this performance issue (PERF203) or leave a comment on why it is unavoidable please?

I'd personally try to also avoid (BLE001) and using exception handling entirely - but understand they were already here 🙂

Member Author

In principle we could first check whether self.collect_schema() contains any unknown type. If it doesn't, then it should be possible to call .to_arrow(); if it does, then this workaround is needed
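That pre-check idea could look something like the following hedged sketch. `to_native`, the dtype names, and the `"fallback"` value are toy stand-ins, not the real narwhals converters:

```python
UNSUPPORTED = {"unknown"}

def to_native(dtype: str) -> str:
    # Toy stand-in for narwhals_to_native_dtype: fails on unsupported dtypes.
    if dtype in UNSUPPORTED:
        raise TypeError(f"cannot convert {dtype!r}")
    return dtype.upper()

def convert_schema(nw_schema: dict[str, str]) -> dict[str, str]:
    if UNSUPPORTED.isdisjoint(nw_schema.values()):
        # Fast path: no try/except inside the loop, so no PERF203.
        return {key: to_native(value) for key, value in nw_schema.items()}
    # Slow path: only reached when an unsupported dtype is present,
    # falling back per column, as the original code does unconditionally.
    native: dict[str, str] = {}
    for key, value in nw_schema.items():
        try:
            native[key] = to_native(value)
        except TypeError:
            native[key] = "fallback"
    return native

print(convert_schema({"a": "int64", "b": "unknown"}))
```

The per-iteration exception handling then only pays its cost on schemas that actually need the workaround.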

Member Author

@dangotbanned sorry for the direct ping - let's figure out what to write - to me the explanation in ln158 is quite good.

I will need your approval to merge 😎

Member

Hey sorry I lost this @FBruzzesi

I started trying to address it, but couldn't get the tests working locally 😔

Collaborator

@EdAbati EdAbati left a comment

Thank you for doing this! Looks great to me, just left minor comments :)

Comment on lines +77 to +82
- name: Cache Spark
  id: cache-spark
  uses: actions/cache@v4
  with:
    path: /opt/spark
    key: spark-${{ env.SPARK_VERSION }}-bin-hadoop3
Collaborator

nice

Comment on lines 67 to 68
java-version: '11'
distribution: 'temurin'
Collaborator

@EdAbati EdAbati Apr 26, 2025

I have ~0 knowledge about Java. Where does this come from? Should we add the source in a comment?
I can see in their CI they use this instead:
https://github.com/apache/spark/blob/b634978936499f58f8cb2e8ea16339feb02ffb52/.github/workflows/build_python_connect.yml#L54-L58

Member Author

I have ~0 knowledge about Java.

That makes 2 of us

Where does this come from? Should we add the source in a comment?

I can try to bump the version or follow what they do

Collaborator

@EdAbati EdAbati left a comment

Nice! Thank you :)

@MarcoGorelli
Member

thanks @FBruzzesi ! going to go ahead and ship this

happy to discuss separately whether the PERF203 ruff code can be avoided, but I don't think it's a blocker - it was already there to begin with (probably because I added it 😳), so it's not introduced by this PR

@MarcoGorelli MarcoGorelli merged commit a260885 into main Apr 28, 2025
32 checks passed
@MarcoGorelli MarcoGorelli deleted the feat/spark-connect branch April 28, 2025 08:21
@MarcoGorelli
Member

quick note - indeed, by deleting the whole merge commit message apart from the "Co-authored-by" part, we don't make the git log unnecessarily long, and we preserve coauthors (GitHub now shows "FBruzzesi and dangotbanned" for this commit). cool, gonna add something about that to the contributing guide


Labels

enhancement New feature or request pyspark Issue is related to pyspark backend pyspark-connect

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enh]: Support for Spark Connect

4 participants