@@ -22,7 +22,6 @@ import java.util
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Unstable
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.sql.connect.service.SparkConnectService

@@ -33,7 +32,6 @@ import org.apache.spark.sql.connect.service.SparkConnectService
  * implement it as a Driver Plugin. To enable Spark Connect, simply make sure that the appropriate
  * JAR is available in the CLASSPATH and the driver plugin is configured to load this class.
  */
-@Unstable
 class SparkConnectPlugin extends SparkPlugin {
 
   /**
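For context on how a driver plugin like this is usually enabled: Spark instantiates every class listed in the spark.plugins configuration that implements org.apache.spark.api.plugin.SparkPlugin when the driver starts. A minimal sketch follows; the fully qualified class name below is an assumption and should be checked against the actual package of SparkConnectPlugin in the source tree.

import org.apache.spark.sql.SparkSession

// Sketch: register the Spark Connect driver plugin via spark.plugins.
// The class name is assumed here, not confirmed by this diff.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .getOrCreate()

The spark-submit equivalent would pass --conf spark.plugins=... on the command line, provided the Spark Connect JAR is on the driver classpath as the scaladoc above requires.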
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import com.google.common.collect.{Lists, Maps}
 
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.WriteOperation
@@ -35,8 +34,6 @@ final case class InvalidCommandInput(
     private val cause: Throwable = null)
     extends Exception(message, cause)
 
-@Unstable
-@Since("3.4.0")
 class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) {
 
   lazy val pythonExec =
@@ -26,6 +26,9 @@ import org.apache.spark.sql.connect.planner.DataTypeProtoConverter
 
 /**
  * A collection of implicit conversions that create a DSL for constructing connect protos.
+ *
+ * All classes in connect/dsl are considered an internal API to Spark Connect and are subject to
+ * change between minor releases.
  */
 
 package object dsl {

Review comment from the PR author on the added lines: I still keep this to match Catalyst's Dsl.
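As a rough illustration of the implicit-conversion DSL pattern that the added doc comment describes: the sketch below is hypothetical, with invented names, and does not reproduce the actual connect/dsl API, which builds proto.Relation messages rather than this toy Relation case class.

// Hypothetical sketch of an implicit-conversion DSL in Scala.
object DslSketch {
  final case class Relation(table: String, filters: Seq[String] = Nil)

  implicit class DslString(val tableName: String) extends AnyVal {
    // "employees".read lifts a plain table name into a Relation
    def read: Relation = Relation(tableName)
  }

  implicit class DslRelation(val rel: Relation) extends AnyVal {
    // chain further relational operators onto an existing Relation
    def where(condition: String): Relation = rel.copy(filters = rel.filters :+ condition)
  }
}

// Usage: import DslSketch._; val plan = "employees".read.where("age > 21")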
@@ -19,7 +19,6 @@ package org.apache.spark.sql.connect.planner
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
@@ -38,8 +37,6 @@ final case class InvalidPlanInput(
     private val cause: Throwable = None.orNull)
     extends Exception(message, cause)
 
-@Unstable
-@Since("3.4.0")
 class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
 
   def transform(): LogicalPlan = {
@@ -27,7 +27,6 @@ import io.grpc.protobuf.services.ProtoReflectionService
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AnalyzeResponse, Request, Response, SparkConnectServiceGrpc}
 import org.apache.spark.internal.Logging
@@ -44,8 +43,6 @@ import org.apache.spark.sql.execution.ExtendedMode
  * @param debug
  *   delegates debug behavior to the handlers.
  */
-@Unstable
-@Since("3.4.0")
 class SparkConnectService(debug: Boolean)
     extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
     with Logging {
@@ -127,18 +124,14 @@ class SparkConnectService(debug: Boolean)
  * @param userId
  * @param session
  */
-@Unstable
-@Since("3.4.0")
-private[connect] case class SessionHolder(userId: String, session: SparkSession)
+case class SessionHolder(userId: String, session: SparkSession)
 
 /**
  * Static instance of the SparkConnectService.
  *
  * Used to start the overall SparkConnect service and provides global state to manage the
  * different SparkSession from different users connecting to the cluster.
  */
-@Unstable
-@Since("3.4.0")
 object SparkConnectService {
 
   private val CACHE_SIZE = 100
@@ -169,7 +162,7 @@ object SparkConnectService {
   /**
    * Based on the `key` find or create a new SparkSession.
    */
-  private[connect] def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
+  def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
     userSessionMapping.get(
       key,
       () => {
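The scaladoc above describes a per-user session registry held in driver-global state and served through a bounded cache (CACHE_SIZE = 100, consumed via userSessionMapping.get(key, loader)). A self-contained sketch of that pattern follows; the key type and SessionHolder contents are assumptions for illustration, with a String standing in for the real SparkSession.

import java.util.concurrent.Callable
import com.google.common.cache.{Cache, CacheBuilder}

object SessionCacheSketch {
  // Assumed key shape; the real SessionCacheKey may differ.
  type SessionCacheKey = String // e.g. the userId

  final case class SessionHolder(userId: String, session: String)

  private val CACHE_SIZE = 100L
  private val userSessionMapping: Cache[SessionCacheKey, SessionHolder] =
    CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build[SessionCacheKey, SessionHolder]()

  // Find or create the isolated session for this key, mirroring the
  // getOrCreateIsolatedSession shown in the hunk above.
  def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder =
    userSessionMapping.get(key, new Callable[SessionHolder] {
      override def call(): SessionHolder = SessionHolder(key, s"isolated session for $key")
    })
}

Bounding the cache keeps this long-lived driver state from growing without limit as different users connect to the cluster.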
@@ -23,7 +23,6 @@ import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.SparkException
-import org.apache.spark.annotation.{Since, Unstable}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{Request, Response}
 import org.apache.spark.internal.Logging
@@ -34,8 +33,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
 import org.apache.spark.sql.internal.SQLConf
 
-@Unstable
-@Since("3.4.0")
 class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging {
 
   // The maximum batch size in bytes for a single batch of data to be returned via proto.
@@ -60,7 +57,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging {
     processRows(request.getClientId, rows)
   }
 
-  private[connect] def processRows(clientId: String, rows: DataFrame): Unit = {
+  def processRows(clientId: String, rows: DataFrame): Unit = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
 
     // Only process up to 10MB of data.
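The comments in this hunk ("maximum batch size in bytes", "Only process up to 10MB of data") describe size-bounded batching of result rows before they are streamed back to the client. The sketch below is a generic illustration of that idea only; it does not reproduce the handler's actual Arrow/proto encoding, and the row representation and cap are assumptions.

// Illustrative only: split already-encoded rows into batches that stay at or
// under maxBatchBytes. Each batch keeps at least one row, so a single
// oversized row still goes through on its own.
def batchBySize(rows: Iterator[Array[Byte]], maxBatchBytes: Long): Iterator[Seq[Array[Byte]]] =
  new Iterator[Seq[Array[Byte]]] {
    private val buffered = rows.buffered
    override def hasNext: Boolean = buffered.hasNext
    override def next(): Seq[Array[Byte]] = {
      val batch = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]
      var bytes = 0L
      // Always take at least one row, then stop before the cap would be exceeded.
      while (buffered.hasNext && (batch.isEmpty || bytes + buffered.head.length <= maxBatchBytes)) {
        val row = buffered.next()
        batch += row
        bytes += row.length
      }
      batch.toSeq
    }
  }

Each resulting batch could then be wrapped in a Response message and pushed through responseObserver.onNext, with onCompleted called once the stream is exhausted.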
project/SparkBuild.scala (1 change: 1 addition & 0 deletions)
@@ -1177,6 +1177,7 @@ object Unidoc {
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/collection")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/kvstore")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/connect")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive")))