diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c331f1724854..ecc4e77862af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2997,6 +2997,19 @@ object SQLConf {
       .checkValue(v => Set(1).contains(v), "Valid version is 1")
       .createWithDefault(1)
 
+  val STREAMING_OFFSET_LOG_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.offsetLog.formatVersion")
+      .doc("Offset log format version used in streaming query checkpoints. " +
+        "Version 1 uses the sequence-based OffsetSeq format; version 2 uses the map-based " +
+        "OffsetMap format, which provides more flexibility for source management. " +
+        "Offset log format versions are incompatible, so this configuration only affects new " +
+        "streaming queries. Existing queries continue to use the format version recorded in " +
+        "their checkpoint.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+      .createWithDefault(1)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     buildConf("spark.sql.streaming.unsupportedOperationCheck")
      .internal()
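
A new query opts into the v2 format by setting this config before `start()`; as the doc string notes, restarts from an existing checkpoint ignore it. A minimal sketch of that usage, assuming a SparkSession named `spark` — the rate source, console sink, and checkpoint path are illustrative, not part of this patch:

    // Only affects queries that create a fresh checkpoint; a restarted query
    // keeps whatever version its offset log already records.
    spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/offsetlog-v2-demo") // illustrative path
      .start()
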
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index a882d9539c4b..b32adc17d7d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -244,7 +244,8 @@ object OffsetSeqMetadata extends Logging {
       case (confInSession, confInOffsetLog) =>
         confInOffsetLog.key -> sessionConf.get(confInSession.key)
     }.toMap
-    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs++ confsFromRebind)
+    val version = sessionConf.get(STREAMING_OFFSET_LOG_FORMAT_VERSION.key).toInt
+    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs ++ confsFromRebind, version)
   }
 
   /** Set the SparkSession configuration with the values in the metadata */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
index 891a66b21b52..5f066ffd9f14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala
@@ -138,11 +138,11 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 }
 
 object OffsetSeqLog {
-  private[streaming] val VERSION_1 = 1
-  private[streaming] val VERSION_2 = 2
+  val VERSION_1 = 1
+  val VERSION_2 = 2
   private[streaming] val VERSION = VERSION_1 // Default version for backward compatibility
-  private[streaming] val MAX_VERSION = VERSION_2
-  private[streaming] val SERIALIZED_VOID_OFFSET = "-"
+  val MAX_VERSION = VERSION_2
+  private[spark] val SERIALIZED_VOID_OFFSET = "-"
 
   private[checkpointing] def parseOffset(value: String): OffsetV2 = value match {
     case SERIALIZED_VOID_OFFSET => null
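
With the version now carried in `OffsetSeqMetadata`, readers can branch on the concrete entry type when reading a checkpoint back. A hedged sketch of such an inspection, assuming v1's `OffsetSeq` still exposes its positional `offsets` field and using `OffsetMap.offsetsMap` as the test suites below do; the checkpoint path is illustrative:

    import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqLog}

    // Distinguish the two offset log formats: OffsetSeq (v1) stores offsets
    // positionally, while OffsetMap (v2) keys them by source ordinal strings
    // ("0", "1", ...).
    val checkpointPath = "/tmp/my-query-checkpoint" // illustrative
    val offsetLog = new OffsetSeqLog(spark, s"$checkpointPath/offsets")
    offsetLog.getLatest() match {
      case Some((batchId, seq: OffsetSeq)) =>
        println(s"batch $batchId is v1 with ${seq.offsets.length} positional offsets")
      case Some((batchId, map: OffsetMap)) =>
        println(s"batch $batchId is v2 with source keys ${map.offsetsMap.keySet}")
      case None =>
        println("offset log is empty")
    }
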
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index 5ea97e6a2c32..cb46bb2e5bf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
-import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqMetadata}
+import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
 import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
 import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
@@ -77,6 +77,7 @@ class MicroBatchExecution(
     progressReporter,
     -1,
     sparkSession,
+    offsetLogFormatVersionOpt = None,
     previousContext = None)
 
   override def getLatestExecutionContext(): StreamExecutionContext = latestExecutionContext
@@ -417,11 +418,26 @@
       sourceIdMap
     )
 
+    // Read the offset log format version from the last written offset log entry. If no entries
+    // are found, use the value set in (or defaulted by) the config.
+    val offsetLogFormatVersion = if (latestStartedBatch.isDefined) {
+      latestStartedBatch.get._2.metadataOpt.map(_.version).getOrElse(OffsetSeqLog.VERSION_1)
+    } else {
+      // If no offset log entries are found, assert that the query does not have any committed
+      // batches, to be extra safe.
+      assert(lastCommittedBatchId == -1L)
+      sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)
+    }
+
+    // Set additional configurations depending on the offset log format version
+    setSparkSessionConfigsForOffsetLog(sparkSessionForStream, offsetLogFormatVersion)
+
     val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink,
-      progressReporter, -1, sparkSession, None)
+      progressReporter, -1, sparkSession, Some(offsetLogFormatVersion), None)
 
     execCtx.offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
       sparkSessionForStream.conf)
+
     setLatestExecutionContext(execCtx)
 
     populateStartOffsets(execCtx, sparkSessionForStream)
@@ -719,6 +735,17 @@
     }
   }
 
+  /**
+   * Set the SparkSession configurations for the offset log format version.
+   */
+  private def setSparkSessionConfigsForOffsetLog(
+      sparkSessionForStream: SparkSession,
+      offsetLogFormatVersion: Int): Unit = {
+    // Set the offset log format version in the sparkSessionForStream conf
+    sparkSessionForStream.conf.set(
+      SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
+  }
+
   /**
    * Returns true if there is any new data available to be processed.
    */
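
The block above is the heart of the change: once any offset log entry exists, the checkpoint rather than the session conf owns the format version. A standalone sketch of that decision rule — the helper and its `Option[Option[Int]]` encoding are illustrative, not part of the patch:

    // latestEntry: None means no offset log entries yet; Some(v) carries the
    // version field of the latest entry's metadata, which may itself be absent
    // for checkpoints written before the field existed.
    def chooseVersion(latestEntry: Option[Option[Int]], confVersion: Int): Int =
      latestEntry match {
        case Some(recorded) => recorded.getOrElse(1) // OffsetSeqLog.VERSION_1
        case None => confVersion
      }

    assert(chooseVersion(None, 2) == 2)          // new query: conf wins
    assert(chooseVersion(Some(Some(1)), 2) == 1) // restart: checkpoint wins
    assert(chooseVersion(Some(None), 2) == 1)    // pre-version checkpoints are v1
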
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala
index b177bbdede74..a773abb7e501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala
@@ -127,6 +127,7 @@ class MicroBatchExecutionContext(
     progressReporter: ProgressReporter,
     var _batchId: Long,
     sparkSession: SparkSession,
+    val offsetLogFormatVersionOpt: Option[Int],
     var previousContext: Option[MicroBatchExecutionContext])
   extends StreamExecutionContext(
     id,
@@ -190,6 +191,7 @@
       progressReporter,
       batchId + 1,
       sparkSession,
+      offsetLogFormatVersionOpt,
       Some(this))
   }
 
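Threading the value through the context constructor means every derived batch context inherits the version resolved once at query start. A toy model of that hand-off, with an illustrative class standing in for MicroBatchExecutionContext:

    // Each next-batch context copies the version forward unchanged, so the
    // format can never drift between batches within a single run.
    class Ctx(val batchId: Long, val offsetLogFormatVersionOpt: Option[Int]) {
      def nextBatch(): Ctx = new Ctx(batchId + 1, offsetLogFormatVersionOpt)
    }

    val first = new Ctx(0, Some(2))
    assert(first.nextBatch().offsetLogFormatVersionOpt.contains(2))
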
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 2c3ae11a4e7f..dc1d368e2959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,12 +21,13 @@ import java.io.File
 
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
-import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, SerializedOffset}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils
 
 class OffsetSeqLogSuite extends SharedSparkSession {
+  import testImplicits._
 
   /** test string offset type */
   case class StringOffset(override val json: String) extends Offset
@@ -237,4 +238,167 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     assert(metadata.batchWatermarkMs === 0)
     assert(metadata.batchTimestampMs === 1758651405232L)
   }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with VERSION_2") {
+    withTempDir { checkpointDir =>
+      withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+        // Create two MemoryStream sources
+        val inputData1 = MemoryStream[Int]
+        val inputData2 = MemoryStream[String]
+
+        // Create a query that unions both sources
+        val df1 = inputData1.toDF().select($"value".cast("string"))
+        val df2 = inputData2.toDF()
+        val query = df1.union(df2)
+          .writeStream
+          .format("memory")
+          .queryName("offsetlog_v2_test")
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .start()
+
+        try {
+          // Add data and process batches
+          inputData1.addData(1, 2, 3)
+          inputData2.addData("a", "b", "c")
+          query.processAllAvailable()
+          inputData1.addData(4, 5)
+          inputData2.addData("d", "e")
+          query.processAllAvailable()
+
+          // Read the offset log from the checkpoint
+          val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
+          val latestBatch = offsetLog.getLatest()
+          assert(latestBatch.isDefined, "Offset log should have at least one entry")
+
+          val (batchId, offsetSeq) = latestBatch.get
+
+          // Verify it's an OffsetMap (VERSION_2)
+          assert(offsetSeq.isInstanceOf[OffsetMap],
+            s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
+
+          // Verify the metadata version is 2
+          assert(offsetSeq.metadataOpt.isDefined)
+          val metadata = offsetSeq.metadataOpt.get
+          assert(metadata.version === 2, s"Expected version 2 but got ${metadata.version}")
+
+          // Verify OffsetMap uses string keys ("0", "1", etc.)
+          val offsetMap = offsetSeq.asInstanceOf[OffsetMap]
+          assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)),
+            "OffsetMap keys should be string representations of ordinals (e.g., '0', '1')")
+          // Verify the specific keys for both MemoryStream sources are "0" and "1"
+          assert(offsetMap.offsetsMap.keySet === Set("0", "1"),
+            s"Expected two sources with keys '0' and '1', but got keys: " +
+              s"${offsetMap.offsetsMap.keySet}")
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - default VERSION_1") {
+    withTempDir { checkpointDir =>
+      // Don't set the config; it should default to VERSION_1
+      val inputData = MemoryStream[Int]
+      val query = inputData.toDF()
+        .writeStream
+        .format("memory")
+        .queryName("offsetlog_v1_test")
+        .option("checkpointLocation", checkpointDir.getAbsolutePath)
+        .start()
+
+      try {
+        // Add data and process batches
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+
+        // Read the offset log from the checkpoint
+        val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
+        val latestBatch = offsetLog.getLatest()
+        assert(latestBatch.isDefined, "Offset log should have at least one entry")
+
+        val (batchId, offsetSeq) = latestBatch.get
+
+        // Verify it's an OffsetSeq (VERSION_1)
+        assert(offsetSeq.isInstanceOf[OffsetSeq],
+          s"Expected OffsetSeq but got ${offsetSeq.getClass.getSimpleName}")
+
+        // Verify the metadata version is 1
+        assert(offsetSeq.metadataOpt.isDefined)
+        val metadata = offsetSeq.metadataOpt.get
+        assert(metadata.version === 1, s"Expected version 1 but got ${metadata.version}")
+      } finally {
+        query.stop()
+      }
+    }
+  }
+
+  Seq(
+    (1, 2, classOf[OffsetSeq]),
+    (2, 1, classOf[OffsetMap])
+  ).foreach { case (startingVersion, restartVersion, expectedClass) =>
+    test(s"STREAMING_OFFSET_LOG_FORMAT_VERSION config - checkpoint wins on restart " +
+      s"(v$startingVersion to v$restartVersion)") {
+      withTempDir { checkpointDir =>
+        withTempDir { outputDir =>
+          val inputData = MemoryStream[Int]
+
+          // Start the query with the initial version
+          withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> startingVersion.toString) {
+            val query1 = inputData.toDF()
+              .writeStream
+              .format("parquet")
+              .option("path", outputDir.getAbsolutePath)
+              .option("checkpointLocation", checkpointDir.getAbsolutePath)
+              .start()
+
+            inputData.addData(1, 2)
+            query1.processAllAvailable()
+            query1.stop()
+          }
+
+          // Verify the initial version was used in the checkpoint
+          val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
+          val batch1 = offsetLog.getLatest()
+          assert(batch1.isDefined)
+          assert(batch1.get._2.getClass === expectedClass,
+            s"Initial checkpoint should use VERSION_$startingVersion")
+          assert(batch1.get._2.metadataOpt.get.version === startingVersion)
+
+          // Restart with a different version config - should still use the initial version
+          withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> restartVersion.toString) {
+            val query2 = inputData.toDF()
+              .writeStream
+              .format("parquet")
+              .option("path", outputDir.getAbsolutePath)
+              .option("checkpointLocation", checkpointDir.getAbsolutePath)
+              .start()
+
+            try {
+              inputData.addData(3, 4)
+              query2.processAllAvailable()
+
+              // Read the latest offset log entry
+              val latestBatch = offsetLog.getLatest()
+              assert(latestBatch.isDefined)
+
+              val (batchId, offsetSeq) = latestBatch.get
+
+              // Should still use the initial version because the checkpoint was created with it
+              assert(offsetSeq.getClass === expectedClass,
+                s"Query should continue using the VERSION_$startingVersion format from the " +
+                  s"checkpoint despite the config change")
+
+              val metadata = offsetSeq.metadataOpt.get
+              assert(metadata.version === startingVersion,
+                s"Query should continue using version $startingVersion from the checkpoint " +
+                  s"despite the config being set to $restartVersion")
+            } finally {
+              query2.stop()
+            }
+          }
+        }
+      }
+    }
+  }
 }
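
The suite above reads the log back through `OffsetSeqLog`, as does the reader/writer suite that follows, and the same pattern works for ad-hoc auditing of any checkpoint. A sketch with an illustrative path; the `getOrElse(1)` mirrors how the tests treat entries written before the version field existed:

    import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog

    // List every batch in a checkpoint's offset log with its format version.
    val log = new OffsetSeqLog(spark, "/tmp/my-checkpoint/offsets") // illustrative path
    log.get(None, None).foreach { case (batchId, entry) =>
      val version = entry.metadataOpt.map(_.version).getOrElse(1)
      println(s"batch=$batchId version=$version kind=${entry.getClass.getSimpleName}")
    }
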
v2 to v1") { + withTempDir { inputDir => + withTempDir { outputDir => + withTempDir { checkpointDir => + // Write initial data to input directory + spark.range(10).toDF("value").write.mode("overwrite").parquet(inputDir.getCanonicalPath) + + // First query with offset log format version 2 + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + val query = spark.readStream + .schema("value long") + .parquet(inputDir.getCanonicalPath) + .writeStream + .format("parquet") + .outputMode("append") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + query.processAllAvailable() + query.stop() + } + + // Write more data for second batch + spark.range(10, 20).toDF("value").write.mode("append").parquet(inputDir.getCanonicalPath) + + // Second query with offset log format version 1 (should be ignored) + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1") { + val query = spark.readStream + .schema("value long") + .parquet(inputDir.getCanonicalPath) + .writeStream + .format("parquet") + .outputMode("append") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + query.processAllAvailable() + query.stop() + } + + // Check OffsetSeqLog to verify both batches use version 2 + val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets") + val offsetBatches = offsetSeqLog.get(Some(0), Some(1)) + assert(offsetBatches.length == 2) + assert(offsetBatches(0)._1 == 0) + assert(offsetBatches(0)._2.metadataOpt.map(_.version).getOrElse(1) == 2) + assert(offsetBatches(1)._1 == 1) + assert(offsetBatches(1)._2.metadataOpt.map(_.version).getOrElse(1) == 2) + } + } + } + } }