Skip to content

Commit 733b81b

Browse files
Bill Chambersbrkyvz
authored andcommitted
[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans
## What changes were proposed in this pull request? We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka. ## How was this patch tested? New unit test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Bill Chambers <[email protected]> Closes #17804 from anabranch/SPARK-20496-2.
1 parent 8c911ad commit 733b81b

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
4747
queryExecution: QueryExecution,
4848
kafkaParameters: ju.Map[String, Object],
4949
topic: Option[String] = None): Unit = {
50-
val schema = queryExecution.logical.output
50+
val schema = queryExecution.analyzed.output
5151
schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
5252
if (topic == None) {
5353
throw new AnalysisException(s"topic option required when no " +
@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
8484
queryExecution: QueryExecution,
8585
kafkaParameters: ju.Map[String, Object],
8686
topic: Option[String] = None): Unit = {
87-
val schema = queryExecution.logical.output
87+
val schema = queryExecution.analyzed.output
8888
validateQuery(queryExecution, kafkaParameters, topic)
8989
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
9090
queryExecution.toRdd.foreachPartition { iter =>

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.spark.SparkException
2828
import org.apache.spark.sql._
2929
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
3030
import org.apache.spark.sql.execution.streaming.MemoryStream
31+
import org.apache.spark.sql.functions._
3132
import org.apache.spark.sql.streaming._
3233
import org.apache.spark.sql.test.SharedSQLContext
3334
import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
108109
s"save mode overwrite not allowed for kafka"))
109110
}
110111

112+
test("SPARK-20496: batch - enforce analyzed plans") {
113+
val inputEvents =
114+
spark.range(1, 1000)
115+
.select(to_json(struct("*")) as 'value)
116+
117+
val topic = newTopic()
118+
testUtils.createTopic(topic)
119+
// used to throw UnresolvedException
120+
inputEvents.write
121+
.format("kafka")
122+
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
123+
.option("topic", topic)
124+
.save()
125+
}
126+
111127
test("streaming - write to kafka with topic field") {
112128
val input = MemoryStream[String]
113129
val topic = newTopic()

0 commit comments

Comments
 (0)