From c81340333d573bdd1833f250b09582bcb0286912 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Wed, 8 May 2024 20:48:21 -0700 Subject: [PATCH 1/5] DQL --- .../spark/operation/ExecuteStatement.scala | 1 + .../spark/sql/kyuubi/SparkDatasetHelper.scala | 37 +++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index a52d32be9cb..bda8bad4aab 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -174,6 +174,7 @@ class ExecuteStatement( val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark) val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark) if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs( + statement, resultMaxRows, resultSaveThreshold, result)) { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index 2dbfe7348a3..fab68872f00 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -24,9 +24,11 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.util.{ByteUnit, JavaUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{StatementContext, StatementDefaultContext} import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils -import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SparkSqlParser, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -246,8 +248,12 @@ object SparkDatasetHelper extends Logging { case _ => None } - def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = { - if (isCommandExec(result.queryExecution.executedPlan.nodeName)) { + def shouldSaveResultToFs( + statement: String, + resultMaxRows: Int, + minSize: Long, + result: DataFrame): Boolean = { + if (isCommandExec(result.queryExecution.executedPlan.nodeName) || !isDQL(statement)) { return false } val finalLimit = optimizedPlanLimit(result.queryExecution) match { @@ -274,4 +280,29 @@ object SparkDatasetHelper extends Logging { nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" || nodeName == "org.apache.spark.sql.execution.CommandResultExec" } + + class DQLParser extends SparkSqlParser { + override def parse[T](command: String)(toResult: SqlBaseParser => T): T = { + super.parse(command)(toResult) + } + + def isStatementQuery(statement: String): Boolean = { + parse(statement)(_.statement()).isInstanceOf[StatementDefaultContext] + } + } + + private lazy val parser = new DQLParser() + + /** + * Whether is DQL(data query language), including withCte, select, union + */ + def isDQL(statement: String): Boolean = { + try { + parser.isStatementQuery(statement) + } catch { + case e: Throwable => + logDebug(s"error checking whether query $statement is DQL: ${e.getMessage}") + false + } + } } From f558dcca53e013e1975275e8495205ebc54108b0 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Wed, 8 May 2024 20:49:04 -0700 Subject: [PATCH 2/5] ut --- .../spark/sql/kyuubi/SparkDatasetHelper.scala | 2 +- .../sql/kyuubi/SparkDatasetHelperSuite.scala | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index fab68872f00..f501061af7c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -25,7 +25,7 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.parser.SqlBaseParser -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{StatementContext, StatementDefaultContext} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{StatementDefaultContext} import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SparkSqlParser, SQLExecution} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala index 8ac00e60262..34c858d999f 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala @@ -42,4 +42,22 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine { spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold)) } } + + test("is data query language") { + var query = "select * from table" + assert(SparkDatasetHelper.isDQL(query)) + query = "(select * from table)" + assert(SparkDatasetHelper.isDQL(query)) + query = "(WITH TEMP_WITH_VIEW AS (SELECT * from tbl_d) SELECT * FROM TEMP_WITH_VIEW)" + assert(SparkDatasetHelper.isDQL(query)) + query = "(WITH TEMP_WITH_VIEW AS (SELECT * from tbl_d)" + + " INSERT INTO tbl SELECT * FROM TEMP_WITH_VIEW)" + assert(!SparkDatasetHelper.isDQL(query)) + query = "cache table tbl" + assert(!SparkDatasetHelper.isDQL(query)) + query = "insert into tbl select * from ta" + assert(!SparkDatasetHelper.isDQL(query)) + query = "set" + assert(!SparkDatasetHelper.isDQL(query)) + } } From 8f20ed84bbf3caf3be034a670123a9f43399b52f Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Wed, 8 May 2024 21:29:11 -0700 Subject: [PATCH 3/5] refine the check --- .../spark/operation/ExecuteStatement.scala | 8 +++-- .../spark/sql/kyuubi/SparkDatasetHelper.scala | 29 ++++++++++++------- .../org/apache/kyuubi/config/KyuubiConf.scala | 8 +++++ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index bda8bad4aab..76191da417b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} -import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE} +import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager} import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState} @@ -172,11 +172,13 @@ class ExecuteStatement( }) } else { val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark) - val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark) + val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark) + val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark) if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs( statement, resultMaxRows, - resultSaveThreshold, + resultSaveSizeThreshold, + resultSaveRowsThreshold, result)) { saveFileName = Some( diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index f501061af7c..7aef8f33911 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -25,7 +25,7 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.parser.SqlBaseParser -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{StatementDefaultContext} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.StatementDefaultContext import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SparkSqlParser, SQLExecution} @@ -252,22 +252,29 @@ object SparkDatasetHelper extends Logging { statement: String, resultMaxRows: Int, minSize: Long, + minRows: Long, result: DataFrame): Boolean = { - if (isCommandExec(result.queryExecution.executedPlan.nodeName) || !isDQL(statement)) { + if (isCommandExec(result.queryExecution.executedPlan.nodeName) || + !isDQL(statement) || + (resultMaxRows > 0 && resultMaxRows < minRows)) { return false } val finalLimit = optimizedPlanLimit(result.queryExecution) match { - case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows) - case Some(limit) => limit - case None => resultMaxRows + case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows)) + case Some(limit) => Some(limit) + case None if resultMaxRows > 0 => Some(resultMaxRows) + case _ => None } - lazy val stats = if (finalLimit > 0) { - finalLimit * EstimationUtils.getSizePerRow( - result.queryExecution.executedPlan.output) - } else { - result.queryExecution.optimizedPlan.stats.sizeInBytes + if (finalLimit.exists(_ < minRows)) { + return false } - lazy val colSize = + val sizeInBytes = result.queryExecution.optimizedPlan.stats.sizeInBytes + val stats = finalLimit.map { limit => + val estimateSize = + limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output) + estimateSize min sizeInBytes + }.getOrElse(sizeInBytes) + val colSize = if (result == null || result.schema.isEmpty) { 0 } else { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 22d31f7df4e..3665f6694a2 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2066,6 +2066,14 @@ object KyuubiConf { .checkValue(_ > 0, "must be positive value") .createWithDefault(200 * 1024 * 1024) + val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] = + buildConf("kyuubi.operation.result.saveToFile.minRows") + .doc("The minRows of Spark result save to file, default value is 10000.") + .version("1.9.0") + .longConf + .checkValue(_ > 0, "must be positive value") + .createWithDefault(10000) + val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] = buildConf("kyuubi.operation.incremental.collect") .internal From 04e20db5fa987b9d21f343c3bf588b0869283505 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Wed, 8 May 2024 21:36:32 -0700 Subject: [PATCH 4/5] conf --- docs/configuration/settings.md | 1 + .../scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala | 2 +- .../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index e922e3dcacb..c3231cbb0b8 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -420,6 +420,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 | | kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The Spark query result save dir, it should be a public accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query result will delete when query finished. | string | 1.9.0 | | kyuubi.operation.result.saveToFile.enabled | false | The switch for Spark query result save to file. | boolean | 1.9.0 | +| kyuubi.operation.result.saveToFile.minRows | 10000 | The minRows of Spark result save to file, default value is 10000. | long | 1.9.1 | | kyuubi.operation.result.saveToFile.minSize | 209715200 | The minSize of Spark result save to file, default value is 200 MB.we use spark's `EstimationUtils#getSizePerRowestimate` to estimate the output size of the execution plan. | long | 1.9.0 | | kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 | | kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 | diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index 7aef8f33911..5298954b49c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -259,7 +259,7 @@ object SparkDatasetHelper extends Logging { (resultMaxRows > 0 && resultMaxRows < minRows)) { return false } - val finalLimit = optimizedPlanLimit(result.queryExecution) match { + val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match { case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows)) case Some(limit) => Some(limit) case None if resultMaxRows > 0 => Some(resultMaxRows) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 3665f6694a2..d8fec583391 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2069,7 +2069,7 @@ object KyuubiConf { val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] = buildConf("kyuubi.operation.result.saveToFile.minRows") .doc("The minRows of Spark result save to file, default value is 10000.") - .version("1.9.0") + .version("1.9.1") .longConf .checkValue(_ > 0, "must be positive value") .createWithDefault(10000) From da9c2a9219228f3bb47e6091fac61cfa7e6f42aa Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 9 May 2024 00:15:52 -0700 Subject: [PATCH 5/5] ut --- .../spark/operation/ExecuteStatement.scala | 1 - .../spark/sql/kyuubi/SparkDatasetHelper.scala | 40 ++++--------------- .../sql/kyuubi/SparkDatasetHelperSuite.scala | 25 +++++------- 3 files changed, 16 insertions(+), 50 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 76191da417b..06193d83c4d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -175,7 +175,6 @@ class ExecuteStatement( val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark) val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark) if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs( - statement, resultMaxRows, resultSaveSizeThreshold, resultSaveRowsThreshold, diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index 5298954b49c..b78c8b7a33e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -24,11 +24,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.util.{ByteUnit, JavaUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.catalyst.parser.SqlBaseParser -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.StatementDefaultContext import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils -import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SparkSqlParser, SQLExecution} +import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -249,14 +247,14 @@ object SparkDatasetHelper extends Logging { } def shouldSaveResultToFs( - statement: String, resultMaxRows: Int, minSize: Long, minRows: Long, result: DataFrame): Boolean = { - if (isCommandExec(result.queryExecution.executedPlan.nodeName) || - !isDQL(statement) || - (resultMaxRows > 0 && resultMaxRows < minRows)) { + if (isCommandExec(result) || + (resultMaxRows > 0 && resultMaxRows < minRows) || + result.queryExecution.optimizedPlan.stats.rowCount.getOrElse( + BigInt(Long.MaxValue)) < minRows) { return false } val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match { @@ -283,33 +281,9 @@ object SparkDatasetHelper extends Logging { minSize > 0 && colSize > 0 && stats >= minSize } - private def isCommandExec(nodeName: String): Boolean = { + def isCommandExec(result: DataFrame): Boolean = { + val nodeName = result.queryExecution.executedPlan.getClass.getName nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" || nodeName == "org.apache.spark.sql.execution.CommandResultExec" } - - class DQLParser extends SparkSqlParser { - override def parse[T](command: String)(toResult: SqlBaseParser => T): T = { - super.parse(command)(toResult) - } - - def isStatementQuery(statement: String): Boolean = { - parse(statement)(_.statement()).isInstanceOf[StatementDefaultContext] - } - } - - private lazy val parser = new DQLParser() - - /** - * Whether is DQL(data query language), including withCte, select, union - */ - def isDQL(statement: String): Boolean = { - try { - parser.isStatementQuery(statement) - } catch { - case e: Throwable => - logDebug(s"error checking whether query $statement is DQL: ${e.getMessage}") - false - } - } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala index 34c858d999f..791cb12b9c5 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala @@ -43,21 +43,14 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine { } } - test("is data query language") { - var query = "select * from table" - assert(SparkDatasetHelper.isDQL(query)) - query = "(select * from table)" - assert(SparkDatasetHelper.isDQL(query)) - query = "(WITH TEMP_WITH_VIEW AS (SELECT * from tbl_d) SELECT * FROM TEMP_WITH_VIEW)" - assert(SparkDatasetHelper.isDQL(query)) - query = "(WITH TEMP_WITH_VIEW AS (SELECT * from tbl_d)" + - " INSERT INTO tbl SELECT * FROM TEMP_WITH_VIEW)" - assert(!SparkDatasetHelper.isDQL(query)) - query = "cache table tbl" - assert(!SparkDatasetHelper.isDQL(query)) - query = "insert into tbl select * from ta" - assert(!SparkDatasetHelper.isDQL(query)) - query = "set" - assert(!SparkDatasetHelper.isDQL(query)) + test("isCommandExec") { + var query = "set" + assert(SparkDatasetHelper.isCommandExec(spark.sql(query))) + query = "explain set" + assert(SparkDatasetHelper.isCommandExec(spark.sql(query))) + query = "show tables" + assert(SparkDatasetHelper.isCommandExec(spark.sql(query))) + query = "select * from VALUES(1),(2),(3),(4) AS t(id)" + assert(!SparkDatasetHelper.isCommandExec(spark.sql(query))) } }