From ae8a851c3a2860637777646fb673cfd68282cfd8 Mon Sep 17 00:00:00 2001
From: ericm-db
Date: Wed, 3 Dec 2025 11:03:28 -0800
Subject: [PATCH 1/6] [WIP] Integrate OffsetMap with MicroBatchExecution

---
 .../apache/spark/sql/internal/SQLConf.scala   |  13 ++
 .../streaming/checkpointing/OffsetSeq.scala   |   5 +-
 .../streaming/OffsetSeqLogSuite.scala         | 149 +++++++++++++++++-
 3 files changed, 164 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c331f1724854..ecc4e77862af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2997,6 +2997,19 @@ object SQLConf {
       .checkValue(v => Set(1).contains(v), "Valid version is 1")
       .createWithDefault(1)
 
+  val STREAMING_OFFSET_LOG_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.offsetLog.formatVersion")
+      .doc("Offset log format version used in streaming query checkpoints. " +
+        "Version 1 uses the sequence-based OffsetSeq format; version 2 uses the map-based " +
+        "OffsetMap format, which provides better flexibility for source management. " +
+        "Offset log format versions are incompatible, so this configuration only affects " +
+        "new streaming queries. Existing queries will continue using the format version " +
+        "from their checkpoint.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+      .createWithDefault(1)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     buildConf("spark.sql.streaming.unsupportedOperationCheck")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index a882d9539c4b..b0963f39cd59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -169,7 +169,7 @@ object OffsetSeqMetadata extends Logging {
     STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
     STATE_STORE_ROCKSDB_FORMAT_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
     PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT,
-    STATE_STORE_ROW_CHECKSUM_ENABLED
+    STATE_STORE_ROW_CHECKSUM_ENABLED, STREAMING_OFFSET_LOG_FORMAT_VERSION
   )
 
   /**
@@ -244,7 +244,8 @@
       case (confInSession, confInOffsetLog) =>
         confInOffsetLog.key -> sessionConf.get(confInSession.key)
     }.toMap
-    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs ++ confsFromRebind)
+    val version = sessionConf.get(STREAMING_OFFSET_LOG_FORMAT_VERSION.key).toInt
+    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs ++ confsFromRebind, version)
   }
 
   /** Set the SparkSession configuration with the values in the metadata */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 2c3ae11a4e7f..46493addba78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
-import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, SerializedOffset}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils
@@ -237,4 +237,151 @@
     assert(metadata.batchWatermarkMs === 0)
     assert(metadata.batchTimestampMs === 1758651405232L)
   }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with VERSION_2") {
+    import testImplicits._
+    withTempDir { checkpointDir =>
+      withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+        val inputData = MemoryStream[Int]
+        val query = inputData.toDF()
+          .writeStream
+          .format("memory")
+          .queryName("offsetlog_v2_test")
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .start()
+
+        try {
+          // Add data and process batches
+          inputData.addData(1, 2, 3)
+          query.processAllAvailable()
+          inputData.addData(4, 5)
+          query.processAllAvailable()
+
+          // Read the offset log from checkpoint
+          val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
+          val latestBatch = offsetLog.getLatest()
+          assert(latestBatch.isDefined, "Offset log should have at least one entry")
+
+          val (batchId, offsetSeq) = latestBatch.get
+
+          // Verify it's OffsetMap (VERSION_2)
+          assert(offsetSeq.isInstanceOf[OffsetMap],
+            s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
+
+          // Verify metadata version is 2
+          assert(offsetSeq.metadataOpt.isDefined)
+          val metadata = offsetSeq.metadataOpt.get
+          assert(metadata.version === 2, s"Expected version 2 but got ${metadata.version}")
+
+          // Verify the config is persisted
+          assert(metadata.conf.contains(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key))
+          assert(metadata.conf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key) === "2")
+
+          // Verify OffsetMap uses string keys ("0", "1", etc.)
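+          // (With the single MemoryStream source above, the only expected key is "0".)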
+ val offsetMap = offsetSeq.asInstanceOf[OffsetMap] + assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)), + "OffsetMap keys should be string representations of ordinals (e.g., '0', '1')") + } finally { + query.stop() + } + } + } + } + + test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - default VERSION_1") { + import testImplicits._ + withTempDir { checkpointDir => + // Don't set the config, should default to VERSION_1 + val inputData = MemoryStream[Int] + val query = inputData.toDF() + .writeStream + .format("memory") + .queryName("offsetlog_v1_test") + .option("checkpointLocation", checkpointDir.getAbsolutePath) + .start() + + try { + // Add data and process batches + inputData.addData(1, 2, 3) + query.processAllAvailable() + + // Read the offset log from checkpoint + val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets") + val latestBatch = offsetLog.getLatest() + assert(latestBatch.isDefined, "Offset log should have at least one entry") + + val (batchId, offsetSeq) = latestBatch.get + + // Verify it's OffsetSeq (VERSION_1) + assert(offsetSeq.isInstanceOf[OffsetSeq], + s"Expected OffsetSeq but got ${offsetSeq.getClass.getSimpleName}") + + // Verify metadata version is 1 + assert(offsetSeq.metadataOpt.isDefined) + val metadata = offsetSeq.metadataOpt.get + assert(metadata.version === 1, s"Expected version 1 but got ${metadata.version}") + } finally { + query.stop() + } + } + } + + test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - checkpoint wins on restart") { + import testImplicits._ + withTempDir { checkpointDir => + withTempDir { outputDir => + val inputData = MemoryStream[Int] + + // Start query with VERSION_1 (default) + val query1 = inputData.toDF() + .writeStream + .format("parquet") + .option("path", outputDir.getAbsolutePath) + .option("checkpointLocation", checkpointDir.getAbsolutePath) + .start() + + inputData.addData(1, 2) + query1.processAllAvailable() + query1.stop() + + // Verify VERSION_1 was used in the initial checkpoint + val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets") + val batch1 = offsetLog.getLatest() + assert(batch1.isDefined) + assert(batch1.get._2.isInstanceOf[OffsetSeq], "Initial checkpoint should use VERSION_1") + assert(batch1.get._2.metadataOpt.get.version === 1) + + // Restart query with VERSION_2 config - should still use VERSION_1 from checkpoint + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + val query2 = inputData.toDF() + .writeStream + .format("parquet") + .option("path", outputDir.getAbsolutePath) + .option("checkpointLocation", checkpointDir.getAbsolutePath) + .start() + + try { + inputData.addData(3, 4) + query2.processAllAvailable() + + // Read the latest offset log entry + val latestBatch = offsetLog.getLatest() + assert(latestBatch.isDefined) + + val (batchId, offsetSeq) = latestBatch.get + + // Should still be VERSION_1 because checkpoint was created with VERSION_1 + assert(offsetSeq.isInstanceOf[OffsetSeq], + "Query should continue using VERSION_1 format from checkpoint despite config change") + + val metadata = offsetSeq.metadataOpt.get + assert(metadata.version === 1, + "Query should continue using version 1 from checkpoint despite config being set to 2") + } finally { + query2.stop() + } + } + } + } + } } From c8e10f94392cc52d284611f5f30beafb1cc3ff4a Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 3 Dec 2025 11:42:07 -0800 Subject: [PATCH 2/6] adding conf changes --- .../checkpointing/OffsetSeqLog.scala | 8 ++--- 
.../runtime/MicroBatchExecution.scala | 36 +++++++++++++++++-- .../runtime/StreamExecutionContext.scala | 2 ++ 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala index 891a66b21b52..5f066ffd9f14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeqLog.scala @@ -138,11 +138,11 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String) } object OffsetSeqLog { - private[streaming] val VERSION_1 = 1 - private[streaming] val VERSION_2 = 2 + val VERSION_1 = 1 + val VERSION_2 = 2 private[streaming] val VERSION = VERSION_1 // Default version for backward compatibility - private[streaming] val MAX_VERSION = VERSION_2 - private[streaming] val SERIALIZED_VOID_OFFSET = "-" + val MAX_VERSION = VERSION_2 + private[spark] val SERIALIZED_VOID_OFFSET = "-" private[checkpointing] def parseOffset(value: String): OffsetV2 = value match { case SERIALIZED_VOID_OFFSET => null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 5ea97e6a2c32..d3f2b9ec5f4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqMetadata} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} @@ -77,6 +77,7 @@ class MicroBatchExecution( progressReporter, -1, sparkSession, + offsetLogFormatVersionOpt = None, previousContext = None) override def getLatestExecutionContext(): StreamExecutionContext = latestExecutionContext @@ -417,11 +418,29 @@ class MicroBatchExecution( sourceIdMap ) + // Read the offset log format version from the last written offset log entry. If no entries + // are found, use the set/default value from the config. 
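+    // Note that the checkpointed version always takes precedence: changing this conf when
+    // restarting an existing query is intentionally a no-op, since the two formats cannot
+    // be mixed within a single offset log.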
+ val offsetLogFormatVersion = if (latestStartedBatch.isDefined) { + latestStartedBatch.get._2.version + } else { + // If no offset log entries are found, assert that the query does not have any committed + // batches to be extra safe. + assert(lastCommittedBatchId == -1L) + sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION) + } + + // Set additional configurations depending on the offset log format version + setSparkSessionConfigsForOffsetLog(sparkSessionForStream, offsetLogFormatVersion) + val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink, - progressReporter, -1, sparkSession, None) + progressReporter, -1, sparkSession, Some(offsetLogFormatVersion), None) - execCtx.offsetSeqMetadata = + execCtx.offsetSeqMetadata = if (offsetLogFormatVersion == OffsetSeqLog.VERSION_2) { + OffsetSeqMetadataV2(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) + } else { OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) + } + setLatestExecutionContext(execCtx) populateStartOffsets(execCtx, sparkSessionForStream) @@ -719,6 +738,17 @@ class MicroBatchExecution( } } + /** + * Set the SparkSession configurations for the offset log format version. + */ + private def setSparkSessionConfigsForOffsetLog( + sparkSessionForStream: SparkSession, + offsetLogFormatVersion: Int): Unit = { + // Set the offset log format version in the sparkSessionForStream conf + sparkSessionForStream.conf.set( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion) + } + /** * Returns true if there is any new data available to be processed. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala index b177bbdede74..a773abb7e501 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecutionContext.scala @@ -127,6 +127,7 @@ class MicroBatchExecutionContext( progressReporter: ProgressReporter, var _batchId: Long, sparkSession: SparkSession, + val offsetLogFormatVersionOpt: Option[Int], var previousContext: Option[MicroBatchExecutionContext]) extends StreamExecutionContext( id, @@ -190,6 +191,7 @@ class MicroBatchExecutionContext( progressReporter, batchId + 1, sparkSession, + offsetLogFormatVersionOpt, Some(this)) } From 17830b4fa90e826fb92ba3c750eb835c52034cad Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 3 Dec 2025 14:33:24 -0800 Subject: [PATCH 3/6] suite --- .../test/DataStreamReaderWriterSuite.scala | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 2a186a9296f4..a7e6aea53196 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.classic.ClassicConversions.castToImpl import org.apache.spark.sql.classic.Dataset.ofRows import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source} +import 
org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog import org.apache.spark.sql.execution.streaming.runtime._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} @@ -873,4 +874,72 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { query.stop() } } + + private def runQuery( + inputDir: File, + outputDir: File, + checkpointDir: File, + formatVersion: String): Unit = { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> formatVersion) { + def writeToInputTable(): Unit = { + spark.range(1) + .write.format("delta") + .mode("append") + .save(inputDir.getCanonicalPath) + } + + writeToInputTable() + + val query = spark.readStream + .format("delta") + .load(inputDir.getCanonicalPath) + .writeStream + .format("delta") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + + query.processAllAvailable() + query.stop() + query.awaitTermination() + } + } + + test("Verify that switching offset log format versions has no effect - v1 to v2") { + withTempPaths(3) { dirs => + val (inputDir, outputDir, checkpointDir) = (dirs(0), dirs(1), dirs(2)) + // Run the query twice to ensure that the offset log format version is persisted + // and reused across multiple runs. + runQuery(inputDir, outputDir, checkpointDir, "1") + runQuery(inputDir, outputDir, checkpointDir, "2") + + // Check OffsetSeqLog to verify existence of 2 batches + val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets") + val offsetBatches = offsetSeqLog.get(Some(0), Some(1)) + assert(offsetBatches.length == 2) + assert(offsetBatches(0)._1 == 0) + assert(offsetBatches(0)._2.version == 1) + assert(offsetBatches(1)._1 == 1) + assert(offsetBatches(1)._2.version == 1) + } + } + + test("Verify that switching offset log format versions has no effect - v2 to v1") { + withTempPaths(3) { dirs => + val (inputDir, outputDir, checkpointDir) = (dirs(0), dirs(1), dirs(2)) + // Run the query twice to ensure that the offset log format version is persisted + // and reused across multiple runs. 
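+      // The second run below sets the conf to "1"; it should be ignored in favor of the
+      // checkpointed v2 format.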
+ runQuery(inputDir, outputDir, checkpointDir, "2") + runQuery(inputDir, outputDir, checkpointDir, "1") + + // Check OffsetSeqLog to verify existence of 2 batches + val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets") + val offsetBatches = offsetSeqLog.get(Some(0), Some(1)) + assert(offsetBatches.length == 2) + assert(offsetBatches(0)._1 == 0) + assert(offsetBatches(0)._2.version == 2) + assert(offsetBatches(1)._1 == 1) + assert(offsetBatches(1)._2.version == 2) + } + } } From 4f80482006b66f691f8918bc110a74c432d26570 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 3 Dec 2025 15:02:42 -0800 Subject: [PATCH 4/6] tests pass --- .../runtime/MicroBatchExecution.scala | 9 +- .../test/DataStreamReaderWriterSuite.scala | 150 +++++++++++------- 2 files changed, 95 insertions(+), 64 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index d3f2b9ec5f4f..cb46bb2e5bf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} @@ -421,7 +421,7 @@ class MicroBatchExecution( // Read the offset log format version from the last written offset log entry. If no entries // are found, use the set/default value from the config. val offsetLogFormatVersion = if (latestStartedBatch.isDefined) { - latestStartedBatch.get._2.version + latestStartedBatch.get._2.metadataOpt.map(_.version).getOrElse(OffsetSeqLog.VERSION_1) } else { // If no offset log entries are found, assert that the query does not have any committed // batches to be extra safe. 
@@ -435,11 +435,8 @@ class MicroBatchExecution( val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink, progressReporter, -1, sparkSession, Some(offsetLogFormatVersion), None) - execCtx.offsetSeqMetadata = if (offsetLogFormatVersion == OffsetSeqLog.VERSION_2) { - OffsetSeqMetadataV2(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) - } else { + execCtx.offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) - } setLatestExecutionContext(execCtx) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index a7e6aea53196..52474cf428ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -875,71 +875,105 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { } } - private def runQuery( - inputDir: File, - outputDir: File, - checkpointDir: File, - formatVersion: String): Unit = { - withSQLConf( - SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> formatVersion) { - def writeToInputTable(): Unit = { - spark.range(1) - .write.format("delta") - .mode("append") - .save(inputDir.getCanonicalPath) - } - - writeToInputTable() + test("Verify that switching offset log format versions has no effect - v1 to v2") { + withTempDir { inputDir => + withTempDir { outputDir => + withTempDir { checkpointDir => + // Write initial data to input directory + spark.range(10).toDF("value").write.mode("overwrite").parquet(inputDir.getCanonicalPath) + + // First query with offset log format version 1 + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1") { + val query = spark.readStream + .schema("value long") + .parquet(inputDir.getCanonicalPath) + .writeStream + .format("parquet") + .outputMode("append") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + query.processAllAvailable() + query.stop() + } - val query = spark.readStream - .format("delta") - .load(inputDir.getCanonicalPath) - .writeStream - .format("delta") - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .start(outputDir.getCanonicalPath) + // Write more data for second batch + spark.range(10, 20).toDF("value").write.mode("append").parquet(inputDir.getCanonicalPath) + + // Second query with offset log format version 2 (should be ignored) + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + val query = spark.readStream + .schema("value long") + .parquet(inputDir.getCanonicalPath) + .writeStream + .format("parquet") + .outputMode("append") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + query.processAllAvailable() + query.stop() + } - query.processAllAvailable() - query.stop() - query.awaitTermination() + // Check OffsetSeqLog to verify both batches use version 1 + val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets") + val offsetBatches = offsetSeqLog.get(Some(0), Some(1)) + assert(offsetBatches.length == 2) + assert(offsetBatches(0)._1 == 0) + assert(offsetBatches(0)._2.metadataOpt.map(_.version).getOrElse(1) == 1) + assert(offsetBatches(1)._1 == 1) + assert(offsetBatches(1)._2.metadataOpt.map(_.version).getOrElse(1) == 1) + } + } } } - 
test("Verify that switching offset log format versions has no effect - v1 to v2") { - withTempPaths(3) { dirs => - val (inputDir, outputDir, checkpointDir) = (dirs(0), dirs(1), dirs(2)) - // Run the query twice to ensure that the offset log format version is persisted - // and reused across multiple runs. - runQuery(inputDir, outputDir, checkpointDir, "1") - runQuery(inputDir, outputDir, checkpointDir, "2") + test("Verify that switching offset log format versions has no effect - v2 to v1") { + withTempDir { inputDir => + withTempDir { outputDir => + withTempDir { checkpointDir => + // Write initial data to input directory + spark.range(10).toDF("value").write.mode("overwrite").parquet(inputDir.getCanonicalPath) + + // First query with offset log format version 2 + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + val query = spark.readStream + .schema("value long") + .parquet(inputDir.getCanonicalPath) + .writeStream + .format("parquet") + .outputMode("append") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + query.processAllAvailable() + query.stop() + } - // Check OffsetSeqLog to verify existence of 2 batches - val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets") - val offsetBatches = offsetSeqLog.get(Some(0), Some(1)) - assert(offsetBatches.length == 2) - assert(offsetBatches(0)._1 == 0) - assert(offsetBatches(0)._2.version == 1) - assert(offsetBatches(1)._1 == 1) - assert(offsetBatches(1)._2.version == 1) - } - } + // Write more data for second batch + spark.range(10, 20).toDF("value").write.mode("append").parquet(inputDir.getCanonicalPath) + + // Second query with offset log format version 1 (should be ignored) + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1") { + val query = spark.readStream + .schema("value long") + .parquet(inputDir.getCanonicalPath) + .writeStream + .format("parquet") + .outputMode("append") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + query.processAllAvailable() + query.stop() + } - test("Verify that switching offset log format versions has no effect - v2 to v1") { - withTempPaths(3) { dirs => - val (inputDir, outputDir, checkpointDir) = (dirs(0), dirs(1), dirs(2)) - // Run the query twice to ensure that the offset log format version is persisted - // and reused across multiple runs. 
- runQuery(inputDir, outputDir, checkpointDir, "2") - runQuery(inputDir, outputDir, checkpointDir, "1") - - // Check OffsetSeqLog to verify existence of 2 batches - val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets") - val offsetBatches = offsetSeqLog.get(Some(0), Some(1)) - assert(offsetBatches.length == 2) - assert(offsetBatches(0)._1 == 0) - assert(offsetBatches(0)._2.version == 2) - assert(offsetBatches(1)._1 == 1) - assert(offsetBatches(1)._2.version == 2) + // Check OffsetSeqLog to verify both batches use version 2 + val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets") + val offsetBatches = offsetSeqLog.get(Some(0), Some(1)) + assert(offsetBatches.length == 2) + assert(offsetBatches(0)._1 == 0) + assert(offsetBatches(0)._2.metadataOpt.map(_.version).getOrElse(1) == 2) + assert(offsetBatches(1)._1 == 1) + assert(offsetBatches(1)._2.metadataOpt.map(_.version).getOrElse(1) == 2) + } + } } } } From 84c2e846b44970daecba093330167d12c982a2d6 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Thu, 4 Dec 2025 10:23:28 -0800 Subject: [PATCH 5/6] removing unnecessary sqlconf --- .../sql/execution/streaming/checkpointing/OffsetSeq.scala | 2 +- .../spark/sql/execution/streaming/OffsetSeqLogSuite.scala | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala index b0963f39cd59..b32adc17d7d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala @@ -169,7 +169,7 @@ object OffsetSeqMetadata extends Logging { STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC, STATE_STORE_ROCKSDB_FORMAT_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION, PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT, - STATE_STORE_ROW_CHECKSUM_ENABLED, STREAMING_OFFSET_LOG_FORMAT_VERSION + STATE_STORE_ROW_CHECKSUM_ENABLED ) /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 46493addba78..fed74b9cebd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -273,10 +273,6 @@ class OffsetSeqLogSuite extends SharedSparkSession { val metadata = offsetSeq.metadataOpt.get assert(metadata.version === 2, s"Expected version 2 but got ${metadata.version}") - // Verify the config is persisted - assert(metadata.conf.contains(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key)) - assert(metadata.conf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key) === "2") - // Verify OffsetMap uses string keys ("0", "1", etc.) 
val offsetMap = offsetSeq.asInstanceOf[OffsetMap] assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)), From 46b513baedf5e904a7ca941461566c9de26e6a0b Mon Sep 17 00:00:00 2001 From: ericm-db Date: Thu, 4 Dec 2025 12:34:09 -0800 Subject: [PATCH 6/6] feedback --- .../streaming/OffsetSeqLogSuite.scala | 137 ++++++++++-------- 1 file changed, 79 insertions(+), 58 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index fed74b9cebd7..dc1d368e2959 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.Utils class OffsetSeqLogSuite extends SharedSparkSession { + import testImplicits._ /** test string offset type */ case class StringOffset(override val json: String) extends Offset @@ -239,11 +240,16 @@ class OffsetSeqLogSuite extends SharedSparkSession { } test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with VERSION_2") { - import testImplicits._ withTempDir { checkpointDir => withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { - val inputData = MemoryStream[Int] - val query = inputData.toDF() + // Create two MemoryStream sources + val inputData1 = MemoryStream[Int] + val inputData2 = MemoryStream[String] + + // Create a query that unions both sources + val df1 = inputData1.toDF().select($"value".cast("string")) + val df2 = inputData2.toDF() + val query = df1.union(df2) .writeStream .format("memory") .queryName("offsetlog_v2_test") @@ -252,9 +258,11 @@ class OffsetSeqLogSuite extends SharedSparkSession { try { // Add data and process batches - inputData.addData(1, 2, 3) + inputData1.addData(1, 2, 3) + inputData2.addData("a", "b", "c") query.processAllAvailable() - inputData.addData(4, 5) + inputData1.addData(4, 5) + inputData2.addData("d", "e") query.processAllAvailable() // Read the offset log from checkpoint @@ -277,6 +285,10 @@ class OffsetSeqLogSuite extends SharedSparkSession { val offsetMap = offsetSeq.asInstanceOf[OffsetMap] assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)), "OffsetMap keys should be string representations of ordinals (e.g., '0', '1')") + // Verify the specific keys for both MemoryStream sources are "0" and "1" + assert(offsetMap.offsetsMap.keySet === Set("0", "1"), + s"Expected two sources with keys '0' and '1', but got keys: " + + s"${offsetMap.offsetsMap.keySet}") } finally { query.stop() } @@ -285,7 +297,6 @@ class OffsetSeqLogSuite extends SharedSparkSession { } test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - default VERSION_1") { - import testImplicits._ withTempDir { checkpointDir => // Don't set the config, should default to VERSION_1 val inputData = MemoryStream[Int] @@ -322,59 +333,69 @@ class OffsetSeqLogSuite extends SharedSparkSession { } } - test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - checkpoint wins on restart") { - import testImplicits._ - withTempDir { checkpointDir => - withTempDir { outputDir => - val inputData = MemoryStream[Int] - - // Start query with VERSION_1 (default) - val query1 = inputData.toDF() - .writeStream - .format("parquet") - .option("path", outputDir.getAbsolutePath) - .option("checkpointLocation", checkpointDir.getAbsolutePath) - .start() - - inputData.addData(1, 2) - 
query1.processAllAvailable() - query1.stop() + Seq( + (1, 2, classOf[OffsetSeq]), + (2, 1, classOf[OffsetMap]) + ).foreach { case (startingVersion, restartVersion, expectedClass) => + test(s"STREAMING_OFFSET_LOG_FORMAT_VERSION config - checkpoint wins on restart " + + s"(v$startingVersion to v$restartVersion)") { + withTempDir { checkpointDir => + withTempDir { outputDir => + val inputData = MemoryStream[Int] + + // Start query with initial version + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> startingVersion.toString) { + val query1 = inputData.toDF() + .writeStream + .format("parquet") + .option("path", outputDir.getAbsolutePath) + .option("checkpointLocation", checkpointDir.getAbsolutePath) + .start() + + inputData.addData(1, 2) + query1.processAllAvailable() + query1.stop() + } - // Verify VERSION_1 was used in the initial checkpoint - val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets") - val batch1 = offsetLog.getLatest() - assert(batch1.isDefined) - assert(batch1.get._2.isInstanceOf[OffsetSeq], "Initial checkpoint should use VERSION_1") - assert(batch1.get._2.metadataOpt.get.version === 1) - - // Restart query with VERSION_2 config - should still use VERSION_1 from checkpoint - withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { - val query2 = inputData.toDF() - .writeStream - .format("parquet") - .option("path", outputDir.getAbsolutePath) - .option("checkpointLocation", checkpointDir.getAbsolutePath) - .start() - - try { - inputData.addData(3, 4) - query2.processAllAvailable() - - // Read the latest offset log entry - val latestBatch = offsetLog.getLatest() - assert(latestBatch.isDefined) - - val (batchId, offsetSeq) = latestBatch.get - - // Should still be VERSION_1 because checkpoint was created with VERSION_1 - assert(offsetSeq.isInstanceOf[OffsetSeq], - "Query should continue using VERSION_1 format from checkpoint despite config change") - - val metadata = offsetSeq.metadataOpt.get - assert(metadata.version === 1, - "Query should continue using version 1 from checkpoint despite config being set to 2") - } finally { - query2.stop() + // Verify initial version was used in the checkpoint + val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets") + val batch1 = offsetLog.getLatest() + assert(batch1.isDefined) + assert(batch1.get._2.getClass === expectedClass, + s"Initial checkpoint should use VERSION_$startingVersion") + assert(batch1.get._2.metadataOpt.get.version === startingVersion) + + // Restart query with different version config - should still use initial version + withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> restartVersion.toString) { + val query2 = inputData.toDF() + .writeStream + .format("parquet") + .option("path", outputDir.getAbsolutePath) + .option("checkpointLocation", checkpointDir.getAbsolutePath) + .start() + + try { + inputData.addData(3, 4) + query2.processAllAvailable() + + // Read the latest offset log entry + val latestBatch = offsetLog.getLatest() + assert(latestBatch.isDefined) + + val (batchId, offsetSeq) = latestBatch.get + + // Should still use initial version because checkpoint was created with it + assert(offsetSeq.getClass === expectedClass, + s"Query should continue using VERSION_$startingVersion format from checkpoint " + + s"despite config change") + + val metadata = offsetSeq.metadataOpt.get + assert(metadata.version === startingVersion, + s"Query should continue using version $startingVersion from checkpoint " + + s"despite config 
being set to $restartVersion") + } finally { + query2.stop() + } } } }
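
For downstream tooling, the same APIs make it easy to tell which format an existing checkpoint is pinned to. The following is a minimal sketch, assuming the (classic) SparkSession `spark` from a test or internal context, visibility of Spark's internal checkpointing package, and the hypothetical helper name `checkpointFormatVersion`; it mirrors the `metadataOpt.map(_.version).getOrElse(OffsetSeqLog.VERSION_1)` fallback that MicroBatchExecution uses above.

    import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog
    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical helper: report the offset log format version a checkpoint is pinned to.
    def checkpointFormatVersion(checkpointDir: String): Int = {
      val log = new OffsetSeqLog(spark, s"$checkpointDir/offsets")
      log.getLatest() match {
        case Some((_, offsets)) =>
          // Entries written before this change carry no explicit version, so fall back to
          // v1 -- the same getOrElse(OffsetSeqLog.VERSION_1) fallback used on restart.
          offsets.metadataOpt.map(_.version).getOrElse(OffsetSeqLog.VERSION_1)
        case None =>
          // No batch has started yet: a fresh run adopts the session conf instead, which is
          // the branch guarded by assert(lastCommittedBatchId == -1L) in MicroBatchExecution.
          spark.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key).toInt
      }
    }

Because the resolved version is written back into the session conf via setSparkSessionConfigsForOffsetLog, every component that later consults SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION within the query's session sees the checkpoint-pinned value rather than whatever the user set.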