Skip to content

Commit

Permalink
Fix DataFrame.spark.hint to reflect internal changes. (#1865)
Browse files Browse the repository at this point in the history
Fixes `DataFrame.spark.hint` to reflect internal changes.

For example:

```py
>>> kdf1 = ks.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]}).set_index('lkey')
>>> kdf2 = ks.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]}).set_index('rkey')
>>> kdf1.merge((kdf2 + 1).spark.hint("broadcast"), left_index=True, right_index=True).sort_values(['value_x', 'value_y'])
      value_x  value_y
lkey
foo         1        5
foo         1        8
bar         2        6
baz         3        7
foo         5        5
foo         5        8
```

This is different from the result without the hint:

```py
>>> kdf1.merge(kdf2 + 1, left_index=True, right_index=True).sort_values(['value_x', 'value_y'])
      value_x  value_y
lkey
foo         1        6
foo         1        9
bar         2        7
baz         3        8
foo         5        6
foo         5        9
```
  • Loading branch information
ueshin authored Oct 24, 2020
1 parent 720a36a commit deefc2c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 6 deletions.
2 changes: 1 addition & 1 deletion databricks/koalas/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ def to_pandas_frame(self) -> pd.DataFrame:
return pdf

@lazy_property
def resolved_copy(self):
def resolved_copy(self) -> "InternalFrame":
""" Copy the immutable InternalFrame with the updates resolved. """
sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
return self.copy(
Expand Down
7 changes: 2 additions & 5 deletions databricks/koalas/spark/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,8 @@ def hint(self, name: str, *parameters) -> "ks.DataFrame":
"""
from databricks.koalas.frame import DataFrame

return DataFrame(
self._kdf._internal.with_new_sdf(
self._kdf._internal.spark_frame.hint(name, *parameters)
)
)
internal = self._kdf._internal.resolved_copy
return DataFrame(internal.with_new_sdf(internal.spark_frame.hint(name, *parameters)))

def to_table(
self,
Expand Down
40 changes: 40 additions & 0 deletions databricks/koalas/tests/test_frame_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from distutils.version import LooseVersion

import pandas as pd
import pyspark

from databricks import koalas as ks
from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils

Expand All @@ -23,3 +28,38 @@ def test_frame_apply_negative(self):
ValueError, "The output of the function.* pyspark.sql.DataFrame.*int"
):
ks.range(10).spark.apply(lambda scol: 1)

def test_hint(self):
pdf1 = pd.DataFrame(
{"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]}
).set_index("lkey")
pdf2 = pd.DataFrame(
{"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]}
).set_index("rkey")
kdf1 = ks.from_pandas(pdf1)
kdf2 = ks.from_pandas(pdf2)

if LooseVersion(pyspark.__version__) >= LooseVersion("3.0"):
hints = ["broadcast", "merge", "shuffle_hash", "shuffle_replicate_nl"]
else:
hints = ["broadcast"]

for hint in hints:
self.assert_eq(
pdf1.merge(pdf2, left_index=True, right_index=True).sort_values(
["value_x", "value_y"]
),
kdf1.merge(kdf2.spark.hint(hint), left_index=True, right_index=True).sort_values(
["value_x", "value_y"]
),
almost=True,
)
self.assert_eq(
pdf1.merge(pdf2 + 1, left_index=True, right_index=True).sort_values(
["value_x", "value_y"]
),
kdf1.merge(
(kdf2 + 1).spark.hint(hint), left_index=True, right_index=True
).sort_values(["value_x", "value_y"]),
almost=True,
)

0 comments on commit deefc2c

Please sign in to comment.