@@ -207,6 +207,15 @@ private static long optimalNumOfBits(long n, double p) {

static final double DEFAULT_FPP = 0.03;

/**
* Computes m, the total number of bits of the Bloom filter, for the expected number of items,
* capped at maxNumOfBits. The smaller the expectedNumItems, the smaller the derived fpp.
*/
public static long optimalNumOfBits(long expectedNumItems, long maxNumItems, long maxNumOfBits) {
double fpp = Math.min(expectedNumItems / (maxNumItems / DEFAULT_FPP), DEFAULT_FPP);
return Math.min(optimalNumOfBits(expectedNumItems, fpp), maxNumOfBits);
}

/**
* Creates a {@link BloomFilter} with the expected number of insertions and a default expected
* false positive probability of 3%.
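For reference, the sizing rule introduced above can be sketched outside the patch. This is an illustrative Scala translation, not the shipped Java code, and it assumes the existing private `optimalNumOfBits(n, p)` helper uses the standard `-n * ln(p) / ln(2)^2` formula.

```scala
object BloomFilterSizingSketch {
  val DefaultFpp = 0.03 // mirrors BloomFilter.DEFAULT_FPP

  // Classic optimal-bits formula for n items at false positive probability p.
  private def optimalNumOfBits(n: Long, p: Double): Long =
    (-n * math.log(p) / (math.log(2) * math.log(2))).toLong

  // fpp grows linearly with expectedNumItems and is capped at the 3% default,
  // so smaller inputs get a proportionally smaller false positive probability.
  def optimalNumOfBits(expectedNumItems: Long, maxNumItems: Long, maxNumOfBits: Long): Long = {
    val fpp = math.min(expectedNumItems / (maxNumItems / DefaultFpp), DefaultFpp)
    math.min(optimalNumOfBits(expectedNumItems, fpp), maxNumOfBits)
  }
}
```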
8 changes: 4 additions & 4 deletions core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
@@ -2,12 +2,12 @@
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
OpenJDK 64-Bit Server VM 11.0.16.1+1 on Linux 5.15.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1269 1276 8 0.0 1268666001.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2672 2695 39 0.0 2671542753.0 0.5X
Num Maps: 50000 Fetch partitions:1500 4034 4069 50 0.0 4033696987.0 0.3X
Num Maps: 50000 Fetch partitions:500 1227 1262 47 0.0 1226744907.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2620 2637 15 0.0 2620288061.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3975 3990 17 0.0 3974979610.0 0.3X


10 changes: 5 additions & 5 deletions core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
@@ -2,12 +2,12 @@
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
OpenJDK 64-Bit Server VM 17.0.4.1+1 on Linux 5.15.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1228 1238 17 0.0 1228191051.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2380 2393 16 0.0 2379601524.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3803 3857 55 0.0 3802550172.0 0.3X
Num Maps: 50000 Fetch partitions:500 1159 1184 38 0.0 1159155979.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2329 2387 57 0.0 2328833805.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3608 3712 92 0.0 3607631972.0 0.3X


10 changes: 5 additions & 5 deletions core/benchmarks/MapStatusesConvertBenchmark-results.txt
@@ -2,12 +2,12 @@
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1179 1187 13 0.0 1178581948.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2237 2257 21 0.0 2236562602.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3448 3680 367 0.0 3447793753.0 0.3X
Num Maps: 50000 Fetch partitions:500 1099 1127 27 0.0 1099192398.0 1.0X
Num Maps: 50000 Fetch partitions:1000 1981 1999 16 0.0 1981390271.0 0.6X
Num Maps: 50000 Fetch partitions:1500 2973 3011 39 0.0 2973029597.0 0.4X


20 changes: 20 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -138,6 +138,11 @@
"Unable to convert column <name> of type <type> to JSON."
]
},
"CANNOT_DROP_ALL_FIELDS" : {
"message" : [
"Cannot drop all fields in struct."
]
},
"CAST_WITHOUT_SUGGESTION" : {
"message" : [
"cannot cast <srcType> to <targetType>."
@@ -155,6 +160,21 @@
"To convert values from <srcType> to <targetType>, you can use the functions <functionNames> instead."
]
},
"CREATE_MAP_KEY_DIFF_TYPES" : {
"message" : [
"The given keys of function <functionName> should all be the same type, but they are <dataType>."
]
},
"CREATE_MAP_VALUE_DIFF_TYPES" : {
"message" : [
"The given values of function <functionName> should all be the same type, but they are <dataType>."
]
},
"CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING" : {
"message" : [
"Only foldable `STRING` expressions are allowed to appear at odd position, but they are <inputExprs>."
]
},
"DATA_DIFF_TYPES" : {
"message" : [
"Input to <functionName> should all be the same type, but it's <dataType>."
4 changes: 2 additions & 2 deletions dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -1,7 +1,7 @@
HikariCP/2.5.1//HikariCP-2.5.1.jar
JLargeArrays/1.5//JLargeArrays-1.5.jar
JTransforms/3.1//JTransforms-3.1.jar
RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
RoaringBitmap/0.9.35//RoaringBitmap-0.9.35.jar
ST4/4.0.4//ST4-4.0.4.jar
activation/1.1.1//activation-1.1.1.jar
aircompressor/0.21//aircompressor-0.21.jar
@@ -247,7 +247,7 @@ scala-library/2.12.17//scala-library-2.12.17.jar
scala-parser-combinators_2.12/1.1.2//scala-parser-combinators_2.12-1.1.2.jar
scala-reflect/2.12.17//scala-reflect-2.12.17.jar
scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar
shims/0.9.32//shims-0.9.32.jar
shims/0.9.35//shims-0.9.35.jar
slf4j-api/2.0.3//slf4j-api-2.0.3.jar
snakeyaml/1.31//snakeyaml-1.31.jar
snappy-java/1.1.8.4//snappy-java-1.1.8.4.jar
4 changes: 2 additions & 2 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -1,7 +1,7 @@
HikariCP/2.5.1//HikariCP-2.5.1.jar
JLargeArrays/1.5//JLargeArrays-1.5.jar
JTransforms/3.1//JTransforms-3.1.jar
RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
RoaringBitmap/0.9.35//RoaringBitmap-0.9.35.jar
ST4/4.0.4//ST4-4.0.4.jar
activation/1.1.1//activation-1.1.1.jar
aircompressor/0.21//aircompressor-0.21.jar
@@ -234,7 +234,7 @@ scala-library/2.12.17//scala-library-2.12.17.jar
scala-parser-combinators_2.12/1.1.2//scala-parser-combinators_2.12-1.1.2.jar
scala-reflect/2.12.17//scala-reflect-2.12.17.jar
scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar
shims/0.9.32//shims-0.9.32.jar
shims/0.9.35//shims-0.9.35.jar
slf4j-api/2.0.3//slf4j-api-2.0.3.jar
snakeyaml/1.31//snakeyaml-1.31.jar
snappy-java/1.1.8.4//snappy-java-1.1.8.4.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -830,7 +830,7 @@
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.9.32</version>
<version>0.9.35</version>
</dependency>

<!-- Netty Begin -->
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue}
import org.apache.spark.sql.catalyst.trees.TernaryLike
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{RUNTIME_BLOOM_FILTER_MAX_NUM_BITS, RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS}
import org.apache.spark.sql.types._
import org.apache.spark.util.sketch.BloomFilter

@@ -56,6 +57,13 @@ case class BloomFilterAggregate(
Multiply(estimatedNumItemsExpression, Literal(8L)))
}

def this(child: Expression, estimatedNumItems: Long) = {
this(child, Literal(estimatedNumItems),
Literal(BloomFilter.optimalNumOfBits(estimatedNumItems,
SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS),
SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))))
}

def this(child: Expression) = {
this(child, Literal(SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_EXPECTED_NUM_ITEMS)),
Literal(SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_NUM_BITS)))
@@ -109,8 +117,8 @@ case class BloomFilterAggregate(
)
} else {
require(estimatedNumItems <=
SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS))
require(numBits <= SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))
SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS))
require(numBits <= SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))
TypeCheckSuccess
}
case _ =>
@@ -135,12 +143,12 @@ case class BloomFilterAggregate(
// Mark as lazy so that `estimatedNumItems` is not evaluated during tree transformation.
private lazy val estimatedNumItems: Long =
Math.min(estimatedNumItemsExpression.eval().asInstanceOf[Number].longValue,
SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS))
SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS))

// Mark as lazy so that `numBits` is not evaluated during tree transformation.
private lazy val numBits: Long =
Math.min(numBitsExpression.eval().asInstanceOf[Number].longValue,
SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))
SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))

override def first: Expression = child

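A minimal usage sketch of the new secondary constructor (my example, not from the patch); the bound reference stands in for a real creation-side join key, and SQLConf falls back to its defaults if no session conf is set.

```scala
import org.apache.spark.sql.catalyst.expressions.{BoundReference, XxHash64}
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.types.LongType

// Hypothetical creation-side join key: the first column of the input row, typed LONG.
val joinKey = BoundReference(0, LongType, nullable = true)

// New form: pass only the expected item count; numBits becomes
// Literal(BloomFilter.optimalNumOfBits(1000000L, maxNumItems, maxNumBits)).
val sizedFromStats = new BloomFilterAggregate(new XxHash64(Seq(joinKey)), 1000000L)

// Existing form, unchanged: item count and bit count both come from the session defaults.
val sizedFromConf = new BloomFilterAggregate(new XxHash64(Seq(joinKey)))
```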
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCheckResult, TypeCoercion, UnresolvedAttribute, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FUNC_ALIAS, FunctionBuilder}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.Cast._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -202,16 +204,30 @@ case class CreateMap(children: Seq[Expression], useStringTypeWhenEmpty: Boolean)

override def checkInputDataTypes(): TypeCheckResult = {
if (children.size % 2 != 0) {
TypeCheckResult.TypeCheckFailure(
s"$prettyName expects a positive even number of arguments.")
DataTypeMismatch(
errorSubClass = "WRONG_NUM_ARGS",
messageParameters = Map(
"functionName" -> toSQLId(prettyName),
"expectedNum" -> "2n (n > 0)",
"actualNum" -> children.length.toString
)
)
} else if (!TypeCoercion.haveSameType(keys.map(_.dataType))) {
TypeCheckResult.TypeCheckFailure(
"The given keys of function map should all be the same type, but they are " +
keys.map(_.dataType.catalogString).mkString("[", ", ", "]"))
DataTypeMismatch(
errorSubClass = "CREATE_MAP_KEY_DIFF_TYPES",
messageParameters = Map(
"functionName" -> toSQLId(prettyName),
"dataType" -> keys.map(key => toSQLType(key.dataType)).mkString("[", ", ", "]")
)
)
} else if (!TypeCoercion.haveSameType(values.map(_.dataType))) {
TypeCheckResult.TypeCheckFailure(
"The given values of function map should all be the same type, but they are " +
values.map(_.dataType.catalogString).mkString("[", ", ", "]"))
DataTypeMismatch(
errorSubClass = "CREATE_MAP_VALUE_DIFF_TYPES",
messageParameters = Map(
"functionName" -> toSQLId(prettyName),
"dataType" -> values.map(value => toSQLType(value.dataType)).mkString("[", ", ", "]")
)
)
} else {
TypeUtils.checkForMapKeyType(dataType.keyType)
}
@@ -444,17 +460,32 @@ case class CreateNamedStruct(children: Seq[Expression]) extends Expression with

override def checkInputDataTypes(): TypeCheckResult = {
if (children.size % 2 != 0) {
TypeCheckResult.TypeCheckFailure(s"$prettyName expects an even number of arguments.")
DataTypeMismatch(
errorSubClass = "WRONG_NUM_ARGS",
messageParameters = Map(
"functionName" -> toSQLId(prettyName),
"expectedNum" -> "2n (n > 0)",
"actualNum" -> children.length.toString
)
)
} else {
val invalidNames = nameExprs.filterNot(e => e.foldable && e.dataType == StringType)
if (invalidNames.nonEmpty) {
TypeCheckResult.TypeCheckFailure(
s"Only foldable ${StringType.catalogString} expressions are allowed to appear at odd" +
s" position, got: ${invalidNames.mkString(",")}")
DataTypeMismatch(
errorSubClass = "CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING",
messageParameters = Map(
"inputExprs" -> invalidNames.map(toSQLExpr(_)).mkString("[", ", ", "]")
)
)
} else if (!names.contains(null)) {
TypeCheckResult.TypeCheckSuccess
} else {
TypeCheckResult.TypeCheckFailure("Field name should not be null")
DataTypeMismatch(
errorSubClass = "UNEXPECTED_NULL",
messageParameters = Map(
"exprName" -> nameExprs.map(toSQLExpr).mkString("[", ", ", "]")
)
)
}
}
}
@@ -668,10 +699,19 @@ case class UpdateFields(structExpr: Expression, fieldOps: Seq[StructFieldsOperat
override def checkInputDataTypes(): TypeCheckResult = {
val dataType = structExpr.dataType
if (!dataType.isInstanceOf[StructType]) {
TypeCheckResult.TypeCheckFailure("struct argument should be struct type, got: " +
dataType.catalogString)
DataTypeMismatch(
errorSubClass = "UNEXPECTED_INPUT_TYPE",
messageParameters = Map(
"paramIndex" -> "1",
"requiredType" -> toSQLType(StructType),
"inputSql" -> toSQLExpr(structExpr),
"inputType" -> toSQLType(structExpr.dataType))
)
} else if (newExprs.isEmpty) {
TypeCheckResult.TypeCheckFailure("cannot drop all fields in struct")
DataTypeMismatch(
errorSubClass = "CANNOT_DROP_ALL_FIELDS",
messageParameters = Map.empty
)
} else {
TypeCheckResult.TypeCheckSuccess
}
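For illustration only, a quick way to see the new DataTypeMismatch sub-classes is to call checkInputDataTypes() on hand-built expressions in a Spark shell; these snippets are my assumption about usage and are not part of the change. The message text itself comes from the new entries in error-classes.json above.

```scala
import org.apache.spark.sql.catalyst.expressions.{CreateMap, CreateNamedStruct, Literal}

// Keys 1 (INT) and "x" (STRING) share no common type, so this now reports
// DataTypeMismatch(errorSubClass = "CREATE_MAP_KEY_DIFF_TYPES", ...) instead of
// a free-form TypeCheckFailure string.
val mixedKeys = CreateMap(
  Seq(Literal(1), Literal("a"), Literal("x"), Literal("b")), useStringTypeWhenEmpty = false)
println(mixedKeys.checkInputDataTypes())

// A non-STRING expression at an odd (name) position now reports
// DataTypeMismatch(errorSubClass = "CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING", ...).
val badName = CreateNamedStruct(Seq(Literal(1), Literal(2)))
println(badName.checkInputDataTypes())
```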
@@ -77,8 +77,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
val rowCount = filterCreationSidePlan.stats.rowCount
val bloomFilterAgg =
if (rowCount.isDefined && rowCount.get.longValue > 0L) {
new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
Literal(rowCount.get.longValue))
new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)), rowCount.get.longValue)
} else {
new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
}
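To make the effect of this call-site change concrete, a back-of-the-envelope comparison (numbers are illustrative; 4,000,000 items and 67,108,864 bits are assumed to be the configured maxNumItems/maxNumBits caps). The previous code passed the row count to the two-argument Expression constructor, which per the hunk above sizes the filter at 8 bits per expected item; the new path derives the bit count from the scaled fpp.

```scala
val rowCount = 1000000L
val oldBits  = rowCount * 8                                      // 8,000,000 bits (old sizing)
val fpp      = math.min(rowCount / (4000000L / 0.03), 0.03)      // 0.0075
val newBits  = math.min(
  (-rowCount * math.log(fpp) / (math.log(2) * math.log(2))).toLong,
  67108864L)                                                     // ~10.2 million bits (~1.2 MiB)
```

In other words, about 27% more bits are spent in exchange for the tighter 0.75% false positive target.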
@@ -2007,6 +2007,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STREAMING_METADATA_CACHE_ENABLED =
buildConf("spark.sql.streaming.metadataCache.enabled")
.internal()
.doc("Whether the streaming HDFSMetadataLog caches the metadata of the latest two batches.")
.booleanConf
.createWithDefault(true)


val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
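A usage sketch, not from the patch: the new flag is internal and on by default, so the main reason to touch it is to disable the metadata cache while troubleshooting streaming metadata log reads (assumes an active SparkSession named spark).

```scala
// Opt out of the HDFSMetadataLog batch cache for the current session.
spark.conf.set("spark.sql.streaming.metadataCache.enabled", "false")
```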