Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
03e2887
add common method to get session level config
davidyuan1223 Oct 20, 2023
32028f9
add common method to get session level config
davidyuan1223 Oct 20, 2023
c1024bd
Merge branch 'apache:master' into 5438_add_common_method_to_support_s…
davidyuan1223 Oct 25, 2023
618c0f6
Merge branch 'apache:master' into 5438_add_common_method_to_support_s…
davidyuan1223 Oct 25, 2023
c8647ef
add common method to get session level config
davidyuan1223 Oct 20, 2023
d9cf248
add common method to get session level config
davidyuan1223 Oct 20, 2023
bb63ed8
add common method to get session level config
davidyuan1223 Oct 26, 2023
605ef16
Merge remote-tracking branch 'origin/5438_add_common_method_to_suppor…
davidyuan1223 Oct 26, 2023
8011959
add common method to get session level config
davidyuan1223 Oct 26, 2023
623200f
Merge remote-tracking branch 'origin/5438_add_common_method_to_suppor…
davidyuan1223 Oct 26, 2023
bb5d5ce
add common method to get session level config
davidyuan1223 Oct 20, 2023
3f42317
add common method to get session level config
davidyuan1223 Oct 20, 2023
7a67ace
add common method to get session level config
davidyuan1223 Oct 26, 2023
503c3f7
Merge remote-tracking branch 'origin/5438_add_common_method_to_suppor…
davidyuan1223 Oct 26, 2023
3af5ed1
[KYUUBI #5427] [AUTHZ] Shade spark authz plugin
yikf Oct 20, 2023
119c393
[KYUUBI #5447][AUTHZ] Support Hudi DeleteHoodieTableCommand/UpdateHoo…
AngersZhuuuu Oct 21, 2023
0c53d00
[KYUUBI #5447][FOLLOWUP] Remove unrelated debug prints in TableIdenti…
AngersZhuuuu Oct 22, 2023
e2754fe
[KYUUBI #5492][AUTHZ] saveAsTable create DataSource table miss db info
AngersZhuuuu Oct 23, 2023
1293cf2
[KYUUBI #5500] Add Kyuubi Code Program to Doc
yaooqinn Oct 23, 2023
0c8be79
[KYUUBI #5475][FOLLOWUP] Authz check permanent view's subquery should…
AngersZhuuuu Oct 23, 2023
c039e1b
[KYUUBI #5497] [AuthZ] Simplify debug message for missing field/metho…
yaooqinn Oct 23, 2023
a67b824
[KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect a…
zhuyaogai Oct 23, 2023
bc3fcbb
[KYUUBI #5472] Permanent View should pass column when child plan no o…
AngersZhuuuu Oct 24, 2023
1dc264a
[KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for sto…
AngersZhuuuu Oct 24, 2023
9c75d82
[KYUUBI #5435][INFRA][TEST] Improve Kyuubi On Kubernetes IT
zwangsheng Oct 24, 2023
0750437
[KYUUBI #5294] [DOC] Update supported dialects for JDBC engine
ymZhao1001 Oct 24, 2023
d123a5a
[KYUUBI #5282] Support configure Trino session conf in `kyuubi-defaul…
Oct 24, 2023
b7b3544
[KYUUBI #5212] Fix configuration errors causing by helm charts of pro…
FourSpaces Oct 24, 2023
dfdd7a3
[KYUUBI #5499][KYUUBI #2503] Catch any exception when closing idle se…
turboFei Oct 24, 2023
86f692d
[KYUUBI #5512] [AuthZ] Remove the non-existent query specs in Deletes…
yaooqinn Oct 24, 2023
6a5bb10
[KYUUBI #5380][UT] Create PySpark batch jobs tests for RESTful API
weixi62961 Oct 24, 2023
ee52b2a
[KYUUBI #5446][AUTHZ] Support Create/Drop/Show/Reresh index command f…
AngersZhuuuu Oct 25, 2023
c71528e
[KYUUBI #5484] Remove legacy Web UI
pan3793 Oct 25, 2023
682e5b5
[KYUUBI #5405] [FLINK] Support Flink 1.18
YesOrNo828 Oct 25, 2023
538a648
[KYUUBI #4186] Spark showProgress with JobInfo
davidyuan1223 Oct 25, 2023
88bb6b4
[KYUUBI #5486] Bump Kafka client version from 3.4.0 to 3.5.1
bowenliang123 Oct 25, 2023
b06e044
[KYUUBI #5517] [UI] Initial implement the SQL Lab page
labbomb Oct 25, 2023
4a1db42
[KYUUBI #5513][BATCH] Always redirect delete batch request to Kyuubi …
zwangsheng Oct 25, 2023
c7d15ae
[KYUUBI #5483] Release Spark TPC-H/DS Connectors with Scala 2.13
pan3793 Oct 25, 2023
d8b808d
[KYUUBI #5523] [DOC] Update the Kyuubi supported components version
pan3793 Oct 25, 2023
3ec73ad
[KYUUBI #5522] [BATCH] Ignore main class for PySpark batch job submis…
bowenliang123 Oct 26, 2023
2de96b5
add common method to get session level config
davidyuan1223 Oct 26, 2023
d838931
Merge branch '5438_add_common_method_to_support_session_config' of ht…
davidyuan1223 Oct 26, 2023
15641e8
add more optional session level to get conf
davidyuan1223 Oct 28, 2023
940f8f8
add more optional session level to get conf
davidyuan1223 Oct 28, 2023
96d7cde
Revert "add more optional session level to get conf"
davidyuan1223 Oct 31, 2023
4d70902
add more optional session level to get conf
davidyuan1223 Oct 31, 2023
84c4568
add more optional session level to get conf
davidyuan1223 Oct 31, 2023
e1ded36
add more optional session level to get conf
davidyuan1223 Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.util.kvstore.KVIndex

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.ConfigEntry
import org.apache.kyuubi.util.SemanticVersion

object KyuubiSparkUtil extends Logging {
Expand Down Expand Up @@ -98,4 +99,18 @@ object KyuubiSparkUtil extends Logging {
// Given that we are on the Spark SQL engine side, the [[org.apache.spark.SPARK_VERSION]] can be
// represented as the runtime version of the Spark SQL engine.
lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)

/**
 * Resolve a config value at session scope: the Spark session's runtime conf
 * takes precedence, and when the key is absent there, the engine-level Kyuubi
 * configuration supplies the value.
 *
 * @param configEntry the Kyuubi config entry to look up
 * @param spark the Spark session whose runtime conf is consulted first
 * @tparam T the value type declared by the config entry
 * @return the session-level value when the key is set on the Spark session,
 *         otherwise the value from [[SparkSQLEngine.kyuubiConf]]
 */
def getSessionConf[T](configEntry: ConfigEntry[T], spark: SparkSession): T = {
  val sessionOverride = spark.conf.getOption(configEntry.key)
  sessionOverride match {
    // A raw string set on the Spark session is decoded with the entry's own converter.
    case Some(raw) => configEntry.valueConverter(raw)
    // No session-level override: fall back to the engine-wide Kyuubi conf.
    case None => SparkSQLEngine.kyuubiConf.get(configEntry)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,8 @@ object ExecutePython extends Logging {
}

def getSparkPythonExecFromArchive(spark: SparkSession, session: Session): Option[String] = {
val pythonEnvArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE.key)
.orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE))
val pythonEnvExecPath = spark.conf.getOption(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH.key)
.getOrElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH))
val pythonEnvArchive = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE, spark)
val pythonEnvExecPath = getSessionConf(ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, spark)
pythonEnvArchive.map {
archive =>
var uri = new URI(archive)
Expand All @@ -311,8 +309,7 @@ object ExecutePython extends Logging {
}

def getSparkPythonHomeFromArchive(spark: SparkSession, session: Session): Option[String] = {
val pythonHomeArchive = spark.conf.getOption(ENGINE_SPARK_PYTHON_HOME_ARCHIVE.key)
.orElse(session.sessionManager.getConf.get(ENGINE_SPARK_PYTHON_HOME_ARCHIVE))
val pythonHomeArchive = getSessionConf(ENGINE_SPARK_PYTHON_HOME_ARCHIVE, spark)
pythonHomeArchive.map {
archive =>
var uri = new URI(archive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ class ExecuteStatement(
s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")

private def collectAsIterator(resultDF: DataFrame): FetchIterator[_] = {
val resultMaxRows = spark.conf.getOption(OPERATION_RESULT_MAX_ROWS.key).map(_.toInt)
.getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_MAX_ROWS))
val resultMaxRows: Int = getSessionConf(OPERATION_RESULT_MAX_ROWS, spark)
if (incrementalCollect) {
if (resultMaxRows > 0) {
warn(s"Ignore ${OPERATION_RESULT_MAX_ROWS.key} on incremental collect mode.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.config.KyuubiConf.OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
Expand All @@ -34,10 +35,7 @@ class GetTables(
extends SparkOperation(session) {

protected val ignoreTableProperties =
spark.conf.getOption(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES.key) match {
case Some(s) => s.toBoolean
case _ => session.sessionManager.getConf.get(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES)
}
getSessionConf(OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES, spark)

override def statement: String = {
super.statement +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
Expand All @@ -49,9 +50,7 @@ class PlanOnlyStatement(
.getOrElse(session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_EXCLUDES))
}

private val style = PlanOnlyStyle.fromString(spark.conf.get(
OPERATION_PLAN_ONLY_OUT_STYLE.key,
session.sessionManager.getConf.get(OPERATION_PLAN_ONLY_OUT_STYLE)))
private val style = PlanOnlyStyle.fromString(getSessionConf(OPERATION_PLAN_ONLY_OUT_STYLE, spark))
spark.conf.set(OPERATION_PLAN_ONLY_OUT_STYLE.key, style.name)

override def getOperationLog: Option[OperationLog] = Option(operationLog)
Expand All @@ -74,7 +73,6 @@ class PlanOnlyStatement(
withLocalProperties {
SQLConf.withExistingConf(spark.sessionState.conf) {
val parsed = spark.sessionState.sqlParser.parsePlan(statement)

parsed match {
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
result = spark.sql(statement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY}
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper}
Expand Down Expand Up @@ -63,11 +63,8 @@ abstract class SparkOperation(session: Session)
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)

protected val operationSparkListenerEnabled =
spark.conf.getOption(OPERATION_SPARK_LISTENER_ENABLED.key) match {
case Some(s) => s.toBoolean
case _ => session.sessionManager.getConf.get(OPERATION_SPARK_LISTENER_ENABLED)
}
protected val operationSparkListenerEnabled: Boolean =
getSessionConf(OPERATION_SPARK_LISTENER_ENABLED, spark)

protected val operationListener: Option[SQLOperationListener] =
if (operationSparkListenerEnabled) {
Expand All @@ -80,10 +77,7 @@ abstract class SparkOperation(session: Session)
operationListener.foreach(spark.sparkContext.addSparkListener(_))
}

private val progressEnable = spark.conf.getOption(SESSION_PROGRESS_ENABLE.key) match {
case Some(s) => s.toBoolean
case _ => session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE)
}
private val progressEnable: Boolean = getSessionConf(SESSION_PROGRESS_ENABLE, spark)

protected def supportProgress: Boolean = false

Expand Down Expand Up @@ -113,9 +107,7 @@ abstract class SparkOperation(session: Session)
protected val forceCancel =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)

protected val schedulerPool =
spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
protected val schedulerPool = getSessionConf(KyuubiConf.OPERATION_SCHEDULER_POOL, spark)

protected val isSessionUserSignEnabled: Boolean = spark.sparkContext.getConf.getBoolean(
s"spark.${SESSION_USER_SIGN_ENABLED.key}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.log.OperationLog
Expand All @@ -50,15 +49,14 @@ class SQLOperationListener(
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
private lazy val consoleProgressBar =
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
if (getSessionConf(ENGINE_SPARK_SHOW_PROGRESS, spark)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL, spark),
getSessionConf(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, spark)))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper

class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {

override def withKyuubiConf: Map[String, String] = Map(
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")
override def withKyuubiConf: Map[String, String] = Map.empty

override protected def jdbcUrl: String = getJdbcUrl

Expand All @@ -58,19 +56,23 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp
}

test("operation listener with progress job info") {
val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
withSessionHandle { (client, handle) =>
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val opHandle = tExecuteStatementResp.getOperationHandle
val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
fetchResultsReq.setFetchType(1.toShort)
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val resultsResp = client.FetchResults(fetchResultsReq)
val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
withSessionConf(Map(
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200"))()() {
val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
withSessionHandle { (client, handle) =>
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val opHandle = tExecuteStatementResp.getOperationHandle
val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
fetchResultsReq.setFetchType(1.toShort)
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val resultsResp = client.FetchResults(fetchResultsReq)
val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
}
}
}
}
Expand Down