
Commit 95f4fba

lw-lin authored and zsxwing committed
[SPARK-14942][SQL][STREAMING] Reduce delay between batch construction and execution
## Problem

Currently in `StreamExecution`, [we first run the batch, then construct the next](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L165):

```scala
if (dataAvailable) runBatch()
constructNextBatch()
```

This works well when we run batches as soon as possible: data gets processed in the **very next batch**:

![1](https://cloud.githubusercontent.com/assets/15843379/14779964/2786e698-0b0d-11e6-9d2c-bb41513488b2.png)

However, when we run batches on a trigger such as `ProcessingTime("1 minute")`, data (such as _y_ below) may not get processed in the very next batch (_batch 1_), but only in _batch 2_:

![2](https://cloud.githubusercontent.com/assets/15843379/14779818/6f3bb064-0b0c-11e6-9f16-c1ce4897186b.png)

## What changes were proposed in this pull request?

This patch reverses the order of `constructNextBatch()` and `runBatch()`. After this patch, data gets processed in the **very next batch**, i.e. _batch 1_:

![3](https://cloud.githubusercontent.com/assets/15843379/14779816/6f36ee62-0b0c-11e6-9e53-bc8397fade18.png)

In addition, this patch changes when we do `currentBatchId += 1`: the increment now happens once the current batch's data has been fully processed, so we no longer have to pass `currentBatchId + 1` or `currentBatchId - 1` to states or sinks.

## How was this patch tested?

A newly added test case. The change should also be covered by existing test suites, e.g. the stress tests.

Author: Liwei Lin <[email protected]>

Closes #12725 from lw-lin/construct-before-run-3.
1 parent fabc8e5 commit 95f4fba

File tree

4 files changed (+99, -19 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
  * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
  * plan incrementally. Possibly preserving state in between each execution.
  */
-class IncrementalExecution(
+class IncrementalExecution private[sql](
     sparkSession: SparkSession,
     logicalPlan: LogicalPlan,
     outputMode: OutputMode,
     checkpointLocation: String,
-    currentBatchId: Long)
+    val currentBatchId: Long)
   extends QueryExecution(sparkSession, logicalPlan) {
 
   // TODO: make this always part of planning.
@@ -57,7 +57,7 @@ class IncrementalExecution(
       case StateStoreSaveExec(keys, None,
              UnaryExecNode(agg,
                StateStoreRestoreExec(keys2, None, child))) =>
-        val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)
+        val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
         operatorId += 1
 
         StateStoreSaveExec(
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 16 additions & 8 deletions
```diff
@@ -122,7 +122,7 @@ class StreamExecution(
    * processing is done. Thus, the Nth record in this log indicated data that is currently being
    * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    */
-  private val offsetLog =
+  private[sql] val offsetLog =
     new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
 
   /** Whether the query is currently active or not */
@@ -174,12 +174,21 @@
 
     // While active, repeatedly attempt to run batches.
     SQLContext.setActive(sparkSession.wrapped)
-    populateStartOffsets()
-    logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+
     triggerExecutor.execute(() => {
       if (isActive) {
-        if (dataAvailable) runBatch()
-        constructNextBatch()
+        if (currentBatchId < 0) {
+          // We'll do this initialization only once
+          populateStartOffsets()
+          logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+        } else {
+          constructNextBatch()
+        }
+        if (dataAvailable) {
+          runBatch()
+          // We'll increase currentBatchId after we complete processing current batch's data
+          currentBatchId += 1
+        }
         true
       } else {
         false
@@ -214,7 +223,7 @@
     offsetLog.getLatest() match {
       case Some((batchId, nextOffsets)) =>
         logInfo(s"Resuming continuous query, starting with batch $batchId")
-        currentBatchId = batchId + 1
+        currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
         logDebug(s"Found possibly uncommitted offsets $availableOffsets")
 
@@ -285,7 +294,6 @@
           offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
       }
-      currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
       awaitBatchLock.lock()
@@ -352,7 +360,7 @@
 
     val nextBatch =
       new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
-    sink.addBatch(currentBatchId - 1, nextBatch)
+    sink.addBatch(currentBatchId, nextBatch)
 
     awaitBatchLock.lock()
     try {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -124,6 +124,10 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
     batches.flatten
   }
 
+  def latestBatchId: Option[Int] = synchronized {
+    if (batches.size == 0) None else Some(batches.size - 1)
+  }
+
   def lastBatch: Seq[Row] = synchronized { batches.last }
 
   def toDebugString: String = synchronized {
```
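As a usage note, `latestBatchId` returns `None` until the first batch is committed. A minimal sketch of reading it (the schema and printlns here are illustrative, not part of the patch; a sink would normally be populated by a running streaming query via `addBatch(batchId, data)`):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Illustrative schema; any StructType works
val schema = StructType(StructField("value", IntegerType) :: Nil)
val sink = new MemorySink(schema)

sink.latestBatchId match {
  case Some(id) => println(s"latest committed batch id: $id")
  case None     => println("no batch committed yet") // empty sink => None
}
```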

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 76 additions & 8 deletions
```diff
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext {
     }
   }
 
-  // This would fail for now -- error is "Timed out waiting for stream"
-  // Root cause is that data generated in batch 0 may not get processed in batch 1
-  // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
-  ignore("minimize delay between batch construction and execution") {
+  test("minimize delay between batch construction and execution") {
+
+    // For each batch, we would retrieve new data's offsets and log them before we run the execution
+    // This checks whether the key of the offset log is the expected batch id
+    def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
+        s"offsetLog's latest should be $expectedId")
+
+    // For each batch, we would log the state change during the execution
+    // This checks whether the key of the state change log is the expected batch id
+    def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
+        s"lastExecution's currentBatchId should be $expectedId")
+
+    // For each batch, we would log the sink change after the execution
+    // This checks whether the key of the sink change log is the expected batch id
+    def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId,
+        s"sink's lastBatchId should be $expectedId")
+
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
       StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
       /* -- batch 0 ----------------------- */
-      AddData(inputData, 1),
-      AddData(inputData, 2),
-      AddData(inputData, 3),
+      // Add some data in batch 0
+      AddData(inputData, 1, 2, 3),
       AdvanceManualClock(10 * 1000), // 10 seconds
+
       /* -- batch 1 ----------------------- */
-      CheckAnswer(1, 2, 3))
+      // Check the results of batch 0
+      CheckAnswer(1, 2, 3),
+      CheckIncrementalExecutionCurrentBatchId(0),
+      CheckOffsetLogLatestBatchId(0),
+      CheckSinkLatestBatchId(0),
+      // Add some data in batch 1
+      AddData(inputData, 4, 5, 6),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch _ ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch __ ---------------------- */
+      // Check the results of batch 1 again; this is to make sure that, when there's no new data,
+      // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      /* Stop then restart the Stream */
+      StopStream,
+      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
+      /* -- batch 1 rerun ----------------- */
+      // this batch 1 would re-run because the latest batch id logged in offset log is 1
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 2 ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+      // Add some data in batch 2
+      AddData(inputData, 7, 8, 9),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 3 ----------------------- */
+      // Check the results of batch 2
+      CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
+      CheckIncrementalExecutionCurrentBatchId(2),
+      CheckOffsetLogLatestBatchId(2),
+      CheckSinkLatestBatchId(2))
   }
 }
```