Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions docs/sql-performance-tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp
<td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
<td>true</td>
<td>
When set to true Spark SQL will automatically select a compression codec for each column based
When set to true, Spark SQL will automatically select a compression codec for each column based
on statistics of the data.
</td>
<td>1.0.1</td>
Expand Down Expand Up @@ -77,8 +77,8 @@ that these options will be deprecated in future release as more optimizations ar
<td><code>spark.sql.files.openCostInBytes</code></td>
<td>4194304 (4 MB)</td>
<td>
The estimated cost to open a file, measured by the number of bytes could be scanned in the same
time. This is used when putting multiple files into a partition. It is better to over-estimated,
The estimated cost to open a file, measured by the number of bytes that could be scanned in the same
time. This is used when putting multiple files into a partition. It is better to over-estimate,
then the partitions with small files will be faster than partitions with bigger files (which is
scheduled first). This configuration is effective only when using file-based sources such as Parquet,
JSON and ORC.
Expand Down Expand Up @@ -110,7 +110,7 @@ that these options will be deprecated in future release as more optimizations ar
<td>10485760 (10 MB)</td>
<td>
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently
performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently
statistics are only supported for Hive Metastore tables where the command
<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.
</td>
Expand Down Expand Up @@ -140,8 +140,7 @@ that these options will be deprecated in future release as more optimizations ar
<td>10000</td>
<td>
Configures the maximum listing parallelism for job input paths. In case the number of input
paths is larger than this value, it will be throttled down to use this value. Same as above,
this configuration is only effective when using file-based data sources such as Parquet, ORC
paths is larger than this value, it will be throttled down to use this value. This configuration is only effective when using file-based data sources such as Parquet, ORC
and JSON.
</td>
<td>2.1.1</td>
Expand Down Expand Up @@ -215,8 +214,8 @@ For more details please refer to the documentation of [Join Hints](sql-ref-synta

## Coalesce Hints for SQL Queries

Coalesce hints allows the Spark SQL users to control the number of output files just like the
`coalesce`, `repartition` and `repartitionByRange` in Dataset API, they can be used for performance
Coalesce hints allow Spark SQL users to control the number of output files just like
`coalesce`, `repartition` and `repartitionByRange` in the Dataset API, they can be used for performance
tuning and reducing the number of output files. The "COALESCE" hint only has a partition number as a
parameter. The "REPARTITION" hint has a partition number, columns, or both/neither of them as parameters.
The "REPARTITION_BY_RANGE" hint must have column names and a partition number is optional. The "REBALANCE"
Expand Down Expand Up @@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when the runtime statistics
<td><code>spark.sql.adaptive.autoBroadcastJoinThreshold</code></td>
<td>(none)</td>
<td>
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with <code>spark.sql.autoBroadcastJoinThreshold</code>. Note that, this config is used only in adaptive framework.
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. The default value is the same as <code>spark.sql.autoBroadcastJoinThreshold</code>. Note that, this config is used only in adaptive framework.
</td>
<td>3.2.0</td>
</tr>
Expand All @@ -309,7 +308,7 @@ AQE converts sort-merge join to shuffled hash join when all post shuffle partiti
<td><code>spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold</code></td>
<td>0</td>
<td>
Configures the maximum size in bytes per partition that can be allowed to build local hash map. If this value is not smaller than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code> and all the partition size are not larger than this config, join selection prefer to use shuffled hash join instead of sort merge join regardless of the value of <code>spark.sql.join.preferSortMergeJoin</code>.
Configures the maximum size in bytes per partition that can be allowed to build local hash map. If this value is not smaller than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code> and all the partition sizes are not larger than this config, join selection prefers to use shuffled hash join instead of sort merge join regardless of the value of <code>spark.sql.join.preferSortMergeJoin</code>.
</td>
<td>3.2.0</td>
</tr>
Expand Down Expand Up @@ -339,7 +338,7 @@ Data skew can severely downgrade the performance of join queries. This feature d
<td><code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code></td>
<td>256MB</td>
<td>
A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplying the median partition size. Ideally this config should be set larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplying the median partition size. Ideally, this config should be set larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
</td>
<td>3.0.0</td>
</tr>
Expand Down
22 changes: 11 additions & 11 deletions docs/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored
the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs to collect
temporary objects created during task execution. Some steps which may be useful are:

* Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times for
* Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times
before a task completes, it means that there isn't enough memory available for executing tasks.

* If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You
Expand All @@ -235,12 +235,12 @@ temporary objects created during task execution. Some steps which may be useful
* Try the G1GC garbage collector with `-XX:+UseG1GC`. It can improve performance in some situations where
garbage collection is a bottleneck. Note that with large executor heap sizes, it may be important to
increase the [G1 region size](http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html)
with `-XX:G1HeapRegionSize`
with `-XX:G1HeapRegionSize`.

* As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using
the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the
size of the block. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 128 MiB,
we can estimate size of Eden to be `4*3*128MiB`.
we can estimate the size of Eden to be `4*3*128MiB`.

* Monitor how the frequency and time taken by garbage collection changes with the new settings.

Expand Down Expand Up @@ -293,29 +293,29 @@ available in `SparkContext` can greatly reduce the size of each serialized task,
of launching a job over a cluster. If your tasks use any large object from the driver program
inside of them (e.g. a static lookup table), consider turning it into a broadcast variable.
Spark prints the serialized size of each task on the master, so you can look at that to
decide whether your tasks are too large; in general tasks larger than about 20 KiB are probably
decide whether your tasks are too large; in general, tasks larger than about 20 KiB are probably
worth optimizing.

## Data Locality

Data locality can have a major impact on the performance of Spark jobs. If data and the code that
operates on it are together then computation tends to be fast. But if code and data are separated,
one must move to the other. Typically it is faster to ship serialized code from place to place than
operates on it are together, then computation tends to be fast. But if code and data are separated,
one must move to the other. Typically, it is faster to ship serialized code from place to place than
a chunk of data because code size is much smaller than data. Spark builds its scheduling around
this general principle of data locality.

Data locality is how close data is to the code processing it. There are several levels of
locality based on the data's current location. In order from closest to farthest:

- `PROCESS_LOCAL` data is in the same JVM as the running code. This is the best locality
possible
possible.
- `NODE_LOCAL` data is on the same node. Examples might be in HDFS on the same node, or in
another executor on the same node. This is a little slower than `PROCESS_LOCAL` because the data
has to travel between processes
- `NO_PREF` data is accessed equally quickly from anywhere and has no locality preference
has to travel between processes.
- `NO_PREF` data is accessed equally quickly from anywhere and has no locality preference.
- `RACK_LOCAL` data is on the same rack of servers. Data is on a different server on the same rack
so needs to be sent over the network, typically through a single switch
- `ANY` data is elsewhere on the network and not in the same rack
so needs to be sent over the network, typically through a single switch.
- `ANY` data is elsewhere on the network and not in the same rack.

Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In
situations where there is no unprocessed data on any idle executor, Spark switches to lower locality
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
errors building different Hadoop versions.
See: SPARK-36547, SPARK-38394.
-->
<scala-maven-plugin.version>4.7.1</scala-maven-plugin.version>
<scala-maven-plugin.version>4.7.2</scala-maven-plugin.version>
<!-- for now, not running scalafmt as part of default verify pipeline -->
<scalafmt.skip>true</scalafmt.skip>
<scalafmt.validateOnly>true</scalafmt.validateOnly>
Expand Down
9 changes: 8 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,14 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.canEqual"),
ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.toString"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElementName"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElementNames")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElementNames"),

// [SPARK-40950][CORE] Fix isRemoteAddressMaxedOut performance overhead on scala 2.13
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.blocks"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.copy"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.copy$default$2"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.ShuffleBlockFetcherIterator#FetchRequest.apply")
)

// Defulat exclude rules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1578,13 +1578,29 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
val arguments = Seq(Literal(1), Literal(2), Literal(3))
assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
intercept[NoSuchFunctionException] {
catalog.lookupFunction(FunctionIdentifier("func1"), arguments)
}
intercept[NoSuchTempFunctionException] {
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
}
checkError(
exception = intercept[NoSuchFunctionException] {
catalog.lookupFunction(FunctionIdentifier("func1"), arguments)
},
errorClass = "ROUTINE_NOT_FOUND",
parameters = Map("routineName" -> "`default`.`func1`")
)
checkError(
exception = intercept[NoSuchTempFunctionException] {
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
},
errorClass = "ROUTINE_NOT_FOUND",
parameters = Map("routineName" -> "`func1`")
)
catalog.dropTempFunction("func1", ignoreIfNotExists = true)

checkError(
exception = intercept[NoSuchTempFunctionException] {
catalog.dropTempFunction("func2", ignoreIfNotExists = false)
},
errorClass = "ROUTINE_NOT_FOUND",
parameters = Map("routineName" -> "`func2`")
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import java.net.URL
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.test.SharedSparkSession


/**
* Tests for [[org.apache.spark.sql.internal.SharedState]].
*/
Expand All @@ -52,4 +52,20 @@ class SharedStateSuite extends SharedSparkSession {
assert(conf.isInstanceOf[Configuration])
assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
}

test("Default database does not exist") {
SQLConf.get.setConfString("spark.sql.catalog.spark_catalog.defaultDatabase",
"default_database_not_exists")

checkError(
exception = intercept[SparkException] {
spark.sharedState.externalCatalog
},
errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
parameters = Map("defaultDatabase" -> "default_database_not_exists")
)

SQLConf.get.setConfString("spark.sql.catalog.spark_catalog.defaultDatabase",
SessionCatalog.DEFAULT_DATABASE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.util.control.NonFatal
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, ExplainSuiteHelper, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Offset, Sort}
import org.apache.spark.sql.connector.{IntegralAverage, StrLen}
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
Expand Down Expand Up @@ -2628,6 +2628,15 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
assert(indexes1.isEmpty)

sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
checkError(
exception = intercept[IndexAlreadyExistsException] {
sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
},
errorClass = "INDEX_ALREADY_EXISTS",
parameters = Map(
"message" -> "Failed to create index people_index in test.people"
)
)
assert(jdbcTable.indexExists("people_index"))
val indexes2 = jdbcTable.listIndexes()
assert(!indexes2.isEmpty)
Expand All @@ -2636,6 +2645,15 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
assert(tableIndex.indexName() == "people_index")

sql(s"DROP INDEX people_index ON TABLE h2.test.people")
checkError(
exception = intercept[NoSuchIndexException] {
sql(s"DROP INDEX people_index ON TABLE h2.test.people")
},
errorClass = "INDEX_NOT_FOUND",
parameters = Map(
"message" -> "Failed to drop index people_index in test.people"
)
)
assert(jdbcTable.indexExists("people_index") == false)
val indexes3 = jdbcTable.listIndexes()
assert(indexes3.isEmpty)
Expand Down