diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index e922e3dcacb..c3231cbb0b8 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -420,6 +420,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. Setting this value to 0 disables the max rows limit. | int | 1.6.0 |
 | kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The Spark query result save dir; it should be publicly accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query result is deleted when the query finishes. | string | 1.9.0 |
 | kyuubi.operation.result.saveToFile.enabled | false | The switch for Spark query result save to file. | boolean | 1.9.0 |
+| kyuubi.operation.result.saveToFile.minRows | 10000 | The minimum row threshold for saving a Spark query result to file. Results with an estimated row count below this value are collected directly instead of being written to file. | long | 1.9.1 |
 | kyuubi.operation.result.saveToFile.minSize | 209715200 | The minSize of Spark result save to file, default value is 200 MB. We use Spark's `EstimationUtils#getSizePerRow` to estimate the output size of the execution plan. | long | 1.9.0 |
 | kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
 | kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index a52d32be9cb..06193d83c4d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -172,10 +172,12 @@ class ExecuteStatement(
         })
       } else {
         val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
-        val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+        val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+        val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark)
         if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
             resultMaxRows,
-            resultSaveThreshold,
+            resultSaveSizeThreshold,
+            resultSaveRowsThreshold,
             result)) {
           saveFileName =
             Some(
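Note: below is a minimal, self-contained sketch of the gating this hunk wires up, assuming the default thresholds. `SaveToFileGateSketch`, `shouldSave`, and the estimated sizes/rows are illustrative stand-ins rather than the Kyuubi API; in the real code the decision is made by `shouldSaveResultToFs` before `saveFileName` is assigned.

```scala
// Illustrative model only: a result goes to the ORC save dir when it is
// estimated to be both large enough in bytes AND long enough in rows.
object SaveToFileGateSketch extends App {
  val resultSaveEnabled = true           // kyuubi.operation.result.saveToFile.enabled
  val sizeThreshold = 200L * 1024 * 1024 // kyuubi.operation.result.saveToFile.minSize
  val rowsThreshold = 10000L             // kyuubi.operation.result.saveToFile.minRows

  // Stand-in for shouldSaveResultToFs, reduced to the two threshold checks.
  def shouldSave(estimatedRows: Long, estimatedBytes: Long): Boolean =
    resultSaveEnabled && estimatedRows >= rowsThreshold && estimatedBytes >= sizeThreshold

  assert(!shouldSave(estimatedRows = 100L, estimatedBytes = 1024L))               // small: in-memory
  assert(!shouldSave(estimatedRows = 100L, estimatedBytes = 300L * 1024 * 1024))  // few rows: in-memory
  assert(shouldSave(estimatedRows = 50000L, estimatedBytes = 300L * 1024 * 1024)) // large: file-backed
}
```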
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 2dbfe7348a3..b78c8b7a33e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -246,22 +246,33 @@ object SparkDatasetHelper extends Logging {
     case _ => None
   }
 
-  def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
-    if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
+  def shouldSaveResultToFs(
+      resultMaxRows: Int,
+      minSize: Long,
+      minRows: Long,
+      result: DataFrame): Boolean = {
+    if (isCommandExec(result) ||
+      (resultMaxRows > 0 && resultMaxRows < minRows) ||
+      result.queryExecution.optimizedPlan.stats.rowCount.getOrElse(
+        BigInt(Long.MaxValue)) < minRows) {
       return false
     }
-    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
-      case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
-      case Some(limit) => limit
-      case None => resultMaxRows
+    val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match {
+      case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows))
+      case Some(limit) => Some(limit)
+      case None if resultMaxRows > 0 => Some(resultMaxRows)
+      case _ => None
     }
-    lazy val stats = if (finalLimit > 0) {
-      finalLimit * EstimationUtils.getSizePerRow(
-        result.queryExecution.executedPlan.output)
-    } else {
-      result.queryExecution.optimizedPlan.stats.sizeInBytes
+    if (finalLimit.exists(_ < minRows)) {
+      return false
     }
-    lazy val colSize =
+    val sizeInBytes = result.queryExecution.optimizedPlan.stats.sizeInBytes
+    val stats = finalLimit.map { limit =>
+      val estimateSize =
+        limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output)
+      estimateSize min sizeInBytes
+    }.getOrElse(sizeInBytes)
+    val colSize =
       if (result == null || result.schema.isEmpty) {
         0
       } else {
@@ -270,7 +281,8 @@ object SparkDatasetHelper extends Logging {
     minSize > 0 && colSize > 0 && stats >= minSize
   }
 
-  private def isCommandExec(nodeName: String): Boolean = {
+  def isCommandExec(result: DataFrame): Boolean = {
+    val nodeName = result.queryExecution.executedPlan.getClass.getName
     nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
     nodeName == "org.apache.spark.sql.execution.CommandResultExec"
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
index 8ac00e60262..791cb12b9c5 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
@@ -42,4 +42,15 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
         spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
     }
   }
+
+  test("isCommandExec") {
+    var query = "set"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "explain set"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "show tables"
+    assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
+    query = "select * from VALUES(1),(2),(3),(4) AS t(id)"
+    assert(!SparkDatasetHelper.isCommandExec(spark.sql(query)))
+  }
 }
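For reference, a pure-function model of the new `finalLimit`/`minRows` interaction above. The object and function names are hypothetical; only the match arms and the `exists(_ < minRows)` guard mirror the patch.

```scala
// Illustrative model only: a plan-level LIMIT and resultMaxRows combine into
// an optional row cap; a known cap below minRows keeps the result in memory.
object MinRowsCapSketch extends App {
  def finalLimit(planLimit: Option[Long], resultMaxRows: Int): Option[Long] =
    planLimit match {
      case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows))
      case Some(limit) => Some(limit)
      case None if resultMaxRows > 0 => Some(resultMaxRows.toLong)
      case _ => None // no cap: the result may be arbitrarily large
    }

  def capBelowMinRows(planLimit: Option[Long], resultMaxRows: Int, minRows: Long): Boolean =
    finalLimit(planLimit, resultMaxRows).exists(_ < minRows)

  assert(capBelowMinRows(Some(10L), 0, 10000L))     // LIMIT 10: stay in memory
  assert(capBelowMinRows(None, 100, 10000L))        // max.rows = 100: stay in memory
  assert(!capBelowMinRows(None, 0, 10000L))         // unbounded: the row gate passes
  assert(!capBelowMinRows(Some(50000L), 0, 10000L)) // large LIMIT: the row gate passes
}
```

Returning `None` for the unbounded case, rather than `0` as before, is what lets the caller fall back to `sizeInBytes` from plan statistics instead of treating "no limit" as "zero rows".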
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 22d31f7df4e..d8fec583391 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2066,6 +2066,14 @@ object KyuubiConf {
       .checkValue(_ > 0, "must be positive value")
       .createWithDefault(200 * 1024 * 1024)
 
+  val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] =
+    buildConf("kyuubi.operation.result.saveToFile.minRows")
+      .doc("The minimum row threshold for saving a Spark query result to file. Results with an estimated row count below this value are collected directly instead of being written to file.")
+      .version("1.9.1")
+      .longConf
+      .checkValue(_ > 0, "must be positive value")
+      .createWithDefault(10000)
+
   val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.incremental.collect")
       .internal
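For illustration, a hypothetical stand-in for the builder chain above (not `org.apache.kyuubi.config`), sketching what `longConf` + `checkValue` + `createWithDefault` guarantee for the new entry.

```scala
// Illustrative stand-in only: a long-valued config key with a default and a
// positivity check, resolved from an optional raw string setting.
final case class LongConfEntry(key: String, default: Long, check: Long => Boolean) {
  def resolve(raw: Option[String]): Long = {
    val value = raw.map(_.trim.toLong).getOrElse(default)
    require(check(value), s"$key must be positive value") // checkValue-style guard
    value
  }
}

object MinRowsConfSketch extends App {
  val minRows = LongConfEntry("kyuubi.operation.result.saveToFile.minRows", 10000L, _ > 0)
  assert(minRows.resolve(None) == 10000L)          // unset: the default applies
  assert(minRows.resolve(Some("50000")) == 50000L) // user override
  // minRows.resolve(Some("-1")) would throw, as checkValue(_ > 0, ...) would reject it
}
```

In a deployment the value would typically be set in `$KYUUBI_HOME/conf/kyuubi-defaults.conf`, e.g. `kyuubi.operation.result.saveToFile.minRows=50000`.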