
Commit f85fc46

yifhuaturboFei authored and committed
[HADP-52545][HADP-40838][HADP-41621][HADP-42682] Support built-in ETL Sql Driver, execute select statement, specify init files for etl sql initialization (apache#207)
Co-authored-by: fwang12 <[email protected]>
1 parent b9a51a1 commit f85fc46

File tree

23 files changed: +917 / -117 lines


assembly/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -224,6 +224,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>ebay-etl-sql</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-ebay-etl-sql_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>spark-ganglia-lgpl</id>
       <dependencies>
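
Note: the ebay-etl-sql profile only pulls the new spark-ebay-etl-sql module into the assembly when it is explicitly activated. A hypothetical build invocation, assuming Spark's standard build/mvn wrapper and Maven's -P profile switch (the exact profiles and flags used by this fork may differ):

    # Illustration only: build the assembly with the ETL SQL module included.
    ./build/mvn -Pebay-etl-sql -DskipTests clean package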

bin/spark-etl-sql

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+export _SPARK_CMD_USAGE="Usage: ./bin/spark-etl-sql [options] [ETL SQL Driver options]"
+exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.ebay.ETLSqlDriver "$@"

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 20 additions & 12 deletions
@@ -2375,29 +2375,29 @@ class SparkContext(config: SparkConf) extends Logging {
         postApplicationEnd()
       }
     }
-    Utils.tryLogNonFatalError {
+    logStop("driver logger") {
       _driverLogger.foreach(_.stop())
     }
-    Utils.tryLogNonFatalError {
+    logStop("ui") {
       _scratchPath.foreach(SparkHadoopUtil.deletePath(_, hadoopConfiguration))
     }
     Utils.tryLogNonFatalError {
       _ui.foreach(_.stop())
     }
-    Utils.tryLogNonFatalError {
+    logStop("context cleaner") {
       _cleaner.foreach(_.stop())
     }
-    Utils.tryLogNonFatalError {
+    logStop("executor allocation manager") {
       _executorAllocationManager.foreach(_.stop())
     }
     if (_dagScheduler != null) {
-      Utils.tryLogNonFatalError {
+      logStop("dag schedule") {
        _dagScheduler.stop(exitCode)
      }
      _dagScheduler = null
    }
    if (_listenerBusStarted) {
-      Utils.tryLogNonFatalError {
+      logStop("listener bus") {
        listenerBus.stop()
        _listenerBusStarted = false
      }

@@ -2413,7 +2413,7 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
     }
-    Utils.tryLogNonFatalError {
+    logStop("event logger") {
       _eventLogger.foreach(_.stop())
     }
     if (_shuffleDriverComponents != null) {

@@ -2422,29 +2422,31 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
     if (_heartbeater != null) {
-      Utils.tryLogNonFatalError {
+      logStop("heart beater") {
        _heartbeater.stop()
      }
      _heartbeater = null
    }
    if (env != null && _heartbeatReceiver != null) {
-      Utils.tryLogNonFatalError {
+      logStop("heart beat receiver") {
        env.rpcEnv.stop(_heartbeatReceiver)
      }
    }
-    Utils.tryLogNonFatalError {
+    logStop("progress bar") {
      _progressBar.foreach(_.stop())
    }
    _taskScheduler = null
    // TODO: Cache.stop()?
    if (_env != null) {
-      Utils.tryLogNonFatalError {
+      logStop("spark env") {
        _env.stop()
      }
      SparkEnv.set(null)
    }
    if (_statusStore != null) {
-      _statusStore.close()
+      logStop("status store") {
+        _statusStore.close()
+      }
    }
    if (_kafkaStore.nonEmpty) {
      try {

@@ -2463,6 +2465,12 @@ class SparkContext(config: SparkConf) extends Logging {
     logInfo("Successfully stopped SparkContext")
   }

+  def logStop(name: String)(stop: => Unit) {
+    logInfo(s"Stopping $name")
+    Utils.tryLogNonFatalError {
+      stop
+    }
+  }

   /**
    * Get Spark's home location from either a value set through the constructor,

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 69 additions & 2 deletions
@@ -29,7 +29,7 @@ import javax.ws.rs.core.UriBuilder

 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.{Properties, Try}

 import org.apache.commons.lang3.StringUtils

@@ -242,6 +242,11 @@ private[spark] class SparkSubmit extends Logging {
     if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
     var childMainClass = ""

+    // Fail fast if there is no sql files specified in ETL mode
+    if (isEtlSqlDriver(args.mainClass) && args.etlSqlFiles == null) {
+      printErrorAndExit("For ETL Sql mode, please specify at least one sql file to execute.")
+    }
+
     // Set the cluster manager
     val clusterManager: Int = args.maybeMaster match {
       case Some(v) =>

@@ -618,12 +623,37 @@ private[spark] class SparkSubmit extends Logging {
       args.files = mergeFileLists(args.files, args.primaryResource)
     }

+    if (isYarnCluster && isEtlSqlDriver(args.mainClass)) {
+      // In yarn-cluster mode for ETL app, add sql files to files
+      // that can be distributed with the job
+      args.files = mergeFileLists(args.files, args.etlInitFiles)
+      args.files = mergeFileLists(args.files, args.etlSqlFiles)
+      args.etlInitFileNames = useETLSqlFileNames(args.etlInitFiles)
+      args.etlSqlFiles = useETLSqlFileNames(args.etlSqlFiles)
+    }
+
+    // for k8s cluster mode, there are two times spark-submit
+    // for the first time spark-submit, it is used to launch a spark driver pod
+    // and merge the etl sql files into spark.files.
+    if (isKubernetesCluster && isEtlSqlDriver(args.mainClass)) {
+      args.files = mergeFileLists(args.files, args.etlInitFiles)
+      args.files = mergeFileLists(args.files, args.etlSqlFiles)
+    }
+
+    // for the second time spark-submit, it is used to launch spark driver.
+    // the spark.files are placed into working directory and here using
+    // the file names to access the etl sql files
+    if (isKubernetesClusterModeDriver && isEtlSqlDriver(args.mainClass)) {
+      args.etlInitFileNames = useETLSqlFileNames(args.etlInitFiles)
+      args.etlSqlFiles = useETLSqlFileNames(args.etlSqlFiles)
+    }
+
     // Special flag to avoid deprecation warnings at the client
     sys.props("SPARK_SUBMIT") = "true"

     // A list of rules to map each argument to system properties or command-line options in
     // each deploy mode; we iterate through these below
-    val options = List[OptionAssigner](
+    val options = ListBuffer[OptionAssigner](

       // All cluster managers
       OptionAssigner(

@@ -706,6 +736,25 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
     )

+    if (isEtlSqlDriver(args.mainClass)) {
+      options ++= ListBuffer(
+        // Only for ETL SQL mode
+        OptionAssigner(args.etlInitFiles, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+          confKey = "spark.etl.sql.init.files.absPaths"),
+        if (args.etlInitFileNames != null) {
+          OptionAssigner(args.etlInitFileNames, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+            confKey = "spark.etl.sql.init.files")
+        } else {
+          OptionAssigner(args.etlInitFiles, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+            confKey = "spark.etl.sql.init.files")
+        },
+        OptionAssigner(args.etlSqlFiles, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+          confKey = "spark.etl.sql.files"),
+        OptionAssigner(args.etlSqlVars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+          confKey = "spark.etl.sql.vars")
+      )
+    }
+
     // In client mode, launch the application main class directly
     // In addition, add the main application jar and any added jars (if any) to the classpath
     if (deployMode == CLIENT) {

@@ -788,6 +837,9 @@ private[spark] class SparkSubmit extends Logging {
       if (args.isPython) {
         sparkConf.set("spark.yarn.isPython", "true")
       }
+      if (isEtlSqlDriver(args.mainClass)) {
+        sparkConf.set("spark.yarn.isEtlSql", "true")
+      }
     }

     if ((clusterManager == MESOS || clusterManager == KUBERNETES)

@@ -1166,6 +1218,13 @@ object SparkSubmit extends CommandLineUtils with Logging {
     mainClass == "org.apache.spark.sql.connect.service.SparkConnectServer"
   }

+  /**
+   * Return whether the given main class represents a etl sql driver.
+   */
+  private[deploy] def isEtlSqlDriver(mainClass: String): Boolean = {
+    mainClass == "org.apache.spark.sql.ebay.ETLSqlDriver"
+  }
+
   /**
    * Return whether the given primary resource requires running python.
    */

@@ -1184,6 +1243,14 @@ object SparkSubmit extends CommandLineUtils with Logging {
     res == SparkLauncher.NO_RESOURCE
   }

+  /**
+   * For yarn-cluster mode, all sql files would be distributed to working dir, use file names
+   * to access them.
+   */
+  private[deploy] def useETLSqlFileNames(etlSqlFiles: String): String = {
+    etlSqlFiles.split(",").map(sf => new Path(sf).getName).mkString(",")
+    Option(etlSqlFiles).map(_.split(",").map(sf => new File(sf).getName).mkString(",")).orNull
+  }
 }

 /** Provides utility functions to be used inside SparkSubmit. */
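
The net effect of useETLSqlFileNames is to strip the directory part of each comma-separated path: in yarn-cluster and k8s cluster mode the files have already been shipped via spark.files and localized into the container working directory, so only the bare file names are usable there. A rough shell analogue of that mapping (paths are placeholders, illustration only):

    paths="hdfs:///warehouse/etl/init_session.sql,/tmp/daily_report.sql"
    echo "$paths" | tr ',' '\n' | xargs -n1 basename | paste -s -d ',' -
    # -> init_session.sql,daily_report.sql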

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 40 additions & 0 deletions
@@ -86,6 +86,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var submissionToRequestStatusFor: String = null
   var useRest: Boolean = false // used internally

+  // ETL mode only
+  var etlInitFiles: String = null
+  var etlInitFileNames: String = null
+  var etlSqlFiles: String = null
+  var etlSqlVars: String = null
+
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()

@@ -229,6 +235,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S

     // Action should be SUBMIT unless otherwise specified
     action = Option(action).getOrElse(SUBMIT)
+
+    // load etl configuration
+    etlInitFiles = Option(etlInitFiles).orElse(sparkProperties.get("spark.etl.sql.init.files"))
+      .orNull
+    etlSqlFiles = Option(etlSqlFiles).orElse(sparkProperties.get("spark.etl.sql.files")).orNull
+    etlSqlVars = Option(etlSqlVars).orElse(sparkProperties.get("spark.etl.sql.vars")).orNull
   }

   /** Ensure that required fields exists. Call this only once all defaults are loaded. */

@@ -444,6 +456,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KEYTAB =>
         keytab = value

+      case ETL_INIT_FILES =>
+        etlInitFiles = value
+
+      case ETL_SQL_FILES =>
+        etlSqlFiles = value
+
+      case ETL_SQL_VARS =>
+        etlSqlVars = value
+
       case HELP =>
         printUsageAndExit(0)

@@ -593,6 +614,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       logInfo(getSqlShellOptions())
     }

+    if (SparkSubmit.isEtlSqlDriver(mainClass)) {
+      logInfo(getETLSqlDriverOptions())
+    }
+
     throw new SparkUserAppException(exitCode)
   }

@@ -647,6 +672,21 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     }
   }

+  /**
+   * Run the Spark ETL SQL Driver main class with the "--help" option and catch its output.
+   */
+  private def getETLSqlDriverOptions(): String = {
+    // scalastyle:off
+    s"""
+      |ETL SQL Driver options:
+      |  --etl-init-files   A comma-separated list of sql files to be executed for initialization.
+      |  --etl-sql-files    A comma-separated list of sql files to be executed. Must be specified.
+      |  --etl-sql-vars     A comma-separated list of sql variables, formatted like variable1=replaced1,
+      |                     variable2=replaced2, used for sql queries replacement.
+    """.stripMargin
+    // scalastyle:on
+  }
+
   private def error(msg: String): Unit = throw new SparkException(msg)

   private[deploy] def toSparkConf(sparkConf: Option[SparkConf] = None): SparkConf = {

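Because loadEnvironmentArguments falls back to sparkProperties, the same settings can also be supplied as Spark configuration instead of command-line flags. A hypothetical equivalent submission, assuming the usual --conf / spark-defaults.conf route into sparkProperties (paths and values are placeholders):

    ./bin/spark-etl-sql \
      --conf spark.etl.sql.init.files=hdfs:///warehouse/etl/init_session.sql \
      --conf spark.etl.sql.files=hdfs:///warehouse/etl/daily_report.sql \
      --conf spark.etl.sql.vars=dt=2024-01-01,env=prod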