From 28a5f4278ffd9856e3d79365764807dc10567f15 Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Tue, 25 Mar 2025 14:21:45 +0000 Subject: [PATCH 1/6] chore: Update `_dask.dataframe` --- narwhals/_dask/dataframe.py | 101 +++++++++++++++--------------------- 1 file changed, 42 insertions(+), 59 deletions(-) diff --git a/narwhals/_dask/dataframe.py b/narwhals/_dask/dataframe.py index a6b11116e7..2a64ef01f0 100644 --- a/narwhals/_dask/dataframe.py +++ b/narwhals/_dask/dataframe.py @@ -71,34 +71,28 @@ def __narwhals_lazyframe__(self: Self) -> Self: def _change_version(self: Self, version: Version) -> Self: return self.__class__( - self._native_frame, - backend_version=self._backend_version, - version=version, + self.native, backend_version=self._backend_version, version=version ) def _from_native_frame(self: Self, df: Any) -> Self: return self.__class__( - df, - backend_version=self._backend_version, - version=self._version, + df, backend_version=self._backend_version, version=self._version ) def _iter_columns(self) -> Iterator[dx.Series]: - for _col, ser in self._native_frame.items(): # noqa: PERF102 + for _col, ser in self.native.items(): # noqa: PERF102 yield ser def with_columns(self: Self, *exprs: DaskExpr) -> Self: - df = self._native_frame new_series = evaluate_exprs(self, *exprs) - df = df.assign(**dict(new_series)) - return self._from_native_frame(df) + return self._from_native_frame(self.native.assign(**dict(new_series))) def collect( self: Self, backend: Implementation | None, **kwargs: Any, ) -> CompliantDataFrame[Any, Any, Any]: - result = self._native_frame.compute(**kwargs) + result = self.native.compute(**kwargs) if backend is None or backend is Implementation.PANDAS: from narwhals._pandas_like.dataframe import PandasLikeDataFrame @@ -143,14 +137,13 @@ def columns(self: Self) -> list[str]: def filter(self: Self, predicate: DaskExpr) -> Self: # `[0]` is safe as the predicate's expression only returns a single column - mask = predicate._call(self)[0] - - return self._from_native_frame(self._native_frame.loc[mask]) + mask = predicate(self)[0] + return self._from_native_frame(self.native.loc[mask]) def simple_select(self: Self, *column_names: str) -> Self: return self._from_native_frame( select_columns_by_name( - self._native_frame, + self.native, list(column_names), self._backend_version, self._implementation, @@ -165,7 +158,7 @@ def aggregate(self: Self, *exprs: DaskExpr) -> Self: def select(self: Self, *exprs: DaskExpr) -> Self: new_series = evaluate_exprs(self, *exprs) df = select_columns_by_name( - self._native_frame.assign(**dict(new_series)), + self.native.assign(**dict(new_series)), [s[0] for s in new_series], self._backend_version, self._implementation, @@ -174,19 +167,19 @@ def select(self: Self, *exprs: DaskExpr) -> Self: def drop_nulls(self: Self, subset: Sequence[str] | None) -> Self: if subset is None: - return self._from_native_frame(self._native_frame.dropna()) + return self._from_native_frame(self.native.dropna()) plx = self.__narwhals_namespace__() return self.filter(~plx.any_horizontal(plx.col(*subset).is_null())) @property def schema(self: Self) -> dict[str, DType]: if self._cached_schema is None: - native_dtypes = self._native_frame.dtypes + native_dtypes = self.native.dtypes self._cached_schema = { col: native_to_narwhals_dtype( native_dtypes[col], self._version, self._implementation ) - for col in self._native_frame.columns + for col in self.native.columns } return self._cached_schema @@ -198,23 +191,21 @@ def drop(self: Self, columns: Sequence[str], *, strict: bool) -> Self: compliant_frame=self, columns=columns, strict=strict ) - return self._from_native_frame(self._native_frame.drop(columns=to_drop)) + return self._from_native_frame(self.native.drop(columns=to_drop)) def with_row_index(self: Self, name: str) -> Self: # Implementation is based on the following StackOverflow reply: # https://stackoverflow.com/questions/60831518/in-dask-how-does-one-add-a-range-of-integersauto-increment-to-a-new-column/60852409#60852409 return self._from_native_frame( - add_row_index( - self._native_frame, name, self._backend_version, self._implementation - ) + add_row_index(self.native, name, self._backend_version, self._implementation) ) def rename(self: Self, mapping: Mapping[str, str]) -> Self: - return self._from_native_frame(self._native_frame.rename(columns=mapping)) + return self._from_native_frame(self.native.rename(columns=mapping)) def head(self: Self, n: int) -> Self: return self._from_native_frame( - self._native_frame.head(n=n, compute=False, npartitions=-1) + self.native.head(n=n, compute=False, npartitions=-1) ) def unique( @@ -224,17 +215,16 @@ def unique( keep: Literal["any", "none"], ) -> Self: check_column_exists(self.columns, subset) - native_frame = self._native_frame if keep == "none": subset = subset or self.columns token = generate_temporary_column_name(n_bytes=8, columns=subset) - ser = native_frame.groupby(subset).size().rename(token) + ser = self.native.groupby(subset).size().rename(token) ser = ser[ser == 1] unique = ser.reset_index().drop(columns=token) - result = native_frame.merge(unique, on=subset, how="inner") + result = self.native.merge(unique, on=subset, how="inner") else: mapped_keep = {"any": "first"}.get(keep, keep) - result = native_frame.drop_duplicates(subset=subset, keep=mapped_keep) + result = self.native.drop_duplicates(subset=subset, keep=mapped_keep) return self._from_native_frame(result) def sort( @@ -243,14 +233,13 @@ def sort( descending: bool | Sequence[bool], nulls_last: bool, ) -> Self: - df = self._native_frame if isinstance(descending, bool): ascending: bool | list[bool] = not descending else: ascending = [not d for d in descending] - na_position = "last" if nulls_last else "first" + position = "last" if nulls_last else "first" return self._from_native_frame( - df.sort_values(list(by), ascending=ascending, na_position=na_position) + self.native.sort_values(list(by), ascending=ascending, na_position=position) ) def join( @@ -268,15 +257,15 @@ def join( ) return self._from_native_frame( - self._native_frame.assign(**{key_token: 0}) + self.native.assign(**{key_token: 0}) .merge( - other._native_frame.assign(**{key_token: 0}), + other.native.assign(**{key_token: 0}), how="inner", left_on=key_token, right_on=key_token, suffixes=("", suffix), ) - .drop(columns=key_token), + .drop(columns=key_token) ) if how == "anti": @@ -289,7 +278,7 @@ def join( raise TypeError(msg) other_native = ( select_columns_by_name( - other._native_frame, + other.native, list(right_on), self._backend_version, self._implementation, @@ -299,7 +288,7 @@ def join( ) .drop_duplicates() ) - df = self._native_frame.merge( + df = self.native.merge( other_native, how="outer", indicator=indicator_token, # pyright: ignore[reportArgumentType] @@ -316,7 +305,7 @@ def join( raise TypeError(msg) other_native = ( select_columns_by_name( - other._native_frame, + other.native, list(right_on), self._backend_version, self._implementation, @@ -327,18 +316,14 @@ def join( .drop_duplicates() # avoids potential rows duplication from inner join ) return self._from_native_frame( - self._native_frame.merge( - other_native, - how="inner", - left_on=left_on, - right_on=left_on, + self.native.merge( + other_native, how="inner", left_on=left_on, right_on=left_on ) ) if how == "left": - other_native = other._native_frame - result_native = self._native_frame.merge( - other_native, + result_native = self.native.merge( + other.native, how="left", left_on=left_on, right_on=right_on, @@ -361,29 +346,27 @@ def join( assert right_on is not None # noqa: S101 right_on_mapper = _remap_full_join_keys(left_on, right_on, suffix) - - other_native = other._native_frame - other_native = other_native.rename(columns=right_on_mapper) + other_native = other.native.rename(columns=right_on_mapper) check_column_names_are_unique(other_native.columns) right_on = list(right_on_mapper.values()) # we now have the suffixed keys return self._from_native_frame( - self._native_frame.merge( + self.native.merge( other_native, left_on=left_on, right_on=right_on, how="outer", suffixes=("", suffix), - ), + ) ) return self._from_native_frame( - self._native_frame.merge( - other._native_frame, + self.native.merge( + other.native, left_on=left_on, right_on=right_on, how=how, suffixes=("", suffix), - ), + ) ) def join_asof( @@ -400,8 +383,8 @@ def join_asof( plx = self.__native_namespace__() return self._from_native_frame( plx.merge_asof( - self._native_frame, - other._native_frame, + self.native, + other.native, left_on=left_on, right_on=right_on, left_by=by_left, @@ -417,11 +400,11 @@ def group_by(self: Self, *by: str, drop_null_keys: bool) -> DaskLazyGroupBy: return DaskLazyGroupBy(self, by, drop_null_keys=drop_null_keys) def tail(self: Self, n: int) -> Self: # pragma: no cover - native_frame = self._native_frame + native_frame = self.native n_partitions = native_frame.npartitions if n_partitions == 1: - return self._from_native_frame(self._native_frame.tail(n=n, compute=False)) + return self._from_native_frame(self.native.tail(n=n, compute=False)) else: msg = "`LazyFrame.tail` is not supported for Dask backend with multiple partitions." raise NotImplementedError(msg) @@ -446,7 +429,7 @@ def unpivot( value_name: str, ) -> Self: return self._from_native_frame( - self._native_frame.melt( + self.native.melt( id_vars=index, value_vars=on, var_name=variable_name, From 33d394104593f0bd5cd7254343843c3526f959f2 Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Tue, 25 Mar 2025 14:38:17 +0000 Subject: [PATCH 2/6] chore: Update `_duckdb.dataframe` --- narwhals/_duckdb/dataframe.py | 109 +++++++++++++--------------------- 1 file changed, 42 insertions(+), 67 deletions(-) diff --git a/narwhals/_duckdb/dataframe.py b/narwhals/_duckdb/dataframe.py index ff8e35a141..34559762a1 100644 --- a/narwhals/_duckdb/dataframe.py +++ b/narwhals/_duckdb/dataframe.py @@ -83,9 +83,7 @@ def __narwhals_namespace__(self: Self) -> DuckDBNamespace: def __getitem__(self: Self, item: str) -> DuckDBInterchangeSeries: from narwhals._duckdb.series import DuckDBInterchangeSeries - return DuckDBInterchangeSeries( - self._native_frame.select(item), version=self._version - ) + return DuckDBInterchangeSeries(self.native.select(item), version=self._version) def _iter_columns(self) -> Iterator[duckdb.Expression]: for col in self.columns: @@ -102,7 +100,7 @@ def collect( from narwhals._arrow.dataframe import ArrowDataFrame return ArrowDataFrame( - native_dataframe=self._native_frame.arrow(), + self.native.arrow(), backend_version=parse_version(pa), version=self._version, validate_column_names=True, @@ -114,7 +112,7 @@ def collect( from narwhals._pandas_like.dataframe import PandasLikeDataFrame return PandasLikeDataFrame( - native_dataframe=self._native_frame.df(), + self.native.df(), implementation=Implementation.PANDAS, backend_version=parse_version(pd), version=self._version, @@ -127,43 +125,33 @@ def collect( from narwhals._polars.dataframe import PolarsDataFrame return PolarsDataFrame( - df=self._native_frame.pl(), - backend_version=parse_version(pl), - version=self._version, + self.native.pl(), backend_version=parse_version(pl), version=self._version ) msg = f"Unsupported `backend` value: {backend}" # pragma: no cover raise ValueError(msg) # pragma: no cover def head(self: Self, n: int) -> Self: - return self._from_native_frame(self._native_frame.limit(n)) + return self._from_native_frame(self.native.limit(n)) def simple_select(self, *column_names: str) -> Self: - return self._from_native_frame(self._native_frame.select(*column_names)) + return self._from_native_frame(self.native.select(*column_names)) def aggregate(self: Self, *exprs: DuckDBExpr) -> Self: - new_columns_map = evaluate_exprs(self, *exprs) - return self._from_native_frame( - self._native_frame.aggregate( - [val.alias(col) for col, val in new_columns_map] # type: ignore[arg-type] - ), - ) + selection = [val.alias(col) for col, val in evaluate_exprs(self, *exprs)] + return self._from_native_frame(self.native.aggregate(selection)) # type: ignore[arg-type] def select( self: Self, *exprs: DuckDBExpr, ) -> Self: - new_columns_map = evaluate_exprs(self, *exprs) - return self._from_native_frame( - self._native_frame.select(*(val.alias(col) for col, val in new_columns_map)), - ) + selection = (val.alias(col) for col, val in evaluate_exprs(self, *exprs)) + return self._from_native_frame(self.native.select(*selection)) def drop(self: Self, columns: Sequence[str], *, strict: bool) -> Self: - columns_to_drop = parse_columns_to_drop( - compliant_frame=self, columns=columns, strict=strict - ) + columns_to_drop = parse_columns_to_drop(self, columns=columns, strict=strict) selection = (col for col in self.columns if col not in columns_to_drop) - return self._from_native_frame(self._native_frame.select(*selection)) + return self._from_native_frame(self.native.select(*selection)) def lazy(self: Self, *, backend: Implementation | None = None) -> Self: # The `backend`` argument has no effect but we keep it here for @@ -181,15 +169,15 @@ def with_columns(self: Self, *exprs: DuckDBExpr) -> Self: new_columns_map.pop(col).alias(col) if col in new_columns_map else ColumnExpression(col) - for col in self._native_frame.columns + for col in self.native.columns ] result.extend(value.alias(col) for col, value in new_columns_map.items()) - return self._from_native_frame(self._native_frame.select(*result)) + return self._from_native_frame(self.native.select(*result)) def filter(self: Self, predicate: DuckDBExpr) -> Self: # `[0]` is safe as the predicate's expression only returns a single column - mask = predicate._call(self)[0] - return self._from_native_frame(self._native_frame.filter(mask)) + mask = predicate(self)[0] + return self._from_native_frame(self.native.filter(mask)) @property def schema(self: Self) -> dict[str, DType]: @@ -199,7 +187,7 @@ def schema(self: Self) -> dict[str, DType]: self._cached_schema = { column_name: native_to_narwhals_dtype(str(duckdb_dtype), self._version) for column_name, duckdb_dtype in zip( - self._native_frame.columns, self._native_frame.types + self.native.columns, self.native.types ) } return self._cached_schema @@ -213,27 +201,23 @@ def to_pandas(self: Self) -> pd.DataFrame: import pandas as pd # ignore-banned-import() if parse_version(pd) >= (1, 0, 0): - return self._native_frame.df() + return self.native.df() else: # pragma: no cover msg = f"Conversion to pandas requires pandas>=1.0.0, found {pd.__version__}" raise NotImplementedError(msg) def to_arrow(self: Self) -> pa.Table: # only if version is v1, keep around for backcompat - return self._native_frame.arrow() + return self.native.arrow() def _change_version(self: Self, version: Version) -> Self: return self.__class__( - self._native_frame, - version=version, - backend_version=self._backend_version, + self.native, version=version, backend_version=self._backend_version ) def _from_native_frame(self: Self, df: duckdb.DuckDBPyRelation) -> Self: return self.__class__( - df, - backend_version=self._backend_version, - version=self._version, + df, backend_version=self._backend_version, version=self._version ) def group_by(self: Self, *keys: str, drop_null_keys: bool) -> DuckDBGroupBy: @@ -242,7 +226,7 @@ def group_by(self: Self, *keys: str, drop_null_keys: bool) -> DuckDBGroupBy: return DuckDBGroupBy(self, keys, drop_null_keys=drop_null_keys) def rename(self: Self, mapping: Mapping[str, str]) -> Self: - df = self._native_frame + df = self.native selection = [ f"{col} as {mapping[col]}" if col in mapping else col for col in df.columns ] @@ -257,35 +241,30 @@ def join( right_on: Sequence[str] | None, suffix: str, ) -> Self: - original_alias = self._native_frame.alias - native_how = "outer" if how == "full" else how if native_how == "cross": if self._backend_version < (1, 1, 4): msg = f"DuckDB>=1.1.4 is required for cross-join, found version: {self._backend_version}" raise NotImplementedError(msg) - rel = self._native_frame.set_alias("lhs").cross( # pragma: no cover - other._native_frame.set_alias("rhs") - ) + rel = self.native.set_alias("lhs").cross( + other.native.set_alias("rhs") + ) # pragma: no cover else: # help mypy assert left_on is not None # noqa: S101 assert right_on is not None # noqa: S101 - - conditions = [ + condition = " and ".join( f'lhs."{left}" = rhs."{right}"' for left, right in zip(left_on, right_on) - ] - - condition = " and ".join(conditions) - rel = self._native_frame.set_alias("lhs").join( - other._native_frame.set_alias("rhs"), condition=condition, how=native_how + ) + rel = self.native.set_alias("lhs").join( + other.native.set_alias("rhs"), condition=condition, how=native_how ) if native_how in {"inner", "left", "cross", "outer"}: - select = [f'lhs."{x}"' for x in self._native_frame.columns] - for col in other._native_frame.columns: - col_in_lhs: bool = col in self._native_frame.columns + select = [f'lhs."{x}"' for x in self.native.columns] + for col in other.native.columns: + col_in_lhs: bool = col in self.native.columns if native_how == "outer" and not col_in_lhs: select.append(f'rhs."{col}"') elif (native_how == "outer") or ( @@ -297,7 +276,7 @@ def join( else: # semi select = ["lhs.*"] - res = rel.select(", ".join(select)).set_alias(original_alias) + res = rel.select(", ".join(select)).set_alias(self.native.alias) return self._from_native_frame(res) def join_asof( @@ -311,8 +290,8 @@ def join_asof( strategy: Literal["backward", "forward", "nearest"], suffix: str, ) -> Self: - lhs = self._native_frame - rhs = other._native_frame + lhs = self.native + rhs = other.native conditions = [] if by_left is not None and by_right is not None: conditions += [ @@ -348,16 +327,14 @@ def join_asof( def collect_schema(self: Self) -> dict[str, DType]: return { column_name: native_to_narwhals_dtype(str(duckdb_dtype), self._version) - for column_name, duckdb_dtype in zip( - self._native_frame.columns, self._native_frame.types - ) + for column_name, duckdb_dtype in zip(self.native.columns, self.native.types) } def unique( self: Self, subset: Sequence[str] | None, *, keep: Literal["any", "none"] ) -> Self: if subset is not None: - rel = self._native_frame + rel = self.native # Sanitise input if any(x not in rel.columns for x in subset): msg = f"Columns {set(subset).difference(rel.columns)} not found in {rel.columns}." @@ -381,9 +358,7 @@ def unique( select * exclude ({idx_name}, {count_name}) from cte {keep_condition} """ # noqa: S608 return self._from_native_frame(duckdb.sql(query)) - return self._from_native_frame( - self._native_frame.unique(", ".join(self.columns)), - ) + return self._from_native_frame(self.native.unique(", ".join(self.columns))) def sort( self: Self, @@ -395,7 +370,7 @@ def sort( descending = [descending] * len(by) descending_str = ["desc" if x else "" for x in descending] - result = self._native_frame.order( + result = self.native.order( ",".join( ( f'"{col}" {desc} nulls last' @@ -408,7 +383,7 @@ def sort( return self._from_native_frame(result) def drop_nulls(self: Self, subset: Sequence[str] | None) -> Self: - rel = self._native_frame + rel = self.native subset_ = subset if subset is not None else rel.columns keep_condition = " and ".join(f'"{col}" is not null' for col in subset_) query = f"select * from rel where {keep_condition}" # noqa: S608 @@ -435,7 +410,7 @@ def explode(self: Self, columns: Sequence[str]) -> Self: raise NotImplementedError(msg) col_to_explode = ColumnExpression(columns[0]) - rel = self._native_frame + rel = self.native original_columns = self.columns not_null_condition = col_to_explode.isnotnull() & FunctionExpression( @@ -479,7 +454,7 @@ def unpivot( ) unpivot_on = ", ".join(f'"{col}"' for col in on_) - rel = self._native_frame # noqa: F841 + rel = self.native # noqa: F841 query = f""" with unpivot_cte as ( unpivot rel From 4a696884ecf3d5d33bda052c880cadf7e6582ef2 Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Tue, 25 Mar 2025 14:47:03 +0000 Subject: [PATCH 3/6] chore: Update `_pandas_like.dataframe` --- narwhals/_pandas_like/dataframe.py | 41 ++++++++++-------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/narwhals/_pandas_like/dataframe.py b/narwhals/_pandas_like/dataframe.py index 47e0d5c1da..4e68fe74e9 100644 --- a/narwhals/_pandas_like/dataframe.py +++ b/narwhals/_pandas_like/dataframe.py @@ -621,31 +621,24 @@ def join( return self._from_native_frame( self.native.assign(**{key_token: 0}) .merge( - other._native_frame.assign(**{key_token: 0}), + other.native.assign(**{key_token: 0}), how="inner", left_on=key_token, right_on=key_token, suffixes=("", suffix), ) - .drop(columns=key_token), + .drop(columns=key_token) ) else: return self._from_native_frame( - self.native.merge( - other._native_frame, - how="cross", - suffixes=("", suffix), - ), + self.native.merge(other.native, how="cross", suffixes=("", suffix)), ) if how == "anti": if self._implementation is Implementation.CUDF: return self._from_native_frame( self.native.merge( - other._native_frame, - how="leftanti", - left_on=left_on, - right_on=right_on, + other.native, how="leftanti", left_on=left_on, right_on=right_on ) ) else: @@ -659,7 +652,7 @@ def join( # rename to avoid creating extra columns in join other_native = rename( select_columns_by_name( - other._native_frame, + other.native, list(right_on), self._backend_version, self._implementation, @@ -688,7 +681,7 @@ def join( other_native = ( rename( select_columns_by_name( - other._native_frame, + other.native, list(right_on), self._backend_version, self._implementation, @@ -700,17 +693,13 @@ def join( ) return self._from_native_frame( self.native.merge( - other_native, - how="inner", - left_on=left_on, - right_on=left_on, + other_native, how="inner", left_on=left_on, right_on=left_on ) ) if how == "left": - other_native = other._native_frame result_native = self.native.merge( - other_native, + other.native, how="left", left_on=left_on, right_on=right_on, @@ -732,13 +721,11 @@ def join( assert right_on is not None # noqa: S101 right_on_mapper = _remap_full_join_keys(left_on, right_on, suffix) - - other_native = other._native_frame - other_native = other_native.rename(columns=right_on_mapper) + other_native = other.native.rename(columns=right_on_mapper) check_column_names_are_unique(other_native.columns) right_on = list(right_on_mapper.values()) # we now have the suffixed keys return self._from_native_frame( - self._native_frame.merge( + self.native.merge( other_native, left_on=left_on, right_on=right_on, @@ -749,12 +736,12 @@ def join( return self._from_native_frame( self.native.merge( - other._native_frame, + other.native, left_on=left_on, right_on=right_on, how=how, suffixes=("", suffix), - ), + ) ) def join_asof( @@ -772,7 +759,7 @@ def join_asof( return self._from_native_frame( plx.merge_asof( self.native, - other._native_frame, + other.native, left_on=left_on, right_on=right_on, left_by=by_left, @@ -891,7 +878,7 @@ def to_numpy(self: Self, dtype: Any = None, *, copy: bool | None = None) -> _2DA .col(*to_convert) .dt.convert_time_zone("UTC") .dt.replace_time_zone(None) - )._native_frame + ).native else: df = self.native From 39d3c3966c3e94e54b390fbc24e807236353ca68 Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Tue, 25 Mar 2025 14:47:52 +0000 Subject: [PATCH 4/6] refactor: Simplify `PandasLikeDataFrame.to_polars` --- narwhals/_pandas_like/dataframe.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/narwhals/_pandas_like/dataframe.py b/narwhals/_pandas_like/dataframe.py index 4e68fe74e9..ed99d273cd 100644 --- a/narwhals/_pandas_like/dataframe.py +++ b/narwhals/_pandas_like/dataframe.py @@ -915,14 +915,7 @@ def to_pandas(self: Self) -> pd.DataFrame: def to_polars(self: Self) -> pl.DataFrame: import polars as pl # ignore-banned-import - if self._implementation is Implementation.PANDAS: - return pl.from_pandas(self.native) - elif self._implementation is Implementation.CUDF: # pragma: no cover - return pl.from_pandas(self.native.to_pandas()) - elif self._implementation is Implementation.MODIN: - return pl.from_pandas(self.native._to_pandas()) - msg = f"Unknown implementation: {self._implementation}" # pragma: no cover - raise AssertionError(msg) + return pl.from_pandas(self.to_pandas()) def write_parquet(self: Self, file: str | Path | BytesIO) -> None: self.native.to_parquet(file) From b17e0dbedc8a43ecb84673929ec3f34c8db75aaa Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Tue, 25 Mar 2025 14:52:46 +0000 Subject: [PATCH 5/6] chore: Update `_polars.dataframe` --- narwhals/_polars/dataframe.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/narwhals/_polars/dataframe.py b/narwhals/_polars/dataframe.py index f5a95e6836..0a61ba5acf 100644 --- a/narwhals/_polars/dataframe.py +++ b/narwhals/_polars/dataframe.py @@ -306,7 +306,7 @@ def lazy( from narwhals._duckdb.dataframe import DuckDBLazyFrame # NOTE: (F841) is a false positive - df = self._native_frame # noqa: F841 + df = self.native # noqa: F841 return DuckDBLazyFrame( duckdb.table("df"), backend_version=parse_version(duckdb), @@ -428,11 +428,10 @@ def join( how_native = ( "outer" if (self._backend_version < (1, 0, 0) and how == "full") else how ) - try: return self._from_native_frame( - self._native_frame.join( - other=other._native_frame, + self.native.join( + other=other.native, how=how_native, # type: ignore[arg-type] left_on=left_on, right_on=right_on, @@ -562,12 +561,8 @@ def collect( raise catch_polars_exception(e, self._backend_version) from None if backend is None or backend is Implementation.POLARS: - from narwhals._polars.dataframe import PolarsDataFrame - return PolarsDataFrame( - result, - backend_version=self._backend_version, - version=self._version, + result, backend_version=self._backend_version, version=self._version ) if backend is Implementation.PANDAS: @@ -653,10 +648,9 @@ def join( how_native = ( "outer" if (self._backend_version < (1, 0, 0) and how == "full") else how ) - return self._from_native_frame( - self._native_frame.join( - other=other._native_frame, + self.native.join( + other=other.native, how=how_native, # type: ignore[arg-type] left_on=left_on, right_on=right_on, From 9c792f5861162b7e0dc20f60dcdb33f39587245a Mon Sep 17 00:00:00 2001 From: dangotbanned <125183946+dangotbanned@users.noreply.github.com> Date: Tue, 25 Mar 2025 14:59:25 +0000 Subject: [PATCH 6/6] chore: Update `_spark_like.dataframe` --- narwhals/_spark_like/dataframe.py | 63 ++++++++++++++----------------- 1 file changed, 28 insertions(+), 35 deletions(-) diff --git a/narwhals/_spark_like/dataframe.py b/narwhals/_spark_like/dataframe.py index 403c67f440..cdd725c661 100644 --- a/narwhals/_spark_like/dataframe.py +++ b/narwhals/_spark_like/dataframe.py @@ -109,7 +109,7 @@ def __narwhals_lazyframe__(self: Self) -> Self: def _change_version(self: Self, version: Version) -> Self: return self.__class__( - self._native_frame, + self.native, backend_version=self._backend_version, version=version, implementation=self._implementation, @@ -129,9 +129,8 @@ def _collect_to_arrow(self) -> pa.Table: ): import pyarrow as pa # ignore-banned-import - native_frame = self._native_frame try: - return pa.Table.from_batches(native_frame._collect_as_arrow()) + return pa.Table.from_batches(self.native._collect_as_arrow()) except ValueError as exc: if "at least one RecordBatch" in str(exc): # Empty dataframe @@ -145,7 +144,7 @@ def _collect_to_arrow(self) -> pa.Table: try: native_dtype = narwhals_to_native_dtype(value, self._version) except Exception as exc: # noqa: BLE001 - native_spark_dtype = native_frame.schema[key].dataType # type: ignore[index] + native_spark_dtype = self.native.schema[key].dataType # type: ignore[index] # If we can't convert the type, just set it to `pa.null`, and warn. # Avoid the warning if we're starting from PySpark's void type. # We can avoid the check when we introduce `nw.Null` dtype. @@ -162,7 +161,7 @@ def _collect_to_arrow(self) -> pa.Table: else: # pragma: no cover raise else: - return self._native_frame.toArrow() + return self.native.toArrow() def _iter_columns(self) -> Iterator[Column]: for col in self.columns: @@ -183,7 +182,7 @@ def collect( from narwhals._pandas_like.dataframe import PandasLikeDataFrame return PandasLikeDataFrame( - native_dataframe=self._native_frame.toPandas(), + self.native.toPandas(), implementation=Implementation.PANDAS, backend_version=parse_version(pd), version=self._version, @@ -209,7 +208,7 @@ def collect( from narwhals._polars.dataframe import PolarsDataFrame return PolarsDataFrame( - df=pl.from_arrow(self._collect_to_arrow()), # type: ignore[arg-type] + pl.from_arrow(self._collect_to_arrow()), # type: ignore[arg-type] backend_version=parse_version(pl), version=self._version, ) @@ -218,7 +217,7 @@ def collect( raise ValueError(msg) # pragma: no cover def simple_select(self: Self, *column_names: str) -> Self: - return self._from_native_frame(self._native_frame.select(*column_names)) + return self._from_native_frame(self.native.select(*column_names)) def aggregate( self: Self, @@ -227,7 +226,7 @@ def aggregate( new_columns = evaluate_exprs(self, *exprs) new_columns_list = [col.alias(col_name) for col_name, col in new_columns] - return self._from_native_frame(self._native_frame.agg(*new_columns_list)) + return self._from_native_frame(self.native.agg(*new_columns_list)) def select( self: Self, @@ -235,16 +234,16 @@ def select( ) -> Self: new_columns = evaluate_exprs(self, *exprs) new_columns_list = [col.alias(col_name) for (col_name, col) in new_columns] - return self._from_native_frame(self._native_frame.select(*new_columns_list)) + return self._from_native_frame(self.native.select(*new_columns_list)) def with_columns(self: Self, *exprs: SparkLikeExpr) -> Self: new_columns = evaluate_exprs(self, *exprs) - return self._from_native_frame(self._native_frame.withColumns(dict(new_columns))) + return self._from_native_frame(self.native.withColumns(dict(new_columns))) def filter(self: Self, predicate: SparkLikeExpr) -> Self: # `[0]` is safe as the predicate's expression only returns a single column condition = predicate._call(self)[0] - spark_df = self._native_frame.where(condition) + spark_df = self.native.where(condition) return self._from_native_frame(spark_df) @property @@ -256,7 +255,7 @@ def schema(self: Self) -> dict[str, DType]: version=self._version, spark_types=self._native_dtypes, ) - for field in self._native_frame.schema + for field in self.native.schema } return self._cached_schema @@ -267,10 +266,10 @@ def drop(self: Self, columns: Sequence[str], *, strict: bool) -> Self: columns_to_drop = parse_columns_to_drop( compliant_frame=self, columns=columns, strict=strict ) - return self._from_native_frame(self._native_frame.drop(*columns_to_drop)) + return self._from_native_frame(self.native.drop(*columns_to_drop)) def head(self: Self, n: int) -> Self: - return self._from_native_frame(self._native_frame.limit(num=n)) + return self._from_native_frame(self.native.limit(num=n)) def group_by(self: Self, *keys: str, drop_null_keys: bool) -> SparkLikeLazyGroupBy: from narwhals._spark_like.group_by import SparkLikeLazyGroupBy @@ -298,18 +297,18 @@ def sort( ) sort_cols = [sort_f(col) for col, sort_f in zip(by, sort_funcs)] - return self._from_native_frame(self._native_frame.sort(*sort_cols)) + return self._from_native_frame(self.native.sort(*sort_cols)) def drop_nulls(self: Self, subset: Sequence[str] | None) -> Self: subset = list(subset) if subset else None - return self._from_native_frame(self._native_frame.dropna(subset=subset)) + return self._from_native_frame(self.native.dropna(subset=subset)) def rename(self: Self, mapping: Mapping[str, str]) -> Self: rename_mapping = { colname: mapping.get(colname, colname) for colname in self.columns } return self._from_native_frame( - self._native_frame.select( + self.native.select( [self._F.col(old).alias(new) for old, new in rename_mapping.items()] ) ) @@ -325,7 +324,7 @@ def unique( raise ValueError(msg) check_column_exists(self.columns, subset) subset = list(subset) if subset else None - return self._from_native_frame(self._native_frame.dropDuplicates(subset=subset)) + return self._from_native_frame(self.native.dropDuplicates(subset=subset)) def join( self: Self, @@ -335,9 +334,6 @@ def join( right_on: Sequence[str] | None, suffix: str, ) -> Self: - self_native = self._native_frame - other_native = other._native_frame - left_columns = self.columns right_columns = other.columns @@ -360,7 +356,7 @@ def join( for colname in right_cols_to_rename }, } - other_native = other_native.select( + other_native = other.native.select( [self._F.col(old).alias(new) for old, new in rename_mapping.items()] ) @@ -371,11 +367,9 @@ def join( if how in {"inner", "left", "cross"}: col_order.extend( - [ - rename_mapping[colname] - for colname in right_columns - if colname not in right_on_ - ] + rename_mapping[colname] + for colname in right_columns + if colname not in right_on_ ) elif how == "full": col_order.extend(rename_mapping.values()) @@ -385,7 +379,7 @@ def join( reduce( and_, ( - getattr(self_native, left_key) == getattr(other_native, right_key) + getattr(self.native, left_key) == getattr(other_native, right_key) for left_key, right_key in zip(left_on_, right_on_remapped) ), ) @@ -397,7 +391,7 @@ def join( how_native = "full_outer" if how == "full" else how return self._from_native_frame( - self_native.join(other_native, on=on_, how=how_native).select(col_order) + self.native.join(other_native, on=on_, how=how_native).select(col_order) ) def explode(self: Self, columns: Sequence[str]) -> Self: @@ -414,7 +408,6 @@ def explode(self: Self, columns: Sequence[str]) -> Self: ) raise InvalidOperationError(msg) - native_frame = self._native_frame column_names = self.columns if len(columns) != 1: @@ -426,7 +419,7 @@ def explode(self: Self, columns: Sequence[str]) -> Self: if self._implementation.is_pyspark(): return self._from_native_frame( - native_frame.select( + self.native.select( *[ self._F.col(col_name).alias(col_name) if col_name != columns[0] @@ -446,7 +439,7 @@ def null_condition(col_name: str) -> Column: return self._F.isnull(col_name) | (self._F.array_size(col_name) == 0) return self._from_native_frame( - native_frame.select( + self.native.select( *[ self._F.col(col_name).alias(col_name) if col_name != columns[0] @@ -454,7 +447,7 @@ def null_condition(col_name: str) -> Column: for col_name in column_names ] ).union( - native_frame.filter(null_condition(columns[0])).select( + self.native.filter(null_condition(columns[0])).select( *[ self._F.col(col_name).alias(col_name) if col_name != columns[0] @@ -490,7 +483,7 @@ def unpivot( values = ( tuple(set(self.columns).difference(set(ids))) if on is None else tuple(on) ) - unpivoted_native_frame = self._native_frame.unpivot( + unpivoted_native_frame = self.native.unpivot( ids=ids, values=values, variableColumnName=variable_name,