@@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
* plan incrementally. Possibly preserving state in between each execution.
*/
class IncrementalExecution(
class IncrementalExecution private[sql](
sparkSession: SparkSession,
logicalPlan: LogicalPlan,
outputMode: OutputMode,
checkpointLocation: String,
currentBatchId: Long)
val currentBatchId: Long)
[Review comment from @lw-lin (Contributor, Author), May 7, 2016]
let's expose this to test suites

extends QueryExecution(sparkSession, logicalPlan) {

// TODO: make this always part of planning.
@@ -57,7 +57,7 @@ class IncrementalExecution(
case StateStoreSaveExec(keys, None,
UnaryExecNode(agg,
StateStoreRestoreExec(keys2, None, child))) =>
val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1)
val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId)
operatorId += 1

StateStoreSaveExec(
@@ -122,7 +122,7 @@ class StreamExecution(
* processing is done. Thus, the Nth record in this log indicated data that is currently being
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
private val offsetLog =
private[sql] val offsetLog =
[Review comment from @lw-lin (Contributor, Author), May 7, 2016]
let's expose this to test suites

new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))

/** Whether the query is currently active or not */
@@ -174,12 +174,21 @@ class StreamExecution(

// While active, repeatedly attempt to run batches.
SQLContext.setActive(sparkSession.wrapped)
populateStartOffsets()
logDebug(s"Stream running from $committedOffsets to $availableOffsets")

triggerExecutor.execute(() => {
if (isActive) {
if (dataAvailable) runBatch()
constructNextBatch()
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets()
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
}
if (dataAvailable) {
runBatch()
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
}
true
} else {
false
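
With this change, populateStartOffsets() runs only on the first trigger (while currentBatchId is still negative), later triggers call constructNextBatch(), and currentBatchId advances only after a batch's data has been processed, so batch 0 executes on the very first trigger. A condensed, self-contained sketch of that control flow; the object name and the local defs are simplified stand-ins for the StreamExecution internals, not the real implementations:

object TriggerLoopSketch {
  def main(args: Array[String]): Unit = {
    var currentBatchId = -1L                                    // -1 until start offsets are populated
    var pending = List(Seq(1, 2, 3), Seq.empty[Int], Seq(4, 5, 6))  // data seen at each trigger

    def populateStartOffsets(): Unit = { currentBatchId = 0 }   // fresh query, empty offset log
    def constructNextBatch(): Unit = ()                         // fetch latest offsets (no-op here)
    def dataAvailable: Boolean = pending.headOption.exists(_.nonEmpty)
    def runBatch(): Unit = println(s"batch $currentBatchId -> ${pending.head}")

    for (_ <- 1 to 3) {                                         // three simulated triggers
      if (currentBatchId < 0) populateStartOffsets() else constructNextBatch()
      if (dataAvailable) {
        runBatch()
        currentBatchId += 1                                     // advance only after the batch completes
      }
      if (pending.nonEmpty) pending = pending.tail
    }
    // Prints "batch 0 -> List(1, 2, 3)" on the first trigger, nothing on the
    // second (no data), and "batch 1 -> List(4, 5, 6)" on the third.
  }
}
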
@@ -214,7 +223,7 @@
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
logInfo(s"Resuming continuous query, starting with batch $batchId")
currentBatchId = batchId + 1
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
logDebug(s"Found possibly uncommitted offsets $availableOffsets")

@@ -285,7 +294,6 @@
offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
}
currentBatchId += 1
logInfo(s"Committed offsets for batch $currentBatchId.")
} else {
awaitBatchLock.lock()
@@ -352,7 +360,7 @@ class StreamExecution(

val nextBatch =
new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
sink.addBatch(currentBatchId - 1, nextBatch)
sink.addBatch(currentBatchId, nextBatch)

awaitBatchLock.lock()
try {
@@ -124,6 +124,10 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
batches.flatten
}

def latestBatchId: Option[Int] = synchronized {
if (batches.size == 0) None else Some(batches.size - 1)
}

def lastBatch: Seq[Row] = synchronized { batches.last }

def toDebugString: String = synchronized {
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext {
}
}

// This would fail for now -- error is "Timed out waiting for stream"
// Root cause is that data generated in batch 0 may not get processed in batch 1
// Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
ignore("minimize delay between batch construction and execution") {
test("minimize delay between batch construction and execution") {

// For each batch, we would retrieve new data's offsets and log them before we run the execution
// This checks whether the key of the offset log is the expected batch id
def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
s"offsetLog's latest should be $expectedId")

// For each batch, we would log the state change during the execution
// This checks whether the key of the state change log is the expected batch id
def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
s"lastExecution's currentBatchId should be $expectedId")

// For each batch, we would log the sink change after the execution
// This checks whether the key of the sink change log is the expected batch id
def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId,
s"sink's lastBatchId should be $expectedId")

val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
StartStream(ProcessingTime("10 seconds"), new ManualClock),

/* -- batch 0 ----------------------- */
AddData(inputData, 1),
AddData(inputData, 2),
AddData(inputData, 3),
// Add some data in batch 0
AddData(inputData, 1, 2, 3),
AdvanceManualClock(10 * 1000), // 10 seconds

/* -- batch 1 ----------------------- */
CheckAnswer(1, 2, 3))
// Check the results of batch 0
CheckAnswer(1, 2, 3),
CheckIncrementalExecutionCurrentBatchId(0),
CheckOffsetLogLatestBatchId(0),
CheckSinkLatestBatchId(0),
// Add some data in batch 1
AddData(inputData, 4, 5, 6),
AdvanceManualClock(10 * 1000),

/* -- batch _ ----------------------- */
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),

AdvanceManualClock(10 * 1000),
AdvanceManualClock(10 * 1000),
AdvanceManualClock(10 * 1000),

/* -- batch __ ---------------------- */
// Check the results of batch 1 again; this is to make sure that, when there's no new data,
// the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),

/* Stop then restart the Stream */
StopStream,
StartStream(ProcessingTime("10 seconds"), new ManualClock),

/* -- batch 1 rerun ----------------- */
[Review comment from a Member]
I'm wondering if we can avoid rerunning a batch that has already finished before stopping. How about storing the offsets after finishing a batch instead of storing them before running a batch? @marmbrus what do you think?

[Review comment from a Contributor]
Failure is the rare case, so I don't think it's that bad to rerun if it reduces the complexity of the implementation.

// this batch 1 would re-run because the latest batch id logged in offset log is 1
AdvanceManualClock(10 * 1000),

/* -- batch 2 ----------------------- */
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
// Add some data in batch 2
AddData(inputData, 7, 8, 9),
AdvanceManualClock(10 * 1000),

/* -- batch 3 ----------------------- */
// Check the results of batch 2
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
CheckIncrementalExecutionCurrentBatchId(2),
CheckOffsetLogLatestBatchId(2),
CheckSinkLatestBatchId(2))
}
}
