Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {

/** Get all batch conf as map */
def getBatchConf(batchType: String): Map[String, String] = {
getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "")
batchType.toLowerCase(Locale.ROOT) match {
case "spark" | "pyspark" => getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.spark", "")
case _ =>
getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "")
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class CreateBatchCommand(cliConfig: CliConfig) extends Command[Batch](cliConfig)
request.get("batchType").asInstanceOf[String],
request.get("resource").asInstanceOf[String],
request.get("className").asInstanceOf[String],
request.get("pythonFiles").asInstanceOf[String],
request.get("name").asInstanceOf[String],
config,
args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class BatchRequest {
private String batchType;
private String resource;
private String className;
private String pythonFiles;
private String name;
private Map<String, String> conf;
private List<String> args;
Expand All @@ -38,12 +39,14 @@ public BatchRequest(
String batchType,
String resource,
String className,
String pythonFiles,
String name,
Map<String, String> conf,
List<String> args) {
this.batchType = batchType;
this.resource = resource;
this.className = className;
this.pythonFiles = pythonFiles;
this.name = name;
this.conf = conf;
this.args = args;
Expand All @@ -53,6 +56,7 @@ public BatchRequest(String batchType, String resource, String className, String
this.batchType = batchType;
this.resource = resource;
this.className = className;
this.pythonFiles = null;
this.name = name;
this.conf = Collections.emptyMap();
this.args = Collections.emptyList();
Expand Down Expand Up @@ -82,6 +86,14 @@ public void setClassName(String className) {
this.className = className;
}

public String getPythonFiles() {
return pythonFiles;
}

public void setPythonFiles(String pythonFiles) {
this.pythonFiles = pythonFiles;
}

public String getName() {
return name;
}
Expand Down Expand Up @@ -120,6 +132,7 @@ public boolean equals(Object o) {
return Objects.equals(getBatchType(), that.getBatchType())
&& Objects.equals(getResource(), that.getResource())
&& Objects.equals(getClassName(), that.getClassName())
&& Objects.equals(getPythonFiles(), that.getPythonFiles())
&& Objects.equals(getName(), that.getName())
&& Objects.equals(getConf(), that.getConf())
&& Objects.equals(getArgs(), that.getArgs());
Expand All @@ -128,7 +141,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
getBatchType(), getResource(), getClassName(), getName(), getConf(), getArgs());
getBatchType(), getResource(), getClassName(), getPythonFiles(), getName(), getConf(), getArgs());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public static BatchRequest generateTestBatchRequest() {
"spark",
"/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar",
"org.apache.kyuubi.engine.spark.SparkSQLEngine",
null,
"test_batch",
Collections.singletonMap("spark.driver.memory", "16m"),
Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CREATE TABLE metadata(
state varchar(128) NOT NULL, -- the session state
resource varchar(1024), -- the main resource
class_name varchar(1024), -- the main class name
python_files varchar(1024), -- the python files
request_name varchar(1024), -- the request name
request_conf clob, -- the request config map
request_args clob, -- the request arguments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CREATE TABLE metadata(
state varchar(128) NOT NULL COMMENT 'the session state',
resource varchar(1024) COMMENT 'the main resource',
class_name varchar(1024) COMMENT 'the main class name',
python_files varchar(1024) COMMENT 'the python files',
request_name varchar(1024) COMMENT 'the request name',
request_conf mediumtext COMMENT 'the request config map',
request_args mediumtext COMMENT 'the request arguments',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ object KyuubiApplicationManager {
appConf: Map[String, String],
kyuubiConf: KyuubiConf): Unit = {
applicationType.toUpperCase(Locale.ROOT) match {
case appType if appType.startsWith("SPARK") => checkSparkAccessPaths(appConf, kyuubiConf)
case appType if appType.equals("PYSPARK") || appType.startsWith("SPARK") =>
checkSparkAccessPaths(appConf, kyuubiConf)
case appType if appType.startsWith("FLINK") => // TODO: check flink app access local paths
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,29 @@ class SparkBatchProcessBuilder(
batchId: String,
batchName: String,
override val mainResource: Option[String],
override val mainClass: String,
className: Option[String],
pythonFiles: Option[String],
batchConf: Map[String, String],
batchArgs: Seq[String],
override val extraEngineLog: Option[OperationLog])
extends SparkProcessBuilder(proxyUser, conf, batchId, extraEngineLog) {
import SparkProcessBuilder._

override def mainClass: String = className.orNull

override protected val commands: Array[String] = {
val buffer = new ArrayBuffer[String]()
buffer += executable
Option(mainClass).foreach { cla =>
className.foreach { cla =>
buffer += CLASS
buffer += cla
}

pythonFiles.foreach { files =>
buffer += PYTHON_FILES
buffer +=files
}

val batchKyuubiConf = new KyuubiConf(false)
batchConf.foreach(entry => { batchKyuubiConf.set(entry._1, entry._2) })
// tag batch application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ object SparkProcessBuilder {
final private[spark] val CLASS = "--class"
final private[spark] val PROXY_USER = "--proxy-user"
final private[spark] val SPARK_FILES = "spark.files"
final private[spark] val PYTHON_FILES = "--py-files"
final private[spark] val PRINCIPAL = "spark.kerberos.principal"
final private[spark] val KEYTAB = "spark.kerberos.keytab"
// Get the appropriate spark-submit file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class BatchJobSubmission(
val batchType: String,
val batchName: String,
resource: String,
className: String,
className: Option[String],
pythonFiles: Option[String],
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata])
Expand All @@ -77,14 +78,15 @@ class BatchJobSubmission(
@VisibleForTesting
private[kyuubi] val builder: ProcBuilder = {
Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
case Some("SPARK") =>
case Some("SPARK") | Some("PYSPARK") =>
new SparkBatchProcessBuilder(
session.user,
session.sessionConf,
batchId,
batchName,
Option(resource),
className,
pythonFiles,
batchConf,
batchArgs,
getOperationLog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
batchType: String,
batchName: String,
resource: String,
className: String,
className: Option[String],
pythonFiles: Option[String],
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata]): BatchJobSubmission = {
Expand All @@ -79,6 +80,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
batchName,
resource,
className,
pythonFiles,
batchConf,
batchArgs,
recoveryMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
require(
supportedBatchType(request.getBatchType),
s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES}")
require(request.getResource != null, "resource is a required parameter")
require(request.getClassName != null, "classname is a required parameter")
request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))
require(request.getResource != null, "resource is a required parameter")
if (request.getBatchType.equals("SPARK")) {
require(request.getClassName != null, "classname is a required parameter for SPARK")
}
if (request.getBatchType.equals("PYSPARK")) {
require(request.getPythonFiles != null, "pythonFiles is a required parameter for PYSPARK")
}

val userName = fe.getSessionUser(request.getConf.asScala.toMap)
val ipAddress = fe.getIpAddress
Expand Down Expand Up @@ -365,7 +370,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
}

object BatchesResource {
val SUPPORTED_BATCH_TYPES = Seq("SPARK")
val SUPPORTED_BATCH_TYPES = Seq("SPARK", "PYSPARK")
val VALID_BATCH_STATES = Seq(
OperationState.PENDING,
OperationState.RUNNING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ case class Metadata(
state: String = null,
resource: String = null,
className: String = null,
pythonFiles: String = null,
requestName: String = null,
requestConf: Map[String, String] = Map.empty,
requestArgs: Seq[String] = Seq.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
|state,
|resource,
|class_name,
|python_files,
|request_name,
|request_conf,
|request_args,
Expand All @@ -143,6 +144,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
metadata.state,
metadata.resource,
metadata.className,
metadata.pythonFiles,
metadata.requestName,
valueAsString(metadata.requestConf),
valueAsString(metadata.requestArgs),
Expand Down Expand Up @@ -320,12 +322,14 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {

var resource: String = null
var className: String = null
var pythonFiles: String = null
var requestConf: Map[String, String] = Map.empty
var requestArgs: Seq[String] = Seq.empty

if (!stateOnly) {
resource = resultSet.getString("resource")
className = resultSet.getString("class_name")
pythonFiles = resultSet.getString("python_files")
requestConf = string2Map(resultSet.getString("request_conf"))
requestArgs = string2Seq(resultSet.getString("request_args"))
}
Expand Down Expand Up @@ -499,6 +503,7 @@ object JDBCMetadataStore {
METADATA_STATE_ONLY_COLUMNS,
"resource",
"class_name",
"python_files",
"request_conf",
"request_args").mkString(",")
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class KyuubiBatchSessionImpl(
batchRequest.getBatchType,
batchRequest.getName,
batchRequest.getResource,
batchRequest.getClassName,
Option(batchRequest.getClassName),
Option(batchRequest.getPythonFiles),
normalizedConf,
batchRequest.getArgs.asScala,
recoveryMetadata)
Expand Down Expand Up @@ -121,6 +122,7 @@ class KyuubiBatchSessionImpl(
state = OperationState.PENDING.toString,
resource = batchRequest.getResource,
className = batchRequest.getClassName,
pythonFiles = batchRequest.getPythonFiles,
requestName = batchRequest.getName,
requestConf = normalizedConf,
requestArgs = batchRequest.getArgs.asScala,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
metadata.engineType,
metadata.resource,
metadata.className,
metadata.pythonFiles,
metadata.requestName,
metadata.requestConf.asJava,
metadata.requestArgs.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ trait BatchTestHelper {
batchType,
resource,
className,
null,
name,
conf.asJava,
args.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
batchId2,
"RUNNING_RECOVERY",
sparkBatchTestResource,
sparkBatchTestMainClass,
Some(sparkBatchTestMainClass),
None,
batchMetadata2.requestConf,
batchMetadata2.requestArgs,
None)
Expand Down