@@ -47,7 +47,7 @@ class FileStreamSink(
private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString)
private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)

- override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ override def addData(batchId: Long, data: DataFrame): Unit = {
Contributor:

We don't need to change this name. It's still a batch of data, and one of the parameters is still named `batch`.

if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
logInfo(s"Skipping already committed batch $batchId")
} else {
@@ -88,9 +88,9 @@ class FileStreamSource(
}

/**
- * Returns the next batch of data that is available after `start`, if any is available.
+ * Returns the data that is between the offsets (`start`, `end`].
Contributor (author):

This doc should be updated, following the change to `Source.getBatch()`'s doc from "Returns the next batch of data that is available after `start`, if any is available." to "Returns the data that is between the offsets (`start`, `end`]."

*/
- override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ override def getData(start: Option[Offset], end: Offset): DataFrame = {
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
val endId = end.asInstanceOf[LongOffset].offset

@@ -27,9 +27,9 @@ import org.apache.spark.sql.DataFrame
trait Sink {

/**
* Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
* this method is called more than once with the same batchId (which will happen in the case of
* failures), then `data` should only be added once.
*/
- def addBatch(batchId: Long, data: DataFrame): Unit
+ def addData(batchId: Long, data: DataFrame): Unit
}
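
For reference, a minimal sink that follows this contract could look like the sketch below (illustration only, not part of this change; the class name, the println logging, and the package of `Sink` are my assumptions). Like `FileStreamSink` and `MemorySink` elsewhere in this diff, it remembers the last batch it has added so that a re-delivered `batchId` is ignored. It uses the `addData` name from this commit; if the rename is dropped as suggested above, the override would stay `addBatch`.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink used only to illustrate the idempotence contract documented above.
class PrintlnSink extends Sink {
  // Highest batchId that has already been added to this sink.
  private var latestBatchId = -1L

  override def addData(batchId: Long, data: DataFrame): Unit = synchronized {
    if (batchId <= latestBatchId) {
      // Re-delivery after a failure: this batch was already added once, so skip it.
      println(s"Skipping already committed batch $batchId")
    } else {
      data.collect().foreach(println) // add the batch exactly once
      latestBatchId = batchId
    }
  }
}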
@@ -34,9 +34,9 @@ trait Source {
def getOffset: Option[Offset]

/**
- * Returns the data that is is between the offsets (`start`, `end`]. When `start` is `None` then
- * the batch should begin with the first available record. This method must always return the
+ * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then the
+ * returned data should begin with the first available record. This method must always return the
* same data for a particular `start` and `end` pair.
*/
- def getBatch(start: Option[Offset], end: Offset): DataFrame
+ def getData(start: Option[Offset], end: Offset): DataFrame
}
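
Similarly, a toy source that honors the (`start`, `end`] semantics could look like this (again an illustration, not code from this PR; the class name and the fixed offset range are invented, and I'm assuming `Source` also declares the usual `schema` member and lives in the same package as `LongOffset`):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{LongType, StructType}

// Hypothetical source that exposes the numbers 0..9, one record per offset.
class CounterSource(sqlContext: SQLContext) extends Source {
  override def schema: StructType = new StructType().add("value", LongType)

  // Pretend offsets 0 through 9 are always available.
  override def getOffset: Option[Offset] = Some(LongOffset(9))

  override def getData(start: Option[Offset], end: Offset): DataFrame = {
    // `start` is exclusive; None means "begin with the first available record".
    val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
    val endId = end.asInstanceOf[LongOffset].offset
    // The same (start, end] pair must always produce the same rows.
    sqlContext.range(startId + 1, endId + 1).toDF("value")
  }
}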
@@ -287,7 +287,7 @@ class StreamExecution(
val newData = availableOffsets.flatMap {
case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
val current = committedOffsets.get(source)
- val batch = source.getBatch(current, available)
+ val batch = source.getData(current, available)
logDebug(s"Retrieving data from $source: $current -> $available")
Some(source -> batch)
case _ => None
@@ -329,7 +329,7 @@ class StreamExecution(

val nextBatch =
new Dataset(sqlContext, lastExecution, RowEncoder(lastExecution.analyzed.schema))
- sink.addBatch(currentBatchId - 1, nextBatch)
+ sink.addData(currentBatchId - 1, nextBatch)

awaitBatchLock.synchronized {
// Wake up any threads that are waiting for the stream to progress.
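To recap how the two StreamExecution hunks above fit together (a simplified paraphrase, not code from this PR; it elides the incremental planning and RowEncoder step shown in the diff): each micro-batch pulls only the not-yet-committed offset range from every source, and the resulting batch is handed to the sink under the current batch id, relying on the sink's idempotence if the same id is re-delivered after a failure.

// Simplified sketch; variable names follow the diff above.
val newData = availableOffsets.flatMap {
  case (source, available) if committedOffsets.get(source).forall(_ < available) =>
    // Pull only the uncommitted range (current, available] from the source.
    Some(source -> source.getData(committedOffsets.get(source), available))
  case _ => None
}
// ...incremental planning over newData produces nextBatch (elided)...
// The sink must tolerate seeing the same batch id more than once.
sink.addData(currentBatchId - 1, nextBatch)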
@@ -91,9 +91,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}

/**
- * Returns the next batch of data that is available after `start`, if any is available.
+ * Returns the data that is between the offsets (`start`, `end`].
*/
- override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ override def getData(start: Option[Offset], end: Offset): DataFrame = {
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
@@ -135,7 +135,7 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
}.mkString("\n")
}

- override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
+ override def addData(batchId: Long, data: DataFrame): Unit = synchronized {
if (batchId == batches.size) {
logDebug(s"Committing batch $batchId")
batches.append(data.collect())
@@ -79,7 +79,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {

override def getOffset: Option[Offset] = Some(new LongOffset(0))

- override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ override def getData(start: Option[Offset], end: Offset): DataFrame = {
import sqlContext.implicits._

Seq[Int]().toDS().toDF()
@@ -95,7 +95,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
LastOptions.partitionColumns = partitionColumns
LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
new Sink {
- override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+ override def addData(batchId: Long, data: DataFrame): Unit = {}
}
}
}
@@ -172,7 +172,7 @@ class FakeDefaultSource extends StreamSourceProvider {
}
}

- override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ override def getData(start: Option[Offset], end: Offset): DataFrame = {
val startOffset = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
sqlContext.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a")
}