@@ -19,7 +19,7 @@
"""

from functools import wraps
-from typing import Union
+from typing import Union, Callable, Any

import numpy as np
import pandas as pd
@@ -30,10 +30,32 @@
from pyspark.sql.functions import monotonically_increasing_id

from databricks import koalas as ks  # For running doctests and reference resolution in PyCharm.
-from databricks.koalas.frame import DataFrame
-from databricks.koalas.internal import _InternalFrame
+from databricks.koalas import numpy_compat
+from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT
from databricks.koalas.typedef import pandas_wraps, spark_type_to_pandas_dtype
from databricks.koalas.utils import align_diff_series, scol_for, validate_axis
+from databricks.koalas.frame import DataFrame
+
+
+def booleanize_null(left_scol, scol, f):
+    """
+    Booleanize Null in Spark Column
+    """
+    comp_ops = [getattr(spark.Column, '__{}__'.format(comp_op))
+                for comp_op in ['eq', 'ne', 'lt', 'le', 'ge', 'gt']]
+
+    if f in comp_ops:
+        # If `f` is "!=", fill nulls with True; otherwise, fill them with False.
+        filler = f == spark.Column.__ne__
+        scol = F.when(scol.isNull(), filler).otherwise(scol)
+
+    elif f == spark.Column.__or__:
+        scol = F.when(left_scol.isNull() | scol.isNull(), False).otherwise(scol)
+
+    elif f == spark.Column.__and__:
+        scol = F.when(scol.isNull(), False).otherwise(scol)
+
+    return scol


def _column_op(f):
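
`booleanize_null` centralizes the null handling that comparison and boolean
operators need: Spark SQL uses three-valued logic, so comparing against a null
yields null, while pandas always returns a plain boolean. A minimal standalone
sketch of the same `F.when(...)` pattern, assuming only a local SparkSession
(the data here is made up for illustration):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark_session = SparkSession.builder.master("local[1]").getOrCreate()
    sdf = spark_session.createDataFrame([(1,), (None,)], ["x"])

    raw = sdf["x"] == 1                                  # null for the None row
    fixed = F.when(raw.isNull(), False).otherwise(raw)   # booleanized to False
    sdf.select(raw.alias("raw"), fixed.alias("fixed")).show()
    # The None row shows raw=null but fixed=false; for `!=` the filler is True.
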
@@ -57,27 +79,14 @@ def wrapper(self, *args):
            # Same DataFrame anchors
            args = [arg._scol if isinstance(arg, IndexOpsMixin) else arg for arg in args]
            scol = f(self._scol, *args)
-
-            # check if `f` is a comparison operator
-            comp_ops = ['eq', 'ne', 'lt', 'le', 'ge', 'gt']
-            is_comp_op = any(f == getattr(spark.Column, '__{}__'.format(comp_op))
-                             for comp_op in comp_ops)
-
-            if is_comp_op:
-                filler = f == spark.Column.__ne__
-                scol = F.when(scol.isNull(), filler).otherwise(scol)
-
-            elif f == spark.Column.__or__:
-                scol = F.when(self._scol.isNull() | scol.isNull(), False).otherwise(scol)
-
-            elif f == spark.Column.__and__:
-                scol = F.when(scol.isNull(), False).otherwise(scol)
+            scol = booleanize_null(self._scol, scol, f)

            return self._with_new_scol(scol)
        else:
            # Different DataFrame anchors
            def apply_func(this_column, *that_columns):
-                return f(this_column, *that_columns)
+                scol = f(this_column, *that_columns)
+                return booleanize_null(this_column, scol, f)

            return align_diff_series(apply_func, self, *args, how="full")

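
With the helper factored out, `_column_op` applies the same null booleanization
on both code paths, including operations that align Series belonging to
different DataFrames. A hedged sketch of the visible effect (values are
illustrative):

    from databricks import koalas as ks

    kser = ks.Series([1.0, 2.0, None])
    (kser == 1.0).to_pandas()
    # 0     True
    # 1    False
    # 2    False   <- the missing value compares as False instead of null
    # With `!=`, the missing value would instead be filled with True.
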
@@ -216,6 +225,23 @@ def __rfloordiv__(self, other):
    __rand__ = _column_op(spark.Column.__rand__)
    __ror__ = _column_op(spark.Column.__ror__)

+    # NDArray Compat
+    def __array_ufunc__(self, ufunc: Callable, method: str, *inputs: Any, **kwargs: Any):
+        # Try dunder methods first.
+        result = numpy_compat.maybe_dispatch_ufunc_to_dunder_op(
+            self, ufunc, method, *inputs, **kwargs)
+
+        # After that, we try with PySpark APIs.
+        if result is NotImplemented:
+            result = numpy_compat.maybe_dispatch_ufunc_to_spark_func(
+                self, ufunc, method, *inputs, **kwargs)
+
+        if result is not NotImplemented:
+            return result
+        else:
+            # TODO: support more APIs?
+            raise NotImplementedError("Koalas objects currently do not support %s." % ufunc)
+
    @property
    def dtype(self):
        """Return the dtype object of the underlying data.
@@ -763,3 +789,187 @@ def _shift(self, periods, fill_value, part_cols=()):
        lag_col = F.lag(col, periods).over(window)
        col = F.when(lag_col.isNull() | F.isnan(lag_col), fill_value).otherwise(lag_col)
        return self._with_new_scol(col).rename(self.name)
+
+    # TODO: update the documentation for the `bins` parameter once it is supported.
+    def value_counts(self, normalize=False, sort=True, ascending=False, bins=None, dropna=True):
+        """
+        Return a Series containing counts of unique values.
+        The resulting object will be in descending order so that the
+        first element is the most frequently-occurring element.
+        Excludes NA values by default.
+
+        Parameters
+        ----------
+        normalize : boolean, default False
+            If True then the object returned will contain the relative
+            frequencies of the unique values.
+        sort : boolean, default True
+            Sort by values.
+        ascending : boolean, default False
+            Sort in ascending order.
+        bins : Not Yet Supported
+        dropna : boolean, default True
+            Don't include counts of NaN.
+
+        Returns
+        -------
+        counts : Series
+
+        See Also
+        --------
+        Series.count: Number of non-NA elements in a Series.
+
+        Examples
+        --------
+        For Series
+
+        >>> df = ks.DataFrame({'x': [0, 0, 1, 1, 1, np.nan]})
+        >>> df.x.value_counts()  # doctest: +NORMALIZE_WHITESPACE
+        1.0    3
+        0.0    2
+        Name: x, dtype: int64
+
+        With `normalize` set to `True`, returns the relative frequency by
+        dividing all values by the sum of values.
+
+        >>> df.x.value_counts(normalize=True)  # doctest: +NORMALIZE_WHITESPACE
+        1.0    0.6
+        0.0    0.4
+        Name: x, dtype: float64
+
+        **dropna**
+
+        With `dropna` set to `False`, we can also see NaN index values.
+
+        >>> df.x.value_counts(dropna=False)  # doctest: +NORMALIZE_WHITESPACE
+        1.0    3
+        0.0    2
+        NaN    1
+        Name: x, dtype: int64
+
+        For Index
+
+        >>> from databricks.koalas.indexes import Index
+        >>> idx = Index([3, 1, 2, 3, 4, np.nan])
+        >>> idx
+        Float64Index([3.0, 1.0, 2.0, 3.0, 4.0, nan], dtype='float64')
+
+        >>> idx.value_counts().sort_index()
+        1.0    1
+        2.0    1
+        3.0    2
+        4.0    1
+        Name: count, dtype: int64
+
+        **sort**
+
+        With `sort` set to `False`, the result is not sorted by count.
+
+        >>> idx.value_counts(sort=False).sort_index()
+        1.0    1
+        2.0    1
+        3.0    2
+        4.0    1
+        Name: count, dtype: int64
+
+        **normalize**
+
+        With `normalize` set to `True`, returns the relative frequency by
+        dividing all values by the sum of values.
+
+        >>> idx.value_counts(normalize=True).sort_index()
+        1.0    0.2
+        2.0    0.2
+        3.0    0.4
+        4.0    0.2
+        Name: count, dtype: float64
+
+        **dropna**
+
+        With `dropna` set to `False`, we can also see NaN index values.
+
+        >>> idx.value_counts(dropna=False).sort_index()  # doctest: +SKIP
+        1.0    1
+        2.0    1
+        3.0    2
+        4.0    1
+        NaN    1
+        Name: count, dtype: int64
+
+        For MultiIndex
+
+        >>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'],
+        ...                       ['speed', 'weight', 'length']],
+        ...                      [[0, 0, 0, 1, 1, 1, 2, 2, 2],
+        ...                       [1, 1, 1, 1, 1, 2, 1, 2, 2]])
+        >>> s = ks.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx)
+        >>> s.index  # doctest: +SKIP
+        MultiIndex([(  'lama', 'weight'),
+                    (  'lama', 'weight'),
+                    (  'lama', 'weight'),
+                    (   'cow', 'weight'),
+                    (   'cow', 'weight'),
+                    (   'cow', 'length'),
+                    ('falcon', 'weight'),
+                    ('falcon', 'length'),
+                    ('falcon', 'length')],
+                   )
+
+        >>> s.index.value_counts().sort_index()
+        (cow, length)       1
+        (cow, weight)       2
+        (falcon, length)    2
+        (falcon, weight)    1
+        (lama, weight)      3
+        Name: count, dtype: int64
+
+        >>> s.index.value_counts(normalize=True).sort_index()
+        (cow, length)       0.111111
+        (cow, weight)       0.222222
+        (falcon, length)    0.222222
+        (falcon, weight)    0.111111
+        (lama, weight)      0.333333
+        Name: count, dtype: float64
+
+        If the Index has a name, the resulting Series keeps that name.
+
+        >>> idx = Index([0, 0, 0, 1, 1, 2, 3], name='koalas')
+        >>> idx.value_counts().sort_index()
+        0    3
+        1    2
+        2    1
+        3    1
+        Name: koalas, dtype: int64
+        """
+        from databricks.koalas.series import Series, _col
+        if bins is not None:
+            raise NotImplementedError("value_counts currently does not support bins")
+
+        if dropna:
+            sdf_dropna = self._internal._sdf.dropna()
+        else:
+            sdf_dropna = self._internal._sdf
+        index_name = SPARK_INDEX_NAME_FORMAT(0)
+        sdf = sdf_dropna.groupby(self._scol.alias(index_name)).count()
+        if sort:
+            if ascending:
+                sdf = sdf.orderBy(F.col('count'))
+            else:
+                sdf = sdf.orderBy(F.col('count').desc())
+
+        if normalize:
+            # Total number of rows, used to turn counts into relative frequencies.
+            total = sdf_dropna.count()
+            sdf = sdf.withColumn('count', F.col('count') / F.lit(total))
+
+        column_index = self._internal.column_index
+        if (column_index[0] is None) or (None in column_index[0]):
+            internal = _InternalFrame(sdf=sdf,
+                                      index_map=[(index_name, None)],
+                                      column_scols=[scol_for(sdf, 'count')])
+        else:
+            internal = _InternalFrame(sdf=sdf,
+                                      index_map=[(index_name, None)],
+                                      column_index=column_index,
+                                      column_scols=[scol_for(sdf, 'count')],
+                                      column_index_names=self._internal.column_index_names)
+
+        return _col(DataFrame(internal))
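
Under the hood, `value_counts` is a group-by/count over the anchor's Spark
DataFrame with sorting and normalization layered on top. A rough standalone
PySpark equivalent of the default arguments, assuming a local SparkSession
(this is a sketch, not the exact plan Koalas generates):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark_session = SparkSession.builder.master("local[1]").getOrCreate()
    sdf = spark_session.createDataFrame([(0,), (0,), (1,), (1,), (1,)], ["x"])

    counts = (sdf.dropna()                          # dropna=True
                 .groupby("x").count()              # one row per distinct value
                 .orderBy(F.col("count").desc()))   # sort=True, ascending=False
    counts.show()
    # x=1 -> count=3, x=0 -> count=2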