
Commit e2eb540

Dylan Wong (dylanwong250) authored and committed
[SPARK-53103][SS] Throw an error if state directory is not empty when query starts
### What changes were proposed in this pull request?

We are implementing a check to ensure the state directory of the checkpoint location is empty before the first batch of a streaming query begins. The new check throws an error if the directory isn't empty.

### Why are the changes needed?

This prevents a conflict in which background maintenance processes delete files that the new query has just created, and it prevents different queries from sharing state directories.

### Does this PR introduce _any_ user-facing change?

Yes. There is a new error condition, `STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY`, raised when the state directory is not empty on the first batch of a query.

### How was this patch tested?

Unit tests are added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51817 from dylanwong250/SPARK-53103.

Lead-authored-by: Dylan Wong <[email protected]>
Co-authored-by: dylanwong250 <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent ce646b3 commit e2eb540
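
For illustration only (not part of the commit): a minimal sketch of how the new check surfaces to a user. It assumes a local SparkSession, a rate source, and a throwaway checkpoint path; pre-populating `state/` under the checkpoint root simulates a reused or shared location, and the failure shows up as a `StreamingQueryException` whose cause carries the new error condition.

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryException

// Sketch: simulate a checkpoint location whose state/ directory already holds data
// before the query's first batch (the paths and app name here are illustrative).
val spark = SparkSession.builder().master("local[2]").appName("ckpt-check-demo").getOrCreate()
val checkpoint = Files.createTempDirectory("ckpt-demo").toString
Files.createDirectories(Paths.get(checkpoint, "state", "0")) // leftover state from another query

val query = spark.readStream.format("rate").load()
  .writeStream
  .format("console")
  .option("checkpointLocation", checkpoint)
  .start()

try {
  query.processAllAvailable() // batch 0 now fails the emptiness check
} catch {
  case e: StreamingQueryException =>
    // Cause is STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY (SQLSTATE 42K03); either use a new
    // checkpoint location or delete the existing data, as the error message suggests.
    println(e.getMessage)
} finally {
  query.stop()
}
```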

6 files changed: +148 −3 lines


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 7 additions & 0 deletions
@@ -5161,6 +5161,13 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY" : {
+    "message" : [
+      "The checkpoint location <checkpointLocation> should be empty on batch 0",
+      "Please either use a new checkpoint location, or delete the existing data in the checkpoint location."
+    ],
+    "sqlState" : "42K03"
+  },
   "STATE_STORE_COLUMN_FAMILY_SCHEMA_INCOMPATIBLE" : {
     "message" : [
       "Incompatible schema transformation with column family=<colFamilyName>, oldSchema=<oldSchema>, newSchema=<newSchema>."

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -2687,6 +2687,16 @@ object SQLConf {
       .intConf
       .createWithDefault(16)
 
+  val STREAMING_VERIFY_CHECKPOINT_DIRECTORY_EMPTY_ON_START =
+    buildConf("spark.sql.streaming.verifyCheckpointDirectoryEmptyOnStart")
+      .internal()
+      .doc("When true, verifies that the checkpoint directory (offsets, state, commits) is " +
+        "empty when first starting a streaming query. This prevents sharing checkpoint " +
+        "directories between different queries.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val STATE_STORE_COMPRESSION_CODEC =
     buildConf("spark.sql.streaming.stateStore.compression.codec")
       .internal()
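
The flag above is internal and defaults to true. As a hedged aside (a SparkSession named `spark` is assumed, and disabling the guard means accepting the shared-checkpoint risks described in the PR), the check can be switched off per session via the key defined in the `buildConf(...)` call:

```scala
// Sketch only: opt out of the batch-0 checkpoint-emptiness verification for this session.
// The key matches the buildConf(...) call above; the conf is internal, so treat this as an
// escape hatch rather than a recommended setting.
spark.conf.set("spark.sql.streaming.verifyCheckpointDirectoryEmptyOnStart", "false")
```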

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala

Lines changed: 33 additions & 2 deletions
@@ -21,6 +21,8 @@ import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.LogKeys
 import org.apache.spark.internal.LogKeys.BATCH_ID
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -38,11 +40,12 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, Sink, Source}
-import org.apache.spark.sql.execution.streaming.checkpointing.{CommitMetadata, OffsetSeq, OffsetSeqMetadata}
+import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeq, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
 import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
+import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
 import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
-import org.apache.spark.sql.execution.streaming.state.StateSchemaBroadcast
+import org.apache.spark.sql.execution.streaming.state.{StateSchemaBroadcast, StateStoreErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.{Clock, Utils}
@@ -562,12 +565,40 @@
           log"offsets ${MDC(LogKeys.STREAMING_OFFSETS_START, execCtx.startOffsets)} and " +
           log"available offsets ${MDC(LogKeys.STREAMING_OFFSETS_END, execCtx.endOffsets)}")
       case None => // We are starting this stream for the first time.
+        val shouldVerifyNewCheckpointDirectory =
+          sparkSession.conf.get(SQLConf.STREAMING_VERIFY_CHECKPOINT_DIRECTORY_EMPTY_ON_START)
+        if (shouldVerifyNewCheckpointDirectory) {
+          verifyNewCheckpointDirectory()
+        }
         logInfo(s"Starting new streaming query.")
         execCtx.batchId = 0
         watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf, logicalPlan)
     }
   }
 
+  /**
+   * Verify that the checkpoint directory is in a good state to start a new
+   * streaming query. This checks that the offsets, state, and commits directories are
+   * either non-existent or empty.
+   *
+   * If this check fails, an exception is thrown.
+   */
+  private def verifyNewCheckpointDirectory(): Unit = {
+    val fileManager = CheckpointFileManager.create(new Path(resolvedCheckpointRoot),
+      sparkSession.sessionState.newHadoopConf())
+    val dirNamesThatShouldNotHaveFiles = Array[String](
+      DIR_NAME_OFFSETS, DIR_NAME_STATE, DIR_NAME_COMMITS)
+
+    dirNamesThatShouldNotHaveFiles.foreach { dirName =>
+      val path = new Path(resolvedCheckpointRoot, dirName)
+
+      if (fileManager.exists(path) && !fileManager.list(path).isEmpty) {
+        val loc = path.toString
+        throw StateStoreErrors.streamingStateCheckpointLocationNotEmpty(loc)
+      }
+    }
+  }
+
   /**
    * Returns true if there is any new data available to be processed.
    */
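
As an aside, the same rule can be pre-checked outside a running query. The helper below is hypothetical (it is not part of this commit and uses plain Hadoop `FileSystem` calls rather than `CheckpointFileManager`), but it mirrors the emptiness condition applied by `verifyNewCheckpointDirectory()`: each of `offsets`, `state`, and `commits` must be missing or empty under the checkpoint root.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical pre-flight check mirroring verifyNewCheckpointDirectory(): a checkpoint root is
// considered fresh only if offsets/, state/ and commits/ are each absent or contain no entries.
def checkpointLooksFresh(
    checkpointRoot: String,
    hadoopConf: Configuration = new Configuration()): Boolean = {
  val root = new Path(checkpointRoot)
  val fs = root.getFileSystem(hadoopConf)
  Seq("offsets", "state", "commits").forall { dirName =>
    val dir = new Path(root, dirName)
    !fs.exists(dir) || fs.listStatus(dir).isEmpty
  }
}

// Usage: checkpointLooksFresh("/tmp/my-checkpoint") returns true only when a query starting
// there would pass the new batch-0 verification.
```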

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala

Lines changed: 12 additions & 0 deletions
@@ -187,6 +187,11 @@ object StateStoreErrors {
       numSchemaFiles, schemaFilesThreshold, addedColFamilies, removedColFamilies)
   }
 
+  def streamingStateCheckpointLocationNotEmpty(checkpointLocation: String)
+    : StateStoreCheckpointLocationNotEmpty = {
+    new StateStoreCheckpointLocationNotEmpty(checkpointLocation)
+  }
+
   def stateStoreColumnFamilyMismatch(
       columnFamilyName: String,
       oldColumnFamilySchema: String,
@@ -474,6 +479,13 @@
     "addedColumnFamilies" -> addedColFamilies.mkString("(", ",", ")"),
     "removedColumnFamilies" -> removedColFamilies.mkString("(", ",", ")")))
 
+class StateStoreCheckpointLocationNotEmpty(
+    checkpointLocation: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY",
+    messageParameters = Map(
+      "checkpointLocation" -> checkpointLocation))
+
 class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
   extends SparkRuntimeException(
     errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -320,6 +320,8 @@ class StreamingQueryManagerSuite extends StreamTest {
     val query1 = ds1.writeStream.format("parquet")
       .option("checkpointLocation", chkLocation).start(dataLocation)
     ms1.addData(1, 2, 3)
+    query1.processAllAvailable() // ensure offset log has been written
+
     val query2 = ds2.writeStream.format("parquet")
       .option("checkpointLocation", chkLocation).start(dataLocation)
     try {
@@ -382,6 +384,8 @@
     val query1 = ms1.toDS().writeStream.format("parquet")
       .option("checkpointLocation", chkLocation).start(dataLocation)
     ms1.addData(1, 2, 3)
+    query1.processAllAvailable() // ensure offset log has been written
+
     val query2 = ds2.writeStream.format("parquet")
       .option("checkpointLocation", chkLocation).start(dataLocation)
     try {

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 82 additions & 1 deletion
@@ -45,9 +45,10 @@ import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit}
 import org.apache.spark.sql.execution.exchange.{REQUIRED_BY_STATEFUL_OPERATOR, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata
+import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, MetricsReporter, StreamExecution, StreamingExecutionRelation, StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.sources.{MemorySink, TestForeachWriter}
+import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStoreCheckpointLocationNotEmpty}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
@@ -1475,6 +1476,86 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  private val TEST_PROVIDERS = Seq(
+    classOf[HDFSBackedStateStoreProvider].getName,
+    classOf[RocksDBStateStoreProvider].getName
+  )
+
+  TEST_PROVIDERS.foreach { provider =>
+    test("SPARK-53103: non-empty state and commits checkpoint directory on first batch" +
+      s" (with $provider)") {
+      withSQLConf(
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> provider) {
+
+        withTempDir { checkpointDir =>
+          val q = MemoryStream[Int].toDS().groupBy().count()
+            .writeStream
+            .format("memory")
+            .outputMode("complete")
+            .queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .start()
+          // Verify that the query can start successfully when the checkpoint directory is empty.
+          q.stop()
+        }
+
+        withTempDir { checkpointDir =>
+          val hadoopConf = spark.sessionState.newHadoopConf()
+          val fm = CheckpointFileManager.create(new Path(checkpointDir.toString), hadoopConf)
+
+          // Create a non-empty state checkpoint directory to simulate the case where the user
+          // provides a directory that already has state data.
+          fm.mkdirs(new Path(new Path(checkpointDir.getCanonicalPath, "state"), "0"))
+
+          checkError(
+            exception = intercept[StreamingQueryException] {
+              MemoryStream[Int].toDS().groupBy().count()
+                .writeStream
+                .format("memory")
+                .outputMode("complete")
+                .queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}")
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .start()
+                .processAllAvailable()
+            }.getCause.asInstanceOf[StateStoreCheckpointLocationNotEmpty],
+            condition = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY",
+            sqlState = "42K03",
+            parameters = Map(
+              "checkpointLocation" ->
+                ("file:" + (new Path(checkpointDir.getCanonicalPath, "state")).toString)
+            ))
+        }
+
+        withTempDir { checkpointDir =>
+          val hadoopConf = spark.sessionState.newHadoopConf()
+          val fm = CheckpointFileManager.create(new Path(checkpointDir.toString), hadoopConf)
+
+          // Create a non-empty commits checkpoint directory to simulate the case where the user
+          // provides a directory that already has commits data.
+          fm.mkdirs(new Path(new Path(checkpointDir.getCanonicalPath, "commits"), "0"))
+
+          checkError(
+            exception = intercept[StreamingQueryException] {
+              MemoryStream[Int].toDS().groupBy().count()
+                .writeStream
+                .format("memory")
+                .outputMode("complete")
+                .queryName(s"name${RandomStringUtils.secure.nextAlphabetic(10)}")
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .start()
+                .processAllAvailable()
+            }.getCause.asInstanceOf[StateStoreCheckpointLocationNotEmpty],
+            condition = "STATE_STORE_CHECKPOINT_LOCATION_NOT_EMPTY",
+            sqlState = "42K03",
+            parameters = Map(
+              "checkpointLocation" ->
+                ("file:" + (new Path(checkpointDir.getCanonicalPath, "commits")).toString)
+            ))
+        }
+      }
+    }
+  }
+
   private def checkAppendOutputModeException(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
