diff --git a/databricks/koalas/accessors.py b/databricks/koalas/accessors.py index 5decf26796..08dfb427db 100644 --- a/databricks/koalas/accessors.py +++ b/databricks/koalas/accessors.py @@ -171,8 +171,8 @@ def attach_id_column(self, id_type: str, column: Union[Any, Tuple]) -> "DataFram return DataFrame( InternalFrame( spark_frame=sdf, - index_spark_column_names=[ - SPARK_INDEX_NAME_FORMAT(i) for i in range(internal.index_level) + index_spark_columns=[ + scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) for i in range(internal.index_level) ], index_names=internal.index_names, column_labels=internal.column_labels + [column], @@ -386,7 +386,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame": ) # Otherwise, it loses index. - internal = InternalFrame(spark_frame=sdf, index_spark_column_names=None) + internal = InternalFrame(spark_frame=sdf, index_spark_columns=None) return DataFrame(internal) diff --git a/databricks/koalas/base.py b/databricks/koalas/base.py index f759e84442..785ad5f70a 100644 --- a/databricks/koalas/base.py +++ b/databricks/koalas/base.py @@ -1362,7 +1362,7 @@ def value_counts( internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=[index_name], + index_spark_columns=[scol_for(sdf, index_name)], column_labels=self._internal.column_labels, data_spark_columns=[scol_for(sdf, "count")], column_label_names=self._internal.column_label_names, diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index cf9d27daeb..8aaa0c3f41 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -471,7 +471,7 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, copy=False): assert columns is None assert dtype is None assert not copy - internal = InternalFrame(spark_frame=data, index_spark_column_names=None) + internal = InternalFrame(spark_frame=data, index_spark_columns=None) elif isinstance(data, ks.Series): assert index is None assert columns is None @@ -664,7 +664,7 @@ def _reduce_for_stat_function(self, sfun, name, axis=None, numeric_only=True): with ks.option_context("compute.max_rows", 1): internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=[SPARK_DEFAULT_INDEX_NAME], + index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)], column_labels=new_column_labels, column_label_names=self._internal.column_label_names, ) @@ -2154,9 +2154,9 @@ def transpose(self) -> "DataFrame": for label in (tuple(json.loads(col)["a"]) for col in new_data_columns) ] - internal = self._internal.copy( + internal = InternalFrame( spark_frame=transposed_df, - index_spark_column_names=internal_index_columns, + index_spark_columns=[scol_for(transposed_df, col) for col in internal_index_columns], index_names=self._internal.column_label_names, column_labels=column_labels, data_spark_columns=[scol_for(transposed_df, col) for col in new_data_columns], @@ -2478,7 +2478,7 @@ def apply_func(pdf): # Otherwise, it loses index. internal = InternalFrame( - spark_frame=sdf, index_spark_column_names=None, column_labels=column_labels + spark_frame=sdf, index_spark_columns=None, column_labels=column_labels ) result = DataFrame(internal) # type: "DataFrame" @@ -2842,14 +2842,14 @@ class locomotion else: return first_series(DataFrame(pdf.transpose())) else: - index_spark_column_names = ( - internal.index_spark_column_names[:level] - + internal.index_spark_column_names[level + len(key) :] + index_spark_columns = ( + internal.index_spark_columns[:level] + + internal.index_spark_columns[level + len(key) :] ) index_names = internal.index_names[:level] + internal.index_names[level + len(key) :] internal = internal.copy( - index_spark_column_names=index_spark_column_names, index_names=index_names + index_spark_columns=index_spark_columns, index_names=index_names ).resolved_copy return DataFrame(internal) @@ -3262,22 +3262,19 @@ def set_index(self, keys, drop=True, append=False, inplace=False) -> Optional["D else: column_labels = self._internal.column_labels if append: - index_spark_column_names = self._internal.index_spark_column_names + [ - self._internal.spark_column_name_for(label) for label in keys + index_spark_columns = self._internal.index_spark_columns + [ + self._internal.spark_column_for(label) for label in keys ] index_names = self._internal.index_names + keys else: - index_spark_column_names = [ - self._internal.spark_column_name_for(label) for label in keys - ] + index_spark_columns = [self._internal.spark_column_for(label) for label in keys] index_names = keys - internal = self._internal.resolved_copy - internal = internal.copy( - index_spark_column_names=index_spark_column_names, + internal = self._internal.copy( + index_spark_columns=index_spark_columns, index_names=index_names, column_labels=column_labels, - data_spark_columns=[internal.spark_column_for(label) for label in column_labels], + data_spark_columns=[self._internal.spark_column_for(label) for label in column_labels], ) if inplace: @@ -3453,7 +3450,7 @@ def rename(index): for scol, label in zip(self._internal.index_spark_columns, new_column_labels) ] - index_spark_column_names = [] + index_spark_columns = [] index_names = [] else: if is_list_like(level): @@ -3499,13 +3496,10 @@ def rename(index): new_column_labels = [] new_data_spark_columns = [] - index_spark_column_names = self._internal.index_spark_column_names.copy() index_spark_columns = self._internal.index_spark_columns.copy() index_names = self._internal.index_names.copy() for i in idx[::-1]: - index_spark_column_names.pop(i) - name = index_names.pop(i) new_column_labels.insert(0, name if name is not None else rename(i)) @@ -3540,7 +3534,7 @@ def rename(index): ] internal = self._internal.copy( - index_spark_column_names=index_spark_column_names, + index_spark_columns=index_spark_columns, index_names=index_names, column_labels=new_column_labels + self._internal.column_labels, data_spark_columns=new_data_spark_columns + self._internal.data_spark_columns, @@ -3809,7 +3803,7 @@ def nunique( with ks.option_context("compute.max_rows", 1): internal = self._internal.copy( spark_frame=sdf, - index_spark_column_names=[SPARK_DEFAULT_INDEX_NAME], + index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)], index_names=[None], data_spark_columns=[ scol_for(sdf, col) for col in self._internal.data_spark_column_names @@ -4004,7 +3998,9 @@ def duplicated(self, subset=None, keep="first") -> "Series": DataFrame( InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, column_labels=[None], # type: ignore data_spark_columns=[scol_for(sdf, SPARK_DEFAULT_SERIES_NAME)], @@ -4073,11 +4069,9 @@ def to_koalas(self, index_col: Optional[Union[str, List[str]]] = None) -> "DataF assert isinstance(self, spark.DataFrame), type(self) from databricks.koalas.namespace import _get_index_map - index_spark_column_names, index_names = _get_index_map(self, index_col) + index_spark_columns, index_names = _get_index_map(self, index_col) internal = InternalFrame( - spark_frame=self, - index_spark_column_names=index_spark_column_names, - index_names=index_names, + spark_frame=self, index_spark_columns=index_spark_columns, index_names=index_names ) return DataFrame(internal) @@ -5379,7 +5373,7 @@ def pivot_table( column_label_names = ([None] * column_labels_level(values)) + [columns] internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=index_columns, + index_spark_columns=[scol_for(sdf, col) for col in index_columns], index_names=index, column_labels=column_labels, data_spark_columns=[scol_for(sdf, col) for col in data_columns], @@ -5391,7 +5385,7 @@ def pivot_table( column_label_names = ([None] * len(values[0])) + [columns] internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=index_columns, + index_spark_columns=[scol_for(sdf, col) for col in index_columns], index_names=index, column_labels=column_labels, data_spark_columns=[scol_for(sdf, col) for col in data_columns], @@ -5402,7 +5396,7 @@ def pivot_table( index_columns = [self._internal.spark_column_name_for(label) for label in index] internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=index_columns, + index_spark_columns=[scol_for(sdf, col) for col in index_columns], index_names=index, column_label_names=[columns], ) @@ -5419,7 +5413,7 @@ def pivot_table( index_map[colname] = None internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=list(index_map.keys()), + index_spark_columns=[scol_for(sdf, col) for col in index_map.keys()], index_names=list(index_map.values()), column_label_names=[columns], ) @@ -5993,19 +5987,18 @@ def droplevel(self, level, axis=0) -> "DataFrame": "at least one level must be left.".format(len(level), nlevels) ) - index_spark_column_names, index_names = zip( + index_spark_columns, index_names = zip( *[ item for i, item in enumerate( - zip(self._internal.index_spark_column_names, self._internal.index_names) + zip(self._internal.index_spark_columns, self._internal.index_names) ) if i not in int_level ] ) internal = self._internal.copy( - index_spark_column_names=list(index_spark_column_names), - index_names=list(index_names), + index_spark_columns=list(index_spark_columns), index_names=list(index_names) ) return DataFrame(internal) else: @@ -6350,7 +6343,7 @@ def sort_index( elif is_list_like(level): by = [self._internal.index_spark_columns[l] for l in level] # type: ignore else: - by = [self._internal.index_spark_columns[level]] + by = [self._internal.index_spark_columns[level]] # type: ignore return self._sort(by=by, ascending=ascending, inplace=inplace, na_position=na_position) @@ -6500,11 +6493,11 @@ def _swaplevel_index(self, i, j) -> InternalFrame: "%s is not a valid level number" % (self._internal.index_level, index) ) - index_map = list(zip(self._internal.index_spark_column_names, self._internal.index_names)) + index_map = list(zip(self._internal.index_spark_columns, self._internal.index_names)) index_map[i], index_map[j], = index_map[j], index_map[i] - index_spark_column_names, index_names = zip(*index_map) + index_spark_columns, index_names = zip(*index_map) internal = self._internal.copy( - index_spark_column_names=list(index_spark_column_names), index_names=list(index_names), + index_spark_columns=list(index_spark_columns), index_names=list(index_names), ) return internal @@ -6934,8 +6927,11 @@ def to_list(os: Optional[Union[Any, List[Any], Tuple, List[Tuple]]]) -> List[Tup "['inner', 'left', 'right', 'outer']", ) - left_table = self._internal.resolved_copy.spark_frame.alias("left_table") - right_table = right._internal.resolved_copy.spark_frame.alias("right_table") + left_internal = self._internal.resolved_copy + right_internal = right._internal.resolved_copy + + left_table = left_internal.spark_frame.alias("left_table") + right_table = right_internal.spark_frame.alias("right_table") left_key_columns = [scol_for(left_table, label) for label in left_key_names] right_key_columns = [scol_for(right_table, label) for label in right_key_names] @@ -6952,24 +6948,24 @@ def to_list(os: Optional[Union[Any, List[Any], Tuple, List[Tuple]]]) -> List[Tup right_suffix = suffixes[1] # Append suffixes to columns with the same name to avoid conflicts later - duplicate_columns = set(self._internal.column_labels) & set(right._internal.column_labels) + duplicate_columns = set(left_internal.column_labels) & set(right_internal.column_labels) exprs = [] data_columns = [] column_labels = [] left_scol_for = lambda label: scol_for( - left_table, self._internal.spark_column_name_for(label) + left_table, left_internal.spark_column_name_for(label) ) right_scol_for = lambda label: scol_for( - right_table, right._internal.spark_column_name_for(label) + right_table, right_internal.spark_column_name_for(label) ) - for label in self._internal.column_labels: - col = self._internal.spark_column_name_for(label) + for label in left_internal.column_labels: + col = left_internal.spark_column_name_for(label) scol = left_scol_for(label) if label in duplicate_columns: - spark_column_name = self._internal.spark_column_name_for(label) + spark_column_name = left_internal.spark_column_name_for(label) if spark_column_name in left_key_names and spark_column_name in right_key_names: right_scol = right_scol_for(label) if how == "right": @@ -6985,11 +6981,11 @@ def to_list(os: Optional[Union[Any, List[Any], Tuple, List[Tuple]]]) -> List[Tup exprs.append(scol) data_columns.append(col) column_labels.append(label) - for label in right._internal.column_labels: - col = right._internal.spark_column_name_for(label) + for label in right_internal.column_labels: + col = right_internal.spark_column_name_for(label) scol = right_scol_for(label) if label in duplicate_columns: - spark_column_name = self._internal.spark_column_name_for(label) + spark_column_name = left_internal.spark_column_name_for(label) if spark_column_name in left_key_names and spark_column_name in right_key_names: continue else: @@ -7000,23 +6996,23 @@ def to_list(os: Optional[Union[Any, List[Any], Tuple, List[Tuple]]]) -> List[Tup data_columns.append(col) column_labels.append(label) - left_index_scols = self._internal.index_spark_columns - right_index_scols = right._internal.index_spark_columns + left_index_scols = left_internal.index_spark_columns + right_index_scols = right_internal.index_spark_columns # Retain indices if they are used for joining if left_index: if right_index: if how in ("inner", "left"): exprs.extend(left_index_scols) - index_spark_column_names = self._internal.index_spark_column_names - index_names = self._internal.index_names + index_spark_column_names = left_internal.index_spark_column_names + index_names = left_internal.index_names elif how == "right": exprs.extend(right_index_scols) - index_spark_column_names = right._internal.index_spark_column_names - index_names = right._internal.index_names + index_spark_column_names = right_internal.index_spark_column_names + index_names = right_internal.index_names else: - index_spark_column_names = self._internal.index_spark_column_names - index_names = self._internal.index_names + index_spark_column_names = left_internal.index_spark_column_names + index_names = left_internal.index_names for col, left_scol, right_scol in zip( index_spark_column_names, left_index_scols, right_index_scols ): @@ -7024,21 +7020,23 @@ def to_list(os: Optional[Union[Any, List[Any], Tuple, List[Tuple]]]) -> List[Tup exprs.append(scol.alias(col)) else: exprs.extend(right_index_scols) - index_spark_column_names = right._internal.index_spark_column_names - index_names = right._internal.index_names + index_spark_column_names = right_internal.index_spark_column_names + index_names = right_internal.index_names elif right_index: exprs.extend(left_index_scols) - index_spark_column_names = self._internal.index_spark_column_names - index_names = self._internal.index_names + index_spark_column_names = left_internal.index_spark_column_names + index_names = left_internal.index_names else: - index_spark_column_names = None - index_names = None + index_spark_column_names = [] + index_names = [] selected_columns = joined_table.select(*exprs) internal = InternalFrame( spark_frame=selected_columns, - index_spark_column_names=index_spark_column_names, + index_spark_columns=[ + scol_for(selected_columns, col) for col in index_spark_column_names + ], index_names=index_names, column_labels=column_labels, data_spark_columns=[scol_for(selected_columns, col) for col in data_columns], @@ -7780,7 +7778,7 @@ def describe(self, percentiles: Optional[List[float]] = None) -> "DataFrame": internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=["summary"], + index_spark_columns=[scol_for(sdf, "summary")], column_labels=column_labels, data_spark_columns=[ scol_for(sdf, self._internal.spark_column_name_for(label)) @@ -8127,6 +8125,9 @@ def _reindex_index(self, index, fill_value): sdf = joined_df.drop(NATURAL_ORDER_COLUMN_NAME) internal = self._internal.copy( spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=index_names, data_spark_columns=[ scol_for(sdf, col) for col in self._internal.data_spark_column_names @@ -8449,7 +8450,7 @@ def melt(self, id_vars=None, value_vars=None, var_name=None, value_name="value") return DataFrame( InternalFrame( spark_frame=exploded_df, - index_spark_column_names=None, + index_spark_columns=None, column_labels=( [label if len(label) == 1 else (name_like_string(label),) for label in id_vars] + [(name,) for name in var_name] @@ -8624,7 +8625,10 @@ def stack(self) -> Union["DataFrame", "Series"]: internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names + [index_column], + index_spark_columns=[ + scol_for(sdf, col) + for col in (self._internal.index_spark_column_names + [index_column]) + ], index_names=self._internal.index_names + [index_name], column_labels=list(column_labels), data_spark_columns=[scol_for(sdf, col) for col in data_columns], @@ -8786,7 +8790,9 @@ def unstack(self) -> Union["DataFrame", "Series"]: DataFrame( InternalFrame( exploded_df, - index_spark_column_names=list(index_spark_column_names), + index_spark_columns=[ + scol_for(exploded_df, col) for col in index_spark_column_names + ], index_names=list(index_names), column_labels=[None], ) @@ -8869,8 +8875,9 @@ def all(self, axis: Union[int, str] = 0) -> "Series": internal = self._internal.copy( spark_frame=sdf, - index_spark_column_names=[ - SPARK_INDEX_NAME_FORMAT(i) for i in range(self._internal.column_labels_level) + index_spark_columns=[ + scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) + for i in range(self._internal.column_labels_level) ], index_names=self._internal.column_label_names, column_labels=[None], @@ -8956,8 +8963,9 @@ def any(self, axis: Union[int, str] = 0) -> "Series": internal = self._internal.copy( spark_frame=sdf, - index_spark_column_names=[ - SPARK_INDEX_NAME_FORMAT(i) for i in range(self._internal.column_labels_level) + index_spark_columns=[ + scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) + for i in range(self._internal.column_labels_level) ], index_names=self._internal.column_label_names, column_labels=[None], @@ -10043,7 +10051,7 @@ def quantile( internal = self._internal.copy( spark_frame=sdf, - index_spark_column_names=[internal_index_column], + index_spark_columns=[scol_for(sdf, internal_index_column)], index_names=[None], data_spark_columns=[ scol_for(sdf, col) for col in self._internal.data_spark_column_names @@ -10498,7 +10506,7 @@ def get_spark_column(kdf, label): with ks.option_context("compute.max_rows", 1): internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=[SPARK_DEFAULT_INDEX_NAME], + index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)], column_labels=new_column_labels, column_label_names=self._internal.column_label_names, ) @@ -10671,7 +10679,7 @@ def product(self) -> "Series": internal = InternalFrame( spark_frame=spark_frame, - index_spark_column_names=[SPARK_DEFAULT_INDEX_NAME], + index_spark_columns=[scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)], column_labels=column_labels, column_label_names=self._internal.column_label_names, ) diff --git a/databricks/koalas/groupby.py b/databricks/koalas/groupby.py index b959959bc7..ea77c01666 100644 --- a/databricks/koalas/groupby.py +++ b/databricks/koalas/groupby.py @@ -318,11 +318,11 @@ def _spark_groupby(kdf, func, groupkeys=()): index_spark_column_names = groupkey_names index_names = [kser._column_label for kser in groupkeys] else: - index_spark_column_names = None - index_names = None + index_spark_column_names = [] + index_names = [] return InternalFrame( spark_frame=sdf, - index_spark_column_names=index_spark_column_names, + index_spark_columns=[scol_for(sdf, col) for col in index_spark_column_names], index_names=index_names, column_labels=column_labels, data_spark_columns=[scol_for(sdf, col) for col in data_columns], @@ -613,7 +613,7 @@ def size(self) -> Series: sdf = sdf.groupby(*groupkey_names).count() internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=groupkey_names, + index_spark_columns=[scol_for(sdf, col) for col in groupkey_names], index_names=[kser._column_label for kser in groupkeys], column_labels=[None], data_spark_columns=[scol_for(sdf, "count")], @@ -1211,7 +1211,7 @@ def wrapped_func(df, *a, **k): ) else: # Otherwise, it loses index. - internal = InternalFrame(spark_frame=sdf, index_spark_column_names=None) + internal = InternalFrame(spark_frame=sdf, index_spark_columns=None) if should_return_series: kser = first_series(DataFrame(internal)) @@ -1532,7 +1532,7 @@ def idxmax(self, skipna=True) -> Union[DataFrame, Series]: internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=groupkey_names, + index_spark_columns=[scol_for(sdf, col) for col in groupkey_names], index_names=[kser._column_label for kser in self._groupkeys], column_labels=[kser._column_label for kser in self._agg_columns], data_spark_columns=[ @@ -1610,7 +1610,7 @@ def idxmin(self, skipna=True) -> Union[DataFrame, Series]: internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=groupkey_names, + index_spark_columns=[scol_for(sdf, col) for col in groupkey_names], index_names=[kser._column_label for kser in self._groupkeys], column_labels=[kser._column_label for kser in self._agg_columns], data_spark_columns=[ @@ -2093,7 +2093,7 @@ def pandas_transform(pdf): retain_index=False, ) # Otherwise, it loses index. - internal = InternalFrame(spark_frame=sdf, index_spark_column_names=None) + internal = InternalFrame(spark_frame=sdf, index_spark_columns=None) return DataFrame(internal) @@ -2261,6 +2261,9 @@ def get_group(self, name) -> Union[DataFrame, Series]: internal = internal.copy( spark_frame=spark_frame, + index_spark_columns=[ + scol_for(spark_frame, col) for col in internal.index_spark_column_names + ], column_labels=[s._column_label for s in self._agg_columns], data_spark_columns=[ scol_for(spark_frame, s._internal.data_spark_column_names[0]) @@ -2310,7 +2313,7 @@ def _reduce_for_stat_function(self, sfun, only_numeric): internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=groupkey_names, + index_spark_columns=[scol_for(sdf, col) for col in groupkey_names], index_names=[kser._column_label for kser in self._groupkeys], column_labels=column_labels, data_spark_columns=[scol_for(sdf, col) for col in data_columns], @@ -2613,8 +2616,8 @@ def describe(self) -> DataFrame: # Reindex the DataFrame to reflect initial grouping and agg columns. internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=[ - kser._internal.data_spark_column_names[0] for kser in self._groupkeys + index_spark_columns=[ + scol_for(sdf, kser._internal.data_spark_column_names[0]) for kser in self._groupkeys ], index_names=[kser._column_label for kser in self._groupkeys], column_labels=column_labels, @@ -2767,14 +2770,14 @@ def nsmallest(self, n=5) -> Series: sdf.withColumn(temp_rank_column, F.row_number().over(window)) .filter(F.col(temp_rank_column) <= n) .drop(temp_rank_column) - ) + ).drop(NATURAL_ORDER_COLUMN_NAME) internal = InternalFrame( - spark_frame=sdf.drop(NATURAL_ORDER_COLUMN_NAME), - index_spark_column_names=( - groupkey_col_names + spark_frame=sdf, + index_spark_columns=( + [scol_for(sdf, col) for col in groupkey_col_names] + [ - SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)) + scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys))) for i in range(self._kdf._internal.index_level) ] ), @@ -2840,14 +2843,14 @@ def nlargest(self, n=5) -> Series: sdf.withColumn(temp_rank_column, F.row_number().over(window)) .filter(F.col(temp_rank_column) <= n) .drop(temp_rank_column) - ) + ).drop(NATURAL_ORDER_COLUMN_NAME) internal = InternalFrame( - spark_frame=sdf.drop(NATURAL_ORDER_COLUMN_NAME), - index_spark_column_names=( - groupkey_col_names + spark_frame=sdf, + index_spark_columns=( + [scol_for(sdf, col) for col in groupkey_col_names] + [ - SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)) + scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys))) for i in range(self._kdf._internal.index_level) ] ), @@ -2916,7 +2919,7 @@ def value_counts(self, sort=None, ascending=None, dropna=True) -> Series: internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=groupkey_names, + index_spark_columns=[scol_for(sdf, col) for col in groupkey_names], index_names=[kser._column_label for kser in groupkeys], column_labels=[self._agg_columns[0]._column_label], data_spark_columns=[scol_for(sdf, agg_column)], diff --git a/databricks/koalas/indexes.py b/databricks/koalas/indexes.py index a43920eea2..951a5a48dc 100644 --- a/databricks/koalas/indexes.py +++ b/databricks/koalas/indexes.py @@ -157,7 +157,7 @@ def _with_new_scol(self, scol: spark.Column) -> "Index": sdf = self._internal.spark_frame.select(scol.alias(SPARK_DEFAULT_INDEX_NAME)) internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=sdf.columns, + index_spark_columns=[scol_for(sdf, col) for col in sdf.columns], index_names=self._internal.index_names, ) return DataFrame(internal).index @@ -739,7 +739,9 @@ def drop_duplicates(self) -> "Index": ).drop_duplicates() internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) return DataFrame(internal).index @@ -853,13 +855,13 @@ def _to_frame(self, index, names): if index: index_spark_column_names = self._internal.index_spark_column_names index_names = self._internal.index_names - data_columns = self._internal.index_spark_column_names + data_columns = index_spark_column_names sdf = self._internal.spark_frame.select( - index_spark_column_names + [NATURAL_ORDER_COLUMN_NAME] + self._internal.index_spark_columns + [NATURAL_ORDER_COLUMN_NAME] ) else: - index_spark_column_names = None - index_names = None + index_spark_column_names = [] + index_names = [] data_columns = [name_like_string(label) for label in names] sdf = self._internal.spark_frame.select( [ @@ -871,7 +873,7 @@ def _to_frame(self, index, names): internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=index_spark_column_names, + index_spark_columns=[scol_for(sdf, col) for col in index_spark_column_names], index_names=index_names, column_labels=names, data_spark_columns=[scol_for(sdf, col) for col in data_columns], @@ -1022,11 +1024,12 @@ def dropna(self) -> "Index": ('falcon', 'length')], ) """ - kdf = self._kdf.copy() - sdf = kdf._internal.spark_frame.select(self._internal.index_spark_columns).dropna() + sdf = self._internal.spark_frame.select(self._internal.index_spark_columns).dropna() internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) return DataFrame(internal).index @@ -1069,13 +1072,13 @@ def unique(self, level=None) -> "Index": if level is not None: self._validate_index_level(level) scols = self._internal.index_spark_columns - scol_names = self._internal.index_spark_column_names - scols = [scol.alias(scol_name) for scol, scol_name in zip(scols, scol_names)] sdf = self._kdf._internal.spark_frame.select(scols).distinct() return DataFrame( InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) ).index @@ -1107,7 +1110,9 @@ def drop(self, labels) -> "Index": DataFrame( InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) ) @@ -1254,21 +1259,25 @@ def droplevel(self, level) -> "Index": "left.".format(len(level), nlevels) ) - index_spark_column_names, index_names = zip( + index_spark_columns, index_spark_column_names, index_names = zip( *[ item for i, item in enumerate( - zip(self._internal.index_spark_column_names, self._internal.index_names) + zip( + self._internal.index_spark_columns, + self._internal.index_spark_column_names, + self._internal.index_names, + ) ) if i not in int_level ] ) sdf = self._internal.spark_frame - sdf = sdf.select(*index_spark_column_names) + sdf = sdf.select(*index_spark_columns) internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=list(index_spark_column_names), + index_spark_columns=[scol_for(sdf, col) for col in index_spark_column_names], index_names=list(index_names), ) return DataFrame(internal).index @@ -1335,7 +1344,9 @@ def symmetric_difference(self, other, result_name=None, sort=None) -> "Index": internal = InternalFrame( spark_frame=sdf_symdiff, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf_symdiff, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) result = Index(DataFrame(internal)) @@ -1407,11 +1418,15 @@ def sort_values(self, ascending=True) -> "Index": ) """ sdf = self._internal.spark_frame - sdf = sdf.orderBy(self._internal.index_spark_columns, ascending=ascending) + sdf = sdf.orderBy(self._internal.index_spark_columns, ascending=ascending).select( + self._internal.index_spark_columns + ) internal = InternalFrame( - spark_frame=sdf.select(self._internal.index_spark_columns), - index_spark_column_names=self._internal.index_spark_column_names, + spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) return DataFrame(internal).index @@ -1597,10 +1612,13 @@ def is_len_exceeded(index): index_value_column_names, self._internal.index_spark_column_names ) ] + sdf = sdf.select(index_origin_columns) internal = InternalFrame( - spark_frame=sdf.select(index_origin_columns), - index_spark_column_names=self._internal.index_spark_column_names, + spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) @@ -1659,7 +1677,9 @@ def append(self, other: "Index") -> "Index": internal = InternalFrame( spark_frame=sdf_appended, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf_appended, col) for col in self._internal.index_spark_column_names + ], index_names=index_names, ) @@ -1867,7 +1887,9 @@ def difference(self, other, sort=None) -> "Index": sdf_diff = sdf_self.select(idx_self).subtract(sdf_other.select(idx_other)) internal = InternalFrame( spark_frame=sdf_diff, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf_diff, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) result = DataFrame(internal).index @@ -1971,13 +1993,7 @@ def repeat(self, repeats: int) -> "Index": elif repeats < 0: raise ValueError("negative dimensions are not allowed") - sdf = self._internal.spark_frame.select(self._internal.index_spark_columns) - internal = InternalFrame( - spark_frame=sdf, - index_spark_column_names=sdf.columns, - index_names=self._internal.index_names, - ) - kdf = DataFrame(internal) # type: DataFrame + kdf = DataFrame(self._internal.resolved_copy) # type: DataFrame if repeats == 0: return DataFrame(kdf._internal.with_filter(F.lit(False))).index else: @@ -2103,7 +2119,9 @@ def union(self, other, sort=None) -> "Index": sdf = sdf.sort(self._internal.index_spark_column_names) internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) @@ -2163,8 +2181,6 @@ def intersection(self, other) -> "Index": >>> idx1.intersection(idx2).sort_values() Int64Index([3, 4], dtype='int64') """ - keep_name = True - if isinstance(other, DataFrame): raise ValueError("Index data must be 1-dimensional") elif isinstance(other, MultiIndex): @@ -2188,14 +2204,12 @@ def intersection(self, other) -> "Index": spark_frame_self = self.to_frame(name=SPARK_DEFAULT_INDEX_NAME).to_spark() spark_frame_intersected = spark_frame_self.intersect(spark_frame_other) if keep_name: - index_spark_column_names = self._internal.index_spark_column_names index_names = self._internal.index_names else: - index_spark_column_names = [SPARK_DEFAULT_INDEX_NAME] index_names = None internal = InternalFrame( spark_frame=spark_frame_intersected, - index_spark_column_names=index_spark_column_names, + index_spark_columns=[scol_for(spark_frame_intersected, SPARK_DEFAULT_INDEX_NAME)], index_names=index_names, ) @@ -2256,7 +2270,6 @@ def insert(self, loc: int, item) -> "Index": loc = 0 if loc < 0 else loc index_name = self._internal.index_spark_column_names[0] - sdf = self._internal.spark_frame sdf_before = self.to_frame(name=index_name)[:loc].to_spark() sdf_middle = Index([item]).to_frame(name=index_name).to_spark() sdf_after = self.to_frame(name=index_name)[loc:].to_spark() @@ -2578,7 +2591,9 @@ def from_frame(df, names=None) -> "MultiIndex": names = [name if is_name_like_tuple(name) else (name,) for name in names] internal = InternalFrame( - spark_frame=sdf, index_spark_column_names=sdf.columns, index_names=names + spark_frame=sdf, + index_spark_columns=[scol_for(sdf, col) for col in sdf.columns], + index_names=names, ) return cast(MultiIndex, DataFrame(internal).index) @@ -2655,11 +2670,11 @@ def swaplevel(self, i=-2, j=-1) -> "MultiIndex": "%s is not a valid level number" % (len(self.names), index) ) - index_map = list(zip(self._internal.index_spark_column_names, self._internal.index_names)) + index_map = list(zip(self._internal.index_spark_columns, self._internal.index_names)) index_map[i], index_map[j], = index_map[j], index_map[i] - index_spark_column_names, index_names = zip(*index_map) + index_spark_columns, index_names = zip(*index_map) internal = self._kdf._internal.copy( - index_spark_column_names=list(index_spark_column_names), index_names=list(index_names), + index_spark_columns=list(index_spark_columns), index_names=list(index_names) ) return cast(MultiIndex, DataFrame(internal).index) @@ -2715,11 +2730,20 @@ def _is_monotonic_increasing(self): cond = has_not_null & (prev.isNull() | cond) + cond_name = verify_temp_column_name( + self._internal.spark_frame.select(self._internal.index_spark_columns), + "__is_monotonic_increasing_cond__", + ) + + sdf = self._internal.spark_frame.select( + self._internal.index_spark_columns + [cond.alias(cond_name)] + ) + internal = InternalFrame( - spark_frame=self._internal.spark_frame.select( - self._internal.index_spark_columns + [cond] - ), - index_spark_column_names=self._internal.index_spark_column_names, + spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) @@ -2749,11 +2773,20 @@ def _is_monotonic_decreasing(self): cond = has_not_null & (prev.isNull() | cond) + cond_name = verify_temp_column_name( + self._internal.spark_frame.select(self._internal.index_spark_columns), + "__is_monotonic_decreasing_cond__", + ) + + sdf = self._internal.spark_frame.select( + self._internal.index_spark_columns + [cond.alias(cond_name)] + ) + internal = InternalFrame( - spark_frame=self._internal.spark_frame.select( - self._internal.index_spark_columns + [cond] - ), - index_spark_column_names=self._internal.index_spark_column_names, + spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) @@ -2982,7 +3015,9 @@ def symmetric_difference(self, other, result_name=None, sort=None) -> "MultiInde internal = InternalFrame( spark_frame=sdf_symdiff, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf_symdiff, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) result = MultiIndex(DataFrame(internal)) @@ -3054,7 +3089,9 @@ def drop(self, codes, level=None) -> "MultiIndex": DataFrame( InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) ) @@ -3189,7 +3226,9 @@ def get_level_values(self, level) -> Index: scol = self._internal.index_spark_columns[level] sdf = self._internal.spark_frame.select(scol) internal = InternalFrame( - spark_frame=sdf, index_spark_column_names=[index_scol_name], index_names=[index_name] + spark_frame=sdf, + index_spark_columns=[scol_for(sdf, index_scol_name)], + index_names=[index_name], ) return DataFrame(internal).index @@ -3250,7 +3289,9 @@ def insert(self, loc: int, item) -> Index: internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, ) return DataFrame(internal).index @@ -3320,14 +3361,12 @@ def intersection(self, other) -> "MultiIndex": spark_frame_self = self.to_frame(name=default_name).to_spark() spark_frame_intersected = spark_frame_self.intersect(spark_frame_other) if keep_name: - index_spark_column_names = self._internal.index_spark_column_names index_names = self._internal.index_names else: - index_spark_column_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(self.nlevels)] index_names = None internal = InternalFrame( spark_frame=spark_frame_intersected, - index_spark_column_names=index_spark_column_names, + index_spark_columns=[scol_for(spark_frame_intersected, col) for col in default_name], index_names=index_names, ) return cast(MultiIndex, DataFrame(internal).index) diff --git a/databricks/koalas/indexing.py b/databricks/koalas/indexing.py index c172f63051..99b35c1a99 100644 --- a/databricks/koalas/indexing.py +++ b/databricks/koalas/indexing.py @@ -437,12 +437,10 @@ def __getitem__(self, key) -> Union["Series", "DataFrame"]: return kser if remaining_index is not None: - index_scols = self._internal.index_spark_columns[-remaining_index:] - index_spark_column_names = self._internal.index_spark_column_names[-remaining_index:] + index_spark_columns = self._internal.index_spark_columns[-remaining_index:] index_names = self._internal.index_names[-remaining_index:] else: - index_scols = self._internal.index_spark_columns - index_spark_column_names = self._internal.index_spark_column_names + index_spark_columns = self._internal.index_spark_columns index_names = self._internal.index_names if len(column_labels) > 0: @@ -470,8 +468,10 @@ def __getitem__(self, key) -> Union["Series", "DataFrame"]: sdf = self._internal.spark_frame if cond is not None: + index_columns = sdf.select(index_spark_columns).columns data_columns = sdf.select(data_spark_columns).columns - sdf = sdf.filter(cond).select(index_scols + data_spark_columns) + sdf = sdf.filter(cond).select(index_spark_columns + data_spark_columns) + index_spark_columns = [scol_for(sdf, col) for col in index_columns] data_spark_columns = [scol_for(sdf, col) for col in data_columns] if limit is not None: @@ -489,7 +489,7 @@ def __getitem__(self, key) -> Union["Series", "DataFrame"]: internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=index_spark_column_names, + index_spark_columns=index_spark_columns, index_names=index_names, column_labels=column_labels, data_spark_columns=data_spark_columns, diff --git a/databricks/koalas/internal.py b/databricks/koalas/internal.py index 383466fc6f..45618b3f8e 100644 --- a/databricks/koalas/internal.py +++ b/databricks/koalas/internal.py @@ -363,7 +363,7 @@ class InternalFrame(object): def __init__( self, spark_frame: spark.DataFrame, - index_spark_column_names: Optional[List[str]], + index_spark_columns: Optional[List[spark.Column]], index_names: Optional[List[Optional[Tuple]]] = None, column_labels: Optional[List[Tuple]] = None, data_spark_columns: Optional[List[spark.Column]] = None, @@ -374,15 +374,14 @@ def __init__( index fields and names. :param spark_frame: Spark DataFrame to be managed. - :param index_spark_column_names: list of strings - the index field names which exists in Spark fields. + :param index_spark_columns: list of Spark Column + Spark Columns for the index. :param index_names: list of tuples the index names. :param column_labels: list of tuples with the same length The multi-level values in the tuples. :param data_spark_columns: list of Spark Column - Spark Columns to appear as columns. If spark_column is not None, - this argument is ignored, otherwise if this is None, calculated + Spark Columns to appear as columns. If this is None, calculated from spark_frame. :param column_label_names: Names for each of the column index levels. @@ -415,8 +414,8 @@ def __init__( | zoo| bar| 7| 8| 9|... +-----------------+-----------------+------+------+------+... - >>> internal._index_spark_column_names - ['__index_level_0__', '__index_level_1__', '(a, x)'] + >>> internal._index_spark_columns + [Column, Column, Column] >>> internal._index_names [('row_index_a',), ('row_index_b',), ('a', 'x')] @@ -434,7 +433,7 @@ def __init__( assert isinstance(spark_frame, spark.DataFrame) assert not spark_frame.isStreaming, "Koalas does not support Structured Streaming." - if not index_spark_column_names: + if not index_spark_columns: if data_spark_columns is not None: spark_frame = spark_frame.select(data_spark_columns) @@ -445,7 +444,7 @@ def __init__( # Create default index. spark_frame = InternalFrame.attach_default_index(spark_frame) - index_spark_column_names = [SPARK_DEFAULT_INDEX_NAME] + index_spark_columns = [scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)] if data_spark_columns is not None: data_spark_columns = [ @@ -460,15 +459,15 @@ def __init__( ) if not index_names: - index_names = [None] * len(index_spark_column_names) + index_names = [None] * len(index_spark_columns) - assert len(index_spark_column_names) == len(index_names), ( - len(index_spark_column_names), + assert len(index_spark_columns) == len(index_names), ( + len(index_spark_columns), len(index_names), ) assert all( - isinstance(index_column_name, str) for index_column_name in index_spark_column_names - ), index_spark_column_names + isinstance(index_scol, spark.Column) for index_scol in index_spark_columns + ), index_spark_columns assert all( is_name_like_tuple(index_name, check_type=True) for index_name in index_names ), index_names @@ -477,15 +476,18 @@ def __init__( ) self._sdf = spark_frame # type: spark.DataFrame - self._index_spark_column_names = index_spark_column_names # type: List[str] + self._index_spark_columns = index_spark_columns # type: List[spark.Column] self._index_names = index_names # type: List[Optional[Tuple]] if data_spark_columns is None: - index_columns = set(index_spark_column_names) self._data_spark_columns = [ scol_for(spark_frame, col) for col in spark_frame.columns - if col not in index_columns and col not in HIDDEN_COLUMNS + if all( + not scol_for(spark_frame, col)._jc.equals(index_scol._jc) + for index_scol in index_spark_columns + ) + and col not in HIDDEN_COLUMNS ] else: self._data_spark_columns = data_spark_columns @@ -737,12 +739,12 @@ def data_spark_columns(self) -> List[spark.Column]: @property def index_spark_column_names(self) -> List[str]: """ Return the managed index field names. """ - return self._index_spark_column_names + return self.spark_frame.select(self.index_spark_columns).columns - @lazy_property + @property def index_spark_columns(self) -> List[spark.Column]: """ Return Spark Columns for the managed index columns. """ - return [scol_for(self.spark_frame, column) for column in self.index_spark_column_names] + return self._index_spark_columns @lazy_property def spark_column_names(self) -> List[str]: @@ -858,6 +860,7 @@ def resolved_copy(self) -> "InternalFrame": sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS)) return self.copy( spark_frame=sdf, + index_spark_columns=[scol_for(sdf, col) for col in self.index_spark_column_names], data_spark_columns=[scol_for(sdf, col) for col in self.data_spark_column_names], ) @@ -880,7 +883,9 @@ def with_new_sdf( ) sdf = spark_frame.drop(NATURAL_ORDER_COLUMN_NAME) return self.copy( - spark_frame=sdf, data_spark_columns=[scol_for(sdf, col) for col in data_columns] + spark_frame=sdf, + index_spark_columns=[scol_for(sdf, col) for col in self.index_spark_column_names], + data_spark_columns=[scol_for(sdf, col) for col in data_columns], ) def with_new_columns( @@ -933,15 +938,19 @@ def with_new_columns( sdf = self.spark_frame if not keep_order: sdf = self.spark_frame.select(self.index_spark_columns + data_spark_columns) + index_spark_columns = [scol_for(sdf, col) for col in self.index_spark_column_names] data_spark_columns = [ scol_for(sdf, col) for col in self.spark_frame.select(data_spark_columns).columns ] + else: + index_spark_columns = self.index_spark_columns if column_label_names is _NoValue: column_label_names = self._column_label_names return self.copy( spark_frame=sdf, + index_spark_columns=index_spark_columns, column_labels=column_labels, data_spark_columns=data_spark_columns, column_label_names=column_label_names, @@ -999,7 +1008,7 @@ def select_column(self, column_label: Tuple) -> "InternalFrame": def copy( self, spark_frame: Union[spark.DataFrame, _NoValueType] = _NoValue, - index_spark_column_names: Union[List[str], _NoValueType] = _NoValue, + index_spark_columns: Union[List[spark.Column], _NoValueType] = _NoValue, index_names: Union[List[Optional[Tuple]], _NoValueType] = _NoValue, column_labels: Optional[Union[List[Tuple], _NoValueType]] = _NoValue, data_spark_columns: Optional[Union[List[spark.Column], _NoValueType]] = _NoValue, @@ -1008,8 +1017,8 @@ def copy( """ Copy the immutable InternalFrame. :param spark_frame: the new Spark DataFrame. If not specified, the original one is used. - :param index_spark_column_names: the index field names which exists in Spark fields. - If not specified, the original ones are used. + :param index_spark_columns: the list of Spark Column. + If not specified, the original ones are used. :param index_names: the index names. If not specified, the original ones are used. :param column_labels: the new column labels. If not specified, the original ones are used. :param data_spark_columns: the new Spark Columns. @@ -1020,19 +1029,19 @@ def copy( """ if spark_frame is _NoValue: spark_frame = self.spark_frame - if index_spark_column_names is _NoValue: - index_spark_column_names = self.index_spark_column_names + if index_spark_columns is _NoValue: + index_spark_columns = self.index_spark_columns if index_names is _NoValue: index_names = self.index_names if column_labels is _NoValue: - column_labels = self._column_labels + column_labels = self.column_labels if data_spark_columns is _NoValue: - data_spark_columns = self._data_spark_columns + data_spark_columns = self.data_spark_columns if column_label_names is _NoValue: - column_label_names = self._column_label_names + column_label_names = self.column_label_names return InternalFrame( spark_frame=spark_frame, - index_spark_column_names=index_spark_column_names, + index_spark_columns=index_spark_columns, index_names=index_names, column_labels=column_labels, data_spark_columns=data_spark_columns, @@ -1081,7 +1090,7 @@ def from_pandas(pdf: pd.DataFrame) -> "InternalFrame": sdf = default_session().createDataFrame(reset_index, schema=schema) return InternalFrame( spark_frame=sdf, - index_spark_column_names=index_columns, + index_spark_columns=[scol_for(sdf, col) for col in index_columns], index_names=index_names, column_labels=column_labels, data_spark_columns=[scol_for(sdf, col) for col in data_columns], diff --git a/databricks/koalas/namespace.py b/databricks/koalas/namespace.py index 1ad68ef0a5..90bace222a 100644 --- a/databricks/koalas/namespace.py +++ b/databricks/koalas/namespace.py @@ -376,19 +376,19 @@ def read_csv( if col not in column_labels: raise KeyError(col) index_spark_column_names = [column_labels[col] for col in index_col] - index_names = [(col,) for col in index_col] + index_names = [(col,) for col in index_col] # type: List[Tuple] column_labels = OrderedDict( (label, col) for label, col in column_labels.items() if label not in index_col ) else: - index_spark_column_names = None - index_names = None + index_spark_column_names = [] + index_names = [] kdf = DataFrame( InternalFrame( spark_frame=sdf, - index_spark_column_names=index_spark_column_names, - index_names=cast(Optional[List[Optional[Tuple[Any, ...]]]], index_names), + index_spark_columns=[scol_for(sdf, col) for col in index_spark_column_names], + index_names=index_names, column_labels=[ label if is_name_like_tuple(label) else (label,) for label in column_labels ], @@ -581,13 +581,11 @@ def read_table(name: str, index_col: Optional[Union[str, List[str]]] = None) -> 0 0 """ sdf = default_session().read.table(name) - index_spark_column_names, index_names = _get_index_map(sdf, index_col) + index_spark_columns, index_names = _get_index_map(sdf, index_col) return DataFrame( InternalFrame( - spark_frame=sdf, - index_spark_column_names=index_spark_column_names, - index_names=index_names, + spark_frame=sdf, index_spark_columns=index_spark_columns, index_names=index_names ) ) @@ -667,13 +665,11 @@ def read_spark_io( options = options.get("options") # type: ignore sdf = default_session().read.load(path=path, format=format, schema=schema, **options) - index_spark_column_names, index_names = _get_index_map(sdf, index_col) + index_spark_columns, index_names = _get_index_map(sdf, index_col) return DataFrame( InternalFrame( - spark_frame=sdf, - index_spark_column_names=index_spark_column_names, - index_names=index_names, + spark_frame=sdf, index_spark_columns=index_spark_columns, index_names=index_names ) ) @@ -773,11 +769,11 @@ def read_index_metadata(pser): kdf = kdf[new_columns] else: sdf = default_session().createDataFrame([], schema=StructType()) - index_spark_column_names, index_names = _get_index_map(sdf, index_col) + index_spark_columns, index_names = _get_index_map(sdf, index_col) kdf = DataFrame( InternalFrame( spark_frame=sdf, - index_spark_column_names=index_spark_column_names, + index_spark_columns=index_spark_columns, index_names=index_names, ) ) @@ -1333,12 +1329,10 @@ def read_sql_table( reader.schema(schema) reader.options(**options) sdf = reader.format("jdbc").load() - index_spark_column_names, index_names = _get_index_map(sdf, index_col) + index_spark_columns, index_names = _get_index_map(sdf, index_col) kdf = DataFrame( InternalFrame( - spark_frame=sdf, - index_spark_column_names=index_spark_column_names, - index_names=index_names, + spark_frame=sdf, index_spark_columns=index_spark_columns, index_names=index_names ) ) # type: DataFrame if columns is not None: @@ -1393,12 +1387,10 @@ def read_sql_query(sql, con, index_col=None, **options) -> DataFrame: reader.option("url", con) reader.options(**options) sdf = reader.format("jdbc").load() - index_spark_column_names, index_names = _get_index_map(sdf, index_col) + index_spark_columns, index_names = _get_index_map(sdf, index_col) return DataFrame( InternalFrame( - spark_frame=sdf, - index_spark_column_names=index_spark_column_names, - index_names=index_names, + spark_frame=sdf, index_spark_columns=index_spark_columns, index_names=index_names ) ) @@ -2146,6 +2138,9 @@ def resolve_func(kdf, this_column_labels, that_column_labels): kdf = DataFrame( kdf._internal.copy( spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, col) for col in kdf._internal.index_spark_column_names + ], column_labels=(kdf._internal.column_labels + columns_to_add), data_spark_columns=[scol_for(sdf, col) for col in data_columns], ) @@ -2165,8 +2160,8 @@ def resolve_func(kdf, this_column_labels, that_column_labels): concatenated = reduce(lambda x, y: x.union(y), sdfs) if ignore_index: - index_spark_column_names = None - index_names = None + index_spark_column_names = [] + index_names = [] else: index_spark_column_names = kdfs[0]._internal.index_spark_column_names index_names = kdfs[0]._internal.index_names @@ -2174,7 +2169,7 @@ def resolve_func(kdf, this_column_labels, that_column_labels): result_kdf = DataFrame( kdfs[0]._internal.copy( spark_frame=concatenated, - index_spark_column_names=index_spark_column_names, + index_spark_columns=[scol_for(concatenated, col) for col in index_spark_column_names], index_names=index_names, data_spark_columns=[ scol_for(concatenated, col) for col in kdfs[0]._internal.data_spark_column_names @@ -2602,7 +2597,7 @@ def broadcast(obj) -> DataFrame: def _get_index_map( sdf: spark.DataFrame, index_col: Optional[Union[str, List[str]]] = None -) -> Tuple[Optional[List[str]], Optional[List[Tuple]]]: +) -> Tuple[Optional[List[spark.Column]], Optional[List[Tuple]]]: if index_col is not None: if isinstance(index_col, str): index_col = [index_col] @@ -2610,13 +2605,15 @@ def _get_index_map( for col in index_col: if col not in sdf_columns: raise KeyError(col) - index_spark_column_names = index_col # type: Optional[List[str]] + index_spark_columns = [ + scol_for(sdf, col) for col in index_col + ] # type: Optional[List[spark.Column]] index_names = [(col,) for col in index_col] # type: Optional[List[Tuple]] else: - index_spark_column_names = None + index_spark_columns = None index_names = None - return index_spark_column_names, index_names + return index_spark_columns, index_names _get_dummies_default_accept_types = (DecimalType, StringType, DateType) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index 67438ecdae..1a292027b7 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -2234,7 +2234,7 @@ def unique(self) -> "Series": sdf = self._internal.spark_frame.select(self.spark.column).distinct() internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=None, + index_spark_columns=None, column_labels=[self._column_label], data_spark_columns=[scol_for(sdf, self._internal.data_spark_column_names[0])], column_label_names=self._internal.column_label_names, @@ -2531,7 +2531,7 @@ def add_prefix(self, prefix) -> "Series": dtype: int64 """ assert isinstance(prefix, str) - internal = self._internal + internal = self._internal.resolved_copy sdf = internal.spark_frame.select( [ F.concat(F.lit(prefix), index_spark_column).alias(index_spark_column_name) @@ -2584,7 +2584,7 @@ def add_suffix(self, suffix) -> "Series": dtype: int64 """ assert isinstance(suffix, str) - internal = self._internal + internal = self._internal.resolved_copy sdf = internal.spark_frame.select( [ F.concat(index_spark_column, F.lit(suffix)).alias(index_spark_column_name) @@ -3317,7 +3317,7 @@ def quantile(self, q=0.5, accuracy=10000) -> Union[Scalar, "Series"]: internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=[internal_index_column], + index_spark_columns=[scol_for(sdf, internal_index_column)], column_labels=None, data_spark_columns=[scol_for(sdf, value_column)], column_label_names=None, @@ -3911,20 +3911,22 @@ def pop(self, item) -> Union["Series", Scalar]: pdf = sdf.limit(2).toPandas() length = len(pdf) if length == 1: - return pdf[self._internal.data_spark_column_names[0]].iloc[0] + return pdf[internal.data_spark_column_names[0]].iloc[0] item_string = name_like_string(item) sdf = sdf.withColumn(SPARK_DEFAULT_INDEX_NAME, F.lit(str(item_string))) internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=[SPARK_DEFAULT_INDEX_NAME], + index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)], column_labels=[self._column_label], ) return first_series(DataFrame(internal)) else: internal = internal.copy( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names[len(item) :], + index_spark_columns=[ + scol_for(sdf, col) for col in internal.index_spark_column_names[len(item) :] + ], index_names=self._internal.index_names[len(item) :], data_spark_columns=[scol_for(sdf, internal.data_spark_column_names[0])], ) @@ -4038,7 +4040,7 @@ def mode(self, dropna=True) -> "Series": F.col(SPARK_DEFAULT_INDEX_NAME).alias(SPARK_DEFAULT_SERIES_NAME) ) internal = InternalFrame( - spark_frame=sdf, index_spark_column_names=None, column_labels=[self._column_label] + spark_frame=sdf, index_spark_columns=None, column_labels=[self._column_label] ) return first_series(DataFrame(internal)) @@ -4607,7 +4609,7 @@ def xs(self, key, level=None) -> "Series": internal = internal.copy( spark_frame=sdf, - index_spark_column_names=index_spark_column_names, + index_spark_columns=[scol_for(sdf, col) for col in index_spark_column_names], index_names=index_names, data_spark_columns=[scol_for(sdf, internal.data_spark_column_names[0])], ) @@ -4863,6 +4865,9 @@ def repeat(self, repeats: Union[int, "Series"]) -> "Series": sdf = self._internal.spark_frame.select(self._internal.index_spark_columns + [scol]) internal = self._internal.copy( spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], data_spark_columns=[scol_for(sdf, name_like_string(self.name))], ) return first_series(DataFrame(internal)) @@ -5053,7 +5058,7 @@ def unstack(self, level=-1) -> DataFrame: sdf = sdf.groupby(list(index_scol_names)).pivot(pivot_col).agg(F.first(scol_for(sdf, col))) internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=list(index_scol_names), + index_spark_columns=[scol_for(sdf, col) for col in index_scol_names], index_names=list(index_names), column_label_names=[column_label_names], ) @@ -5620,7 +5625,9 @@ def compare( sdf = sdf.select(index_scols + [this_scol, that_scol, NATURAL_ORDER_COLUMN_NAME]) internal = InternalFrame( spark_frame=sdf, - index_spark_column_names=self._internal.index_spark_column_names, + index_spark_columns=[ + scol_for(sdf, col) for col in self._internal.index_spark_column_names + ], index_names=self._internal.index_names, column_labels=[(this_column_label,), (that_column_label,)], data_spark_columns=[scol_for(sdf, this_column_label), scol_for(sdf, that_column_label)], diff --git a/databricks/koalas/tests/test_namespace.py b/databricks/koalas/tests/test_namespace.py index 9127d5f217..5b11aa1fb6 100644 --- a/databricks/koalas/tests/test_namespace.py +++ b/databricks/koalas/tests/test_namespace.py @@ -265,8 +265,17 @@ def test_get_index_map(self): kdf = ks.DataFrame({"year": [2015, 2016], "month": [2, 3], "day": [4, 5]}) sdf = kdf.to_spark() self.assertEqual(_get_index_map(sdf), (None, None)) - self.assertEqual(_get_index_map(sdf, "year"), (["year"], [("year",)])) - self.assertEqual( - _get_index_map(sdf, ["year", "month"]), (["year", "month"], [("year",), ("month",)]) - ) + + def check(actual, expected): + actual_scols, actual_labels = actual + expected_column_names, expected_labels = expected + self.assertEqual(len(actual_scols), len(expected_column_names)) + for actual_scol, expected_column_name in zip(actual_scols, expected_column_names): + expected_scol = sdf[expected_column_name] + self.assertTrue(actual_scol._jc.equals(expected_scol._jc)) + self.assertEqual(actual_labels, expected_labels) + + check(_get_index_map(sdf, "year"), (["year"], [("year",)])) + check(_get_index_map(sdf, ["year", "month"]), (["year", "month"], [("year",), ("month",)])) + self.assertRaises(KeyError, lambda: _get_index_map(sdf, ["year", "hour"])) diff --git a/databricks/koalas/utils.py b/databricks/koalas/utils.py index c95e07c186..ac72864cd6 100644 --- a/databricks/koalas/utils.py +++ b/databricks/koalas/utils.py @@ -67,7 +67,13 @@ def same_anchor( return ( this_internal.spark_frame is that_internal.spark_frame - and this_internal.index_spark_column_names == that_internal.index_spark_column_names + and this_internal.index_level == that_internal.index_level + and all( + this_scol._jc.equals(that_scol._jc) + for this_scol, that_scol in zip( + this_internal.index_spark_columns, that_internal.index_spark_columns + ) + ) ) @@ -124,7 +130,9 @@ def resolve(internal, side): ) return internal.copy( spark_frame=sdf, - index_spark_column_names=[rename(col) for col in internal.index_spark_column_names], + index_spark_columns=[ + scol_for(sdf, rename(col)) for col in internal.index_spark_column_names + ], data_spark_columns=[ scol_for(sdf, rename(col)) for col in internal.data_spark_column_names ], @@ -217,7 +225,7 @@ def fill_label(label): return DataFrame( InternalFrame( spark_frame=joined_df, - index_spark_column_names=index_column_names, + index_spark_columns=[scol_for(joined_df, col) for col in index_column_names], index_names=this_internal.index_names, column_labels=column_labels, data_spark_columns=[scol_for(joined_df, col) for col in new_data_columns], diff --git a/databricks/koalas/window.py b/databricks/koalas/window.py index 5e0bd1c04e..a8b685e9df 100644 --- a/databricks/koalas/window.py +++ b/databricks/koalas/window.py @@ -703,7 +703,7 @@ def _apply_as_series_or_frame(self, func): internal = kdf._internal.copy( spark_frame=sdf, - index_spark_column_names=list(new_index_map.keys()), + index_spark_columns=[scol_for(sdf, col) for col in new_index_map.keys()], index_names=list(new_index_map.values()), column_labels=[c._column_label for c in applied], data_spark_columns=[