@@ -295,8 +295,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
StopStream,
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.io.{InputStream, OutputStream}
import java.nio.charset.StandardCharsets._

import scala.io.{Source => IOSource}

import org.apache.spark.sql.SparkSession

/**
* Used to write log files that represent commit points in structured streaming.
* A log file will be written immediately after the successful completion of a
* batch, and before processing the next batch. Here is an execution summary:
* - trigger batch 1
* - obtain batch 1 offsets and write to offset log
* - process batch 1
* - write batch 1 to commit log
* - trigger batch 2
* - obtain batch 2 offsets and write to offset log
* - process batch 2
* - write batch 2 to commit log
* ....
*
* The current format of the commit log is:
* line 1: version
* line 2: metadata (optional json string)
*/
class OffsetCommitLog(sparkSession: SparkSession, path: String)
Reviewer (Contributor): Scala doc please
extends HDFSMetadataLog[Option[String]](sparkSession, path) {

override protected def deserialize(in: InputStream): Option[String] = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
Reviewer (Contributor): can you say "incomplete log file in the offset commit log"
}
val version = lines.next().trim.toInt
if (OffsetCommitLog.VERSION < version) {
throw new IllegalStateException(s"Incompatible log file version ${version}")
}
// read metadata
lines.next().trim match {
case OffsetCommitLog.SERIALIZED_VOID => None
case metadata => Some(metadata)
}
}

override protected def serialize(metadata: Option[String], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(OffsetCommitLog.VERSION.toString.getBytes(UTF_8))
out.write('\n')

// write metadata or void
out.write(metadata.getOrElse(OffsetCommitLog.SERIALIZED_VOID).getBytes(UTF_8))
}
}

object OffsetCommitLog {
private val VERSION = 1
Reviewer (Contributor): Let's be consistent with the other logs and write "v1" for the version, not "1"
private val SERIALIZED_VOID = "{}"
}
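As an aside, here is a minimal sketch (not part of the patch) of what a serialized commit entry would contain under the format described in the class comment above -- the version on line 1 and the metadata (or "{}" for None) on line 2; the metadata value is illustrative only:

// Illustration only: the two-line layout produced by serialize() above.
val emptyCommit = "1\n{}"                            // committed batch with no metadata
val commitWithMetadata = "1\n{\"watermark\":5000}"   // committed batch with a JSON metadata blob

// Feeding these through deserialize() above would yield
// None and Some("{\"watermark\":5000}") respectively.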

@@ -162,6 +162,7 @@ class StreamExecution(

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case _: OneTime => OneTimeExecutor()
}

/** Defines the internal state of execution */
@@ -206,6 +207,12 @@ class StreamExecution(
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

/**
* A log that records the committed batch ids. This is used to check if a batch was committed
* on restart, instead of (possibly) re-running the previous batch.
Reviewer (Contributor): nit: "if a batch was committed on restart" sounds like batches are supposed to get committed only on restart. :)
Reviewer (Contributor): Also, keep the comment generic so it does not imply that only the previous batch will be re-run; in the future we could re-run multiple batches.
*/
val commitLog = new OffsetCommitLog(sparkSession, checkpointFile("commits"))

/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING

@@ -373,22 +380,66 @@ class StreamExecution(
* - currentBatchId
* - committedOffsets
* - availableOffsets
* The basic structure of this method is as follows:
Reviewer (Contributor): really like this explanation.
*
* Identify (from the offset log) the offsets used to run the last batch
* IF a last batch exists THEN
Reviewer (Contributor): "a last batch" is grammatically weird, isn't it?
* Set the next batch to that last batch
Reviewer (Contributor): maybe "set the next batch to be executed as the last recovered batch"
* Check the commit log to see which batch was committed last
* IF the last batch was committed THEN
* Call getBatch using the last batch start and end offsets
Reviewer (Contributor): Add the reason why we do this.
* Set up a new batch, i.e., start = last batch end, and identify the new end
* DONE
* ELSE
* Identify a brand new batch
* DONE
*/
private def populateStartOffsets(): Unit = {
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
Reviewer (Contributor): can you rename batchId to something more descriptive so that we can semantically differentiate it from the currentBatchId?
logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
Reviewer (Contributor): why remove this debug log?
Reviewer (Contributor): Yeah, this does not make sense here any more. But please add similar logging of recovered metadata later, where you have logged the start and available offsets.

offsetLog.get(batchId - 1).foreach {
case lastOffsets =>
if (batchId > 0) {
Reviewer (Contributor): why are we introducing this condition?
// We have committed at least one batch
offsetLog.get(batchId - 1).foreach { lastOffsets =>
committedOffsets = lastOffsets.toStreamProgress(sources)
logDebug(s"Resuming with committed offsets: $committedOffsets")
}
}
/* identify the current batch id: if commit log indicates we successfully processed the
* latest batch id in the offset log, then we can safely move to the next batch
* i.e., committedBatchId + 1
*/
currentBatchId = commitLog.getLatest() match {
Reviewer (Contributor): Mind adding a few more comments here? This logic is getting very dense. I think it's doing something like the following:

  • finding the max committed batch
  • checking to see if there is a started but uncommitted batch
  • otherwise constructing a new batch
case Some((committedBatchId, _))
if batchId == committedBatchId => committedBatchId + 1
case _ => batchId
}
if (batchId < currentBatchId) {
Reviewer (tdas, Mar 17, 2017): The above match-case and this if statement are essentially the same semantic condition - both will be true or both will be false. So we might as well merge the two into a single if condition:

val completedBatchId = completedLog.getLatest()
if (completedBatchId.isDefined && completedBatchId.get == batchId) {
  // call source.getBatch
  currentBatchId = completedBatchId.get + 1
} else {
  // warn if completedBatchId.get < batchId - 1
  currentBatchId = batchId
}

/* The last batch was successfully committed, so we can safely process a
* new next batch but first:
* Make a call a call to getBatch using the offsets from previous batch.
Reviewer (Contributor): "a call" is present twice.
Reviewer (Contributor): also, when referring to getBatch, use source.getBatch to be more clear.
* because certain sources (e.g., KafkaSource) assume on restart the last
* batch will be executed before getOffset is called again.
*/
availableOffsets.foreach {
case (source, end)
if committedOffsets.get(source).map(_ != end).getOrElse(true) =>
val start = committedOffsets.get(source)
logDebug(s"Initializing offset retrieval from $source " +
s"at start $start end $end")
source.getBatch(start, end)
case _ =>
}
// Move committed offsets to the last offsets of the last batch
offsetLog.get(currentBatchId - 1).foreach { lastOffsets =>
committedOffsets = lastOffsets.toStreamProgress(sources)
}
// Construct a new batch by recomputing availableOffsets
constructNextBatch()
}
logDebug(s"Resuming with committed offsets $committedOffsets " +
Reviewer (Contributor): Can you also print the batch id.
s"and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
@@ -559,6 +610,8 @@ class StreamExecution(
reportTimeTaken("addBatch") {
sink.addBatch(currentBatchId, nextBatch)
}
logDebug(s"Commit log write ${currentBatchId}")
commitLog.add(currentBatchId, None)

awaitBatchLock.lock()
try {
@@ -29,6 +29,17 @@ trait TriggerExecutor {
def execute(batchRunner: () => Boolean): Unit
}

/**
* A trigger executor that runs a single batch only, then terminates.
*/
case class OneTimeExecutor() extends TriggerExecutor {

/**
* Execute a single batch using `batchRunner`.
*/
override def execute(batchRunner: () => Boolean): Unit = batchRunner()
}
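A hedged usage sketch (not part of the patch) contrasting this with ProcessingTimeExecutor: the batch runner is invoked exactly once, regardless of its return value, and then execute() returns. The runner body below is purely illustrative:

import scala.collection.mutable.ArrayBuffer

val runs = ArrayBuffer.empty[Long]
// OneTimeExecutor calls the runner a single time and does not loop on the result.
OneTimeExecutor().execute(() => { runs += System.currentTimeMillis(); true })
assert(runs.size == 1)  // exactly one batch was triggered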

/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
@@ -36,6 +36,51 @@ import org.apache.spark.unsafe.types.CalendarInterval
@InterfaceStability.Evolving
sealed trait Trigger

/**
* :: Experimental ::
* A trigger that runs a query once, then terminates.
*
* Scala Example:
* {{{
* df.write.trigger(OneTime)
* }}}
*
* Java Example:
* {{{
* df.write.trigger(OneTime.create())
Reviewer (Contributor): I don't think this works?
Reviewer (Contributor): Yes, this doesn't. Please fix them.
* }}}
*
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
case class OneTime() extends Trigger
Reviewer (Contributor): You need Python APIs as well.

/**
* :: Experimental ::
* Used to create [[OneTime]] triggers for [[StreamingQuery]]s.
Reviewer (Contributor): Explain what one time means.
Author: The explanation of the OneTime trigger is given in the OneTime class definition. Does that suffice, or should the explanation be reiterated in the object definition? If it should be reiterated, then I can do the same with ProcessingTime.
*
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
object OneTime {

/**
* Create a [[OneTime]] trigger.
*
* Example:
* {{{
* df.write.trigger(OneTime.create())
Reviewer (Contributor): df.writeStream (also fix if this is present anywhere else)
* }}}
*
* @since 2.0.0
Reviewer (Contributor): fix version to 2.2
*/
def create(): OneTime = {
apply()
}
}
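A hedged end-to-end usage sketch, reflecting the review feedback that the entry point should be df.writeStream rather than df.write; the memory sink, query name, and streaming DataFrame `df` are assumptions, not part of the patch:

// Assumes `df` is a streaming DataFrame; sink and query name are illustrative.
val query = df.writeStream
  .format("memory")
  .queryName("one_time_example")
  .trigger(OneTime())            // Scala; Java callers would use OneTime.create()
  .start()

query.awaitTermination()         // returns once the single batch has been processed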

/**
* :: Experimental ::
* A trigger that runs a query periodically based on the processing time. If `interval` is 0,
@@ -217,7 +217,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
AddData(inputData, 25), // Evict items less than previous watermark.
CheckLastBatch((10, 5)),
StopStream,
AssertOnQuery { q => // clear the sink
AssertOnQuery { q => // purge commit and clear the sink
val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
q.commitLog.purge(commit)
q.sink.asInstanceOf[MemorySink].clear()
true
},
@@ -152,6 +152,15 @@ class StreamSuite extends StreamTest {
AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
s"offsetLog's latest should be $expectedId")

// Check the latest batchid in the commit log
def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
AssertOnQuery(_.commitLog.getLatest().get._1 == expectedId,
s"commitLog's latest should be $expectedId")

// Ensure that there has not been an incremental execution after restart
def CheckNoIncrementalExecutionCurrentBatchId(): AssertOnQuery =
AssertOnQuery(_.lastExecution == null, s"lastExecution not expected to run")

// For each batch, we would log the state change during the execution
// This checks whether the key of the state change log is the expected batch id
def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
@@ -177,6 +186,7 @@
// Check the results of batch 0
CheckAnswer(1, 2, 3),
CheckIncrementalExecutionCurrentBatchId(0),
CheckCommitLogLatestBatchId(0),
CheckOffsetLogLatestBatchId(0),
CheckSinkLatestBatchId(0),
// Add some data in batch 1
@@ -187,6 +197,7 @@
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),

@@ -199,21 +210,23 @@
// the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),

/* Stop then restart the Stream */
StopStream,
StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),

/* -- batch 1 rerun ----------------- */
// this batch 1 would re-run because the latest batch id logged in offset log is 1
/* -- batch 1 no rerun ----------------- */
// batch 1 would not re-run because the latest batch id logged in commit log is 1
AdvanceManualClock(10 * 1000),
CheckNoIncrementalExecutionCurrentBatchId(),

/* -- batch 2 ----------------------- */
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
// Add some data in batch 2
Expand All @@ -224,6 +237,7 @@ class StreamSuite extends StreamTest {
// Check the results of batch 2
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
CheckIncrementalExecutionCurrentBatchId(2),
CheckCommitLogLatestBatchId(2),
CheckOffsetLogLatestBatchId(2),
CheckSinkLatestBatchId(2))
}
@@ -277,7 +277,8 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
true
},
StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
CheckLastBatch((20L, 1), (85L, 1)),
// The commit log should ensure that we do not run another batch
CheckLastBatch(),
Reviewer (Contributor): Same comment as below.
AssertOnQuery { q =>
clock.getTimeMillis() == 90000L
},
@@ -327,7 +328,8 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
true
},
StartStream(ProcessingTime("10 day"), triggerClock = clock),
CheckLastBatch((20L, 1), (85L, 1)),
// Commit log should prevent batch from running again
Reviewer (Contributor): I think we should still check whether re-executing the last batch uses the watermark from the offset log correctly or not. So I suggest deleting the commit log to force execution of the last batch.
CheckLastBatch(),

// advance clock to 100 days, should retain keys >= 90
AddData(inputData, 85L, 90L, 100L, 105L),
@@ -109,8 +109,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

// Termination event generated with exception message when stopped with error
StartStream(ProcessingTime(100), triggerClock = clock),
AdvanceManualClock(100), // advance clock to ensure completed initial trigger
Reviewer (Contributor): why is this needed?
AddData(inputData, 0),
AdvanceManualClock(100),
AdvanceManualClock(100), // process bad data
ExpectFailure[SparkException](),
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {