diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala index b9fb9325999..2e33d8ce6db 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.util.kvstore.KVIndex import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.ConfigEntry import org.apache.kyuubi.util.SemanticVersion object KyuubiSparkUtil extends Logging { @@ -98,4 +99,18 @@ object KyuubiSparkUtil extends Logging { // Given that we are on the Spark SQL engine side, the [[org.apache.spark.SPARK_VERSION]] can be // represented as the runtime version of the Spark SQL engine. lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION) + + /** + * Get the session-level config value. + * @param configEntry the config entry to look up + * @param spark the Spark session to read the session-level value from + * @tparam T the type of the config value + * @return the value set in the Spark session conf if present, + * otherwise the default from Kyuubi's engine config + */ + def getSessionConf[T](configEntry: ConfigEntry[T], spark: SparkSession): T = { + spark.conf.getOption(configEntry.key).map(configEntry.valueConverter).getOrElse { + SparkSQLEngine.kyuubiConf.get(configEntry) + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index badd835301a..b3643a7ae43 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -295,10 +295,8 @@ object ExecutePython extends Logging { } def getSparkPythonExecFromArchive(spark: SparkSession, session: Session): Option[String] = { - val pythonEnvArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE.key) - .orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE)) - val pythonEnvExecPath = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH.key) - .getOrElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH)) + val pythonEnvArchive = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE, spark) + val pythonEnvExecPath = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, spark) pythonEnvArchive.map { archive => var uri = new URI(archive) @@ -311,8 +309,7 @@ object ExecutePython extends Logging { } def getSparkPythonHomeFromArchive(spark: SparkSession, session: Session): Option[String] = { - val pythonHomeArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_HOME_ARCHIVE.key) - .orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_HOME_ARCHIVE)) + val pythonHomeArchive = getSessionConf(ENGINE_SPARK_PYTHON_HOME_ARCHIVE, spark) pythonHomeArchive.map { archive => var uri = new URI(archive) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 17d8a741269..acb49d65e5c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -148,8 +148,7 @@ class ExecuteStatement( s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString") private def collectAsIterator(resultDF: DataFrame): 
FetchIterator[_] = { - val resultMaxRows = spark.conf.getOption(OPERATION_RESULT_MAX_ROWS.key).map(_.toInt) - .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_MAX_ROWS)) + val resultMaxRows: Int = getSessionConf(OPERATION_RESULT_MAX_ROWS, spark) if (incrementalCollect) { if (resultMaxRows > 0) { warn(s"Ignore ${OPERATION_RESULT_MAX_ROWS.key} on incremental collect mode.") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala index 980e4fdb173..75ce9492176 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.types.StructType import org.apache.kyuubi.config.KyuubiConf.OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils import org.apache.kyuubi.operation.IterableFetchIterator import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ @@ -34,10 +35,7 @@ class GetTables( extends SparkOperation(session) { protected val ignoreTableProperties = - spark.conf.getOption(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES.key) match { - case Some(s) => s.toBoolean - case _ => session.sessionManager.getConf.get(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES) - } + getSessionConf(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES, spark) override def statement: String = { super.statement + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index 4f88083130a..f2a67047196 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.types.StructType import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE} +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle} import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError} import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError} @@ -49,9 +50,7 @@ class PlanOnlyStatement( .getOrElse(session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_EXCLUDES)) } - private val style = PlanOnlyStyle.fromString(spark.conf.get( - OPERATION_PLAN_ONLY_OUT_STYLE.key, - session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_OUT_STYLE))) + private val style = PlanOnlyStyle.fromString(getSessionConf(OPERATION_PLAN_ONLY_OUT_STYLE, spark)) spark.conf.set(OPERATION_PLAN_ONLY_OUT_STYLE.key, style.name) override def getOperationLog: Option[OperationLog] = Option(operationLog) @@ -74,7 +73,6 @@ class PlanOnlyStatement( withLocalProperties { SQLConf.withExistingConf(spark.sessionState.conf) { val parsed = spark.sessionState.sqlParser.parsePlan(statement) - parsed match { case cmd if planExcludes.contains(cmd.getClass.getSimpleName) => result = spark.sql(statement) diff 
--git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 1de360f0715..f40f1d490ab 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -31,7 +31,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY} -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY} import org.apache.kyuubi.engine.spark.events.SparkOperationEvent import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper} @@ -63,11 +63,8 @@ abstract class SparkOperation(session: Session) override def redactedStatement: String = redact(spark.sessionState.conf.stringRedactionPattern, statement) - protected val operationSparkListenerEnabled = - spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match { - case Some(s) => s.toBoolean - case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED) - } + protected val operationSparkListenerEnabled: Boolean = + getSessionConf(OPERATION_SPARK_LISTENER_ENABLED, spark) protected val operationListener: Option[SQLOperationListener] = if (operationSparkListenerEnabled) { @@ -80,10 +77,7 @@ abstract class SparkOperation(session: Session) 
operationListener.foreach(spark.sparkContext.addSparkListener(_)) } - private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match { - case Some(s) => s.toBoolean - case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE) - } + private val progressEnable: Boolean = getSessionConf(SESSION_PROGRESS_ENABLE, spark) protected def supportProgress: Boolean = false @@ -113,9 +107,7 @@ abstract class SparkOperation(session: Session) protected val forceCancel = session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL) - protected val schedulerPool = - spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse( - session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL)) + protected val schedulerPool = getSessionConf(KyuubiConf.OPERATION_SCHEDULER_POOL, spark) protected val isSessionUserSignEnabled: Boolean = spark.sparkContext.getConf.getBoolean( s"spark.${SESSION_USER_SIGN_ENABLED.key}", diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 686cb1f359b..a7d409c7ca5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -27,10 +27,9 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd import org.apache.kyuubi.Logging -import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL} import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY +import 
org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY} import org.apache.kyuubi.engine.spark.operation.ExecuteStatement import org.apache.kyuubi.operation.Operation import org.apache.kyuubi.operation.log.OperationLog @@ -50,15 +49,14 @@ class SQLOperationListener( private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]() private var executionId: Option[Long] = None - private val conf: KyuubiConf = operation.getSession.sessionManager.getConf private lazy val consoleProgressBar = - if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) { + if (getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark)) { Some(new SparkConsoleProgressBar( operation, activeJobs, activeStages, - conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL), - conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT))) + getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL, spark), + getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, spark))) } else { None } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index f732f7c3846..c5d24399d74 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -29,9 +29,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { - override def withKyuubiConf: Map[String, String] = Map( - KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true", - KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200") + override def withKyuubiConf: Map[String, String] = Map.empty override protected def jdbcUrl: String = getJdbcUrl @@ -58,19 +56,23 @@ class SQLOperationListenerSuite extends 
WithSparkSQLEngine with HiveJDBCTestHelp } test("operation listener with progress job info") { - val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" - withSessionHandle { (client, handle) => - val req = new TExecuteStatementReq() - req.setSessionHandle(handle) - req.setStatement(sql) - val tExecuteStatementResp = client.ExecuteStatement(req) - val opHandle = tExecuteStatementResp.getOperationHandle - val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000) - fetchResultsReq.setFetchType(1.toShort) - eventually(timeout(90.seconds), interval(500.milliseconds)) { - val resultsResp = client.FetchResults(fetchResultsReq) - val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + withSessionConf(Map( + KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true", + KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200"))()() { + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" + withSessionHandle { (client, handle) => + val req = new TExecuteStatementReq() + req.setSessionHandle(handle) + req.setStatement(sql) + val tExecuteStatementResp = client.ExecuteStatement(req) + val opHandle = tExecuteStatementResp.getOperationHandle + val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000) + fetchResultsReq.setFetchType(1.toShort) + eventually(timeout(90.seconds), interval(500.milliseconds)) { + val resultsResp = client.FetchResults(fetchResultsReq) + val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala + assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + } } } }