Skip to content

Commit be880b1

Browse files
committed
fix options and add tests
1 parent 7154f34 commit be880b1

File tree

3 files changed

+61
-4
lines changed

3 files changed

+61
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class ConsoleSinkProvider extends DataSourceV2
4646
schema: StructType,
4747
mode: OutputMode,
4848
options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
49-
Optional.of(new ConsoleWriter(epochId, schema, options.asMap.asScala.toMap))
49+
Optional.of(new ConsoleWriter(epochId, schema, options))
5050
}
5151

5252
def createRelation(

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@ package org.apache.spark.sql.execution.streaming.sources
1919

2020
import org.apache.spark.internal.Logging
2121
import org.apache.spark.sql.{Row, SparkSession}
22+
import org.apache.spark.sql.sources.v2.DataSourceV2Options
2223
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
2324
import org.apache.spark.sql.types.StructType
2425

25-
class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, String])
26+
class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
2627
extends DataSourceV2Writer with Logging {
2728
// Number of rows to display, by default 20 rows
28-
private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20)
29+
private val numRowsToShow = options.getInt("numRows", 20)
2930

3031
// Truncate the displayed data if it is too long, by default it is true
31-
private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true)
32+
private val isTruncated = options.getBoolean("truncate", true)
3233

3334
assert(SparkSession.getActiveSession.isDefined)
3435
private val spark = SparkSession.getActiveSession.get

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,60 @@ class ConsoleWriterSuite extends StreamTest {
7676
|
7777
|""".stripMargin)
7878
}
79+
80+
test("console with numRows") {
81+
val input = MemoryStream[Int]
82+
83+
val captured = new ByteArrayOutputStream()
84+
Console.withOut(captured) {
85+
val query = input.toDF().writeStream.format("console").option("NUMROWS", 2).start()
86+
try {
87+
input.addData(1, 2, 3)
88+
query.processAllAvailable()
89+
} finally {
90+
query.stop()
91+
}
92+
}
93+
94+
assert(captured.toString() ==
95+
"""-------------------------------------------
96+
|Batch: 0
97+
|-------------------------------------------
98+
|+-----+
99+
||value|
100+
|+-----+
101+
|| 1|
102+
|| 2|
103+
|+-----+
104+
|only showing top 2 rows
105+
|
106+
|""".stripMargin)
107+
}
108+
109+
test("console with truncation") {
110+
val input = MemoryStream[String]
111+
112+
val captured = new ByteArrayOutputStream()
113+
Console.withOut(captured) {
114+
val query = input.toDF().writeStream.format("console").option("TRUNCATE", true).start()
115+
try {
116+
input.addData("123456789012345678901234567890")
117+
query.processAllAvailable()
118+
} finally {
119+
query.stop()
120+
}
121+
}
122+
123+
assert(captured.toString() ==
124+
"""-------------------------------------------
125+
|Batch: 0
126+
|-------------------------------------------
127+
|+--------------------+
128+
|| value|
129+
|+--------------------+
130+
||12345678901234567...|
131+
|+--------------------+
132+
|
133+
|""".stripMargin)
134+
}
79135
}

0 commit comments

Comments
 (0)