Add Window Functions for use with function builder #808

Merged
26 commits
7553836
Add window function as template for others and function builder
timsaucer Aug 13, 2024
4cfb650
Adding docstrings
timsaucer Aug 13, 2024
0803153
Change last_value to use function builder instead of explicitly passi…
timsaucer Aug 13, 2024
309d236
Allow any value for lead function default value and add unit test
timsaucer Aug 21, 2024
37b154e
Add lead window function and unit tests
timsaucer Aug 21, 2024
58a4807
Temporarily commenting out deprecated functions in documenation so bu…
timsaucer Aug 21, 2024
f09496e
Expose row_number window function
timsaucer Aug 21, 2024
c242728
Add rank window function
timsaucer Aug 21, 2024
18f7c81
Add percent rank and dense rank
timsaucer Aug 22, 2024
eb5598d
Add cume_dist
timsaucer Aug 22, 2024
f92d064
Add ntile window function
timsaucer Aug 22, 2024
8ab55e5
Add comment to update when upstream merges
timsaucer Aug 23, 2024
ca397ba
Window frame required calling inner value
timsaucer Aug 23, 2024
0c86754
Add unit test for avg as window function
timsaucer Aug 23, 2024
1ee2691
Working on documentation for window functions
timsaucer Aug 23, 2024
6d54973
Add pyo build config file to git ignore since this is user specific
timsaucer Aug 24, 2024
37022e3
Add examples to docstring
timsaucer Aug 24, 2024
ebf7c96
Optionally add window function parameters during function call
timsaucer Aug 30, 2024
14c7169
Update sort and order_by to apply automatic ordering if any other exp…
timsaucer Aug 30, 2024
b5f33e8
Update unit tests to be cleaner and use default sort on expressions
timsaucer Aug 30, 2024
5fd129c
Ignore vscode folder specific settings
timsaucer Aug 30, 2024
8863931
Window frames should only apply to aggregate functions used as window…
timsaucer Aug 31, 2024
47af829
Remove deprecated warning until we actually have a way to use all fun…
timsaucer Aug 31, 2024
009c361
Built in window functions do not have any impact by setting null_trea…
timsaucer Aug 31, 2024
bc3be5d
Update user documentation on how to pass parameters for different win…
timsaucer Aug 31, 2024
070b595
Make first_value and last_value identical in the interface
timsaucer Aug 31, 2024
4 changes: 4 additions & 0 deletions .gitignore
@@ -4,6 +4,7 @@ target
/docs/temp
/docs/build
.DS_Store
.vscode

# Byte-compiled / optimized / DLL files
__pycache__/
@@ -31,3 +32,6 @@ apache-rat-*.jar
CHANGELOG.md.bak

docs/mdbook/book

.pyo3_build_config

2 changes: 2 additions & 0 deletions docs/source/user-guide/common-operations/aggregations.rst
@@ -15,6 +15,8 @@
.. specific language governing permissions and limitations
.. under the License.

.. _aggregation:

Aggregation
============

187 changes: 156 additions & 31 deletions docs/source/user-guide/common-operations/windows.rst
@@ -15,13 +15,16 @@
.. specific language governing permissions and limitations
.. under the License.

.. _window_functions:

Window Functions
================

In this section you will learn about window functions. A window function utilizes values from one or multiple rows to
produce a result for each individual row, unlike an aggregate function that provides a single value for multiple rows.
In this section you will learn about window functions. A window function utilizes values from one or
multiple rows to produce a result for each individual row, unlike an aggregate function that
provides a single value for multiple rows.

The functionality of window functions in DataFusion is supported by the dedicated :py:func:`~datafusion.functions.window` function.
The window functions are available in the :py:mod:`~datafusion.functions` module.

We'll use the pokemon dataset (from Ritchie Vink) in the following examples.

@@ -40,54 +43,176 @@ We'll use the pokemon dataset (from Ritchie Vink) in the following examples.
ctx = SessionContext()
df = ctx.read_csv("pokemon.csv")

Here is an example that shows how to compare each Pokemon's attack power with the average attack power in its ``"Type 1"``
Here is an example that shows how you can compare each pokemon's speed to the speed of the
previous row in the DataFrame.

.. ipython:: python

df.select(
col('"Name"'),
col('"Attack"'),
f.alias(
f.window("avg", [col('"Attack"')], partition_by=[col('"Type 1"')]),
"Average Attack",
)
col('"Speed"'),
f.lag(col('"Speed"')).alias("Previous Speed")
)
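
As a rough illustration of what ``lag`` computes, here is a minimal plain-Python sketch. It does not use DataFusion; the ``lag`` helper and the sample speeds are assumptions made for this example only.

```python
from typing import List, Optional, Sequence


def lag(
    values: Sequence[int], offset: int = 1, default: Optional[int] = None
) -> List[Optional[int]]:
    """For each row, return the value `offset` rows earlier, or `default`."""
    return [
        values[i - offset] if i - offset >= 0 else default
        for i in range(len(values))
    ]


# Hypothetical speeds, in DataFrame row order
speeds = [45, 60, 80, 80]
previous_speed = lag(speeds)
print(previous_speed)  # [None, 45, 60, 80]
```

The first row has no preceding row, so it receives the default value (null unless another default is supplied).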

You can also control the order in which rows are processed by window functions by providing
Setting Parameters
------------------


Ordering
^^^^^^^^

You can control the order in which rows are processed by window functions by providing
a list of sort expressions for the ``order_by`` parameter.

.. ipython:: python

df.select(
col('"Name"'),
col('"Attack"'),
f.alias(
f.window(
"rank",
[],
partition_by=[col('"Type 1"')],
order_by=[f.order_by(col('"Attack"'))],
),
"rank",
),
col('"Type 1"'),
f.rank(
partition_by=[col('"Type 1"')],
order_by=[col('"Attack"').sort(ascending=True)],
).alias("rank"),
).sort(col('"Type 1"'), col('"Attack"'))

Partitions
^^^^^^^^^^

A window function can take a list of ``partition_by`` columns similar to an
:ref:`Aggregation Function<aggregation>`. This will cause the window values to be evaluated
independently for each of the partitions. In the example above, we found the rank of each
Pokemon within its ``Type 1`` partition. We can see the first few rows of each partition if
we do the following:

.. ipython:: python

df.select(
col('"Name"'),
col('"Attack"'),
col('"Type 1"'),
f.rank(
partition_by=[col('"Type 1"')],
order_by=[col('"Attack"').sort(ascending=True)],
).alias("rank"),
).filter(col("rank") < lit(3)).sort(col('"Type 1"'), col("rank"))
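
To make the interaction between partitioning and ordering concrete, the following plain-Python sketch ranks rows independently within each partition. It is an illustration under assumed data, not DataFusion's implementation; the ``rank_by_partition`` helper and the sample rows are hypothetical.

```python
from collections import defaultdict

# Hypothetical rows: (name, type_1, attack)
rows = [
    ("A", "Grass", 49),
    ("B", "Grass", 62),
    ("C", "Grass", 49),
    ("D", "Fire", 52),
    ("E", "Fire", 64),
]


def rank_by_partition(rows):
    """Rank rows by ascending attack, restarting at 1 in each partition."""
    partitions = defaultdict(list)
    for name, type_1, attack in rows:
        partitions[type_1].append((name, attack))

    ranks = {}
    for members in partitions.values():
        ordered = sorted(members, key=lambda member: member[1])
        for position, (name, attack) in enumerate(ordered, start=1):
            previous = ordered[position - 2] if position > 1 else None
            if previous is not None and previous[1] == attack:
                # Tied rows share a rank, leaving a gap afterwards
                ranks[name] = ranks[previous[0]]
            else:
                ranks[name] = position
    return ranks


ranks = rank_by_partition(rows)
print(ranks)
```

Rows ``A`` and ``C`` tie on attack and both receive rank 1, so ``B`` receives rank 3. The ``Fire`` partition starts again at rank 1.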

Window Frame
^^^^^^^^^^^^

When using aggregate functions, the Window Frame defines the rows over which they operate.
If you do not specify a Window Frame, the frame will be set depending on the following
criteria.

* If an ``order_by`` clause is set, the default window frame is defined as the rows between
unbounded preceding and the current row.
* If an ``order_by`` is not set, the default frame is defined as the rows between unbounded
preceding and unbounded following (the entire partition).

Window Frames are defined by three parameters: unit type, starting bound, and ending bound.

The unit types available are:

* Rows: The starting and ending boundaries are defined by the number of rows relative to the
current row.
* Range: When using Range, the ``order_by`` clause must have exactly one term. The boundaries
are defined by how close the rows are to the value of the expression in the ``order_by``
parameter.
* Groups: A "group" is the set of all rows that have equivalent values for all terms in the
``order_by`` clause.

In this example we perform a "rolling average" of the speed of the current Pokemon and the
two preceding rows.

.. ipython:: python

from datafusion.expr import WindowFrame

df.select(
col('"Name"'),
col('"Speed"'),
f.window("avg",
[col('"Speed"')],
order_by=[col('"Speed"')],
window_frame=WindowFrame("rows", 2, 0)
).alias("Rolling Average Speed")
)
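
The effect of a ``rows`` frame can be sketched in plain Python. This is only an illustration of the frame semantics described above, with a hypothetical ``rolling_avg`` helper and made-up values; it assumes the rows are already in the desired order.

```python
from typing import List, Optional, Sequence


def rolling_avg(
    values: Sequence[float], preceding: Optional[int], following: int = 0
) -> List[float]:
    """Average over a rows-based frame around each row.

    `preceding=None` stands for an unbounded start of the frame.
    """
    result = []
    for i in range(len(values)):
        start = 0 if preceding is None else max(0, i - preceding)
        end = min(len(values), i + following + 1)
        frame = values[start:end]
        result.append(sum(frame) / len(frame))
    return result


speeds = [10, 20, 30, 40]  # hypothetical, already ordered

# Frame: two preceding rows through the current row
print(rolling_avg(speeds, 2))  # [10.0, 15.0, 20.0, 30.0]

# Frame: unbounded preceding through the current row (a running average)
print(rolling_avg(speeds, None))  # [10.0, 15.0, 20.0, 25.0]
```

Near the start of the data the frame contains fewer rows, so the first two results average over one and two values respectively.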

Null Treatment
^^^^^^^^^^^^^^

When using aggregate functions as window functions, it is often useful to specify how null values
should be treated. In order to do this you need to use the builder function. In future releases
we expect this to be simplified in the interface.

One common usage for handling nulls is the case where you want to find the last value up to the
current row. In the following example we demonstrate how setting the null treatment to ignore
nulls will fill in with the value of the most recent non-null row. To do this, we also will set
the window frame so that we only process up to the current row.

In this example, we filter down to one specific type of Pokemon that does have some entries in
its ``Type 2`` column that are null.

.. ipython:: python

from datafusion.common import NullTreatment

df.filter(col('"Type 1"') == lit("Bug")).select(
'"Name"',
'"Type 2"',
f.window("last_value", [col('"Type 2"')])
.window_frame(WindowFrame("rows", None, 0))
.order_by(col('"Speed"'))
.null_treatment(NullTreatment.IGNORE_NULLS)
.build()
.alias("last_wo_null"),
f.window("last_value", [col('"Type 2"')])
.window_frame(WindowFrame("rows", None, 0))
.order_by(col('"Speed"'))
.null_treatment(NullTreatment.RESPECT_NULLS)
.build()
.alias("last_with_null")
)
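
The difference between the two null treatments can be sketched in plain Python. This assumes a frame running from the start of the data up to the current row, as in the example above; the ``last_value`` helper and the sample column are hypothetical and do not call DataFusion.

```python
from typing import List, Optional, Sequence


def last_value(
    values: Sequence[Optional[str]], ignore_nulls: bool
) -> List[Optional[str]]:
    """Last value in the frame from the first row up to the current row."""
    result = []
    last_seen = None
    for value in values:
        # When respecting nulls, a null row overwrites the last seen value
        if value is not None or not ignore_nulls:
            last_seen = value
        result.append(last_seen)
    return result


type_2 = ["Poison", None, "Flying", None, None]  # hypothetical column

last_wo_null = last_value(type_2, ignore_nulls=True)
last_with_null = last_value(type_2, ignore_nulls=False)

print(last_wo_null)    # ['Poison', 'Poison', 'Flying', 'Flying', 'Flying']
print(last_with_null)  # ['Poison', None, 'Flying', None, None]
```

Ignoring nulls carries the most recent non-null value forward, while respecting nulls simply returns the current row's value, null or not.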

Aggregate Functions
-------------------

You can use any :ref:`Aggregation Function<aggregation>` as a window function. Currently
aggregate functions must use the deprecated
:py:func:`datafusion.functions.window` API but this should be resolved in
DataFusion 42.0 (`Issue Link <https://github.com/apache/datafusion-python/issues/833>`_). Here
is an example that shows how to compare each Pokemon's attack power with the average attack
power of its ``"Type 1"`` partition using the :py:func:`datafusion.functions.avg` function.

.. ipython:: python
:okwarning:

df.select(
col('"Name"'),
col('"Attack"'),
col('"Type 1"'),
f.window("avg", [col('"Attack"')])
.partition_by(col('"Type 1"'))
.build()
.alias("Average Attack"),
)
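
The following plain-Python sketch contrasts a regular aggregation with the same aggregate used as a window function. The data and the helper names are assumptions for illustration; the point is only that the window form keeps one output row per input row.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows: (name, type_1, attack)
rows = [
    ("A", "Grass", 40),
    ("B", "Grass", 60),
    ("C", "Fire", 50),
]


def attacks_by_type(rows):
    """Group attack values by their partition key."""
    grouped = defaultdict(list)
    for _, type_1, attack in rows:
        grouped[type_1].append(attack)
    return grouped


def aggregate_avg(rows):
    """Aggregation: one result per group."""
    return {t: mean(a) for t, a in attacks_by_type(rows).items()}


def window_avg(rows):
    """Window function: one result per input row."""
    grouped = attacks_by_type(rows)
    return [(name, attack, t, mean(grouped[t])) for name, t, attack in rows]


print(aggregate_avg(rows))
print(window_avg(rows))
```

``aggregate_avg`` returns two values, one per type, while ``window_avg`` returns three rows, each carrying the average of its own partition.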

Available Functions
-------------------

The possible window functions are:

1. Rank Functions
- rank
- dense_rank
- row_number
- ntile
- :py:func:`datafusion.functions.rank`
- :py:func:`datafusion.functions.dense_rank`
- :py:func:`datafusion.functions.ntile`
- :py:func:`datafusion.functions.row_number`

2. Analytical Functions
- cume_dist
- percent_rank
- lag
- lead
- first_value
- last_value
- nth_value
- :py:func:`datafusion.functions.cume_dist`
- :py:func:`datafusion.functions.percent_rank`
- :py:func:`datafusion.functions.lag`
- :py:func:`datafusion.functions.lead`

3. Aggregate Functions
- All aggregate functions can be used as window functions.
- All :ref:`Aggregation Functions<aggregation>` can be used as window functions.
7 changes: 3 additions & 4 deletions python/datafusion/dataframe.py
@@ -123,11 +123,10 @@ def select(self, *exprs: Expr | str) -> DataFrame:
df = df.select("a", col("b"), col("a").alias("alternate_a"))

"""
exprs = [
arg.expr if isinstance(arg, Expr) else Expr.column(arg).expr
for arg in exprs
exprs_internal = [
Expr.column(arg).expr if isinstance(arg, str) else arg.expr for arg in exprs
]
return DataFrame(self.df.select(*exprs))
return DataFrame(self.df.select(*exprs_internal))

def filter(self, *predicates: Expr) -> DataFrame:
"""Return a DataFrame for which ``predicate`` evaluates to ``True``.