From ba0837dabf2cf9e63d86c32e4290e70b33ed2eee Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 14 Oct 2019 17:48:03 +0900 Subject: [PATCH 1/8] Implement update --- databricks/koalas/missing/series.py | 1 - databricks/koalas/series.py | 60 ++++++++++++++++++++++++++ databricks/koalas/tests/test_series.py | 8 ++++ docs/source/reference/series.rst | 1 + 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/databricks/koalas/missing/series.py b/databricks/koalas/missing/series.py index 825acdca58..4c7b4dfed5 100644 --- a/databricks/koalas/missing/series.py +++ b/databricks/koalas/missing/series.py @@ -111,7 +111,6 @@ class _MissingPandasLikeSeries(object): tz_convert = unsupported_function('tz_convert') tz_localize = unsupported_function('tz_localize') unstack = unsupported_function('unstack') - update = unsupported_function('update') view = unsupported_function('view') where = unsupported_function('where') xs = unsupported_function('xs') diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index 7c6f0c5963..1a0ca28a5a 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -3409,6 +3409,66 @@ def replace(self, to_replace=None, value=None, regex=False) -> 'Series': return self._with_new_scol(current) + def update(self, other): + """ + Modify Series in place using non-NA values from passed Series. Aligns on index. + + Parameters + ---------- + other : Series + + Examples + -------- + >>> s = ks.Series([1, 2, 3]) + >>> s.update(ks.Series([4, 5, 6])) + >>> s + 0 4 + 1 5 + 2 6 + Name: 0, dtype: int64 + + >>> s = ks.Series(['a', 'b', 'c']) + >>> s.update(ks.Series(['d', 'e'], index=[0, 2])) + >>> s + 0 d + 1 b + 2 e + Name: 0, dtype: object + + >>> s = ks.Series([1, 2, 3]) + >>> s.update(ks.Series([4, 5, 6, 7, 8])) + >>> s + 0 4 + 1 5 + 2 6 + Name: 0, dtype: int64 + + If ``other`` contains NaNs the corresponding values are not updated + in the original Series. + + >>> s = ks.Series([1, 2, 3]) + >>> s.update(ks.Series([4, np.nan, 6])) + >>> s + 0 4.0 + 1 2.0 + 2 6.0 + Name: 0, dtype: float64 + """ + if not isinstance(other, Series): + raise ValueError("'other' must be a Series") + + self_sdf = self._internal.sdf + other_sdf = other._internal.sdf.limit(len(self)) + temp_col = self.name + "_" + temp_idx = self._index_map[0][0] + + other_temp = other_sdf.withColumn(temp_col, other_sdf[other.name]) + new_sdf = self_sdf.join(other_temp, temp_idx, 'outer').sort(temp_idx) + cond = F.when(other_temp[temp_col].isNotNull(), other_temp[temp_col]) \ + .otherwise(self._scol) \ + .alias(self.name) + self._internal = _col(ks.DataFrame(_InternalFrame(sdf=new_sdf.select(cond))))._internal + def _cum(self, func, skipna, part_cols=()): # This is used to cummin, cummax, cumsum, etc. index_columns = self._internal.index_columns diff --git a/databricks/koalas/tests/test_series.py b/databricks/koalas/tests/test_series.py index ee23cede28..ecbf6325fd 100644 --- a/databricks/koalas/tests/test_series.py +++ b/databricks/koalas/tests/test_series.py @@ -728,3 +728,11 @@ def test_duplicates(self): self.assert_eq(pser.drop_duplicates().sort_values(), kser.drop_duplicates().sort_values()) + + def test_update(self): + pser = pd.Series([10, 20, 15, 30, 45], name='x') + kser = ks.Series(pser) + + msg = "'other' must be a Series" + with self.assertRaisesRegex(ValueError, msg): + kser.update(10) diff --git a/docs/source/reference/series.rst b/docs/source/reference/series.rst index 511255c729..a7f0bc72e8 100644 --- a/docs/source/reference/series.rst +++ b/docs/source/reference/series.rst @@ -176,6 +176,7 @@ Combining / joining / merging Series.append Series.replace + Series.update Time series-related ------------------- From 8758be767201657bebc91761bc22122dac7f3c47 Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 14 Oct 2019 21:07:14 +0900 Subject: [PATCH 2/8] Reuse function & add test cases --- databricks/koalas/series.py | 46 ++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index 1a0ca28a5a..77a8a80c7f 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -42,7 +42,7 @@ from databricks.koalas.internal import IndexMap, _InternalFrame, SPARK_INDEX_NAME_FORMAT from databricks.koalas.missing.series import _MissingPandasLikeSeries from databricks.koalas.plot import KoalasSeriesPlotMethods -from databricks.koalas.utils import validate_arguments_and_invoke_function, scol_for +from databricks.koalas.utils import validate_arguments_and_invoke_function, scol_for, combine_frames from databricks.koalas.datetimes import DatetimeMethods from databricks.koalas.strings import StringMethods @@ -3419,6 +3419,8 @@ def update(self, other): Examples -------- + >>> from databricks.koalas.config import set_option, reset_option + >>> set_option("compute.ops_on_diff_frames", True) >>> s = ks.Series([1, 2, 3]) >>> s.update(ks.Series([4, 5, 6])) >>> s @@ -3443,6 +3445,27 @@ def update(self, other): 2 6 Name: 0, dtype: int64 + >>> s = ks.Series([1, 2, 3], index=[10, 11, 12]) + >>> s + 10 1 + 11 2 + 12 3 + Name: 0, dtype: int64 + + >>> s.update(ks.Series([4, 5, 6])) + >>> s + 10 1 + 11 2 + 12 3 + Name: 0, dtype: int64 + + >>> s.update(ks.Series([4, 5, 6], index=[11, 12, 13])) + >>> s + 10 1 + 11 4 + 12 5 + Name: 0, dtype: int64 + If ``other`` contains NaNs the corresponding values are not updated in the original Series. @@ -3453,21 +3476,22 @@ def update(self, other): 1 2.0 2 6.0 Name: 0, dtype: float64 + + >>> reset_option("compute.ops_on_diff_frames") """ if not isinstance(other, Series): raise ValueError("'other' must be a Series") - self_sdf = self._internal.sdf - other_sdf = other._internal.sdf.limit(len(self)) - temp_col = self.name + "_" - temp_idx = self._index_map[0][0] - - other_temp = other_sdf.withColumn(temp_col, other_sdf[other.name]) - new_sdf = self_sdf.join(other_temp, temp_idx, 'outer').sort(temp_idx) - cond = F.when(other_temp[temp_col].isNotNull(), other_temp[temp_col]) \ - .otherwise(self._scol) \ + index_scol_name = self._index_map[0][0] + combined = combine_frames(self._kdf, other._kdf, how='leftouter') + combined_sdf = combined._sdf.sort(index_scol_name) + cond = F.when(combined_sdf['__that_0'].isNotNull(), combined_sdf['__that_0']) \ + .otherwise(combined_sdf['__this_0']) \ .alias(self.name) - self._internal = _col(ks.DataFrame(_InternalFrame(sdf=new_sdf.select(cond))))._internal + internal = _InternalFrame( + sdf=combined_sdf.select(index_scol_name, cond), + index_map=self._index_map) + self._internal = _col(ks.DataFrame(internal))._internal def _cum(self, func, skipna, part_cols=()): # This is used to cummin, cummax, cumsum, etc. From aca24e700342a186146b121e5c92e3b0e7885796 Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 14 Oct 2019 22:39:11 +0900 Subject: [PATCH 3/8] Add missing logics --- databricks/koalas/series.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index 77a8a80c7f..1ba1dc716c 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -3482,16 +3482,20 @@ def update(self, other): if not isinstance(other, Series): raise ValueError("'other' must be a Series") - index_scol_name = self._index_map[0][0] - combined = combine_frames(self._kdf, other._kdf, how='leftouter') - combined_sdf = combined._sdf.sort(index_scol_name) - cond = F.when(combined_sdf['__that_0'].isNotNull(), combined_sdf['__that_0']) \ - .otherwise(combined_sdf['__this_0']) \ + index_scol_names = [index_map[0] for index_map in self._index_map] + combined = combine_frames(self.to_frame(), other.to_frame(), how='leftouter') + combined_sdf = combined._sdf.sort(index_scol_names) + this_col = "__this_%s" % self.name + that_col = "__that_%s" % other.name + cond = F.when(combined_sdf[that_col].isNotNull(), combined_sdf[that_col]) \ + .otherwise(combined_sdf[this_col]) \ .alias(self.name) internal = _InternalFrame( - sdf=combined_sdf.select(index_scol_name, cond), + sdf=combined_sdf.select(index_scol_names + [cond]), index_map=self._index_map) - self._internal = _col(ks.DataFrame(internal))._internal + self_updated = _col(ks.DataFrame(internal)) + self._internal = self_updated._internal + self._kdf = self_updated._kdf def _cum(self, func, skipna, part_cols=()): # This is used to cummin, cummax, cumsum, etc. From 3dfb6233cdf25403c2642f4bcbf9b9ef65bc52c7 Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 14 Oct 2019 22:56:18 +0900 Subject: [PATCH 4/8] don't use sort --- databricks/koalas/series.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index 1ba1dc716c..f0f2bc190b 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -3455,15 +3455,15 @@ def update(self, other): >>> s.update(ks.Series([4, 5, 6])) >>> s 10 1 - 11 2 12 3 + 11 2 Name: 0, dtype: int64 >>> s.update(ks.Series([4, 5, 6], index=[11, 12, 13])) >>> s 10 1 - 11 4 12 5 + 11 4 Name: 0, dtype: int64 If ``other`` contains NaNs the corresponding values are not updated @@ -3484,7 +3484,7 @@ def update(self, other): index_scol_names = [index_map[0] for index_map in self._index_map] combined = combine_frames(self.to_frame(), other.to_frame(), how='leftouter') - combined_sdf = combined._sdf.sort(index_scol_names) + combined_sdf = combined._sdf this_col = "__this_%s" % self.name that_col = "__that_%s" % other.name cond = F.when(combined_sdf[that_col].isNotNull(), combined_sdf[that_col]) \ From fde34aebedb3167134a7e6f29ddde1b4bfc27366 Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 17 Oct 2019 15:46:01 +0900 Subject: [PATCH 5/8] fix logic for maintain internal column_index --- databricks/koalas/series.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index 8b518782ad..dcb4156c62 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -3478,17 +3478,18 @@ def update(self, other): if not isinstance(other, Series): raise ValueError("'other' must be a Series") - index_scol_names = [index_map[0] for index_map in self._index_map] + index_scol_names = [index_map[0] for index_map in self._internal.index_map] combined = combine_frames(self.to_frame(), other.to_frame(), how='leftouter') combined_sdf = combined._sdf - this_col = "__this_%s" % self.name - that_col = "__that_%s" % other.name + this_col = "__this_%s" % str(self.name) + that_col = "__that_%s" % str(other.name) cond = F.when(combined_sdf[that_col].isNotNull(), combined_sdf[that_col]) \ .otherwise(combined_sdf[this_col]) \ - .alias(self.name) + .alias(str(self.name)) internal = _InternalFrame( sdf=combined_sdf.select(index_scol_names + [cond]), - index_map=self._index_map) + index_map=self._internal.index_map, + column_index=self._internal.column_index) self_updated = _col(ks.DataFrame(internal)) self._internal = self_updated._internal self._kdf = self_updated._kdf From cc5c74ecd4838d0cd84b9a8b489a6faffd00faf3 Mon Sep 17 00:00:00 2001 From: itholic Date: Sun, 20 Oct 2019 10:27:59 +0900 Subject: [PATCH 6/8] fix to get column names properly --- databricks/koalas/series.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index dcb4156c62..8a88fed704 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -3419,7 +3419,7 @@ def update(self, other): >>> set_option("compute.ops_on_diff_frames", True) >>> s = ks.Series([1, 2, 3]) >>> s.update(ks.Series([4, 5, 6])) - >>> s + >>> s.sort_index() 0 4 1 5 2 6 @@ -3427,7 +3427,7 @@ def update(self, other): >>> s = ks.Series(['a', 'b', 'c']) >>> s.update(ks.Series(['d', 'e'], index=[0, 2])) - >>> s + >>> s.sort_index() 0 d 1 b 2 e @@ -3435,7 +3435,7 @@ def update(self, other): >>> s = ks.Series([1, 2, 3]) >>> s.update(ks.Series([4, 5, 6, 7, 8])) - >>> s + >>> s.sort_index() 0 4 1 5 2 6 @@ -3449,17 +3449,17 @@ def update(self, other): Name: 0, dtype: int64 >>> s.update(ks.Series([4, 5, 6])) - >>> s + >>> s.sort_index() 10 1 - 12 3 11 2 + 12 3 Name: 0, dtype: int64 >>> s.update(ks.Series([4, 5, 6], index=[11, 12, 13])) - >>> s + >>> s.sort_index() 10 1 - 12 5 11 4 + 12 5 Name: 0, dtype: int64 If ``other`` contains NaNs the corresponding values are not updated @@ -3467,7 +3467,7 @@ def update(self, other): >>> s = ks.Series([1, 2, 3]) >>> s.update(ks.Series([4, np.nan, 6])) - >>> s + >>> s.sort_index() 0 4.0 1 2.0 2 6.0 @@ -3481,11 +3481,14 @@ def update(self, other): index_scol_names = [index_map[0] for index_map in self._internal.index_map] combined = combine_frames(self.to_frame(), other.to_frame(), how='leftouter') combined_sdf = combined._sdf - this_col = "__this_%s" % str(self.name) - that_col = "__that_%s" % str(other.name) - cond = F.when(combined_sdf[that_col].isNotNull(), combined_sdf[that_col]) \ + this_col = "__this_%s" % str( + self._internal.column_name_for(self._internal.column_index[0])) + that_col = "__that_%s" % str( + self._internal.column_name_for(other._internal.column_index[0])) + cond = F.when(scol_for(combined_sdf, that_col).isNotNull(), + scol_for(combined_sdf, that_col)) \ .otherwise(combined_sdf[this_col]) \ - .alias(str(self.name)) + .alias(str(self._internal.column_name_for(self._internal.column_index[0]))) internal = _InternalFrame( sdf=combined_sdf.select(index_scol_names + [cond]), index_map=self._internal.index_map, From 26fc3f85216a82c12d849a12a35ae578f2abc4cd Mon Sep 17 00:00:00 2001 From: itholic Date: Wed, 6 Nov 2019 17:42:15 +0900 Subject: [PATCH 7/8] fix lint fail --- databricks/koalas/series.py | 1 - databricks/koalas/tests/test_series.py | 1 - 2 files changed, 2 deletions(-) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index a648e43936..1f7d694cf9 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -3991,7 +3991,6 @@ def xs(self, key, level=None): index_map=[(index_col, index_map_dict[index_col]) for index_col in index_cols]) return _col(DataFrame(internal)) ->>>>>>> e596a588450670eed3354aec07e7c71030b9c9ad def _cum(self, func, skipna, part_cols=()): # This is used to cummin, cummax, cumsum, etc. diff --git a/databricks/koalas/tests/test_series.py b/databricks/koalas/tests/test_series.py index f4fdc9ab08..aa6536a8db 100644 --- a/databricks/koalas/tests/test_series.py +++ b/databricks/koalas/tests/test_series.py @@ -815,7 +815,6 @@ def test_truncate(self): msg = "Truncate: 2 must be after 5" with self.assertRaisesRegex(ValueError, msg): kser.truncate(5, 2) ->>>>>>> e596a588450670eed3354aec07e7c71030b9c9ad def test_getitem(self): pser = pd.Series([10, 20, 15, 30, 45], ['A', 'A', 'B', 'C', 'D']) From 2e219e031385a794c7decdb0efff7d11cd1d94aa Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 7 Nov 2019 02:48:44 +0900 Subject: [PATCH 8/8] fix lint fail --- databricks/koalas/series.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index 1f7d694cf9..774e135e93 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -43,7 +43,7 @@ from databricks.koalas.missing.series import _MissingPandasLikeSeries from databricks.koalas.plot import KoalasSeriesPlotMethods from databricks.koalas.utils import (validate_arguments_and_invoke_function, scol_for, - combine_frames, tuple_like_strings, name_like_string) + combine_frames, name_like_string) from databricks.koalas.datetimes import DatetimeMethods from databricks.koalas.strings import StringMethods