From b24c3ae98651bd5173916f6ab6290e533c45caf2 Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 31 Oct 2019 15:25:06 +0900 Subject: [PATCH 01/10] Implement DataFrame.quantile --- databricks/koalas/frame.py | 70 +++++++++++++++++++++++ databricks/koalas/tests/test_dataframe.py | 9 +++ 2 files changed, 79 insertions(+) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 0b61452ac6..5ae2a24a23 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7451,6 +7451,76 @@ def keys(self): """ return self.columns + # TODO: fix parameter 'axis' and 'numeric_only' to work same as pandas' + def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): + """ + Return value at the given quantile. + + .. note:: Unlike pandas', the quantile in Koalas is an approximated quantile based upon + approximate percentile computation because computing quantile across a large dataset + is extremely expensive. + + Parameters + ---------- + q : float or array-like, default 0.5 (50% quantile) + 0 <= q <= 1, the quantile(s) to compute. + axis : int, default 0 or 'index' + Can only be set to 0 at the moment. + numeric_only : bool, default True + If False, the quantile of datetime and timedelta data will be computed as well. + Can only be set to True at the moment. + accuracy : int, optional + Default accuracy of approximation. Larger value means better accuracy. + The relative error can be deduced by 1.0 / accuracy. + + Returns + ------- + Series or DataFrame + If q is an array, a DataFrame will be returned where the + index is q, the columns are the columns of self, and the values are the quantiles. + If q is a float, a Series will be returned where the + index is the columns of self and the values are the quantiles. + + Examples + -------- + >>> kdf = ks.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [6, 7, 8, 9, 0]}) + >>> kdf + a b + 0 1 6 + 1 2 7 + 2 3 8 + 3 4 9 + 4 5 0 + + >>> kdf.quantile(.5) + a 3 + b 7 + Name: 0.5, dtype: int64 + + >>> kdf.quantile([.25, .5, .75]) + a b + 0.25 2 6 + 0.5 3 7 + 0.75 4 8 + """ + if axis not in [0, 'index']: + raise ValueError('axis should be either 0 or "index" currently.') + if numeric_only is not True: + raise ValueError("quantile currently doesn't supports numeric_only") + if isinstance(q, float): + q = (q,) + + quantile_columns = [self[col_name].quantile(q) for col_name in self.columns] + if len(q) == 1: + from databricks.koalas.series import _col + return _col(reduce( + lambda x, y: x.to_frame().merge(y.to_frame(), left_index=True, right_index=True), + quantile_columns).T) + else: + return reduce( + lambda x, y: x.to_frame().merge(y.to_frame(), left_index=True, right_index=True), + quantile_columns) + def _get_from_multiindex_column(self, key): """ Select columns from multi-index columns. diff --git a/databricks/koalas/tests/test_dataframe.py b/databricks/koalas/tests/test_dataframe.py index 010a472faa..178c91d97d 100644 --- a/databricks/koalas/tests/test_dataframe.py +++ b/databricks/koalas/tests/test_dataframe.py @@ -2177,3 +2177,12 @@ def test_keys(self): pdf = kdf.to_pandas() self.assert_eq(kdf.keys(), pdf.keys()) + + def test_quantile(self): + kdf = ks.from_pandas(self.pdf) + + with self.assertRaisesRegex(ValueError, 'axis should be either 0 or "index" currently.'): + kdf.quantile(.5, axis=1) + + with self.assertRaisesRegex(ValueError, "quantile currently doesn't supports numeric_only"): + kdf.quantile(.5, numeric_only=False) From 9f1c3e1e11d4d6d8044330383d1cc40159661526 Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 31 Oct 2019 15:26:37 +0900 Subject: [PATCH 02/10] add doc ref --- databricks/koalas/missing/frame.py | 1 - docs/source/reference/frame.rst | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/missing/frame.py b/databricks/koalas/missing/frame.py index 99386afafb..bdb9963d77 100644 --- a/databricks/koalas/missing/frame.py +++ b/databricks/koalas/missing/frame.py @@ -74,7 +74,6 @@ class _MissingPandasLikeDataFrame(object): pct_change = unsupported_function('pct_change') prod = unsupported_function('prod') product = unsupported_function('product') - quantile = unsupported_function('quantile') query = unsupported_function('query') reindex_like = unsupported_function('reindex_like') rename_axis = unsupported_function('rename_axis') diff --git a/docs/source/reference/frame.rst b/docs/source/reference/frame.rst index 5100da0585..ce740aa4e7 100644 --- a/docs/source/reference/frame.rst +++ b/docs/source/reference/frame.rst @@ -119,6 +119,7 @@ Computations / Descriptive Stats DataFrame.mean DataFrame.min DataFrame.median + DataFrame.quantile DataFrame.nunique DataFrame.skew DataFrame.sum From a7653db3e00178d9793057c8f0ba6cb4d7616677 Mon Sep 17 00:00:00 2001 From: itholic Date: Fri, 1 Nov 2019 14:08:01 +0900 Subject: [PATCH 03/10] Refactore logic to reuse seires one --- databricks/koalas/frame.py | 40 ++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 5ae2a24a23..b671d51dea 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7503,23 +7503,43 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): 0.5 3 7 0.75 4 8 """ + result_as_series = False if axis not in [0, 'index']: raise ValueError('axis should be either 0 or "index" currently.') if numeric_only is not True: raise ValueError("quantile currently doesn't supports numeric_only") if isinstance(q, float): + result_as_series = True + key = str(q) q = (q,) + quantiles = q + args = ", ".join(map(str, quantiles)) + internal_index_column = SPARK_INDEX_NAME_FORMAT(0) + sdf_list = [] + for i, column in enumerate(self.columns): + sdf = self._sdf + percentile_col = F.expr( + "approx_percentile(`%s`, array(%s), %s)" % (column, args, accuracy)) + sdf = sdf.select(percentile_col.alias("percentiles")) + value_column = column + cols = [] + for i, quantile in enumerate(quantiles): + cols.append(F.struct( + F.lit("%s" % quantile).alias(internal_index_column), + F.expr("percentiles[%s]" % i).alias(value_column))) + sdf = sdf.select(F.array(*cols).alias("arrays")) + sdf = sdf.select(F.explode(F.col("arrays"))).selectExpr("col.*") + sdf_list.append(sdf) + + sdf = reduce(lambda x, y: x.join(y, on=internal_index_column), sdf_list) + internal = self._internal.copy( + sdf=sdf, + data_columns=self.columns, + index_map=[(internal_index_column, None)], + column_index=self._internal.column_index, + column_index_names=None) - quantile_columns = [self[col_name].quantile(q) for col_name in self.columns] - if len(q) == 1: - from databricks.koalas.series import _col - return _col(reduce( - lambda x, y: x.to_frame().merge(y.to_frame(), left_index=True, right_index=True), - quantile_columns).T) - else: - return reduce( - lambda x, y: x.to_frame().merge(y.to_frame(), left_index=True, right_index=True), - quantile_columns) + return DataFrame(internal) if not result_as_series else DataFrame(internal).T[key] def _get_from_multiindex_column(self, key): """ Select columns from multi-index columns. From 3ce58a38d1147a7a126774ddd64b6aea52fe6f94 Mon Sep 17 00:00:00 2001 From: itholic Date: Sat, 2 Nov 2019 21:52:23 +0900 Subject: [PATCH 04/10] Fix logic to dont use expensive join --- databricks/koalas/frame.py | 63 ++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index b671d51dea..5bd79bf4de 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7503,6 +7503,7 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): 0.5 3 7 0.75 4 8 """ + from collections import defaultdict result_as_series = False if axis not in [0, 'index']: raise ValueError('axis should be either 0 or "index" currently.') @@ -7512,29 +7513,59 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): result_as_series = True key = str(q) q = (q,) + quantiles = q + # First calculate the percentiles from all columns and map it to each `quantiles` + # by creating each entry as a struct. So, it becomes an array of structs as below: + # + # +-----------------------------------------+ + # | arrays| + # +-----------------------------------------+ + # |[[0.25, 2, 6], [0.5, 3, 7], [0.75, 4, 8]]| + # +-----------------------------------------+ + sdf = self._sdf args = ", ".join(map(str, quantiles)) - internal_index_column = SPARK_INDEX_NAME_FORMAT(0) - sdf_list = [] + + percentile_cols = [] for i, column in enumerate(self.columns): sdf = self._sdf - percentile_col = F.expr( + percentile_cols.append(F.expr( "approx_percentile(`%s`, array(%s), %s)" % (column, args, accuracy)) - sdf = sdf.select(percentile_col.alias("percentiles")) - value_column = column - cols = [] - for i, quantile in enumerate(quantiles): - cols.append(F.struct( - F.lit("%s" % quantile).alias(internal_index_column), - F.expr("percentiles[%s]" % i).alias(value_column))) - sdf = sdf.select(F.array(*cols).alias("arrays")) - sdf = sdf.select(F.explode(F.col("arrays"))).selectExpr("col.*") - sdf_list.append(sdf) - - sdf = reduce(lambda x, y: x.join(y, on=internal_index_column), sdf_list) + .alias(column)) + sdf = sdf.select(percentile_cols) + # Here, after select percntile cols, a sdf looks like below: + # +---------+---------+ + # | a| b| + # +---------+---------+ + # |[2, 3, 4]|[6, 7, 8]| + # +---------+---------+ + + cols_dict = defaultdict(list) + for column in self._internal.data_columns: + for i in range(len(quantiles)): + cols_dict[column].append(F.expr("%s[%s]" % (column, i)).alias(column)) + + internal_index_column = SPARK_INDEX_NAME_FORMAT(0) + cols = [] + for i, col in enumerate(zip(*cols_dict.values())): + cols.append(F.struct( + F.lit("%s" % quantiles[i]).alias(internal_index_column), + *col)) + sdf = sdf.select(F.array(*cols).alias("arrays")) + + # And then, explode it and manually set the index. + # +-----------------+---+---+ + # |__index_level_0__| a| b| + # +-----------------+---+---+ + # | 0.25| 2| 6| + # | 0.5| 3| 7| + # | 0.75| 4| 8| + # +-----------------+---+---+ + sdf = sdf.select(F.explode(F.col("arrays"))).selectExpr("col.*") + internal = self._internal.copy( sdf=sdf, - data_columns=self.columns, + data_columns=self._internal.data_columns, index_map=[(internal_index_column, None)], column_index=self._internal.column_index, column_index_names=None) From 7a26e3c749e0608368b7bad454d1c30856f9baad Mon Sep 17 00:00:00 2001 From: itholic Date: Tue, 5 Nov 2019 09:02:21 +0900 Subject: [PATCH 05/10] fix --- databricks/koalas/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 5bd79bf4de..573b9f631d 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7527,7 +7527,7 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): args = ", ".join(map(str, quantiles)) percentile_cols = [] - for i, column in enumerate(self.columns): + for i, column in enumerate(self._internal.data_columns): sdf = self._sdf percentile_cols.append(F.expr( "approx_percentile(`%s`, array(%s), %s)" % (column, args, accuracy)) From 5610e90bda2baea15cfba6c8eb92c466b007d835 Mon Sep 17 00:00:00 2001 From: itholic Date: Tue, 5 Nov 2019 10:00:35 +0900 Subject: [PATCH 06/10] fix --- databricks/koalas/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 573b9f631d..427af9d0e6 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7527,7 +7527,7 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): args = ", ".join(map(str, quantiles)) percentile_cols = [] - for i, column in enumerate(self._internal.data_columns): + for column in self._internal.data_columns: sdf = self._sdf percentile_cols.append(F.expr( "approx_percentile(`%s`, array(%s), %s)" % (column, args, accuracy)) From b23af7bec432f62d1d6dd981a7e09c0502b14b67 Mon Sep 17 00:00:00 2001 From: itholic Date: Tue, 5 Nov 2019 17:11:15 +0900 Subject: [PATCH 07/10] using OrderedDict rather than defaultdict --- databricks/koalas/frame.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 427af9d0e6..ac29870757 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7503,7 +7503,6 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): 0.5 3 7 0.75 4 8 """ - from collections import defaultdict result_as_series = False if axis not in [0, 'index']: raise ValueError('axis should be either 0 or "index" currently.') @@ -7540,8 +7539,9 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): # |[2, 3, 4]|[6, 7, 8]| # +---------+---------+ - cols_dict = defaultdict(list) + cols_dict = OrderedDict() for column in self._internal.data_columns: + cols_dict[column] = list() for i in range(len(quantiles)): cols_dict[column].append(F.expr("%s[%s]" % (column, i)).alias(column)) From cd598f2ac9b461694ce16de791a0b1307efd05eb Mon Sep 17 00:00:00 2001 From: itholic Date: Tue, 5 Nov 2019 17:58:04 +0900 Subject: [PATCH 08/10] use api rather than string --- databricks/koalas/frame.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index ac29870757..03a1db5ebc 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7540,10 +7540,10 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): # +---------+---------+ cols_dict = OrderedDict() - for column in self._internal.data_columns: + for column in self.columns: cols_dict[column] = list() for i in range(len(quantiles)): - cols_dict[column].append(F.expr("%s[%s]" % (column, i)).alias(column)) + cols_dict[column].append(self._internal.scol_for(column).getItem(i).alias(column)) internal_index_column = SPARK_INDEX_NAME_FORMAT(0) cols = [] From 37902f96d616988ea2cde4b711069bd3b8a97420 Mon Sep 17 00:00:00 2001 From: itholic Date: Wed, 6 Nov 2019 14:22:25 +0900 Subject: [PATCH 09/10] fix failure --- databricks/koalas/frame.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 03a1db5ebc..54e0d87670 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7540,10 +7540,10 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): # +---------+---------+ cols_dict = OrderedDict() - for column in self.columns: + for column in self._internal.data_columns: cols_dict[column] = list() for i in range(len(quantiles)): - cols_dict[column].append(self._internal.scol_for(column).getItem(i).alias(column)) + cols_dict[column].append(F.col(column).getItem(i).alias(column)) internal_index_column = SPARK_INDEX_NAME_FORMAT(0) cols = [] From f493227d73e830213a4f071f991a5e48784aaf46 Mon Sep 17 00:00:00 2001 From: itholic Date: Wed, 6 Nov 2019 17:47:53 +0900 Subject: [PATCH 10/10] user scol_for --- databricks/koalas/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 54e0d87670..41d34d9031 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -7543,7 +7543,7 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): for column in self._internal.data_columns: cols_dict[column] = list() for i in range(len(quantiles)): - cols_dict[column].append(F.col(column).getItem(i).alias(column)) + cols_dict[column].append(scol_for(sdf, column).getItem(i).alias(column)) internal_index_column = SPARK_INDEX_NAME_FORMAT(0) cols = []