From fa5b387dcd7ef598cfe76fce317f342aeb49cdd3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 17 Oct 2022 20:17:32 -0700 Subject: [PATCH 1/4] Add additional renamed file check --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 +++++++++ .../sql/execution/streaming/CheckpointFileManager.scala | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2f96209222b2..76ea545bd51f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1830,6 +1830,13 @@ object SQLConf { .stringConf .createWithDefault("lz4") + val CHECKPOINT_RENAMEDFILE_CHECK_ENABLED = + buildConf("spark.sql.streaming.checkpoint.renamedFileCheck") + .doc("When true, Spark will validate if renamed checkpoint file exists.") + .internal() + .booleanConf + .createWithDefault(false) + /** * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated * together. @@ -4234,6 +4241,8 @@ class SQLConf extends Serializable with Logging { def stateStoreCompressionCodec: String = getConf(STATE_STORE_COMPRESSION_CODEC) + def checkpointRenamedFileCheck: Boolean = getConf(CHECKPOINT_RENAMEDFILE_CHECK_ENABLED) + def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED) def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index cf5d54fd20a5..013efd3c7bae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -158,6 +158,13 @@ object CheckpointFileManager extends Logging { s"Failed to rename temp file $tempPath to $finalPath because file exists", fe) if (!overwriteIfPossible) throw fe } + + // Optionally, check if the renamed file exists + if (SQLConf.get.checkpointRenamedFileCheck && !fm.exists(finalPath)) { + throw new IllegalStateException(s"Renamed temp file $tempPath to $finalPath. " + + s"But $finalPath does not exist.") + } + logInfo(s"Renamed temp file $tempPath to $finalPath") } finally { terminated = true From 7535fa98de79fa14fe0e783f5286cd9a11bd88d8 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 18 Oct 2022 08:23:35 -0700 Subject: [PATCH 2/4] Change config name --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 76ea545bd51f..06d02c3e9f0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1831,7 +1831,7 @@ object SQLConf { .createWithDefault("lz4") val CHECKPOINT_RENAMEDFILE_CHECK_ENABLED = - buildConf("spark.sql.streaming.checkpoint.renamedFileCheck") + buildConf("spark.sql.streaming.checkpoint.renamedFileCheck.enabled") .doc("When true, Spark will validate if renamed checkpoint file exists.") .internal() .booleanConf From 90287662003f3543a6e0fe724138fdff9e26a259 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 18 Oct 2022 13:07:19 -0700 Subject: [PATCH 3/4] Add version --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 06d02c3e9f0b..a99a795018d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1834,6 +1834,7 @@ object SQLConf { buildConf("spark.sql.streaming.checkpoint.renamedFileCheck.enabled") .doc("When true, Spark will validate if renamed checkpoint file exists.") .internal() + .version("3.4.0") .booleanConf .createWithDefault(false) From 115c0e09cbddb56a7dfe97d4afe3c8ae44edb4a6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 18 Oct 2022 15:46:03 -0700 Subject: [PATCH 4/4] Trigger Build