
Commit c79cba9

remove the useless Batch class
1 parent 7dde1da commit c79cba9

File tree

9 files changed: 16 additions, 42 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala

Lines changed: 0 additions & 26 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ class FileStreamSink(
   private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString)
   private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
 
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+  override def addData(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 2 additions & 2 deletions
@@ -88,9 +88,9 @@ class FileStreamSource(
   }
 
   /**
-   * Returns the next batch of data that is available after `start`, if any is available.
+   * Returns the data that is between the offsets (`start`, `end`].
    */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+  override def getData(start: Option[Offset], end: Offset): DataFrame = {
     val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
     val endId = end.asInstanceOf[LongOffset].offset

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala

Lines changed: 2 additions & 2 deletions
@@ -27,9 +27,9 @@ import org.apache.spark.sql.DataFrame
 trait Sink {
 
   /**
-   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
+   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
    * this method is called more than once with the same batchId (which will happen in the case of
    * failures), then `data` should only be added once.
    */
-  def addBatch(batchId: Long, data: DataFrame): Unit
+  def addData(batchId: Long, data: DataFrame): Unit
 }
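
For context, the contract above asks sinks to be idempotent per batchId. A minimal sketch of a sink that satisfies it (the class name, field, and println output are illustrative, not part of this commit):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink: replaying an already-seen batchId must not add its rows twice.
class PrintlnSink extends Sink {
  @volatile private var latestBatchId = -1L

  override def addData(batchId: Long, data: DataFrame): Unit = synchronized {
    if (batchId <= latestBatchId) {
      // Batch was already added by a previous attempt; ignore the replay.
    } else {
      data.collect().foreach(row => println(s"batch $batchId: $row"))
      latestBatchId = batchId
    }
  }
}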

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala

Lines changed: 3 additions & 3 deletions
@@ -34,9 +34,9 @@ trait Source {
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is is between the offsets (`start`, `end`]. When `start` is `None` then
-   * the batch should begin with the first available record. This method must always return the
+   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then the
+   * returned data should begin with the first available record. This method must always return the
    * same data for a particular `start` and `end` pair.
    */
-  def getBatch(start: Option[Offset], end: Offset): DataFrame
+  def getData(start: Option[Offset], end: Offset): DataFrame
 }
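
For context, a (`start`, `end`] range is exclusive of `start` and inclusive of `end`, so consecutive calls tile the stream without gaps or overlap, and the determinism requirement makes replays safe. A hedged usage sketch (the helper name and the concrete offsets are illustrative only):

import org.apache.spark.sql.execution.streaming.{LongOffset, Source}

// Illustrative only: for any Source, adjacent ranges compose end-to-end and replays are stable.
def readTwice(src: Source): Unit = {
  val first  = src.getData(None, LongOffset(4))                 // everything up to offset 4
  val second = src.getData(Some(LongOffset(4)), LongOffset(9))  // records after 4, up to and including 9
  val replay = src.getData(Some(LongOffset(4)), LongOffset(9))  // must hold the same rows as `second`
}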

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 2 additions & 2 deletions
@@ -287,7 +287,7 @@ class StreamExecution(
     val newData = availableOffsets.flatMap {
       case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
         val current = committedOffsets.get(source)
-        val batch = source.getBatch(current, available)
+        val batch = source.getData(current, available)
         logDebug(s"Retrieving data from $source: $current -> $available")
         Some(source -> batch)
       case _ => None
@@ -329,7 +329,7 @@ class StreamExecution(
 
     val nextBatch =
       new Dataset(sqlContext, lastExecution, RowEncoder(lastExecution.analyzed.schema))
-    sink.addBatch(currentBatchId - 1, nextBatch)
+    sink.addData(currentBatchId - 1, nextBatch)
 
     awaitBatchLock.synchronized {
       // Wake up any threads that are waiting for the stream to progress.
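
For context, the guard in the first hunk pulls data only from sources whose available offset is ahead of, or absent from, the committed offsets. The same condition restated with plain Longs (the map contents are made up for illustration):

val availableOffsets = Map("fileSource" -> 3L, "socketSource" -> 7L)
val committedOffsets = Map("socketSource" -> 7L)  // fileSource has never been committed

// A source contributes new data only when there is no committed offset yet,
// or the committed offset is strictly behind what is available.
val sourcesWithNewData = availableOffsets.filter { case (src, available) =>
  committedOffsets.get(src).map(_ < available).getOrElse(true)
}
assert(sourcesWithNewData == Map("fileSource" -> 3L))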

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 3 additions & 3 deletions
@@ -91,9 +91,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   /**
-   * Returns the next batch of data that is available after `start`, if any is available.
+   * Returns the data that is between the offsets (`start`, `end`].
    */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+  override def getData(start: Option[Offset], end: Offset): DataFrame = {
     val startOrdinal =
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
@@ -135,7 +135,7 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
     }.mkString("\n")
   }
 
-  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
+  override def addData(batchId: Long, data: DataFrame): Unit = synchronized {
     if (batchId == batches.size) {
       logDebug(s"Committing batch $batchId")
       batches.append(data.collect())
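
For context, MemoryStream maps offsets to ordinals into its buffered batches, exclusive of `start` and inclusive of `end`. A worked example of that arithmetic (the buffer and offset values are illustrative; LongOffset is the offset type used in the diff):

import org.apache.spark.sql.execution.streaming.LongOffset

val batches = Seq("b0", "b1", "b2", "b3", "b4")
val start = Some(LongOffset(1))                                   // last batch already delivered
val end = LongOffset(3)
val startOrdinal = start.map(_.offset).getOrElse(-1L).toInt + 1   // 2
val endOrdinal = end.offset.toInt + 1                             // 4
assert(batches.slice(startOrdinal, endOrdinal) == Seq("b2", "b3"))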

sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -79,7 +79,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
 
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
 
-      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+      override def getData(start: Option[Offset], end: Offset): DataFrame = {
         import sqlContext.implicits._
 
         Seq[Int]().toDS().toDF()
@@ -95,7 +95,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
     new Sink {
-      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+      override def addData(batchId: Long, data: DataFrame): Unit = {}
     }
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -172,7 +172,7 @@ class FakeDefaultSource extends StreamSourceProvider {
       }
     }
 
-    override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    override def getData(start: Option[Offset], end: Offset): DataFrame = {
       val startOffset = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
       sqlContext.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a")
     }
