
Commit d53fe9f

Authored by ekrivokonmapr, d-popkov, and akhalymon-cv

MapR [SPARK-979] Backport all needed 3.1.2 EEP commits to 3.2 branch (apache#913)
* MapR [SPARK-953] Investigate and add all needed changes for Spark services (apache#905)
* [EZSPA-347] Find a way to pass sensitive configs in secure manner (apache#907)
* MapR [SPARK-961] Spark job can't be properly killed using yarn API or CLI (apache#908)
* MapR [SPARK-962] MSSQL can not handle SQL syntax which is used in Spark (apache#909)
* MapR [SPARK-963] select from hbase table which was created via hive fails (apache#910)

Co-authored-by: Dmitry Popkov <[email protected]>
Co-authored-by: Andrew Khalymon <[email protected]>
Parent commit: 7b94736

9 files changed: +76 -14 lines

conf/spark-env.sh

Lines changed: 3 additions & 0 deletions
@@ -170,3 +170,6 @@ export SPARK_WORKER_DIR=$SPARK_HOME/tmp
 
 #UI
 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
+export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
+export SPARK_MASTER_HOST=$(hostname --fqdn)
+export SPARK_MASTER_IP=$(hostname --fqdn)

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 0 deletions
@@ -291,6 +291,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     } else {
       logInfo("Skip exiting executor since it's been already asked to exit before.")
     }
+    self.send(Shutdown)
   }
 
   private def decommissionSelf(): Unit = {
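
Note: the added self.send(Shutdown) runs after both branches, so the executor's own RPC endpoint always receives a Shutdown message, even when an exit request had already been seen and only logged (the SPARK-961 symptom of jobs that could not be killed cleanly via the YARN API or CLI). The following is only a rough sketch of that send-to-self pattern using a plain queue and thread; it is not Spark's internal RpcEndpoint API.

    import java.util.concurrent.LinkedBlockingQueue

    // Minimal, hypothetical sketch of "send a Shutdown message to yourself";
    // the real CoarseGrainedExecutorBackend uses its RPC endpoint instead.
    object ShutdownPatternSketch {
      sealed trait Message
      case object Shutdown extends Message

      private val inbox = new LinkedBlockingQueue[Message]()

      def send(msg: Message): Unit = inbox.put(msg)

      def main(args: Array[String]): Unit = {
        val loop = new Thread(() => {
          var running = true
          while (running) {
            inbox.take() match {
              case Shutdown =>
                println("Shutdown received, stopping message loop")
                running = false
            }
          }
        })
        loop.start()

        // Analogue of the added `self.send(Shutdown)`: even if an earlier exit
        // request was skipped, enqueueing Shutdown guarantees the loop stops.
        send(Shutdown)
        loop.join()
      }
    }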

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 0 deletions
@@ -548,6 +548,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val MAPR_SPARK_EXTRACONF_SECRET_NAME =
+    ConfigBuilder("spark.mapr.extraconf.secret")
+      .doc("Name of the secret with Spark extra configurations that will be added to sparkConf")
+      .stringConf
+      .createOptional
+
   val MAPR_CLUSTER_CONFIGMAP =
     ConfigBuilder("spark.mapr.cluster.configMap")
       .doc("Name of the mapr cluster config map")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 1 addition & 0 deletions
@@ -123,6 +123,7 @@ private[spark] object Constants {
   val MAPR_USER_TICKET_SUBPATH = "CONTAINER_TICKET"
   val MAPR_USER_SECRET_MOUNT_PATH = "/tmp/usersecret"
   val MAPR_USER_TICKET_MOUNT_PATH = s"$MAPR_USER_SECRET_MOUNT_PATH/$MAPR_USER_TICKET_SUBPATH"
+  val MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH = "/opt/mapr/kubernetes/spark_secrets"
 
   val ENV_MAPR_METRICSFILE_LOCATION = "MAPR_METRICSFILE_LOCATION"
   val MAPR_METRICS_TICKET_SUBPATH = "maprmetricsticket"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MaprConfigFeatureStep.scala

Lines changed: 27 additions & 0 deletions
@@ -42,6 +42,7 @@ private[spark] class MaprConfigFeatureStep(conf: KubernetesConf)
     applySSSDSecret(podBuilder, containerBuilder)
     applySSHSecret(podBuilder, containerBuilder)
     applyClientSecret(podBuilder, containerBuilder)
+    applySparkExtraConfigs(podBuilder, containerBuilder)
 
     SparkPod(podBuilder.build(), containerBuilder.build())
   }
@@ -130,6 +131,32 @@
       .endVolumeMount()
   }
 
+  private def applySparkExtraConfigs(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
+    val confSecretName = sparkConf.get(MAPR_SPARK_EXTRACONF_SECRET_NAME).get
+
+    if (confSecretName.isEmpty) {
+      return
+    }
+
+    val confSecretVolumeName = "spark-extraconf-secret"
+
+    podBuilder.editOrNewSpec()
+      .addNewVolume()
+        .withName(confSecretVolumeName)
+        .withNewSecret()
+          .withSecretName(confSecretName)
+        .endSecret()
+      .endVolume()
+      .endSpec()
+
+    containerBuilder
+      .addNewVolumeMount()
+        .withName(confSecretVolumeName)
+        .withMountPath(MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH)
+      .endVolumeMount()
+  }
+
+
   private def applyUserSecret(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
     val userSecretNameConfig = sparkConf.get(MAPR_USER_SECRET)
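
applySparkExtraConfigs mounts the named secret as a volume at MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH (/opt/mapr/kubernetes/spark_secrets) on the pod built by this feature step. The consumer side is not part of this diff, so the loader below is purely an assumed illustration of how files from such a mount could be folded into a SparkConf, assuming properties-style key=value content; it is not code from this change.

    import java.nio.file.{Files, Paths}
    import scala.collection.JavaConverters._
    import org.apache.spark.SparkConf

    // Illustrative only: this commit adds the volume mount, not this loader.
    def loadExtraConf(
        conf: SparkConf,
        mountPath: String = "/opt/mapr/kubernetes/spark_secrets"): SparkConf = {
      val dir = Paths.get(mountPath)
      if (Files.isDirectory(dir)) {
        Files.list(dir).iterator().asScala
          .filter(Files.isRegularFile(_))
          .flatMap(p => Files.readAllLines(p).asScala)   // one "key=value" per line (assumed)
          .map(_.trim)
          .filter(line => line.nonEmpty && line.contains("="))
          .foreach { line =>
            val Array(k, v) = line.split("=", 2)
            conf.set(k.trim, v.trim)
          }
      }
      conf
    }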

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 7 additions & 1 deletion
@@ -89,7 +89,12 @@ class JDBCOptions(
       if (subquery.isEmpty) {
         throw QueryExecutionErrors.emptyOptionError(JDBC_QUERY_STRING)
       } else {
-        s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
+        val runQueryAsIs = parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
+        if (runQueryAsIs) {
+          s"${subquery}"
+        } else {
+          s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
+        }
       }
     }
 
@@ -246,6 +251,7 @@ object JDBCOptions {
   val JDBC_URL = newOption("url")
   val JDBC_TABLE_NAME = newOption("dbtable")
   val JDBC_QUERY_STRING = newOption("query")
+  val JDBC_USE_RAW_QUERY = newOption("useRawQuery")
   val JDBC_DRIVER_CLASS = newOption("driver")
   val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
   val JDBC_LOWER_BOUND = newOption("lowerBound")
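
The new useRawQuery option targets SPARK-962: by default the query option is wrapped in a derived-table alias, (query) SPARK_GEN_SUBQ_n, which MSSQL rejects for syntax such as stored-procedure calls; with useRawQuery set to true the text is used verbatim. A rough illustration of the two forms, with a hypothetical query string:

    // Hypothetical query; the 0 in the generated alias is just the counter's first value.
    val query = "EXEC dbo.monthly_report @year = 2021"

    val wrapped = s"($query) SPARK_GEN_SUBQ_0"   // default behaviour
    val raw     = query                          // useRawQuery=true: passed through unchanged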

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 8 additions & 2 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskCon
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.JDBC_USE_RAW_QUERY
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Count, CountStar, Max, Min, Sum}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.sources._
@@ -349,8 +350,13 @@ private[jdbc] class JDBCRDD(
 
     val myWhereClause = getWhereClause(part)
 
-    val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" +
-      s" $getGroupByClause"
+    val runQueryAsIs = options.parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
+    val sqlText = if (runQueryAsIs) {
+      s"${options.tableOrQuery}"
+    } else {
+      s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" +
+        s" $getGroupByClause"
+    }
     stmt = conn.prepareStatement(sqlText,
       ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 20 additions & 6 deletions
@@ -23,11 +23,13 @@ import scala.math.BigDecimal.RoundingMode
 import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.{JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES, JDBC_USE_RAW_QUERY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
@@ -236,11 +238,23 @@ private[sql] object JDBCRelation extends Logging {
    * @return resolved Catalyst schema of a JDBC table
    */
   def getSchema(resolver: Resolver, jdbcOptions: JDBCOptions): StructType = {
-    val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
-    jdbcOptions.customSchema match {
-      case Some(customSchema) => JdbcUtils.getCustomSchema(
-        tableSchema, customSchema, resolver)
-      case None => tableSchema
+    val runQueryAsIs = jdbcOptions.parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
+    if (runQueryAsIs) {
+      val customSchema = jdbcOptions.parameters.get(JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES)
+      val newSchema = jdbcOptions.customSchema match {
+        case Some(customSchema) => CatalystSqlParser.parseTableSchema(customSchema)
+        case None => throw new IllegalArgumentException(
+          s"Field $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES is mandatory when using $JDBC_USE_RAW_QUERY")
+      }
+      logInfo(s"Option $JDBC_USE_RAW_QUERY is enabled, parsed $newSchema from the filed $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES with value $customSchema")
+      newSchema
+    } else {
+      val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
+      jdbcOptions.customSchema match {
+        case Some(customSchema) => JdbcUtils.getCustomSchema(
+          tableSchema, customSchema, resolver)
+        case None => tableSchema
+      }
     }
   }
 
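
Because a raw query is never wrapped in a resolvable sub-select, Spark cannot ask the database for the result schema, so getSchema now requires the custom schema option (JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES, which in stock Spark is the customSchema option) and parses it with CatalystSqlParser instead. A hedged end-to-end read sketch; the URL, driver, credentials, query and column types below are all placeholders, not values from this commit.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("raw-query-sketch").getOrCreate()

    // Every option value below is a placeholder for illustration only.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://mssql-host:1433;databaseName=reports")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("query", "EXEC dbo.monthly_report @year = 2021")
      .option("useRawQuery", "true")
      // Mandatory with useRawQuery: the schema cannot be resolved from the database.
      .option("customSchema", "id INT, region STRING, total DECIMAL(18,2)")
      .option("user", "spark")
      .option("password", "***")
      .load()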

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 3 additions & 5 deletions
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -312,11 +311,10 @@ class HadoopTableReader(
    */
   private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
     val inputFormatClazz = localTableDesc.getInputFileFormatClass
-    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)
-      && !inputFormatClazz.isAssignableFrom(classOf[HiveMapRDBJsonInputFormat])) {
-      createNewHadoopRDD(localTableDesc, inputPathStr)
-    } else {
+    if (classOf[oldInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
       createOldHadoopRDD(localTableDesc, inputPathStr)
+    } else {
+      createNewHadoopRDD(localTableDesc, inputPathStr)
     }
   }
 
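
The rewritten check (SPARK-963) drops the special-casing of HiveMapRDBJsonInputFormat and simply prefers the old-API Hadoop RDD whenever the table's input format implements the mapred interface, falling back to the new mapreduce API otherwise. A small sketch of what that class test amounts to, under the assumption that oldInputClass and newInputClass in TableReader alias the two Hadoop InputFormat interfaces:

    import org.apache.hadoop.mapred.{InputFormat => OldInputFormat}
    import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

    // Assumption: oldInputClass / newInputClass refer to these two interfaces.
    def implementsOldApi(inputFormatClazz: Class[_]): Boolean =
      classOf[OldInputFormat[_, _]].isAssignableFrom(inputFormatClazz)

    def implementsNewApi(inputFormatClazz: Class[_]): Boolean =
      classOf[NewInputFormat[_, _]].isAssignableFrom(inputFormatClazz)

Hive storage handlers for HBase and MapR-DB typically expose the old mapred API, which is why routing such tables through createOldHadoopRDD fixes the failing select.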
