Skip to content

Commit e3af17c

Browse files
committed
allow empty batch
1 parent 71cc6e4 commit e3af17c

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, Stri
3838
override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
3939
val batch = messages.collect {
4040
case PackedRowCommitMessage(rows) => rows
41-
}.reduce(_ ++ _)
41+
}.fold(Array())(_ ++ _)
4242

4343
// scalastyle:off println
4444
println("-------------------------------------------")

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class ConsoleWriterSuite extends StreamTest {
3636
query.processAllAvailable()
3737
input.addData(4, 5, 6)
3838
query.processAllAvailable()
39+
input.addData()
40+
query.processAllAvailable()
3941
} finally {
4042
query.stop()
4143
}
@@ -64,6 +66,14 @@ class ConsoleWriterSuite extends StreamTest {
6466
|| 6|
6567
|+-----+
6668
|
69+
|-------------------------------------------
70+
|Batch: 2
71+
|-------------------------------------------
72+
|+-----+
73+
||value|
74+
|+-----+
75+
|+-----+
76+
|
6777
|""".stripMargin)
6878
}
6979
}

0 commit comments

Comments
 (0)