Skip to content

Conversation

@zhengruifeng
Copy link
Contributor

What changes were proposed in this pull request?

Arrow UDF for window

Why are the changes needed?

to make Arrow UDF support window operation

Does this PR introduce any user-facing change?

Not, yet. Will make Arrow UDF public soon

In [1]: from typing import Iterator, Tuple
   ...: import pyarrow as pa
   ...: from pyspark.sql import Window
   ...: from pyspark.sql import functions as sf
   ...: from pyspark.sql.pandas.functions import arrow_udf
   ...:
   ...: import pandas as pd
   ...: from pyspark.sql.functions import pandas_udf
   ...: from pyspark.sql import Window
   ...:
   ...: df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
   ...:
   ...: w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
   ...:
   ...:

In [2]: @arrow_udf("double")
   ...: def arrow_mean_udf(v: pa.Array) -> float:
   ...:    assert isinstance(v, pa.Array), str(type(v))
   ...:    return pa.compute.mean(v)
   ...:
   ...: # df.select(arrow_mean_udf(df['v'])).show()
   ...: # df.groupby("id").agg(arrow_mean_udf('v')).show()
   ...:
   ...: df.withColumn('mean_v', arrow_mean_udf(df['v']).over(w)).show()
   ...:
   ...:
+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+

How was this patch tested?

New tests

Was this patch authored or co-authored using generative AI tooling?

No

@xinrong-meng
Copy link
Member

LGTM! Would you fix the lint (too long line)?

@zhengruifeng
Copy link
Contributor Author

thanks, merged to master

@zhengruifeng zhengruifeng deleted the arrow_udf_win branch July 23, 2025 09:28
child: SparkPlan): ArrowWindowPythonExec = {
val evalTypes = windowExpression.map(w => WindowFunctionType.pythonEvalType(w).get)
assert(evalTypes.distinct.size == 1,
"All window functions must have the same eval type in WindowInPandasExec")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WindowInPandasExec? I guess you meant ArrowWindowPythonExec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will fix it in #51648



def wrap_window_agg_arrow_udf(f, args_offsets, kwargs_offsets, return_type, runner_conf, udf_index):
window_bound_types_str = runner_conf.get("pandas_window_bound_types")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pandas_window_bound_types? Maybe we should have arrow_window_bound_types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants