@@ -2997,6 +2997,19 @@ object SQLConf {
.checkValue(v => Set(1).contains(v), "Valid version is 1")
.createWithDefault(1)

val STREAMING_OFFSET_LOG_FORMAT_VERSION =
buildConf("spark.sql.streaming.offsetLog.formatVersion")
.doc("Offset log format version used in streaming query checkpoints. " +
"Version 1 uses sequence-based OffsetSeq format, version 2 uses map-based OffsetMap " +
"format which provides better flexibility for source management. " +
"Offset log format versions are incompatible, so this configuration only affects new " +
"streaming queries. Existing queries will continue using the format version from their " +
"checkpoint.")
.version("4.1.0")
.intConf
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(1)

val UNSUPPORTED_OPERATION_CHECK_ENABLED =
buildConf("spark.sql.streaming.unsupportedOperationCheck")
.internal()
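
As a quick usage sketch for reviewers (not part of this diff): the new flag only affects queries whose checkpoint has not been created yet. The source, sink, and checkpoint path below are illustrative.

  // Sketch: opt a brand-new streaming query into the map-based (v2) offset log.
  // The setting is read only when the first offset log entry is written; restarted
  // queries keep whatever version their checkpoint already records.
  spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

  val query = spark.readStream
    .format("rate")        // illustrative source
    .load()
    .writeStream
    .format("console")     // illustrative sink
    .option("checkpointLocation", "/tmp/checkpoints/offsetlog-v2-demo") // illustrative path
    .start()
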
@@ -169,7 +169,7 @@ object OffsetSeqMetadata extends Logging {
STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
STATE_STORE_ROCKSDB_FORMAT_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT,
-STATE_STORE_ROW_CHECKSUM_ENABLED
+STATE_STORE_ROW_CHECKSUM_ENABLED, STREAMING_OFFSET_LOG_FORMAT_VERSION
)

/**
@@ -244,7 +244,8 @@ object OffsetSeqMetadata extends Logging {
case (confInSession, confInOffsetLog) =>
confInOffsetLog.key -> sessionConf.get(confInSession.key)
}.toMap
-OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs ++ confsFromRebind)
+val version = sessionConf.get(STREAMING_OFFSET_LOG_FORMAT_VERSION.key).toInt

Contributor: Does the version here refer to the OffsetSeqMetadata version or the OffsetSeqBase version?

Contributor Author: OffsetSeqBase - we don't have multiple metadata versions here.

+OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs ++ confsFromRebind, version)
}

/** Set the SparkSession configuration with the values in the metadata */
@@ -138,11 +138,11 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
}

object OffsetSeqLog {
-private[streaming] val VERSION_1 = 1
-private[streaming] val VERSION_2 = 2
+val VERSION_1 = 1
+val VERSION_2 = 2
private[streaming] val VERSION = VERSION_1 // Default version for backward compatibility
-private[streaming] val MAX_VERSION = VERSION_2
-private[streaming] val SERIALIZED_VOID_OFFSET = "-"
+val MAX_VERSION = VERSION_2
+private[spark] val SERIALIZED_VOID_OFFSET = "-"

private[checkpointing] def parseOffset(value: String): OffsetV2 = value match {
case SERIALIZED_VOID_OFFSET => null
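
With VERSION_1, VERSION_2, and MAX_VERSION now widened from private[streaming], code outside the streaming package can inspect which format a checkpoint uses. A minimal sketch following the pattern of the tests later in this PR (the checkpoint path is illustrative):

  // Sketch: report the offset log format version of an existing checkpoint.
  val offsetLog = new OffsetSeqLog(spark, "/tmp/checkpoints/my-query/offsets")
  offsetLog.getLatest() match {
    case Some((batchId, offsetSeq)) =>
      // OffsetSeqBase exposes the format version directly (see the suite below)
      println(s"Batch $batchId uses offset log format v${offsetSeq.version}")
    case None =>
      println("No offset log entries written yet")
  }
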
@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
-import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqMetadata}
+import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
@@ -77,6 +77,7 @@ class MicroBatchExecution(
progressReporter,
-1,
sparkSession,
offsetLogFormatVersionOpt = None,
previousContext = None)

override def getLatestExecutionContext(): StreamExecutionContext = latestExecutionContext
@@ -417,11 +418,29 @@ class MicroBatchExecution(
sourceIdMap
)

// Read the offset log format version from the latest written offset log entry. If no entry
// exists, fall back to the configured (or default) value.
val offsetLogFormatVersion = if (latestStartedBatch.isDefined) {
latestStartedBatch.get._2.version
} else {
// If no offset log entries are found, assert that the query does not have any committed
// batches to be extra safe.
assert(lastCommittedBatchId == -1L)
sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)
}

// Set additional configurations depending on the offset log format version
setSparkSessionConfigsForOffsetLog(sparkSessionForStream, offsetLogFormatVersion)

val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink,
-progressReporter, -1, sparkSession, None)
+progressReporter, -1, sparkSession, Some(offsetLogFormatVersion), None)

-execCtx.offsetSeqMetadata =
+execCtx.offsetSeqMetadata = if (offsetLogFormatVersion == OffsetSeqLog.VERSION_2) {
+  OffsetSeqMetadataV2(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
+} else {
OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
+}

setLatestExecutionContext(execCtx)

populateStartOffsets(execCtx, sparkSessionForStream)
@@ -719,6 +738,17 @@ class MicroBatchExecution(
}
}

/**
* Set the SparkSession configurations for the offset log format version.
*/
private def setSparkSessionConfigsForOffsetLog(
sparkSessionForStream: SparkSession,
offsetLogFormatVersion: Int): Unit = {
// Set the offset log format version in the sparkSessionForStream conf
sparkSessionForStream.conf.set(
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
}

/**
* Returns true if there is any new data available to be processed.
*/
@@ -127,6 +127,7 @@ class MicroBatchExecutionContext(
progressReporter: ProgressReporter,
var _batchId: Long,
sparkSession: SparkSession,
val offsetLogFormatVersionOpt: Option[Int],
var previousContext: Option[MicroBatchExecutionContext])
extends StreamExecutionContext(
id,
@@ -190,6 +191,7 @@
progressReporter,
batchId + 1,
sparkSession,
offsetLogFormatVersionOpt,
Some(this))
}

@@ -21,7 +21,7 @@ import java.io.File

import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
-import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, SerializedOffset}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
@@ -237,4 +237,151 @@ class OffsetSeqLogSuite extends SharedSparkSession {
assert(metadata.batchWatermarkMs === 0)
assert(metadata.batchTimestampMs === 1758651405232L)
}

test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with VERSION_2") {
import testImplicits._
withTempDir { checkpointDir =>
withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
val inputData = MemoryStream[Int]
val query = inputData.toDF()
.writeStream
.format("memory")
.queryName("offsetlog_v2_test")
.option("checkpointLocation", checkpointDir.getAbsolutePath)
.start()

try {
// Add data and process batches
inputData.addData(1, 2, 3)
query.processAllAvailable()
inputData.addData(4, 5)
query.processAllAvailable()

// Read the offset log from checkpoint
val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
val latestBatch = offsetLog.getLatest()
assert(latestBatch.isDefined, "Offset log should have at least one entry")

val (batchId, offsetSeq) = latestBatch.get

// Verify it's OffsetMap (VERSION_2)
assert(offsetSeq.isInstanceOf[OffsetMap],
s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")

// Verify metadata version is 2
assert(offsetSeq.metadataOpt.isDefined)
val metadata = offsetSeq.metadataOpt.get
assert(metadata.version === 2, s"Expected version 2 but got ${metadata.version}")

// Verify the config is persisted
assert(metadata.conf.contains(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key))
assert(metadata.conf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key) === "2")

// Verify OffsetMap uses string keys ("0", "1", etc.)
val offsetMap = offsetSeq.asInstanceOf[OffsetMap]
assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)),
"OffsetMap keys should be string representations of ordinals (e.g., '0', '1')")
} finally {
query.stop()
}
}
}
}

test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - default VERSION_1") {
import testImplicits._

Contributor: could the import testImplicits._ be at the test class level so it does not need to be duplicated for the tests?

(A sketch of the suggested refactor follows this test.)

withTempDir { checkpointDir =>
// Don't set the config, should default to VERSION_1
val inputData = MemoryStream[Int]
val query = inputData.toDF()
.writeStream
.format("memory")
.queryName("offsetlog_v1_test")
.option("checkpointLocation", checkpointDir.getAbsolutePath)
.start()

try {
// Add data and process batches
inputData.addData(1, 2, 3)
query.processAllAvailable()

// Read the offset log from checkpoint
val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
val latestBatch = offsetLog.getLatest()
assert(latestBatch.isDefined, "Offset log should have at least one entry")

val (batchId, offsetSeq) = latestBatch.get

// Verify it's OffsetSeq (VERSION_1)
assert(offsetSeq.isInstanceOf[OffsetSeq],
s"Expected OffsetSeq but got ${offsetSeq.getClass.getSimpleName}")

// Verify metadata version is 1
assert(offsetSeq.metadataOpt.isDefined)
val metadata = offsetSeq.metadataOpt.get
assert(metadata.version === 1, s"Expected version 1 but got ${metadata.version}")
} finally {
query.stop()
}
}
}
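
The refactor suggested in the comment above would look roughly like this: hoist the import into the class body so the individual tests drop their local copies.

  class OffsetSeqLogSuite extends SharedSparkSession {
    import testImplicits._   // shared by every test in the suite

    test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with VERSION_2") {
      // ... body unchanged, minus the per-test import
    }
  }
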

test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - checkpoint wins on restart") {

Contributor: should we also test the inverse? Start with v2 and ensure it remains v2?

(A sketch of that inverse test follows this one; the DataStreamReaderWriterSuite changes below also exercise the v2-then-v1 direction.)

import testImplicits._
withTempDir { checkpointDir =>
withTempDir { outputDir =>
val inputData = MemoryStream[Int]

// Start query with VERSION_1 (default)
val query1 = inputData.toDF()
.writeStream
.format("parquet")
.option("path", outputDir.getAbsolutePath)
.option("checkpointLocation", checkpointDir.getAbsolutePath)
.start()

inputData.addData(1, 2)
query1.processAllAvailable()
query1.stop()

// Verify VERSION_1 was used in the initial checkpoint
val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
val batch1 = offsetLog.getLatest()
assert(batch1.isDefined)
assert(batch1.get._2.isInstanceOf[OffsetSeq], "Initial checkpoint should use VERSION_1")
assert(batch1.get._2.metadataOpt.get.version === 1)

// Restart query with VERSION_2 config - should still use VERSION_1 from checkpoint
withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
val query2 = inputData.toDF()
.writeStream
.format("parquet")
.option("path", outputDir.getAbsolutePath)
.option("checkpointLocation", checkpointDir.getAbsolutePath)
.start()

try {
inputData.addData(3, 4)
query2.processAllAvailable()

// Read the latest offset log entry
val latestBatch = offsetLog.getLatest()
assert(latestBatch.isDefined)

val (batchId, offsetSeq) = latestBatch.get

// Should still be VERSION_1 because checkpoint was created with VERSION_1
assert(offsetSeq.isInstanceOf[OffsetSeq],
"Query should continue using VERSION_1 format from checkpoint despite config change")

val metadata = offsetSeq.metadataOpt.get
assert(metadata.version === 1,
"Query should continue using version 1 from checkpoint despite config being set to 2")
} finally {
query2.stop()
}
}
}
}
}
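
A sketch of the inverse restart check asked about above, mirroring the test just shown. The test name is hypothetical; it is an illustration under the same assumptions as the v1 test, not part of this diff.

  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - v2 checkpoint stays v2 on restart") {
    import testImplicits._
    withTempDir { checkpointDir =>
      withTempDir { outputDir =>
        val inputData = MemoryStream[Int]
        def startQuery() = inputData.toDF().writeStream
          .format("parquet")
          .option("path", outputDir.getAbsolutePath)
          .option("checkpointLocation", checkpointDir.getAbsolutePath)
          .start()

        // Create the checkpoint with VERSION_2
        withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
          val query1 = startQuery()
          inputData.addData(1, 2)
          query1.processAllAvailable()
          query1.stop()
        }

        // Restart with the default (VERSION_1) config - the checkpoint should win
        val query2 = startQuery()
        try {
          inputData.addData(3, 4)
          query2.processAllAvailable()
          val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
          val (_, offsetSeq) = offsetLog.getLatest().get
          assert(offsetSeq.isInstanceOf[OffsetMap],
            "Query should keep using the VERSION_2 format recorded in the checkpoint")
          assert(offsetSeq.metadataOpt.get.version === 2)
        } finally {
          query2.stop()
        }
      }
    }
  }
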
}
@@ -35,6 +35,7 @@ import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.classic.Dataset.ofRows
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}
import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog
import org.apache.spark.sql.execution.streaming.runtime._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
@@ -873,4 +874,72 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
query.stop()
}
}

private def runQuery(
inputDir: File,
outputDir: File,
checkpointDir: File,
formatVersion: String): Unit = {
withSQLConf(
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> formatVersion) {
def writeToInputTable(): Unit = {
spark.range(1)
.write.format("delta")
.mode("append")
.save(inputDir.getCanonicalPath)
}

writeToInputTable()

val query = spark.readStream
.format("delta")
.load(inputDir.getCanonicalPath)
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)

query.processAllAvailable()
query.stop()
query.awaitTermination()
}
}

test("Verify that switching offset log format versions has no effect - v1 to v2") {
withTempPaths(3) { dirs =>
val (inputDir, outputDir, checkpointDir) = (dirs(0), dirs(1), dirs(2))
// Run the query twice to ensure that the offset log format version is persisted
// and reused across multiple runs.
runQuery(inputDir, outputDir, checkpointDir, "1")
runQuery(inputDir, outputDir, checkpointDir, "2")

// Check OffsetSeqLog to verify existence of 2 batches
val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets")
val offsetBatches = offsetSeqLog.get(Some(0), Some(1))
assert(offsetBatches.length == 2)
assert(offsetBatches(0)._1 == 0)
assert(offsetBatches(0)._2.version == 1)
assert(offsetBatches(1)._1 == 1)
assert(offsetBatches(1)._2.version == 1)
}
}

test("Verify that switching offset log format versions has no effect - v2 to v1") {
withTempPaths(3) { dirs =>
val (inputDir, outputDir, checkpointDir) = (dirs(0), dirs(1), dirs(2))
// Run the query twice to ensure that the offset log format version is persisted
// and reused across multiple runs.
runQuery(inputDir, outputDir, checkpointDir, "2")
runQuery(inputDir, outputDir, checkpointDir, "1")

// Check OffsetSeqLog to verify existence of 2 batches
val offsetSeqLog = new OffsetSeqLog(spark, checkpointDir.getCanonicalPath + "/offsets")
val offsetBatches = offsetSeqLog.get(Some(0), Some(1))
assert(offsetBatches.length == 2)
assert(offsetBatches(0)._1 == 0)
assert(offsetBatches(0)._2.version == 2)
assert(offsetBatches(1)._1 == 1)
assert(offsetBatches(1)._2.version == 2)
}
}
}