From 5e0f6fc14cd468ae1d06ab40e53189fb292375c0 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 5 Oct 2018 21:06:23 -0700
Subject: [PATCH] [SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build error
 due to foreachBatch

---
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala     | 4 ++--
 .../execution/streaming/sources/ForeachBatchSinkSuite.scala | 5 +++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 39c2cde7de40..5ee76990b54f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.sql.{ForeachWriter, SparkSession}
+import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
@@ -900,7 +900,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       }
       testUtils.waitUntilOffsetAppears(topicPartition, 5)
 
-      val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+      val q = ds.writeStream.foreachBatch { (ds: Dataset[String], epochId: Long) =>
         if (epochId == 0) {
           // Send more message before the tasks of the current batch start reading the current batch
           // data, so that the executors will prefetch messages in the next batch and drop them. In
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
index 71dff443e883..3e9ccb0f705d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
@@ -99,11 +99,12 @@ class ForeachBatchSinkSuite extends StreamTest {
     }
     assert(ex1.getMessage.contains("foreachBatch function cannot be null"))
     val ex2 = intercept[AnalysisException] {
-      ds.writeStream.foreachBatch((_, _) => {}).trigger(Trigger.Continuous("1 second")).start()
+      ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => {})
+        .trigger(Trigger.Continuous("1 second")).start()
     }
     assert(ex2.getMessage.contains("'foreachBatch' is not supported with continuous trigger"))
     val ex3 = intercept[AnalysisException] {
-      ds.writeStream.foreachBatch((_, _) => {}).partitionBy("value").start()
+      ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => {}).partitionBy("value").start()
     }
     assert(ex3.getMessage.contains("'foreachBatch' does not support partitioning"))
   }
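
Note (not part of the patch): below is a minimal, self-contained sketch of the ambiguity this change works around. DataStreamWriter.foreachBatch has both a Scala overload taking (Dataset[T], Long) => Unit and a Java overload taking VoidFunction2[Dataset[T], java.lang.Long]; under Scala 2.12, lambdas are SAM-convertible to Java functional interfaces, so an untyped lambda matches both overloads and fails to compile. The object name, the local-mode master, and the use of the rate source are illustrative assumptions, not taken from the patch.

import org.apache.spark.sql.{Dataset, SparkSession}

object ForeachBatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("foreachBatch-overload-sketch")
      .getOrCreate()
    import spark.implicits._

    // A trivial streaming Dataset[Long] from the built-in rate source.
    val ds = spark.readStream.format("rate").load()
      .select($"value").as[Long]

    // Fails to compile on Scala 2.12 with an "ambiguous reference to
    // overloaded definition" error, because the untyped lambda matches
    // both the Scala and the Java foreachBatch overloads:
    // ds.writeStream.foreachBatch((d, epochId) => ()).start()

    // Compiles on both Scala 2.11 and 2.12: the explicit parameter types
    // select the Scala overload, which is exactly what the test changes
    // in this patch do.
    val query = ds.writeStream
      .foreachBatch((d: Dataset[Long], epochId: Long) => ())
      .start()

    query.awaitTermination(5000)  // run briefly, then shut down
    query.stop()
    spark.stop()
  }
}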