6 changes: 2 additions & 4 deletions connector/connect/bin/spark-connect-build
@@ -29,7 +29,5 @@
SCALA_BINARY_VER=`grep "scala.binary.version" "${SPARK_HOME}/pom.xml" | head -n1 | awk -F '[<>]' '{print $3}'`
SCALA_VER=`grep "scala.version" "${SPARK_HOME}/pom.xml" | grep ${SCALA_BINARY_VER} | head -n1 | awk -F '[<>]' '{print $3}'`
SCALA_ARG="-Pscala-${SCALA_BINARY_VER}"

-# Build the jars needed for spark submit and spark connect
-build/sbt "${SCALA_ARG}" -Phive -Pconnect package || exit 1
-# Build the jars needed for spark connect JVM client
-build/sbt "${SCALA_ARG}" "sql/package;connect-client-jvm/assembly" || exit 1
+# Build the jars needed for spark submit and spark connect JVM client
+build/sbt "${SCALA_ARG}" -Phive -Pconnect package "connect-client-jvm/package" || exit 1
2 changes: 1 addition & 1 deletion connector/connect/bin/spark-connect-scala-client
@@ -45,7 +45,7 @@
SCALA_ARG="-Pscala-${SCALA_BINARY_VER}"
SCBUILD="${SCBUILD:-1}"
if [ "$SCBUILD" -eq "1" ]; then
  # Build the jars needed for spark connect JVM client
-  build/sbt "${SCALA_ARG}" "sql/package;connect-client-jvm/assembly" || exit 1
+  build/sbt "${SCALA_ARG}" "connect-client-jvm/package" || exit 1
fi

if [ -z "$SCCLASSPATH" ]; then
3 changes: 1 addition & 2 deletions connector/connect/bin/spark-connect-scala-client-classpath
@@ -30,6 +30,5 @@
SCALA_VER=`grep "scala.version" "${SPARK_HOME}/pom.xml" | grep ${SCALA_BINARY_VER} | head -n1 | awk -F '[<>]' '{print $3}'`
SCALA_ARG="-Pscala-${SCALA_BINARY_VER}"

CONNECT_CLASSPATH="$(build/sbt "${SCALA_ARG}" -DcopyDependencies=false "export connect-client-jvm/fullClasspath" | grep jar | tail -n1)"
-SQL_CLASSPATH="$(build/sbt "${SCALA_ARG}" -DcopyDependencies=false "export sql/fullClasspath" | grep jar | tail -n1)"

-echo "$CONNECT_CLASSPATH:$CLASSPATH"
+echo "$CONNECT_CLASSPATH"
5 changes: 0 additions & 5 deletions connector/connect/client/jvm/pom.xml
@@ -113,11 +113,6 @@
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
    <!-- Use mima to perform the compatibility check -->
    <dependency>
      <groupId>com.typesafe</groupId>
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect.client

/**
* Class used to test stubbing. This needs to be in the main source tree, because this is not
* synced with the connect server during tests.
*/
case class ToStub(value: Long)
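
For intuition, here is a minimal sketch of what "stubbing" means in this context. It is illustrative only and does not reflect Spark's actual stub class loader: the idea is that when the server deserializes a client artifact referencing a class it never received, such as ToStub, a fallback class loader substitutes a generated placeholder instead of letting the lookup fail.

// Illustrative sketch only, not Spark's real implementation: a class loader
// that falls back to a caller-supplied stub factory for unknown classes.
class StubbingClassLoader(parent: ClassLoader, makeStub: String => Class[_])
    extends ClassLoader(parent) {
  // loadClass delegates to the parent first, so findClass is only reached for
  // classes the server does not have (e.g. ToStub); stub instead of throwing.
  override protected def findClass(name: String): Class[_] = makeStub(name)
}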
@@ -27,9 +27,8 @@
import static org.apache.spark.sql.Encoders.*;
import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.RowFactory.create;
-import org.apache.spark.sql.connect.client.SparkConnectClient;
-import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils;
import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.test.SparkConnectServerUtils;
import org.apache.spark.sql.types.StructType;

/**
@@ -40,14 +39,7 @@
public class JavaEncoderSuite implements Serializable {

  @BeforeClass
  public static void setup() {
-    SparkConnectServerUtils.start();
-    spark = SparkSession
-        .builder()
-        .client(SparkConnectClient
-            .builder()
-            .port(SparkConnectServerUtils.port())
-            .build())
-        .create();
+    spark = SparkConnectServerUtils.createSparkSession();
  }

@AfterClass
@@ -22,7 +22,7 @@
import java.io.{File, FilenameFilter}
import org.apache.commons.io.FileUtils

import org.apache.spark.SparkException
-import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.test.{RemoteSparkSession, SQLHelper}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.storage.StorageLevel

@@ -22,7 +22,7 @@
import java.util.Random
import org.scalatest.matchers.must.Matchers._

import org.apache.spark.{SparkException, SparkIllegalArgumentException}
-import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.test.RemoteSparkSession

class ClientDataFrameStatSuite extends RemoteSparkSession {
private def toLetter(i: Int): String = (i + 97).toChar.toString
@@ -26,8 +26,8 @@
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.client.{DummySparkConnectService, SparkConnectClient}
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.ConnectFunSuite

// Add sample tests.
// - sample fraction: simple.sample(0.1)
@@ -36,10 +36,10 @@
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
-import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
-import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils.port
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SqlApiConf
+import org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession, SQLHelper}
+import org.apache.spark.sql.test.SparkConnectServerUtils.port
import org.apache.spark.sql.types._

class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester {
@@ -21,7 +21,7 @@
import java.io.ByteArrayOutputStream
import scala.collection.JavaConverters._

import org.apache.spark.sql.{functions => fn}
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.sql.types._

/**
@@ -19,8 +19,8 @@
package org.apache.spark.sql

import scala.collection.JavaConverters._

-import org.apache.spark.sql.connect.client.util.QueryTest
import org.apache.spark.sql.internal.SqlApiConf
+import org.apache.spark.sql.test.{QueryTest, SQLHelper}
import org.apache.spark.sql.types.{StringType, StructType}

class DataFrameNaFunctionSuite extends QueryTest with SQLHelper {
@@ -21,9 +21,9 @@
import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.sql.avro.{functions => avroFn}
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.functions._
import org.apache.spark.sql.protobuf.{functions => pbFn}
+import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.sql.types.{DataType, StructType}

/**
@@ -20,9 +20,9 @@
import java.sql.Timestamp
import java.util.Arrays

import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
-import org.apache.spark.sql.connect.client.util.QueryTest
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
+import org.apache.spark.sql.test.{QueryTest, SQLHelper}
import org.apache.spark.sql.types._

case class ClickEvent(id: String, timestamp: Timestamp)
@@ -37,11 +37,10 @@
import org.apache.spark.sql.avro.{functions => avroFn}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.connect.client.SparkConnectClient
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
-import org.apache.spark.sql.connect.client.util.IntegrationTestUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.protobuf.{functions => pbFn}
+import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.SparkFileUtils
@@ -28,7 +28,7 @@
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.connect.client.SparkConnectClient
import org.apache.spark.sql.connect.client.arrow.{ArrowDeserializers, ArrowSerializer}
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.test.ConnectFunSuite

/**
* Test suite for SQL implicits.
@@ -26,7 +26,7 @@
import scala.util.{Failure, Success}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.SparkException
-import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.test.RemoteSparkSession
import org.apache.spark.util.SparkThreadUtils.awaitResult

/**
@@ -22,7 +22,7 @@
import scala.util.control.NonFatal

import io.grpc.{CallOptions, Channel, ClientCall, ClientInterceptor, MethodDescriptor}

-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.test.ConnectFunSuite

/**
* Tests for non-dataframe related SparkSession operations.
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.connect.client.ToStub
import org.apache.spark.sql.test.RemoteSparkSession

class StubbingTestSuite extends RemoteSparkSession {
  private def eval[T](f: => T): T = f

  test("capture of to-be stubbed class") {
[Review comment, Contributor Author] This was a working reproduction for the issue. Now that SparkResult has moved to common, I guess we need to find a new problem...

[Review comment, Contributor @LuciferYang, Aug 22, 2023] Yeah, this test case can't reproduce the problem. The new test requires a class like SparkResult.

[Review comment, Contributor] I manually reverted the changes from SPARK-44806 on top of this PR and verified that the test cases in SparkSessionE2ESuite pass under Maven. However, it would be best if we could find a new target case.

[Review comment, Contributor Author] I revamped the test.

[Review comment, Contributor Author] I can confirm it is working with Maven, and that it triggers stubbing on the server side.
    val session = spark
    import session.implicits._
    val result = spark
      .range(0, 10, 1, 1)
      .map(n => n + 1)
      .as[ToStub]
      .head()
    eval {
      assert(result.value == 1)
    }
  }
}
@@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.connect.client
+package org.apache.spark.sql

import java.io.File
import java.nio.file.{Files, Paths}

import scala.util.Properties

-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.connect.common.ProtoDataTypes
import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
+import org.apache.spark.sql.test.RemoteSparkSession

class UDFClassLoadingE2ESuite extends RemoteSparkSession {

@@ -26,8 +26,8 @@
import scala.collection.JavaConverters._
import org.apache.spark.api.java.function._
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
-import org.apache.spark.sql.connect.client.util.QueryTest
import org.apache.spark.sql.functions.{col, struct, udf}
+import org.apache.spark.sql.test.QueryTest
import org.apache.spark.sql.types.IntegerType

/**
@@ -215,33 +215,31 @@ class UserDefinedFunctionE2ETestSuite extends QueryTest {
  }

  test("Dataset foreachPartition") {
-    val sum = new AtomicLong()
    val func: Iterator[JLong] => Unit = f => {
+      val sum = new AtomicLong()
      f.foreach(v => sum.addAndGet(v))
      // The value should be 45
-      assert(sum.get() == -1)
+      throw new Exception("Success, processed records: " + sum.get())
[Review comment, Contributor Author] This is a fun one. The test used to throw a scalatest-specific exception. The problem with that is that the error is decoded by the ResultHandler in Spark Core, which does not have the scalatest classes available, so things fail with a ClassNotFoundException instead of the expected exception.
    }
    val exception = intercept[Exception] {
      spark.range(10).repartition(1).foreachPartition(func)
    }
-    assert(exception.getMessage.contains("45 did not equal -1"))
+    assert(exception.getMessage.contains("Success, processed records: 45"))
  }

test("Dataset foreachPartition - java") {
val sum = new AtomicLong()
val exception = intercept[Exception] {
spark
.range(10)
.range(11)
.repartition(1)
.foreachPartition(new ForeachPartitionFunction[JLong] {
override def call(t: JIterator[JLong]): Unit = {
t.asScala.foreach(v => sum.addAndGet(v))
// The value should be 45
assert(sum.get() == -1)
throw new Exception("Success, processed records: " + sum.get())
}
})
}
assert(exception.getMessage.contains("45 did not equal -1"))
assert(exception.getMessage.contains("Success, processed records: 55"))
}

test("Dataset foreach: change not visible to client") {
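
The review note above is worth a small standalone sketch. Because the client reconstructs server-side failures by class name, only exception types present on both JVMs survive the round trip; a plain java.lang.Exception always does, which is why these tests now smuggle the observed count through the exception message. The following is a hedged sketch of that pattern, assuming a scalatest suite with a Spark Connect session in `spark`; the names and values are illustrative, not from the diff:

// Sketch of the pattern: report remote state via java.lang.Exception, which
// both the server and the client can always load, then assert client-side.
val e = intercept[Exception] {
  spark.range(5).repartition(1).foreachPartition { it: Iterator[java.lang.Long] =>
    val total = it.foldLeft(0L)(_ + _) // 0 + 1 + 2 + 3 + 4 = 10
    throw new Exception(s"partition total: $total")
  }
}
assert(e.getMessage.contains("partition total: 10"))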
@@ -20,9 +20,9 @@
import scala.reflect.runtime.universe.typeTag

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.connect.common.UdfPacket
import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.util.SparkSerDeUtils

class UserDefinedFunctionSuite extends ConnectFunSuite {
@@ -25,7 +25,7 @@
import scala.util.Properties
import org.apache.commons.io.output.ByteArrayOutputStream
import org.scalatest.BeforeAndAfterEach

-import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
+import org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession}

class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {

@@ -30,7 +30,7 @@
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.connect.proto.AddArtifactsRequest
import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.test.ConnectFunSuite

class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {

@@ -24,7 +24,7 @@
import java.util.regex.Pattern
import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.lib.MiMaLib

-import org.apache.spark.sql.connect.client.util.IntegrationTestUtils._
+import org.apache.spark.sql.test.IntegrationTestUtils._

/**
* A tool for checking the binary compatibility of the connect client API against the spark SQL
@@ -20,7 +20,7 @@
import java.nio.file.Paths

import org.apache.commons.io.FileUtils

-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.util.SparkFileUtils

class ClassFinderSuite extends ConnectFunSuite {
@@ -18,7 +18,7 @@
package org.apache.spark.sql.connect.client

import java.util.UUID

-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.test.ConnectFunSuite

/**
* Test suite for [[SparkConnectClient.Builder]] parsing and configuration.
@@ -31,8 +31,8 @@
import org.apache.spark.SparkException
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ArtifactStatusesRequest, ArtifactStatusesResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.connect.common.config.ConnectCommon
+import org.apache.spark.sql.test.ConnectFunSuite

class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {

@@ -43,7 +43,7 @@
import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._
import org.apache.spark.sql.catalyst.util.SparkIntervalUtils._
import org.apache.spark.sql.connect.client.CloseableIterator
import org.apache.spark.sql.connect.client.arrow.FooEnum.FooEnum
-import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.test.ConnectFunSuite
import org.apache.spark.sql.types.{ArrayType, DataType, DayTimeIntervalType, Decimal, DecimalType, IntegerType, Metadata, SQLUserDefinedType, StructType, UserDefinedType, YearMonthIntervalType}

/**