From 120b9a29be6d62b534fd6e7fa19ba8714547f632 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 29 Oct 2020 10:37:54 -0700 Subject: [PATCH 1/4] df.spark.checkpoint --- databricks/koalas/spark/accessors.py | 39 +++++++++++++++++++++ databricks/koalas/tests/test_frame_spark.py | 13 +++++-- docs/source/reference/frame.rst | 1 + 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/databricks/koalas/spark/accessors.py b/databricks/koalas/spark/accessors.py index b248ea4ea2..3364a41691 100644 --- a/databricks/koalas/spark/accessors.py +++ b/databricks/koalas/spark/accessors.py @@ -993,6 +993,45 @@ def coalesce(self, num_partitions: int) -> "ks.DataFrame": coalesced_sdf = internal.spark_frame.coalesce(num_partitions) return DataFrame(internal.with_new_sdf(coalesced_sdf)) + def checkpoint(self, eager=True): + """Returns a checkpointed version of this DataFrame. + + Checkpointing can be used to truncate the logical plan of this DataFrame, which is + especially useful in iterative algorithms where the plan may grow exponentially. It will be + saved to files inside the checkpoint directory set with `SparkContext.setCheckpointDir`. + + Parameters + ---------- + eager : bool + Whether to checkpoint this DataFrame immediately + + Returns + ------- + DataFrame + + .. note:: Experimental + + Examples + -------- + >>> kdf = ks.DataFrame({"a": ["a", "b", "c"]}) + >>> kdf # doctest: +NORMALIZE_WHITESPACE + a + 0 a + 1 b + 2 c + >>> new_kdf = kdf.spark.checkpoint() # doctest: +SKIP + >>> new_kdf # doctest: +SKIP + a + 0 a + 1 b + 2 c + """ + from databricks.koalas.frame import DataFrame + + internal = self._kdf._internal.resolved_copy + checkpointed_sdf = internal.spark_frame.checkpoint(eager) + return DataFrame(internal.with_new_sdf(checkpointed_sdf)) + @property def analyzed(self) -> "ks.DataFrame": """ diff --git a/databricks/koalas/tests/test_frame_spark.py b/databricks/koalas/tests/test_frame_spark.py index d0a582b025..0ea186b043 100644 --- a/databricks/koalas/tests/test_frame_spark.py +++ b/databricks/koalas/tests/test_frame_spark.py @@ -13,16 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os from distutils.version import LooseVersion import pandas as pd import pyspark from databricks import koalas as ks -from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils +from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils, TestUtils -class SparkFrameMethodsTest(ReusedSQLTestCase, SQLTestUtils): +class SparkFrameMethodsTest(ReusedSQLTestCase, SQLTestUtils, TestUtils): def test_frame_apply_negative(self): with self.assertRaisesRegex( ValueError, "The output of the function.* pyspark.sql.DataFrame.*int" @@ -127,3 +128,11 @@ def test_coalesce(self): new_kdf = kdf.spark.coalesce(num_partitions) self.assertEqual(new_kdf.to_spark().rdd.getNumPartitions(), num_partitions) self.assert_eq(kdf.sort_index(), new_kdf.sort_index()) + + def test_checkpoint(self): + with self.temp_dir() as tmp: + self.spark.sparkContext.setCheckpointDir(tmp) + kdf = ks.DataFrame({"a": ["a", "b", "c"]}) + new_kdf = kdf.spark.checkpoint() + self.assertIsNotNone(os.listdir(tmp)) + self.assert_eq(kdf, new_kdf) diff --git a/docs/source/reference/frame.rst b/docs/source/reference/frame.rst index 1fcee69bdd..e04d4c5446 100644 --- a/docs/source/reference/frame.rst +++ b/docs/source/reference/frame.rst @@ -278,6 +278,7 @@ in Spark. These can be accessed by ``DataFrame.spark.``. DataFrame.spark.apply DataFrame.spark.repartition DataFrame.spark.coalesce + DataFrame.spark.checkpoint .. _api.dataframe.plot: From f62f2f2302aff0d5f7f467713733648d8f01ad63 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 29 Oct 2020 10:40:50 -0700 Subject: [PATCH 2/4] Adjust imports --- databricks/koalas/tests/test_frame_spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/tests/test_frame_spark.py b/databricks/koalas/tests/test_frame_spark.py index 0ea186b043..95b079bd1e 100644 --- a/databricks/koalas/tests/test_frame_spark.py +++ b/databricks/koalas/tests/test_frame_spark.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os from distutils.version import LooseVersion +import os import pandas as pd import pyspark From 7bec13392bd31661f86b773fe6fc3b7048c95ca6 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 29 Oct 2020 11:51:45 -0700 Subject: [PATCH 3/4] Resolve comments --- databricks/koalas/spark/accessors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/databricks/koalas/spark/accessors.py b/databricks/koalas/spark/accessors.py index 3364a41691..70cd264c72 100644 --- a/databricks/koalas/spark/accessors.py +++ b/databricks/koalas/spark/accessors.py @@ -993,7 +993,7 @@ def coalesce(self, num_partitions: int) -> "ks.DataFrame": coalesced_sdf = internal.spark_frame.coalesce(num_partitions) return DataFrame(internal.with_new_sdf(coalesced_sdf)) - def checkpoint(self, eager=True): + def checkpoint(self, eager=True) -> "ks.DataFrame": """Returns a checkpointed version of this DataFrame. Checkpointing can be used to truncate the logical plan of this DataFrame, which is @@ -1014,7 +1014,7 @@ def checkpoint(self, eager=True): Examples -------- >>> kdf = ks.DataFrame({"a": ["a", "b", "c"]}) - >>> kdf # doctest: +NORMALIZE_WHITESPACE + >>> kdf a 0 a 1 b From e3d705a18e6e228430fdd9ef50bf5fc9643da597 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 29 Oct 2020 13:18:39 -0700 Subject: [PATCH 4/4] Type for eager --- databricks/koalas/spark/accessors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/spark/accessors.py b/databricks/koalas/spark/accessors.py index 70cd264c72..cb2d80af60 100644 --- a/databricks/koalas/spark/accessors.py +++ b/databricks/koalas/spark/accessors.py @@ -993,7 +993,7 @@ def coalesce(self, num_partitions: int) -> "ks.DataFrame": coalesced_sdf = internal.spark_frame.coalesce(num_partitions) return DataFrame(internal.with_new_sdf(coalesced_sdf)) - def checkpoint(self, eager=True) -> "ks.DataFrame": + def checkpoint(self, eager: bool = True) -> "ks.DataFrame": """Returns a checkpointed version of this DataFrame. Checkpointing can be used to truncate the logical plan of this DataFrame, which is