@@ -50,7 +50,7 @@ abstract class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUti

   override protected def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set("spark.sql.files.maxPartitionBytes", 1024)
+    spark.conf.set(SQLConf.FILES_MAX_PARTITION_BYTES.key, 1024)
   }

   def checkReloadMatchesSaved(originalFile: String, newFile: String): Unit = {

@@ -135,7 +135,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe

   test("failOnDataLoss=false should not return duplicated records: microbatch v1") {
     withSQLConf(
-      "spark.sql.streaming.disabledV2MicroBatchReaders" ->
+      SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key ->
         classOf[KafkaSourceProvider].getCanonicalName) {
       verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
         val query = df.writeStream.format("memory").queryName(table).start()

@@ -1066,7 +1066,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
     spark.conf.set(
-      "spark.sql.streaming.disabledV2MicroBatchReaders",
+      SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key,
       classOf[KafkaSourceProvider].getCanonicalName)
   }

@@ -234,6 +234,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED =
+    buildConf("spark.sql.inMemoryTableScanStatistics.enable")
+      .internal()
+      .doc("When true, enable in-memory table scan accumulators.")
+      .booleanConf
+      .createWithDefault(false)
+
   val CACHE_VECTORIZED_READER_ENABLED =
     buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
       .doc("Enables vectorized reader for columnar caching.")
@@ -1024,6 +1031,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val ENABLE_VECTORIZED_HASH_MAP =
+    buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
+      .internal()
+      .doc("Enable vectorized aggregate hash map. This is for testing/benchmarking only.")
+      .booleanConf
+      .createWithDefault(false)
+
   val MAX_NESTED_VIEW_DEPTH =
     buildConf("spark.sql.view.maxNestedViewDepth")
       .internal()
@@ -2109,6 +2123,8 @@ class SQLConf extends Serializable with Logging {

   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

+  def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED)
+
   def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)

   def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -2148,6 +2164,8 @@ class SQLConf extends Serializable with Logging {

   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

+  def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP)
+
   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)

   def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)

@@ -1126,14 +1126,14 @@ class TypeCoercionSuite extends AnalysisTest {
       Concat(Seq(Cast(Literal(new java.sql.Date(0)), StringType),
         Cast(Literal(new Timestamp(0)), StringType))))

-    withSQLConf("spark.sql.function.concatBinaryAsString" -> "true") {
+    withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "true") {
       ruleTest(rule,
         Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))),
         Concat(Seq(Cast(Literal("123".getBytes), StringType),
           Cast(Literal("456".getBytes), StringType))))
     }

-    withSQLConf("spark.sql.function.concatBinaryAsString" -> "false") {
+    withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "false") {
       ruleTest(rule,
         Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))),
         Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))))
@@ -1180,14 +1180,14 @@ class TypeCoercionSuite extends AnalysisTest {
       Elt(Seq(Literal(2), Cast(Literal(new java.sql.Date(0)), StringType),
         Cast(Literal(new Timestamp(0)), StringType))))

-    withSQLConf("spark.sql.function.eltOutputAsString" -> "true") {
+    withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "true") {
       ruleTest(rule,
         Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))),
         Elt(Seq(Literal(1), Cast(Literal("123".getBytes), StringType),
           Cast(Literal("456".getBytes), StringType))))
     }

-    withSQLConf("spark.sql.function.eltOutputAsString" -> "false") {
+    withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "false") {
       ruleTest(rule,
         Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))),
         Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))))
@@ -1498,7 +1498,7 @@ class TypeCoercionSuite extends AnalysisTest {
         DoubleType)))
     Seq(true, false).foreach { convertToTS =>
       withSQLConf(
-        "spark.sql.legacy.compareDateTimestampInTimestamp" -> convertToTS.toString) {
+        SQLConf.COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP.key -> convertToTS.toString) {
         val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
         val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
         val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01"))

@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 import org.apache.spark.util.Utils
@@ -559,29 +560,24 @@ case class HashAggregateExec(
   private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {
     if (!checkIfFastHashMapSupported(ctx)) {
       if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
-        logInfo("spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but"
+        logInfo(s"${SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key} is set to true, but"
           + " current version of codegened fast hashmap does not support this aggregate.")
       }
     } else {
       isFastHashMapEnabled = true

       // This is for testing/benchmarking only.
       // We enforce to first level to be a vectorized hashmap, instead of the default row-based one.
-      isVectorizedHashMapEnabled = sqlContext.getConf(
-        "spark.sql.codegen.aggregate.map.vectorized.enable", "false") == "true"
+      isVectorizedHashMapEnabled = sqlContext.conf.enableVectorizedHashMap
     }
   }

   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg")
     if (sqlContext.conf.enableTwoLevelAggMap) {
       enableTwoLevelHashMap(ctx)
-    } else {
-      sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
-        case "true" =>
-          logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.")
-        case _ =>
-      }
+    } else if (sqlContext.conf.enableVectorizedHashMap) {
+      logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.")
     }
     val bitMaxCapacity = sqlContext.conf.fastHashAggregateRowMaxCapacityBit

@@ -294,8 +294,7 @@ case class InMemoryTableScanExec(
     }
   }

-  lazy val enableAccumulatorsForTest: Boolean =
-    sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
+  lazy val enableAccumulatorsForTest: Boolean = sqlContext.conf.inMemoryTableScanStatisticsEnabled

   // Accumulators used for testing purposes
   lazy val readPartitions = sparkContext.longAccumulator

@@ -20,33 +20,34 @@ package org.apache.spark.sql
 import org.scalatest.BeforeAndAfter

 import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.SQLConf

 class SingleLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.sql.codegen.fallback", "false")
-    .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
+    .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+    .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "false")

   // adding some checking after each test is run, assuring that the configs are not changed
   // in test code
   after {
-    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+    assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "false",
+    assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "false",
       "configuration parameter changed in test body")
   }
 }

 class TwoLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.sql.codegen.fallback", "false")
-    .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
+    .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+    .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true")

   // adding some checking after each test is run, assuring that the configs are not changed
   // in test code
   after {
-    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+    assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "true",
+    assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true",
       "configuration parameter changed in test body")
   }
 }
@@ -56,18 +57,18 @@ class TwoLevelAggregateHashMapWithVectorizedMapSuite
     with BeforeAndAfter {

   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.sql.codegen.fallback", "false")
-    .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
-    .set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
+    .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+    .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true")
+    .set(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key, "true")

   // adding some checking after each test is run, assuring that the configs are not changed
   // in test code
   after {
-    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+    assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "true",
+    assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.vectorized.enable") == "true",
+    assert(sparkConf.get(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key) == "true",
       "configuration parameter changed in test body")
   }
 }

@@ -1672,7 +1672,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }

   test("reuse exchange") {
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2") {
       val df = spark.range(100).toDF()
       val join = df.join(df, "id")
       val plan = join.queryExecution.executedPlan

@@ -671,8 +671,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo

   test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") {
     Seq(1.0, 0.5).foreach { compressionFactor =>
-      withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "250") {
+      withSQLConf(SQLConf.FILE_COMRESSION_FACTOR.key -> compressionFactor.toString,
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "250") {
         withTempPath { workDir =>
           // the file size is 486 bytes
           val workDirPath = workDir.getAbsolutePath

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala (8 changes: 4 additions & 4 deletions)
@@ -72,7 +72,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   test("join operator selection") {
     spark.sharedState.cacheManager.clearCache()

-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0",
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
       SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       Seq(
         ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
@@ -651,7 +651,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {

   test("test SortMergeJoin (without spill)") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
-      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> Int.MaxValue.toString) {
+      SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> Int.MaxValue.toString) {

       assertNotSpilled(sparkContext, "inner join") {
         checkAnswer(
@@ -708,8 +708,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {

   test("test SortMergeJoin (with spill)") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
-      "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
-      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {
+      SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0",
+      SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") {

       assertSpilled(sparkContext, "inner join") {
         checkAnswer(

@@ -19,6 +19,8 @@ package org.apache.spark.sql

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.config
+import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
+import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD

 class RuntimeConfigSuite extends SparkFunSuite {

@@ -60,8 +62,8 @@ class RuntimeConfigSuite extends SparkFunSuite {
     val conf = newConf()

     // SQL configs
-    assert(!conf.isModifiable("spark.sql.sources.schemaStringLengthThreshold"))
-    assert(conf.isModifiable("spark.sql.streaming.checkpointLocation"))
+    assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key))
+    assert(conf.isModifiable(CHECKPOINT_LOCATION.key))
     // Core configs
     assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
     assert(!conf.isModifiable("spark.executor.cores"))

@@ -1896,7 +1896,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }

   test("Star Expansion - group by") {
-    withSQLConf("spark.sql.retainGroupColumns" -> "false") {
+    withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") {
       checkAnswer(
         testData2.groupBy($"a", $"b").agg($"*"),
         sql("SELECT * FROM testData2 group by a, b"))
@@ -1936,7 +1936,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

   test("Common subexpression elimination") {
     // TODO: support subexpression elimination in whole stage codegen
-    withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
       // select from a table to prevent constant folding.
       val df = sql("SELECT a, b from testData2 limit 1")
       checkAnswer(df, Row(1, 1))
@@ -1985,9 +1985,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
        df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1)

      // Try disabling it via configuration.
-     spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")
+     spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false")
      verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2)
-     spark.conf.set("spark.sql.subexpressionElimination.enabled", "true")
+     spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "true")
      verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1)
    }
  }

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
 import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, Metadata, StructType}
 import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch, ColumnarMap, ColumnVector}
 import org.apache.spark.unsafe.types.UTF8String
@@ -152,7 +153,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("use custom class for extensions") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", classOf[MyExtensions].getCanonicalName)
+      .config(SPARK_SESSION_EXTENSIONS.key, classOf[MyExtensions].getCanonicalName)
       .getOrCreate()
     try {
       assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session)))
@@ -173,7 +174,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("use multiple custom class for extensions in the specified order") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", Seq(
+      .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions2].getCanonicalName,
         classOf[MyExtensions].getCanonicalName).mkString(","))
       .getOrCreate()
@@ -201,7 +202,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("allow an extension to be duplicated") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", Seq(
+      .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions].getCanonicalName,
         classOf[MyExtensions].getCanonicalName).mkString(","))
       .getOrCreate()
@@ -228,7 +229,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("use the last registered function name when there are duplicates") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", Seq(
+      .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions2].getCanonicalName,
         classOf[MyExtensions2Duplicate].getCanonicalName).mkString(","))
       .getOrCreate()
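
All of the changes above apply one pattern: a hard-coded "spark.sql.*" string is replaced with the .key field of the corresponding ConfigEntry declared in SQLConf (or StaticSQLConf), so each literal key string stays defined in one place. A minimal standalone sketch of that pattern follows; it is not part of the PR, ConfKeyExample is a made-up name, and SQLConf.SHUFFLE_PARTITIONS is simply a well-known entry picked for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object ConfKeyExample {
  def main(args: Array[String]): Unit = {
    // Reference config keys through their ConfigEntry instead of raw string literals.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("conf-key-example")
      .config(SQLConf.SHUFFLE_PARTITIONS.key, "4") // instead of "spark.sql.shuffle.partitions"
      .getOrCreate()

    // Runtime-settable configs follow the same pattern, as in the test changes above.
    spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false")
    println(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)) // prints 4
    spark.stop()
  }
}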