diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 77bc0ba5548d..16f279cd49e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -163,6 +163,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     batchAdded
   }
 
+  /**
+   * CompactibleFileStreamLog maintains its own log files, and purging them manually might break
+   * its internal state, in particular when the latest compaction batch is purged.
+   *
+   * To keep things simple, this method throws UnsupportedOperationException regardless of the
+   * given parameter, and lets CompactibleFileStreamLog handle purging by itself.
+   */
+  override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
+    "Cannot purge as it might break internal state.")
+
   /**
    * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
    * corresponding `batchId` file. It will delete expired files as well if enabled.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index ec961a9ecb59..71dc3776bcaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -232,6 +232,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     })
   }
 
+  test("prevent removing metadata files via method purge") {
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = 10000,
+      defaultCompactInterval = 2,
+      defaultMinBatchesToRetain = 3,
+      compactibleLog => {
+        // compaction batches: 1
+        compactibleLog.add(0, Array("some_path_0"))
+        compactibleLog.add(1, Array("some_path_1"))
+        compactibleLog.add(2, Array("some_path_2"))
+
+        val exc = intercept[UnsupportedOperationException] {
+          compactibleLog.purge(2)
+        }
+        assert(exc.getMessage.contains("Cannot purge as it might break internal state"))
+
+        // The line below would fail with IllegalStateException if purge were not prevented:
+        // - purge(2) would delete batches 0 and 1, where batch 1 is a compaction batch
+        // - allFiles() would then read batch 1 (the latest compaction), which has been deleted
+        compactibleLog.allFiles()
+      })
+  }
+
   private def withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs: Long,
       defaultCompactInterval: Int,
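
To illustrate why an external purge is unsafe here, below is a minimal, self-contained Scala sketch of the pattern this diff applies. It is not the actual Spark code: SketchMetadataLog, SketchCompactibleLog, and PurgeSketch are hypothetical stand-ins for HDFSMetadataLog / CompactibleFileStreamLog, with a simplified in-memory store and compaction rule. The point it demonstrates is that allFiles() depends on the latest compaction batch still existing, so the subclass rejects purge() outright and relies on its own compaction cycle for cleanup.

import scala.collection.mutable

// Hypothetical stand-in for HDFSMetadataLog: stores batches in memory.
class SketchMetadataLog {
  protected val batches = mutable.SortedMap.empty[Long, Array[String]]

  def add(batchId: Long, logs: Array[String]): Boolean =
    batches.put(batchId, logs).isEmpty

  // Default behavior: drop every batch strictly below the threshold.
  def purge(thresholdBatchId: Long): Unit =
    batches.keys.toList.filter(_ < thresholdBatchId).foreach(batches.remove)
}

// Hypothetical stand-in for CompactibleFileStreamLog.
class SketchCompactibleLog(compactInterval: Int) extends SketchMetadataLog {
  private def isCompactionBatch(batchId: Long): Boolean =
    (batchId + 1) % compactInterval == 0

  // Mirrors the diff: an external purge could delete the latest compaction
  // batch that allFiles() depends on, so it is rejected outright.
  override def purge(thresholdBatchId: Long): Unit =
    throw new UnsupportedOperationException(
      "Cannot purge as it might break internal state.")

  // Reads the latest compaction batch plus everything after it; this breaks
  // if that compaction batch has been purged out from under us.
  def allFiles(): Array[String] =
    batches.keys.toList.filter(isCompactionBatch).lastOption match {
      case Some(latestCompaction) =>
        batches.range(latestCompaction, Long.MaxValue).values.flatten.toArray
      case None =>
        batches.values.flatten.toArray
    }
}

object PurgeSketch extends App {
  val log = new SketchCompactibleLog(compactInterval = 2)
  log.add(0, Array("some_path_0"))
  log.add(1, Array("some_path_1")) // with interval 2, batch 1 is a compaction batch
  log.add(2, Array("some_path_2"))

  try log.purge(2)
  catch { case e: UnsupportedOperationException => println(e.getMessage) }

  // Still consistent, because the compaction batch was never deleted.
  println(log.allFiles().mkString(", ")) // some_path_1, some_path_2
}

Had purge(2) succeeded in the sketch, batch 1 (the compaction batch) would be gone while allFiles() still tries to start reading from it, which is the same inconsistency the new test guards against in the real suite.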