@@ -301,8 +301,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
StopStream,
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -64,7 +64,11 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.<init>$default$11"),

// [SPARK-17161] Removing Python-friendly constructors not needed
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"),

// [SPARK-19876] Add one time trigger, and improve Trigger APIs
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.streaming.Trigger"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.streaming.ProcessingTime")
)

// Exclude rules for 2.1.x
63 changes: 17 additions & 46 deletions python/pyspark/sql/streaming.py
@@ -277,44 +277,6 @@ def resetTerminated(self):
self._jsqm.resetTerminated()


class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.

.. note:: Experimental

.. versionadded:: 2.0
"""

__metaclass__ = ABCMeta

@abstractmethod
def _to_java_trigger(self, sqlContext):
"""Internal method to construct the trigger on the jvm.
"""
pass


class ProcessingTime(Trigger):
"""A trigger that runs a query periodically based on the processing time. If `interval` is 0,
the query will run as fast as possible.

The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...

.. note:: Experimental

.. versionadded:: 2.0
"""

def __init__(self, interval):
if type(interval) != str or len(interval.strip()) == 0:
raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
self.interval = interval

def _to_java_trigger(self, sqlContext):
return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
self.interval)


class DataStreamReader(OptionUtils):
"""
Interface used to load a streaming :class:`DataFrame` from external storage systems
@@ -788,7 +750,7 @@ def queryName(self, queryName):

@keyword_only
@since(2.0)
def trigger(self, processingTime=None):
def trigger(self, processingTime=None, once=None):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.

Expand All @@ -798,17 +760,26 @@ def trigger(self, processingTime=None):

>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
>>> # trigger the query for just one batch of data
>>> writer = sdf.writeStream.trigger(once=True)
"""
from pyspark.sql.streaming import ProcessingTime
trigger = None
jTrigger = None
if processingTime is not None:
if once is not None:
raise ValueError('Multiple triggers not allowed.')
if type(processingTime) != str or len(processingTime.strip()) == 0:
raise ValueError('The processing time must be a non empty string. Got: %s' %
raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
processingTime)
trigger = ProcessingTime(processingTime)
if trigger is None:
raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
interval = processingTime.strip()
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
interval)
elif once is not None:
if once is not True:
raise ValueError('Value for once must be True. Got: %s' % once)
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
else:
raise ValueError('No trigger provided')
self._jwrite = self._jwrite.trigger(jTrigger)
return self

@ignore_unicode_prefix
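For reference, a minimal Scala sketch (not part of this diff) of the JVM-side trigger API that the Python trigger() shim above delegates to, namely Trigger.ProcessingTime and Trigger.Once. TriggerApiSketch is an illustrative name, the input path is only a placeholder, and neither writer is started.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerApiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("trigger-sketch").getOrCreate()
    val sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")

    // Periodic trigger: run a micro-batch every 5 seconds.
    val periodic = sdf.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))

    // One-time trigger: process the data available right now as a single batch, then stop.
    val once = sdf.writeStream
      .format("console")
      .trigger(Trigger.Once())

    // Calling .start() on either writer would begin execution; omitted in this sketch.
    spark.stop()
  }
}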
17 changes: 15 additions & 2 deletions python/pyspark/sql/tests.py
@@ -1242,13 +1242,26 @@ def test_save_and_load_builder(self):

shutil.rmtree(tmpPath)

def test_stream_trigger_takes_keyword_args(self):
def test_stream_trigger(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')

# Should take at least one arg
try:
df.writeStream.trigger()
except ValueError:
pass

# Should not take multiple args
try:
df.writeStream.trigger(once=True, processingTime='5 seconds')
except ValueError:
pass

# Should take only keyword args
try:
df.writeStream.trigger('5 seconds')
self.fail("Should have thrown an exception")
except TypeError:
# should throw error
pass

def test_stream_read_options(self):
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.io.{InputStream, OutputStream}
import java.nio.charset.StandardCharsets._

import scala.io.{Source => IOSource}

import org.apache.spark.sql.SparkSession

/**
* Used to write log files that represent batch commit points in structured streaming.
* A commit log file will be written immediately after the successful completion of a
* batch, and before processing the next batch. Here is an execution summary:
* - trigger batch 1
* - obtain batch 1 offsets and write to offset log
* - process batch 1
* - write batch 1 to completion log
* - trigger batch 2
* - obtain batch 2 offsets and write to offset log
* - process batch 2
* - write batch 2 to completion log
* ....
*
* The current format of the batch completion log is:
* line 1: version
* line 2: metadata (optional json string)
*/
class BatchCommitLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[String](sparkSession, path) {

override protected def deserialize(in: InputStream): String = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file in the offset commit log")
}
parseVersion(lines.next().trim, BatchCommitLog.VERSION)
// read metadata
lines.next().trim match {
case BatchCommitLog.SERIALIZED_VOID => null
case metadata => metadata
}
}

override protected def serialize(metadata: String, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
out.write('\n')

// write metadata or void
out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
.getBytes(UTF_8))
}
}

object BatchCommitLog {
private val VERSION = 1
private val SERIALIZED_VOID = "{}"
}
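A self-contained sketch (not part of this diff) of the two-line on-disk format that the serialize/deserialize pair above reads and writes: a version marker on the first line, then the metadata string, or "{}" when the metadata is null. CommitLogFormatSketch and its members are illustrative stand-ins for the private constants in BatchCommitLog.

import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8

object CommitLogFormatSketch {
  private val VERSION = 1
  private val SERIALIZED_VOID = "{}"

  // Mirrors BatchCommitLog.serialize: "v<version>", a newline, then metadata or "{}".
  def serialize(metadata: String, out: ByteArrayOutputStream): Unit = {
    out.write(s"v$VERSION".getBytes(UTF_8))
    out.write('\n')
    out.write((if (metadata == null) SERIALIZED_VOID else metadata).getBytes(UTF_8))
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    serialize(null, out)
    // Prints:
    //   v1
    //   {}
    println(out.toString(UTF_8.name()))
  }
}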

@@ -165,6 +165,8 @@ class StreamExecution(

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}

/** Defines the internal state of execution */
@@ -209,6 +211,13 @@ class StreamExecution(
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

/**
* A log that records the batch ids that have completed. A batch id recorded here means that
* the batch was fully processed and its output was committed to the sink, so it does not need
* to be processed again. This is used (for instance) during restart, to identify which batch
* to run next.
*/
val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))

/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING

@@ -291,10 +300,13 @@ class StreamExecution(
runBatch(sparkSessionToRunBatches)
}
}

// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
committedOffsets ++= availableOffsets
batchCommitLog.add(currentBatchId, null)
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
Expand All @@ -306,9 +318,6 @@ class StreamExecution(
} else {
false
}

// Update committed offsets.
committedOffsets ++= availableOffsets
updateStatusMessage("Waiting for next trigger")
continueToRun
})
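The per-trigger sequence that the loop above now follows, and that the BatchCommitLog scaladoc earlier in this diff summarizes (write offsets, process the batch, record completion, then advance the batch id), reduced to a small illustrative sketch with in-memory maps standing in for the two logs.

object TriggerLoopSketch {
  // Reduced stand-ins for the offset log and the new commit log.
  val offsetLog = scala.collection.mutable.Map[Long, String]()
  val commitLog = scala.collection.mutable.Map[Long, String]()

  def runOneTrigger(batchId: Long, offsets: String): Long = {
    offsetLog(batchId) = offsets // 1. obtain offsets and write them to the offset log
    // 2. process the batch (elided)
    commitLog(batchId) = "{}"    // 3. record completion in the commit log
    batchId + 1                  // 4. only then advance to the next batch id
  }

  def main(args: Array[String]): Unit = {
    var id = 0L
    id = runOneTrigger(id, "offsets-0")
    id = runOneTrigger(id, "offsets-1")
    assert(id == 2L && commitLog.keySet == Set(0L, 1L))
  }
}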
@@ -392,13 +401,33 @@ class StreamExecution(
* - currentBatchId
* - committedOffsets
* - availableOffsets
* The basic structure of this method is as follows:
Contributor: really like this explanation.
*
* Identify (from the offset log) the offsets used to run the last batch
* IF last batch exists THEN
*   Set the next batch to be executed as the last recovered batch
*   Check the commit log to see which batch was committed last
*   IF the last batch was committed THEN
*     Call getBatch using the last batch start and end offsets
*     // ^^^^ above line is needed since some sources assume last batch always re-executes
*     Setup for a new batch i.e., start = last batch end, and identify new end
*   DONE
* ELSE
*   Identify a brand new batch
* DONE
*/
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
case Some((latestBatchId, nextOffsets)) =>
/* First assume that we are re-executing the latest known batch
* in the offset log */
currentBatchId = latestBatchId
availableOffsets = nextOffsets.toStreamProgress(sources)
/* Initialize committed offsets to a committed batch, which at this
* point is the second latest batch id in the offset log. */
offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// update offset metadata
nextOffsets.metadata.foreach { metadata =>
@@ -419,14 +448,37 @@ class StreamExecution(
SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
}

logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
Contributor: why remove this debug log?

Contributor: Yeah. this does not make sense here any more. But please add similar logging of recovered metadata later, where you have logged the start and available offsets.
offsetLog.get(batchId - 1).foreach {
case lastOffsets =>
committedOffsets = lastOffsets.toStreamProgress(sources)
logDebug(s"Resuming with committed offsets: $committedOffsets")
/* identify the current batch id: if commit log indicates we successfully processed the
* latest batch id in the offset log, then we can safely move to the next batch
* i.e., committedBatchId + 1 */
batchCommitLog.getLatest() match {
case Some((latestCommittedBatchId, _)) =>
if (latestBatchId == latestCommittedBatchId) {
/* The last batch was successfully committed, so we can safely process the
* next batch, but first:
* Make a call to getBatch using the offsets from the previous batch,
* because certain sources (e.g., KafkaSource) assume on restart the last
* batch will be executed before getOffset is called again. */
availableOffsets.foreach { ao: (Source, Offset) =>
val (source, end) = ao
if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
val start = committedOffsets.get(source)
source.getBatch(start, end)
}
}
currentBatchId = latestCommittedBatchId + 1
committedOffsets ++= availableOffsets
// Construct a new batch by recomputing availableOffsets
constructNextBatch()
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
s"batch id $latestBatchId by one")
}
case None => logInfo("no commit log present")
}
logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
s"$committedOffsets and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
@@ -523,6 +575,7 @@ class StreamExecution(
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minBatchesToRetain < currentBatchId) {
offsetLog.purge(currentBatchId - minBatchesToRetain)
batchCommitLog.purge(currentBatchId - minBatchesToRetain)
}
}
} else {
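To summarize the restart flow that populateStartOffsets implements above, a minimal sketch (illustrative names, not the PR's code) of the decision it derives from the latest offset log and commit log entries; the getBatch replay for an already committed batch is noted in the comments.

object RestartDecisionSketch {
  sealed trait Decision
  // No entry in the offset log: start a brand new query at batch 0.
  case object StartNewQuery extends Decision
  // Offset log is ahead of the commit log: re-execute the latest planned batch.
  case class ReExecuteBatch(batchId: Long) extends Decision
  // Latest planned batch is already committed: replay getBatch for it (some sources,
  // e.g. KafkaSource, expect the last batch to be re-fetched on restart), then plan batch id + 1.
  case class RunNextBatch(nextBatchId: Long) extends Decision

  def decide(latestOffsetBatch: Option[Long], latestCommittedBatch: Option[Long]): Decision =
    (latestOffsetBatch, latestCommittedBatch) match {
      case (None, _) => StartNewQuery
      case (Some(offsetId), Some(commitId)) if offsetId == commitId => RunNextBatch(offsetId + 1)
      case (Some(offsetId), _) => ReExecuteBatch(offsetId)
    }

  def main(args: Array[String]): Unit = {
    assert(decide(None, None) == StartNewQuery)
    assert(decide(Some(5L), Some(4L)) == ReExecuteBatch(5L)) // failure between offset write and commit
    assert(decide(Some(5L), Some(5L)) == RunNextBatch(6L))   // clean stop after the commit was logged
  }
}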
@@ -29,6 +29,17 @@ trait TriggerExecutor {
def execute(batchRunner: () => Boolean): Unit
}

/**
* A trigger executor that runs a single batch only, then terminates.
*/
case class OneTimeExecutor() extends TriggerExecutor {

/**
* Execute a single batch using `batchRunner`.
*/
override def execute(batchRunner: () => Boolean): Unit = batchRunner()
}

/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
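A small usage sketch, assuming the internal org.apache.spark.sql.execution.streaming package is visible on the classpath: OneTimeExecutor calls the batch runner exactly once and returns, whereas ProcessingTimeExecutor keeps invoking it on its schedule until the runner returns false.

import org.apache.spark.sql.execution.streaming.OneTimeExecutor

object OneTimeExecutorSketch {
  def main(args: Array[String]): Unit = {
    var batches = 0
    OneTimeExecutor().execute { () =>
      batches += 1
      true // the return value is ignored; execution ends after this single invocation
    }
    assert(batches == 1)
  }
}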