Introduce df.spark.checkpoint() #1877

Merged · 4 commits · Oct 29, 2020
39 changes: 39 additions & 0 deletions databricks/koalas/spark/accessors.py
@@ -993,6 +993,45 @@ def coalesce(self, num_partitions: int) -> "ks.DataFrame":
        coalesced_sdf = internal.spark_frame.coalesce(num_partitions)
        return DataFrame(internal.with_new_sdf(coalesced_sdf))

    def checkpoint(self, eager: bool = True) -> "ks.DataFrame":
        """Returns a checkpointed version of this DataFrame.

        Checkpointing can be used to truncate the logical plan of this DataFrame, which is
        especially useful in iterative algorithms where the plan may grow exponentially. The
        checkpointed data will be saved to files inside the checkpoint directory set with
        `SparkContext.setCheckpointDir`.

        Parameters
        ----------
        eager : bool
            Whether to checkpoint this DataFrame immediately.

        Returns
        -------
        DataFrame

        .. note:: Experimental

        Examples
        --------
        >>> kdf = ks.DataFrame({"a": ["a", "b", "c"]})
        >>> kdf
           a
        0  a
        1  b
        2  c
        >>> new_kdf = kdf.spark.checkpoint()  # doctest: +SKIP
[Review comment from the contributor author]: Skipped because otherwise a checkpoint directory needs to be set.

        >>> new_kdf  # doctest: +SKIP
           a
        0  a
        1  b
        2  c
        """
        from databricks.koalas.frame import DataFrame

        internal = self._kdf._internal.resolved_copy
        checkpointed_sdf = internal.spark_frame.checkpoint(eager)
        return DataFrame(internal.with_new_sdf(checkpointed_sdf))

    @property
    def analyzed(self) -> "ks.DataFrame":
        """
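For anyone trying the new accessor out, here is a minimal usage sketch, assuming a running SparkSession and using a temporary directory as the checkpoint location (the session setup and the path are illustrative, not part of this diff):

```python
import os
import tempfile

import databricks.koalas as ks
from pyspark.sql import SparkSession

# A checkpoint directory must be configured first; without one,
# checkpoint() fails at runtime (which is why the doctest is skipped).
spark = SparkSession.builder.getOrCreate()
checkpoint_dir = tempfile.mkdtemp()  # illustrative location
spark.sparkContext.setCheckpointDir(checkpoint_dir)

kdf = ks.DataFrame({"a": ["a", "b", "c"]})

# Eager checkpoint (the default): writes the data out immediately and
# returns a DataFrame whose logical plan starts from the checkpoint.
new_kdf = kdf.spark.checkpoint()

print(new_kdf)                     # same contents as kdf
print(os.listdir(checkpoint_dir))  # checkpoint files now exist on disk
```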
13 changes: 11 additions & 2 deletions databricks/koalas/tests/test_frame_spark.py
@@ -14,15 +14,16 @@
# limitations under the License.
#
from distutils.version import LooseVersion
import os

import pandas as pd
import pyspark

from databricks import koalas as ks
from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils
from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils, TestUtils


class SparkFrameMethodsTest(ReusedSQLTestCase, SQLTestUtils):
class SparkFrameMethodsTest(ReusedSQLTestCase, SQLTestUtils, TestUtils):
    def test_frame_apply_negative(self):
        with self.assertRaisesRegex(
            ValueError, "The output of the function.* pyspark.sql.DataFrame.*int"
@@ -127,3 +128,11 @@ def test_coalesce(self):
        new_kdf = kdf.spark.coalesce(num_partitions)
        self.assertEqual(new_kdf.to_spark().rdd.getNumPartitions(), num_partitions)
        self.assert_eq(kdf.sort_index(), new_kdf.sort_index())

    def test_checkpoint(self):
        with self.temp_dir() as tmp:
            self.spark.sparkContext.setCheckpointDir(tmp)
            kdf = ks.DataFrame({"a": ["a", "b", "c"]})
            new_kdf = kdf.spark.checkpoint()
            self.assertTrue(os.listdir(tmp))  # checkpoint files were actually written
            self.assert_eq(kdf, new_kdf)
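The test exercises only the eager path; below is a short sketch of the lazy variant using the `eager` flag from the new signature (assuming a checkpoint directory is already set, as in the test above):

```python
import databricks.koalas as ks

kdf = ks.DataFrame({"a": ["a", "b", "c"]})

# With eager=False, the plan is only marked for checkpointing; Spark
# writes the checkpoint files when an action first evaluates the data.
lazy_kdf = kdf.spark.checkpoint(eager=False)
lazy_kdf.head(1)  # an action like this should trigger the actual write
```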
1 change: 1 addition & 0 deletions docs/source/reference/frame.rst
@@ -278,6 +278,7 @@ in Spark. These can be accessed by ``DataFrame.spark.<function/property>``.
   DataFrame.spark.apply
   DataFrame.spark.repartition
   DataFrame.spark.coalesce
   DataFrame.spark.checkpoint

.. _api.dataframe.plot:
