diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index 2a6e270a91267..5c01841e5015a 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -207,6 +207,15 @@ private static long optimalNumOfBits(long n, double p) { static final double DEFAULT_FPP = 0.03; + /** + * Computes m (the total number of bits of the Bloom filter) required to achieve an adaptively + * scaled fpp: the smaller the expectedNumItems, the smaller the fpp. + */ + public static long optimalNumOfBits(long expectedNumItems, long maxNumItems, long maxNumOfBits) { + double fpp = Math.min(expectedNumItems / (maxNumItems / DEFAULT_FPP), DEFAULT_FPP); + return Math.min(optimalNumOfBits(expectedNumItems, fpp), maxNumOfBits); + } + /** * Creates a {@link BloomFilter} with the expected number of insertions and a default expected * false positive probability of 3%. diff --git a/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt index adac80834e44e..06f7cc7c92ce0 100644 --- a/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt +++ b/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt @@ -2,12 +2,12 @@ MapStatuses Convert Benchmark ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure +OpenJDK 64-Bit Server VM 11.0.16.1+1 on Linux 5.15.0-1022-azure Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Num Maps: 50000 Fetch partitions:500 1269 1276 8 0.0 1268666001.0 1.0X -Num Maps: 50000 Fetch partitions:1000 2672 2695 39 0.0 2671542753.0 0.5X -Num Maps: 50000 Fetch partitions:1500 4034 4069 50 0.0 4033696987.0 0.3X +Num Maps: 50000 Fetch partitions:500 1227 1262 47 0.0 1226744907.0 1.0X +Num Maps: 50000 Fetch partitions:1000 2620 2637 15 0.0 2620288061.0 0.5X +Num Maps: 50000 Fetch partitions:1500 3975 3990 17 0.0 3974979610.0 0.3X diff --git a/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt b/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt index 9911ae3326f05..3b6f5c6695eac 100644 --- a/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt +++ b/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt @@ -2,12 +2,12 @@ MapStatuses Convert Benchmark ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure -Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz +OpenJDK 64-Bit Server VM 17.0.4.1+1 on Linux 5.15.0-1022-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Num Maps: 50000 Fetch partitions:500 1228 1238 17 0.0 1228191051.0 1.0X -Num Maps: 50000 Fetch partitions:1000 2380 2393 16 0.0 2379601524.0 0.5X -Num Maps: 50000 Fetch partitions:1500 3803 3857 55 0.0 3802550172.0 0.3X +Num Maps: 50000 Fetch partitions:500 1159 1184 38 0.0 1159155979.0 1.0X +Num Maps: 50000 Fetch partitions:1000 2329
2387 57 0.0 2328833805.0 0.5X +Num Maps: 50000 Fetch partitions:1500 3608 3712 92 0.0 3607631972.0 0.3X diff --git a/core/benchmarks/MapStatusesConvertBenchmark-results.txt b/core/benchmarks/MapStatusesConvertBenchmark-results.txt index e52d7ed2af843..bfbc69b6341fd 100644 --- a/core/benchmarks/MapStatusesConvertBenchmark-results.txt +++ b/core/benchmarks/MapStatusesConvertBenchmark-results.txt @@ -2,12 +2,12 @@ MapStatuses Convert Benchmark ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure -Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz +OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1022-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Num Maps: 50000 Fetch partitions:500 1179 1187 13 0.0 1178581948.0 1.0X -Num Maps: 50000 Fetch partitions:1000 2237 2257 21 0.0 2236562602.0 0.5X -Num Maps: 50000 Fetch partitions:1500 3448 3680 367 0.0 3447793753.0 0.3X +Num Maps: 50000 Fetch partitions:500 1099 1127 27 0.0 1099192398.0 1.0X +Num Maps: 50000 Fetch partitions:1000 1981 1999 16 0.0 1981390271.0 0.6X +Num Maps: 50000 Fetch partitions:1500 2973 3011 39 0.0 2973029597.0 0.4X diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index fe2cd3a44bb1a..7ec5e11a2060d 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -138,6 +138,11 @@ "Unable to convert column of type to JSON." ] }, + "CANNOT_DROP_ALL_FIELDS" : { + "message" : [ + "Cannot drop all fields in struct." + ] + }, "CAST_WITHOUT_SUGGESTION" : { "message" : [ "cannot cast <srcType> to <targetType>." ] }, @@ -155,6 +160,21 @@ "To convert values from <srcType> to <targetType>, you can use the functions <functionNames> instead." ] }, + "CREATE_MAP_KEY_DIFF_TYPES" : { + "message" : [ + "The given keys of function <functionName> should all be the same type, but they are <dataType>." + ] + }, + "CREATE_MAP_VALUE_DIFF_TYPES" : { + "message" : [ + "The given values of function <functionName> should all be the same type, but they are <dataType>." + ] + }, + "CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING" : { + "message" : [ + "Only foldable `STRING` expressions are allowed to appear at odd position, but they are <inputExprs>." + ] + }, "DATA_DIFF_TYPES" : { "message" : [ "Input to <functionName> should all be the same type, but it's <dataType>."
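The adaptive sizing introduced in BloomFilter.java above can be sanity-checked in isolation. The Scala sketch below is a minimal transcription, not the production code: it assumes the existing private two-argument optimalNumOfBits(n, p) is the standard Guava-style bound m = -n * ln(p) / (ln 2)^2 truncated to a long, and it assumes 4000000 and 67108864 as the defaults of RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS and RUNTIME_BLOOM_FILTER_MAX_NUM_BITS consumed by the new BloomFilterAggregate constructor further down. Under those assumptions it reproduces the bit counts in the updated TPC-DS plan files below:

  object AdaptiveBloomSizing {
    val DefaultFpp = 0.03 // mirrors BloomFilter.DEFAULT_FPP

    // Assumed shape of the existing private helper: the standard Bloom filter bound
    // m = -n * ln(p) / (ln 2)^2, truncated to a long.
    def optimalNumOfBits(n: Long, p: Double): Long =
      (-n * math.log(p) / (math.log(2) * math.log(2))).toLong

    // Mirrors the new three-argument overload: the fpp shrinks linearly as
    // expectedNumItems falls below maxNumItems (capped at the 3% default), and the
    // resulting bit count is capped at maxNumOfBits.
    def optimalNumOfBits(expectedNumItems: Long, maxNumItems: Long, maxNumOfBits: Long): Long = {
      val fpp = math.min(expectedNumItems / (maxNumItems / DefaultFpp), DefaultFpp)
      math.min(optimalNumOfBits(expectedNumItems, fpp), maxNumOfBits)
    }

    def main(args: Array[String]): Unit = {
      println(optimalNumOfBits(2555L, 4000000L, 67108864L)) // 57765 (was 8 * 2555 = 20440)
      println(optimalNumOfBits(335L, 4000000L, 67108864L))  // 8990  (was 8 * 335 = 2680)
    }
  }

Smaller filter-creation sides now get a proportionally tighter false positive probability instead of the flat 3%, which is why every bloom_filter_agg numBits argument in the plan diffs grows.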
diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index d4106c9045dcb..c5229ae68f83f 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -1,7 +1,7 @@ HikariCP/2.5.1//HikariCP-2.5.1.jar JLargeArrays/1.5//JLargeArrays-1.5.jar JTransforms/3.1//JTransforms-3.1.jar -RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar +RoaringBitmap/0.9.35//RoaringBitmap-0.9.35.jar ST4/4.0.4//ST4-4.0.4.jar activation/1.1.1//activation-1.1.1.jar aircompressor/0.21//aircompressor-0.21.jar @@ -247,7 +247,7 @@ scala-library/2.12.17//scala-library-2.12.17.jar scala-parser-combinators_2.12/1.1.2//scala-parser-combinators_2.12-1.1.2.jar scala-reflect/2.12.17//scala-reflect-2.12.17.jar scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar -shims/0.9.32//shims-0.9.32.jar +shims/0.9.35//shims-0.9.35.jar slf4j-api/2.0.3//slf4j-api-2.0.3.jar snakeyaml/1.31//snakeyaml-1.31.jar snappy-java/1.1.8.4//snappy-java-1.1.8.4.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 2d10ad96104e2..4c74838a494d0 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -1,7 +1,7 @@ HikariCP/2.5.1//HikariCP-2.5.1.jar JLargeArrays/1.5//JLargeArrays-1.5.jar JTransforms/3.1//JTransforms-3.1.jar -RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar +RoaringBitmap/0.9.35//RoaringBitmap-0.9.35.jar ST4/4.0.4//ST4-4.0.4.jar activation/1.1.1//activation-1.1.1.jar aircompressor/0.21//aircompressor-0.21.jar @@ -234,7 +234,7 @@ scala-library/2.12.17//scala-library-2.12.17.jar scala-parser-combinators_2.12/1.1.2//scala-parser-combinators_2.12-1.1.2.jar scala-reflect/2.12.17//scala-reflect-2.12.17.jar scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar -shims/0.9.32//shims-0.9.32.jar +shims/0.9.35//shims-0.9.35.jar slf4j-api/2.0.3//slf4j-api-2.0.3.jar snakeyaml/1.31//snakeyaml-1.31.jar snappy-java/1.1.8.4//snappy-java-1.1.8.4.jar diff --git a/pom.xml b/pom.xml index 41a197f2031b5..c973c802d05b2 100644 --- a/pom.xml +++ b/pom.xml @@ -830,7 +830,7 @@ <groupId>org.roaringbitmap</groupId> <artifactId>RoaringBitmap</artifactId> - <version>0.9.32</version> + <version>0.9.35</version> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala index 5b78c5b52286c..980785e764cdb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue} import org.apache.spark.sql.catalyst.trees.TernaryLike import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{RUNTIME_BLOOM_FILTER_MAX_NUM_BITS, RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS} import org.apache.spark.sql.types._ import org.apache.spark.util.sketch.BloomFilter @@ -56,6 +57,13 @@ case class BloomFilterAggregate( Multiply(estimatedNumItemsExpression, Literal(8L))) } + def this(child: Expression, estimatedNumItems: Long) = { + this(child, Literal(estimatedNumItems), + Literal(BloomFilter.optimalNumOfBits(estimatedNumItems, + SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS), + SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)))) + } + def this(child: Expression) = { this(child,
Literal(SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_EXPECTED_NUM_ITEMS)), Literal(SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_NUM_BITS))) @@ -109,8 +117,8 @@ case class BloomFilterAggregate( ) } else { require(estimatedNumItems <= - SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS)) - require(numBits <= SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)) + SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS)) + require(numBits <= SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)) TypeCheckSuccess } case _ => @@ -135,12 +143,12 @@ case class BloomFilterAggregate( // Mark as lazy so that `estimatedNumItems` is not evaluated during tree transformation. private lazy val estimatedNumItems: Long = Math.min(estimatedNumItemsExpression.eval().asInstanceOf[Number].longValue, - SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS)) + SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS)) // Mark as lazy so that `numBits` is not evaluated during tree transformation. private lazy val numBits: Long = Math.min(numBitsExpression.eval().asInstanceOf[Number].longValue, - SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)) + SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)) override def first: Expression = child diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala index 27d4f506ac864..97c882fd176be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala @@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCheckResult, TypeCoercion, UnresolvedAttribute, UnresolvedExtractValue} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FUNC_ALIAS, FunctionBuilder} +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.expressions.Cast._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -202,16 +204,30 @@ case class CreateMap(children: Seq[Expression], useStringTypeWhenEmpty: Boolean) override def checkInputDataTypes(): TypeCheckResult = { if (children.size % 2 != 0) { - TypeCheckResult.TypeCheckFailure( - s"$prettyName expects a positive even number of arguments.") + DataTypeMismatch( + errorSubClass = "WRONG_NUM_ARGS", + messageParameters = Map( + "functionName" -> toSQLId(prettyName), + "expectedNum" -> "2n (n > 0)", + "actualNum" -> children.length.toString + ) + ) } else if (!TypeCoercion.haveSameType(keys.map(_.dataType))) { - TypeCheckResult.TypeCheckFailure( - "The given keys of function map should all be the same type, but they are " + - keys.map(_.dataType.catalogString).mkString("[", ", ", "]")) + DataTypeMismatch( + errorSubClass = "CREATE_MAP_KEY_DIFF_TYPES", + messageParameters = Map( + "functionName" -> toSQLId(prettyName), + "dataType" -> keys.map(key => toSQLType(key.dataType)).mkString("[", ", ", "]") + ) + ) } else if (!TypeCoercion.haveSameType(values.map(_.dataType))) { - TypeCheckResult.TypeCheckFailure( - "The given values of function map should all be the same type, but they are " + - 
values.map(_.dataType.catalogString).mkString("[", ", ", "]")) + DataTypeMismatch( + errorSubClass = "CREATE_MAP_VALUE_DIFF_TYPES", + messageParameters = Map( + "functionName" -> toSQLId(prettyName), + "dataType" -> values.map(value => toSQLType(value.dataType)).mkString("[", ", ", "]") + ) + ) } else { TypeUtils.checkForMapKeyType(dataType.keyType) } @@ -444,17 +460,32 @@ case class CreateNamedStruct(children: Seq[Expression]) extends Expression with override def checkInputDataTypes(): TypeCheckResult = { if (children.size % 2 != 0) { - TypeCheckResult.TypeCheckFailure(s"$prettyName expects an even number of arguments.") + DataTypeMismatch( + errorSubClass = "WRONG_NUM_ARGS", + messageParameters = Map( + "functionName" -> toSQLId(prettyName), + "expectedNum" -> "2n (n > 0)", + "actualNum" -> children.length.toString + ) + ) } else { val invalidNames = nameExprs.filterNot(e => e.foldable && e.dataType == StringType) if (invalidNames.nonEmpty) { - TypeCheckResult.TypeCheckFailure( - s"Only foldable ${StringType.catalogString} expressions are allowed to appear at odd" + - s" position, got: ${invalidNames.mkString(",")}") + DataTypeMismatch( + errorSubClass = "CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING", + messageParameters = Map( + "inputExprs" -> invalidNames.map(toSQLExpr(_)).mkString("[", ", ", "]") + ) + ) } else if (!names.contains(null)) { TypeCheckResult.TypeCheckSuccess } else { - TypeCheckResult.TypeCheckFailure("Field name should not be null") + DataTypeMismatch( + errorSubClass = "UNEXPECTED_NULL", + messageParameters = Map( + "exprName" -> nameExprs.map(toSQLExpr).mkString("[", ", ", "]") + ) + ) } } } @@ -668,10 +699,19 @@ case class UpdateFields(structExpr: Expression, fieldOps: Seq[StructFieldsOperat override def checkInputDataTypes(): TypeCheckResult = { val dataType = structExpr.dataType if (!dataType.isInstanceOf[StructType]) { - TypeCheckResult.TypeCheckFailure("struct argument should be struct type, got: " + - dataType.catalogString) + DataTypeMismatch( + errorSubClass = "UNEXPECTED_INPUT_TYPE", + messageParameters = Map( + "paramIndex" -> "1", + "requiredType" -> toSQLType(StructType), + "inputSql" -> toSQLExpr(structExpr), + "inputType" -> toSQLType(structExpr.dataType)) + ) } else if (newExprs.isEmpty) { - TypeCheckResult.TypeCheckFailure("cannot drop all fields in struct") + DataTypeMismatch( + errorSubClass = "CANNOT_DROP_ALL_FIELDS", + messageParameters = Map.empty + ) } else { TypeCheckResult.TypeCheckSuccess } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 8c63012c6814d..62782f6051b8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -77,8 +77,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J val rowCount = filterCreationSidePlan.stats.rowCount val bloomFilterAgg = if (rowCount.isDefined && rowCount.get.longValue > 0L) { - new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)), - Literal(rowCount.get.longValue)) + new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)), rowCount.get.longValue) } else { new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp))) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index abe9df8dd87de..0f3dc3cf44c4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2007,6 +2007,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_METADATA_CACHE_ENABLED = + buildConf("spark.sql.streaming.metadataCache.enabled") + .internal() + .doc("Whether the streaming HDFSMetadataLog caches the metadata of the latest two batches.") + .booleanConf + .createWithDefault(true) + + val VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala index 83139ab719fe0..eb2ebce3a5f1f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala @@ -40,6 +40,10 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer $"arrayField".array(StringType), Symbol("mapField").map(StringType, LongType)) + private def analysisException(expr: Expression): AnalysisException = { + intercept[AnalysisException](assertSuccess(expr)) + } + def assertError(expr: Expression, errorMessage: String): Unit = { val e = intercept[AnalysisException] { assertSuccess(expr) @@ -522,29 +526,68 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer } test("check types for CreateNamedStruct") { - assertError( - CreateNamedStruct(Seq("a", "b", 2.0)), "even number of arguments") - assertError( - CreateNamedStruct(Seq(1, "a", "b", 2.0)), - "Only foldable string expressions are allowed to appear at odd position") - assertError( - CreateNamedStruct(Seq($"a".string.at(0), "a", "b", 2.0)), - "Only foldable string expressions are allowed to appear at odd position") - assertError( - CreateNamedStruct(Seq(Literal.create(null, StringType), "a")), - "Field name should not be null") + checkError( + exception = analysisException(CreateNamedStruct(Seq("a", "b", 2.0))), + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", + parameters = Map( + "sqlExpr" -> "\"named_struct(a, b, 2.0)\"", + "functionName" -> "`named_struct`", + "expectedNum" -> "2n (n > 0)", + "actualNum" -> "3") + ) + checkError( + exception = analysisException(CreateNamedStruct(Seq(1, "a", "b", 2.0))), + errorClass = "DATATYPE_MISMATCH.CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING", + parameters = Map( + "sqlExpr" -> "\"named_struct(1, a, b, 2.0)\"", + "inputExprs" -> "[\"1\"]") + ) + checkError( + exception = analysisException(CreateNamedStruct(Seq($"a".string.at(0), "a", "b", 2.0))), + errorClass = "DATATYPE_MISMATCH.CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING", + parameters = Map( + "sqlExpr" -> "\"named_struct(boundreference(), a, b, 2.0)\"", + "inputExprs" -> "[\"boundreference()\"]") + ) + checkError( + exception = analysisException(CreateNamedStruct(Seq(Literal.create(null, StringType), "a"))), + errorClass = "DATATYPE_MISMATCH.UNEXPECTED_NULL", + parameters = Map( + "sqlExpr" -> "\"named_struct(NULL, a)\"", + "exprName" -> "[\"NULL\"]") + ) } test("check types for CreateMap") { - assertError(CreateMap(Seq("a", "b", 2.0)), "even number of 
arguments") - assertError( - CreateMap(Seq($"intField", $"stringField", - $"booleanField", $"stringField")), - "keys of function map should all be the same type") - assertError( - CreateMap(Seq($"stringField", $"intField", - $"stringField", $"booleanField")), - "values of function map should all be the same type") + checkError( + exception = analysisException(CreateMap(Seq("a", "b", 2.0))), + errorClass = "DATATYPE_MISMATCH.WRONG_NUM_ARGS", + parameters = Map( + "sqlExpr" -> "\"map(a, b, 2.0)\"", + "functionName" -> "`map`", + "expectedNum" -> "2n (n > 0)", + "actualNum" -> "3") + ) + checkError( + exception = analysisException(CreateMap(Seq(Literal(1), + Literal("a"), Literal(true), Literal("b")))), + errorClass = "DATATYPE_MISMATCH.CREATE_MAP_KEY_DIFF_TYPES", + parameters = Map( + "sqlExpr" -> "\"map(1, a, true, b)\"", + "functionName" -> "`map`", + "dataType" -> "[\"INT\", \"BOOLEAN\"]" + ) + ) + checkError( + exception = analysisException(CreateMap(Seq(Literal("a"), + Literal(1), Literal("b"), Literal(true)))), + errorClass = "DATATYPE_MISMATCH.CREATE_MAP_VALUE_DIFF_TYPES", + parameters = Map( + "sqlExpr" -> "\"map(a, 1, b, true)\"", + "functionName" -> "`map`", + "dataType" -> "[\"INT\", \"BOOLEAN\"]" + ) + ) } test("check types for ROUND/BROUND") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala index fb6a23e3d776c..f1f781b7137b4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.util._ @@ -314,6 +315,40 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { assert(errorSubClass == "INVALID_MAP_KEY_TYPE") assert(messageParameters === Map("keyType" -> "\"MAP\"")) } + + // expects a positive even number of arguments + val map3 = CreateMap(Seq(Literal(1), Literal(2), Literal(3))) + assert(map3.checkInputDataTypes() == + DataTypeMismatch( + errorSubClass = "WRONG_NUM_ARGS", + messageParameters = Map( + "functionName" -> "`map`", + "expectedNum" -> "2n (n > 0)", + "actualNum" -> "3") + ) + ) + + // The given keys of function map should all be the same type + val map4 = CreateMap(Seq(Literal(1), Literal(2), Literal('a'), Literal(3))) + assert(map4.checkInputDataTypes() == + DataTypeMismatch( + errorSubClass = "CREATE_MAP_KEY_DIFF_TYPES", + messageParameters = Map( + "functionName" -> "`map`", + "dataType" -> "[\"INT\", \"STRING\"]") + ) + ) + + // The given values of function map should all be the same type + val map5 = CreateMap(Seq(Literal(1), Literal(2), Literal(3), Literal('a'))) + assert(map5.checkInputDataTypes() == + DataTypeMismatch( + errorSubClass = "CREATE_MAP_VALUE_DIFF_TYPES", + messageParameters = Map( + "functionName" -> "`map`", + "dataType" -> "[\"INT\", \"STRING\"]") + ) + ) } test("MapFromArrays") { @@ -397,6 +432,18 @@ class ComplexTypeSuite extends SparkFunSuite with 
ExpressionEvalHelper { create_row(UTF8String.fromString("x"), 2.0)) checkEvaluation(CreateNamedStruct(Seq("a", Literal.create(null, IntegerType))), create_row(null)) + + // expects a positive even number of arguments + val namedStruct1 = CreateNamedStruct(Seq(Literal(1), Literal(2), Literal(3))) + assert(namedStruct1.checkInputDataTypes() == + DataTypeMismatch( + errorSubClass = "WRONG_NUM_ARGS", + messageParameters = Map( + "functionName" -> "`named_struct`", + "expectedNum" -> "2n (n > 0)", + "actualNum" -> "3") + ) + ) } test("test dsl for complex type") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 554f6a34b17e8..3c9f3e58cec63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -940,7 +940,7 @@ class Column(val expr: Expression) extends Logging { * * val df = sql("SELECT named_struct('a', 1, 'b', 2) struct_col") * df.select($"struct_col".dropFields("a", "b")) - * // result: org.apache.spark.sql.AnalysisException: cannot resolve 'update_fields(update_fields(`struct_col`))' due to data type mismatch: cannot drop all fields in struct + * // result: org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.CANNOT_DROP_ALL_FIELDS] Cannot resolve "update_fields(struct_col, dropfield(), dropfield())" due to data type mismatch: Cannot drop all fields in struct.; * * val df = sql("SELECT CAST(NULL AS struct<a:int,b:int>) struct_col") * df.select($"struct_col".dropFields("b")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 8a037b55168b8..1d44465554852 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.streaming import java.io._ import java.nio.charset.StandardCharsets +import java.util.{Collections, LinkedHashMap => JLinkedHashMap} +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.commons.io.IOUtils @@ -30,6 +32,7 @@ import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf /** @@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: fileManager.mkdirs(metadataPath) } + protected val metadataCacheEnabled: Boolean + = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED) + + /** + * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches + * when committing offsets, so this cache will save some file system operations.
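+ * The backing [[JLinkedHashMap]] is insertion-ordered and evicts its eldest entry once it holds + * more than two (see removeEldestEntry below), so the entries kept are always the metadata of + * the two most recently added batches.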
+ */ + protected[sql] val batchCache = Collections.synchronizedMap(new JLinkedHashMap[Long, T](2) { + override def removeEldestEntry(e: java.util.Map.Entry[Long, T]): Boolean = size > 2 + }) + /** * A `PathFilter` to filter only batch files */ @@ -113,10 +127,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: */ override def add(batchId: Long, metadata: T): Boolean = { require(metadata != null, "'null' metadata cannot written to a metadata log") - addNewBatchByStream(batchId) { output => serialize(metadata, output) } + val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) } + if (metadataCacheEnabled && res) batchCache.put(batchId, metadata) + res } override def get(batchId: Long): Option[T] = { + if (metadataCacheEnabled && batchCache.containsKey(batchId)) { + val metadata = batchCache.get(batchId) + assert(metadata != null) + return Some(metadata) + } + try { applyFnToBatchByStream(batchId) { input => Some(deserialize(input)) } } catch { @@ -135,9 +157,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: * NOTE: This no longer fails early on corruption. The caller should handle the exception * properly and make sure the logic is not affected by failing in the middle. */ - def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = { + def applyFnToBatchByStream[RET]( + batchId: Long, skipExistingCheck: Boolean = false)(fn: InputStream => RET): RET = { val batchMetadataFile = batchIdToPath(batchId) - if (fileManager.exists(batchMetadataFile)) { + if (skipExistingCheck || fileManager.exists(batchMetadataFile)) { val input = fileManager.open(batchMetadataFile) try { fn(input) @@ -168,7 +191,13 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: * valid behavior, we still need to prevent it from destroying the files. 
*/ def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { - get(batchId).map(_ => false).getOrElse { + + val batchMetadataFile = batchIdToPath(batchId) + + if ((metadataCacheEnabled && batchCache.containsKey(batchId)) + || fileManager.exists(batchMetadataFile)) { + false + } else { // Only write metadata when the batch has not yet been written val output = fileManager.createAtomic(batchIdToPath(batchId), overwriteIfPossible = false) try { @@ -188,42 +217,32 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } + private def getExistingBatch(batchId: Long): T = { + val metadata = batchCache.get(batchId) + if (metadata == null) { + applyFnToBatchByStream(batchId, skipExistingCheck = true) { input => deserialize(input) } + } else { + metadata + } + } + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = { assert(startId.isEmpty || endId.isEmpty || startId.get <= endId.get) - val files = fileManager.list(metadataPath, batchFilesFilter) - val batchIds = files - .map(f => pathToBatchId(f.getPath)) - .filter { batchId => - (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get) + val batchIds = listBatches.filter { batchId => + (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get) }.sorted HDFSMetadataLog.verifyBatchIds(batchIds, startId, endId) - - batchIds.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map { - case (batchId, metadataOption) => - (batchId, metadataOption.get) - } + batchIds.map(batchId => (batchId, getExistingBatch(batchId))) } - /** - * Return the latest batch Id without reading the file. This method only checks for existence of - * file to avoid cost on reading and deserializing log file. - */ - def getLatestBatchId(): Option[Long] = { - fileManager.list(metadataPath, batchFilesFilter) - .map(f => pathToBatchId(f.getPath)) - .sorted(Ordering.Long.reverse) - .headOption - } + /** Return the latest batch id without reading the file. */ + def getLatestBatchId(): Option[Long] = listBatches.sorted.lastOption override def getLatest(): Option[(Long, T)] = { - getLatestBatchId().map { batchId => - val content = get(batchId).getOrElse { - // If we find the last batch file, we must read that file, other than failing back to - // old batches. 
- throw new IllegalStateException(s"failed to read log file for batch $batchId") - } - (batchId, content) + listBatches.sorted.lastOption.map { batchId => + logInfo(s"Getting latest batch $batchId") + (batchId, getExistingBatch(batchId)) } } @@ -250,16 +269,15 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: possibleTargetBatchIds.foreach { batchId => val path = batchIdToPath(batchId) fileManager.delete(path) + if (metadataCacheEnabled) batchCache.remove(batchId) logTrace(s"Removed metadata log file: $path") } } else { // using list to retrieve all elements - val batchIds = fileManager.list(metadataPath, batchFilesFilter) - .map(f => pathToBatchId(f.getPath)) - - for (batchId <- batchIds if batchId < thresholdBatchId) { + for (batchId <- listBatches if batchId < thresholdBatchId) { val path = batchIdToPath(batchId) fileManager.delete(path) + if (metadataCacheEnabled) batchCache.remove(batchId) logTrace(s"Removed metadata log file: $path") } } @@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: for (batchId <- batchIds if batchId > thresholdBatchId) { val path = batchIdToPath(batchId) fileManager.delete(path) + if (metadataCacheEnabled) batchCache.remove(batchId) logTrace(s"Removed metadata log file: $path") } } + + /** + * List the available batches on the file system. As a workaround for S3's inconsistent listing, + * it also takes `batchCache` into consideration to infer a better answer. + */ + protected def listBatches: Array[Long] = { + val batchIds = fileManager.list(metadataPath, batchFilesFilter) + .map(f => pathToBatchId(f.getPath)) ++ + // Iterating over the keySet is not thread-safe. We call `toArray` to make a copy inside the + // lock to eliminate the race condition.
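+ // (Collections.synchronizedMap guarantees atomicity only for individual calls; traversal has + // to be guarded manually, and the copied array keeps the min-to-max scan below off the live + // key set.)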
+ batchCache.synchronized { + batchCache.keySet.asScala.toArray + } + logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", ")) + + if (batchIds.isEmpty) { + Array.empty + } else { + // Assume batch ids are continuous + (batchIds.min to batchIds.max).toArray + } + } + private[sql] def validateVersion(text: String, maxSupportedVersion: Int): Int = MetadataVersionUtil.validateVersion(text, maxSupportedVersion) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index 7f00717ea4df6..5646f61440e77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.streaming -import java.{util => ju} import java.io.{InputStream, OutputStream} import java.nio.charset.StandardCharsets._ @@ -47,23 +46,6 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} class OffsetSeqLog(sparkSession: SparkSession, path: String) extends HDFSMetadataLog[OffsetSeq](sparkSession, path) { - private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]() - - override def add(batchId: Long, metadata: OffsetSeq): Boolean = { - val added = super.add(batchId, metadata) - if (added) { - // cache metadata as it will be read again - cachedMetadata.put(batchId, metadata) - // we don't access metadata for (batchId - 2) batches; evict them - cachedMetadata.headMap(batchId - 2, true).clear() - } - added - } - - override def get(batchId: Long): Option[OffsetSeq] = { - Option(cachedMetadata.get(batchId)).orElse(super.get(batchId)) - } - override protected def deserialize(in: InputStream): OffsetSeq = { // called inside a try-finally where the underlying stream is closed in the caller def parseOffset(value: String): OffsetV2 = value match { diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/explain.txt index efd0db46b9f5a..8e472ce04796c 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/explain.txt @@ -304,7 +304,7 @@ Input [2]: [ca_address_sk#19, ca_county#20] (53) ObjectHashAggregate Input [1]: [ca_address_sk#19] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 20440, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 57765, 0, 0)] Aggregate Attributes [1]: [buf#39] Results [1]: [buf#40] @@ -315,9 +315,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=7] (55) ObjectHashAggregate Input [1]: [buf#40] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 20440, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 20440, 0, 0)#41] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 20440, 0, 0)#41 AS bloomFilter#42] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 57765, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 57765, 0, 0)#41] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#19, 42), 2555, 57765, 0, 0)#41 AS bloomFilter#42] Subquery:2 Hosting operator id = 6 
Hosting Expression = ws_sold_date_sk#7 IN dynamicpruning#8 BroadcastExchange (60) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/simplified.txt index 9adf7d0719da1..4ac6e8e08a0ec 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q10.sf100/simplified.txt @@ -25,7 +25,7 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purcha WholeStageCodegen (1) Filter [c_customer_sk,c_current_addr_sk,c_current_cdemo_sk] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 2555, 20440, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 2555, 57765, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/explain.txt index c021a12eca064..5ce802dabc256 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/explain.txt @@ -348,7 +348,7 @@ Input [2]: [d_month_seq#40, d_week_seq#41] (59) ObjectHashAggregate Input [1]: [d_week_seq#41] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)] Aggregate Attributes [1]: [buf#85] Results [1]: [buf#86] @@ -359,9 +359,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10] (61) ObjectHashAggregate Input [1]: [buf#86] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)#87] -Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)#87 AS bloomFilter#88] +Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)#87] +Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)#87 AS bloomFilter#88] Subquery:2 Hosting operator id = 31 Hosting Expression = Subquery scalar-subquery#52, [id=#53] ObjectHashAggregate (68) @@ -394,7 +394,7 @@ Input [2]: [d_month_seq#68, d_week_seq#69] (66) ObjectHashAggregate Input [1]: [d_week_seq#69] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 2680, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 8990, 0, 0)] Aggregate Attributes [1]: [buf#89] Results [1]: [buf#90] @@ -405,8 +405,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11] (68) ObjectHashAggregate Input [1]: [buf#90] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 2680, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 2680, 0, 0)#91] -Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 2680, 0, 0)#91 AS bloomFilter#92] +Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 8990, 0, 0)] +Aggregate Attributes [1]: 
[bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 8990, 0, 0)#91] +Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#69, 42), 335, 8990, 0, 0)#91 AS bloomFilter#92] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/simplified.txt index 9825affd38929..534396577ab9d 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q59.sf100/simplified.txt @@ -22,7 +22,7 @@ TakeOrderedAndProject [s_store_name1,s_store_id1,d_week_seq1,(sun_sales1 / sun_s WholeStageCodegen (1) Filter [d_date_sk,d_week_seq] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 2680, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloomFilter,buf] Exchange #3 ObjectHashAggregate [d_week_seq] [buf,buf] WholeStageCodegen (1) @@ -72,7 +72,7 @@ TakeOrderedAndProject [s_store_name1,s_store_id1,d_week_seq1,(sun_sales1 / sun_s WholeStageCodegen (5) Filter [d_date_sk,d_week_seq] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 2680, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloomFilter,buf] Exchange #9 ObjectHashAggregate [d_week_seq] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/explain.txt index 7163f1611883e..d3434fd6be977 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/explain.txt @@ -320,7 +320,7 @@ Input [2]: [ca_address_sk#18, ca_county#19] (56) ObjectHashAggregate Input [1]: [ca_address_sk#18] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)] Aggregate Attributes [1]: [buf#38] Results [1]: [buf#39] @@ -331,9 +331,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9] (58) ObjectHashAggregate Input [1]: [buf#39] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)#40] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)#40 AS bloomFilter#41] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)#40] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)#40 AS bloomFilter#41] Subquery:2 Hosting operator id = 6 Hosting Expression = ss_sold_date_sk#9 IN dynamicpruning#10 BroadcastExchange (63) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/simplified.txt index 7930a3a45163b..9528756e264b5 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/simplified.txt +++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10.sf100/simplified.txt @@ -32,7 +32,7 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purcha WholeStageCodegen (1) Filter [c_current_addr_sk,c_current_cdemo_sk] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 2555, 20440, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 2555, 57765, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt index 79c4fa5c0a4f7..aadf72d0af219 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/explain.txt @@ -291,7 +291,7 @@ Input [2]: [ca_address_sk#20, ca_state#21] (50) ObjectHashAggregate Input [1]: [ca_address_sk#20] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)] Aggregate Attributes [1]: [buf#35] Results [1]: [buf#36] @@ -302,9 +302,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=8] (52) ObjectHashAggregate Input [1]: [buf#36] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)#37] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)#37 AS bloomFilter#38] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)#37] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)#37 AS bloomFilter#38] Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#11, [id=#12] ObjectHashAggregate (59) @@ -337,7 +337,7 @@ Input [2]: [cc_call_center_sk#22, cc_county#23] (57) ObjectHashAggregate Input [1]: [cc_call_center_sk#22] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 32, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 144, 0, 0)] Aggregate Attributes [1]: [buf#39] Results [1]: [buf#40] @@ -348,9 +348,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9] (59) ObjectHashAggregate Input [1]: [buf#40] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 32, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 32, 0, 0)#41] -Results [1]: [bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 32, 0, 0)#41 AS bloomFilter#42] +Functions [1]: [bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 144, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 144, 0, 0)#41] +Results [1]: [bloom_filter_agg(xxhash64(cc_call_center_sk#22, 42), 4, 144, 0, 0)#41 AS bloomFilter#42] Subquery:3 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#13, [id=#14] ObjectHashAggregate (66) @@ -383,7 +383,7 @@ Input [2]: [d_date_sk#24, d_date#25] (64) ObjectHashAggregate Input [1]: [d_date_sk#24] Keys: 
[] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)] Aggregate Attributes [1]: [buf#43] Results [1]: [buf#44] @@ -394,8 +394,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10] (66) ObjectHashAggregate Input [1]: [buf#44] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 0)#45] -Results [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 0)#45 AS bloomFilter#46] +Functions [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)#45] +Results [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)#45 AS bloomFilter#46] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt index 4978dd337359c..def1677f94401 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q16.sf100/simplified.txt @@ -26,7 +26,7 @@ WholeStageCodegen (12) Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit] Filter [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 17961, 143688, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 17961, 333176, 0, 0),bloomFilter,buf] Exchange #3 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) @@ -36,7 +36,7 @@ WholeStageCodegen (12) InputAdapter Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_state] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(cc_call_center_sk, 42), 4, 32, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(cc_call_center_sk, 42), 4, 144, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [cc_call_center_sk] [buf,buf] WholeStageCodegen (1) @@ -46,7 +46,7 @@ WholeStageCodegen (12) InputAdapter Scan parquet spark_catalog.default.call_center [cc_call_center_sk,cc_county] Subquery #3 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_date_sk, 42), 73049, 584392, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_date_sk, 42), 73049, 1141755, 0, 0),bloomFilter,buf] Exchange #5 ObjectHashAggregate [d_date_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/explain.txt index da1ba5bd46d51..3b189de7d34e5 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/explain.txt @@ -323,7 +323,7 @@ Input [2]: [d_week_seq#42, d_year#43] (56) ObjectHashAggregate Input [1]: [d_week_seq#42] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 2896, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 9656, 0, 0)] 
Aggregate Attributes [1]: [buf#85] Results [1]: [buf#86] @@ -334,9 +334,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9] (58) ObjectHashAggregate Input [1]: [buf#86] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 2896, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 2896, 0, 0)#87] -Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 2896, 0, 0)#87 AS bloomFilter#88] +Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 9656, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 9656, 0, 0)#87] +Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#42, 42), 362, 9656, 0, 0)#87 AS bloomFilter#88] Subquery:2 Hosting operator id = 33 Hosting Expression = Subquery scalar-subquery#52, [id=#53] ObjectHashAggregate (65) @@ -369,7 +369,7 @@ Input [2]: [d_week_seq#68, d_year#69] (63) ObjectHashAggregate Input [1]: [d_week_seq#68] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 2896, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 9656, 0, 0)] Aggregate Attributes [1]: [buf#89] Results [1]: [buf#90] @@ -380,8 +380,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10] (65) ObjectHashAggregate Input [1]: [buf#90] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 2896, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 2896, 0, 0)#91] -Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 2896, 0, 0)#91 AS bloomFilter#92] +Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 9656, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 9656, 0, 0)#91] +Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#68, 42), 362, 9656, 0, 0)#91 AS bloomFilter#92] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/simplified.txt index e322eae978899..4fb858b42521a 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q2.sf100/simplified.txt @@ -31,7 +31,7 @@ WholeStageCodegen (13) WholeStageCodegen (3) Filter [d_date_sk,d_week_seq] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 362, 2896, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 362, 9656, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [d_week_seq] [buf,buf] WholeStageCodegen (1) @@ -80,7 +80,7 @@ WholeStageCodegen (13) WholeStageCodegen (8) Filter [d_date_sk,d_week_seq] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 362, 2896, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 362, 9656, 0, 0),bloomFilter,buf] Exchange #9 ObjectHashAggregate [d_week_seq] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/explain.txt index e91bca4b7c27d..74b51485aeaa6 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/explain.txt +++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/explain.txt @@ -205,7 +205,7 @@ Input [2]: [i_item_sk#1, i_manufact_id#2] (34) ObjectHashAggregate Input [1]: [i_item_sk#1] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)] Aggregate Attributes [1]: [buf#24] Results [1]: [buf#25] @@ -216,9 +216,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=5] (36) ObjectHashAggregate Input [1]: [buf#25] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)#26] -Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)#26 AS bloomFilter#27] +Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)#26] +Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)#26 AS bloomFilter#27] Subquery:2 Hosting operator id = 6 Hosting Expression = cs_sold_date_sk#5 IN dynamicpruning#6 BroadcastExchange (41) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/simplified.txt index 48c9060295493..084b50e2c0ead 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q32.sf100/simplified.txt @@ -31,7 +31,7 @@ WholeStageCodegen (7) BroadcastHashJoin [cs_sold_date_sk,d_date_sk] Filter [cs_item_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 199, 1592, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 199, 5556, 0, 0),bloomFilter,buf] Exchange #6 ObjectHashAggregate [i_item_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/explain.txt index 9387f21d9c886..55ba768476e40 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/explain.txt @@ -221,7 +221,7 @@ Input [2]: [i_item_sk#13, i_current_price#15] (38) ObjectHashAggregate Input [1]: [i_item_sk#13] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 8152, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 24988, 0, 0)] Aggregate Attributes [1]: [buf#32] Results [1]: [buf#33] @@ -232,9 +232,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=6] (40) ObjectHashAggregate Input [1]: [buf#33] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 8152, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 8152, 0, 0)#34] -Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 8152, 0, 0)#34 AS bloomFilter#35] +Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 24988, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 24988, 0, 0)#34] +Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#13, 42), 1019, 24988, 0, 0)#34 AS 
bloomFilter#35] Subquery:2 Hosting operator id = 1 Hosting Expression = cs_sold_date_sk#5 IN dynamicpruning#6 BroadcastExchange (44) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/simplified.txt index e95fab855f35c..4368e7b605c54 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q40.sf100/simplified.txt @@ -21,7 +21,7 @@ TakeOrderedAndProject [w_state,i_item_id,sales_before,sales_after] WholeStageCodegen (1) Filter [cs_warehouse_sk,cs_item_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 1019, 8152, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 1019, 24988, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [i_item_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/explain.txt index 05660cb90fae5..1aa4410e295cd 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/explain.txt @@ -348,7 +348,7 @@ Input [2]: [d_month_seq#40, d_week_seq#41] (59) ObjectHashAggregate Input [1]: [d_week_seq#41] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)] Aggregate Attributes [1]: [buf#88] Results [1]: [buf#89] @@ -359,9 +359,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10] (61) ObjectHashAggregate Input [1]: [buf#89] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)#90] -Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 2680, 0, 0)#90 AS bloomFilter#91] +Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)#90] +Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#41, 42), 335, 8990, 0, 0)#90 AS bloomFilter#91] Subquery:2 Hosting operator id = 31 Hosting Expression = Subquery scalar-subquery#52, [id=#53] ObjectHashAggregate (68) @@ -394,7 +394,7 @@ Input [2]: [d_month_seq#70, d_week_seq#71] (66) ObjectHashAggregate Input [1]: [d_week_seq#71] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 2680, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 8990, 0, 0)] Aggregate Attributes [1]: [buf#92] Results [1]: [buf#93] @@ -405,8 +405,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11] (68) ObjectHashAggregate Input [1]: [buf#93] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 2680, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 2680, 0, 0)#94] -Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 2680, 0, 0)#94 AS bloomFilter#95] +Functions [1]: [bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 8990, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 
8990, 0, 0)#94] +Results [1]: [bloom_filter_agg(xxhash64(d_week_seq#71, 42), 335, 8990, 0, 0)#94 AS bloomFilter#95] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/simplified.txt index f7d471cf48f00..62f4fab4891e0 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q59.sf100/simplified.txt @@ -22,7 +22,7 @@ TakeOrderedAndProject [s_store_name1,s_store_id1,d_week_seq1,(sun_sales1 / sun_s WholeStageCodegen (1) Filter [d_date_sk,d_week_seq] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 2680, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloomFilter,buf] Exchange #3 ObjectHashAggregate [d_week_seq] [buf,buf] WholeStageCodegen (1) @@ -72,7 +72,7 @@ TakeOrderedAndProject [s_store_name1,s_store_id1,d_week_seq1,(sun_sales1 / sun_s WholeStageCodegen (5) Filter [d_date_sk,d_week_seq] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 2680, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_week_seq, 42), 335, 8990, 0, 0),bloomFilter,buf] Exchange #9 ObjectHashAggregate [d_week_seq] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/explain.txt index d102d1c36425d..8a57ad7ce8df0 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/explain.txt @@ -1176,7 +1176,7 @@ Input [3]: [i_item_sk#75, i_current_price#76, i_color#77] (214) ObjectHashAggregate Input [1]: [i_item_sk#75] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)] Aggregate Attributes [1]: [buf#176] Results [1]: [buf#177] @@ -1187,9 +1187,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=32] (216) ObjectHashAggregate Input [1]: [buf#177] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)#178] -Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)#178 AS bloomFilter#179] +Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)#178] +Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)#178 AS bloomFilter#179] Subquery:2 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#12 IN dynamicpruning#13 BroadcastExchange (220) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/simplified.txt index 87226794ac85a..ce628bd22350f 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q64.sf100/simplified.txt @@ 
-88,7 +88,7 @@ WholeStageCodegen (88) WholeStageCodegen (1) Filter [ss_item_sk,ss_ticket_number,ss_store_sk,ss_customer_sk,ss_cdemo_sk,ss_promo_sk,ss_hdemo_sk,ss_addr_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 1250, 10000, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 1250, 30121, 0, 0),bloomFilter,buf] Exchange #11 ObjectHashAggregate [i_item_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt index 18229b02f23e8..96ca7b8cb0be3 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/explain.txt @@ -300,7 +300,7 @@ Input [2]: [ca_address_sk#16, ca_state#17] (52) ObjectHashAggregate Input [1]: [ca_address_sk#16] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 444448, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 899992, 0, 0)] Aggregate Attributes [1]: [buf#30] Results [1]: [buf#31] @@ -311,9 +311,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=8] (54) ObjectHashAggregate Input [1]: [buf#31] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 444448, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 444448, 0, 0)#32] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 444448, 0, 0)#32 AS bloomFilter#33] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 899992, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 899992, 0, 0)#32] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#16, 42), 55556, 899992, 0, 0)#32 AS bloomFilter#33] Subquery:2 Hosting operator id = 6 Hosting Expression = ss_sold_date_sk#7 IN dynamicpruning#8 BroadcastExchange (59) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt index 6483ac8dc74cb..7635aa1c6c3de 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q69.sf100/simplified.txt @@ -28,7 +28,7 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purcha WholeStageCodegen (1) Filter [c_current_addr_sk,c_current_cdemo_sk] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 55556, 444448, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 55556, 899992, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/explain.txt index 90bb3af97ec8a..c930a8f522304 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/explain.txt @@ -639,7 +639,7 @@ Input [2]: [i_item_sk#18, 
i_current_price#19] (112) ObjectHashAggregate Input [1]: [i_item_sk#18] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)] Aggregate Attributes [1]: [buf#134] Results [1]: [buf#135] @@ -650,9 +650,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=16] (114) ObjectHashAggregate Input [1]: [buf#135] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)#136] -Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)#136 AS bloomFilter#137] +Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)#136] +Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)#136 AS bloomFilter#137] Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#11, [id=#12] ObjectHashAggregate (121) @@ -685,7 +685,7 @@ Input [2]: [p_promo_sk#20, p_channel_tv#21] (119) ObjectHashAggregate Input [1]: [p_promo_sk#20] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)] Aggregate Attributes [1]: [buf#138] Results [1]: [buf#139] @@ -696,9 +696,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=17] (121) ObjectHashAggregate Input [1]: [buf#139] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)#140] -Results [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)#140 AS bloomFilter#141] +Functions [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)#140] +Results [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)#140 AS bloomFilter#141] Subquery:3 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#7 IN dynamicpruning#8 BroadcastExchange (126) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/simplified.txt index 647cca694c72e..315c338617f5e 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/simplified.txt @@ -32,7 +32,7 @@ TakeOrderedAndProject [channel,id,sales,returns,profit] WholeStageCodegen (1) Filter [ss_store_sk,ss_item_sk,ss_promo_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 101823, 814584, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 101823, 1521109, 0, 0),bloomFilter,buf] Exchange #5 ObjectHashAggregate [i_item_sk] [buf,buf] WholeStageCodegen (1) @@ -42,7 +42,7 @@ TakeOrderedAndProject [channel,id,sales,returns,profit] InputAdapter Scan parquet spark_catalog.default.item [i_item_sk,i_current_price] Subquery #3 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(p_promo_sk, 42), 986, 7888, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] 
[bloom_filter_agg(xxhash64(p_promo_sk, 42), 986, 24246, 0, 0),bloomFilter,buf] Exchange #6 ObjectHashAggregate [p_promo_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt index 51b43a6477bac..410a6a1957505 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/explain.txt @@ -368,7 +368,7 @@ Input [3]: [cd_demo_sk#23, cd_marital_status#24, cd_education_status#25] (64) ObjectHashAggregate Input [1]: [cd_demo_sk#23] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 1279848, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 2239471, 0, 0)] Aggregate Attributes [1]: [buf#55] Results [1]: [buf#56] @@ -379,9 +379,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11] (66) ObjectHashAggregate Input [1]: [buf#56] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 1279848, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 1279848, 0, 0)#57] -Results [1]: [bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 1279848, 0, 0)#57 AS bloomFilter#58] +Functions [1]: [bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 2239471, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 2239471, 0, 0)#57] +Results [1]: [bloom_filter_agg(xxhash64(cd_demo_sk#23, 42), 159981, 2239471, 0, 0)#57 AS bloomFilter#58] Subquery:3 Hosting operator id = 14 Hosting Expression = Subquery scalar-subquery#21, [id=#22] ObjectHashAggregate (73) @@ -414,7 +414,7 @@ Input [3]: [ca_address_sk#29, ca_state#30, ca_country#31] (71) ObjectHashAggregate Input [1]: [ca_address_sk#29] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 1222696, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 2153999, 0, 0)] Aggregate Attributes [1]: [buf#59] Results [1]: [buf#60] @@ -425,8 +425,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=12] (73) ObjectHashAggregate Input [1]: [buf#60] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 1222696, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 1222696, 0, 0)#61] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 1222696, 0, 0)#61 AS bloomFilter#62] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 2153999, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 2153999, 0, 0)#61] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#29, 42), 152837, 2153999, 0, 0)#61 AS bloomFilter#62] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt index aa9e8c4c20d32..46c14e8bd6773 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q85.sf100/simplified.txt @@ -59,7 +59,7 @@ TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refu Project 
[wr_item_sk,wr_refunded_cdemo_sk,wr_refunded_addr_sk,wr_returning_cdemo_sk,wr_reason_sk,wr_order_number,wr_fee,wr_refunded_cash] Filter [wr_item_sk,wr_order_number,wr_refunded_cdemo_sk,wr_returning_cdemo_sk,wr_refunded_addr_sk,wr_reason_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(cd_demo_sk, 42), 159981, 1279848, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(cd_demo_sk, 42), 159981, 2239471, 0, 0),bloomFilter,buf] Exchange #7 ObjectHashAggregate [cd_demo_sk] [buf,buf] WholeStageCodegen (1) @@ -69,7 +69,7 @@ TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refu InputAdapter Scan parquet spark_catalog.default.customer_demographics [cd_demo_sk,cd_marital_status,cd_education_status] Subquery #3 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 152837, 1222696, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 152837, 2153999, 0, 0),bloomFilter,buf] Exchange #8 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/explain.txt index e74a02ff719d5..dce5f37bb95a4 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/explain.txt @@ -205,7 +205,7 @@ Input [2]: [i_item_sk#1, i_manufact_id#2] (34) ObjectHashAggregate Input [1]: [i_item_sk#1] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)] Aggregate Attributes [1]: [buf#24] Results [1]: [buf#25] @@ -216,9 +216,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=5] (36) ObjectHashAggregate Input [1]: [buf#25] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)#26] -Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 1592, 0, 0)#26 AS bloomFilter#27] +Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)#26] +Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#1, 42), 199, 5556, 0, 0)#26 AS bloomFilter#27] Subquery:2 Hosting operator id = 6 Hosting Expression = ws_sold_date_sk#5 IN dynamicpruning#6 BroadcastExchange (41) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/simplified.txt index 0402324ad1a16..d664a0c731724 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q92.sf100/simplified.txt @@ -31,7 +31,7 @@ WholeStageCodegen (7) BroadcastHashJoin [ws_sold_date_sk,d_date_sk] Filter [ws_item_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 199, 1592, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 199, 5556, 0, 0),bloomFilter,buf] Exchange #6 ObjectHashAggregate [i_item_sk] [buf,buf] WholeStageCodegen (1) diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt index 5c73695cd200c..ff096bf4509ae 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/explain.txt @@ -291,7 +291,7 @@ Input [2]: [ca_address_sk#20, ca_state#21] (50) ObjectHashAggregate Input [1]: [ca_address_sk#20] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)] Aggregate Attributes [1]: [buf#35] Results [1]: [buf#36] @@ -302,9 +302,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=8] (52) ObjectHashAggregate Input [1]: [buf#36] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)#37] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 143688, 0, 0)#37 AS bloomFilter#38] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)#37] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#20, 42), 17961, 333176, 0, 0)#37 AS bloomFilter#38] Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#11, [id=#12] ObjectHashAggregate (59) @@ -337,7 +337,7 @@ Input [2]: [web_site_sk#22, web_company_name#23] (57) ObjectHashAggregate Input [1]: [web_site_sk#22] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 32, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 144, 0, 0)] Aggregate Attributes [1]: [buf#39] Results [1]: [buf#40] @@ -348,9 +348,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9] (59) ObjectHashAggregate Input [1]: [buf#40] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 32, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 32, 0, 0)#41] -Results [1]: [bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 32, 0, 0)#41 AS bloomFilter#42] +Functions [1]: [bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 144, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 144, 0, 0)#41] +Results [1]: [bloom_filter_agg(xxhash64(web_site_sk#22, 42), 4, 144, 0, 0)#41 AS bloomFilter#42] Subquery:3 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#13, [id=#14] ObjectHashAggregate (66) @@ -383,7 +383,7 @@ Input [2]: [d_date_sk#24, d_date#25] (64) ObjectHashAggregate Input [1]: [d_date_sk#24] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)] Aggregate Attributes [1]: [buf#43] Results [1]: [buf#44] @@ -394,8 +394,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10] (66) ObjectHashAggregate Input [1]: [buf#44] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 0)#45] -Results [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 584392, 0, 
0)#45 AS bloomFilter#46] +Functions [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)#45] +Results [1]: [bloom_filter_agg(xxhash64(d_date_sk#24, 42), 73049, 1141755, 0, 0)#45 AS bloomFilter#46] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt index fc764d31f52cb..230b08abe0a54 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q94.sf100/simplified.txt @@ -26,7 +26,7 @@ WholeStageCodegen (12) Project [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk,ws_warehouse_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] Filter [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 17961, 143688, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 17961, 333176, 0, 0),bloomFilter,buf] Exchange #3 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) @@ -36,7 +36,7 @@ WholeStageCodegen (12) InputAdapter Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_state] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(web_site_sk, 42), 4, 32, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(web_site_sk, 42), 4, 144, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [web_site_sk] [buf,buf] WholeStageCodegen (1) @@ -46,7 +46,7 @@ WholeStageCodegen (12) InputAdapter Scan parquet spark_catalog.default.web_site [web_site_sk,web_company_name] Subquery #3 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_date_sk, 42), 73049, 584392, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_date_sk, 42), 73049, 1141755, 0, 0),bloomFilter,buf] Exchange #5 ObjectHashAggregate [d_date_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/explain.txt index aee8e7ded8d5d..d6cf257b8b528 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/explain.txt @@ -356,7 +356,7 @@ Input [2]: [ca_address_sk#21, ca_state#22] (62) ObjectHashAggregate Input [1]: [ca_address_sk#21] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 143688, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 333176, 0, 0)] Aggregate Attributes [1]: [buf#36] Results [1]: [buf#37] @@ -367,9 +367,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=8] (64) ObjectHashAggregate Input [1]: [buf#37] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 143688, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 143688, 0, 0)#38] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 143688, 0, 0)#38 AS bloomFilter#39] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 333176, 0, 0)] +Aggregate Attributes [1]: 
[bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 333176, 0, 0)#38] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#21, 42), 17961, 333176, 0, 0)#38 AS bloomFilter#39] Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#10, [id=#11] ObjectHashAggregate (71) @@ -402,7 +402,7 @@ Input [2]: [web_site_sk#23, web_company_name#24] (69) ObjectHashAggregate Input [1]: [web_site_sk#23] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 32, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 144, 0, 0)] Aggregate Attributes [1]: [buf#40] Results [1]: [buf#41] @@ -413,9 +413,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9] (71) ObjectHashAggregate Input [1]: [buf#41] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 32, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 32, 0, 0)#42] -Results [1]: [bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 32, 0, 0)#42 AS bloomFilter#43] +Functions [1]: [bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 144, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 144, 0, 0)#42] +Results [1]: [bloom_filter_agg(xxhash64(web_site_sk#23, 42), 4, 144, 0, 0)#42 AS bloomFilter#43] Subquery:3 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#12, [id=#13] ObjectHashAggregate (78) @@ -448,7 +448,7 @@ Input [2]: [d_date_sk#25, d_date#26] (76) ObjectHashAggregate Input [1]: [d_date_sk#25] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 584392, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 1141755, 0, 0)] Aggregate Attributes [1]: [buf#44] Results [1]: [buf#45] @@ -459,8 +459,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=10] (78) ObjectHashAggregate Input [1]: [buf#45] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 584392, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 584392, 0, 0)#46] -Results [1]: [bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 584392, 0, 0)#46 AS bloomFilter#47] +Functions [1]: [bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 1141755, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 1141755, 0, 0)#46] +Results [1]: [bloom_filter_agg(xxhash64(d_date_sk#25, 42), 73049, 1141755, 0, 0)#46 AS bloomFilter#47] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/simplified.txt index e521277505b16..8922d43c2aaa4 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q95.sf100/simplified.txt @@ -25,7 +25,7 @@ WholeStageCodegen (21) Project [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk,ws_order_number,ws_ext_ship_cost,ws_net_profit] Filter [ws_ship_date_sk,ws_ship_addr_sk,ws_web_site_sk] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 17961, 143688, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 17961, 333176, 0, 0),bloomFilter,buf] Exchange #3 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) @@ -35,7 +35,7 @@ WholeStageCodegen (21) InputAdapter Scan parquet 
spark_catalog.default.customer_address [ca_address_sk,ca_state] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(web_site_sk, 42), 4, 32, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(web_site_sk, 42), 4, 144, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [web_site_sk] [buf,buf] WholeStageCodegen (1) @@ -45,7 +45,7 @@ WholeStageCodegen (21) InputAdapter Scan parquet spark_catalog.default.web_site [web_site_sk,web_company_name] Subquery #3 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_date_sk, 42), 73049, 584392, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(d_date_sk, 42), 73049, 1141755, 0, 0),bloomFilter,buf] Exchange #5 ObjectHashAggregate [d_date_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/explain.txt index e6ba05cca537f..72298764a9e36 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/explain.txt @@ -286,7 +286,7 @@ Input [2]: [ca_address_sk#18, ca_county#19] (50) ObjectHashAggregate Input [1]: [ca_address_sk#18] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)] Aggregate Attributes [1]: [buf#38] Results [1]: [buf#39] @@ -297,9 +297,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=7] (52) ObjectHashAggregate Input [1]: [buf#39] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)#40] -Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 20440, 0, 0)#40 AS bloomFilter#41] +Functions [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)#40] +Results [1]: [bloom_filter_agg(xxhash64(ca_address_sk#18, 42), 2555, 57765, 0, 0)#40 AS bloomFilter#41] Subquery:2 Hosting operator id = 6 Hosting Expression = ss_sold_date_sk#7 IN dynamicpruning#8 BroadcastExchange (57) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/simplified.txt index 8102047b218c7..49da06e14bc21 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q10a.sf100/simplified.txt @@ -25,7 +25,7 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purcha WholeStageCodegen (1) Filter [c_current_addr_sk,c_current_cdemo_sk] Subquery #1 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 2555, 20440, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(ca_address_sk, 42), 2555, 57765, 0, 0),bloomFilter,buf] Exchange #4 ObjectHashAggregate [ca_address_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/explain.txt 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/explain.txt index 56dadada88879..50beb9878641c 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/explain.txt @@ -1176,7 +1176,7 @@ Input [3]: [i_item_sk#75, i_current_price#76, i_color#77] (214) ObjectHashAggregate Input [1]: [i_item_sk#75] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)] Aggregate Attributes [1]: [buf#176] Results [1]: [buf#177] @@ -1187,9 +1187,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=32] (216) ObjectHashAggregate Input [1]: [buf#177] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)#178] -Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 10000, 0, 0)#178 AS bloomFilter#179] +Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)#178] +Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#75, 42), 1250, 30121, 0, 0)#178 AS bloomFilter#179] Subquery:2 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#12 IN dynamicpruning#13 BroadcastExchange (220) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/simplified.txt index cb088f6a6ad06..9793e7cf7ef1f 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64.sf100/simplified.txt @@ -88,7 +88,7 @@ WholeStageCodegen (88) WholeStageCodegen (1) Filter [ss_item_sk,ss_ticket_number,ss_store_sk,ss_customer_sk,ss_cdemo_sk,ss_promo_sk,ss_hdemo_sk,ss_addr_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 1250, 10000, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 1250, 30121, 0, 0),bloomFilter,buf] Exchange #11 ObjectHashAggregate [i_item_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/explain.txt index a0246af44b7c0..2c3bbda04d1bd 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/explain.txt @@ -1186,7 +1186,7 @@ Input [2]: [i_item_sk#18, i_current_price#19] (207) ObjectHashAggregate Input [1]: [i_item_sk#18] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)] Aggregate Attributes [1]: [buf#218] Results [1]: [buf#219] @@ -1197,9 +1197,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=27] (209) ObjectHashAggregate Input [1]: [buf#219] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)] -Aggregate Attributes [1]: 
[bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)#220] -Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 814584, 0, 0)#220 AS bloomFilter#221] +Functions [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)#220] +Results [1]: [bloom_filter_agg(xxhash64(i_item_sk#18, 42), 101823, 1521109, 0, 0)#220 AS bloomFilter#221] Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#11, [id=#12] ObjectHashAggregate (216) @@ -1232,7 +1232,7 @@ Input [2]: [p_promo_sk#20, p_channel_tv#21] (214) ObjectHashAggregate Input [1]: [p_promo_sk#20] Keys: [] -Functions [1]: [partial_bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)] +Functions [1]: [partial_bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)] Aggregate Attributes [1]: [buf#222] Results [1]: [buf#223] @@ -1243,9 +1243,9 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=28] (216) ObjectHashAggregate Input [1]: [buf#223] Keys: [] -Functions [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)] -Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)#224] -Results [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 7888, 0, 0)#224 AS bloomFilter#225] +Functions [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)] +Aggregate Attributes [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)#224] +Results [1]: [bloom_filter_agg(xxhash64(p_promo_sk#20, 42), 986, 24246, 0, 0)#224 AS bloomFilter#225] Subquery:3 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#7 IN dynamicpruning#8 BroadcastExchange (221) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/simplified.txt index 542e92a27d720..7082f78d270a8 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q80a.sf100/simplified.txt @@ -39,7 +39,7 @@ TakeOrderedAndProject [channel,id,sales,returns,profit] WholeStageCodegen (1) Filter [ss_store_sk,ss_item_sk,ss_promo_sk] Subquery #2 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 101823, 814584, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(i_item_sk, 42), 101823, 1521109, 0, 0),bloomFilter,buf] Exchange #6 ObjectHashAggregate [i_item_sk] [buf,buf] WholeStageCodegen (1) @@ -49,7 +49,7 @@ TakeOrderedAndProject [channel,id,sales,returns,profit] InputAdapter Scan parquet spark_catalog.default.item [i_item_sk,i_current_price] Subquery #3 - ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(p_promo_sk, 42), 986, 7888, 0, 0),bloomFilter,buf] + ObjectHashAggregate [buf] [bloom_filter_agg(xxhash64(p_promo_sk, 42), 986, 24246, 0, 0),bloomFilter,buf] Exchange #7 ObjectHashAggregate [p_promo_sk] [buf,buf] WholeStageCodegen (1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala index cf5d4c8c1e9a0..4edb51d271903 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala @@ -376,4 +376,21 @@ 
class BloomFilterAggregateQuerySuite extends QueryTest with SharedSparkSession {
       .queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].inputPlan
       .collect({case agg: BaseAggregateExec => agg}).size == 2)
   }
+
+  test("Test numBitsExpression") {
+    def checkNumBits(estimatedNumItems: Long, numBits: Long): Unit = {
+      val agg = new BloomFilterAggregate(Literal(1L), estimatedNumItems)
+      assert(agg.numBitsExpression === Literal(numBits))
+    }
+
+    checkNumBits(conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS) * 100,
+      conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))
+    checkNumBits(conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS) + 10, 29193836)
+    checkNumBits(conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS), 29193763)
+    checkNumBits(2000000, 17482271)
+    checkNumBits(1000000, 10183830)
+    checkNumBits(10000, 197688)
+    checkNumBits(100, 2935)
+    checkNumBits(1, 38)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index d7ebb9003884f..718405bd8acf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -1027,9 +1027,18 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
         nullable = false))))
 
   test("withField should throw an exception if called on a non-StructType column") {
-    intercept[AnalysisException] {
-      testData.withColumn("key", $"key".withField("a", lit(2)))
-    }.getMessage should include("struct argument should be struct type, got: int")
+    checkError(
+      exception = intercept[AnalysisException] {
+        testData.withColumn("key", $"key".withField("a", lit(2)))
+      },
+      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+      parameters = Map(
+        "sqlExpr" -> "\"update_fields(key, WithField(2))\"",
+        "paramIndex" -> "1",
+        "inputSql" -> "\"key\"",
+        "inputType" -> "\"INT\"",
+        "requiredType" -> "\"STRUCT\"")
+    )
   }
 
   test("withField should throw an exception if either fieldName or col argument are null") {
@@ -1063,9 +1072,18 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("withField should throw an exception if intermediate field is not a struct") {
-    intercept[AnalysisException] {
-      structLevel1.withColumn("a", $"a".withField("b.a", lit(2)))
-    }.getMessage should include("struct argument should be struct type, got: int")
+    checkError(
+      exception = intercept[AnalysisException] {
+        structLevel1.withColumn("a", $"a".withField("b.a", lit(2)))
+      },
+      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+      parameters = Map(
+        "sqlExpr" -> "\"update_fields(a.b, WithField(2))\"",
+        "paramIndex" -> "1",
+        "inputSql" -> "\"a.b\"",
+        "inputType" -> "\"INT\"",
+        "requiredType" -> "\"STRUCT\"")
+    )
   }
 
   test("withField should throw an exception if intermediate field reference is ambiguous") {
@@ -1792,9 +1810,18 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("dropFields should throw an exception if called on a non-StructType column") {
-    intercept[AnalysisException] {
-      testData.withColumn("key", $"key".dropFields("a"))
-    }.getMessage should include("struct argument should be struct type, got: int")
+    checkError(
+      exception = intercept[AnalysisException] {
+        testData.withColumn("key", $"key".dropFields("a"))
+      },
+      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+      parameters = Map(
+        "sqlExpr" -> "\"update_fields(key, dropfield())\"",
+        "paramIndex" -> "1",
+        "inputSql" -> "\"key\"",
+        "inputType" -> "\"INT\"",
+        "requiredType" -> "\"STRUCT\"")
+    )
   }
 
   test("dropFields should throw an exception if fieldName argument is null") {
@@ -1820,9 +1847,18 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("dropFields should throw an exception if intermediate field is not a struct") {
-    intercept[AnalysisException] {
-      structLevel1.withColumn("a", $"a".dropFields("b.a"))
-    }.getMessage should include("struct argument should be struct type, got: int")
+    checkError(
+      exception = intercept[AnalysisException] {
+        structLevel1.withColumn("a", $"a".dropFields("b.a"))
+      },
+      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+      parameters = Map(
+        "sqlExpr" -> "\"update_fields(a.b, dropfield())\"",
+        "paramIndex" -> "1",
+        "inputSql" -> "\"a.b\"",
+        "inputType" -> "\"INT\"",
+        "requiredType" -> "\"STRUCT\"")
+    )
   }
 
   test("dropFields should throw an exception if intermediate field reference is ambiguous") {
@@ -1877,9 +1913,13 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("dropFields should throw an exception if no fields will be left in struct") {
-    intercept[AnalysisException] {
-      structLevel1.withColumn("a", $"a".dropFields("a", "b", "c"))
-    }.getMessage should include("cannot drop all fields in struct")
+    checkError(
+      exception = intercept[AnalysisException] {
+        structLevel1.withColumn("a", $"a".dropFields("a", "b", "c"))
+      },
+      errorClass = "DATATYPE_MISMATCH.CANNOT_DROP_ALL_FIELDS",
+      parameters = Map("sqlExpr" -> "\"update_fields(a, dropfield(), dropfield(), dropfield())\"")
+    )
   }
 
   test("dropFields should drop field with no name in struct") {
@@ -2144,10 +2184,14 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
         .select($"struct_col".dropFields("b", "c")),
       Row(Row(1)))
 
-    intercept[AnalysisException] {
-      sql("SELECT named_struct('a', 1, 'b', 2) struct_col")
-        .select($"struct_col".dropFields("a", "b"))
-    }.getMessage should include("cannot drop all fields in struct")
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("SELECT named_struct('a', 1, 'b', 2) struct_col")
+          .select($"struct_col".dropFields("a", "b"))
+      },
+      errorClass = "DATATYPE_MISMATCH.CANNOT_DROP_ALL_FIELDS",
+      parameters = Map("sqlExpr" -> "\"update_fields(struct_col, dropfield(), dropfield())\"")
+    )
 
     checkAnswer(
       sql("SELECT CAST(NULL AS struct) struct_col")