@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.io.{InputStream, OutputStream}
import java.nio.charset.StandardCharsets._

import scala.io.{Source => IOSource}

import org.apache.spark.sql.SparkSession

class OffsetCommitLog(sparkSession: SparkSession, path: String)
Reviewer comment (Contributor): Scala doc please.

extends HDFSMetadataLog[Option[String]](sparkSession, path) {

override protected def deserialize(in: InputStream): Option[String] = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
Reviewer comment (Contributor): can you say "incomplete log file in the offset commit log"?

}
val version = lines.next()
if (version != OffsetCommitLog.VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
Reviewer comment (Contributor): We should make sure the error here is consistent with the work being done in #17070.

}
// read metadata
lines.next().trim match {
case OffsetCommitLog.SERIALIZED_VOID => None
case metadata => Some(metadata)
}
}

override protected def serialize(metadata: Option[String], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(OffsetCommitLog.VERSION.getBytes(UTF_8))
out.write('\n')

// write metadata or void
out.write(metadata.getOrElse(OffsetCommitLog.SERIALIZED_VOID).getBytes(UTF_8))
}
}

object OffsetCommitLog {
private val VERSION = "v1"
private val SERIALIZED_VOID = "-"
Reviewer comment (Contributor): Why not make this an empty JSON object? {}

}
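
For reference, here is a minimal, self-contained sketch of the on-disk format implied by serialize/deserialize above: a "v1" version line, a newline, then either the metadata string or "-" when the metadata is None. The object and method names below are illustrative only and are not part of the PR.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.{Source => IOSource}

object CommitLogFormatSketch {
  private val Version = "v1"
  private val SerializedVoid = "-"

  // Mirrors OffsetCommitLog.serialize: version line, then the metadata or the "void" marker.
  def write(metadata: Option[String]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(Version.getBytes(UTF_8))
    out.write('\n')
    out.write(metadata.getOrElse(SerializedVoid).getBytes(UTF_8))
    out.toByteArray
  }

  // Mirrors OffsetCommitLog.deserialize: verify the version line, then read the metadata line.
  def read(bytes: Array[Byte]): Option[String] = {
    val lines = IOSource.fromInputStream(new ByteArrayInputStream(bytes), UTF_8.name()).getLines()
    require(lines.hasNext && lines.next() == Version, "incomplete or unversioned commit log entry")
    lines.next().trim match {
      case SerializedVoid => None
      case metadata => Some(metadata)
    }
  }

  def main(args: Array[String]): Unit = {
    assert(read(write(None)).isEmpty)              // None round-trips through the "-" marker
    assert(read(write(Some("{}"))).contains("{}")) // a metadata string is preserved verbatim
  }
}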

@@ -162,6 +162,7 @@ class StreamExecution(

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case _ @ OneTime => OneTimeExecutor()
}

/** Defines the internal state of execution */
@@ -206,6 +207,12 @@ class StreamExecution(
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

/**
* A log that records the committed batch ids. This is used to check if a batch was committed
* on restart, instead of (possibly) re-running the previous batch.
Reviewer comment (Contributor): nit: "if a batch was committed on restart" sounds like batches are supposed to get committed only on restart. :)

Reviewer comment (Contributor): Also, keep the comment generic so that it doesn't imply only the previous batch will be re-run; in the future we could re-run multiple batches.

*/
val commitLog = new OffsetCommitLog(sparkSession, checkpointFile("commits"))

/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING

@@ -284,6 +291,7 @@ class StreamExecution(
finishTrigger(dataAvailable)
if (dataAvailable) {
// We'll increase currentBatchId after we complete processing current batch's data
commitLog.add(currentBatchId, None)
Reviewer comment (Contributor): I wonder if we should make this async?

currentBatchId += 1
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
@@ -377,17 +385,25 @@ class StreamExecution(
private def populateStartOffsets(): Unit = {
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
Reviewer comment (Contributor): can you rename batchId to something more descriptive so that we can semantically differentiate it from the currentBatchId?

logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
Reviewer comment (Contributor): why remove this debug log?

Reviewer comment (Contributor): Yeah, this does not make sense here any more. But please add similar logging of the recovered metadata later, where you have logged the start and available offsets.

offsetLog.get(batchId - 1).foreach {
case lastOffsets =>
committedOffsets = lastOffsets.toStreamProgress(sources)
logDebug(s"Resuming with committed offsets: $committedOffsets")
currentBatchId = commitLog.getLatest() match {
Reviewer comment (Contributor): Mind adding a few more comments here? This logic is getting very dense. I think it's doing something like the following:

  • finding the max committed batch
  • checking to see if there is a started but uncommitted batch
  • otherwise constructing a new batch

case Some((committedBatchId, metadata)) => committedBatchId + 1
case None => batchId // can only happen if batchId is 0
}
if (currentBatchId > 0) {
offsetLog.get(currentBatchId - 1).foreach {
case lastOffsets =>
committedOffsets = lastOffsets.toStreamProgress(sources)
logDebug(s"Resuming with committed offsets: $committedOffsets")
}
}
logInfo(s"Resuming streaming query, starting with batch $currentBatchId")
if (currentBatchId == batchId) {
availableOffsets = nextOffsets.toStreamProgress(sources)
offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
} else {
constructNextBatch()
}
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
@@ -29,6 +29,17 @@ trait TriggerExecutor {
def execute(batchRunner: () => Boolean): Unit
}

/**
* A trigger executor that runs a single batch only, then terminates.
*/
case class OneTimeExecutor() extends TriggerExecutor {

/**
* Execute a single batch using `batchRunner`.
*/
override def execute(batchRunner: () => Boolean): Unit = batchRunner()
}
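
For comparison with ProcessingTimeExecutor below, a tiny usage sketch of the executor contract, assuming the OneTimeExecutor defined above is on the classpath; the runner is a stand-in for StreamExecution's batch runner.

object OneTimeExecutorSketch {
  def main(args: Array[String]): Unit = {
    var runs = 0
    // execute invokes the () => Boolean runner exactly once, regardless of its result.
    OneTimeExecutor().execute(() => { runs += 1; true })
    assert(runs == 1) // one batch attempt, then the executor returns
  }
}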

/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
@@ -36,6 +36,26 @@ import org.apache.spark.unsafe.types.CalendarInterval
@InterfaceStability.Evolving
sealed trait Trigger

/**
* :: Experimental ::
* A trigger that runs a query once and then terminates.
*
* Scala Example:
* {{{
* df.write.trigger(OneTime)
* }}}
*
* Java Example:
* {{{
* df.write.trigger(OneTime.create())
Reviewer comment (Contributor): I don't think this works?

Reviewer comment (Contributor): Yes, this doesn't. Please fix them.

* }}}
*
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
object OneTime extends Trigger
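
As the reviewers note above, the df.write examples in this Scaladoc do not compile. Presumably the intended usage goes through writeStream, roughly as sketched here; df is assumed to be an existing streaming DataFrame, OneTime is assumed to be in scope, and the format and paths are placeholders rather than part of the PR.

// Hedged sketch of the presumably intended usage (writeStream, not write).
val query = df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/one-time-demo/checkpoints") // placeholder path
  .trigger(OneTime)
  .start("/tmp/one-time-demo/output")                             // placeholder path
query.awaitTermination() // with a one-time trigger, the query stops after the single batch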

/**
* :: Experimental ::
* A trigger that runs a query periodically based on the processing time. If `interval` is 0,
@@ -152,6 +152,15 @@ class StreamSuite extends StreamTest {
AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
s"offsetLog's latest should be $expectedId")

// Check the latest batch id in the commit log
def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
AssertOnQuery(_.commitLog.getLatest().get._1 == expectedId,
s"commitLog's latest should be $expectedId")

// Ensure that there has not been an incremental execution after restart
def CheckNoIncrementalExecutionCurrentBatchId(): AssertOnQuery =
AssertOnQuery(_.lastExecution == null, s"lastExecution not expected to run")

// For each batch, we would log the state change during the execution
// This checks whether the key of the state change log is the expected batch id
def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
@@ -177,6 +186,7 @@
// Check the results of batch 0
CheckAnswer(1, 2, 3),
CheckIncrementalExecutionCurrentBatchId(0),
CheckCommitLogLatestBatchId(0),
CheckOffsetLogLatestBatchId(0),
CheckSinkLatestBatchId(0),
// Add some data in batch 1
@@ -187,6 +197,7 @@
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),

@@ -199,21 +210,23 @@
// the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),

/* Stop then restart the Stream */
StopStream,
StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),

/* -- batch 1 rerun ----------------- */
// this batch 1 would re-run because the latest batch id logged in offset log is 1
/* -- batch 1 no rerun ----------------- */
// batch 1 would not re-run because the latest batch id logged in commit log is 1
AdvanceManualClock(10 * 1000),
CheckNoIncrementalExecutionCurrentBatchId(),

/* -- batch 2 ----------------------- */
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
// Add some data in batch 2
@@ -224,6 +237,7 @@
// Check the results of batch 2
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
CheckIncrementalExecutionCurrentBatchId(2),
CheckCommitLogLatestBatchId(2),
CheckOffsetLogLatestBatchId(2),
CheckSinkLatestBatchId(2))
}
@@ -277,7 +277,8 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
true
},
StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
CheckLastBatch((20L, 1), (85L, 1)),
// The commit log should ensure that we do not run another batch
CheckLastBatch(),
Reviewer comment (Contributor): Same comment as below.

AssertOnQuery { q =>
clock.getTimeMillis() == 90000L
},
@@ -327,7 +328,8 @@
true
},
StartStream(ProcessingTime("10 day"), triggerClock = clock),
CheckLastBatch((20L, 1), (85L, 1)),
// Commit log should prevent batch from running again
Reviewer comment (Contributor): I think we should still check whether re-executing the last batch uses the watermark from the offset log correctly or not. So I suggest deleting the commit log to force execution of the last batch.
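// One possible way to apply the suggestion above, reusing the commit-log purge pattern from
// the OneTime test added to StreamingQuerySuite in this PR (illustrative steps, not final):
//   AssertOnQuery { q => q.commitLog.purge(q.commitLog.getLatest().get._1 + 1); true },
//   StartStream(ProcessingTime("10 day"), triggerClock = clock),
//   CheckLastBatch((20L, 1), (85L, 1)), // the last batch re-runs using the watermark
//                                       // recovered from the offset log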

CheckLastBatch(),

// advance clock to 100 days, should retain keys >= 90
AddData(inputData, 85L, 90L, 100L, 105L),
@@ -109,8 +109,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

// Termination event generated with exception message when stopped with error
StartStream(ProcessingTime(100), triggerClock = clock),
AdvanceManualClock(100), // advance clock to ensure completed initial trigger
Reviewer comment (Contributor): why is this needed?

AddData(inputData, 0),
AdvanceManualClock(100),
AdvanceManualClock(100), // process bad data
ExpectFailure[SparkException](),
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {
@@ -158,6 +158,51 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}

testQuietly("OneTime trigger, commit log, and exception") {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map { 6 / _ }

testStream(mapped)(
AssertOnQuery(_.isActive === true),
StopStream,
AddData(inputData, 1, 2),
StartStream(trigger = OneTime),
CheckAnswer(6, 3),
AssertOnQuery(_.isActive === false),
StopStream, // clears out StreamTest state
AssertOnQuery { q =>
// both commit log and offset log contain the same (latest) batch id
q.commitLog.getLatest().map(_._1).getOrElse(-1L) ==
q.offsetLog.getLatest().map(_._1).getOrElse(-2L)
},
AssertOnQuery { q =>
// blow away commit log and sink result
q.commitLog.purge(1)
q.sink.asInstanceOf[MemorySink].clear()
true
},
StartStream(trigger = OneTime),
CheckAnswer(6, 3), // ensure we fall back to offset log and reprocess batch
AssertOnQuery(_.isActive === false),
StopStream,
AddData(inputData, 3),
StartStream(trigger = OneTime),
CheckLastBatch(2), // commit log should be back in place
AssertOnQuery(_.isActive === false),
StopStream,
AddData(inputData, 0),
StartStream(OneTime),
ExpectFailure[SparkException](),
AssertOnQuery(_.isActive === false),
AssertOnQuery(q => {
q.exception.get.startOffset ===
q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString &&
q.exception.get.endOffset ===
q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString
}, "incorrect start offset or end offset on exception")
)
}

testQuietly("status, lastProgress, and recentProgress") {
import StreamingQuerySuite._
clock = new StreamManualClock
@@ -306,8 +351,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi

// Test status and progress after query terminated with error
StartStream(ProcessingTime(100), triggerClock = clock),
AdvanceManualClock(100), // ensure initial trigger completes before AddData
AddData(inputData, 0),
AdvanceManualClock(100),
AdvanceManualClock(100), // allow another trigger
ExpectFailure[SparkException](),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),