[SPARK-16963] [STREAMING] [SQL] Changes to Source trait and related implementation classes #14553
@@ -30,16 +30,37 @@ trait Source {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /** Returns the maximum available offset for this source. */
+  /**
+   * Returns the highest offset that this source has <b>removed</b> from its internal buffer
+   * in response to a call to `commit`.
+   * Returns `None` if this source has not removed any data.
+   */
+  def lastCommittedOffset: Option[Offset] = (None)
+
+  /**
+   * Returns the maximum available offset for this source.
+   * Returns `None` if this source has never received any data.
+   */
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
-   * the batch should begin with the first available record. This method must always return the
-   * same data for a particular `start` and `end` pair.
+   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
+   * then the batch should begin with the first record. This method must always return the
+   * same data for a particular `start` and `end` pair; even after the Source has been restarted
+   * on a different node.
+   * <p>
+   * Higher layers will always call this method with a value of `start` greater than or equal
+   * to the last value passed to `commit` and a value of `end` less than or equal to the
+   * last value returned by `getMaxOffset`
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
 
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  def commit(end: Offset) : Unit = {}
+
   /** Stop this source and free any resources it has allocated. */
   def stop(): Unit
 }
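To see the revised contract end to end, here is a minimal sketch of a buffering source that only discards data in response to `commit`. This is not part of the PR; the class name, `addData`, and the field names are hypothetical, and it assumes the `org.apache.spark.sql.execution.streaming` classes as they stand at this revision.

```scala
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/** Hypothetical example: a trivial in-memory source that follows the new contract. */
class BufferedStringSource(sqlContext: SQLContext) extends Source {
  private val buffer = new ListBuffer[String]   // holds offsets (committed, current]
  private var current = new LongOffset(-1)      // highest offset received so far
  private var committed = new LongOffset(-1)    // highest offset handed back via commit()

  def addData(s: String): Unit = synchronized {
    buffer += s
    current = current + 1
  }

  override def schema: StructType = StructType(StructField("value", StringType) :: Nil)

  override def lastCommittedOffset: Option[Offset] = synchronized {
    if (committed.offset == -1) None else Some(committed)
  }

  override def getOffset: Option[Offset] = synchronized {
    if (current.offset == -1) None else Some(current)
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val startOrdinal = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
    val endOrdinal = end.asInstanceOf[LongOffset].offset + 1
    // The buffer only contains offsets greater than `committed`, so shift into buffer indexes.
    val rows = synchronized {
      buffer.slice((startOrdinal - committed.offset - 1).toInt,
                   (endOrdinal - committed.offset - 1).toInt).toList
    }
    import sqlContext.implicits._
    sqlContext.createDataset(rows).toDF("value")
  }

  override def commit(end: Offset): Unit = synchronized {
    val newCommitted = end.asInstanceOf[LongOffset]
    buffer.trimStart((newCommitted.offset - committed.offset).toInt)
    committed = newCommitted
  }

  override def stop(): Unit = {}
}
```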
@@ -21,7 +21,9 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.locks.ReentrantLock
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Map
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path

@@ -72,13 +74,17 @@ class StreamExecution(
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
+   * Only the scheduler thread should modify this field, and only in atomic steps. Other threads
+   * must create a local copy before iterating over this data structure.
    */
   @volatile
   var committedOffsets = new StreamProgress
 
   /**
    * Tracks the offsets that are available to be processed, but have not yet be committed to the
    * sink.
+   * Only the scheduler thread should modify this field, and only in atomic steps. Other threads
+   * must create a local copy before iterating over this data structure.
    */
   @volatile
   private var availableOffsets = new StreamProgress

@@ -291,12 +297,23 @@ class StreamExecution(
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
+        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+        // sources to discard data from before the *previous* batch.
+        // The scheduler might still request the previous batch from a source in some cases
+        // if a crash and recovery occured.
+        val prevBatchOff = offsetLog.get(currentBatchId - 2)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
+
         // Now that we have logged the new batch, no further processing will happen for
-        // the previous batch, and it is safe to discard the old metadata.
-        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-        offsetLog.purge(currentBatchId)
+        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before the target ID.
+        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
+        // flight at the same time), this cleanup logic will need to change.
+        offsetLog.purge(currentBatchId - 1)
       } else {
         awaitBatchLock.lock()
         try {

@@ -403,7 +420,7 @@ class StreamExecution(
   /**
    * Blocks the current thread until processing for data from the given `source` has reached at
-   * least the given `Offset`. This method is indented for use primarily when writing tests.
+   * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
   private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
     def notDone = {
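A worked example of the new cleanup arithmetic, with hypothetical batch IDs (illustration only, not code from this PR):

```scala
// Suppose the scheduler has just logged offsets for batch 5 in the offset log.
val currentBatchId = 5L
val commitBatch = currentBatchId - 2   // sources receive commit() for batch 3's offsets
val purgeTarget = currentBatchId - 1   // offsetLog.purge(4): entries 0..3 dropped, 4 and 5 kept
// Sources may now drop data at or below batch 3's offsets, but batch 4 can still be
// re-requested from them if the query crashes and recovers before batch 5 completes.
```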
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   protected val logicalPlan = StreamingExecutionRelation(this)
   protected val output = logicalPlan.output
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
   @GuardedBy("this")
-  protected val batches = new ArrayBuffer[Dataset[A]]
+  protected val batches = new ListBuffer[Dataset[A]]
 
   @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)
 
+  /**
+   * Last offset that was discarded, or -1 if no commits have occurred. Note that the value
+   * -1 is used in calculations below and isn't just an arbitrary constant.
+   */
+  @GuardedBy("this")
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+
   def schema: StructType = encoder.schema
 
   def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -84,22 +95,34 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
+  override def lastCommittedOffset: Option[Offset] = synchronized {
+    if (lastOffsetCommitted.offset == -1) {
+      None
+    } else {
+      Some(lastOffsetCommitted)
+    }
+  }
+
   override def getOffset: Option[Offset] = synchronized {
-    if (batches.isEmpty) {
+    if (currentOffset.offset == -1) {
       None
     } else {
       Some(currentOffset)
     }
   }
 
   /**
    * Returns the data that is between the offsets (`start`, `end`].
    */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+    // Internal buffer only holds the batches after lastCommittedOffset.
+    val newBlocks = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
 
     logDebug(
       s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
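A quick sanity check of the new slice arithmetic, with hypothetical offsets (illustration only):

```scala
// Say lastOffsetCommitted = 2, so `batches` holds the data for offsets 3, 4, 5, 6
// at buffer positions 0..3, and the engine asks for the range (start = 4, end = 6].
val lastCommitted = 2
val startOrdinal = 4 + 1                            // 5: first offset to return
val endOrdinal = 6 + 1                              // 7: exclusive upper bound
val sliceStart = startOrdinal - lastCommitted - 1   // 2
val sliceEnd = endOrdinal - lastCommitted - 1       // 4
// batches.slice(2, 4) yields the batches for offsets 5 and 6, i.e. exactly (start, end].
```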
@@ -111,6 +134,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }
 
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
+    } else {
+      sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+        s"an instance of this class")
+    }
+  }
+
   override def stop() {}
 }
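The resulting MemoryStream behaviour at this revision, written as a hypothetical spark-shell session (assumes a SparkSession named `spark`; each `addData` call produces one batch and one offset):

```scala
import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream}
import spark.implicits._

implicit val sqlContext = spark.sqlContext
val stream = MemoryStream[Int]

stream.addData(1)                  // batch at offset 0
stream.addData(2)                  // batch at offset 1
stream.addData(3)                  // batch at offset 2
stream.getOffset                   // Some(LongOffset(2))
stream.lastCommittedOffset         // None: nothing has been discarded yet

stream.commit(LongOffset(1))       // engine reports offsets <= 1 fully processed
stream.lastCommittedOffset         // Some(LongOffset(1)); batches 0 and 1 were trimmed
stream.getBatch(Some(LongOffset(1)), LongOffset(2)).show()   // only batch 2 is requestable
```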
@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
 
 object TextSocketSource {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   @GuardedBy("this")
   private var readThread: Thread = null
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
+  @GuardedBy("this")
+  protected val batches = new ListBuffer[(String, Timestamp)]
+
   @GuardedBy("this")
-  private var lines = new ArrayBuffer[(String, Timestamp)]
+  protected var currentOffset: LongOffset = new LongOffset(-1)
 
+  @GuardedBy("this")
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+
   initialize()
@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
             return
           }
           TextSocketSource.this.synchronized {
-            lines += ((line,
+            val newData = (line,
               Timestamp.valueOf(
                 TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-            ))
+            )
+            currentOffset = currentOffset + 1
+            batches.append(newData)
           }
         }
       } catch {
@@ -92,21 +105,64 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
   else TextSocketSource.SCHEMA_REGULAR
 
-  /** Returns the maximum available offset for this source. */
+  override def lastCommittedOffset: Option[Offset] = synchronized {
+    if (lastOffsetCommitted.offset == -1) {
+      None
+    } else {
+      Some(lastOffsetCommitted)
+    }
+  }
+
   override def getOffset: Option[Offset] = synchronized {
-    if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+    if (currentOffset.offset == -1) {
+      None
+    } else {
+      Some(currentOffset)
+    }
   }
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
-    val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
-    val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val data = synchronized { lines.slice(startIdx, endIdx) }
+    val startOrdinal =
+      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+    // Internal buffer only holds the batches after lastOffsetCommitted
+    val rawList = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
+
     import sqlContext.implicits._
+    val rawBatch = sqlContext.createDataset(rawList)
+
+    // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
+    // if requested.
     if (includeTimestamp) {
-      data.toDF("value", "timestamp")
+      rawBatch.toDF("value", "timestamp")
     } else {
-      data.map(_._1).toDF("value")
+      // Strip out timestamp
+      rawBatch.select("_1").toDF("value")
     }
   }
 
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
+    } else {
+      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
+        s"originate with an instance of this class")
+    }
+  }
 
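For context, the socket source is still used the same way from the public API; only its internal buffering changes. A hypothetical usage example (assumes `nc -lk 9999` running locally and a SparkSession named `spark`):

```scala
val socketLines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", "true")   // keeps the "timestamp" column built in getBatch above
  .load()

// Echo each micro-batch to the console while the query runs.
val query = socketLines.writeStream
  .format("console")
  .start()
```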
@@ -141,7 +197,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     logWarning("The socket source should not be used for production applications! " +
-      "It does not support recovery and stores state indefinitely.")
+      "It does not support recovery.")
     if (!parameters.contains("host")) {
       throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
     }
Review comments on this change:

I'd agree to leave this out if it's only used to output a warning message.

Also, nit: no ()

Dropped this method from my local copy.