Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
| kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The Spark query result save dir, it should be a public accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query result will delete when query finished. | string | 1.9.0 |
| kyuubi.operation.result.saveToFile.enabled | false | The switch for Spark query result save to file. | boolean | 1.9.0 |
| kyuubi.operation.result.saveToFile.minRows | 10000 | The minRows of Spark result save to file, default value is 10000. | long | 1.9.1 |
| kyuubi.operation.result.saveToFile.minSize | 209715200 | The minSize of Spark result save to file, default value is 200 MB. We use Spark's `EstimationUtils#getSizePerRow` to estimate the output size of the execution plan. | long | 1.9.0 |
| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
Expand Down Expand Up @@ -172,10 +172,12 @@ class ExecuteStatement(
})
} else {
val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark)
if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
resultMaxRows,
resultSaveThreshold,
resultSaveSizeThreshold,
resultSaveRowsThreshold,
result)) {
saveFileName =
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,22 +246,33 @@ object SparkDatasetHelper extends Logging {
case _ => None
}

def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
def shouldSaveResultToFs(
resultMaxRows: Int,
minSize: Long,
minRows: Long,
result: DataFrame): Boolean = {
if (isCommandExec(result) ||
(resultMaxRows > 0 && resultMaxRows < minRows) ||
result.queryExecution.optimizedPlan.stats.rowCount.getOrElse(
BigInt(Long.MaxValue)) < minRows) {
return false
}
val finalLimit = optimizedPlanLimit(result.queryExecution) match {
case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
case Some(limit) => limit
case None => resultMaxRows
val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match {
case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows))
case Some(limit) => Some(limit)
case None if resultMaxRows > 0 => Some(resultMaxRows)
case _ => None
}
lazy val stats = if (finalLimit > 0) {
finalLimit * EstimationUtils.getSizePerRow(
result.queryExecution.executedPlan.output)
} else {
result.queryExecution.optimizedPlan.stats.sizeInBytes
if (finalLimit.exists(_ < minRows)) {
return false
}
lazy val colSize =
val sizeInBytes = result.queryExecution.optimizedPlan.stats.sizeInBytes
val stats = finalLimit.map { limit =>
val estimateSize =
limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output)
estimateSize min sizeInBytes
}.getOrElse(sizeInBytes)
val colSize =
if (result == null || result.schema.isEmpty) {
0
} else {
Expand All @@ -270,7 +281,8 @@ object SparkDatasetHelper extends Logging {
minSize > 0 && colSize > 0 && stats >= minSize
}

private def isCommandExec(nodeName: String): Boolean = {
def isCommandExec(result: DataFrame): Boolean = {
val nodeName = result.queryExecution.executedPlan.getClass.getName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result.queryExecution.executedPlan.nodeName?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark 3.1

scala> spark.sql("set").queryExecution.executedPlan.nodeName
res22: String = Execute SetCommand

scala> spark.sql("show tables").queryExecution.executedPlan.nodeName
res23: String = Execute ShowTablesCommand

scala>

spark 3.5

image

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is a bug and not covered by UT before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have the method isCommandResultExec before, until we drop spark 3.1 support

private def isCommandResultExec(sparkPlan: SparkPlan): Boolean = {
// scalastyle:off line.size.limit
// the CommandResultExec was introduced in SPARK-35378 (Spark 3.2), after SPARK-35378 the
// physical plan of runnable command is CommandResultExec.
// for instance:
// ```
// scala> spark.sql("show tables").queryExecution.executedPlan
// res0: org.apache.spark.sql.execution.SparkPlan =
// CommandResult <empty>, [namespace#0, tableName#1, isTemporary#2]
// +- ShowTables [namespace#0, tableName#1, isTemporary#2], V2SessionCatalog(spark_catalog), [default]
//
// scala > spark.sql("show tables").queryExecution.executedPlan.getClass
// res1: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.CommandResultExec
// ```
// scalastyle:on line.size.limit
sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec"
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have dropped support for Spark 3.1 in #6273, do we still need to adapt to it?

Copy link
Member Author

@turboFei turboFei May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it did not work for spark 3.5 as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it did not work for spark 3.5 as well

I mean we can simplify the logical to

result.queryExecution.executedPlan match {
    case commandResult: CommandResultExec => true
    case _ => false
}

nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
nodeName == "org.apache.spark.sql.execution.CommandResultExec"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,15 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
}
}

test("isCommandExec") {
  // Runnable commands (SET, EXPLAIN, SHOW TABLES) must be detected as command
  // executions so their results are never spilled to file.
  // Use a Seq + withClue instead of repeated `var` reassignment so a failure
  // reports exactly which query tripped the assertion.
  Seq("set", "explain set", "show tables").foreach { query =>
    withClue(s"query: $query") {
      assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
    }
  }
  // A regular SELECT is not a command and stays eligible for save-to-file.
  val selectQuery = "select * from VALUES(1),(2),(3),(4) AS t(id)"
  assert(!SparkDatasetHelper.isCommandExec(spark.sql(selectQuery)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,14 @@ object KyuubiConf {
.checkValue(_ > 0, "must be positive value")
.createWithDefault(200 * 1024 * 1024)

// Minimum estimated row count before a Spark query result is saved to file
// rather than collected to the driver. Consulted alongside
// kyuubi.operation.result.saveToFile.minSize by
// SparkDatasetHelper.shouldSaveResultToFs.
// NOTE(review): the .doc() text is rendered into docs/configuration/settings.md;
// keep the two in sync if the wording is ever changed.
val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] =
buildConf("kyuubi.operation.result.saveToFile.minRows")
.doc("The minRows of Spark result save to file, default value is 10000.")
.version("1.9.1")
.longConf
// Reject non-positive thresholds at config-load time.
.checkValue(_ > 0, "must be positive value")
.createWithDefault(10000)

val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
buildConf("kyuubi.operation.incremental.collect")
.internal
Expand Down