Skip to content

Commit

Permalink
Remove implicit switch-ons of "compute.ops_on_diff_frames" (#1953)
Browse files Browse the repository at this point in the history
  • Loading branch information
xinrong-meng authored Dec 7, 2020
1 parent 5105b37 commit 58993f8
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 268 deletions.
34 changes: 19 additions & 15 deletions databricks/koalas/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
StructType,
)

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks import koalas as ks # noqa: F401
from databricks.koalas.base import IndexOpsMixin
from databricks.koalas.utils import (
align_diff_frames,
Expand Down Expand Up @@ -1854,6 +1854,9 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
Examples
--------
>>> from databricks.koalas.config import set_option, reset_option
>>> set_option("compute.ops_on_diff_frames", True)
Combine two ``Series``.
>>> s1 = ks.Series(['a', 'b'])
Expand Down Expand Up @@ -1953,6 +1956,8 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
letter number animal name
0 a 1 bird polly
1 b 2 monkey george
>>> reset_option("compute.ops_on_diff_frames")
"""
if isinstance(objs, (DataFrame, IndexOpsMixin)) or not isinstance(
objs, Iterable
Expand Down Expand Up @@ -2017,20 +2022,19 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
kdfs_not_same_anchor.append(kdf)

if len(kdfs_not_same_anchor) > 0:
with ks.option_context("compute.ops_on_diff_frames", True):

def resolve_func(kdf, this_column_labels, that_column_labels):
raise AssertionError("This should not happen.")

for kdf in kdfs_not_same_anchor:
if join == "inner":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="inner",
)
elif join == "outer":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="full",
)

def resolve_func(kdf, this_column_labels, that_column_labels):
raise AssertionError("This should not happen.")

for kdf in kdfs_not_same_anchor:
if join == "inner":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="inner",
)
elif join == "outer":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="full",
)

concat_kdf = concat_kdf[column_labels]

Expand Down
20 changes: 12 additions & 8 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.accessors import KoalasSeriesMethods
from databricks.koalas.config import get_option, option_context
from databricks.koalas.config import get_option
from databricks.koalas.base import IndexOpsMixin
from databricks.koalas.exceptions import SparkPandasIndexingError
from databricks.koalas.frame import DataFrame
Expand Down Expand Up @@ -4691,7 +4691,8 @@ def combine_first(self, other) -> "Series":
--------
>>> s1 = ks.Series([1, np.nan])
>>> s2 = ks.Series([3, 4])
>>> s1.combine_first(s2)
>>> with ks.option_context("compute.ops_on_diff_frames", True):
... s1.combine_first(s2)
0 1.0
1 4.0
dtype: float64
Expand All @@ -4703,8 +4704,7 @@ def combine_first(self, other) -> "Series":
that = other.spark.column
combined = self._kdf
else:
with option_context("compute.ops_on_diff_frames", True):
combined = combine_frames(self._kdf, other._kdf)
combined = combine_frames(self._kdf, other._kdf)
this = combined["this"]._internal.spark_column_for(self._column_label)
that = combined["that"]._internal.spark_column_for(other._column_label)
# If `self` has missing value, use value of `other`
Expand Down Expand Up @@ -5585,6 +5585,9 @@ def compare(
Examples
--------
>>> from databricks.koalas.config import set_option, reset_option
>>> set_option("compute.ops_on_diff_frames", True)
>>> s1 = ks.Series(["a", "b", "c", "d", "e"])
>>> s2 = ks.Series(["a", "a", "c", "b", "e"])
Expand Down Expand Up @@ -5614,11 +5617,12 @@ def compare(
2 c c
3 d b
4 e e
>>> reset_option("compute.ops_on_diff_frames")
"""
with option_context("compute.ops_on_diff_frames", True):
if not self.index.equals(other.index):
raise ValueError("Can only compare identically-labeled Series objects")
combined = combine_frames(self.to_frame(), other.to_frame())
if not self.index.equals(other.index):
raise ValueError("Can only compare identically-labeled Series objects")
combined = combine_frames(self.to_frame(), other.to_frame())

this_column_label = "self"
that_column_label = "other"
Expand Down
15 changes: 0 additions & 15 deletions databricks/koalas/tests/test_namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,24 +206,11 @@ def test_concat_column_axis(self):
pdf4.columns = columns
kdf4.columns = columns

pdf5 = pd.DataFrame({"A": [0, 2, 4], "B": [1, 3, 5]}, index=[1, 2, 3])
pdf6 = pd.DataFrame({"C": [1, 2, 3]}, index=[1, 3, 5])
kdf5 = ks.from_pandas(pdf5)
kdf6 = ks.from_pandas(pdf6)

ignore_indexes = [True, False]
joins = ["inner", "outer"]

objs = [
([kdf1.A, kdf2.C], [pdf1.A, pdf2.C]),
# TODO: ([kdf1, kdf2.C], [pdf1, pdf2.C]),
([kdf1.A, kdf2], [pdf1.A, pdf2]),
([kdf1.A, kdf2.C], [pdf1.A, pdf2.C]),
([kdf1.A, kdf1.A.rename("B")], [pdf1.A, pdf1.A.rename("B")]),
([kdf3[("X", "A")], kdf4[("X", "C")]], [pdf3[("X", "A")], pdf4[("X", "C")]]),
([kdf3, kdf4[("X", "C")]], [pdf3, pdf4[("X", "C")]]),
([kdf3[("X", "A")], kdf4], [pdf3[("X", "A")], pdf4]),
([kdf3, kdf4], [pdf3, pdf4]),
([kdf3[("X", "A")], kdf3[("X", "B")]], [pdf3[("X", "A")], pdf3[("X", "B")]],),
(
[kdf3[("X", "A")], kdf3[("X", "B")].rename("ABC")],
Expand All @@ -233,8 +220,6 @@ def test_concat_column_axis(self):
[kdf3[("X", "A")].rename("ABC"), kdf3[("X", "B")]],
[pdf3[("X", "A")].rename("ABC"), pdf3[("X", "B")]],
),
([kdf5, kdf6], [pdf5, pdf6]),
([kdf6, kdf5], [pdf6, pdf5]),
]

for ignore_index, join in itertools.product(ignore_indexes, joins):
Expand Down
Loading

0 comments on commit 58993f8

Please sign in to comment.