31 commits
6c9acde
Initial version of changes to Source trait
frreiss Aug 9, 2016
dae72ff
Changes to files that depend on the Source trait
frreiss Aug 11, 2016
f78b4d5
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 11, 2016
cf426fa
Added method to garbage-collect the metadata log.
frreiss Aug 15, 2016
c028432
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 15, 2016
f92a9a7
Fixing problems with building from Maven.
frreiss Aug 16, 2016
4cd181d
Various bug fixes.
frreiss Aug 19, 2016
fcc90bd
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 19, 2016
35cdae9
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 22, 2016
9096c56
Merge branch 'master' of https://github.com/apache/spark into fred-16…
frreiss Aug 27, 2016
ecaf732
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
5638281
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
43ffbf3
Removed a few blank lines.
frreiss Aug 29, 2016
f5c15f8
Additional whitespace cleanup.
frreiss Aug 29, 2016
a79c557
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 30, 2016
7c6a30d
Narrowing the size of the diff by moving some changes out to future w…
frreiss Aug 31, 2016
5e340c2
Fixed a regression introduced in an earlier merge.
frreiss Sep 8, 2016
128f7fe
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 28, 2016
6334a4b
Fixed compilation problem from merging someone else's PR.
frreiss Sep 28, 2016
09e4b8e
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 29, 2016
aaf0307
Removed a safety check that was invalidated by SPARK-17643 and fixed …
frreiss Sep 29, 2016
947b510
Updating regression tests after merge.
frreiss Sep 30, 2016
ed887ca
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 15, 2016
ec67429
Changes to address review comments.
frreiss Oct 15, 2016
e7ef7ab
Fix compilation problems.
frreiss Oct 15, 2016
7d98c6b
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 19, 2016
c726549
Changes to address review comments.
frreiss Oct 21, 2016
47eee52
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 21, 2016
46f6411
Commit before merge.
frreiss Oct 26, 2016
d9eaf5a
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 26, 2016
0a56e4a
Addressing review comments.
frreiss Oct 26, 2016
@@ -176,6 +176,15 @@ class FileStreamSource(

override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
override def commit(end: Offset): Unit = {
// No-op for now; FileStreamSource currently garbage-collects files based on timestamp
// and the value of the maxFileAge parameter.
}

override def stop() {}
}
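For context on the no-op above: FileStreamSource already ages out file metadata by timestamp rather than pruning on commit. Below is a rough sketch of that idea, purely illustrative — FileEntry, SeenFilesMap, and maxFileAgeMs are hypothetical names, not the actual FileStreamSource internals.

// Hypothetical age-based cleanup; not the real FileStreamSource implementation.
import scala.collection.mutable

case class FileEntry(path: String, timestamp: Long)

class SeenFilesMap(maxFileAgeMs: Long) {
  private val seenFiles = mutable.Map[String, Long]()

  def add(entry: FileEntry): Unit = seenFiles(entry.path) = entry.timestamp

  // Forget files older than maxFileAgeMs relative to the newest file seen.
  def purge(): Unit = {
    if (seenFiles.nonEmpty) {
      val threshold = seenFiles.values.max - maxFileAgeMs
      seenFiles.retain { case (_, ts) => ts >= threshold }
    }
  }
}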

@@ -30,16 +30,30 @@ trait Source {
/** Returns the schema of the data from this source */
def schema: StructType

/**
* Returns the maximum available offset for this source.
* Returns `None` if this source has never received any data.
*/
def getOffset: Option[Offset]

/**
* Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
* the batch should begin with the first available record. This method must always return
* the same data for a particular `start` and `end` pair, even after the Source has been
* restarted on a different node.
*
* Higher layers will always call this method with a value of `start` greater than or equal
* to the last value passed to `commit` and a value of `end` less than or equal to the
* last value returned by `getOffset`.
*/
def getBatch(start: Option[Offset], end: Offset): DataFrame

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
def commit(end: Offset): Unit = {}

/** Stop this source and free any resources it has allocated. */
def stop(): Unit
}
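To make the calling contract concrete, here is a minimal sketch of a driver-side loop that respects the ordering documented above. This is not StreamExecution; the process callback and the loop shape are assumptions for illustration, and Source and Offset are the types defined in this file.

// Sketch only: getOffset -> getBatch over (lastCommitted, latest] -> commit.
import org.apache.spark.sql.DataFrame

def runOneBatch(source: Source,
                lastCommitted: Option[Offset],
                process: DataFrame => Unit): Option[Offset] = {
  source.getOffset match {
    case Some(latest) =>
      // `start` is >= the last offset passed to commit();
      // `end` is <= the last value returned by getOffset().
      val batch = source.getBatch(lastCommitted, latest)
      process(batch)
      // Only after the batch has been durably handled is the source told to discard it.
      source.commit(latest)
      Some(latest)
    case None =>
      lastCommitted   // no data yet
  }
}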
@@ -73,13 +73,19 @@ class StreamExecution(
/**
* Tracks how much data we have processed and committed to the sink or state store from each
* input source.
* Only the scheduler thread should modify this field, and only in atomic steps.
* Other threads should make a shallow copy if they are going to access this field more than
* once, since the field's value may change at any time.
*/
@volatile
var committedOffsets = new StreamProgress

/**
* Tracks the offsets that are available to be processed, but have not yet been committed to
* the sink.
* Only the scheduler thread should modify this field, and only in atomic steps.
* Other threads should make a shallow copy if they are going to access this field more than
* once, since the field's value may change at any time.
*/
@volatile
private var availableOffsets = new StreamProgress
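As a small illustration of the access pattern these comments ask for (hypothetical monitoring code, not part of this change): read the @volatile field once into a local and use that snapshot, relying on StreamProgress behaving as a map from Source to Offset as it does here.

// One read of the volatile field; the scheduler may replace it at any time.
def describeCommitted(query: StreamExecution, sources: Seq[Source]): String = {
  val snapshot = query.committedOffsets
  sources.map(s => s"$s -> ${snapshot.get(s)}").mkString(", ")
}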
@@ -337,17 +343,27 @@ class StreamExecution(
}
if (hasNewData) {
reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")

// NOTE: The following code is correct because runBatches() processes exactly one
// batch at a time. If we add pipeline parallelism (multiple batches in flight at
// the same time), this cleanup logic will need to change.

// Now that we've updated the scheduler's persistent checkpoint, it is safe for the
// sources to discard data from the previous batch.
val prevBatchOff = offsetLog.get(currentBatchId - 1)
if (prevBatchOff.isDefined) {
prevBatchOff.get.toStreamProgress(sources).foreach {
case (src, off) => src.commit(off)
}
}

// Now that we have logged the new batch, no further processing will happen for
// the batch before the previous batch, and it is safe to discard the old metadata.
// Note that purge is exclusive, i.e. it purges everything before the target ID.
offsetLog.purge(currentBatchId - 1)
Member:

nit: this can be offsetLog.purge(currentBatchId), since purge is exclusive; then you can revert the changes to StreamingQuerySuite.

Contributor Author:

I can move this change to another JIRA if you'd like, but we really should change currentBatchId to currentBatchId - 1 at some point. The call to offsetLog.purge(currentBatchId), which I introduced in my PR for SPARK-17513, contains a subtle bug. The recovery logic in populateStartOffsets() reads the last and second-to-last entries in offsetLog and uses them to populate availableOffsets and committedOffsets, respectively. Calling offsetLog.purge(currentBatchId) at line 350/366 truncates the offsetLog to one entry, which leaves committedOffsets empty on recovery, which in turn causes the first call to getBatch() for any source to have None as its first argument. Sources that do not prune buffered data in their commit() methods will return previously committed data in response to such a getBatch() call.

Member:

I see. Thanks for clarifying.
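To spell out the recovery invariant discussed in this thread, a worked example with illustrative batch numbers:

// Suppose batches 0, 1 and 2 have been logged, and the query restarts after batch 2.
//
// offsetLog.purge(currentBatchId)       // purge(2): only entry "2" survives
//   -> populateStartOffsets() finds a single entry, leaves committedOffsets empty,
//      and the first getBatch() call uses start = None, so a non-pruning source
//      replays data and a pruning source may no longer have what is requested.
//
// offsetLog.purge(currentBatchId - 1)   // purge(1): entries "1" and "2" survive
//   -> entry "2" repopulates availableOffsets and entry "1" repopulates
//      committedOffsets, so the first getBatch() call only covers (offset_1, offset_2].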

}
} else {
awaitBatchLock.lock()
@@ -455,7 +471,7 @@ class StreamExecution(

/**
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
def notDone = {
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
protected val logicalPlan = StreamingExecutionRelation(this)
protected val output = logicalPlan.output

/**
* All batches from `lastOffsetCommitted + 1` to `currentOffset`, inclusive.
* Stored in a ListBuffer to facilitate removing committed batches.
*/
@GuardedBy("this")
protected val batches = new ListBuffer[Dataset[A]]

@GuardedBy("this")
protected var currentOffset: LongOffset = new LongOffset(-1)

/**
* Last offset that was discarded, or -1 if no commits have occurred. Note that the value
* -1 is used in calculations below and isn't just an arbitrary constant.
*/
@GuardedBy("this")
protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)

def schema: StructType = encoder.schema

def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
Expand Down Expand Up @@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"

override def getOffset: Option[Offset] = synchronized {
if (currentOffset.offset == -1) {
None
} else {
Some(currentOffset)
}
}

/**
* Returns the data that is between the offsets (`start`, `end`].
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
// Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
// Internal buffer only holds the batches after lastOffsetCommitted.
val newBlocks = synchronized {
val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
batches.slice(sliceStart, sliceEnd)
}

logDebug(
s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -111,11 +126,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
}

override def commit(end: Offset): Unit = synchronized {
end match {
case newOffset: LongOffset =>
val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}

batches.trimStart(offsetDiff)
lastOffsetCommitted = newOffset
case _ =>
sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
"an instance of this class")
}
}

override def stop() {}

def reset(): Unit = synchronized {
batches.clear()
currentOffset = new LongOffset(-1)
lastOffsetCommitted = new LongOffset(-1)
}
}
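A short usage sketch of the bookkeeping above, assuming an existing SparkSession named spark; the offsets and commented results are illustrative values:

// MemoryStream keeps only uncommitted batches; committed ones are trimmed from the front.
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

val stream = MemoryStream[Int]
stream.addData(1, 2)                                  // batch 0, currentOffset becomes 0
stream.addData(3)                                     // batch 1, currentOffset becomes 1

stream.getOffset                                      // Some(LongOffset(1))
stream.getBatch(None, LongOffset(1))                  // both buffered batches: 1, 2, 3

stream.commit(LongOffset(0))                          // batch 0 dropped; lastOffsetCommitted = 0
stream.getBatch(Some(LongOffset(0)), LongOffset(1))   // only batch 1: 3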

@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
import java.util.Calendar
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}


object TextSocketSource {
val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
@GuardedBy("this")
private var readThread: Thread = null

/**
* All batches from `lastOffsetCommitted + 1` to `currentOffset`, inclusive.
* Stored in a ListBuffer to facilitate removing committed batches.
*/
@GuardedBy("this")
protected val batches = new ListBuffer[(String, Timestamp)]

@GuardedBy("this")
protected var currentOffset: LongOffset = new LongOffset(-1)

@GuardedBy("this")
protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)

initialize()

@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
return
}
TextSocketSource.this.synchronized {
val newData = (line,
Timestamp.valueOf(
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
)
currentOffset = currentOffset + 1
batches.append(newData)
}
}
} catch {
@@ -92,21 +105,54 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
else TextSocketSource.SCHEMA_REGULAR

/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = synchronized {
if (currentOffset.offset == -1) {
None
} else {
Some(currentOffset)
}
}

/** Returns the data that is between the offsets (`start`, `end`]. */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1

// Internal buffer only holds the batches after lastOffsetCommitted
val rawList = synchronized {
val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
batches.slice(sliceStart, sliceEnd)
}

import sqlContext.implicits._
val rawBatch = sqlContext.createDataset(rawList)

// The internal buffer has schema (String, Timestamp); strip out the timestamp
// if requested.
if (includeTimestamp) {
rawBatch.toDF("value", "timestamp")
} else {
// Strip out timestamp
rawBatch.select("_1").toDF("value")
}
}

override def commit(end: Offset): Unit = synchronized {
if (end.isInstanceOf[LongOffset]) {
val newOffset = end.asInstanceOf[LongOffset]
val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}

batches.trimStart(offsetDiff)
lastOffsetCommitted = newOffset
} else {
sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
}
}

Expand Down Expand Up @@ -141,7 +187,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
logWarning("The socket source should not be used for production applications! " +
"It does not support recovery and stores state indefinitely.")
"It does not support recovery.")
if (!parameters.contains("host")) {
throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
}
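For context, the socket source is normally reached through the standard reader API rather than constructed directly; host and port below are placeholders and spark is an assumed SparkSession:

// Resolves to this provider via its "socket" short name.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load()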
@@ -252,8 +252,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map(6 / _)

// Run 3 batches, and then assert that only 2 metadata files are left at the end
// since the first should have been purged.
testStream(mapped)(
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
Expand All @@ -262,11 +262,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AddData(inputData, 4, 6),
CheckAnswer(6, 3, 6, 3, 1, 1),

AssertOnQuery("metadata log should contain only one file") { q =>
AssertOnQuery("metadata log should contain only two files") { q =>
val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
assert(toTest.size == 2 && toTest.head == "1")
true
}
)
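For reference, the arithmetic behind the updated assertion (illustrative):

// Batches 0, 1, 2 write offset log files "0", "1", "2".
// While running batch 2, the scheduler calls offsetLog.purge(2 - 1) = purge(1),
// which removes everything before "1" and leaves exactly two files: "1" and "2".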