Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
8a55317
chore: rename changes_length to filtration
MarcoGorelli Feb 25, 2025
4cda373
hey this works
MarcoGorelli Feb 25, 2025
bc6fb60
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Feb 25, 2025
78320eb
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 1, 2025
ba8fc2a
passes for sqlframe
MarcoGorelli Mar 1, 2025
4251f55
pandas working too!
MarcoGorelli Mar 1, 2025
bb144b8
robustness
MarcoGorelli Mar 1, 2025
3f154e3
wip
MarcoGorelli Mar 1, 2025
1437b6d
wip
MarcoGorelli Mar 1, 2025
3344eb7
wip
MarcoGorelli Mar 1, 2025
ccc135d
support for polars
MarcoGorelli Mar 1, 2025
945dde8
avoid doing 2 sorts in pandas implementation
MarcoGorelli Mar 2, 2025
e2d9cd6
tpch
MarcoGorelli Mar 2, 2025
64051d3
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 2, 2025
5c2b2e2
amazing
MarcoGorelli Mar 2, 2025
4abe290
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 2, 2025
8a9b1b4
do it for pyarrow too
MarcoGorelli Mar 2, 2025
70e23ac
simplify
MarcoGorelli Mar 2, 2025
543e686
wip
MarcoGorelli Mar 2, 2025
0a556a5
fixup pandas reverse
MarcoGorelli Mar 2, 2025
9449066
xfail some dask
MarcoGorelli Mar 2, 2025
5f35d22
old polars
MarcoGorelli Mar 2, 2025
87e9dfe
fixup
MarcoGorelli Mar 2, 2025
d02c9eb
silence modins noise
MarcoGorelli Mar 2, 2025
d597d5a
old pandas compat
MarcoGorelli Mar 2, 2025
bff5e5b
old pandas compat
MarcoGorelli Mar 2, 2025
b1491ba
arrow fix
MarcoGorelli Mar 2, 2025
cdb4eb1
wip
MarcoGorelli Mar 2, 2025
e954e77
coverage
MarcoGorelli Mar 2, 2025
eab25f9
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 2, 2025
47923e3
lint
MarcoGorelli Mar 2, 2025
d1c1e4d
pandas fixup
MarcoGorelli Mar 2, 2025
01136c8
pandas cross-version fix
MarcoGorelli Mar 3, 2025
b9a4885
modin xfail
MarcoGorelli Mar 3, 2025
4b67964
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 3, 2025
e3d32aa
support at least cum_sum(order_by) for dask
MarcoGorelli Mar 3, 2025
b371bb6
just raise for dask
MarcoGorelli Mar 3, 2025
e2ad3c6
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 4, 2025
fd1fad4
wip broken
MarcoGorelli Mar 4, 2025
24f0961
post-merge fixup
MarcoGorelli Mar 4, 2025
15e89f5
wip
MarcoGorelli Mar 4, 2025
60162d1
document dask limitation
MarcoGorelli Mar 4, 2025
a7c71d6
correct xfail
MarcoGorelli Mar 4, 2025
3ef02ea
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 5, 2025
fe2118d
use empty string in one test
MarcoGorelli Mar 5, 2025
816f964
correct pandas logic for order_by with nulls
MarcoGorelli Mar 5, 2025
efc4862
pandas fix ungrouped case
MarcoGorelli Mar 5, 2025
390ba3f
tmp: working for pyarrow, but does 2 sorts. can we do just one?
MarcoGorelli Mar 5, 2025
1d3b8c3
more generic solution (supports duplicates in index)
MarcoGorelli Mar 5, 2025
ae72db4
hey, we really can avoid double-sorting for pandas!
MarcoGorelli Mar 5, 2025
744fb7b
hey, we really can avoid double-sorting for pandas!
MarcoGorelli Mar 5, 2025
0d30782
add comment about double-sort
MarcoGorelli Mar 5, 2025
d5ef86a
sqlframe/pyspark fix
MarcoGorelli Mar 5, 2025
13b9474
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 5, 2025
ba8dab1
cross-version compat
MarcoGorelli Mar 5, 2025
91978fe
more cross-version compat
MarcoGorelli Mar 5, 2025
4710b1d
dask
MarcoGorelli Mar 5, 2025
200015c
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 6, 2025
8ff5635
factor out scatter_in_place
MarcoGorelli Mar 6, 2025
d40dc23
coverage
MarcoGorelli Mar 6, 2025
5457898
correct comment
MarcoGorelli Mar 6, 2025
d3e0573
old pandas fix
MarcoGorelli Mar 6, 2025
335cc8b
Merge remote-tracking branch 'upstream/main' into start-order-dependence
MarcoGorelli Mar 7, 2025
a516fef
use windowfunction type alias
MarcoGorelli Mar 7, 2025
7f15b9e
simplify pandas implementation a bit
MarcoGorelli Mar 7, 2025
8bf77f4
make sure that non-elementary expressions are supported if partition_…
MarcoGorelli Mar 7, 2025
c1c6de2
skip for old polars
MarcoGorelli Mar 7, 2025
824d1b8
include missing file
MarcoGorelli Mar 7, 2025
802fa09
xfail
MarcoGorelli Mar 7, 2025
67700a1
Merge branch 'main' into start-order-dependence
MarcoGorelli Mar 8, 2025
60ca848
fix typing
MarcoGorelli Mar 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions narwhals/_arrow/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,8 @@ def join(
other: Self,
*,
how: Literal["left", "inner", "cross", "anti", "semi"],
left_on: list[str] | None,
right_on: list[str] | None,
left_on: Sequence[str] | None,
right_on: Sequence[str] | None,
suffix: str,
) -> Self:
how_to_join_map: dict[str, JoinType] = {
Expand Down Expand Up @@ -442,8 +442,8 @@ def join(
return self._from_native_frame(
self._native_frame.join(
other._native_frame,
keys=left_on or [],
right_keys=right_on,
keys=left_on or [], # type: ignore[arg-type]
right_keys=right_on, # type: ignore[arg-type]
join_type=how_to_join_map[how],
right_suffix=suffix,
),
Expand Down
70 changes: 51 additions & 19 deletions narwhals/_arrow/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from typing import Mapping
from typing import Sequence

import pyarrow.compute as pc

from narwhals._arrow.expr_cat import ArrowExprCatNamespace
from narwhals._arrow.expr_dt import ArrowExprDateTimeNamespace
from narwhals._arrow.expr_list import ArrowExprListNamespace
Expand All @@ -22,6 +24,7 @@
from narwhals.exceptions import ColumnNotFoundError
from narwhals.typing import CompliantExpr
from narwhals.utils import Implementation
from narwhals.utils import generate_temporary_column_name
from narwhals.utils import not_implemented

if TYPE_CHECKING:
Expand Down Expand Up @@ -423,28 +426,57 @@ def clip(self: Self, lower_bound: Any | None, upper_bound: Any | None) -> Self:
self, "clip", lower_bound=lower_bound, upper_bound=upper_bound
)

def over(self: Self, keys: Sequence[str], kind: ExprKind) -> Self:
if not is_scalar_like(kind):
msg = "Only aggregation or literal operations are supported in `over` context for PyArrow."
def over(
self: Self,
partition_by: Sequence[str],
kind: ExprKind,
order_by: Sequence[str] | None,
) -> Self:
if partition_by and not is_scalar_like(kind):
msg = "Only aggregation or literal operations are supported in grouped `over` context for PyArrow."
raise NotImplementedError(msg)

def func(df: ArrowDataFrame) -> list[ArrowSeries]:
output_names, aliases = evaluate_output_names_and_aliases(self, df, [])
if overlap := set(output_names).intersection(keys):
# E.g. `df.select(nw.all().sum().over('a'))`. This is well-defined,
# we just don't support it yet.
msg = (
f"Column names {overlap} appear in both expression output names and in `over` keys.\n"
"This is not yet supported."
if not partition_by:
# e.g. `nw.col('a').cum_sum().order_by(key)`
# which we can always easily support, as it doesn't require grouping.
assert order_by is not None # help type checkers # noqa: S101

def func(df: ArrowDataFrame) -> Sequence[ArrowSeries]:
token = generate_temporary_column_name(8, df.columns)
df = df.with_row_index(token).sort(
*order_by, descending=False, nulls_last=False
)
result = self(df)
# TODO(marco): is there a way to do this efficiently without
# doing 2 sorts? Here we're sorting the dataframe and then
# again calling `sort_indices`. `ArrowSeries.scatter` would also sort.
sorting_indices = pc.sort_indices(df[token]._native_series) # type: ignore[call-overload]
return [
ser._from_native_series(pc.take(ser._native_series, sorting_indices)) # type: ignore[call-overload]
for ser in result
]
else:

def func(df: ArrowDataFrame) -> Sequence[ArrowSeries]:
output_names, aliases = evaluate_output_names_and_aliases(self, df, [])
if overlap := set(output_names).intersection(partition_by):
# E.g. `df.select(nw.all().sum().over('a'))`. This is well-defined,
# we just don't support it yet.
msg = (
f"Column names {overlap} appear in both expression output names and in `over` keys.\n"
"This is not yet supported."
)
raise NotImplementedError(msg)

tmp = df.group_by(*partition_by, drop_null_keys=False).agg(self)
tmp = df.simple_select(*partition_by).join(
tmp,
how="left",
left_on=partition_by,
right_on=partition_by,
suffix="_right",
)
raise NotImplementedError(msg)

tmp = df.group_by(*keys, drop_null_keys=False).agg(self)
on = list(keys)
tmp = df.simple_select(*keys).join(
tmp, how="left", left_on=on, right_on=on, suffix="_right"
)
return [tmp[alias] for alias in aliases]
return [tmp[alias] for alias in aliases]

return self.__class__(
func,
Expand Down
70 changes: 45 additions & 25 deletions narwhals/_dask/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from narwhals.utils import not_implemented

if TYPE_CHECKING:
from narwhals._expression_parsing import ExprKind

try:
import dask.dataframe.dask_expr as dx
except ModuleNotFoundError:
Expand Down Expand Up @@ -343,6 +345,7 @@ def shift(self: Self, n: int) -> Self:

def cum_sum(self: Self, *, reverse: bool) -> Self:
if reverse: # pragma: no cover
# https://github.com/dask/dask/issues/11802
msg = "`cum_sum(reverse=True)` is not supported with Dask backend"
raise NotImplementedError(msg)

Expand Down Expand Up @@ -533,41 +536,58 @@ def null_count(self: Self) -> Self:
lambda _input: _input.isna().sum().to_series(), "null_count"
)

def over(self: Self, keys: Sequence[str], kind: ExprKind) -> Self:
def over(
self: Self,
partition_by: Sequence[str],
kind: ExprKind,
order_by: Sequence[str] | None,
) -> Self:
# pandas is a required dependency of dask so it's safe to import this
from narwhals._pandas_like.group_by import AGGREGATIONS_TO_PANDAS_EQUIVALENT

if not is_elementary_expression(self): # pragma: no cover
if not partition_by:
assert order_by is not None # help type checkers # noqa: S101

# This is something like `nw.col('a').cum_sum().order_by(key)`
# which we can always easily support, as it doesn't require grouping.
def func(df: DaskLazyFrame) -> Sequence[dx.Series]:
return self(df.sort(*order_by, descending=False, nulls_last=False))
elif not is_elementary_expression(self): # pragma: no cover
msg = (
"Only elementary expressions are supported for `.over` in dask.\n\n"
"Please see: "
"https://narwhals-dev.github.io/narwhals/pandas_like_concepts/improve_group_by_operation/"
)
raise NotImplementedError(msg)
function_name = re.sub(r"(\w+->)", "", self._function_name)
try:
dask_function_name = AGGREGATIONS_TO_PANDAS_EQUIVALENT[function_name]
except KeyError:
msg = (
f"Unsupported function: {function_name} in `over` context.\n\n."
f"Supported functions are {', '.join(AGGREGATIONS_TO_PANDAS_EQUIVALENT)}\n"
)
raise NotImplementedError(msg) from None

def func(df: DaskLazyFrame) -> list[dx.Series]:
output_names, aliases = evaluate_output_names_and_aliases(self, df, [])

with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message=".*`meta` is not specified", category=UserWarning
)
res_native = df._native_frame.groupby(keys)[list(output_names)].transform(
dask_function_name, **self._call_kwargs
else:
function_name = re.sub(r"(\w+->)", "", self._function_name)
try:
dask_function_name = AGGREGATIONS_TO_PANDAS_EQUIVALENT[function_name]
except KeyError:
# window functions are unsupported: https://github.com/dask/dask/issues/11806
msg = (
f"Unsupported function: {function_name} in `over` context.\n\n."
f"Supported functions are {', '.join(AGGREGATIONS_TO_PANDAS_EQUIVALENT)}\n"
)
result_frame = df._from_native_frame(
res_native.rename(columns=dict(zip(output_names, aliases)))
)._native_frame
return [result_frame[name] for name in aliases]
raise NotImplementedError(msg) from None

def func(df: DaskLazyFrame) -> Sequence[dx.Series]:
output_names, aliases = evaluate_output_names_and_aliases(self, df, [])

with warnings.catch_warnings():
# https://github.com/dask/dask/issues/11804
warnings.filterwarnings(
"ignore",
message=".*`meta` is not specified",
category=UserWarning,
)
res_native = df._native_frame.groupby(partition_by)[
list(output_names)
].transform(dask_function_name, **self._call_kwargs)
result_frame = df._from_native_frame(
res_native.rename(columns=dict(zip(output_names, aliases)))
)._native_frame
return [result_frame[name] for name in aliases]

return self.__class__(
func,
Expand Down
111 changes: 69 additions & 42 deletions narwhals/_pandas_like/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
from narwhals._pandas_like.expr_str import PandasLikeExprStringNamespace
from narwhals._pandas_like.group_by import AGGREGATIONS_TO_PANDAS_EQUIVALENT
from narwhals._pandas_like.series import PandasLikeSeries
from narwhals._pandas_like.utils import rename
from narwhals.dependencies import get_numpy
from narwhals.dependencies import is_numpy_array
from narwhals.exceptions import ColumnNotFoundError
from narwhals.typing import CompliantExpr
from narwhals.utils import generate_temporary_column_name

if TYPE_CHECKING:
from typing_extensions import Self
Expand Down Expand Up @@ -457,63 +457,90 @@ def alias_output_names(names: Sequence[str]) -> Sequence[str]:
call_kwargs=self._call_kwargs,
)

def over(self: Self, partition_by: Sequence[str], kind: ExprKind) -> Self:
if not is_elementary_expression(self):
def over(
self: Self,
partition_by: Sequence[str],
kind: ExprKind,
order_by: Sequence[str] | None,
) -> Self:
if not partition_by:
# e.g. `nw.col('a').cum_sum().order_by(key)`
# We can always easily support this as it doesn't require grouping.
assert order_by is not None # noqa: S101 # help type-check

def func(df: PandasLikeDataFrame) -> Sequence[PandasLikeSeries]:
token = generate_temporary_column_name(8, df.columns)
df = df.with_row_index(token).sort(
*order_by, descending=False, nulls_last=False
)
results = self(df)
sorting_indices = df[token]
for s in results:
s._scatter_in_place(sorting_indices, s)
return results
elif not is_elementary_expression(self):
msg = (
"Only elementary expressions are supported for `.over` in pandas-like backends.\n\n"
"Please see: "
"https://narwhals-dev.github.io/narwhals/pandas_like_concepts/improve_group_by_operation/"
)
raise NotImplementedError(msg)
function_name = re.sub(r"(\w+->)", "", self._function_name)
try:
pandas_function_name = WINDOW_FUNCTIONS_TO_PANDAS_EQUIVALENT[function_name]
except KeyError:
try:
pandas_function_name = AGGREGATIONS_TO_PANDAS_EQUIVALENT[function_name]
except KeyError:
else:
function_name: str = re.sub(r"(\w+->)", "", self._function_name)
if pandas_function_name := WINDOW_FUNCTIONS_TO_PANDAS_EQUIVALENT.get(
function_name, AGGREGATIONS_TO_PANDAS_EQUIVALENT.get(function_name, None)
):
pass
else:
msg = (
f"Unsupported function: {function_name} in `over` context.\n\n"
f"Supported functions are {', '.join(WINDOW_FUNCTIONS_TO_PANDAS_EQUIVALENT)}\n"
f"and {', '.join(AGGREGATIONS_TO_PANDAS_EQUIVALENT)}."
)
raise NotImplementedError(msg) from None

def func(df: PandasLikeDataFrame) -> list[PandasLikeSeries]:
output_names, aliases = evaluate_output_names_and_aliases(self, df, [])
raise NotImplementedError(msg)
pandas_kwargs = window_kwargs_to_pandas_equivalent(
function_name, self._call_kwargs
)

if function_name == "cum_count":
plx = self.__narwhals_namespace__()
df = df.with_columns(~plx.col(*output_names).is_null())
if function_name.startswith("cum_"):
reverse = self._call_kwargs["reverse"]
else:
assert "reverse" not in self._call_kwargs # debug assertion # noqa: S101
reverse = False
if reverse:
# Only select the columns we need to avoid reversing columns
# unnecessarily
columns = list(set(partition_by).union(output_names))
native_frame = df[columns]._native_frame[::-1]
else:
native_frame = df._native_frame
res_native = native_frame.groupby(partition_by)[list(output_names)].transform(
pandas_function_name, **pandas_kwargs
)
result_frame = df._from_native_frame(
rename(
res_native,
columns=dict(zip(output_names, aliases)),
implementation=self._implementation,
backend_version=self._backend_version,
def func(df: PandasLikeDataFrame) -> Sequence[PandasLikeSeries]:
output_names, aliases = evaluate_output_names_and_aliases(self, df, [])

if function_name == "cum_count":
plx = self.__narwhals_namespace__()
df = df.with_columns(~plx.col(*output_names).is_null())

if function_name.startswith("cum_"):
reverse = self._call_kwargs["reverse"]
else:
assert "reverse" not in self._call_kwargs # noqa: S101
reverse = False

if order_by:
columns = list(set(partition_by).union(output_names).union(order_by))
token = generate_temporary_column_name(8, columns)
df = (
df[columns]
.with_row_index(token)
.sort(*order_by, descending=reverse, nulls_last=reverse)
)
sorting_indices = df[token]
elif reverse:
columns = list(set(partition_by).union(output_names))
df = df[columns][::-1]
res_native = df._native_frame.groupby(partition_by)[
list(output_names)
].transform(pandas_function_name, **pandas_kwargs)
result_frame = df._from_native_frame(res_native).rename(
dict(zip(output_names, aliases))
)
)
if reverse:
return [result_frame[name][::-1] for name in aliases]
return [result_frame[name] for name in aliases]
results = [result_frame[name] for name in aliases]
if order_by:
for s in results:
s._scatter_in_place(sorting_indices, s)
return results
if reverse:
return [s[::-1] for s in results]
return results

return self.__class__(
func,
Expand Down
Loading
Loading