From baa1e734d8728f783507887d1a8896d2f94c9621 Mon Sep 17 00:00:00 2001
From: Rui Wang
Date: Tue, 25 Oct 2022 14:37:29 -0700
Subject: [PATCH 1/5] [SPARK-40914][CONNECT] Mark private API to be private[connect].

---
 .../scala/org/apache/spark/sql/connect/dsl/package.scala | 9 ++++++---
 .../sql/connect/planner/DataTypeProtoConverter.scala     | 2 +-
 .../service/SparkConnectInterceptorRegistry.scala        | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 6ae6dfa1577c6..5e210655c9227 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -26,11 +26,14 @@ import org.apache.spark.sql.connect.planner.DataTypeProtoConverter
 
 /**
  * A collection of implicit conversions that create a DSL for constructing connect protos.
+ *
+ * All classes in connect/dsl are considered an internal API to Spark Connect and are subject to
+ * change between minor releases.
  */
 
 package object dsl {
 
-  object expressions { // scalastyle:ignore
+  private[connect] object expressions { // scalastyle:ignore
     implicit class DslString(val s: String) {
       def protoAttr: proto.Expression =
         proto.Expression
@@ -133,7 +136,7 @@
           .build()
   }
 
-  object commands { // scalastyle:ignore
+  private[connect] object commands { // scalastyle:ignore
     implicit class DslCommands(val logicalPlan: proto.Relation) {
       def write(
           format: Option[String] = None,
@@ -172,7 +175,7 @@
     }
   }
 
-  object plans { // scalastyle:ignore
+  private[connect] object plans { // scalastyle:ignore
     implicit class DslLogicalPlan(val logicalPlan: proto.Relation) {
       def select(exprs: proto.Expression*): proto.Relation = {
         proto.Relation

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
index 0ee90b5e8fbbb..519c944b89678 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType,
 /**
  * This object offers methods to convert to/from connect proto to catalyst types.
  */
-object DataTypeProtoConverter {
+private[connect] object DataTypeProtoConverter {
   def toCatalystType(t: proto.DataType): DataType = {
     t.getKindCase match {
       case proto.DataType.KindCase.I32 => IntegerType

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
index cddd4b976638d..b0ce9fdd445f7 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 * added to the GRPC server in order of their position in the list. Once the statically compiled
 * interceptors are added, dynamically configured interceptors are added.
 */
-object SparkConnectInterceptorRegistry {
+private[connect] object SparkConnectInterceptorRegistry {
 
   // Contains the list of configured interceptors.
   private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
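Note on the modifier this series applies: `private[connect]` is Scala's package-qualified access. A minimal, self-contained sketch of its behavior (the object names below are hypothetical, not part of the patch):

    // Sketch: package-qualified visibility, as used by patch 1.
    package org.apache.spark.sql.connect.planner {
      // Public to everything under org.apache.spark.sql.connect, hidden elsewhere.
      private[connect] object ProtoConverterSketch {
        def describe(): String = "reachable from anywhere inside sql/connect"
      }
    }

    package org.apache.spark.sql.connect.service {
      object ServiceSketch {
        // Compiles: the service package sits inside the connect package.
        def use(): String =
          org.apache.spark.sql.connect.planner.ProtoConverterSketch.describe()
      }
    }

    package org.example {
      object OutsideSketch {
        // Would not compile if uncommented: this call site is outside connect.
        // val blocked =
        //   org.apache.spark.sql.connect.planner.ProtoConverterSketch.describe()
      }
    }

Within the connect subtree the marked objects stay effectively public, so the planner, service, and tests keep compiling, while code outside the package can no longer link against them.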
 */
-object SparkConnectInterceptorRegistry {
+private[connect] object SparkConnectInterceptorRegistry {
 
   // Contains the list of configured interceptors.
   private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(

From a196b719ec9c1fe1e537132466809c0a7b9a96b6 Mon Sep 17 00:00:00 2001
From: Rui Wang
Date: Tue, 25 Oct 2022 14:46:01 -0700
Subject: [PATCH 2/5] update

---
 .../org/apache/spark/sql/connect/SparkConnectPlugin.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
index 4ecbfd123f0d2..8ebadb5220416 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.connect
 
 import java.util
-
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Unstable
+import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.sql.connect.service.SparkConnectService
 
@@ -34,6 +33,7 @@ import org.apache.spark.sql.connect.service.SparkConnectService
 * JAR is available in the CLASSPATH and the driver plugin is configured to load this class.
 */
 @Unstable
+@Since("3.4.0")
 class SparkConnectPlugin extends SparkPlugin {
 
   /**

From dae258c06ddb051bec1af235533f10720be7f26d Mon Sep 17 00:00:00 2001
From: Rui Wang
Date: Tue, 25 Oct 2022 14:49:45 -0700
Subject: [PATCH 3/5] update

---
 .../scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
index 8ebadb5220416..766a692527e3f 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connect
 
 import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkContext
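Read together, patches 2 and 3 are one change: patch 2 adds `@Since("3.4.0")` and its import to SparkConnectPlugin but in passing drops the blank line separating the `java.util` import from the `scala.*` group, and patch 3 restores it. Spark's scalastyle checks import ordering in blank-line-separated groups, roughly as sketched below (a reading of the convention, not the checker's exact configuration):

    import java.util                          // group 1: java.*

    import scala.collection.JavaConverters._  // group 2: scala.*

    import io.grpc.stub.StreamObserver        // group 3: third-party libraries

    import org.apache.spark.SparkContext      // group 4: org.apache.spark.*
    import org.apache.spark.annotation.{Since, Unstable}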
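Patch 4 backs out the `private[connect]` markers on the three DSL objects; the scaladoc added in patch 1 already declares everything under connect/dsl internal, and the DSL is consumed by importing its implicit conversions wholesale. A sketch of how a call site reads, using only members visible in the hunks above (`selectId` and `input` are placeholders):

    import org.apache.spark.connect.proto
    import org.apache.spark.sql.connect.dsl.expressions._
    import org.apache.spark.sql.connect.dsl.plans._

    // "id".protoAttr lifts a column name into a proto.Expression (via DslString),
    // and select(...) builds a new proto.Relation over the input (via DslLogicalPlan).
    def selectId(input: proto.Relation): proto.Relation =
      input.select("id".protoAttr)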
From 25665d64902d8f0e1e14cc26d8cc5a5633d00675 Mon Sep 17 00:00:00 2001
From: Rui Wang
Date: Wed, 26 Oct 2022 13:11:38 -0700
Subject: [PATCH 5/5] update

---
 .../apache/spark/sql/connect/SparkConnectPlugin.scala |  3 ---
 .../connect/command/SparkConnectCommandPlanner.scala  |  3 ---
 .../sql/connect/planner/DataTypeProtoConverter.scala  |  2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala     |  3 ---
 .../service/SparkConnectInterceptorRegistry.scala     |  2 +-
 .../sql/connect/service/SparkConnectService.scala     | 11 ++---------
 .../connect/service/SparkConnectStreamHandler.scala   |  5 +----
 project/SparkBuild.scala                              |  1 +
 8 files changed, 6 insertions(+), 24 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
index 766a692527e3f..bb694a7679890 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala
@@ -22,7 +22,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkContext
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.sql.connect.service.SparkConnectService
 
@@ -33,8 +32,6 @@ import org.apache.spark.sql.connect.service.SparkConnectService
 * implement it as a Driver Plugin. To enable Spark Connect, simply make sure that the appropriate
 * JAR is available in the CLASSPATH and the driver plugin is configured to load this class.
 */
-@Unstable
-@Since("3.4.0")
 class SparkConnectPlugin extends SparkPlugin {
 
   /**

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
index 47d421a0359bf..80c36a4773e6e 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import com.google.common.collect.{Lists, Maps}
 
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.WriteOperation
@@ -35,8 +34,6 @@ final case class InvalidCommandInput(
     private val cause: Throwable = null)
     extends Exception(message, cause)
 
-@Unstable
-@Since("3.4.0")
 class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) {
 
   lazy val pythonExec =

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
index 519c944b89678..0ee90b5e8fbbb 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType,
 /**
 * This object offers methods to convert to/from connect proto to catalyst types.
 */
-private[connect] object DataTypeProtoConverter {
+object DataTypeProtoConverter {
   def toCatalystType(t: proto.DataType): DataType = {
     t.getKindCase match {
       case proto.DataType.KindCase.I32 => IntegerType
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 9e3899f4a1a03..53abf2e770909 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.connect.planner
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
@@ -38,8 +37,6 @@ final case class InvalidPlanInput(
     private val cause: Throwable = None.orNull)
     extends Exception(message, cause)
 
-@Unstable
-@Since("3.4.0")
 class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
 
   def transform(): LogicalPlan = {

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
index b0ce9fdd445f7..cddd4b976638d 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 * added to the GRPC server in order of their position in the list. Once the statically compiled
 * interceptors are added, dynamically configured interceptors are added.
 */
-private[connect] object SparkConnectInterceptorRegistry {
+object SparkConnectInterceptorRegistry {
 
   // Contains the list of configured interceptors.
   private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 5841017e5bb71..a1e70975da55e 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -27,7 +27,6 @@ import io.grpc.protobuf.services.ProtoReflectionService
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AnalyzeResponse, Request, Response, SparkConnectServiceGrpc}
 import org.apache.spark.internal.Logging
@@ -44,8 +43,6 @@ import org.apache.spark.sql.execution.ExtendedMode
 * @param debug
 *   delegates debug behavior to the handlers.
 */
-@Unstable
-@Since("3.4.0")
 class SparkConnectService(debug: Boolean)
     extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
     with Logging {
@@ -127,9 +124,7 @@ class SparkConnectService(debug: Boolean)
 * @param userId
 * @param session
 */
-@Unstable
-@Since("3.4.0")
-private[connect] case class SessionHolder(userId: String, session: SparkSession)
+case class SessionHolder(userId: String, session: SparkSession)
 
 /**
 * Static instance of the SparkConnectService.
@@ -137,8 +132,6 @@ private[connect] case class SessionHolder(userId: String, session: SparkSession)
 * Used to start the overall SparkConnect service and provides global state to manage the
 * different SparkSession from different users connecting to the cluster.
 */
-@Unstable
-@Since("3.4.0")
 object SparkConnectService {
 
   private val CACHE_SIZE = 100
@@ -169,7 +162,7 @@ object SparkConnectService {
   /**
   * Based on the `key` find or create a new SparkSession.
   */
-  private[connect] def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
+  def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
     userSessionMapping.get(
       key,
       () => {

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 997b0f6b6d826..a429823c02f8e 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -23,7 +23,6 @@ import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.SparkException
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{Request, Response}
 import org.apache.spark.internal.Logging
@@ -34,8 +33,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
 import org.apache.spark.sql.internal.SQLConf
 
-@Unstable
-@Since("3.4.0")
 class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging {
 
   // The maximum batch size in bytes for a single batch of data to be returned via proto.
@@ -60,7 +57,7 @@
     processRows(request.getClientId, rows)
   }
 
-  private[connect] def processRows(clientId: String, rows: DataFrame): Unit = {
+  def processRows(clientId: String, rows: DataFrame): Unit = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
 
     // Only process up to 10MB of data.

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 33883a2efaa51..20c537e0e672f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1177,6 +1177,7 @@
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/collection")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/kvstore")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/connect")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive")))
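Patch 5 settles the approach: rather than annotate each entry point with `@Unstable`/`@Since`, it removes the annotations, relaxes most of the `private[connect]` markers again, and instead hides the whole `org/apache/spark/sql/connect` tree from Unidoc, the sbt plugin that aggregates Spark's published API docs. A standalone sketch of what that `filterNot` chain in SparkBuild.scala does, assuming unidoc sees per-project lists of source files (the paths here are illustrative):

    import java.io.File

    // Two example source files: one under sql/connect, one public API file.
    val allSources: Seq[Seq[File]] = Seq(
      Seq(
        new File("connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala"),
        new File("sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala")))

    // Mirrors the added line: drop every file whose path contains the package.
    val publicSources = allSources
      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/connect")))
    // Only Dataset.scala survives, so nothing under sql/connect appears in the
    // generated docs.

With the sources excluded from the public docs, the module is internal by construction, which is why the earlier per-object stability annotations could be dropped.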