diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 33a209de1ee..31b66644f7a 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -124,7 +124,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { /** Get all batch conf as map */ def getBatchConf(batchType: String): Map[String, String] = { - getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "") + batchType.toLowerCase(Locale.ROOT) match { + case "spark" | "pyspark" => getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.spark", "") + case _ => + getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "") + } } /** diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala index 18618e37e34..4db2a0a4a86 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/create/CreateBatchCommand.scala @@ -55,6 +55,7 @@ class CreateBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig) request.get("batchType").asInstanceOf[String], request.get("resource").asInstanceOf[String], request.get("className").asInstanceOf[String], + request.get("pythonFiles").asInstanceOf[String], request.get("name").asInstanceOf[String], config, args) diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java index f10a8fdb5f2..d9f574c677f 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/BatchRequest.java @@ -28,6 +28,7 @@ public class BatchRequest { private String batchType; private String resource; private String className; + private String pythonFiles; private String name; private Map conf; private List args; @@ -38,12 +39,14 @@ public BatchRequest( String batchType, String resource, String className, + String pythonFiles, String name, Map conf, List args) { this.batchType = batchType; this.resource = resource; this.className = className; + this.pythonFiles = pythonFiles; this.name = name; this.conf = conf; this.args = args; @@ -53,6 +56,7 @@ public BatchRequest(String batchType, String resource, String className, String this.batchType = batchType; this.resource = resource; this.className = className; + this.pythonFiles = null; this.name = name; this.conf = Collections.emptyMap(); this.args = Collections.emptyList(); @@ -82,6 +86,14 @@ public void setClassName(String className) { this.className = className; } + public String getPythonFiles() { + return pythonFiles; + } + + public void setPythonFiles(String pythonFiles) { + this.pythonFiles = pythonFiles; + } + public String getName() { return name; } @@ -120,6 +132,7 @@ public boolean equals(Object o) { return Objects.equals(getBatchType(), that.getBatchType()) && Objects.equals(getResource(), that.getResource()) && Objects.equals(getClassName(), that.getClassName()) + && Objects.equals(getPythonFiles(), that.getPythonFiles()) && Objects.equals(getName(), that.getName()) && Objects.equals(getConf(), that.getConf()) && Objects.equals(getArgs(), that.getArgs()); @@ -128,7 +141,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( - getBatchType(), getResource(), getClassName(), getName(), getConf(), getArgs()); + getBatchType(), getResource(), getClassName(), getPythonFiles(), getName(), getConf(), getArgs()); } @Override diff --git a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java index bc93b55dff5..b3a57811bb9 100644 --- a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java +++ b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtils.java @@ -69,6 +69,7 @@ public static BatchRequest generateTestBatchRequest() { "spark", "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar", "org.apache.kyuubi.engine.spark.SparkSQLEngine", + null, "test_batch", Collections.singletonMap("spark.driver.memory", "16m"), Collections.emptyList()); diff --git a/kyuubi-server/src/main/resources/sql/derby/metadata-store-schema-derby.sql b/kyuubi-server/src/main/resources/sql/derby/metadata-store-schema-derby.sql index ecf41d6ab44..faf48244f3e 100644 --- a/kyuubi-server/src/main/resources/sql/derby/metadata-store-schema-derby.sql +++ b/kyuubi-server/src/main/resources/sql/derby/metadata-store-schema-derby.sql @@ -11,6 +11,7 @@ CREATE TABLE metadata( state varchar(128) NOT NULL, -- the session state resource varchar(1024), -- the main resource class_name varchar(1024), -- the main class name + python_files varchar(1024), -- the python files request_name varchar(1024), -- the request name request_conf clob, -- the request config map request_args clob, -- the request arguments diff --git a/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-mysql.sql b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-mysql.sql index 4efddaa92b9..96ea188e5d2 100644 --- a/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-mysql.sql +++ b/kyuubi-server/src/main/resources/sql/mysql/metadata-store-schema-mysql.sql @@ -11,6 +11,7 @@ CREATE TABLE metadata( state varchar(128) NOT NULL COMMENT 'the session state', resource varchar(1024) COMMENT 'the main resource', class_name varchar(1024) COMMENT 'the main class name', + python_files varchar(1024) COMMENT 'the python files', request_name varchar(1024) COMMENT 'the request name', request_conf mediumtext COMMENT 'the request config map', request_args mediumtext COMMENT 'the request arguments', diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index b772afc0479..ce6af1ac1e7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -178,7 +178,8 @@ object KyuubiApplicationManager { appConf: Map[String, String], kyuubiConf: KyuubiConf): Unit = { applicationType.toUpperCase(Locale.ROOT) match { - case appType if appType.startsWith("SPARK") => checkSparkAccessPaths(appConf, kyuubiConf) + case appType if appType.equals("PYSPARK") || appType.startsWith("SPARK") => + checkSparkAccessPaths(appConf, kyuubiConf) case appType if appType.startsWith("FLINK") => // TODO: check flink app access local paths case _ => } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index e7de6baa449..1b284c71073 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -29,21 +29,29 @@ class SparkBatchProcessBuilder( batchId: String, batchName: String, override val mainResource: Option[String], - override val mainClass: String, + className: Option[String], + pythonFiles: Option[String], batchConf: Map[String, String], batchArgs: Seq[String], override val extraEngineLog: Option[OperationLog]) extends SparkProcessBuilder(proxyUser, conf, batchId, extraEngineLog) { import SparkProcessBuilder._ + override def mainClass: String = className.orNull + override protected val commands: Array[String] = { val buffer = new ArrayBuffer[String]() buffer += executable - Option(mainClass).foreach { cla => + className.foreach { cla => buffer += CLASS buffer += cla } + pythonFiles.foreach { files => + buffer += PYTHON_FILES + buffer +=files + } + val batchKyuubiConf = new KyuubiConf(false) batchConf.foreach(entry => { batchKyuubiConf.set(entry._1, entry._2) }) // tag batch application diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index cd12dbd36c4..2cc7c578e4c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -216,6 +216,7 @@ object SparkProcessBuilder { final private[spark] val CLASS = "--class" final private[spark] val PROXY_USER = "--proxy-user" final private[spark] val SPARK_FILES = "spark.files" + final private[spark] val PYTHON_FILES = "--py-files" final private[spark] val PRINCIPAL = "spark.kerberos.principal" final private[spark] val KEYTAB = "spark.kerberos.keytab" // Get the appropriate spark-submit file diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 1dfef22b0f2..be38abd6a66 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -54,7 +54,8 @@ class BatchJobSubmission( val batchType: String, val batchName: String, resource: String, - className: String, + className: Option[String], + pythonFiles: Option[String], batchConf: Map[String, String], batchArgs: Seq[String], recoveryMetadata: Option[Metadata]) @@ -77,7 +78,7 @@ class BatchJobSubmission( @VisibleForTesting private[kyuubi] val builder: ProcBuilder = { Option(batchType).map(_.toUpperCase(Locale.ROOT)) match { - case Some("SPARK") => + case Some("SPARK") | Some("PYSPARK") => new SparkBatchProcessBuilder( session.user, session.sessionConf, @@ -85,6 +86,7 @@ class BatchJobSubmission( batchName, Option(resource), className, + pythonFiles, batchConf, batchArgs, getOperationLog) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala index c63ba9a6f78..ae152ea50cc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala @@ -69,7 +69,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam batchType: String, batchName: String, resource: String, - className: String, + className: Option[String], + pythonFiles: Option[String], batchConf: Map[String, String], batchArgs: Seq[String], recoveryMetadata: Option[Metadata]): BatchJobSubmission = { @@ -79,6 +80,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam batchName, resource, className, + pythonFiles, batchConf, batchArgs, recoveryMetadata) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 6900c1a7ffe..a6a45982d09 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -162,9 +162,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { require( supportedBatchType(request.getBatchType), s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES}") - require(request.getResource != null, "resource is a required parameter") - require(request.getClassName != null, "classname is a required parameter") request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT)) + require(request.getResource != null, "resource is a required parameter") + if (request.getBatchType.equals("SPARK")) { + require(request.getClassName != null, "classname is a required parameter for SPARK") + } + if (request.getBatchType.equals("PYSPARK")) { + require(request.getPythonFiles != null, "pythonFiles is a required parameter for PYSPARK") + } val userName = fe.getSessionUser(request.getConf.asScala.toMap) val ipAddress = fe.getIpAddress @@ -365,7 +370,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } object BatchesResource { - val SUPPORTED_BATCH_TYPES = Seq("SPARK") + val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK") val VALID_BATCH_STATES = Seq( OperationState.PENDING, OperationState.RUNNING, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala index d800104d0cb..90e8590a65f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala @@ -59,6 +59,7 @@ case class Metadata( state: String = null, resource: String = null, className: String = null, + pythonFiles: String = null, requestName: String = null, requestConf: Map[String, String] = Map.empty, requestArgs: Seq[String] = Seq.empty, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index 0cffaeca3f3..b3770388b7c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -120,6 +120,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { |state, |resource, |class_name, + |python_files, |request_name, |request_conf, |request_args, @@ -143,6 +144,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { metadata.state, metadata.resource, metadata.className, + metadata.pythonFiles, metadata.requestName, valueAsString(metadata.requestConf), valueAsString(metadata.requestArgs), @@ -320,12 +322,14 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { var resource: String = null var className: String = null + var pythonFiles: String = null var requestConf: Map[String, String] = Map.empty var requestArgs: Seq[String] = Seq.empty if (!stateOnly) { resource = resultSet.getString("resource") className = resultSet.getString("class_name") + pythonFiles = resultSet.getString("python_files") requestConf = string2Map(resultSet.getString("request_conf")) requestArgs = string2Seq(resultSet.getString("request_args")) } @@ -499,6 +503,7 @@ object JDBCMetadataStore { METADATA_STATE_ONLY_COLUMNS, "resource", "class_name", + "python_files", "request_conf", "request_args").mkString(",") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala index 24dcc0477c3..b2f134951ea 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala @@ -70,7 +70,8 @@ class KyuubiBatchSessionImpl( batchRequest.getBatchType, batchRequest.getName, batchRequest.getResource, - batchRequest.getClassName, + Option(batchRequest.getClassName), + Option(batchRequest.getPythonFiles), normalizedConf, batchRequest.getArgs.asScala, recoveryMetadata) @@ -121,6 +122,7 @@ class KyuubiBatchSessionImpl( state = OperationState.PENDING.toString, resource = batchRequest.getResource, className = batchRequest.getClassName, + pythonFiles = batchRequest.getPythonFiles, requestName = batchRequest.getName, requestConf = normalizedConf, requestArgs = batchRequest.getArgs.asScala, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index ff051019951..40b653bd64d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -237,6 +237,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { metadata.engineType, metadata.resource, metadata.className, + metadata.pythonFiles, metadata.requestName, metadata.requestConf.asJava, metadata.requestArgs.asJava) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala index 27298dbf16d..4697697669d 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala @@ -47,6 +47,7 @@ trait BatchTestHelper { batchType, resource, className, + null, name, conf.asJava, args.asJava) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 1f59a7dc6ff..be7b477444b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -409,7 +409,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi batchId2, "RUNNING_RECOVERY", sparkBatchTestResource, - sparkBatchTestMainClass, + Some(sparkBatchTestMainClass), + None, batchMetadata2.requestConf, batchMetadata2.requestArgs, None)