From d8638273cf5e59acadffa00a0c714fa114cb2fe5 Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Fri, 19 Jul 2024 23:53:05 +0200 Subject: [PATCH 1/4] feat: pyarrow join methods --- narwhals/_arrow/dataframe.py | 34 +++++++++++++++-------- narwhals/_arrow/expr.py | 3 ++ narwhals/_arrow/series.py | 4 +++ narwhals/_pandas_like/dataframe.py | 2 +- narwhals/_pandas_like/utils.py | 29 ------------------- narwhals/utils.py | 29 +++++++++++++++++++ tests/expr_and_series/fill_null_test.py | 7 +---- tests/frame/join_test.py | 37 ++++++------------------- utils/check_backend_completeness.py | 1 - 9 files changed, 69 insertions(+), 77 deletions(-) diff --git a/narwhals/_arrow/dataframe.py b/narwhals/_arrow/dataframe.py index 94dccd0c53..7526e528a3 100644 --- a/narwhals/_arrow/dataframe.py +++ b/narwhals/_arrow/dataframe.py @@ -14,6 +14,7 @@ from narwhals.dependencies import get_pyarrow from narwhals.dependencies import get_pyarrow_parquet from narwhals.utils import flatten +from narwhals.utils import generate_unique_token if TYPE_CHECKING: from typing_extensions import Self @@ -208,7 +209,7 @@ def join( self, other: Self, *, - how: Literal["inner"] = "inner", + how: Literal["left", "inner", "outer", "cross", "anti", "semi"] = "inner", left_on: str | list[str] | None, right_on: str | list[str] | None, ) -> Self: @@ -217,24 +218,35 @@ def join( if isinstance(right_on, str): right_on = [right_on] - if how == "cross": # type: ignore[comparison-overlap] - raise NotImplementedError - - if how == "anti": # type: ignore[comparison-overlap] - raise NotImplementedError + how_to_join_map = { + "anti": "left anti", + "semi": "left semi", + "inner": "inner", + "left": "left outer", + } - if how == "semi": # type: ignore[comparison-overlap] - raise NotImplementedError + if how == "cross": + plx = self.__narwhals_namespace__() + key_token = generate_unique_token( + n_bytes=8, columns=[*self.columns, *other.columns] + ) - if how == "left": # type: ignore[comparison-overlap] - raise NotImplementedError + return self._from_native_dataframe( + self.with_columns(**{key_token: plx.lit(0, None)})._native_dataframe.join( + other.with_columns(**{key_token: plx.lit(0, None)})._native_dataframe, + keys=key_token, + right_keys=key_token, + join_type="inner", + right_suffix="_right", + ), + ).drop(key_token) return self._from_native_dataframe( self._native_dataframe.join( other._native_dataframe, keys=left_on, right_keys=right_on, - join_type=how, + join_type=how_to_join_map[how], right_suffix="_right", ), ) diff --git a/narwhals/_arrow/expr.py b/narwhals/_arrow/expr.py index f8bfbffce5..410647a4b4 100644 --- a/narwhals/_arrow/expr.py +++ b/narwhals/_arrow/expr.py @@ -212,6 +212,9 @@ def sample( self, "sample", n=n, fraction=fraction, with_replacement=with_replacement ) + def fill_null(self: Self, value: Any) -> Self: + return reuse_series_implementation(self, "fill_null", value=value) + @property def dt(self) -> ArrowExprDateTimeNamespace: return ArrowExprDateTimeNamespace(self) diff --git a/narwhals/_arrow/series.py b/narwhals/_arrow/series.py index c287dd81f0..df617f90f6 100644 --- a/narwhals/_arrow/series.py +++ b/narwhals/_arrow/series.py @@ -314,6 +314,10 @@ def sample( mask = np.random.choice(idx, size=n, replace=with_replacement) return self._from_native_series(pc.take(ser, mask)) + def fill_null(self: Self, value: Any) -> Self: + pc = get_pyarrow_compute() + return self._from_native_series(pc.fill_null(self._native_series, value)) + @property def shape(self) -> tuple[int]: return (len(self._native_series),) diff --git a/narwhals/_pandas_like/dataframe.py b/narwhals/_pandas_like/dataframe.py index fa6e707ca7..6606e8f154 100644 --- a/narwhals/_pandas_like/dataframe.py +++ b/narwhals/_pandas_like/dataframe.py @@ -13,7 +13,6 @@ from narwhals._pandas_like.expr import PandasLikeExpr from narwhals._pandas_like.utils import Implementation from narwhals._pandas_like.utils import create_native_series -from narwhals._pandas_like.utils import generate_unique_token from narwhals._pandas_like.utils import horizontal_concat from narwhals._pandas_like.utils import translate_dtype from narwhals._pandas_like.utils import validate_dataframe_comparand @@ -23,6 +22,7 @@ from narwhals.dependencies import get_numpy from narwhals.dependencies import get_pandas from narwhals.utils import flatten +from narwhals.utils import generate_unique_token if TYPE_CHECKING: from typing_extensions import Self diff --git a/narwhals/_pandas_like/utils.py b/narwhals/_pandas_like/utils.py index 3548966ca9..b493cfed2a 100644 --- a/narwhals/_pandas_like/utils.py +++ b/narwhals/_pandas_like/utils.py @@ -1,6 +1,5 @@ from __future__ import annotations -import secrets from enum import Enum from enum import auto from typing import TYPE_CHECKING @@ -458,31 +457,3 @@ def int_dtype_mapper(dtype: Any) -> str: if str(dtype).lower() != str(dtype): # pragma: no cover return "Int64" return "int64" - - -def generate_unique_token(n_bytes: int, columns: list[str]) -> str: # pragma: no cover - """Generates a unique token of specified n_bytes that is not present in the given list of columns. - - Arguments: - n_bytes : The number of bytes to generate for the token. - columns : The list of columns to check for uniqueness. - - Returns: - A unique token that is not present in the given list of columns. - - Raises: - AssertionError: If a unique token cannot be generated after 100 attempts. - """ - counter = 0 - while True: - token = secrets.token_hex(n_bytes) - if token not in columns: - return token - - counter += 1 - if counter > 100: - msg = ( - "Internal Error: Narwhals was not able to generate a column name to perform cross " - "join operation" - ) - raise AssertionError(msg) diff --git a/narwhals/utils.py b/narwhals/utils.py index 1f6a9074a3..19005b6876 100644 --- a/narwhals/utils.py +++ b/narwhals/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import re +import secrets from typing import TYPE_CHECKING from typing import Any from typing import Iterable @@ -336,3 +337,31 @@ def is_ordered_categorical(series: Series) -> bool: return native_series.type.ordered # type: ignore[no-any-return] # If it doesn't match any of the above, let's just play it safe and return False. return False # pragma: no cover + + +def generate_unique_token(n_bytes: int, columns: list[str]) -> str: # pragma: no cover + """Generates a unique token of specified n_bytes that is not present in the given list of columns. + + Arguments: + n_bytes : The number of bytes to generate for the token. + columns : The list of columns to check for uniqueness. + + Returns: + A unique token that is not present in the given list of columns. + + Raises: + AssertionError: If a unique token cannot be generated after 100 attempts. + """ + counter = 0 + while True: + token = secrets.token_hex(n_bytes) + if token not in columns: + return token + + counter += 1 + if counter > 100: + msg = ( + "Internal Error: Narwhals was not able to generate a column name to perform cross " + "join operation" + ) + raise AssertionError(msg) diff --git a/tests/expr_and_series/fill_null_test.py b/tests/expr_and_series/fill_null_test.py index 5cffcddd7a..dc9cc738ed 100644 --- a/tests/expr_and_series/fill_null_test.py +++ b/tests/expr_and_series/fill_null_test.py @@ -1,7 +1,5 @@ from typing import Any -import pytest - import narwhals.stable.v1 as nw from tests.utils import compare_dicts @@ -12,10 +10,7 @@ } -def test_fill_null(request: Any, constructor: Any) -> None: - if "pyarrow_table" in str(constructor): - request.applymarker(pytest.mark.xfail) - +def test_fill_null(constructor: Any) -> None: df = nw.from_native(constructor(data), eager_only=True) result = df.with_columns(nw.col("a", "b", "c").fill_null(99)) diff --git a/tests/frame/join_test.py b/tests/frame/join_test.py index 9ba119916b..ab0bbe18a7 100644 --- a/tests/frame/join_test.py +++ b/tests/frame/join_test.py @@ -35,15 +35,11 @@ def test_inner_join_single_key(constructor: Any) -> None: compare_dicts(result, expected) -def test_cross_join(request: Any, constructor: Any) -> None: - if "pyarrow_table" in str(constructor): - request.applymarker(pytest.mark.xfail) - +def test_cross_join(constructor: Any) -> None: data = {"a": [1, 3, 2]} df = nw.from_native(constructor(data)) - result = df.join(df, how="cross") # type: ignore[arg-type] - - expected = {"a": [1, 1, 1, 3, 3, 3, 2, 2, 2], "a_right": [1, 3, 2, 1, 3, 2, 1, 3, 2]} + result = df.join(df, how="cross").sort("a", "a_right") # type: ignore[arg-type] + expected = {"a": [1, 1, 1, 2, 2, 2, 3, 3, 3], "a_right": [1, 2, 3, 1, 2, 3, 1, 2, 3]} compare_dicts(result, expected) with pytest.raises(ValueError, match="Can not pass left_on, right_on for cross join"): @@ -69,15 +65,11 @@ def test_cross_join_non_pandas() -> None: ], ) def test_anti_join( - request: Any, constructor: Any, join_key: list[str], filter_expr: nw.Expr, expected: dict[str, list[Any]], ) -> None: - if "pyarrow_table" in str(constructor): - request.applymarker(pytest.mark.xfail) - data = {"a": [1, 3, 2], "b": [4, 4, 6], "z": [7.0, 8, 9]} df = nw.from_native(constructor(data)) other = df.filter(filter_expr) @@ -94,15 +86,11 @@ def test_anti_join( ], ) def test_semi_join( - request: Any, constructor: Any, join_key: list[str], filter_expr: nw.Expr, expected: dict[str, list[Any]], ) -> None: - if "pyarrow_table" in str(constructor): - request.applymarker(pytest.mark.xfail) - data = {"a": [1, 3, 2], "b": [4, 4, 6], "z": [7.0, 8, 9]} df = nw.from_native(constructor(data)) other = df.filter(filter_expr) @@ -125,10 +113,7 @@ def test_join_not_implemented(constructor: Any, how: str) -> None: @pytest.mark.filterwarnings("ignore:the default coalesce behavior") -def test_left_join(request: Any, constructor: Any) -> None: - if "pyarrow_table" in str(constructor): - request.applymarker(pytest.mark.xfail) - +def test_left_join(constructor: Any) -> None: data_left = {"a": [1.0, 2, 3], "b": [4.0, 5, 6]} data_right = {"a": [1.0, 2, 3], "c": [4.0, 5, 7]} df_left = nw.from_native(constructor(data_left), eager_only=True) @@ -141,10 +126,7 @@ def test_left_join(request: Any, constructor: Any) -> None: @pytest.mark.filterwarnings("ignore: the default coalesce behavior") -def test_left_join_multiple_column(request: Any, constructor: Any) -> None: - if "pyarrow_table" in str(constructor): - request.applymarker(pytest.mark.xfail) - +def test_left_join_multiple_column(constructor: Any) -> None: data_left = {"a": [1, 2, 3], "b": [4, 5, 6]} data_right = {"a": [1, 2, 3], "c": [4, 5, 6]} df_left = nw.from_native(constructor(data_left), eager_only=True) @@ -155,12 +137,9 @@ def test_left_join_multiple_column(request: Any, constructor: Any) -> None: @pytest.mark.filterwarnings("ignore: the default coalesce behavior") -def test_left_join_overlapping_column(request: Any, constructor: Any) -> None: - if "pyarrow_table" in str(constructor): - request.applymarker(pytest.mark.xfail) - - data_left = {"a": [1, 2, 3], "b": [4, 5, 6], "d": [1, 4, 2]} - data_right = {"a": [1, 2, 3], "c": [4, 5, 6], "d": [1, 4, 2]} +def test_left_join_overlapping_column(constructor: Any) -> None: + data_left = {"a": [1.0, 2, 3], "b": [4.0, 5, 6], "d": [1.0, 4, 2]} + data_right = {"a": [1.0, 2, 3], "c": [4.0, 5, 6], "d": [1.0, 4, 2]} df_left = nw.from_native(constructor(data_left), eager_only=True) df_right = nw.from_native(constructor(data_right), eager_only=True) result = df_left.join(df_right, left_on="b", right_on="c", how="left") diff --git a/utils/check_backend_completeness.py b/utils/check_backend_completeness.py index 90506dc4b1..aeedd63dfa 100644 --- a/utils/check_backend_completeness.py +++ b/utils/check_backend_completeness.py @@ -18,7 +18,6 @@ "DataFrame.pipe", "DataFrame.unique", "Series.drop_nulls", - "Series.fill_null", "Series.from_iterable", "Series.is_between", "Series.is_duplicated", From 8dd7815c3931d5b83330e797b18a80c7bd05d072 Mon Sep 17 00:00:00 2001 From: FBruzzesi Date: Fri, 19 Jul 2024 23:57:26 +0200 Subject: [PATCH 2/4] fill_null include type --- narwhals/_arrow/series.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/narwhals/_arrow/series.py b/narwhals/_arrow/series.py index df617f90f6..9fea6d8381 100644 --- a/narwhals/_arrow/series.py +++ b/narwhals/_arrow/series.py @@ -315,8 +315,12 @@ def sample( return self._from_native_series(pc.take(ser, mask)) def fill_null(self: Self, value: Any) -> Self: + pa = get_pyarrow() pc = get_pyarrow_compute() - return self._from_native_series(pc.fill_null(self._native_series, value)) + ser = self._native_series + dtype = ser.type + + return self._from_native_series(pc.fill_null(ser, pa.scalar(value, dtype))) @property def shape(self) -> tuple[int]: From 06e26c23f8efc10bcb4b45ce8f9ca3503bd37e80 Mon Sep 17 00:00:00 2001 From: Marco Gorelli <33491632+MarcoGorelli@users.noreply.github.com> Date: Sat, 20 Jul 2024 22:03:47 +0100 Subject: [PATCH 3/4] add extra hypothesis test cause im paranoid --- tests/hypothesis/test_join.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/hypothesis/test_join.py b/tests/hypothesis/test_join.py index d5db53e3bf..850caa1a34 100644 --- a/tests/hypothesis/test_join.py +++ b/tests/hypothesis/test_join.py @@ -2,6 +2,7 @@ import pandas as pd import polars as pl +import pyarrow as pa import pytest from hypothesis import assume from hypothesis import given @@ -163,3 +164,18 @@ def test_left_join( # pragma: no cover ) ).select(pl.all().fill_null(float("nan"))) compare_dicts(result_pd.to_dict(as_series=False), result_pl.to_dict(as_series=False)) + # For PyArrow, insert an extra sort, as the order of rows isn't guaranteed + result_pa = ( + nw.from_native(pa.table(data_left), eager_only=True) + .join( + nw.from_native(pa.table(data_right), eager_only=True), + how="left", + left_on=left_key, + right_on=right_key, + ) + .select(nw.all().cast(nw.Float64).fill_null(float("nan"))) + .pipe(lambda df: df.sort(df.columns)) + ) + compare_dicts( + result_pa, result_pd.pipe(lambda df: df.sort(df.columns)).to_dict(as_series=False) + ) From 7d7a30e01f0dc2a635217fd6a2dfbdb41b14df2a Mon Sep 17 00:00:00 2001 From: Marco Gorelli <33491632+MarcoGorelli@users.noreply.github.com> Date: Sat, 20 Jul 2024 22:10:32 +0100 Subject: [PATCH 4/4] fix typo in err msg --- narwhals/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/narwhals/utils.py b/narwhals/utils.py index 0513b0a0b7..c3ab3b0600 100644 --- a/narwhals/utils.py +++ b/narwhals/utils.py @@ -350,7 +350,7 @@ def generate_unique_token(n_bytes: int, columns: list[str]) -> str: # pragma: n counter += 1 if counter > 100: msg = ( - "Internal Error: Narwhals was not able to generate a column name to perform cross " + "Internal Error: Narwhals was not able to generate a column name to perform given " "join operation" ) raise AssertionError(msg)