 from databricks.koalas.missing.frame import _MissingPandasLikeDataFrame
 from databricks.koalas.ml import corr
 from databricks.koalas.utils import column_index_level, scol_for
-from databricks.koalas.typedef import as_spark_type, as_python_type
+from databricks.koalas.typedef import _infer_return_type, as_spark_type, as_python_type
 from databricks.koalas.plot import KoalasFramePlotMethods
 from databricks.koalas.config import get_option
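
The newly imported `_infer_return_type` is used by the `rename()` method added below to turn a callable mapper's return annotation into a Spark type for the pandas UDF it builds (`spark_return_type = _infer_return_type(mapper).tpe`). A rough sketch of the idea; the exact objects returned depend on the koalas version:

    from databricks.koalas.typedef import _infer_return_type, as_spark_type

    def str_lower(s) -> str:        # an annotated mapper, as in the rename() doctests below
        return str.lower(s)

    as_spark_type(str)                  # maps the Python type to a Spark type (StringType)
    _infer_return_type(str_lower).tpe   # same Spark type, read from the '-> str' annotation;
                                        # rename() uses it as the returnType of its pandas_udf
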
@@ -3531,8 +3531,6 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
             axis = 0
         if not (axis == 0 or axis == "index"):
             raise NotImplementedError("fillna currently only works for axis=0 or axis='index'")
-        if (value is None) and (method is None):
-            raise ValueError("Must specify a fillna 'value' or 'method' parameter.")
         if not isinstance(value, (float, int, str, bool, dict, pd.Series)):
             raise TypeError("Unsupported type %s" % type(value))
         if isinstance(value, pd.Series):
@@ -3547,6 +3545,9 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
             sdf = sdf.fillna(value)
             internal = self._internal.copy(sdf=sdf)
         else:
+            if method is None:
+                raise ValueError("Must specify a fillna 'value' or 'method' parameter.")
+
             applied = []
             for idx in self._internal.column_index:
                 applied.append(self[idx].fillna(value=value, method=method, axis=axis,
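
For context, a small usage sketch of the two paths this change separates (illustrative frame; assumes a koalas build where `Series.fillna` accepts `method=`, which is what the per-column `else:` branch above delegates to):

    import databricks.koalas as ks

    kdf = ks.DataFrame({'A': [None, 2.0, None], 'B': [1.0, None, 3.0]})

    kdf.fillna(0)               # value path: one sdf.fillna(value) call on the Spark frame
    kdf.fillna(method='ffill')  # method path: dispatched column by column via self[idx].fillna(...)
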
@@ -5378,7 +5379,8 @@ def merge(self, right: 'DataFrame', how: str = 'inner',
                                   column_index=column_index)
         return DataFrame(internal)

-    def join(self, right: 'DataFrame', on: Optional[Union[str, List[str]]] = None,
+    def join(self, right: 'DataFrame',
+             on: Optional[Union[str, List[str], Tuple[str, ...], List[Tuple[str, ...]]]] = None,
              how: str = 'left', lsuffix: str = '', rsuffix: str = '') -> 'DataFrame':
         """
         Join columns of another DataFrame.
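
The widened `on` annotation admits the tuple labels koalas uses for multi-level columns, the same representation that `update()` below now intersects via `_internal.column_index`. A pandas-level illustration of such labels (koalas mirrors this):

    import pandas as pd

    midx = pd.MultiIndex.from_tuples([('x', 'a'), ('x', 'b')])
    pdf = pd.DataFrame([[1, 2]], columns=midx)
    list(pdf.columns)   # [('x', 'a'), ('x', 'b')] -- Tuple[str, ...] labels of the kind `on` now accepts
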
@@ -5632,13 +5634,14 @@ def update(self, other: 'DataFrame', join: str = 'left', overwrite: bool = True)
         if isinstance(other, ks.Series):
             other = DataFrame(other)

-        update_columns = list(set(self._internal.data_columns)
-                              .intersection(set(other._internal.data_columns)))
+        update_columns = list(set(self._internal.column_index)
+                              .intersection(set(other._internal.column_index)))
         update_sdf = self.join(other[update_columns], rsuffix='_new')._sdf

-        for column_name in update_columns:
+        for column_index in update_columns:
+            column_name = self._internal.column_name_for(column_index)
             old_col = scol_for(update_sdf, column_name)
-            new_col = scol_for(update_sdf, column_name + '_new')
+            new_col = scol_for(update_sdf, other._internal.column_name_for(column_index) + '_new')
             if overwrite:
                 update_sdf = update_sdf.withColumn(column_name, F.when(new_col.isNull(), old_col)
                                                    .otherwise(new_col))
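
Matching on label tuples instead of flat Spark column names is what lets `update()` cover multi-level column labels; for plain labels the behaviour is unchanged. An illustrative call (update modifies the caller in place, mirroring pandas):

    import databricks.koalas as ks

    kdf = ks.DataFrame({'a': [1, 2, 3], 'b': [400, 500, 600]})
    other = ks.DataFrame({'b': [4, 5, 6], 'c': [7, 8, 9]})
    kdf.update(other)   # only the shared label 'b' is overwritten; 'a' is untouched, 'c' ignored
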
@@ -6853,6 +6856,224 @@ def filter(self, items=None, like=None, regex=None, axis=None):
         else:
             raise TypeError("Must pass either `items`, `like`, or `regex`")

+    def rename(self,
+               mapper=None,
+               index=None,
+               columns=None,
+               axis='index',
+               inplace=False,
+               level=None,
+               errors='ignore'):
+
+        """
+        Alter axes labels.
+        Function / dict values must be unique (1-to-1). Labels not contained in a dict / Series
+        will be left as-is. Extra labels listed don’t throw an error.
+
+        Parameters
+        ----------
+        mapper : dict-like or function
+            Dict-like or functions transformations to apply to that axis’ values.
+            Use either `mapper` and `axis` to specify the axis to target with `mapper`, or `index`
+            and `columns`.
+        index : dict-like or function
+            Alternative to specifying axis ("mapper, axis=0" is equivalent to "index=mapper").
+        columns : dict-like or function
+            Alternative to specifying axis ("mapper, axis=1" is equivalent to "columns=mapper").
+        axis : int or str, default 'index'
+            Axis to target with mapper. Can be either the axis name ('index', 'columns') or
+            number (0, 1).
+        inplace : bool, default False
+            Whether to return a new DataFrame.
+        level : int or level name, default None
+            In case of a MultiIndex, only rename labels in the specified level.
+        errors : {'ignore', 'raise'}, default 'ignore'
+            If 'raise', raise a `KeyError` when a dict-like `mapper`, `index`, or `columns`
+            contains labels that are not present in the Index being transformed. If 'ignore',
+            existing keys will be renamed and extra keys will be ignored.
+
+        Returns
+        -------
+        DataFrame with the renamed axis labels.
+
+        Raises
+        ------
+        `KeyError`
+            If any of the labels is not found in the selected axis and "errors='raise'".
+
+        Examples
+        --------
+        >>> kdf1 = ks.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+        >>> kdf1.rename(columns={"A": "a", "B": "c"})  # doctest: +NORMALIZE_WHITESPACE
+           a  c
+        0  1  4
+        1  2  5
+        2  3  6
+
+        >>> kdf1.rename(index={1: 10, 2: 20})  # doctest: +NORMALIZE_WHITESPACE
+            A  B
+        0   1  4
+        10  2  5
+        20  3  6
+
+        >>> def str_lower(s) -> str:
+        ...     return str.lower(s)
+        >>> kdf1.rename(str_lower, axis='columns')  # doctest: +NORMALIZE_WHITESPACE
+           a  b
+        0  1  4
+        1  2  5
+        2  3  6
+
+        >>> def mul10(x) -> int:
+        ...     return x * 10
+        >>> kdf1.rename(mul10, axis='index')  # doctest: +NORMALIZE_WHITESPACE
+            A  B
+        0   1  4
+        10  2  5
+        20  3  6
+
+        >>> idx = pd.MultiIndex.from_tuples([('X', 'A'), ('X', 'B'), ('Y', 'C'), ('Y', 'D')])
+        >>> kdf2 = ks.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=idx)
+        >>> kdf2.rename(columns=str_lower, level=0)  # doctest: +NORMALIZE_WHITESPACE
+           x     y
+           A  B  C  D
+        0  1  2  3  4
+        1  5  6  7  8
+
+        >>> kdf3 = ks.DataFrame([[1, 2], [3, 4], [5, 6], [7, 8]], index=idx, columns=list('ab'))
+        >>> kdf3.rename(index=str_lower)  # doctest: +NORMALIZE_WHITESPACE
+             a  b
+        x a  1  2
+          b  3  4
+        y c  5  6
+          d  7  8
+        """
+
+        def gen_mapper_fn(mapper):
+            if isinstance(mapper, dict):
+                if len(mapper) == 0:
+                    if errors == 'raise':
+                        raise KeyError('Index include label which is not in the `mapper`.')
+                    else:
+                        return DataFrame(self._internal)
+
+                type_set = set(map(lambda x: type(x), mapper.values()))
+                if len(type_set) > 1:
+                    raise ValueError("Mapper dict should have the same value type.")
+                spark_return_type = as_spark_type(list(type_set)[0])
+
+                def mapper_fn(x):
+                    if x in mapper:
+                        return mapper[x]
+                    else:
+                        if errors == 'raise':
+                            raise KeyError('Index include value which is not in the `mapper`')
+                        return x
+            elif callable(mapper):
+                spark_return_type = _infer_return_type(mapper).tpe
+
+                def mapper_fn(x):
+                    return mapper(x)
+            else:
+                raise ValueError("`mapper` or `index` or `columns` should be "
+                                 "either dict-like or function type.")
+            return mapper_fn, spark_return_type
+
+        index_mapper_fn = None
+        index_mapper_ret_stype = None
+        columns_mapper_fn = None
+
+        if mapper:
+            if axis == 'index' or axis == 0:
+                index_mapper_fn, index_mapper_ret_stype = gen_mapper_fn(mapper)
+            elif axis == 'columns' or axis == 1:
+                columns_mapper_fn, columns_mapper_ret_stype = gen_mapper_fn(mapper)
+            else:
+                raise ValueError("argument axis should be either the axis name "
+                                 "(‘index’, ‘columns’) or number (0, 1)")
+        else:
+            if index:
+                index_mapper_fn, index_mapper_ret_stype = gen_mapper_fn(index)
+            if columns:
+                columns_mapper_fn, _ = gen_mapper_fn(columns)
+
+            if not index and not columns:
+                raise ValueError("Either `index` or `columns` should be provided.")
+
+        internal = self._internal
+        if index_mapper_fn:
+            # rename index labels, if `level` is None, rename all index columns, otherwise only
+            # rename the corresponding level index.
+            # implement this by transform the underlying spark dataframe,
+            # Example:
+            # suppose the kdf index column in underlying spark dataframe is "index_0", "index_1",
+            # if rename level 0 index labels, will do:
+            #   ``kdf._sdf.withColumn("index_0", mapper_fn_udf(col("index_0"))``
+            # if rename all index labels (`level` is None), then will do:
+            #   ```
+            #   kdf._sdf.withColumn("index_0", mapper_fn_udf(col("index_0"))
+            #       .withColumn("index_1", mapper_fn_udf(col("index_1"))
+            #   ```
+
+            index_columns = internal.index_columns
+            num_indices = len(index_columns)
+            if level:
+                if level < 0 or level >= num_indices:
+                    raise ValueError("level should be an integer between [0, num_indices)")
+
+            def gen_new_index_column(level):
+                index_col_name = index_columns[level]
+
+                index_mapper_udf = pandas_udf(lambda s: s.map(index_mapper_fn),
+                                              returnType=index_mapper_ret_stype)
+                return index_mapper_udf(scol_for(internal.sdf, index_col_name))
+
+            sdf = internal.sdf
+            if level is None:
+                for i in range(num_indices):
+                    sdf = sdf.withColumn(index_columns[i], gen_new_index_column(i))
+            else:
+                sdf = sdf.withColumn(index_columns[level], gen_new_index_column(level))
+            internal = internal.copy(sdf=sdf)
+        if columns_mapper_fn:
+            # rename column name.
+            # Will modify the `_internal._column_index` and transform underlying spark dataframe
+            # to the same column name with `_internal._column_index`.
+            if level:
+                if level < 0 or level >= internal.column_index_level:
+                    raise ValueError("level should be an integer between [0, column_index_level)")
+
+            def gen_new_column_index_entry(column_index_entry):
+                if isinstance(column_index_entry, tuple):
+                    if level is None:
+                        # rename all level columns
+                        return tuple(map(columns_mapper_fn, column_index_entry))
+                    else:
+                        # only rename specified level column
+                        entry_list = list(column_index_entry)
+                        entry_list[level] = columns_mapper_fn(entry_list[level])
+                        return tuple(entry_list)
+                else:
+                    return columns_mapper_fn(column_index_entry)
+
+            new_column_index = list(map(gen_new_column_index_entry, internal.column_index))
+
+            if internal.column_index_level == 1:
+                new_data_columns = [col[0] for col in new_column_index]
+            else:
+                new_data_columns = [str(col) for col in new_column_index]
+            new_data_scols = [scol_for(internal.sdf, old_col_name).alias(new_col_name)
+                              for old_col_name, new_col_name
+                              in zip(internal.data_columns, new_data_columns)]
+            sdf = internal.sdf.select(*(internal.index_scols + new_data_scols))
+            internal = internal.copy(sdf=sdf, column_index=new_column_index,
+                                     data_columns=new_data_columns)
+        if inplace:
+            self._internal = internal
+            return self
+        else:
+            return DataFrame(internal)
+
     def _get_from_multiindex_column(self, key):
         """ Select columns from multi-index columns.