Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove implicit switch-ons of "compute.ops_on_diff_frames" #1953

Merged
merged 4 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions databricks/koalas/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
StructType,
)

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks import koalas as ks # noqa: F401
from databricks.koalas.base import IndexOpsMixin
from databricks.koalas.utils import (
align_diff_frames,
Expand Down Expand Up @@ -1854,6 +1854,9 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[

Examples
--------
>>> from databricks.koalas.config import set_option, reset_option
>>> set_option("compute.ops_on_diff_frames", True)

Combine two ``Series``.

>>> s1 = ks.Series(['a', 'b'])
Expand Down Expand Up @@ -1953,6 +1956,8 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
letter number animal name
0 a 1 bird polly
1 b 2 monkey george

>>> reset_option("compute.ops_on_diff_frames")
"""
if isinstance(objs, (DataFrame, IndexOpsMixin)) or not isinstance(
objs, Iterable
Expand Down Expand Up @@ -2017,20 +2022,19 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
kdfs_not_same_anchor.append(kdf)

if len(kdfs_not_same_anchor) > 0:
with ks.option_context("compute.ops_on_diff_frames", True):

def resolve_func(kdf, this_column_labels, that_column_labels):
raise AssertionError("This should not happen.")

for kdf in kdfs_not_same_anchor:
if join == "inner":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="inner",
)
elif join == "outer":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="full",
)

def resolve_func(kdf, this_column_labels, that_column_labels):
raise AssertionError("This should not happen.")

for kdf in kdfs_not_same_anchor:
if join == "inner":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="inner",
)
elif join == "outer":
concat_kdf = align_diff_frames(
resolve_func, concat_kdf, kdf, fillna=False, how="full",
)

concat_kdf = concat_kdf[column_labels]

Expand Down
20 changes: 12 additions & 8 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.accessors import KoalasSeriesMethods
from databricks.koalas.config import get_option, option_context
from databricks.koalas.config import get_option
from databricks.koalas.base import IndexOpsMixin
from databricks.koalas.exceptions import SparkPandasIndexingError
from databricks.koalas.frame import DataFrame
Expand Down Expand Up @@ -4695,7 +4695,8 @@ def combine_first(self, other) -> "Series":
--------
>>> s1 = ks.Series([1, np.nan])
>>> s2 = ks.Series([3, 4])
>>> s1.combine_first(s2)
>>> with ks.option_context("compute.ops_on_diff_frames", True):
... s1.combine_first(s2)
0 1.0
1 4.0
dtype: float64
Expand All @@ -4707,8 +4708,7 @@ def combine_first(self, other) -> "Series":
that = other.spark.column
combined = self._kdf
else:
with option_context("compute.ops_on_diff_frames", True):
combined = combine_frames(self._kdf, other._kdf)
combined = combine_frames(self._kdf, other._kdf)
this = combined["this"]._internal.spark_column_for(self._column_label)
that = combined["that"]._internal.spark_column_for(other._column_label)
# If `self` has missing value, use value of `other`
Expand Down Expand Up @@ -5561,6 +5561,9 @@ def compare(

Examples
--------

>>> from databricks.koalas.config import set_option, reset_option
>>> set_option("compute.ops_on_diff_frames", True)
>>> s1 = ks.Series(["a", "b", "c", "d", "e"])
>>> s2 = ks.Series(["a", "a", "c", "b", "e"])

Expand Down Expand Up @@ -5590,11 +5593,12 @@ def compare(
2 c c
3 d b
4 e e

>>> reset_option("compute.ops_on_diff_frames")
"""
with option_context("compute.ops_on_diff_frames", True):
if not self.index.equals(other.index):
raise ValueError("Can only compare identically-labeled Series objects")
combined = combine_frames(self.to_frame(), other.to_frame())
if not self.index.equals(other.index):
raise ValueError("Can only compare identically-labeled Series objects")
combined = combine_frames(self.to_frame(), other.to_frame())
Copy link
Collaborator

@ueshin ueshin Dec 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

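By the way, we should also handle the case where `other` comes from the same anchor as `self` (for example, comparing a Series with itself), which currently raises an error: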

>>> s1 = ks.Series(["a", "b", "c", "d", "e"])
>>> s1.compare(s1)
Traceback (most recent call last):
...
AssertionError: We don't need to combine. `this` and `that` are same.

Of course, this fix should be done in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! Let me file a new PR for this.


this_column_label = "self"
that_column_label = "other"
Expand Down
15 changes: 0 additions & 15 deletions databricks/koalas/tests/test_namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,24 +206,11 @@ def test_concat_column_axis(self):
pdf4.columns = columns
kdf4.columns = columns

pdf5 = pd.DataFrame({"A": [0, 2, 4], "B": [1, 3, 5]}, index=[1, 2, 3])
pdf6 = pd.DataFrame({"C": [1, 2, 3]}, index=[1, 3, 5])
kdf5 = ks.from_pandas(pdf5)
kdf6 = ks.from_pandas(pdf6)

ignore_indexes = [True, False]
joins = ["inner", "outer"]

objs = [
([kdf1.A, kdf2.C], [pdf1.A, pdf2.C]),
# TODO: ([kdf1, kdf2.C], [pdf1, pdf2.C]),
([kdf1.A, kdf2], [pdf1.A, pdf2]),
([kdf1.A, kdf2.C], [pdf1.A, pdf2.C]),
([kdf1.A, kdf1.A.rename("B")], [pdf1.A, pdf1.A.rename("B")]),
([kdf3[("X", "A")], kdf4[("X", "C")]], [pdf3[("X", "A")], pdf4[("X", "C")]]),
([kdf3, kdf4[("X", "C")]], [pdf3, pdf4[("X", "C")]]),
([kdf3[("X", "A")], kdf4], [pdf3[("X", "A")], pdf4]),
([kdf3, kdf4], [pdf3, pdf4]),
([kdf3[("X", "A")], kdf3[("X", "B")]], [pdf3[("X", "A")], pdf3[("X", "B")]],),
(
[kdf3[("X", "A")], kdf3[("X", "B")].rename("ABC")],
Expand All @@ -233,8 +220,6 @@ def test_concat_column_axis(self):
[kdf3[("X", "A")].rename("ABC"), kdf3[("X", "B")]],
[pdf3[("X", "A")].rename("ABC"), pdf3[("X", "B")]],
),
([kdf5, kdf6], [pdf5, pdf6]),
([kdf6, kdf5], [pdf6, pdf5]),
]

for ignore_index, join in itertools.product(ignore_indexes, joins):
Expand Down
Loading