diff --git a/build/mvn b/build/mvn
index aee9358fe44c..0f454e159409 100755
--- a/build/mvn
+++ b/build/mvn
@@ -187,6 +187,8 @@ if [ ! -z "${SPARK_LOCAL_IP}" ]; then
   echo "Using SPARK_LOCAL_IP=$SPARK_LOCAL_IP" 1>&2
 fi
 
+export USE_MAVEN=1
+
 # call the `mvn` command as usual
 # SPARK-25854
 "${MVN_BIN}" "$@"
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index bdef6b92ece0..9415d77112d0 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -955,6 +955,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
     val session = spark
     import session.implicits._
     implicit val ec = ExecutionContext.global
+    assume(transferredClientTestJarIfNeed && transferredClientJarIfNeed)
     val q1 = Future {
       spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
     }
@@ -996,6 +997,8 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
     import session.implicits._
     implicit val ec = ExecutionContext.global
 
+    assume(transferredClientTestJarIfNeed && transferredClientJarIfNeed)
+
     @volatile var finished = false
     val interruptor = Future {
       eventually(timeout(20.seconds), interval(1.seconds)) {
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
index e7a77eed70d3..41c865ba5296 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
@@ -33,6 +33,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   import session.implicits._
 
   test("mapGroups") {
+    assume(transferredClientTestJarIfNeed)
     val session: SparkSession = spark
     import session.implicits._
     val values = spark
@@ -44,6 +45,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("flatGroupMap") {
+    assume(transferredClientTestJarIfNeed)
     val values = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -53,6 +55,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("keys") {
+    assume(transferredClientTestJarIfNeed)
     val values = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -62,6 +65,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("keyAs - keys") {
+    assume(transferredClientTestJarIfNeed)
     // It is okay to cast from Long to Double, but not Long to Int.
     val values = spark
       .range(10)
@@ -73,6 +77,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("keyAs - flatGroupMap") {
+    assume(transferredClientTestJarIfNeed)
     val values = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -83,6 +88,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("keyAs mapValues - cogroup") {
+    assume(transferredClientTestJarIfNeed)
     val grouped = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -114,6 +120,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("mapValues - flatGroupMap") {
+    assume(transferredClientTestJarIfNeed)
     val values = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -124,6 +131,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("mapValues - keys") {
+    assume(transferredClientTestJarIfNeed)
     val values = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -134,6 +142,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("flatMapSortedGroups") {
+    assume(transferredClientTestJarIfNeed)
     val grouped = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -157,6 +166,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("cogroup") {
+    assume(transferredClientTestJarIfNeed)
     val grouped = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -173,6 +183,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("cogroupSorted") {
+    assume(transferredClientTestJarIfNeed)
     val grouped = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -200,6 +211,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("agg, keyAs") {
+    assume(transferredClientTestJarIfNeed)
     val ds = spark
       .range(10)
       .groupByKey(v => v % 2)
@@ -210,6 +222,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr") {
+    assume(transferredClientTestJarIfNeed)
     val session: SparkSession = spark
     import session.implicits._
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
@@ -222,6 +235,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr, expr") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 
     checkDatasetUnorderly(
@@ -232,6 +246,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr, expr, expr") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 
     checkDatasetUnorderly(
@@ -242,6 +257,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr, expr, expr, expr") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 
     checkDatasetUnorderly(
@@ -257,6 +273,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr, expr, expr, expr, expr") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 
     checkDatasetUnorderly(
@@ -273,6 +290,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr, expr, expr, expr, expr, expr") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 
     checkDatasetUnorderly(
@@ -290,6 +308,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr, expr, expr, expr, expr, expr, expr") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 
     checkDatasetUnorderly(
@@ -308,6 +327,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("typed aggregation: expr, expr, expr, expr, expr, expr, expr, expr") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 
     checkDatasetUnorderly(
@@ -377,6 +397,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("SPARK-24762: typed agg on Option[Product] type") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(Some((1, 2)), Some((2, 3)), Some((1, 3))).toDS()
     assert(ds.groupByKey(_.get._1).count().collect() === Seq((1, 2), (2, 1)))
 
@@ -386,6 +407,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("SPARK-25942: typed aggregation on primitive type") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(1, 2, 3).toDS()
 
     val agg = ds
@@ -395,6 +417,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("SPARK-25942: typed aggregation on product type") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
     val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long])
     checkDatasetUnorderly(agg, ((1, 2), 1L, 3L), ((2, 3), 2L, 4L), ((3, 4), 3L, 5L))
@@ -413,6 +436,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("reduceGroups") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq("abc", "xyz", "hello").toDS()
     checkDatasetUnorderly(
       ds.groupByKey(_.length).reduceGroups(_ + _),
@@ -421,6 +445,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
   }
 
   test("groupby") {
+    assume(transferredClientTestJarIfNeed)
     val ds = Seq(("a", 1, 10), ("a", 2, 20), ("b", 2, 1), ("b", 1, 2), ("c", 1, 1))
       .toDF("key", "seq", "value")
     val grouped = ds.groupBy($"key").as[String, (String, Int, Int)]
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala
index b5bbee678033..5ee37b172284 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala
@@ -34,11 +34,13 @@ import org.apache.spark.sql.functions.{col, udf}
  */
 class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   test("Dataset typed filter") {
+    assume(transferredClientTestJarIfNeed)
     val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
     assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
   }
 
   test("Dataset typed filter - java") {
+    assume(transferredClientTestJarIfNeed)
     val rows = spark
       .range(10)
       .filter(new FilterFunction[JLong] {
@@ -49,11 +51,13 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset typed map") {
+    assume(transferredClientTestJarIfNeed)
     val rows = spark.range(10).map(n => n / 2)(PrimitiveLongEncoder).collectAsList()
     assert(rows == Arrays.asList[Long](0, 0, 1, 1, 2, 2, 3, 3, 4, 4))
   }
 
   test("filter with condition") {
+    assume(transferredClientTestJarIfNeed)
     // This should go via `def filter(condition: Column)` rather than
     // `def filter(func: T => Boolean)`
     def func(i: Long): Boolean = i < 5
@@ -63,6 +67,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("filter with col(*)") {
+    assume(transferredClientTestJarIfNeed)
     // This should go via `def filter(condition: Column)` but it is executed as
     // `def filter(func: T => Boolean)`. This is fine as the result is the same.
     def func(i: Long): Boolean = i < 5
@@ -72,6 +77,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset typed map - java") {
+    assume(transferredClientTestJarIfNeed)
     val rows = spark
       .range(10)
       .map(
@@ -84,6 +90,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset typed flat map") {
+    assume(transferredClientTestJarIfNeed)
     val session: SparkSession = spark
     import session.implicits._
     val rows = spark
@@ -95,6 +102,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset typed flat map - java") {
+    assume(transferredClientTestJarIfNeed)
     val rows = spark
       .range(5)
       .flatMap(
@@ -108,6 +116,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset typed map partition") {
+    assume(transferredClientTestJarIfNeed)
     val session: SparkSession = spark
     import session.implicits._
     val df = spark.range(0, 100, 1, 50).repartition(4)
@@ -117,6 +126,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset typed map partition - java") {
+    assume(transferredClientTestJarIfNeed)
     val df = spark.range(0, 100, 1, 50).repartition(4)
     val result = df
       .mapPartitions(
@@ -131,6 +141,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset foreach") {
+    assume(transferredClientTestJarIfNeed)
     val func: JLong => Unit = _ => {
       throw new RuntimeException("Hello foreach")
     }
@@ -141,6 +152,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset foreach - java") {
+    assume(transferredClientTestJarIfNeed)
     val exception = intercept[Exception] {
       spark
         .range(2)
@@ -154,6 +166,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset foreachPartition") {
+    assume(transferredClientTestJarIfNeed)
     val sum = new AtomicLong()
     val func: Iterator[JLong] => Unit = f => {
       f.foreach(v => sum.addAndGet(v))
@@ -170,6 +183,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset foreachPartition - java") {
+    assume(transferredClientTestJarIfNeed)
     val sum = new AtomicLong()
     val exception = intercept[Exception] {
       spark
@@ -190,6 +204,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset foreach: change not visible to client") {
+    assume(transferredClientTestJarIfNeed)
     val sum = new AtomicLong()
     val func: Iterator[JLong] => Unit = f => {
       f.foreach(v => sum.addAndGet(v))
@@ -199,12 +214,14 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
   }
 
   test("Dataset reduce") {
+    assume(transferredClientTestJarIfNeed)
     val session: SparkSession = spark
     import session.implicits._
     assert(spark.range(10).map(_ + 1).reduce(_ + _) == 55)
   }
 
   test("Dataset reduce - java") {
+    assume(transferredClientTestJarIfNeed)
     val session: SparkSession = spark
     import session.implicits._
     assert(
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
index e05828606d09..120d26abf07e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client.util
 
 import java.io.{BufferedOutputStream, File}
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.io.Source
 
@@ -29,6 +30,7 @@ import org.apache.spark.sql.connect.client.SparkConnectClient
 import org.apache.spark.sql.connect.client.util.IntegrationTestUtils._
 import org.apache.spark.sql.connect.common.config.ConnectCommon
 
+
 /**
  * An util class to start a local spark connect server in a different process for local E2E tests.
  * Pre-running the tests, the spark connect artifact needs to be built using e.g. `build/sbt
@@ -126,18 +128,7 @@ object SparkConnectServerUtils {
       Seq("--conf", s"spark.sql.catalogImplementation=$catalogImplementation")
     }
 
-    // For UDF maven E2E tests, the server needs the client code to find the UDFs defined in tests.
-    val udfTestConfigs = tryFindJar(
-      "connector/connect/client/jvm",
-      // SBT passes the client & test jars to the server process automatically.
-      // So we skip building or finding this jar for SBT.
-      "sbt-tests-do-not-need-this-jar",
-      "spark-connect-client-jvm",
-      test = true)
-      .map(clientTestJar => Seq("--jars", clientTestJar.getCanonicalPath))
-      .getOrElse(Seq.empty)
-
-    writerV2Configs ++ hiveTestConfigs ++ udfTestConfigs
+    writerV2Configs ++ hiveTestConfigs
   }
 
   def start(): Unit = {
@@ -168,6 +159,56 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
   import SparkConnectServerUtils._
   var spark: SparkSession = _
   protected lazy val serverPort: Int = port
+  protected lazy val transferredClientJarIfNeed: Boolean = {
+    if (sys.env.contains("USE_MAVEN")) {
+      if (RemoteSparkSession.transferredClientJar.get()) {
+        true
+      } else {
+        val fileOption = IntegrationTestUtils.tryFindJar(
+          "connector/connect/client/jvm",
+          // SBT passes the client & test jars to the server process automatically.
+          // So we skip building or finding this jar for SBT.
+          "sbt-tests-do-not-need-this-jar",
+          "spark-connect-client-jvm")
+        fileOption match {
+          case Some(f) if spark != null =>
+            if (RemoteSparkSession.transferredClientJar.compareAndSet(false, true)) {
+              spark.addArtifact(f.getCanonicalPath)
+            }
+            true
+          case _ => false
+        }
+      }
+    } else {
+      true
+    }
+  }
+
+  protected lazy val transferredClientTestJarIfNeed: Boolean = {
+    if (sys.env.contains("USE_MAVEN")) {
+      if (RemoteSparkSession.transferredClientTestJar.get()) {
+        true
+      } else {
+        val fileOption = IntegrationTestUtils.tryFindJar(
+          "connector/connect/client/jvm",
+          // SBT passes the client & test jars to the server process automatically.
+          // So we skip building or finding this jar for SBT.
+          "sbt-tests-do-not-need-this-jar",
+          "spark-connect-client-jvm",
+          test = true)
+        fileOption match {
+          case Some(f) if spark != null =>
+            if (RemoteSparkSession.transferredClientTestJar.compareAndSet(false, true)) {
+              spark.addArtifact(f.getCanonicalPath)
+            }
+            true
+          case _ => false
+        }
+      }
+    } else {
+      true
+    }
+  }
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -218,3 +259,8 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
     super.afterAll()
   }
 }
+
+private object RemoteSparkSession {
+  private val transferredClientJar = new AtomicBoolean(false)
+  private val transferredClientTestJar = new AtomicBoolean(false)
+}
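
For context, a minimal sketch of how an E2E suite is expected to consume the new helpers; the suite and test names below are illustrative and are not part of the patch. Under SBT the lazy vals are always true; under Maven (`USE_MAVEN` is exported by `build/mvn`) they upload the client test jar to the server once via `spark.addArtifact` and evaluate to false when the jar cannot be found, so `assume` cancels the test instead of failing it.

```scala
// Illustrative sketch only; this suite does not exist in the patch.
import java.util.Arrays

import org.apache.spark.sql.connect.client.util.RemoteSparkSession

class ExampleUdfE2ETestSuite extends RemoteSparkSession {
  test("typed filter needs the client test jar on the server") {
    // True under SBT; under Maven this lazily calls spark.addArtifact with the
    // client test jar and cancels the test if the jar has not been built.
    assume(transferredClientTestJarIfNeed)
    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
  }
}
```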