From 12f5fd30229e441355a05290ed124263c1429acc Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Wed, 8 Mar 2017 13:29:02 -0800 Subject: [PATCH 01/14] Record num shuffle partitions in offset log and enforce in next batch. --- .../sql/execution/streaming/OffsetSeq.scala | 5 ++- .../execution/streaming/StreamExecution.scala | 31 ++++++++++++++++--- .../streaming/OffsetSeqLogSuite.scala | 18 ++++++++--- .../spark/sql/streaming/StreamSuite.scala | 28 +++++++++++++++++ .../sql/streaming/StreamingQuerySuite.scala | 8 +++-- 5 files changed, 79 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index e5a1997d6b80..ac9a3ba447e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -71,7 +71,10 @@ object OffsetSeq { * @param batchTimestampMs: The current batch processing timestamp. * Time unit: milliseconds */ -case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) { +case class OffsetSeqMetadata( + var batchWatermarkMs: Long = 0, + var batchTimestampMs: Long = 0, + var numShufflePartitions: Int = 0) { def json: String = Serialization.write(this)(OffsetSeqMetadata.format) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 70912d13ae45..63d72b74c979 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} @@ -117,7 +118,8 @@ class StreamExecution( } /** Metadata associated with the offset seq of a batch in the query. */ - protected var offsetSeqMetadata = OffsetSeqMetadata() + protected var offsetSeqMetadata = OffsetSeqMetadata( + 0, 0, sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)) override val id: UUID = UUID.fromString(streamMetadata.id) @@ -380,7 +382,20 @@ class StreamExecution( logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata()) + val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS) + offsetSeqMetadata = nextOffsets + .metadata + .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf)) + + /* + * For backwards compatibility, if # partitions was not recorded in the offset log, then + * ensure it is non-zero. The new value is picked up from the conf. + */ + if (offsetSeqMetadata.numShufflePartitions == 0) { + offsetSeqMetadata.numShufflePartitions = numShufflePartitionsFromConf + logDebug("Number of shuffle partitions from previous run not found in checkpoint data. 
" + + s"Using the value from the conf, $numShufflePartitionsFromConf partitions.") + } logDebug(s"Found possibly unprocessed offsets $availableOffsets " + s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") @@ -542,9 +557,16 @@ class StreamExecution( cd.dataType, cd.timeZoneId) } + // Fork a cloned session and set confs to disallow change in number of partitions + val sparkSessionForCurrentBatch = sparkSession.cloneSession() + sparkSessionForCurrentBatch.conf.set( + SQLConf.SHUFFLE_PARTITIONS.key, + offsetSeqMetadata.numShufflePartitions.toString) + sparkSessionForCurrentBatch.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( - sparkSession, + sparkSessionForCurrentBatch, triggerLogicalPlan, outputMode, checkpointFile("state"), @@ -554,7 +576,8 @@ class StreamExecution( } val nextBatch = - new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema)) + new Dataset(sparkSessionForCurrentBatch, lastExecution, + RowEncoder(lastExecution.analyzed.schema)) reportTimeTaken("addBatch") { sink.addBatch(currentBatchId, nextBatch) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 5ae8b2484d2e..c672ebc6f94a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -28,12 +28,22 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { case class StringOffset(override val json: String) extends Offset test("OffsetSeqMetadata - deserialization") { - assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}""")) - assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}""")) - assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}""")) + assert(OffsetSeqMetadata(0, 0, 0) === OffsetSeqMetadata("""{}""")) + assert(OffsetSeqMetadata(1, 0, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}""")) + assert(OffsetSeqMetadata(0, 2, 0) === OffsetSeqMetadata("""{"batchTimestampMs":2}""")) + assert(OffsetSeqMetadata(0, 0, 2) === OffsetSeqMetadata("""{"numShufflePartitions":2}""")) assert( - OffsetSeqMetadata(1, 2) === + OffsetSeqMetadata(1, 2, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) + assert( + OffsetSeqMetadata(1, 0, 3) === + OffsetSeqMetadata("""{"batchWatermarkMs":1,"numShufflePartitions":3}""")) + assert( + OffsetSeqMetadata(0, 2, 3) === + OffsetSeqMetadata("""{"batchTimestampMs":2,"numShufflePartitions":3}""")) + assert( + OffsetSeqMetadata(1, 2, 3) === OffsetSeqMetadata( + """{"batchWatermarkMs":1,"batchTimestampMs":2,"numShufflePartitions":3}""")) } test("OffsetSeqLog - serialization - deserialization") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 6dfcd8baba20..ec3fabc0cc67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -354,6 +354,34 @@ class StreamSuite extends StreamTest { } } + test("streaming agg without change in number of partitions") { + val inputData = MemoryStream[(Int, Int)] + val agg = inputData.toDS().groupBy("_1").count() + + testStream(agg, 
OutputMode.Complete())( + AddData(inputData, (1, 1), (2, 1), (1, 2)), + StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "2")), + CheckAnswer((1, 2), (2, 1)), + StopStream, + AddData(inputData, (3, 1), (2, 2), (1, 1)), + StartStream(), + CheckAnswer((1, 3), (2, 2), (3, 1))) + } + + test("streaming agg with change in number of partitions") { + val inputData = MemoryStream[(Int, Int)] + val agg = inputData.toDS().groupBy("_1").count() + + testStream(agg, OutputMode.Complete())( + AddData(inputData, (1, 1), (2, 1), (1, 2)), + StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "2")), + CheckAnswer((1, 2), (2, 1)), + StopStream, + AddData(inputData, (3, 1), (2, 2), (1, 1)), + StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "5")), + CheckAnswer((1, 3), (2, 2), (3, 1))) + } + test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") { // This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the // streaming thread is interrupted. We should handle it properly by not failing the query. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index a0a2b2b4c9b3..6097da2476a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -151,9 +151,13 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery(q => { q.exception.get.startOffset === - q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString && + q.committedOffsets.toOffsetSeq( + Seq(inputData), + OffsetSeqMetadata(0, 0, spark.conf.get(SQLConf.SHUFFLE_PARTITIONS))).toString && q.exception.get.endOffset === - q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString + q.availableOffsets.toOffsetSeq( + Seq(inputData), + OffsetSeqMetadata(0, 0, spark.conf.get(SQLConf.SHUFFLE_PARTITIONS))).toString }, "incorrect start offset or end offset on exception") ) } From 9ff4d2956bbcf6ca65888cc616efac8ffeea733b Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Wed, 8 Mar 2017 15:07:06 -0800 Subject: [PATCH 02/14] Clean up. --- .../execution/streaming/StreamExecution.scala | 4 +- .../spark/sql/streaming/StreamSuite.scala | 42 +++++++------------ 2 files changed, 16 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 63d72b74c979..d906c914a7eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -118,8 +118,8 @@ class StreamExecution( } /** Metadata associated with the offset seq of a batch in the query. 
*/ - protected var offsetSeqMetadata = OffsetSeqMetadata( - 0, 0, sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)) + protected var offsetSeqMetadata = + OffsetSeqMetadata(numShufflePartitions = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)) override val id: UUID = UUID.fromString(streamMetadata.id) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index ec3fabc0cc67..1b8a7bfb2893 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -354,34 +354,6 @@ class StreamSuite extends StreamTest { } } - test("streaming agg without change in number of partitions") { - val inputData = MemoryStream[(Int, Int)] - val agg = inputData.toDS().groupBy("_1").count() - - testStream(agg, OutputMode.Complete())( - AddData(inputData, (1, 1), (2, 1), (1, 2)), - StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "2")), - CheckAnswer((1, 2), (2, 1)), - StopStream, - AddData(inputData, (3, 1), (2, 2), (1, 1)), - StartStream(), - CheckAnswer((1, 3), (2, 2), (3, 1))) - } - - test("streaming agg with change in number of partitions") { - val inputData = MemoryStream[(Int, Int)] - val agg = inputData.toDS().groupBy("_1").count() - - testStream(agg, OutputMode.Complete())( - AddData(inputData, (1, 1), (2, 1), (1, 2)), - StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "2")), - CheckAnswer((1, 2), (2, 1)), - StopStream, - AddData(inputData, (3, 1), (2, 2), (1, 1)), - StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "5")), - CheckAnswer((1, 3), (2, 2), (3, 1))) - } - test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") { // This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the // streaming thread is interrupted. We should handle it properly by not failing the query. @@ -417,6 +389,20 @@ class StreamSuite extends StreamTest { query.stop() assert(query.exception.isEmpty) } + + test("SPARK-19873: streaming agg with change in number of partitions") { + val inputData = MemoryStream[(Int, Int)] + val agg = inputData.toDS().groupBy("_1").count() + + testStream(agg, OutputMode.Complete())( + AddData(inputData, (1, 1), (2, 1), (1, 2)), + StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "2")), + CheckAnswer((1, 2), (2, 1)), + StopStream, + AddData(inputData, (3, 1), (2, 2), (1, 1)), + StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "5")), + CheckAnswer((1, 3), (2, 2), (3, 1))) + } } abstract class FakeSource extends StreamSourceProvider { From 60ec7da08ea9b55d6f307c894e0dbd0a116d9413 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Thu, 9 Mar 2017 16:57:07 -0800 Subject: [PATCH 03/14] Add backward compat test. Review changes. 
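This patch replaces the dedicated numShufflePartitions field with a generic conf map in OffsetSeqMetadata, so further session confs can be persisted in the offset log later without another format change. As a rough illustration of the intended round trip (a minimal sketch that relies on json4s falling back to the case-class defaults for fields missing from old logs, mirroring the assertions in OffsetSeqLogSuite; it is not part of the patch itself):

    val metadata = OffsetSeqMetadata(1, 2, Map(SQLConf.SHUFFLE_PARTITIONS.key -> "10"))
    // Writing and re-reading the JSON form should be lossless.
    assert(OffsetSeqMetadata(metadata.json) === metadata)
    // An offset log written before this change carries no "conf" entry and must
    // still deserialize, picking up the default empty map.
    assert(OffsetSeqMetadata("""{"batchWatermarkMs":1}""") === OffsetSeqMetadata(1, 0, Map.empty))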
--- .../sql/execution/streaming/OffsetSeq.scala | 5 +- .../execution/streaming/StreamExecution.scala | 38 +++++---- .../checkpoint-version-2.1.0/metadata | 1 + .../checkpoint-version-2.1.0/offsets/0 | 3 + .../state/0/0/1.delta | Bin 0 -> 46 bytes .../state/0/1/1.delta | Bin 0 -> 79 bytes .../state/0/2/1.delta | Bin 0 -> 79 bytes .../state/0/3/1.delta | Bin 0 -> 95 bytes .../state/0/4/1.delta | Bin 0 -> 79 bytes .../state/0/5/1.delta | Bin 0 -> 46 bytes .../state/0/6/1.delta | Bin 0 -> 79 bytes .../state/0/7/1.delta | Bin 0 -> 79 bytes .../state/0/8/1.delta | Bin 0 -> 46 bytes .../state/0/9/1.delta | Bin 0 -> 79 bytes .../streaming/OffsetSeqLogSuite.scala | 76 ++++++++++++++---- .../sql/streaming/StreamingQuerySuite.scala | 4 +- 16 files changed, 91 insertions(+), 36 deletions(-) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index ac9a3ba447e3..bc667dc3a67e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming import org.json4s.NoTypeHints import org.json4s.jackson.Serialization - /** * An ordered collection of offsets, used to track the progress of processing data from one or more * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance @@ -70,11 +69,12 @@ object OffsetSeq { * bound the lateness of data that will processed. Time unit: milliseconds * @param batchTimestampMs: The current batch processing timestamp. * Time unit: milliseconds + * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions. 
*/ case class OffsetSeqMetadata( var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0, - var numShufflePartitions: Int = 0) { + conf: Map[String, String] = Map.empty) { def json: String = Serialization.write(this)(OffsetSeqMetadata.format) } @@ -82,4 +82,3 @@ object OffsetSeqMetadata { private implicit val format = Serialization.formats(NoTypeHints) def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json) } - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d906c914a7eb..b1bd1638ee97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -119,7 +119,8 @@ class StreamExecution( /** Metadata associated with the offset seq of a batch in the query. */ protected var offsetSeqMetadata = - OffsetSeqMetadata(numShufflePartitions = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)) + OffsetSeqMetadata(conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> + sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString)) override val id: UUID = UUID.fromString(streamMetadata.id) @@ -382,20 +383,27 @@ class StreamExecution( logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS) - offsetSeqMetadata = nextOffsets - .metadata - .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf)) - - /* - * For backwards compatibility, if # partitions was not recorded in the offset log, then - * ensure it is non-zero. The new value is picked up from the conf. - */ - if (offsetSeqMetadata.numShufflePartitions == 0) { - offsetSeqMetadata.numShufflePartitions = numShufflePartitionsFromConf - logDebug("Number of shuffle partitions from previous run not found in checkpoint data. " + - s"Using the value from the conf, $numShufflePartitionsFromConf partitions.") + + // initialize metadata + val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS) + offsetSeqMetadata = { + if (nextOffsets.metadata.nonEmpty) { + val offsets = nextOffsets.metadata.get + val shufflePartitionsToUse = offsets.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { + // For backwards compatibility, if # partitions was not recorded in the offset log, + // then ensure it is not missing. The new value is picked up from the conf. + logDebug("Number of shuffle partitions from previous run not found in checkpoint. 
" + + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.") + shufflePartitionsSparkSession + }) + OffsetSeqMetadata(offsets.batchWatermarkMs, offsets.batchTimestampMs, + Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) + } else { + OffsetSeqMetadata(0, 0, + Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString)) + } } + logDebug(s"Found possibly unprocessed offsets $availableOffsets " + s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") @@ -561,7 +569,7 @@ class StreamExecution( val sparkSessionForCurrentBatch = sparkSession.cloneSession() sparkSessionForCurrentBatch.conf.set( SQLConf.SHUFFLE_PARTITIONS.key, - offsetSeqMetadata.numShufflePartitions.toString) + offsetSeqMetadata.conf(SQLConf.SHUFFLE_PARTITIONS.key)) sparkSessionForCurrentBatch.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") reportTimeTaken("queryPlanning") { diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata new file mode 100644 index 000000000000..2ee11272dc31 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata @@ -0,0 +1 @@ +{"id":"da6d47db-ba1c-491e-bcf6-99ed23e40b39"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 new file mode 100644 index 000000000000..31966a660581 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1489085905815} +2 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..8b566e81f48663efa0ebda2dbf694d65e28def72 GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0OEK1WO;*uv;YGmgD^7(gCmeF!^Xfa!XU`R$FKm%1A_lR M-~-hu3K4>k06a_$wEzGB literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..361f2db6050209d8ee7ba037f82c6e59868098af GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0AkVS25iAVT7ZF(L70Vu!4b%oVPjwyVGv~EV^{#>0l|MD M@PX0l|MD M@PX0l|MD M@PXgCmeF!^Xfa!XU`V$FKm%1A_lR M-~-hu3K4>k06R$yw*UYD literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 
HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta new file mode 100644 index 0000000000000000000000000000000000000000..0c9b6ac5c863d06d63c46c8a1fc51da716a5fdbd GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0OIOpUzvh|v;YGmgD@KhgCmeF!^Xfa!XU`R$FKm%1A_lR M-~-hu3K4>k083;I`Tzg` literal 0 HcmV?d00001 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index c672ebc6f94a..bbf05e728b32 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming import java.io.File import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { @@ -28,22 +30,32 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { case class StringOffset(override val json: String) extends Offset test("OffsetSeqMetadata - deserialization") { - assert(OffsetSeqMetadata(0, 0, 0) === OffsetSeqMetadata("""{}""")) - assert(OffsetSeqMetadata(1, 0, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}""")) - assert(OffsetSeqMetadata(0, 2, 0) === OffsetSeqMetadata("""{"batchTimestampMs":2}""")) - assert(OffsetSeqMetadata(0, 0, 2) === OffsetSeqMetadata("""{"numShufflePartitions":2}""")) - assert( - OffsetSeqMetadata(1, 2, 0) === - OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) - assert( - OffsetSeqMetadata(1, 0, 3) === - OffsetSeqMetadata("""{"batchWatermarkMs":1,"numShufflePartitions":3}""")) - assert( - OffsetSeqMetadata(0, 2, 3) === - OffsetSeqMetadata("""{"batchTimestampMs":2,"numShufflePartitions":3}""")) - assert( - OffsetSeqMetadata(1, 2, 3) === OffsetSeqMetadata( - """{"batchWatermarkMs":1,"batchTimestampMs":2,"numShufflePartitions":3}""")) + val key = SQLConf.SHUFFLE_PARTITIONS.key + + def getMapWith(shufflePartitions: Int): Map[String, String] = { + Map(key -> shufflePartitions.toString) + } + + // None set + assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}""")) + + // One set + assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}""")) + assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}""")) + assert(OffsetSeqMetadata(0, 0, getMapWith(shufflePartitions = 2)) === + OffsetSeqMetadata(s"""{"conf": {"$key":2}}""")) + + // Two set + assert(OffsetSeqMetadata(1, 2, Map.empty) === + OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) + assert(OffsetSeqMetadata(1, 0, getMapWith(shufflePartitions = 3)) === + OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}""")) + assert(OffsetSeqMetadata(0, 2, getMapWith(shufflePartitions = 3)) === + OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}""")) + + // All set + assert(OffsetSeqMetadata(1, 2, getMapWith(shufflePartitions = 3)) === + OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}""")) } test("OffsetSeqLog - serialization - deserialization") { @@ -90,6 +102,38 @@ class OffsetSeqLogSuite extends 
SparkFunSuite with SharedSQLContext { assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L))) } + test("SPARK-19873: backwards compat with checkpoints that do not record shuffle partitions") { + import org.apache.spark.sql.functions.count + + import testImplicits._ + + // checkpoint data was generated by a query with 10 shuffle partitions + // test if recovery from checkpoint is successful + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + val inputData = MemoryStream[Int] + inputData.addData(1, 2, 3, 4) + inputData.addData(3, 4, 5, 6) + inputData.addData(5, 6, 7, 8) + + val checkpointDir = "./src/test/resources/structured-streaming/checkpoint-version-2.1.0" + val query = inputData + .toDF() + .groupBy($"value") + .agg(count("*")) + .writeStream + .queryName("counts") + .outputMode("complete") + .option("checkpointLocation", checkpointDir) + .format("memory") + .start() + + query.processAllAvailable() + QueryTest.checkAnswer(spark.table("counts").toDF(), + Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: + Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil) + } + } + private def readFromResource(dir: String): (Long, OffsetSeq) = { val input = getClass.getResource(s"/structured-streaming/$dir") val log = new OffsetSeqLog(spark, input.toString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 6097da2476a5..a6d039c779a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -153,11 +153,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi q.exception.get.startOffset === q.committedOffsets.toOffsetSeq( Seq(inputData), - OffsetSeqMetadata(0, 0, spark.conf.get(SQLConf.SHUFFLE_PARTITIONS))).toString && + OffsetSeqMetadata(0, 0)).toString && q.exception.get.endOffset === q.availableOffsets.toOffsetSeq( Seq(inputData), - OffsetSeqMetadata(0, 0, spark.conf.get(SQLConf.SHUFFLE_PARTITIONS))).toString + OffsetSeqMetadata(0, 0)).toString }, "incorrect start offset or end offset on exception") ) } From c688e84b473f33fb136b39e350e062cb8fc03d2e Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 10 Mar 2017 12:53:15 -0800 Subject: [PATCH 04/14] Clone spark session once before all batches. Update test source metadataPath. --- .../execution/streaming/StreamExecution.scala | 20 ++++++++++------- .../streaming/OffsetSeqLogSuite.scala | 7 ++++-- .../test/DataStreamReaderWriterSuite.scala | 22 ++++++++++--------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index b1bd1638ee97..8d831419317d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -259,6 +259,10 @@ class StreamExecution( updateStatusMessage("Initializing sources") // force initialization of the logical plan so that the sources can be created logicalPlan + + // Isolated spark session to run the batches with. 
+ val sparkSessionToRunBatches = sparkSession.cloneSession() + if (state.compareAndSet(INITIALIZING, ACTIVE)) { // Unblock `awaitInitialization` initializationLatch.countDown() @@ -279,7 +283,7 @@ class StreamExecution( if (dataAvailable) { currentStatus = currentStatus.copy(isDataAvailable = true) updateStatusMessage("Processing new data") - runBatch() + runBatch(sparkSessionToRunBatches) } } @@ -521,8 +525,9 @@ class StreamExecution( /** * Processes any data available between `availableOffsets` and `committedOffsets`. + * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with. */ - private def runBatch(): Unit = { + private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = { // Request unprocessed data from all sources. newData = reportTimeTaken("getBatch") { availableOffsets.flatMap { @@ -565,16 +570,15 @@ class StreamExecution( cd.dataType, cd.timeZoneId) } - // Fork a cloned session and set confs to disallow change in number of partitions - val sparkSessionForCurrentBatch = sparkSession.cloneSession() - sparkSessionForCurrentBatch.conf.set( + // Reset confs to disallow change in number of partitions + sparkSessionToRunBatch.conf.set( SQLConf.SHUFFLE_PARTITIONS.key, offsetSeqMetadata.conf(SQLConf.SHUFFLE_PARTITIONS.key)) - sparkSessionForCurrentBatch.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + sparkSessionToRunBatch.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( - sparkSessionForCurrentBatch, + sparkSessionToRunBatch, triggerLogicalPlan, outputMode, checkpointFile("state"), @@ -584,7 +588,7 @@ class StreamExecution( } val nextBatch = - new Dataset(sparkSessionForCurrentBatch, lastExecution, + new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema)) reportTimeTaken("addBatch") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index bbf05e728b32..7fae6a455050 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -114,8 +114,11 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { inputData.addData(1, 2, 3, 4) inputData.addData(3, 4, 5, 6) inputData.addData(5, 6, 7, 8) + inputData.addData(1) - val checkpointDir = "./src/test/resources/structured-streaming/checkpoint-version-2.1.0" + val resourceUri = + this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI + val checkpointDir = new File(resourceUri).getCanonicalPath val query = inputData .toDF() .groupBy($"value") @@ -129,7 +132,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { query.processAllAvailable() QueryTest.checkAnswer(spark.table("counts").toDF(), - Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: + Row("1", 2) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index f61dcdcbcf71..341ab0eb923d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import org.apache.hadoop.fs.Path +import org.mockito.Matchers.{any, eq => meq} import org.mockito.Mockito._ import org.scalatest.BeforeAndAfter @@ -370,21 +371,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .option("checkpointLocation", checkpointLocationURI.toString) .trigger(ProcessingTime(10.seconds)) .start() + q.processAllAvailable() q.stop() verify(LastOptions.mockStreamSourceProvider).createSource( - spark.sqlContext, - s"$checkpointLocationURI/sources/0", - None, - "org.apache.spark.sql.streaming.test", - Map.empty) + any(), + meq(s"$checkpointLocationURI/sources/0"), + meq(None), + meq("org.apache.spark.sql.streaming.test"), + meq(Map.empty)) verify(LastOptions.mockStreamSourceProvider).createSource( - spark.sqlContext, - s"$checkpointLocationURI/sources/1", - None, - "org.apache.spark.sql.streaming.test", - Map.empty) + any(), + meq(s"$checkpointLocationURI/sources/1"), + meq(None), + meq("org.apache.spark.sql.streaming.test"), + meq(Map.empty)) } private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath From f6bd071ca794ed4532eef936b69a54f5d2d68cf1 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 10 Mar 2017 13:41:02 -0800 Subject: [PATCH 05/14] Update test. --- .../checkpoint-version-2.1.0/metadata | 2 +- .../checkpoint-version-2.1.0/offsets/0 | 4 +- .../checkpoint-version-2.1.0/offsets/1 | 3 ++ .../state/0/0/2.delta | Bin 0 -> 46 bytes .../state/0/1/1.delta | Bin 79 -> 79 bytes .../state/0/1/2.delta | Bin 0 -> 79 bytes .../state/0/2/1.delta | Bin 79 -> 79 bytes .../state/0/2/2.delta | Bin 0 -> 79 bytes .../state/0/3/1.delta | Bin 95 -> 73 bytes .../state/0/3/2.delta | Bin 0 -> 79 bytes .../state/0/4/2.delta | Bin 0 -> 46 bytes .../state/0/5/2.delta | Bin 0 -> 46 bytes .../state/0/6/1.delta | Bin 79 -> 46 bytes .../state/0/6/2.delta | Bin 0 -> 79 bytes .../state/0/7/1.delta | Bin 79 -> 46 bytes .../state/0/7/2.delta | Bin 0 -> 79 bytes .../state/0/8/2.delta | Bin 0 -> 46 bytes .../state/0/9/1.delta | Bin 79 -> 46 bytes .../state/0/9/2.delta | Bin 0 -> 79 bytes .../streaming/OffsetSeqLogSuite.scala | 50 ++++++++++-------- 20 files changed, 34 insertions(+), 25 deletions(-) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta create mode 100644 
sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata index 2ee11272dc31..3492220e36b8 100644 --- a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata @@ -1 +1 @@ -{"id":"da6d47db-ba1c-491e-bcf6-99ed23e40b39"} \ No newline at end of file +{"id":"dddc5e7f-1e71-454c-8362-de184444fb5a"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 index 31966a660581..cbde042e79af 100644 --- a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 @@ -1,3 +1,3 @@ v1 -{"batchWatermarkMs":0,"batchTimestampMs":1489085905815} -2 \ No newline at end of file +{"batchWatermarkMs":0,"batchTimestampMs":1489180207737} +0 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 new file mode 100644 index 000000000000..10b5774746de --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1489180209261} +2 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta index 8b566e81f48663efa0ebda2dbf694d65e28def72..7dc49cb3e47fd7a4001ff7ddc96094e754117c44 100644 GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0AjiN4z6GzEx^FYAk56c;0R>PuraWUFbFd8F)RS`fZ#t6 M_&{}vLWCeB023(<4FCWD literal 79 zcmeZ?GI7euPtI0VWnf^i0OEK1WO;*uv;YGmgD^7(gCmeF!^Xfa!XU`R$FKm%1A_lR M-~-hu3K4>k06a_$wEzGB diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta new file mode 100644 index 0000000000000000000000000000000000000000..8b566e81f48663efa0ebda2dbf694d65e28def72 GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0OEK1WO;*uv;YGmgD^7(gCmeF!^Xfa!XU`R$FKm%1A_lR M-~-hu3K4>k06a_$wEzGB literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta index 361f2db6050209d8ee7ba037f82c6e59868098af..ca2a7ed033f3baf749f5a93522e951c15729e4c6 100644 GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0AjUmuFSzeT7ZF(L70Vu!4b%oVPjwyVGv~GV^{#>0l|MD M@PX0l|MD M@PX0l|MD M@PXPuraWUFbFd8F)RS`fZ#t6 M_&{}vLWCeB00o>3)Bpeg literal 0 HcmV?d00001 diff 
--git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta index e69925cabaa94cd8bc660448a138050028120885..6352978051846970ca41a0ca97fd79952105726d 100644 GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 79 zcmeZ?GI7euPtI0VWnf^i0Af{zne4$pT7ZF(L70_;!4b%oVPjwyVGv~EV^{#>0l|MD M@PX0l|MD M@PXgCmeF!^Xfa!XU`V$FKm%1A_lR M-~-hu3K4>k06R$yw*UYD diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta new file mode 100644 index 0000000000000000000000000000000000000000..36397a3dda24813fd4350f8bb3d392d8c66dd16f GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0Aj^ghuMRHv;YGmgD^V>gCmeF!^Xfa!XU`V$FKm%1A_lR M-~-hu3K4>k06R$yw*UYD literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta new file mode 100644 index 0000000000000000000000000000000000000000..6352978051846970ca41a0ca97fd79952105726d GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta index 0c9b6ac5c863d06d63c46c8a1fc51da716a5fdbd..6352978051846970ca41a0ca97fd79952105726d 100644 GIT binary patch literal 46 icmeZ?GI7euPtF!)VPIeY;oA+q9RGp92POd&g989JFAHe^ literal 79 zcmeZ?GI7euPtI0VWnf^i0OIOpUzvh|v;YGmgD@KhgCmeF!^Xfa!XU`R$FKm%1A_lR M-~-hu3K4>k083;I`Tzg` diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta new file mode 100644 index 0000000000000000000000000000000000000000..0c9b6ac5c863d06d63c46c8a1fc51da716a5fdbd GIT binary patch literal 79 zcmeZ?GI7euPtI0VWnf^i0OIOpUzvh|v;YGmgD@KhgCmeF!^Xfa!XU`R$FKm%1A_lR M-~-hu3K4>k083;I`Tzg` literal 0 HcmV?d00001 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 7fae6a455050..7f14a92116ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -107,34 +107,40 @@ class 
OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { import testImplicits._ + val inputData = MemoryStream[Int] + inputData.addData(1, 2, 3, 4) + inputData.addData(3, 4, 5, 6) + inputData.addData(5, 6, 7, 8) + + val resourceUri = + this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI + val checkpointDir = new File(resourceUri).getCanonicalPath + val query = inputData + .toDF() + .groupBy($"value") + .agg(count("*")) + .writeStream + .queryName("counts") + .outputMode("complete") + .option("checkpointLocation", checkpointDir) + .format("memory") + // checkpoint data was generated by a query with 10 shuffle partitions // test if recovery from checkpoint is successful withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { - val inputData = MemoryStream[Int] - inputData.addData(1, 2, 3, 4) - inputData.addData(3, 4, 5, 6) - inputData.addData(5, 6, 7, 8) - inputData.addData(1) - - val resourceUri = - this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI - val checkpointDir = new File(resourceUri).getCanonicalPath - val query = inputData - .toDF() - .groupBy($"value") - .agg(count("*")) - .writeStream - .queryName("counts") - .outputMode("complete") - .option("checkpointLocation", checkpointDir) - .format("memory") - .start() - - query.processAllAvailable() + query.start().processAllAvailable() + QueryTest.checkAnswer(spark.table("counts").toDF(), - Row("1", 2) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: + Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil) } + + // if number of partitions increased, should throw exception + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") { + intercept[IllegalArgumentException] { + query.start().processAllAvailable() + } + } } private def readFromResource(dir: String): (Long, OffsetSeq) = { From 3af1cb4ab978e570da86075264358cc524650bf6 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 10 Mar 2017 14:30:49 -0800 Subject: [PATCH 06/14] Fix and clean up. --- .../execution/streaming/StreamExecution.scala | 12 ++--- .../streaming/OffsetSeqLogSuite.scala | 42 ----------------- .../spark/sql/streaming/StreamSuite.scala | 46 +++++++++++++++++-- 3 files changed, 48 insertions(+), 52 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8d831419317d..321840c6f77d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -391,20 +391,20 @@ class StreamExecution( // initialize metadata val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS) offsetSeqMetadata = { - if (nextOffsets.metadata.nonEmpty) { + if (nextOffsets.metadata.isEmpty) { + OffsetSeqMetadata(0, 0, + Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString)) + } else { val offsets = nextOffsets.metadata.get val shufflePartitionsToUse = offsets.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { - // For backwards compatibility, if # partitions was not recorded in the offset log, + // For backward compatibility, if # partitions was not recorded in the offset log, // then ensure it is not missing. The new value is picked up from the conf. logDebug("Number of shuffle partitions from previous run not found in checkpoint. 
" + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.") shufflePartitionsSparkSession }) OffsetSeqMetadata(offsets.batchWatermarkMs, offsets.batchTimestampMs, - Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) - } else { - OffsetSeqMetadata(0, 0, - Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString)) + offsets.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 7f14a92116ef..db2235a431b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming import java.io.File import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -102,47 +101,6 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L))) } - test("SPARK-19873: backwards compat with checkpoints that do not record shuffle partitions") { - import org.apache.spark.sql.functions.count - - import testImplicits._ - - val inputData = MemoryStream[Int] - inputData.addData(1, 2, 3, 4) - inputData.addData(3, 4, 5, 6) - inputData.addData(5, 6, 7, 8) - - val resourceUri = - this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI - val checkpointDir = new File(resourceUri).getCanonicalPath - val query = inputData - .toDF() - .groupBy($"value") - .agg(count("*")) - .writeStream - .queryName("counts") - .outputMode("complete") - .option("checkpointLocation", checkpointDir) - .format("memory") - - // checkpoint data was generated by a query with 10 shuffle partitions - // test if recovery from checkpoint is successful - withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { - query.start().processAllAvailable() - - QueryTest.checkAnswer(spark.table("counts").toDF(), - Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: - Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil) - } - - // if number of partitions increased, should throw exception - withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") { - intercept[IllegalArgumentException] { - query.start().processAllAvailable() - } - } - } - private def readFromResource(dir: String): (Long, OffsetSeq) = { val input = getClass.getResource(s"/structured-streaming/$dir") val log = new OffsetSeqLog(spark, input.toString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 1b8a7bfb2893..530b439cb2d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.streaming -import java.io.{InterruptedIOException, IOException} +import java.io.{File, InterruptedIOException, IOException} import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} import scala.reflect.ClassTag @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import 
org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -390,19 +391,56 @@ class StreamSuite extends StreamTest { assert(query.exception.isEmpty) } - test("SPARK-19873: streaming agg with change in number of partitions") { + test("SPARK-19873: streaming aggregation with change in number of partitions") { val inputData = MemoryStream[(Int, Int)] val agg = inputData.toDS().groupBy("_1").count() testStream(agg, OutputMode.Complete())( AddData(inputData, (1, 1), (2, 1), (1, 2)), - StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "2")), + StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")), CheckAnswer((1, 2), (2, 1)), StopStream, AddData(inputData, (3, 1), (2, 2), (1, 1)), - StartStream(additionalConfs = Map("spark.sql.shuffle.partitions" -> "5")), + StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")), CheckAnswer((1, 3), (2, 2), (3, 1))) } + + test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") { + val inputData = MemoryStream[Int] + inputData.addData(1, 2, 3, 4) + inputData.addData(3, 4, 5, 6) + inputData.addData(5, 6, 7, 8) + + val resourceUri = + this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI + val checkpointDir = new File(resourceUri).getCanonicalPath + val query = inputData + .toDF() + .groupBy($"value") + .agg(count("*")) + .writeStream + .queryName("counts") + .outputMode("complete") + .option("checkpointLocation", checkpointDir) + .format("memory") + + // Checkpoint data was generated by a query with 10 shuffle partitions. + // Test if recovery from checkpoint is successful. + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + query.start().processAllAvailable() + + QueryTest.checkAnswer(spark.table("counts").toDF(), + Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: + Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil) + } + + // If the number of partitions is greater, should throw exception. + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") { + intercept[IllegalArgumentException] { + query.start().processAllAvailable() + } + } + } } abstract class FakeSource extends StreamSourceProvider { From 1cacd32e21f074c2ffd79f1ce4390bd9f113da0c Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 10 Mar 2017 14:50:48 -0800 Subject: [PATCH 07/14] Clean up. 
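Besides the renames, this keeps the backward-compatibility rule in one place: when restarting from a checkpoint, the number of shuffle partitions recorded in the offset log wins, and only if that entry is absent (logs written by older versions) does the current session conf fill the gap. A minimal sketch of that rule, using illustrative variable names rather than the exact ones in StreamExecution:

    val shufflePartitionsToUse = recoveredMetadata.conf.getOrElse(
      SQLConf.SHUFFLE_PARTITIONS.key,
      sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString)
    // Pin the batch to the recovered value and disable adaptive execution so the
    // number of state store partitions cannot drift between runs.
    sessionToRunBatches.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse)
    sessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")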
--- .../sql/execution/streaming/StreamExecution.scala | 11 +++++------ .../sql/execution/streaming/OffsetSeqLogSuite.scala | 10 +++++----- .../org/apache/spark/sql/streaming/StreamSuite.scala | 12 ++++++++---- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 321840c6f77d..28c77e537e8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -395,16 +395,16 @@ class StreamExecution( OffsetSeqMetadata(0, 0, Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString)) } else { - val offsets = nextOffsets.metadata.get - val shufflePartitionsToUse = offsets.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { + val metadata = nextOffsets.metadata.get + val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { // For backward compatibility, if # partitions was not recorded in the offset log, // then ensure it is not missing. The new value is picked up from the conf. logDebug("Number of shuffle partitions from previous run not found in checkpoint. " + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.") shufflePartitionsSparkSession }) - OffsetSeqMetadata(offsets.batchWatermarkMs, offsets.batchTimestampMs, - offsets.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) + OffsetSeqMetadata(metadata.batchWatermarkMs, metadata.batchTimestampMs, + metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) } } @@ -588,8 +588,7 @@ class StreamExecution( } val nextBatch = - new Dataset(sparkSessionToRunBatch, lastExecution, - RowEncoder(lastExecution.analyzed.schema)) + new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema)) reportTimeTaken("addBatch") { sink.addBatch(currentBatchId, nextBatch) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index db2235a431b3..6a42cb667f4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -31,7 +31,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { test("OffsetSeqMetadata - deserialization") { val key = SQLConf.SHUFFLE_PARTITIONS.key - def getMapWith(shufflePartitions: Int): Map[String, String] = { + def getConfWith(shufflePartitions: Int): Map[String, String] = { Map(key -> shufflePartitions.toString) } @@ -41,19 +41,19 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { // One set assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}""")) assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}""")) - assert(OffsetSeqMetadata(0, 0, getMapWith(shufflePartitions = 2)) === + assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) === OffsetSeqMetadata(s"""{"conf": {"$key":2}}""")) // Two set assert(OffsetSeqMetadata(1, 2, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) - assert(OffsetSeqMetadata(1, 0, getMapWith(shufflePartitions = 3)) === + 
assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) === OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}""")) - assert(OffsetSeqMetadata(0, 2, getMapWith(shufflePartitions = 3)) === + assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) === OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}""")) // All set - assert(OffsetSeqMetadata(1, 2, getMapWith(shufflePartitions = 3)) === + assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) === OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}""")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 530b439cb2d0..c65dad378023 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -396,13 +396,17 @@ class StreamSuite extends StreamTest { val agg = inputData.toDS().groupBy("_1").count() testStream(agg, OutputMode.Complete())( - AddData(inputData, (1, 1), (2, 1), (1, 2)), + AddData(inputData, (1, 0), (2, 0)), StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")), - CheckAnswer((1, 2), (2, 1)), + CheckAnswer((1, 1), (2, 1)), StopStream, - AddData(inputData, (3, 1), (2, 2), (1, 1)), + AddData(inputData, (3, 0), (2, 0)), StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")), - CheckAnswer((1, 3), (2, 2), (3, 1))) + CheckAnswer((1, 1), (2, 2), (3, 1)), + StopStream, + AddData(inputData, (3, 0), (1, 0)), + StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")), + CheckAnswer((1, 2), (2, 2), (3, 2))) } test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") { From 030e6353abbf084bc62eea64caf494c08ebb294e Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Tue, 14 Mar 2017 15:23:05 -0700 Subject: [PATCH 08/14] Update tests. Remove var in OffsetSeqMetadata. --- .../sql/execution/streaming/OffsetSeq.scala | 4 +- .../execution/streaming/StreamExecution.scala | 13 +- .../spark/sql/streaming/StreamSuite.scala | 111 +++++++++++++----- .../sql/streaming/StreamingQuerySuite.scala | 6 +- 4 files changed, 92 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index bc667dc3a67e..8249adab4bba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -72,8 +72,8 @@ object OffsetSeq { * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions. 
*/ case class OffsetSeqMetadata( - var batchWatermarkMs: Long = 0, - var batchTimestampMs: Long = 0, + batchWatermarkMs: Long = 0, + batchTimestampMs: Long = 0, conf: Map[String, String] = Map.empty) { def json: String = Serialization.write(this)(OffsetSeqMetadata.format) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 28c77e537e8f..65fa3bbb1028 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -464,8 +464,7 @@ class StreamExecution( } } if (hasNewData) { - // Current batch timestamp in milliseconds - offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis() + var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs // Update the eventTime watermark if we find one in the plan. if (lastExecution != null) { lastExecution.executedPlan.collect { @@ -473,16 +472,20 @@ class StreamExecution( logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") e.eventTimeStats.value.max - e.delayMs }.headOption.foreach { newWatermarkMs => - if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { + if (newWatermarkMs > batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") - offsetSeqMetadata.batchWatermarkMs = newWatermarkMs + batchWatermarkMs = newWatermarkMs } else { logDebug( s"Event time didn't move: $newWatermarkMs < " + - s"${offsetSeqMetadata.batchWatermarkMs}") + s"$batchWatermarkMs") } } } + offsetSeqMetadata = OffsetSeqMetadata( + batchWatermarkMs, + triggerClock.getTimeMillis(), // Current batch timestamp in milliseconds + offsetSeqMetadata.conf) // Keep the same conf updateStatusMessage("Writing offsets to log") reportTimeTaken("walCommit") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index c65dad378023..735489d697ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -23,6 +23,8 @@ import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} import scala.reflect.ClassTag import scala.util.control.ControlThrowable +import org.apache.commons.io.FileUtils + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.command.ExplainCommand @@ -409,41 +411,88 @@ class StreamSuite extends StreamTest { CheckAnswer((1, 2), (2, 2), (3, 2))) } - test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") { - val inputData = MemoryStream[Int] - inputData.addData(1, 2, 3, 4) - inputData.addData(3, 4, 5, 6) - inputData.addData(5, 6, 7, 8) + test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") { + var inputData: MemoryStream[Int] = null + var query: DataStreamWriter[Row] = null - val resourceUri = - this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI - val checkpointDir = new File(resourceUri).getCanonicalPath - val query = inputData - .toDF() - .groupBy($"value") - .agg(count("*")) - .writeStream - .queryName("counts") - .outputMode("complete") - .option("checkpointLocation", checkpointDir) - .format("memory") - - // Checkpoint data was generated by a query with 10 
shuffle partitions. - // Test if recovery from checkpoint is successful. - withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { - query.start().processAllAvailable() - - QueryTest.checkAnswer(spark.table("counts").toDF(), - Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: - Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil) + def init(): Unit = { + inputData = MemoryStream[Int] + inputData.addData(1, 2, 3, 4) + inputData.addData(3, 4, 5, 6) + inputData.addData(5, 6, 7, 8) + + query = inputData + .toDF() + .groupBy($"value") + .agg(count("*")) + .writeStream + .outputMode("complete") + .format("memory") } - // If the number of partitions is greater, should throw exception. - withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") { - intercept[IllegalArgumentException] { - query.start().processAllAvailable() + // Get an existing checkpoint generated by Spark v2.1. + // v2.1 does not record # shuffle partitions in the offset metadata. + val resourceUri = + this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI + val checkpointDir = new File(resourceUri) + + // 1 - Test if recovery from the checkpoint is successful. + init() + withTempDir(dir => { + // Copy the checkpoint to a temp dir to prevent changes to the original. + // Not doing this will lead to the test passing on the first run, but fail subsequent runs. + FileUtils.copyDirectory(checkpointDir, dir) + + // Checkpoint data was generated by a query with 10 shuffle partitions. + // In order to test reading from the checkpoint, the checkpoint must have two or more batches, + // since the last batch may be rerun. + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + var streamingQuery: StreamingQuery = null + try { + streamingQuery = + query + .queryName("counts") + .option("checkpointLocation", dir.getCanonicalPath) + .start() + streamingQuery.processAllAvailable() + inputData.addData(9) + streamingQuery.processAllAvailable() + + QueryTest.checkAnswer(spark.table("counts").toDF(), + Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) :: + Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil) + } finally { + if (streamingQuery ne null) { + streamingQuery.stop() + } + } } - } + }) + + // 2 - Check recovery with wrong num shuffle partitions + init() + withTempDir(dir => { + FileUtils.copyDirectory(checkpointDir, dir) + + // Since the number of partitions is greater than 10, should throw exception. 
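For checkpoints written after this change, the recorded value always wins: restarting the same query with a different spark.sql.shuffle.partitions in the session conf does not change how the stateful aggregation is shuffled. A minimal sketch of that user-visible behavior follows (illustrative only, not part of the patch; it assumes a StreamTest-style suite where MemoryStream, the test implicits and a pre-created checkpointPath string are available):

    // First run records spark.sql.shuffle.partitions = 5 in the offset log metadata.
    val input = MemoryStream[Int]
    val counts = input.toDF().groupBy($"value").count()

    spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
    input.addData(1, 2, 3)
    val q1 = counts.writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("counts")
      .option("checkpointLocation", checkpointPath)   // hypothetical path
      .start()
    q1.processAllAvailable()
    q1.stop()

    // Second run changes the session conf, but the restarted query is pinned
    // to the recorded value and keeps planning with 5 shuffle partitions.
    spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "15")
    input.addData(4, 5)
    val q2 = counts.writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("counts")
      .option("checkpointLocation", checkpointPath)
      .start()
    q2.processAllAvailable()
    q2.stop()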
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") { + var streamingQuery: StreamingQuery = null + try { + intercept[StreamingQueryException] { + streamingQuery = + query + .queryName("badQuery") + .option("checkpointLocation", dir.getCanonicalPath) + .start() + streamingQuery.processAllAvailable() + } + } finally { + if (streamingQuery ne null) { + streamingQuery.stop() + } + } + } + }) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index a6d039c779a9..2d7f066679d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -152,12 +152,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery(q => { q.exception.get.startOffset === q.committedOffsets.toOffsetSeq( - Seq(inputData), - OffsetSeqMetadata(0, 0)).toString && + Seq(inputData), OffsetSeqMetadata()).toString && q.exception.get.endOffset === q.availableOffsets.toOffsetSeq( - Seq(inputData), - OffsetSeqMetadata(0, 0)).toString + Seq(inputData), OffsetSeqMetadata()).toString }, "incorrect start offset or end offset on exception") ) } From 5c851a5ce094867873392a4e5071a8efe47e2801 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Tue, 14 Mar 2017 15:34:38 -0700 Subject: [PATCH 09/14] Clean up. --- .../org/apache/spark/sql/streaming/StreamSuite.scala | 10 ++-------- .../spark/sql/streaming/StreamingQuerySuite.scala | 6 ++---- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 735489d697ec..d1d4ce73dc0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -450,10 +450,7 @@ class StreamSuite extends StreamTest { var streamingQuery: StreamingQuery = null try { streamingQuery = - query - .queryName("counts") - .option("checkpointLocation", dir.getCanonicalPath) - .start() + query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start() streamingQuery.processAllAvailable() inputData.addData(9) streamingQuery.processAllAvailable() @@ -480,10 +477,7 @@ class StreamSuite extends StreamTest { try { intercept[StreamingQueryException] { streamingQuery = - query - .queryName("badQuery") - .option("checkpointLocation", dir.getCanonicalPath) - .start() + query.queryName("badQuery").option("checkpointLocation", dir.getCanonicalPath).start() streamingQuery.processAllAvailable() } } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 2d7f066679d6..a0a2b2b4c9b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -151,11 +151,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery(q => { q.exception.get.startOffset === - q.committedOffsets.toOffsetSeq( - Seq(inputData), OffsetSeqMetadata()).toString && + q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString 
&& q.exception.get.endOffset === - q.availableOffsets.toOffsetSeq( - Seq(inputData), OffsetSeqMetadata()).toString + q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString }, "incorrect start offset or end offset on exception") ) } From dfae7beeedc074a2806fd2b1cd9d4652006d9e89 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Wed, 15 Mar 2017 13:20:00 -0700 Subject: [PATCH 10/14] Changes from review. --- .../execution/streaming/StreamExecution.scala | 15 ++++++++------- .../apache/spark/sql/streaming/StreamSuite.scala | 16 ++++++++-------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 65fa3bbb1028..d3f17b1c6732 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -392,14 +392,16 @@ class StreamExecution( val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS) offsetSeqMetadata = { if (nextOffsets.metadata.isEmpty) { - OffsetSeqMetadata(0, 0, - Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString)) + OffsetSeqMetadata( + batchWatermarkMs = 0, + batchTimestampMs = 0, + conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString)) } else { val metadata = nextOffsets.metadata.get val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { // For backward compatibility, if # partitions was not recorded in the offset log, // then ensure it is not missing. The new value is picked up from the conf. - logDebug("Number of shuffle partitions from previous run not found in checkpoint. " + logWarning("Number of shuffle partitions from previous run not found in checkpoint. " + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.") shufflePartitionsSparkSession }) @@ -482,10 +484,9 @@ class StreamExecution( } } } - offsetSeqMetadata = OffsetSeqMetadata( - batchWatermarkMs, - triggerClock.getTimeMillis(), // Current batch timestamp in milliseconds - offsetSeqMetadata.conf) // Keep the same conf + offsetSeqMetadata = offsetSeqMetadata.copy( + batchWatermarkMs = batchWatermarkMs, + batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds updateStatusMessage("Writing offsets to log") reportTimeTaken("walCommit") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index d1d4ce73dc0c..e867fc40f7f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -411,11 +411,11 @@ class StreamSuite extends StreamTest { CheckAnswer((1, 2), (2, 2), (3, 2))) } - test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") { + test("recover from a Spark v2.1 checkpoint") { var inputData: MemoryStream[Int] = null var query: DataStreamWriter[Row] = null - def init(): Unit = { + def prepareMemoryStream(): Unit = { inputData = MemoryStream[Int] inputData.addData(1, 2, 3, 4) inputData.addData(3, 4, 5, 6) @@ -437,8 +437,8 @@ class StreamSuite extends StreamTest { val checkpointDir = new File(resourceUri) // 1 - Test if recovery from the checkpoint is successful. 
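For reference, the metadata line that OffsetSeqMetadata.json writes into each offset log file looks roughly as below; a Spark 2.1 checkpoint simply has no "conf" entry, which is exactly the case the fallback path handles. Values here are illustrative:

    // Roughly what the new metadata serializes to (json4s, fields in declaration order):
    OffsetSeqMetadata(
      batchWatermarkMs = 0,
      batchTimestampMs = 1489700000000L,
      conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "10")).json
    // => {"batchWatermarkMs":0,"batchTimestampMs":1489700000000,
    //     "conf":{"spark.sql.shuffle.partitions":"10"}}
    //
    // A v2.1 checkpoint only carries the first two fields, e.g.
    //   {"batchWatermarkMs":0,"batchTimestampMs":1489191722000}
    // so SHUFFLE_PARTITIONS is absent and the current session value is used instead.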
- init() - withTempDir(dir => { + prepareMemoryStream() + withTempDir { dir => // Copy the checkpoint to a temp dir to prevent changes to the original. // Not doing this will lead to the test passing on the first run, but fail subsequent runs. FileUtils.copyDirectory(checkpointDir, dir) @@ -464,11 +464,11 @@ class StreamSuite extends StreamTest { } } } - }) + } // 2 - Check recovery with wrong num shuffle partitions - init() - withTempDir(dir => { + prepareMemoryStream() + withTempDir { dir => FileUtils.copyDirectory(checkpointDir, dir) // Since the number of partitions is greater than 10, should throw exception. @@ -486,7 +486,7 @@ class StreamSuite extends StreamTest { } } } - }) + } } } From 3ae44148913dfab274dc527f7c044070a39f1f61 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 17 Mar 2017 12:06:20 -0700 Subject: [PATCH 11/14] Add test to check serialization behaviour of unknown fields, change conf update occurrence to once at beginning when populating offsets. --- .../execution/streaming/StreamExecution.scala | 16 ++++++++-------- .../execution/streaming/OffsetSeqLogSuite.scala | 5 +++++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 901646acfa1f..9cc022f9a27b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -275,7 +275,7 @@ class StreamExecution( reportTimeTaken("triggerExecution") { if (currentBatchId < 0) { // We'll do this initialization only once - populateStartOffsets() + populateStartOffsets(sparkSessionToRunBatches) logDebug(s"Stream running from $committedOffsets to $availableOffsets") } else { constructNextBatch() @@ -388,7 +388,7 @@ class StreamExecution( * - committedOffsets * - availableOffsets */ - private def populateStartOffsets(): Unit = { + private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = { offsetLog.getLatest() match { case Some((batchId, nextOffsets)) => logInfo(s"Resuming streaming query, starting with batch $batchId") @@ -417,6 +417,12 @@ class StreamExecution( } } + // Reset confs to disallow change in number of partitions + sparkSessionToRunBatches.conf.set( + SQLConf.SHUFFLE_PARTITIONS.key, + offsetSeqMetadata.conf(SQLConf.SHUFFLE_PARTITIONS.key)) + sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + logDebug(s"Found possibly unprocessed offsets $availableOffsets " + s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") @@ -581,12 +587,6 @@ class StreamExecution( cd.dataType, cd.timeZoneId) } - // Reset confs to disallow change in number of partitions - sparkSessionToRunBatch.conf.set( - SQLConf.SHUFFLE_PARTITIONS.key, - offsetSeqMetadata.conf(SQLConf.SHUFFLE_PARTITIONS.key)) - sparkSessionToRunBatch.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") - reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSessionToRunBatch, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 49932ffdbd54..dc556322bedd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -56,6 +56,11 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { // All set assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) === OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}""")) + + // Drop unknown fields + assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) === + OffsetSeqMetadata( + s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}},"unknown":1""")) } test("OffsetSeqLog - serialization - deserialization") { From a2b32ce3ee536d1ea1d12ae4dcc3c561788e3dd2 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 17 Mar 2017 14:03:12 -0700 Subject: [PATCH 12/14] Refactor initialization of OffsetSeqMetadata. --- .../execution/streaming/StreamExecution.scala | 55 +++++++++---------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9cc022f9a27b..663fc5428622 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -118,9 +118,7 @@ class StreamExecution( } /** Metadata associated with the offset seq of a batch in the query. */ - protected var offsetSeqMetadata = - OffsetSeqMetadata(conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> - sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString)) + protected var offsetSeqMetadata = OffsetSeqMetadata() override val id: UUID = UUID.fromString(streamMetadata.id) @@ -262,6 +260,11 @@ class StreamExecution( // Isolated spark session to run the batches with. val sparkSessionToRunBatches = sparkSession.cloneSession() + // Adaptive execution can change num shuffle partitions, disallow + sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0, + conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> + sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS.key))) if (state.compareAndSet(INITIALIZING, ACTIVE)) { // Unblock `awaitInitialization` @@ -395,33 +398,25 @@ class StreamExecution( currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - // initialize metadata - val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS) - offsetSeqMetadata = { - if (nextOffsets.metadata.isEmpty) { - OffsetSeqMetadata( - batchWatermarkMs = 0, - batchTimestampMs = 0, - conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString)) - } else { - val metadata = nextOffsets.metadata.get - val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { - // For backward compatibility, if # partitions was not recorded in the offset log, - // then ensure it is not missing. The new value is picked up from the conf. - logWarning("Number of shuffle partitions from previous run not found in checkpoint. 
" - + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.") - shufflePartitionsSparkSession - }) - OffsetSeqMetadata(metadata.batchWatermarkMs, metadata.batchTimestampMs, - metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) - } - } - - // Reset confs to disallow change in number of partitions - sparkSessionToRunBatches.conf.set( - SQLConf.SHUFFLE_PARTITIONS.key, - offsetSeqMetadata.conf(SQLConf.SHUFFLE_PARTITIONS.key)) - sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + // update offset metadata + nextOffsets.metadata.foreach(metadata => { + val shufflePartitionsSparkSession: Int = + sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS) + val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { + // For backward compatibility, if # partitions was not recorded in the offset log, + // then ensure it is not missing. The new value is picked up from the conf. + logWarning("Number of shuffle partitions from previous run not found in checkpoint. " + + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.") + shufflePartitionsSparkSession + }) + offsetSeqMetadata = OffsetSeqMetadata( + metadata.batchWatermarkMs, + metadata.batchTimestampMs, + metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) + // Update conf with correct number of shuffle partitions + sparkSessionToRunBatches.conf.set( + SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString) + }) logDebug(s"Found possibly unprocessed offsets $availableOffsets " + s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") From 3abe0a0fcb60def5eb14697fa6f78eee318b77b0 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 17 Mar 2017 14:08:10 -0700 Subject: [PATCH 13/14] nits. 
--- .../spark/sql/execution/streaming/StreamExecution.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 663fc5428622..bd4bcd621a54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -399,7 +399,7 @@ class StreamExecution( availableOffsets = nextOffsets.toStreamProgress(sources) // update offset metadata - nextOffsets.metadata.foreach(metadata => { + nextOffsets.metadata.foreach { metadata => val shufflePartitionsSparkSession: Int = sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS) val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, { @@ -410,13 +410,12 @@ class StreamExecution( shufflePartitionsSparkSession }) offsetSeqMetadata = OffsetSeqMetadata( - metadata.batchWatermarkMs, - metadata.batchTimestampMs, + metadata.batchWatermarkMs, metadata.batchTimestampMs, metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString)) // Update conf with correct number of shuffle partitions sparkSessionToRunBatches.conf.set( SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString) - }) + } logDebug(s"Found possibly unprocessed offsets $availableOffsets " + s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") From a0c71af4ad2dd99815f85e3b0c842419beeea67c Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Fri, 17 Mar 2017 15:13:32 -0700 Subject: [PATCH 14/14] Force disabling of adaptive execution instead of dying if it is enabled. --- .../sql/execution/streaming/StreamExecution.scala | 4 +++- .../spark/sql/streaming/StreamingQueryManager.scala | 8 ++++---- .../sql/streaming/StreamingQueryManagerSuite.scala | 10 ---------- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bd4bcd621a54..40faddccc242 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -118,7 +118,9 @@ class StreamExecution( } /** Metadata associated with the offset seq of a batch in the query. 
*/ - protected var offsetSeqMetadata = OffsetSeqMetadata() + protected var offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0, + conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> + sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString)) override val id: UUID = UUID.fromString(streamMetadata.id) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 38edb40dfb78..7810d9f6e964 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -25,6 +25,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.execution.streaming._ @@ -40,7 +41,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} */ @Experimental @InterfaceStability.Evolving -class StreamingQueryManager private[sql] (sparkSession: SparkSession) { +class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging { private[sql] val stateStoreCoordinator = StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) @@ -234,9 +235,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) { - throw new AnalysisException( - s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " + - "is not supported in streaming DataFrames/Datasets") + logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " + + "is not supported in streaming DataFrames/Datasets and will be disabled.") } new StreamingQueryWrapper(new StreamExecution( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index f05e9d1fda73..b49efa689023 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -239,16 +239,6 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { } } - test("SPARK-19268: Adaptive query execution should be disallowed") { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { - val e = intercept[AnalysisException] { - MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start() - } - assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) && - e.getMessage.contains("not supported")) - } - } - /** Run a body of code by defining a query on each dataset */ private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = { failAfter(streamingTimeout) {
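Taken together, the final shape of the mechanism is small: the conf map travels with the offset log, SHUFFLE_PARTITIONS falls back to the current session value for old checkpoints, and the cloned session that runs batches is pinned to the recorded value with adaptive execution forced off, since adaptive execution could otherwise change the number of post-shuffle partitions between batches and invalidate the partitioning of the state store. A condensed, illustrative restatement of that recovery path (not the literal StreamExecution code; runSession stands for the cloned session that executes batches):

    def restoreOffsetSeqMetadata(
        latest: Option[OffsetSeqMetadata],
        runSession: SparkSession): OffsetSeqMetadata = {
      val sessionPartitions = runSession.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)
      // Adaptive execution can change the number of shuffle partitions per batch,
      // so it is always disabled on the session that runs streaming batches.
      runSession.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")

      val restored = latest match {
        case None =>
          // Fresh query: record the current session value.
          OffsetSeqMetadata(0, 0, Map(SQLConf.SHUFFLE_PARTITIONS.key -> sessionPartitions))
        case Some(m) =>
          // Old (v2.1) checkpoints have no conf map; fall back to the session value.
          val partitions = m.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, sessionPartitions)
          m.copy(conf = m.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> partitions))
      }
      // Pin the session so every batch is planned with the recorded value.
      runSession.conf.set(
        SQLConf.SHUFFLE_PARTITIONS.key, restored.conf(SQLConf.SHUFFLE_PARTITIONS.key))
      restored
    }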