From c326c61a482d37d15f66478b5fa3fb07bcd4f6a0 Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Wed, 7 Sep 2022 15:57:59 +0800 Subject: [PATCH 1/5] set CommandExecutionMode SKIP when get physical and execute --- .../engine/spark/operation/PlanOnlyStatement.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index b7e5451ece2..b8bdb1e4e04 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.CommandExecutionMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -112,10 +113,15 @@ class PlanOnlyStatement( SQLConf.get.maxToStringFields, printOperatorId = false)))) case PhysicalMode => - val physical = spark.sql(statement).queryExecution.sparkPlan + val analyzed = spark.sessionState.analyzer.execute(plan) + spark.sessionState.analyzer.checkAnalysis(analyzed) + val physical = spark.sessionState.executePlan(analyzed, CommandExecutionMode.SKIP).sparkPlan iter = new IterableFetchIterator(Seq(Row(physical.toString()))) case ExecutionMode => - val executed = spark.sql(statement).queryExecution.executedPlan + val analyzed = spark.sessionState.analyzer.execute(plan) + spark.sessionState.analyzer.checkAnalysis(analyzed) + val executed = + spark.sessionState.executePlan(analyzed, CommandExecutionMode.SKIP).executedPlan iter = new IterableFetchIterator(Seq(Row(executed.toString()))) case UnknownMode => throw unknownModeError(mode) case _ => throw notSupportedModeError(mode, "Spark SQL") From a4469e3d5bd1315122b85dfef7e570a99cc2b6ad Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Tue, 13 Sep 2022 10:44:36 +0800 Subject: [PATCH 2/5] use explain command get execution --- .../spark/operation/PlanOnlyStatement.scala | 22 +++++++----- .../operation/PlanOnlyOperationSuite.scala | 35 ++++++++++++++++++- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index b8bdb1e4e04..17b1b02db2f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -18,8 +18,9 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} +import org.apache.spark.sql.execution.SimpleMode +import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -115,14 +116,13 @@ class PlanOnlyStatement( case PhysicalMode => val analyzed = spark.sessionState.analyzer.execute(plan) spark.sessionState.analyzer.checkAnalysis(analyzed) - val physical = spark.sessionState.executePlan(analyzed, CommandExecutionMode.SKIP).sparkPlan + val optimized = spark.sessionState.optimizer.execute(analyzed) + val physical = spark.sessionState.planner.plan(ReturnAnswer(optimized)).next() iter = new IterableFetchIterator(Seq(Row(physical.toString()))) case ExecutionMode => - val analyzed = spark.sessionState.analyzer.execute(plan) - spark.sessionState.analyzer.checkAnalysis(analyzed) - val executed = - spark.sessionState.executePlan(analyzed, CommandExecutionMode.SKIP).executedPlan - iter = new IterableFetchIterator(Seq(Row(executed.toString()))) + val executed = ExplainCommand(plan, SimpleMode).run(spark).map(x => + Row(x.getString(0).replaceFirst("== Physical Plan ==\n", ""))) + iter = new IterableFetchIterator(executed) case UnknownMode => throw unknownModeError(mode) case _ => throw notSupportedModeError(mode, "Spark SQL") } @@ -142,9 +142,13 @@ class PlanOnlyStatement( val optimized = spark.sessionState.optimizer.execute(analyzed) iter = new IterableFetchIterator(Seq(Row(optimized.toJSON))) case PhysicalMode => - val physical = spark.sql(statement).queryExecution.sparkPlan + val analyzed = spark.sessionState.analyzer.execute(plan) + spark.sessionState.analyzer.checkAnalysis(analyzed) + val optimized = spark.sessionState.optimizer.execute(analyzed) + val physical = spark.sessionState.planner.plan(ReturnAnswer(optimized)).next() iter = new IterableFetchIterator(Seq(Row(physical.toJSON))) case ExecutionMode => + warn("Plan like Command will real execute") val executed = spark.sql(statement).queryExecution.executedPlan iter = new IterableFetchIterator(Seq(Row(executed.toJSON))) case UnknownMode => throw unknownModeError(mode) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala index 6a37e823db5..0e5ab7c73bb 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala @@ -188,7 +188,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper { } test("kyuubi #3214: Plan only mode with an incorrect value") { - withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> "parse"))(Map.empty) { + withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> ParseMode.name))(Map.empty) { withJdbcStatement() { statement => statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=parser") val e = intercept[KyuubiSQLException](statement.executeQuery("select 1")) @@ -201,6 +201,39 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper { } } + test("kyuubi #3435: Command should not execute when plan only mode is set to PHYSICAL") { + withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))(Map.empty) { + withJdbcStatement("tmp_test") { statement => + statement.execute("create database if not exists db1") + statement.execute("use db1") + statement.executeQuery("create table tmp_test(test_col string) using parquet") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=physical") + statement.executeQuery("drop table tmp_test") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=none") + val result = statement.executeQuery("desc table tmp_test") + assert(result.next()) + assert(result.getString(1).contains("test_col")) + } + } + } + + test("kyuubi #3435: Command should not execute when plan only mode is set to EXECUTION") { + withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))(Map.empty) { + withJdbcStatement("tmp_test") { statement => + statement.execute("create database if not exists db1") + statement.execute("use db1") + statement.executeQuery("create table tmp_test(test_col string) using parquet") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=execution") + statement.executeQuery("drop table tmp_test") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=none") + val result = statement.executeQuery("desc table tmp_test") + assert(result.next()) + assert(result.getString(1).contains("test_col")) + } + } + + } + private def getOperationPlanWithStatement(statement: Statement): String = { val resultSet = statement.executeQuery("select 1 where true") assert(resultSet.next()) From bf60f028dc9d33220d95cb396a8cedfaf9b191b8 Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Thu, 15 Sep 2022 19:36:51 +0800 Subject: [PATCH 3/5] remove execution mode when explain with json style --- .../kyuubi/engine/spark/operation/PlanOnlyStatement.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index 17b1b02db2f..42ddc014e68 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -148,9 +148,8 @@ class PlanOnlyStatement( val physical = spark.sessionState.planner.plan(ReturnAnswer(optimized)).next() iter = new IterableFetchIterator(Seq(Row(physical.toJSON))) case ExecutionMode => - warn("Plan like Command will real execute") - val executed = spark.sql(statement).queryExecution.executedPlan - iter = new IterableFetchIterator(Seq(Row(executed.toJSON))) + throw KyuubiSQLException(s"The operation mode $mode" + + " with json style doesn't support in Spark SQL engine.") case UnknownMode => throw unknownModeError(mode) case _ => throw KyuubiSQLException(s"The operation mode $mode" + From 9aabe046221a0ce7d6b5a221f539c8f838cc5ce5 Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Wed, 7 Dec 2022 17:50:07 +0800 Subject: [PATCH 4/5] use explain formatted wrap sql when plan only mode is execution --- .../engine/spark/operation/PlanOnlyStatement.scala | 4 +--- .../kyuubi/operation/PlanOnlyOperationSuite.scala | 14 ++++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index 42ddc014e68..e321adeeed1 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -19,8 +19,6 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} -import org.apache.spark.sql.execution.SimpleMode -import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -120,7 +118,7 @@ class PlanOnlyStatement( val physical = spark.sessionState.planner.plan(ReturnAnswer(optimized)).next() iter = new IterableFetchIterator(Seq(Row(physical.toString()))) case ExecutionMode => - val executed = ExplainCommand(plan, SimpleMode).run(spark).map(x => + val executed = spark.sql(s"explain formatted $statement").collect.map(x => Row(x.getString(0).replaceFirst("== Physical Plan ==\n", ""))) iter = new IterableFetchIterator(executed) case UnknownMode => throw unknownModeError(mode) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala index 0e5ab7c73bb..03c2a331357 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala @@ -75,8 +75,8 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper { Map.empty) { withJdbcStatement() { statement => val operationPlan = getOperationPlanWithStatement(statement) - assert(operationPlan.startsWith("*(1) Project") && - operationPlan.contains("*(1) Scan OneRowRelation")) + assert(operationPlan.startsWith("* Project (2)") && + operationPlan.contains("* Scan OneRowRelation (1)")) } } } @@ -207,9 +207,10 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper { statement.execute("create database if not exists db1") statement.execute("use db1") statement.executeQuery("create table tmp_test(test_col string) using parquet") - statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=physical") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}" + + s"=${PhysicalMode.name}") statement.executeQuery("drop table tmp_test") - statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=none") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=${NoneMode.name}") val result = statement.executeQuery("desc table tmp_test") assert(result.next()) assert(result.getString(1).contains("test_col")) @@ -223,9 +224,10 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper { statement.execute("create database if not exists db1") statement.execute("use db1") statement.executeQuery("create table tmp_test(test_col string) using parquet") - statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=execution") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}" + + s"=${ExecutionMode.name}") statement.executeQuery("drop table tmp_test") - statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=none") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=${NoneMode.name}") val result = statement.executeQuery("desc table tmp_test") assert(result.next()) assert(result.getString(1).contains("test_col")) From 3447cbf5560bfd2c833b2f3eae45a58f7c9d55b8 Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Wed, 14 Dec 2022 10:18:17 +0800 Subject: [PATCH 5/5] remove empty line --- .../org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala index 03c2a331357..662375f3962 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala @@ -233,7 +233,6 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper { assert(result.getString(1).contains("test_col")) } } - } private def getOperationPlanWithStatement(statement: Statement): String = {