diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0dbac45fd7f9..6018c87b0122 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -601,6 +601,8 @@ class SparkContext(config: SparkConf) extends Logging {
         .foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel))
     }
 
+    _conf.get(CHECKPOINT_DIR).foreach(setCheckpointDir)
+
     val _executorMetricsSource =
       if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
         Some(new ExecutorMetricsSource)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 87402d2cc17e..dc3edfaa8613 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1317,6 +1317,15 @@ package object config {
         s" be less than or equal to ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
       .createWithDefault(64 * 1024 * 1024)
 
+  private[spark] val CHECKPOINT_DIR =
+    ConfigBuilder("spark.checkpoint.dir")
+      .doc(
+        "Set the default directory for checkpointing. It can be overwritten by " +
+          "SparkContext.setCheckpointDir.")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
   private[spark] val CHECKPOINT_COMPRESS =
     ConfigBuilder("spark.checkpoint.compress")
       .doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " +
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 874f4896bb01..7a39ba4ab382 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -669,4 +669,20 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
       assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
     }
   }
+
+  test("SPARK-48268: checkpoint directory via configuration") {
+    withTempDir { checkpointDir =>
+      val conf = new SparkConf()
+        .set("spark.checkpoint.dir", checkpointDir.toString)
+        .set(UI_ENABLED.key, "false")
+      sc = new SparkContext("local", "test", conf)
+      val parCollection = sc.makeRDD(1 to 4)
+      val flatMappedRDD = parCollection.flatMap(x => 1 to x)
+      flatMappedRDD.checkpoint()
+      assert(flatMappedRDD.dependencies.head.rdd === parCollection)
+      val result = flatMappedRDD.collect()
+      assert(flatMappedRDD.dependencies.head.rdd != parCollection)
+      assert(flatMappedRDD.collect() === result)
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 7884a2af60b2..cb1fb6fba958 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1795,6 +1795,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>0.6.0</td>
 </tr>
+<tr>
+  <td><code>spark.checkpoint.dir</code></td>
+  <td>(none)</td>
+  <td>
+    Set the default directory for checkpointing. It can be overwritten by
+    <code>SparkContext.setCheckpointDir</code>.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.checkpoint.compress</code></td>
   <td>false</td>
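
For reference, a minimal sketch of how an application would use the new configuration, assuming a Spark build that includes this patch (4.0.0+). The object name `CheckpointDirExample` and the path `/tmp/spark-checkpoints` are illustrative only, not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDirExample {
  def main(args: Array[String]): Unit = {
    // The checkpoint directory is supplied via configuration, so no explicit
    // sc.setCheckpointDir(...) call is needed before checkpointing.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("checkpoint-dir-example")
      .set("spark.checkpoint.dir", "/tmp/spark-checkpoints") // illustrative path

    val sc = new SparkContext(conf)
    try {
      val rdd = sc.makeRDD(1 to 4).flatMap(x => 1 to x)
      rdd.checkpoint() // marks the RDD; the write happens on the first action
      println(rdd.collect().mkString(",")) // triggers the job and the checkpoint

      // An explicit call still replaces the configured default:
      // sc.setCheckpointDir("/tmp/other-checkpoints")
    } finally {
      sc.stop()
    }
  }
}
```

Because the `SparkContext` constructor applies the configured value through `setCheckpointDir`, any later explicit call to `SparkContext.setCheckpointDir` takes precedence, which is the "can be overwritten" behavior the doc string describes.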