diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md
index d736ff8f83f3..6ac39d90527f 100644
--- a/docs/sql-performance-tuning.md
+++ b/docs/sql-performance-tuning.md
@@ -40,7 +40,7 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp
spark.sql.inMemoryColumnarStorage.compressed |
true |
- When set to true Spark SQL will automatically select a compression codec for each column based
+ When set to true, Spark SQL will automatically select a compression codec for each column based
on statistics of the data.
|
1.0.1 |
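As a quick illustration of the setting in this row, a minimal Scala sketch (assuming an existing `SparkSession` named `spark`, e.g. in `spark-shell`; the table name is made up):

```scala
// Sketch only: config key and default taken from the row above; "events" is a hypothetical table.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.table("events").cache()   // cached columns get a statistics-based compression codec
```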
@@ -77,8 +77,8 @@ that these options will be deprecated in future release as more optimizations ar
spark.sql.files.openCostInBytes |
4194304 (4 MB) |
- The estimated cost to open a file, measured by the number of bytes could be scanned in the same
- time. This is used when putting multiple files into a partition. It is better to over-estimated,
+ The estimated cost to open a file, measured by the number of bytes that could be scanned in the same
+ time. This is used when putting multiple files into a partition. It is better to over-estimate,
then the partitions with small files will be faster than partitions with bigger files (which is
scheduled first). This configuration is effective only when using file-based sources such as Parquet,
JSON and ORC.
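For intuition, here is a rough Scala sketch of the packing heuristic this open cost feeds into; it is an approximation rather than Spark's exact code, and the file sizes and core count are invented:

```scala
// Approximate model of how openCostInBytes influences the target split size when
// packing many small files into partitions (numbers are illustrative).
val maxPartitionBytes = 128L * 1024 * 1024                 // spark.sql.files.maxPartitionBytes
val openCostInBytes   = 4L * 1024 * 1024                   // spark.sql.files.openCostInBytes
val fileSizes         = Seq.fill(1000)(1L * 1024 * 1024)   // 1000 files of 1 MB each
val parallelism       = 8

val totalBytes    = fileSizes.map(_ + openCostInBytes).sum // each file is charged an extra "open" cost
val bytesPerCore  = totalBytes / parallelism
val maxSplitBytes = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
// Files are then grouped into partitions of roughly maxSplitBytes, so many small
// files share a partition instead of each producing a tiny task.
```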
@@ -110,7 +110,7 @@ that these options will be deprecated in future release as more optimizations ar
| 10485760 (10 MB) |
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
- performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently
+ performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently
statistics are only supported for Hive Metastore tables where the command
ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.
|
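As an illustration of this row, a short Scala sketch that tunes or disables the threshold and collects the statistics mentioned in the note (assumes an existing `SparkSession` named `spark`; the table name is hypothetical):

```scala
// Raise the broadcast threshold to 50 MB, or disable broadcasting entirely with -1.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50L * 1024 * 1024).toString)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Collect the table-level statistics the note above refers to ("dim_country" is made up).
spark.sql("ANALYZE TABLE dim_country COMPUTE STATISTICS NOSCAN")
```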
@@ -140,8 +140,7 @@ that these options will be deprecated in future release as more optimizations ar
10000 |
Configures the maximum listing parallelism for job input paths. In case the number of input
- paths is larger than this value, it will be throttled down to use this value. Same as above,
- this configuration is only effective when using file-based data sources such as Parquet, ORC
+ paths is larger than this value, it will be throttled down to use this value. This configuration
+ is only effective when using file-based data sources such as Parquet, ORC
and JSON.
|
2.1.1 |
@@ -215,8 +214,8 @@ For more details please refer to the documentation of [Join Hints](sql-ref-synta
## Coalesce Hints for SQL Queries
-Coalesce hints allows the Spark SQL users to control the number of output files just like the
-`coalesce`, `repartition` and `repartitionByRange` in Dataset API, they can be used for performance
+Coalesce hints allow Spark SQL users to control the number of output files just like
+`coalesce`, `repartition` and `repartitionByRange` in the Dataset API; they can be used for performance
tuning and reducing the number of output files. The "COALESCE" hint only has a partition number as a
parameter. The "REPARTITION" hint has a partition number, columns, or both/neither of them as parameters.
The "REPARTITION_BY_RANGE" hint must have column names and a partition number is optional. The "REBALANCE"
@@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when the runtime statistics
spark.sql.adaptive.autoBroadcastJoinThreshold |
(none) |
- Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework.
+ Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. The default value is the same as spark.sql.autoBroadcastJoinThreshold. Note that this config is used only in the adaptive framework.
|
3.2.0 |
@@ -309,7 +308,7 @@ AQE converts sort-merge join to shuffled hash join when all post shuffle partiti
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0 |
- Configures the maximum size in bytes per partition that can be allowed to build local hash map. If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and all the partition size are not larger than this config, join selection prefer to use shuffled hash join instead of sort merge join regardless of the value of spark.sql.join.preferSortMergeJoin.
+ Configures the maximum size in bytes per partition that can be allowed to build a local hash map. If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and all the partition sizes are not larger than this config, join selection prefers to use shuffled hash join instead of sort merge join regardless of the value of spark.sql.join.preferSortMergeJoin.
|
3.2.0 |
@@ -339,7 +338,7 @@ Data skew can severely downgrade the performance of join queries. This feature d
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB |
- A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplying the median partition size. Ideally this config should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes.
+ A partition is considered skewed if its size in bytes is larger than this threshold and also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the median partition size. Ideally, this config should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes.
|
3.0.0 |
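For reference, an illustrative set of AQE skew-join settings consistent with the description above (assumes an existing `SparkSession` named `spark`; the specific values are arbitrary):

```scala
// A partition is treated as skewed when it exceeds both the factor * median size and the byte threshold.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
// Kept smaller than the threshold above, as the description recommends.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
```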
diff --git a/docs/tuning.md b/docs/tuning.md
index 18d4a6205f4f..550ffb0f357b 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -217,7 +217,7 @@ The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored
the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs to collect
temporary objects created during task execution. Some steps which may be useful are:
-* Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times for
+* Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times
before a task completes, it means that there isn't enough memory available for executing tasks.
* If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You
@@ -235,12 +235,12 @@ temporary objects created during task execution. Some steps which may be useful
* Try the G1GC garbage collector with `-XX:+UseG1GC`. It can improve performance in some situations where
garbage collection is a bottleneck. Note that with large executor heap sizes, it may be important to
increase the [G1 region size](http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html)
- with `-XX:G1HeapRegionSize`
+ with `-XX:G1HeapRegionSize`.
* As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using
the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the
size of the block. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 128 MiB,
- we can estimate size of Eden to be `4*3*128MiB`.
+ we can estimate the size of Eden to be `4*3*128MiB`.
* Monitor how the frequency and time taken by garbage collection changes with the new settings.
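A hedged sketch of passing the GC options discussed in this list to executors (the flag values and the appName are illustrative, not recommendations):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative executor JVM options: GC logging plus G1 with a larger region size.
// The Eden estimate from the text (4 tasks * 3x decompression * 128 MiB = 1536 MiB)
// is only a sizing guideline, not a flag set here.
val spark = SparkSession.builder()
  .appName("gc-tuning-sketch")
  .config("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+UseG1GC -XX:G1HeapRegionSize=16m")
  .getOrCreate()
```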
@@ -293,14 +293,14 @@ available in `SparkContext` can greatly reduce the size of each serialized task,
of launching a job over a cluster. If your tasks use any large object from the driver program
inside of them (e.g. a static lookup table), consider turning it into a broadcast variable.
Spark prints the serialized size of each task on the master, so you can look at that to
-decide whether your tasks are too large; in general tasks larger than about 20 KiB are probably
+decide whether your tasks are too large; in general, tasks larger than about 20 KiB are probably
worth optimizing.
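A minimal Scala sketch of the broadcast-variable pattern described above (the lookup table and its size are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-sketch").getOrCreate()
val sc = spark.sparkContext

// A large static lookup table built on the driver.
val lookup: Map[Int, String] = (1 to 100000).map(i => i -> s"value-$i").toMap

// Broadcast it once per executor instead of shipping it inside every task closure.
val lookupB = sc.broadcast(lookup)

val resolved = sc.parallelize(1 to 10).map(id => lookupB.value.getOrElse(id, "unknown"))
resolved.collect()
```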
## Data Locality
Data locality can have a major impact on the performance of Spark jobs. If data and the code that
-operates on it are together then computation tends to be fast. But if code and data are separated,
-one must move to the other. Typically it is faster to ship serialized code from place to place than
+operates on it are together, then computation tends to be fast. But if code and data are separated,
+one must move to the other. Typically, it is faster to ship serialized code from place to place than
a chunk of data because code size is much smaller than data. Spark builds its scheduling around
this general principle of data locality.
@@ -308,14 +308,14 @@ Data locality is how close data is to the code processing it. There are several
locality based on the data's current location. In order from closest to farthest:
- `PROCESS_LOCAL` data is in the same JVM as the running code. This is the best locality
- possible
+ possible.
- `NODE_LOCAL` data is on the same node. Examples might be in HDFS on the same node, or in
another executor on the same node. This is a little slower than `PROCESS_LOCAL` because the data
- has to travel between processes
-- `NO_PREF` data is accessed equally quickly from anywhere and has no locality preference
+ has to travel between processes.
+- `NO_PREF` data is accessed equally quickly from anywhere and has no locality preference.
- `RACK_LOCAL` data is on the same rack of servers. Data is on a different server on the same rack
- so needs to be sent over the network, typically through a single switch
-- `ANY` data is elsewhere on the network and not in the same rack
+ so needs to be sent over the network, typically through a single switch.
+- `ANY` data is elsewhere on the network and not in the same rack.
Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In
situations where there is no unprocessed data on any idle executor, Spark switches to lower locality
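For completeness, a hedged Scala sketch of the knob that governs this fallback, `spark.locality.wait` (this config is not shown in the excerpt above; the values shown are the defaults):

```scala
import org.apache.spark.SparkConf

// How long the scheduler waits for a free executor at a better locality level
// before falling back to the next one; per-level overrides also exist.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")
  .set("spark.locality.wait.node", "3s")
```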
diff --git a/pom.xml b/pom.xml
index c973c802d05b..6da1f8f0a116 100644
--- a/pom.xml
+++ b/pom.xml
@@ -169,7 +169,7 @@
errors building different Hadoop versions.
See: SPARK-36547, SPARK-38394.
-->
- 4.7.1
+ 4.7.2
true
true
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fa14468c5220..a6b3bf0725aa 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -110,7 +110,14 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.canEqual"),
ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.toString"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElementName"),
- ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElementNames")
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElementNames"),
+
+ // [SPARK-40950][CORE] Fix isRemoteAddressMaxedOut performance overhead on scala 2.13
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.blocks"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.copy"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.copy$default$2"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.this"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.apply")
)
// Defulat exclude rules
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index d846162b7842..f86d12474d68 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -1578,13 +1578,29 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
val arguments = Seq(Literal(1), Literal(2), Literal(3))
assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
- intercept[NoSuchFunctionException] {
- catalog.lookupFunction(FunctionIdentifier("func1"), arguments)
- }
- intercept[NoSuchTempFunctionException] {
- catalog.dropTempFunction("func1", ignoreIfNotExists = false)
- }
+ checkError(
+ exception = intercept[NoSuchFunctionException] {
+ catalog.lookupFunction(FunctionIdentifier("func1"), arguments)
+ },
+ errorClass = "ROUTINE_NOT_FOUND",
+ parameters = Map("routineName" -> "`default`.`func1`")
+ )
+ checkError(
+ exception = intercept[NoSuchTempFunctionException] {
+ catalog.dropTempFunction("func1", ignoreIfNotExists = false)
+ },
+ errorClass = "ROUTINE_NOT_FOUND",
+ parameters = Map("routineName" -> "`func1`")
+ )
catalog.dropTempFunction("func1", ignoreIfNotExists = true)
+
+ checkError(
+ exception = intercept[NoSuchTempFunctionException] {
+ catalog.dropTempFunction("func2", ignoreIfNotExists = false)
+ },
+ errorClass = "ROUTINE_NOT_FOUND",
+ parameters = Map("routineName" -> "`func2`")
+ )
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
index 81bf15342423..d3154d0125af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -22,10 +22,10 @@ import java.net.URL
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.test.SharedSparkSession
-
/**
* Tests for [[org.apache.spark.sql.internal.SharedState]].
*/
@@ -52,4 +52,20 @@ class SharedStateSuite extends SharedSparkSession {
assert(conf.isInstanceOf[Configuration])
assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
}
+
+ test("Default database does not exist") {
+ SQLConf.get.setConfString("spark.sql.catalog.spark_catalog.defaultDatabase",
+ "default_database_not_exists")
+
+ checkError(
+ exception = intercept[SparkException] {
+ spark.sharedState.externalCatalog
+ },
+ errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
+ parameters = Map("defaultDatabase" -> "default_database_not_exists")
+ )
+
+ SQLConf.get.setConfString("spark.sql.catalog.spark_catalog.defaultDatabase",
+ SessionCatalog.DEFAULT_DATABASE)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index f47efae88c86..46281ee6644c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, ExplainSuiteHelper, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Offset, Sort}
import org.apache.spark.sql.connector.{IntegralAverage, StrLen}
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
@@ -2628,6 +2628,15 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
assert(indexes1.isEmpty)
sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
+ checkError(
+ exception = intercept[IndexAlreadyExistsException] {
+ sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
+ },
+ errorClass = "INDEX_ALREADY_EXISTS",
+ parameters = Map(
+ "message" -> "Failed to create index people_index in test.people"
+ )
+ )
assert(jdbcTable.indexExists("people_index"))
val indexes2 = jdbcTable.listIndexes()
assert(!indexes2.isEmpty)
@@ -2636,6 +2645,15 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
assert(tableIndex.indexName() == "people_index")
sql(s"DROP INDEX people_index ON TABLE h2.test.people")
+ checkError(
+ exception = intercept[NoSuchIndexException] {
+ sql(s"DROP INDEX people_index ON TABLE h2.test.people")
+ },
+ errorClass = "INDEX_NOT_FOUND",
+ parameters = Map(
+ "message" -> "Failed to drop index people_index in test.people"
+ )
+ )
assert(jdbcTable.indexExists("people_index") == false)
val indexes3 = jdbcTable.listIndexes()
assert(indexes3.isEmpty)