From 4c9649a2771e5c1b49793b840f326bc25a2c2cd5 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Tue, 14 May 2024 14:08:08 +0900
Subject: [PATCH 1/7] Add a configuration for SparkContext.setCheckpointDir

---
 core/src/main/scala/org/apache/spark/SparkContext.scala  | 2 +-
 .../scala/org/apache/spark/internal/config/package.scala | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0dbac45fd7f9..900c2c7e0aff 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -373,7 +373,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   private[spark] def cleaner: Option[ContextCleaner] = _cleaner
 
-  private[spark] var checkpointDir: Option[String] = None
+  private[spark] var checkpointDir: Option[String] = conf.getOption(CHECKPOINT_DIR.key)
 
   // Thread Local variable that can be used by users to pass information down the stack
   protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 87402d2cc17e..76224948ebb4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1317,6 +1317,14 @@ package object config {
           s" be less than or equal to ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
       .createWithDefault(64 * 1024 * 1024)
 
+  private[spark] val CHECKPOINT_DIR =
+    ConfigBuilder("spark.checkpoint.dir")
+      .doc("Equivalent with SparkContext.setCheckpointDir. If set, the path becomes " +
+        "the directory for checkpointing.")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
   private[spark] val CHECKPOINT_COMPRESS =
     ConfigBuilder("spark.checkpoint.compress")
       .doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " +

From 0f6a9c31caf7c7082d80f5debb188f4f6aa8c22f Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Tue, 14 May 2024 16:36:43 +0900
Subject: [PATCH 2/7] Update core/src/main/scala/org/apache/spark/SparkContext.scala

Co-authored-by: Mridul Muralidharan <1591700+mridulm@users.noreply.github.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 900c2c7e0aff..36d48a22ea7c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -373,7 +373,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
   private[spark] def cleaner: Option[ContextCleaner] = _cleaner
 
-  private[spark] var checkpointDir: Option[String] = conf.getOption(CHECKPOINT_DIR.key)
+  private[spark] var checkpointDir: Option[String] = None
+  conf.getOption(CHECKPOINT_DIR).foreach(setCheckpointDir)
 
   // Thread Local variable that can be used by users to pass information down the stack
   protected[spark] val localProperties = new InheritableThreadLocal[Properties] {

From 14a7aed933086342207c8ca270096ba9d0261ad4 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Tue, 14 May 2024 17:24:30 +0900
Subject: [PATCH 3/7] fixup

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 36d48a22ea7c..ba517e3d4f21 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -374,7 +374,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def cleaner: Option[ContextCleaner] = _cleaner
 
   private[spark] var checkpointDir: Option[String] = None
-  conf.getOption(CHECKPOINT_DIR).foreach(setCheckpointDir)
+  config.getOption(CHECKPOINT_DIR.key).foreach(setCheckpointDir)
 
   // Thread Local variable that can be used by users to pass information down the stack
   protected[spark] val localProperties = new InheritableThreadLocal[Properties] {

From 995efb7d1cafd422775c8112455b25bcb67eb4d0 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Thu, 16 May 2024 08:31:54 +0900
Subject: [PATCH 4/7] Address a comment

---
 .../org/apache/spark/internal/config/package.scala |  6 ++++--
 docs/configuration.md                              | 10 ++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 76224948ebb4..a10648307fc0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1319,8 +1319,10 @@ package object config {
 
   private[spark] val CHECKPOINT_DIR =
     ConfigBuilder("spark.checkpoint.dir")
-      .doc("Equivalent with SparkContext.setCheckpointDir. If set, the path becomes " +
-        "the directory for checkpointing.")
+      .doc(
+        "Equivalent with SparkContext.setCheckpointDir. If set, the path becomes" +
+        "the default directory for checkpointing. It can be overwritten by" +
+        "SparkContext.setCheckpointDir.")
       .version("4.0.0")
       .stringConf
       .createOptional
diff --git a/docs/configuration.md b/docs/configuration.md
index 7884a2af60b2..754c241cc951 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1795,6 +1795,16 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>0.6.0</td>
 </tr>
+<tr>
+  <td><code>spark.checkpoint.dir</code></td>
+  <td>(none)</td>
+  <td>
+    Equivalent with SparkContext.setCheckpointDir. If set, the path becomes
+    the default directory for checkpointing. It can be overwritten by
+    SparkContext.setCheckpointDir.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.checkpoint.compress</code></td>
   <td>false</td>

From f40ffe0f97a2e753878af8f8f47038272a1e1657 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Thu, 16 May 2024 08:39:33 +0900
Subject: [PATCH 5/7] indentation

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a10648307fc0..737540d4e5e4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1321,8 +1321,8 @@ package object config {
     ConfigBuilder("spark.checkpoint.dir")
       .doc(
         "Equivalent with SparkContext.setCheckpointDir. If set, the path becomes" +
-        "the default directory for checkpointing. It can be overwritten by" +
-        "SparkContext.setCheckpointDir.")
+          "the default directory for checkpointing. It can be overwritten by" +
+          "SparkContext.setCheckpointDir.")
       .version("4.0.0")
       .stringConf
       .createOptional

From 952d9b54fa7f1813017b8b1ed7c0bb4a7a158be4 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Thu, 16 May 2024 10:55:58 +0900
Subject: [PATCH 6/7] Address a comment

---
 .../scala/org/apache/spark/SparkContext.scala    |  3 ++-
 .../scala/org/apache/spark/CheckpointSuite.scala | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ba517e3d4f21..6018c87b0122 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -374,7 +374,6 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def cleaner: Option[ContextCleaner] = _cleaner
 
   private[spark] var checkpointDir: Option[String] = None
-  config.getOption(CHECKPOINT_DIR.key).foreach(setCheckpointDir)
 
   // Thread Local variable that can be used by users to pass information down the stack
   protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
@@ -602,6 +601,8 @@ class SparkContext(config: SparkConf) extends Logging {
         .foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel))
     }
 
+    _conf.get(CHECKPOINT_DIR).foreach(setCheckpointDir)
+
     val _executorMetricsSource =
       if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
         Some(new ExecutorMetricsSource)
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 874f4896bb01..7a39ba4ab382 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -669,4 +669,20 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
       assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
     }
   }
+
+  test("SPARK-48268: checkpoint directory via configuration") {
+    withTempDir { checkpointDir =>
+      val conf = new SparkConf()
+        .set("spark.checkpoint.dir", checkpointDir.toString)
+        .set(UI_ENABLED.key, "false")
+      sc = new SparkContext("local", "test", conf)
+      val parCollection = sc.makeRDD(1 to 4)
+      val flatMappedRDD = parCollection.flatMap(x => 1 to x)
+      flatMappedRDD.checkpoint()
+      assert(flatMappedRDD.dependencies.head.rdd === parCollection)
+      val result = flatMappedRDD.collect()
+      assert(flatMappedRDD.dependencies.head.rdd != parCollection)
+      assert(flatMappedRDD.collect() === result)
+    }
+  }
 }

From 0fbe9182d7ede82808e8804616bd8daf5d495adf Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Fri, 17 May 2024 08:34:55 +0900
Subject: [PATCH 7/7] address comments

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 3 +--
 docs/configuration.md                                         | 3 +--
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 737540d4e5e4..dc3edfaa8613 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1320,8 +1320,7 @@ package object config {
   private[spark] val CHECKPOINT_DIR =
     ConfigBuilder("spark.checkpoint.dir")
       .doc(
-        "Equivalent with SparkContext.setCheckpointDir. If set, the path becomes" +
-          "the default directory for checkpointing. It can be overwritten by" +
+        "Set the default directory for checkpointing. It can be overwritten by " +
           "SparkContext.setCheckpointDir.")
       .version("4.0.0")
       .stringConf
diff --git a/docs/configuration.md b/docs/configuration.md
index 754c241cc951..cb1fb6fba958 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1799,8 +1799,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.checkpoint.dir</code></td>
   <td>(none)</td>
   <td>
-    Equivalent with SparkContext.setCheckpointDir. If set, the path becomes
-    the default directory for checkpointing. It can be overwritten by
+    Set the default directory for checkpointing. It can be overwritten by
     SparkContext.setCheckpointDir.
   </td>
   <td>4.0.0</td>
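
A minimal usage sketch of the behavior this series adds, assuming the final state of the patches: when spark.checkpoint.dir is set on SparkConf, the SparkContext constructor applies it through setCheckpointDir, and an explicit SparkContext.setCheckpointDir call still overrides the configured default. The object name, application name, and checkpoint paths below are illustrative placeholders, not taken from the patches.

    import org.apache.spark.{SparkConf, SparkContext}

    object CheckpointDirExample {
      def main(args: Array[String]): Unit = {
        // Placeholder path; on a real cluster this would typically be a reliable
        // (e.g. HDFS) directory.
        val conf = new SparkConf()
          .setAppName("checkpoint-dir-example")
          .setMaster("local[*]")
          .set("spark.checkpoint.dir", "/tmp/spark-checkpoints")

        // The constructor picks up spark.checkpoint.dir and calls setCheckpointDir,
        // so no explicit call is needed before checkpointing RDDs.
        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(1 to 4).flatMap(x => 1 to x)
        rdd.checkpoint()
        rdd.count() // materializes the RDD and writes the checkpoint files

        // The programmatic API still takes precedence over the configured default.
        sc.setCheckpointDir("/tmp/spark-checkpoints-override")

        sc.stop()
      }
    }

This mirrors the test added in PATCH 6/7, which sets the property on SparkConf, checkpoints a small RDD under a temporary directory, and verifies that the lineage is truncated after materialization.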