build/mvn (2 additions, 0 deletions)

@@ -187,6 +187,8 @@ if [ ! -z "${SPARK_LOCAL_IP}" ]; then
echo "Using SPARK_LOCAL_IP=$SPARK_LOCAL_IP" 1>&2
fi

export USE_MAVEN=1
LuciferYang (Contributor, Author) commented:

Only Maven triggers this issue, so an environment variable is needed to identify Maven-driven test runs.
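A minimal sketch, assuming a hypothetical helper (the PR's actual detection code is not shown in this diff), of how the test side could branch on this variable:

// Hypothetical sketch, not the PR's actual code: build/mvn exports
// USE_MAVEN=1, so a test utility can check the environment to detect
// a Maven-driven run and enable Maven-specific handling.
object MavenTestDetection {
  def isMavenTest: Boolean = sys.env.contains("USE_MAVEN")
}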


# call the `mvn` command as usual
# SPARK-25854
"${MVN_BIN}" "$@"
ClientE2ETestSuite.scala

@@ -955,6 +955,7 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
val session = spark
import session.implicits._
implicit val ec = ExecutionContext.global
assume(transferredClientTestJarIfNeed && transferredClientJarIfNeed)
LuciferYang (Contributor, Author) commented on Jun 6, 2023:

Because ClientE2ETestSuite imports classes such as SparkResult from the client module, these two test cases also need to add spark-connect-client-jvm.jar as an artifact.

So I think it would be better to move them from ClientE2ETestSuite to a new test class, as in #41487.
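As a rough sketch of what a guard like `transferredClientTestJarIfNeed` could do (the helper name and jar path here are assumptions; the real implementation is not shown in this diff), using the Spark Connect client's `addArtifact` API:

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: locate a locally built jar and register it with the
// session. Returning false lets `assume(...)` cancel the test when the jar
// has not been built, instead of failing it.
def transferJarIfPresent(spark: SparkSession, jarPath: String): Boolean = {
  val jar = Paths.get(jarPath) // e.g. the spark-connect-client-jvm jar; path is illustrative
  if (Files.exists(jar)) {
    spark.addArtifact(jar.toString) // ships the jar to the server as an artifact
    true
  } else {
    false
  }
}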

LuciferYang (Contributor, Author) commented:

Perhaps we could try using SparkResult.class as an artifact, but that may make the testing code increasingly complex.
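For illustration, a sketch of that alternative, assuming the artifact API accepts single .class files and using a hypothetical path to the compiled class:

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: ship one compiled class instead of the whole jar.
// The path is illustrative, and every transitively referenced class would
// need the same treatment, which is the complexity concern raised above.
def addSparkResultClass(spark: SparkSession): Unit = {
  val classFile = Paths.get(
    "target/classes/org/apache/spark/sql/connect/client/SparkResult.class")
  if (Files.exists(classFile)) {
    spark.addArtifact(classFile.toString)
  }
}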

val q1 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
}
@@ -996,6 +997,8 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
import session.implicits._
implicit val ec = ExecutionContext.global

assume(transferredClientTestJarIfNeed && transferredClientJarIfNeed)

@volatile var finished = false
val interruptor = Future {
eventually(timeout(20.seconds), interval(1.seconds)) {
KeyValueGroupedDatasetE2ETestSuite.scala

@@ -33,6 +33,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
import session.implicits._

test("mapGroups") {
assume(transferredClientTestJarIfNeed)
val session: SparkSession = spark
import session.implicits._
val values = spark
@@ -44,6 +45,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("flatGroupMap") {
assume(transferredClientTestJarIfNeed)
val values = spark
.range(10)
.groupByKey(v => v % 2)
@@ -53,6 +55,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("keys") {
assume(transferredClientTestJarIfNeed)
val values = spark
.range(10)
.groupByKey(v => v % 2)
@@ -62,6 +65,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("keyAs - keys") {
assume(transferredClientTestJarIfNeed)
// It is okay to cast from Long to Double, but not Long to Int.
val values = spark
.range(10)
@@ -73,6 +77,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("keyAs - flatGroupMap") {
assume(transferredClientTestJarIfNeed)
val values = spark
.range(10)
.groupByKey(v => v % 2)
@@ -83,6 +88,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("keyAs mapValues - cogroup") {
assume(transferredClientTestJarIfNeed)
val grouped = spark
.range(10)
.groupByKey(v => v % 2)
@@ -114,6 +120,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("mapValues - flatGroupMap") {
assume(transferredClientTestJarIfNeed)
val values = spark
.range(10)
.groupByKey(v => v % 2)
@@ -124,6 +131,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("mapValues - keys") {
assume(transferredClientTestJarIfNeed)
val values = spark
.range(10)
.groupByKey(v => v % 2)
@@ -134,6 +142,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("flatMapSortedGroups") {
assume(transferredClientTestJarIfNeed)
val grouped = spark
.range(10)
.groupByKey(v => v % 2)
@@ -157,6 +166,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("cogroup") {
assume(transferredClientTestJarIfNeed)
val grouped = spark
.range(10)
.groupByKey(v => v % 2)
@@ -173,6 +183,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("cogroupSorted") {
assume(transferredClientTestJarIfNeed)
val grouped = spark
.range(10)
.groupByKey(v => v % 2)
@@ -200,6 +211,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("agg, keyAs") {
assume(transferredClientTestJarIfNeed)
val ds = spark
.range(10)
.groupByKey(v => v % 2)
@@ -210,6 +222,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr") {
assume(transferredClientTestJarIfNeed)
val session: SparkSession = spark
import session.implicits._
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
@@ -222,6 +235,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr, expr") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

checkDatasetUnorderly(
@@ -232,6 +246,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr, expr, expr") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

checkDatasetUnorderly(
@@ -242,6 +257,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr, expr, expr, expr") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

checkDatasetUnorderly(
@@ -257,6 +273,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr, expr, expr, expr, expr") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

checkDatasetUnorderly(
@@ -273,6 +290,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr, expr, expr, expr, expr, expr") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

checkDatasetUnorderly(
@@ -290,6 +308,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr, expr, expr, expr, expr, expr, expr") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

checkDatasetUnorderly(
@@ -308,6 +327,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("typed aggregation: expr, expr, expr, expr, expr, expr, expr, expr") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

checkDatasetUnorderly(
@@ -377,6 +397,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("SPARK-24762: typed agg on Option[Product] type") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(Some((1, 2)), Some((2, 3)), Some((1, 3))).toDS()
assert(ds.groupByKey(_.get._1).count().collect() === Seq((1, 2), (2, 1)))

@@ -386,6 +407,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("SPARK-25942: typed aggregation on primitive type") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(1, 2, 3).toDS()

val agg = ds
@@ -395,6 +417,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("SPARK-25942: typed aggregation on product type") {
assume(transferredClientTestJarIfNeed)
val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long])
checkDatasetUnorderly(agg, ((1, 2), 1L, 3L), ((2, 3), 2L, 4L), ((3, 4), 3L, 5L))
@@ -413,6 +436,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("reduceGroups") {
assume(transferredClientTestJarIfNeed)
val ds = Seq("abc", "xyz", "hello").toDS()
checkDatasetUnorderly(
ds.groupByKey(_.length).reduceGroups(_ + _),
@@ -421,6 +445,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
}

test("groupby") {
assume(transferredClientTestJarIfNeed)
val ds = Seq(("a", 1, 10), ("a", 2, 20), ("b", 2, 1), ("b", 1, 2), ("c", 1, 1))
.toDF("key", "seq", "value")
val grouped = ds.groupBy($"key").as[String, (String, Int, Int)]
UserDefinedFunctionE2ETestSuite.scala

@@ -34,11 +34,13 @@ import org.apache.spark.sql.functions.{col, udf}
*/
class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
test("Dataset typed filter") {
assume(transferredClientTestJarIfNeed)
val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
}

test("Dataset typed filter - java") {
assume(transferredClientTestJarIfNeed)
val rows = spark
.range(10)
.filter(new FilterFunction[JLong] {
@@ -49,11 +51,13 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset typed map") {
assume(transferredClientTestJarIfNeed)
val rows = spark.range(10).map(n => n / 2)(PrimitiveLongEncoder).collectAsList()
assert(rows == Arrays.asList[Long](0, 0, 1, 1, 2, 2, 3, 3, 4, 4))
}

test("filter with condition") {
assume(transferredClientTestJarIfNeed)
// This should go via `def filter(condition: Column)` rather than
// `def filter(func: T => Boolean)`
def func(i: Long): Boolean = i < 5
@@ -63,6 +67,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("filter with col(*)") {
assume(transferredClientTestJarIfNeed)
// This should go via `def filter(condition: Column)` but it is executed as
// `def filter(func: T => Boolean)`. This is fine as the result is the same.
def func(i: Long): Boolean = i < 5
@@ -72,6 +77,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset typed map - java") {
assume(transferredClientTestJarIfNeed)
val rows = spark
.range(10)
.map(
@@ -84,6 +90,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset typed flat map") {
assume(transferredClientTestJarIfNeed)
val session: SparkSession = spark
import session.implicits._
val rows = spark
@@ -95,6 +102,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset typed flat map - java") {
assume(transferredClientTestJarIfNeed)
val rows = spark
.range(5)
.flatMap(
@@ -108,6 +116,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset typed map partition") {
assume(transferredClientTestJarIfNeed)
val session: SparkSession = spark
import session.implicits._
val df = spark.range(0, 100, 1, 50).repartition(4)
@@ -117,6 +126,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset typed map partition - java") {
assume(transferredClientTestJarIfNeed)
val df = spark.range(0, 100, 1, 50).repartition(4)
val result = df
.mapPartitions(
@@ -131,6 +141,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset foreach") {
assume(transferredClientTestJarIfNeed)
val func: JLong => Unit = _ => {
throw new RuntimeException("Hello foreach")
}
@@ -141,6 +152,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset foreach - java") {
assume(transferredClientTestJarIfNeed)
val exception = intercept[Exception] {
spark
.range(2)
@@ -154,6 +166,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset foreachPartition") {
assume(transferredClientTestJarIfNeed)
val sum = new AtomicLong()
val func: Iterator[JLong] => Unit = f => {
f.foreach(v => sum.addAndGet(v))
@@ -170,6 +183,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset foreachPartition - java") {
assume(transferredClientTestJarIfNeed)
val sum = new AtomicLong()
val exception = intercept[Exception] {
spark
@@ -190,6 +204,7 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset foreach: change not visible to client") {
assume(transferredClientTestJarIfNeed)
val sum = new AtomicLong()
val func: Iterator[JLong] => Unit = f => {
f.foreach(v => sum.addAndGet(v))
@@ -199,12 +214,14 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
}

test("Dataset reduce") {
assume(transferredClientTestJarIfNeed)
val session: SparkSession = spark
import session.implicits._
assert(spark.range(10).map(_ + 1).reduce(_ + _) == 55)
}

test("Dataset reduce - java") {
assume(transferredClientTestJarIfNeed)
val session: SparkSession = spark
import session.implicits._
assert(