Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ object OffsetSeq {
* @param batchTimestampMs: The current batch processing timestamp.
* Time unit: milliseconds
*/
case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
case class OffsetSeqMetadata(
var batchWatermarkMs: Long = 0,

@tdas tdas Mar 13, 2017

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why we have this as var? Can they be made into vals.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to vals.

var batchTimestampMs: Long = 0,
var numShufflePartitions: Int = 0) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use conf: Map[String, String] here because we probably will add more confs to this class in future.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kunalkhamar, in case you would update OffsetSeq's log version number, the work being done in #17070 might be helpful

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zsxwing Changed to a map

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lw-lin Hi Liwei!
Thanks for letting me know, we will not be updating the log version number since backward and forward compatibility is preserved by this patch.

def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

Expand Down Expand Up @@ -117,7 +118,8 @@ class StreamExecution(
}

/** Metadata associated with the offset seq of a batch in the query. */
protected var offsetSeqMetadata = OffsetSeqMetadata()
protected var offsetSeqMetadata =
OffsetSeqMetadata(numShufflePartitions = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS))

override val id: UUID = UUID.fromString(streamMetadata.id)

Expand Down Expand Up @@ -380,7 +382,20 @@ class StreamExecution(
logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
offsetSeqMetadata = nextOffsets
.metadata
.getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf))

/*
* For backwards compatibility, if # partitions was not recorded in the offset log, then
* ensure it is non-zero. The new value is picked up from the conf.
*/

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for inline comment with the code, use // and not /* .. */.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

if (offsetSeqMetadata.numShufflePartitions == 0) {
offsetSeqMetadata.numShufflePartitions = numShufflePartitionsFromConf
logDebug("Number of shuffle partitions from previous run not found in checkpoint data. " +
s"Using the value from the conf, $numShufflePartitionsFromConf partitions.")
}
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")

Expand Down Expand Up @@ -542,9 +557,16 @@ class StreamExecution(
cd.dataType, cd.timeZoneId)
}

// Fork a cloned session and set confs to disallow change in number of partitions
val sparkSessionForCurrentBatch = sparkSession.cloneSession()
sparkSessionForCurrentBatch.conf.set(
SQLConf.SHUFFLE_PARTITIONS.key,
offsetSeqMetadata.numShufflePartitions.toString)
sparkSessionForCurrentBatch.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")

reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSession,
sparkSessionForCurrentBatch,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
Expand All @@ -554,7 +576,8 @@ class StreamExecution(
}

val nextBatch =
new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
new Dataset(sparkSessionForCurrentBatch, lastExecution,
RowEncoder(lastExecution.analyzed.schema))

reportTimeTaken("addBatch") {
sink.addBatch(currentBatchId, nextBatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,22 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
case class StringOffset(override val json: String) extends Offset

test("OffsetSeqMetadata - deserialization") {
assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
assert(OffsetSeqMetadata(0, 0, 0) === OffsetSeqMetadata("""{}"""))
assert(OffsetSeqMetadata(1, 0, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
assert(OffsetSeqMetadata(0, 2, 0) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
assert(OffsetSeqMetadata(0, 0, 2) === OffsetSeqMetadata("""{"numShufflePartitions":2}"""))
assert(
OffsetSeqMetadata(1, 2) ===
OffsetSeqMetadata(1, 2, 0) ===
OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
assert(
OffsetSeqMetadata(1, 0, 3) ===
OffsetSeqMetadata("""{"batchWatermarkMs":1,"numShufflePartitions":3}"""))
assert(
OffsetSeqMetadata(0, 2, 3) ===
OffsetSeqMetadata("""{"batchTimestampMs":2,"numShufflePartitions":3}"""))
assert(
OffsetSeqMetadata(1, 2, 3) === OffsetSeqMetadata(
"""{"batchWatermarkMs":1,"batchTimestampMs":2,"numShufflePartitions":3}"""))
}

test("OffsetSeqLog - serialization - deserialization") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,20 @@ class StreamSuite extends StreamTest {
query.stop()
assert(query.exception.isEmpty)
}

test("SPARK-19873: streaming agg with change in number of partitions") {
val inputData = MemoryStream[(Int, Int)]
val agg = inputData.toDS().groupBy("_1").count()

testStream(agg, OutputMode.Complete())(
AddData(inputData, (1, 1), (2, 1), (1, 2)),
StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "2")),
CheckAnswer((1, 2), (2, 1)),
StopStream,
AddData(inputData, (3, 1), (2, 2), (1, 1)),
StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "5")),
CheckAnswer((1, 3), (2, 2), (3, 1)))
}
}

abstract class FakeSource extends StreamSourceProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,13 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
AssertOnQuery(q => {
q.exception.get.startOffset ===
q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString &&
q.committedOffsets.toOffsetSeq(
Seq(inputData),
OffsetSeqMetadata(0, 0, spark.conf.get(SQLConf.SHUFFLE_PARTITIONS))).toString &&
q.exception.get.endOffset ===
q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString
q.availableOffsets.toOffsetSeq(
Seq(inputData),
OffsetSeqMetadata(0, 0, spark.conf.get(SQLConf.SHUFFLE_PARTITIONS))).toString
}, "incorrect start offset or end offset on exception")
)
}
Expand Down