@ericm-db ericm-db commented Dec 3, 2025

What changes were proposed in this pull request?

Add a SQLConf that lets streaming queries write their offset log in the OffsetMap format instead of OffsetSeq.

Why are the changes needed?

Users need to be able to set the offset log format version, which will enable features such as source naming in the future.

Does this PR introduce any user-facing change?

What changes?

This PR adds a new public configuration, spark.sql.streaming.offsetLog.formatVersion, that lets users control the offset log format version used in streaming query checkpoints.

Previous behavior:

Streaming queries could only use the VERSION_1 offset log format (OffsetSeq), which stores offsets as an ordered sequence. Although the VERSION_2 format (OffsetMap) was implemented internally, users had no way to enable it.

New behavior:

Users can now set spark.sql.streaming.offsetLog.formatVersion to choose between two offset log formats:

  • Version 1 (default): Uses sequence-based OffsetSeq format (existing behavior)
  • Version 2: Uses map-based OffsetMap format with string-based source IDs ("0", "1", "2", etc.)

Example:

  // Create a new streaming query with VERSION_2 format
  spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

  val query = spark.readStream
    .format("rate")
    .load()
    .writeStream
    .format("parquet")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start()

The checkpoint will use OffsetMap format with source IDs as map keys instead of relying on source ordering.
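
To make the difference between the two layouts concrete, here is a minimal sketch. The types SeqOffsets and MapOffsets and the helper toMapLayout are hypothetical simplifications written for illustration; they are not Spark's actual OffsetSeq or OffsetMap classes, and they are not a migration tool. Offsets are modeled as plain JSON strings, and the sketch assumes that a source without an offset is simply absent from the map.

```scala
object OffsetLayoutSketch {
  // Hypothetical, simplified models; NOT Spark's actual OffsetSeq/OffsetMap classes.

  // Version 1 layout: an offset is identified by its position in the sequence.
  final case class SeqOffsets(offsets: Seq[Option[String]])

  // Version 2 layout: an offset is identified by a string source ID.
  final case class MapOffsets(offsets: Map[String, String])

  // Assumed mapping: positional index i becomes source ID "i" ("0", "1", "2", ...).
  // Assumption: sources with no offset yet (None) are left out of the map.
  def toMapLayout(v1: SeqOffsets): MapOffsets =
    MapOffsets(
      v1.offsets.zipWithIndex.collect {
        case (Some(offset), index) => index.toString -> offset
      }.toMap
    )

  // Three sources; the second one has not reported an offset yet.
  val v1: SeqOffsets =
    SeqOffsets(Seq(Some("""{"rate":10}"""), None, Some("""{"rate":7}""")))

  // The same offsets keyed by source ID instead of position.
  val v2: MapOffsets = toMapLayout(v1)
}
```

In the map layout, the third source is still found under key "2" even though the second source has no entry, so lookups do not depend on every source keeping its position.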

Important notes:

  • Default behavior unchanged: The configuration defaults to 1, so existing queries are unaffected
  • New queries only: This config only affects new streaming queries; existing queries continue using the format from their checkpoint
  • No migration path: Checkpoints cannot be migrated between versions; changing the config requires starting a fresh query
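
The notes above amount to a simple precedence rule, which can be sketched as follows. The helper effectiveVersion is hypothetical and is not part of Spark; it only models the rule that an existing checkpoint's format wins over the session configuration. The check that only versions 1 and 2 are accepted is likewise an assumption based on the two formats described here.

```scala
object OffsetLogVersionSketch {
  // Hypothetical helper, NOT a Spark API.
  // checkpointVersion: format version recorded in an existing checkpoint, if any.
  // sessionVersion: value of spark.sql.streaming.offsetLog.formatVersion in the session.
  def effectiveVersion(checkpointVersion: Option[Int], sessionVersion: Int): Int = {
    // Assumption: only the two formats described in this PR are valid.
    require(
      Set(1, 2).contains(sessionVersion),
      s"Unsupported offset log format version: $sessionVersion"
    )
    // An existing checkpoint always wins; the config only applies to new queries.
    checkpointVersion.getOrElse(sessionVersion)
  }
}
```

Under these assumptions, effectiveVersion(None, 2) yields 2 for a new query, effectiveVersion(Some(1), 2) stays at 1, and effectiveVersion(Some(2), 1) stays at 2.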

Comparison:

This is a user-facing change compared to released Spark versions as it introduces a new public configuration that was not previously available.

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@ericm-db ericm-db changed the title from "[SPARK-54583] Add SQLConf to enable use of OffsetMap" to "[SPARK-54583][SS] Add SQLConf to enable use of OffsetMap" on Dec 3, 2025
}

test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - default VERSION_1") {
import testImplicits._
Contributor

Could the import testImplicits._ be moved to the test class level so that it does not need to be repeated in each test?

}
}

test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - checkpoint wins on restart") {
Contributor

Should we also test the inverse? Start with v2 and ensure it remains v2 on restart?

confInOffsetLog.key -> sessionConf.get(confInSession.key)
}.toMap
OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs ++ confsFromRebind)
val version = sessionConf.get(STREAMING_OFFSET_LOG_FORMAT_VERSION.key).toInt
Contributor

Does the version here refer to the OffsetSeqMetadata version or the OffsetSeqBase version?

Contributor Author

OffsetSeqBase - we don't have multiple metadata versions here.

@ericm-db ericm-db closed this Dec 6, 2025