diff --git a/.github/labeler.yml b/.github/labeler.yml
index 2a7d1f1c31f8f..0d04244f8822b 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -151,4 +151,7 @@ WEB UI:
 - "**/*UI.scala"
 DEPLOY:
 - "sbin/**/*"
-
+CONNECT:
+ - "connect/**/*"
+ - "**/sql/sparkconnect/**/*"
+ - "python/pyspark/sql/**/connect/**/*"
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 22b47c979c65a..e16c8c762dc30 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -555,6 +555,11 @@
         "AES-<mode> with the padding <padding> by the <functionName> function."
       ]
     },
+    "CATALOG_OPERATION" : {
+      "message" : [
+        "Catalog <catalogName> does not support <operation>."
+      ]
+    },
     "DESC_TABLE_COLUMN_PARTITION" : {
       "message" : [
         "DESC TABLE COLUMN for a specific partition."
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 10ebbe76d6c74..4b0edf6f643f1 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -300,10 +300,15 @@ abstract class SparkFunSuite
       parameters: Map[String, String] = Map.empty,
       matchPVals: Boolean = false,
       queryContext: Array[QueryContext] = Array.empty): Unit = {
-    assert(exception.getErrorClass === errorClass)
+    val mainErrorClass :: tail = errorClass.split("\\.").toList
+    assert(tail.isEmpty || tail.length == 1)
+    // TODO: remove the `errorSubClass` parameter.
+    assert(tail.isEmpty || errorSubClass.isEmpty)
+    assert(exception.getErrorClass === mainErrorClass)
     if (exception.getErrorSubClass != null) {
-      assert(errorSubClass.isDefined)
-      assert(exception.getErrorSubClass === errorSubClass.get)
+      val subClass = errorSubClass.orElse(tail.headOption)
+      assert(subClass.isDefined)
+      assert(exception.getErrorSubClass === subClass.get)
     }
     sqlState.foreach(state => assert(exception.getSqlState === state))
     val expectedParameters = exception.getMessageParameters.asScala
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala
index 1fa2d0ab882c9..ec910e9bf3436 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala
@@ -20,13 +20,14 @@ package org.apache.spark.deploy.history
 import org.openqa.selenium.WebDriver
 import org.openqa.selenium.chrome.{ChromeDriver, ChromeOptions}
-import org.apache.spark.tags.ChromeUITest
+import org.apache.spark.internal.config.History.HybridStoreDiskBackend
+import org.apache.spark.tags.{ChromeUITest, ExtendedLevelDBTest}
+
 /**
  * Tests for HistoryServer with Chrome.
*/ -@ChromeUITest -class ChromeUIHistoryServerSuite +abstract class ChromeUIHistoryServerSuite extends RealBrowserUIHistoryServerSuite("webdriver.chrome.driver") { override var webDriver: WebDriver = _ @@ -48,3 +49,14 @@ class ChromeUIHistoryServerSuite } } } + +@ChromeUITest +@ExtendedLevelDBTest +class LevelDBBackendChromeUIHistoryServerSuite extends ChromeUIHistoryServerSuite { + override protected def diskBackend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.LEVELDB +} + +@ChromeUITest +class RocksDBBackendChromeUIHistoryServerSuite extends ChromeUIHistoryServerSuite { + override protected def diskBackend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala index 7d2d31b4adab4..ea3a5ef5ba10c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/RealBrowserUIHistoryServerSuite.scala @@ -28,7 +28,7 @@ import org.scalatestplus.selenium.WebBrowser import org.apache.spark._ import org.apache.spark.internal.config.{EVENT_LOG_STAGE_EXECUTOR_METRICS, EXECUTOR_PROCESS_TREE_METRICS_ENABLED} -import org.apache.spark.internal.config.History.{HISTORY_LOG_DIR, LOCAL_STORE_DIR, UPDATE_INTERVAL_S} +import org.apache.spark.internal.config.History.{HISTORY_LOG_DIR, HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend, LOCAL_STORE_DIR, UPDATE_INTERVAL_S} import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -48,6 +48,8 @@ abstract class RealBrowserUIHistoryServerSuite(val driverProp: String) private var server: HistoryServer = null private var port: Int = -1 + protected def diskBackend: HybridStoreDiskBackend.Value + override def beforeAll(): Unit = { super.beforeAll() assume( @@ -79,6 +81,7 @@ abstract class RealBrowserUIHistoryServerSuite(val driverProp: String) .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true) .set(EXECUTOR_PROCESS_TREE_METRICS_ENABLED, true) + .set(HYBRID_STORE_DISK_BACKEND, diskBackend.toString) conf.setAll(extraConf) provider = new FsHistoryProvider(conf) provider.checkForLogs() diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index a5eb21d0a06ce..8d26463465936 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -187,10 +187,10 @@ lapack/3.0.2//lapack-3.0.2.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar -log4j-1.2-api/2.18.0//log4j-1.2-api-2.18.0.jar -log4j-api/2.18.0//log4j-api-2.18.0.jar -log4j-core/2.18.0//log4j-core-2.18.0.jar -log4j-slf4j-impl/2.18.0//log4j-slf4j-impl-2.18.0.jar +log4j-1.2-api/2.19.0//log4j-1.2-api-2.19.0.jar +log4j-api/2.19.0//log4j-api-2.19.0.jar +log4j-core/2.19.0//log4j-core-2.19.0.jar +log4j-slf4j-impl/2.19.0//log4j-slf4j-impl-2.19.0.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar mesos/1.4.3/shaded-protobuf/mesos-1.4.3-shaded-protobuf.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 2b92962cdd526..25d2a2169e933 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -171,10 +171,10 @@ lapack/3.0.2//lapack-3.0.2.jar 
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
-log4j-1.2-api/2.18.0//log4j-1.2-api-2.18.0.jar
-log4j-api/2.18.0//log4j-api-2.18.0.jar
-log4j-core/2.18.0//log4j-core-2.18.0.jar
-log4j-slf4j-impl/2.18.0//log4j-slf4j-impl-2.18.0.jar
+log4j-1.2-api/2.19.0//log4j-1.2-api-2.19.0.jar
+log4j-api/2.19.0//log4j-api-2.19.0.jar
+log4j-core/2.19.0//log4j-core-2.19.0.jar
+log4j-slf4j-impl/2.19.0//log4j-slf4j-impl-2.19.0.jar
logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
mesos/1.4.3/shaded-protobuf/mesos-1.4.3-shaded-protobuf.jar
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 1f173a7c64a56..03179da115b1c 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -852,9 +852,11 @@ The following extra configuration options are available when the shuffle service
   spark.shuffle.service.db.backend
   LEVELDB
-  To specify the kind of disk-base store used in shuffle service state store, supports `LEVELDB` and `ROCKSDB` now
-  and `LEVELDB` as default value.
-  The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now.
+  When work-preserving restart is enabled in YARN, this specifies the disk-based store used by
+  the shuffle service state store. `LEVELDB` and `ROCKSDB` are supported, with `LEVELDB` as the
+  default. Existing data is not automatically converted when the backend is switched: the
+  original store is retained and a new store of the selected type is created when switching
+  storage types.
   3.4.0
diff --git a/pom.xml b/pom.xml
index 69f94a6e0fe37..940f130097cac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
     1.6.0
     spark
     1.7.36
-    2.18.0
+    2.19.0
     3.3.4
     2.5.0
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index d7b26cacda39f..e2b70caf5d7e2 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -1448,7 +1448,7 @@ def corr(self, method: str = "pearson", min_periods: Optional[int] = None) -> "D
         1. Pearson, Kendall and Spearman correlation are currently computed using pairwise
            complete observations.
-        2. The complexity of Spearman correlation is O(#row * #row), if the dataset is too
+        2. The complexity of Kendall correlation is O(#row * #row), if the dataset is too
            large, sampling ahead of correlation computation is recommended.

         Examples
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index dc4109a3ed8ad..7885229fc5c8a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -6290,15 +6290,26 @@ def map_from_arrays(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
     col2 : :class:`~pyspark.sql.Column` or str
         name of column containing a set of values

+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a column of map type.
+ Examples -------- >>> df = spark.createDataFrame([([2, 5], ['a', 'b'])], ['k', 'v']) - >>> df.select(map_from_arrays(df.k, df.v).alias("map")).show() + >>> df = df.select(map_from_arrays(df.k, df.v).alias("col")) + >>> df.show() +----------------+ - | map| + | col| +----------------+ |{2 -> a, 5 -> b}| +----------------+ + >>> df.printSchema() + root + |-- col: map (nullable = true) + | |-- key: long + | |-- value: string (valueContainsNull = true) """ return _invoke_function_over_columns("map_from_arrays", col1, col2) @@ -6326,12 +6337,21 @@ def array( column names or :class:`~pyspark.sql.Column`\\s that have the same data type. + Returns + ------- + :class:`~pyspark.sql.Column` + a column of array type. + Examples -------- >>> df.select(array('age', 'age').alias("arr")).collect() [Row(arr=[2, 2]), Row(arr=[5, 5])] >>> df.select(array([df.age, df.age]).alias("arr")).collect() [Row(arr=[2, 2]), Row(arr=[5, 5])] + >>> df.select(array('age', 'age').alias("col")).printSchema() + root + |-- col: array (nullable = false) + | |-- element: long (containsNull = true) """ if len(cols) == 1 and isinstance(cols[0], (list, set)): cols = cols[0] # type: ignore[assignment] @@ -6352,6 +6372,11 @@ def array_contains(col: "ColumnOrName", value: Any) -> Column: value : value or column to check for in array + Returns + ------- + :class:`~pyspark.sql.Column` + a column of Boolean type. + Examples -------- >>> df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data']) @@ -6372,6 +6397,11 @@ def arrays_overlap(a1: "ColumnOrName", a2: "ColumnOrName") -> Column: .. versionadded:: 2.4.0 + Returns + ------- + :class:`~pyspark.sql.Column` + a column of Boolean type. + Examples -------- >>> df = spark.createDataFrame([(["a", "b"], ["b", "c"]), (["a"], ["b", "c"])], ['x', 'y']) @@ -6385,7 +6415,7 @@ def slice( x: "ColumnOrName", start: Union["ColumnOrName", int], length: Union["ColumnOrName", int] ) -> Column: """ - Collection function: returns an array containing all the elements in `x` from index `start` + Collection function: returns an array containing all the elements in `x` from index `start` (array indices start at 1, or from the end if `start` is negative) with the specified `length`. .. versionadded:: 2.4.0 @@ -6399,6 +6429,11 @@ def slice( length : :class:`~pyspark.sql.Column` or str or int column name, column, or int containing the length of the slice + Returns + ------- + :class:`~pyspark.sql.Column` + a column of array type. Subset of array. + Examples -------- >>> df = spark.createDataFrame([([1, 2, 3],), ([4, 5],)], ['x']) @@ -6420,6 +6455,20 @@ def array_join( .. versionadded:: 2.4.0 + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + delimiter : str + delimiter used to concatenate elements + null_replacement : str, optional + if set then null values will be replaced by this value + + Returns + ------- + :class:`~pyspark.sql.Column` + a column of string type. Concatenated values. + Examples -------- >>> df = spark.createDataFrame([(["a", "b", "c"],), (["a", None],)], ['data']) @@ -6439,19 +6488,39 @@ def array_join( def concat(*cols: "ColumnOrName") -> Column: """ Concatenates multiple input columns together into a single column. - The function works with strings, binary and compatible array columns. + The function works with strings, numeric, binary and compatible array columns. .. versionadded:: 1.5.0 + Parameters + ---------- + cols : :class:`~pyspark.sql.Column` or str + target column or columns to work on. 
+ + Returns + ------- + :class:`~pyspark.sql.Column` + concatenated values. Type of the `Column` depends on input columns' type. + + See Also + -------- + :meth:`pyspark.sql.functions.array_join` : to concatenate string columns with delimiter + Examples -------- >>> df = spark.createDataFrame([('abcd','123')], ['s', 'd']) - >>> df.select(concat(df.s, df.d).alias('s')).collect() + >>> df = df.select(concat(df.s, df.d).alias('s')) + >>> df.collect() [Row(s='abcd123')] + >>> df + DataFrame[s: string] >>> df = spark.createDataFrame([([1, 2], [3, 4], [5]), ([1, 2], None, [3])], ['a', 'b', 'c']) - >>> df.select(concat(df.a, df.b, df.c).alias("arr")).collect() + >>> df = df.select(concat(df.a, df.b, df.c).alias("arr")) + >>> df.collect() [Row(arr=[1, 2, 3, 4, 5]), Row(arr=None)] + >>> df + DataFrame[arr: array] """ return _invoke_function_over_seq_of_columns("concat", cols) @@ -6468,6 +6537,18 @@ def array_position(col: "ColumnOrName", value: Any) -> Column: The position is not zero based, but 1 based index. Returns 0 if the given value could not be found in the array. + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + value : Any + value to look for. + + Returns + ------- + :class:`~pyspark.sql.Column` + position of the value in the given array if found and 0 otherwise. + Examples -------- >>> df = spark.createDataFrame([(["c", "b", "a"],), ([],)], ['data']) @@ -6479,8 +6560,10 @@ def array_position(col: "ColumnOrName", value: Any) -> Column: def element_at(col: "ColumnOrName", extraction: Any) -> Column: """ - Collection function: Returns element of array at given index in extraction if col is array. - Returns value for the given key in extraction if col is map. + Collection function: Returns element of array at given index in `extraction` if col is array. + Returns value for the given key in `extraction` if col is map. If position is negative + then location of the element will start from end, if number is outside the + array boundaries then None will be returned. .. versionadded:: 2.4.0 @@ -6491,6 +6574,11 @@ def element_at(col: "ColumnOrName", extraction: Any) -> Column: extraction : index to check for in array or key to check for in map + Returns + ------- + :class:`~pyspark.sql.Column` + value at given position. + Notes ----- The position is not zero based, but 1 based index. @@ -6504,6 +6592,13 @@ def element_at(col: "ColumnOrName", extraction: Any) -> Column: >>> df = spark.createDataFrame([(["a", "b", "c"],)], ['data']) >>> df.select(element_at(df.data, 1)).collect() [Row(element_at(data, 1)='a')] + >>> df.select(element_at(df.data, -1)).collect() + [Row(element_at(data, -1)='c')] + + Returns `None` if there is no value corresponding to the given `extraction`. + + >>> df.select(element_at(df.data, -4)).collect() + [Row(element_at(data, -4)=None)] >>> df = spark.createDataFrame([({"a": 1.0, "b": 2.0},)], ['data']) >>> df.select(element_at(df.data, lit("a"))).collect() @@ -6527,6 +6622,11 @@ def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column: index : :class:`~pyspark.sql.Column` or str or int index to check for in array + Returns + ------- + :class:`~pyspark.sql.Column` + value at given position. + Notes ----- The position is not 1 based, but 0 based index. @@ -6591,6 +6691,11 @@ def array_remove(col: "ColumnOrName", element: Any) -> Column: element : element to be removed from the array + Returns + ------- + :class:`~pyspark.sql.Column` + an array excluding given value. 
+ Examples -------- >>> df = spark.createDataFrame([([1, 2, 3, 1, 1],), ([],)], ['data']) @@ -6611,6 +6716,11 @@ def array_distinct(col: "ColumnOrName") -> Column: col : :class:`~pyspark.sql.Column` or str name of column or expression + Returns + ------- + :class:`~pyspark.sql.Column` + an array of unique values. + Examples -------- >>> df = spark.createDataFrame([([1, 2, 3, 2],), ([4, 5, 5, 4],)], ['data']) @@ -6634,6 +6744,11 @@ def array_intersect(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: col2 : :class:`~pyspark.sql.Column` or str name of column containing array + Returns + ------- + :class:`~pyspark.sql.Column` + an array of values in the intersection of two arrays. + Examples -------- >>> from pyspark.sql import Row @@ -6658,6 +6773,11 @@ def array_union(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: col2 : :class:`~pyspark.sql.Column` or str name of column containing array + Returns + ------- + :class:`~pyspark.sql.Column` + an array of values in union of two arrays. + Examples -------- >>> from pyspark.sql import Row @@ -6682,6 +6802,11 @@ def array_except(col1: "ColumnOrName", col2: "ColumnOrName") -> Column: col2 : :class:`~pyspark.sql.Column` or str name of column containing array + Returns + ------- + :class:`~pyspark.sql.Column` + an array of values from first array that are not in the second. + Examples -------- >>> from pyspark.sql import Row @@ -6700,6 +6825,22 @@ def explode(col: "ColumnOrName") -> Column: .. versionadded:: 1.4.0 + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + one row per array item or map key value. + + See Also + -------- + :meth:`pyspark.functions.posexplode` + :meth:`pyspark.functions.explode_outer` + :meth:`pyspark.functions.posexplode_outer` + Examples -------- >>> from pyspark.sql import Row @@ -6725,6 +6866,16 @@ def posexplode(col: "ColumnOrName") -> Column: .. versionadded:: 2.1.0 + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + one row per array item or map key value including positions as a separate column. + Examples -------- >>> from pyspark.sql import Row @@ -6751,6 +6902,16 @@ def explode_outer(col: "ColumnOrName") -> Column: .. versionadded:: 2.3.0 + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + one row per array item or map key value. + Examples -------- >>> df = spark.createDataFrame( @@ -6788,6 +6949,16 @@ def posexplode_outer(col: "ColumnOrName") -> Column: .. versionadded:: 2.3.0 + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + one row per array item or map key value including positions as a separate column. + Examples -------- >>> df = spark.createDataFrame( @@ -6817,7 +6988,7 @@ def posexplode_outer(col: "ColumnOrName") -> Column: def get_json_object(col: "ColumnOrName", path: str) -> Column: """ - Extracts json object from a json string based on json path specified, and returns json string + Extracts json object from a json string based on json `path` specified, and returns json string of the extracted json object. It will return null if the input json string is invalid. .. 
versionadded:: 1.6.0 @@ -6829,6 +7000,11 @@ def get_json_object(col: "ColumnOrName", path: str) -> Column: path : str path to the json object to extract + Returns + ------- + :class:`~pyspark.sql.Column` + string representation of given JSON object value. + Examples -------- >>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')] @@ -6850,7 +7026,12 @@ def json_tuple(col: "ColumnOrName", *fields: str) -> Column: col : :class:`~pyspark.sql.Column` or str string column in json format fields : str - fields to extract + a field or fields to extract + + Returns + ------- + :class:`~pyspark.sql.Column` + a new row for each given field value from json object Examples -------- @@ -6890,6 +7071,11 @@ def from_json( .. # noqa + Returns + ------- + :class:`~pyspark.sql.Column` + a new column of complex type from given JSON object. + Examples -------- >>> from pyspark.sql.types import * @@ -6944,6 +7130,11 @@ def to_json(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Co .. # noqa + Returns + ------- + :class:`~pyspark.sql.Column` + JSON object as string column. + Examples -------- >>> from pyspark.sql import Row @@ -6993,6 +7184,11 @@ def schema_of_json(json: "ColumnOrName", options: Optional[Dict[str, str]] = Non .. versionchanged:: 3.0 It accepts `options` parameter to control schema inferring. + Returns + ------- + :class:`~pyspark.sql.Column` + a string representation of a :class:`StructType` parsed from given JSON. + Examples -------- >>> df = spark.range(1) @@ -7029,6 +7225,11 @@ def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = None) .. # noqa + Returns + ------- + :class:`~pyspark.sql.Column` + a string representation of a :class:`StructType` parsed from given CSV. + Examples -------- >>> df = spark.range(1) @@ -7065,6 +7266,11 @@ def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Col .. # noqa + Returns + ------- + :class:`~pyspark.sql.Column` + a CSV string converted from given :class:`StructType`. + Examples -------- >>> from pyspark.sql import Row diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7c00b469e7b39..70ff0039fa2b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1091,7 +1091,7 @@ class Analyzer(override val catalogManager: CatalogManager) }.getOrElse(u) case u @ UnresolvedView(identifier, cmd, allowTemp, relationTypeMismatchHint) => - lookupTableOrView(identifier).map { + lookupTableOrView(identifier, viewOnly = true).map { case _: ResolvedTempView if !allowTemp => throw QueryCompilationErrors.expectViewNotTempViewError(identifier, cmd, u) case t: ResolvedTable => @@ -1137,12 +1137,17 @@ class Analyzer(override val catalogManager: CatalogManager) * Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is * for resolving DDL and misc commands. 
*/ - private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = { + private def lookupTableOrView( + identifier: Seq[String], + viewOnly: Boolean = false): Option[LogicalPlan] = { lookupTempView(identifier).map { tempView => ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema) }.orElse { expandIdentifier(identifier) match { case CatalogAndIdentifier(catalog, ident) => + if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) { + throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views") + } CatalogV2Util.loadTable(catalog, ident).map { case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) && v1Table.v1Table.tableType == CatalogTableType.VIEW => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 68ed8991553e3..587ede9a81bf2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -131,12 +131,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case u: UnresolvedTable => u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}") - case u @ UnresolvedView(NonSessionCatalogAndIdentifier(catalog, ident), cmd, _, _) => - u.failAnalysis( - s"Cannot specify catalog `${catalog.name}` for view ${ident.quoted} " + - "because view support in v2 catalog has not been implemented yet. " + - s"$cmd expects a view.") - case u: UnresolvedView => u.failAnalysis(s"View not found: ${u.multipartIdentifier.quoted}") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index 805f3080c8472..5c2c6d918a238 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -54,6 +54,10 @@ case class NoSuchTableException( def this(tableIdent: Identifier) = { this(s"Table ${tableIdent.quoted} not found") } + + def this(nameParts: Seq[String]) = { + this(s"Table ${nameParts.quoted} not found") + } } case class NoSuchPartitionException( @@ -74,19 +78,17 @@ case class NoSuchPartitionException( case class NoSuchPermanentFunctionException(db: String, func: String) extends AnalysisException(s"Function '$func' not found in database '$db'") -case class NoSuchFunctionException( - override val message: String, - override val cause: Option[Throwable]) - extends AnalysisException(message, cause = cause) { +case class NoSuchFunctionException(override val message: String) + extends AnalysisException(message) { - def this(db: String, func: String, cause: Option[Throwable] = None) = { + def this(db: String, func: String) = { this(s"Undefined function: '$func'. 
" + - s"This function is neither a registered temporary function nor " + - s"a permanent function registered in the database '$db'.", cause = cause) + "This function is neither a registered temporary function nor " + + s"a permanent function registered in the database '$db'.") } def this(identifier: Identifier) = { - this(s"Undefined function: ${identifier.quoted}", cause = None) + this(s"Undefined function: ${identifier.quoted}") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 9893384b709f5..221f1a0f3135c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog} /** * Resolves the catalog of the name parts for table/view/function/namespace. @@ -28,8 +28,14 @@ class ResolveCatalogs(val catalogManager: CatalogManager) extends Rule[LogicalPlan] with LookupCatalog { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case UnresolvedIdentifier(CatalogAndIdentifier(catalog, identifier)) => - ResolvedIdentifier(catalog, identifier) + case UnresolvedIdentifier(nameParts, allowTemp) => + if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) { + val ident = Identifier.of(nameParts.dropRight(1).toArray, nameParts.last) + ResolvedIdentifier(FakeSystemCatalog, ident) + } else { + val CatalogAndIdentifier(catalog, identifier) = nameParts + ResolvedIdentifier(catalog, identifier) + } case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) => s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace)) case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala index a7370254826b4..7a2bd1ccc154f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable} +import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, LogicalPlan, NoopCommand, UncacheTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND @@ -29,10 +29,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND object ResolveCommandsWithIfExists extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsPattern(COMMAND)) { - case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists => - NoopCommand("DROP TABLE", u.multipartIdentifier) - case DropView(u: UnresolvedView, ifExists) if ifExists => - NoopCommand("DROP VIEW", u.multipartIdentifier) case 
UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists => NoopCommand("UNCACHE TABLE", u.multipartIdentifier) case DropFunction(u: UnresolvedFunc, ifExists) if ifExists => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index c3fc5533a8e02..321eecf42b09a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to @@ -135,7 +136,8 @@ case class UnresolvedFunc( * Holds the name of a table/view/function identifier that we need to determine the catalog. It will * be resolved to [[ResolvedIdentifier]] during analysis. */ -case class UnresolvedIdentifier(nameParts: Seq[String]) extends LeafNode { +case class UnresolvedIdentifier(nameParts: Seq[String], allowTemp: Boolean = false) + extends LeafNode { override lazy val resolved: Boolean = false override def output: Seq[Attribute] = Nil } @@ -244,3 +246,9 @@ case class ResolvedIdentifier( identifier: Identifier) extends LeafNodeWithoutStats { override def output: Seq[Attribute] = Nil } + +// A fake v2 catalog to hold temp views. +object FakeSystemCatalog extends CatalogPlugin { + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {} + override def name(): String = "SYSTEM" +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 5d5d8b202c533..1ada2ffa4fc15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder @@ -1588,10 +1587,9 @@ class SessionCatalog( TableFunctionRegistry.builtin.functionExists(name) } - protected[sql] def failFunctionLookup( - name: FunctionIdentifier, cause: Option[Throwable] = None): Nothing = { + protected[sql] def failFunctionLookup(name: FunctionIdentifier): Nothing = { throw new NoSuchFunctionException( - db = name.database.getOrElse(getCurrentDatabase), func = name.funcName, cause) + db = name.database.getOrElse(getCurrentDatabase), func = name.funcName) } /** @@ -1732,11 +1730,7 @@ class SessionCatalog( // The function has not been loaded to the function registry, which means // that the function is a persistent function (if it actually has been registered // in the metastore). We need to first put the function in the function registry. 
- val catalogFunction = try { - externalCatalog.getFunction(db, funcName) - } catch { - case _: AnalysisException => failFunctionLookup(qualifiedIdent) - } + val catalogFunction = externalCatalog.getFunction(db, funcName) loadFunctionResources(catalogFunction.resources) // Please note that qualifiedName is provided by the user. However, // catalogFunction.identifier.unquotedString is returned by the underlying diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 76de49d86dc42..d2ea8df415108 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3673,7 +3673,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { // DROP TABLE works with either a table or a temporary view. DropTable( - createUnresolvedTableOrView(ctx.multipartIdentifier(), "DROP TABLE"), + UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier()), allowTemp = true), ctx.EXISTS != null, ctx.PURGE != null) } @@ -3683,11 +3683,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit */ override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) { DropView( - createUnresolvedView( - ctx.multipartIdentifier(), - commandName = "DROP VIEW", - allowTemp = true, - relationTypeMismatchHint = Some("Please use DROP TABLE instead.")), + UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier()), allowTemp = true), ctx.EXISTS != null) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index c1d8f0a4a8a51..ef1d0dd94fc91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -604,6 +604,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { "operation" -> operation)) } + def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = { + new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE", + errorSubClass = "CATALOG_OPERATION", + messageParameters = Map( + "catalogName" -> toSQLId(Seq(catalog.name())), + "operation" -> operation)) + } + def alterColumnWithV1TableCannotSpecifyNotNullError(): Throwable = { new AnalysisException("ALTER COLUMN with v1 tables cannot specify NOT NULL.") } @@ -958,6 +967,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { new NoSuchTableException(ident) } + def noSuchTableError(nameParts: Seq[String]): Throwable = { + new NoSuchTableException(nameParts) + } + def noSuchNamespaceError(namespace: Array[String]): Throwable = { new NoSuchNamespaceException(namespace) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala index e50a58f8ce5fe..785d5ae05cfff 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala @@ -34,14 +34,12 @@ class AnalysisExceptionPositionSuite extends AnalysisTest { } test("SPARK-33918: UnresolvedView should retain sql text position") { - verifyViewPosition("DROP VIEW unknown", "unknown") verifyViewPosition("ALTER VIEW unknown SET TBLPROPERTIES ('k'='v')", "unknown") verifyViewPosition("ALTER VIEW unknown UNSET TBLPROPERTIES ('k')", "unknown") verifyViewPosition("ALTER VIEW unknown AS SELECT 1", "unknown") } test("SPARK-34057: UnresolvedTableOrView should retain sql text position") { - verifyTableOrViewPosition("DROP TABLE unknown", "unknown") verifyTableOrViewPosition("DESCRIBE TABLE unknown", "unknown") verifyTableOrPermanentViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS", "unknown") verifyTableOrViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS FOR COLUMNS col", "unknown") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 62491e04831cc..6ed14bd641653 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -1728,20 +1728,6 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { } } - test("SPARK-24544: test print actual failure cause when look up function failed") { - withBasicCatalog { catalog => - val cause = intercept[NoSuchFunctionException] { - catalog.failFunctionLookup(FunctionIdentifier("failureFunc"), - Some(new Exception("Actual error"))) - } - - // fullStackTrace will be printed, but `cause.getMessage` has been - // override in `AnalysisException`,so here we get the root cause - // exception message for check. 
- assert(cause.cause.get.getMessage.contains("Actual error")) - } - } - test("expire table relation cache if TTL is configured") { case class TestCommand() extends LeafCommand diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 25bacc3631efb..ba2cc4ff15e57 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -685,15 +685,15 @@ class DDLParserSuite extends AnalysisTest { val cmd = "DROP VIEW" val hint = Some("Please use DROP TABLE instead.") parseCompare(s"DROP VIEW testcat.db.view", - DropView(UnresolvedView(Seq("testcat", "db", "view"), cmd, true, hint), ifExists = false)) + DropView(UnresolvedIdentifier(Seq("testcat", "db", "view"), true), ifExists = false)) parseCompare(s"DROP VIEW db.view", - DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = false)) + DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = false)) parseCompare(s"DROP VIEW IF EXISTS db.view", - DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = true)) + DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = true)) parseCompare(s"DROP VIEW view", - DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = false)) + DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = false)) parseCompare(s"DROP VIEW IF EXISTS view", - DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = true)) + DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = true)) } private def testCreateOrReplaceDdl( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index cca0a56174da1..56236f0d2ad03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -216,19 +216,23 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } - case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) => + case DropTable(ResolvedV1Identifier(ident), ifExists, purge) => DropTableCommand(ident, ifExists, isView = false, purge = purge) - case DropTable(_: ResolvedPersistentView, ifExists, purge) => - throw QueryCompilationErrors.cannotDropViewWithDropTableError - // v1 DROP TABLE supports temp view. 
- case DropTable(ResolvedTempView(ident, _), ifExists, purge) => - DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge) + case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) => + DropTempViewCommand(ident) - case DropView(ResolvedViewIdentifier(ident), ifExists) => + case DropView(ResolvedV1Identifier(ident), ifExists) => DropTableCommand(ident, ifExists, isView = true, purge = false) + case DropView(r @ ResolvedIdentifier(catalog, ident), _) => + if (catalog == FakeSystemCatalog) { + DropTempViewCommand(ident) + } else { + throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views") + } + case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command => val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT) val location = c.properties.get(SupportsNamespaces.PROP_LOCATION) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 527f78ef10e34..e9bbbc717d1e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression} import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint -import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint} +import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View} import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.columnar.InMemoryRelation @@ -159,11 +159,51 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { plan: LogicalPlan, cascade: Boolean, blocking: Boolean = false): Unit = { + uncacheQuery(spark, _.sameResult(plan), cascade, blocking) + } + + def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = { + uncacheQuery( + spark, + isMatchedTableOrView(_, name, spark.sessionState.conf), + cascade, + blocking = false) + } + + private def isMatchedTableOrView(plan: LogicalPlan, name: Seq[String], conf: SQLConf): Boolean = { + def isSameName(nameInCache: Seq[String]): Boolean = { + nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled) + } + + plan match { + case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) => + val v1Ident = catalogTable.identifier + isSameName(ident.qualifier :+ ident.name) && + isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table) + + case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) => + isSameName(ident.qualifier :+ ident.name) && + isSameName(catalog.name() +: v2Ident.namespace() :+ v2Ident.name()) + + case SubqueryAlias(ident, View(catalogTable, _, _)) => + val v1Ident = catalogTable.identifier + isSameName(ident.qualifier :+ ident.name) && + isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table) + + case _ => false + } + } + + def uncacheQuery( + spark: SparkSession, + isMatchedPlan: LogicalPlan => Boolean, + cascade: Boolean, + blocking: Boolean): Unit = { val shouldRemove: LogicalPlan => Boolean = if (cascade) { - 
_.exists(_.sameResult(plan)) + _.exists(isMatchedPlan) } else { - _.sameResult(plan) + isMatchedPlan } val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan)) this.synchronized { @@ -187,7 +227,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { // will keep it as it is. It means the physical plan has been re-compiled already in the // other thread. val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded - cd.plan.exists(_.sameResult(plan)) && !cacheAlreadyLoaded + cd.plan.exists(isMatchedPlan) && !cacheAlreadyLoaded }) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 078358b6c7dbd..1f71a104707a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -203,7 +203,8 @@ case class DescribeDatabaseCommand( } /** - * Drops a table/view from the metastore and removes it if it is cached. + * Drops a table/view from the metastore and removes it if it is cached. This command does not drop + * temp views, which should be handled by [[DropTempViewCommand]]. * * The syntax of this command is: * {{{ @@ -219,9 +220,8 @@ case class DropTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - val isTempView = catalog.isTempView(tableName) - if (!isTempView && catalog.tableExists(tableName)) { + if (catalog.tableExists(tableName)) { // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view // issue an exception. 
catalog.getTableMetadata(tableName).tableType match { @@ -231,14 +231,10 @@ case class DropTableCommand( throw QueryCompilationErrors.cannotDropViewWithDropTableError() case _ => } - } - if (isTempView || catalog.tableExists(tableName)) { try { - val hasViewText = isTempView && - catalog.getTempViewOrPermanentTableMetadata(tableName).viewText.isDefined sparkSession.sharedState.cacheManager.uncacheQuery( - sparkSession.table(tableName), cascade = !isTempView || hasViewText) + sparkSession.table(tableName), cascade = true) } catch { case NonFatal(e) => log.warn(e.toString, e) } @@ -247,7 +243,28 @@ case class DropTableCommand( } else if (ifExists) { // no-op } else { - throw QueryCompilationErrors.tableOrViewNotFoundError(tableName.identifier) + throw QueryCompilationErrors.noSuchTableError( + tableName.catalog.toSeq ++ tableName.database :+ tableName.table) + } + Seq.empty[Row] + } +} + +case class DropTempViewCommand(ident: Identifier) extends LeafRunnableCommand { + override def run(sparkSession: SparkSession): Seq[Row] = { + assert(ident.namespace().isEmpty || ident.namespace().length == 1) + val nameParts = ident.namespace() :+ ident.name() + val catalog = sparkSession.sessionState.catalog + catalog.getRawLocalOrGlobalTempView(nameParts).foreach { view => + val hasViewText = view.tableMeta.viewText.isDefined + sparkSession.sharedState.cacheManager.uncacheTableOrView( + sparkSession, nameParts, cascade = hasViewText) + view.refresh() + if (ident.namespace().isEmpty) { + catalog.dropTempView(ident.name()) + } else { + catalog.dropGlobalTempView(ident.name()) + } } Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala index 09c8756ca0189..67e77a97865df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -111,12 +111,14 @@ private class PartitionIterator[T]( reader: PartitionReader[T], customMetrics: Map[String, SQLMetric]) extends Iterator[T] { private[this] var valuePrepared = false + private[this] var hasMoreInput = true private var numRow = 0L override def hasNext: Boolean = { - if (!valuePrepared) { - valuePrepared = reader.next() + if (!valuePrepared && hasMoreInput) { + hasMoreInput = reader.next() + valuePrepared = hasMoreInput } valuePrepared } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 35a5f41fb1768..39ad51ffbe73c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -326,8 +326,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat "DESC TABLE COLUMN", toPrettySQL(nested)) } - case DropTable(r: ResolvedTable, ifExists, purge) => - DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateTableCache(r)) :: Nil + case DropTable(r: ResolvedIdentifier, ifExists, purge) => + val invalidateFunc = () => session.sharedState.cacheManager.uncacheTableOrView( + session, r.catalog.name() +: r.identifier.namespace() :+ r.identifier.name(), + cascade = true) + DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, 
purge, invalidateFunc) :: Nil case _: NoopCommand => LocalTableScanExec(Nil, Nil) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala index 1e0627fb6dfdd..2125b58813f85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala @@ -37,7 +37,8 @@ case class DropTableExec( invalidateCache() if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident) } else if (!ifExists) { - throw QueryCompilationErrors.noSuchTableError(ident) + throw QueryCompilationErrors.noSuchTableError( + catalog.name() +: ident.namespace() :+ ident.name()) } Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 06ba1ceb35741..12427ece23670 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming import scala.collection.mutable.{Map => MutableMap} +import scala.collection.mutable import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.encoders.RowEncoder @@ -553,7 +554,7 @@ class MicroBatchExecution( logDebug(s"Running batch $currentBatchId") // Request unprocessed data from all sources. - newData = reportTimeTaken("getBatch") { + val mutableNewData = mutable.Map.empty ++ reportTimeTaken("getBatch") { availableOffsets.flatMap { case (source: Source, available: Offset) if committedOffsets.get(source).map(_ != available).getOrElse(true) => @@ -590,7 +591,7 @@ class MicroBatchExecution( val newBatchesPlan = logicalPlan transform { // For v1 sources. case StreamingExecutionRelation(source, output, catalogTable) => - newData.get(source).map { dataPlan => + mutableNewData.get(source).map { dataPlan => val hasFileMetadata = output.exists { case FileSourceMetadataAttribute(_) => true case _ => false @@ -608,6 +609,11 @@ class MicroBatchExecution( } newRelation } + // SPARK-40460: overwrite the entry with the new logicalPlan + // because it might contain the _metadata column. It is a necessary change, + // in the ProgressReporter, we use the following mapping to get correct streaming metrics: + // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan + mutableNewData.put(source, finalDataPlan) val maxFields = SQLConf.get.maxToStringFields assert(output.size == finalDataPlan.output.size, s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " + @@ -623,14 +629,14 @@ class MicroBatchExecution( // For v2 sources. case r: StreamingDataSourceV2Relation => - newData.get(r.stream).map { + mutableNewData.get(r.stream).map { case OffsetHolder(start, end) => r.copy(startOffset = Some(start), endOffset = Some(end)) }.getOrElse { LocalRelation(r.output, isStreaming = true) } } - + newData = mutableNewData.toMap // Rewire the plan to use the new attributes that were returned by the source. 
val newAttributePlan = newBatchesPlan.transformAllExpressionsWithPruning( _.containsPattern(CURRENT_LIKE)) { diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-analytics.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-analytics.sql new file mode 100644 index 0000000000000..0249d98b6be8a --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-analytics.sql @@ -0,0 +1,70 @@ +CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES +(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2) +AS testData(a, b); + +-- CUBE on overlapping columns +SELECT a + b, b, udaf(a - b) FROM testData GROUP BY a + b, b WITH CUBE; + +SELECT a, b, udaf(b) FROM testData GROUP BY a, b WITH CUBE; + +-- ROLLUP on overlapping columns +SELECT a + b, b, udaf(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP; + +SELECT a, b, udaf(b) FROM testData GROUP BY a, b WITH ROLLUP; + +CREATE OR REPLACE TEMPORARY VIEW courseSales AS SELECT * FROM VALUES +("dotNET", 2012, 10000), ("Java", 2012, 20000), ("dotNET", 2012, 5000), ("dotNET", 2013, 48000), ("Java", 2013, 30000) +AS courseSales(course, year, earnings); + +-- ROLLUP +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY course, year; +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY ROLLUP(course, year, (course, year)) ORDER BY course, year; +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY ROLLUP(course, year, (course, year), ()) ORDER BY course, year; + +-- CUBE +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, year; +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year, (course, year)) ORDER BY course, year; +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year, (course, year), ()) ORDER BY course, year; + +-- GROUPING SETS +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year); +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year, ()); +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course); +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year); + +-- Partial ROLLUP/CUBE/GROUPING SETS +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, CUBE(course, year) ORDER BY course, year; +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year), ROLLUP(course, year) ORDER BY course, year; +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year), ROLLUP(course, year), GROUPING SETS(course, year) ORDER BY course, year; + +-- GROUPING SETS with aggregate functions containing groupBy columns +SELECT course, udaf(earnings) AS sum FROM courseSales +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum; +SELECT course, udaf(earnings) AS sum, GROUPING_ID(course, earnings) FROM courseSales +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum; + +-- Aliases in SELECT could be used in ROLLUP/CUBE/GROUPING SETS +SELECT a + b AS k1, b AS k2, udaf(a - b) FROM testData GROUP BY CUBE(k1, k2); +SELECT a + b AS k, b, udaf(a - b) FROM testData GROUP BY ROLLUP(k, b); +SELECT a + b, b AS k, udaf(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k); + +-- GROUP BY use mixed Separate columns and CUBE/ROLLUP/Gr +SELECT a, b, udaf(1) FROM 
testData GROUP BY a, b, CUBE(a, b); +SELECT a, b, udaf(1) FROM testData GROUP BY a, b, ROLLUP(a, b); +SELECT a, b, udaf(1) FROM testData GROUP BY CUBE(a, b), ROLLUP(a, b); +SELECT a, b, udaf(1) FROM testData GROUP BY a, CUBE(a, b), ROLLUP(b); +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b), (a), ()); +SELECT a, b, udaf(1) FROM testData GROUP BY a, CUBE(a, b), GROUPING SETS((a, b), (a), ()); +SELECT a, b, udaf(1) FROM testData GROUP BY a, CUBE(a, b), ROLLUP(a, b), GROUPING SETS((a, b), (a), ()); + +-- Support nested CUBE/ROLLUP/GROUPING SETS in GROUPING SETS +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(ROLLUP(a, b)); +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(GROUPING SETS((a, b), (a), ())); + +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b), GROUPING SETS(ROLLUP(a, b))); +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b, a, b), (a, b, a), (a, b)); +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(GROUPING SETS((a, b, a, b), (a, b, a), (a, b))); + +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(ROLLUP(a, b), CUBE(a, b)); +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(GROUPING SETS((a, b), (a), ()), GROUPING SETS((a, b), (a), (b), ())); +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b), (a), (), (a, b), (a), (b), ()); diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-by-ordinal.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-by-ordinal.sql new file mode 100644 index 0000000000000..ded3e74d20a7e --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-by-ordinal.sql @@ -0,0 +1,88 @@ +-- group by ordinal positions + +create temporary view data as select * from values + (1, 1), + (1, 2), + (2, 1), + (2, 2), + (3, 1), + (3, 2) + as data(a, b); + +-- basic case +select a, udaf(b) from data group by 1; + +-- constant case +select 1, 2, udaf(b) from data group by 1, 2; + +-- duplicate group by column +select a, 1, udaf(b) from data group by a, 1; +select a, 1, udaf(b) from data group by 1, 2; + +-- group by a non-aggregate expression's ordinal +select a, b + 2, udaf(2) from data group by a, 2; + +-- with alias +select a as aa, b + 2 as bb, udaf(2) from data group by 1, 2; + +-- foldable non-literal: this should be the same as no grouping. 
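+-- (Hedged note: `1 + 0` folds to a constant rather than a bare ordinal, so the query below
+-- aggregates the whole table as a single group.)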
+select udaf(b) from data group by 1 + 0; + +-- negative case: position is an aggregate expression +select a, b, udaf(b) from data group by 3; +select a, b, udaf(b) + 2 from data group by 3; + +-- negative case: nondeterministic expression +select a, rand(0), udaf(b) +from +(select /*+ REPARTITION(1) */ a, b from data) group by a, 2; + +-- group by ordinal followed by order by +select a, udaf(a) from (select 1 as a) tmp group by 1 order by 1; + +-- group by ordinal followed by having +select udaf(a), a from (select 1 as a) tmp group by 2 having a > 0; + +-- mixed cases: group-by ordinals and aliases +select a, a AS k, udaf(b) from data group by k, 1; + +-- can use ordinal in CUBE +select a, b, udaf(1) from data group by cube(1, 2); + +-- mixed cases: can use ordinal in CUBE +select a, b, udaf(1) from data group by cube(1, b); + +-- can use ordinal with cube +select a, b, udaf(1) from data group by 1, 2 with cube; + +-- can use ordinal in ROLLUP +select a, b, udaf(1) from data group by rollup(1, 2); + +-- mixed cases: can use ordinal in ROLLUP +select a, b, udaf(1) from data group by rollup(1, b); + +-- can use ordinal with rollup +select a, b, udaf(1) from data group by 1, 2 with rollup; + +-- can use ordinal in GROUPING SETS +select a, b, udaf(1) from data group by grouping sets((1), (2), (1, 2)); + +-- mixed cases: can use ordinal in GROUPING SETS +select a, b, udaf(1) from data group by grouping sets((1), (b), (a, 2)); + +select a, b, udaf(1) from data group by a, 2 grouping sets((1), (b), (a, 2)); + +-- range error +select a, b, udaf(1) from data group by a, -1; + +select a, b, udaf(1) from data group by a, 3; + +select a, b, udaf(1) from data group by cube(-1, 2); + +select a, b, udaf(1) from data group by cube(1, 3); + +-- turn off group by ordinal +set spark.sql.groupByOrdinal=false; + +-- can now group by negative literal +select udaf(b) from data group by -1; diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-by.sql new file mode 100644 index 0000000000000..eaac13bcf6a93 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-group-by.sql @@ -0,0 +1,110 @@ +-- Test aggregate operator with codegen on and off. +--CONFIG_DIM1 spark.sql.codegen.wholeStage=true +--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY +--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN + +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES +(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (null, 1), (3, null), (null, null) +AS testData(a, b); + +-- Aggregate with empty GroupBy expressions. +SELECT a, udaf(b) FROM testData; +SELECT udaf(a), udaf(b) FROM testData; + +-- Aggregate with non-empty GroupBy expressions. +SELECT a, udaf(b) FROM testData GROUP BY a; +SELECT a, udaf(b) FROM testData GROUP BY b; +SELECT udaf(a), udaf(b) FROM testData GROUP BY a; + +-- Aggregate grouped by literals. +SELECT 'foo', udaf(a) FROM testData GROUP BY 1; + +-- Aggregate grouped by literals (hash aggregate). +SELECT 'foo', udaf(a) FROM testData WHERE a = 0 GROUP BY 1; + +-- Aggregate grouped by literals (sort aggregate). +SELECT 'foo', udaf(STRUCT(a)) FROM testData WHERE a = 0 GROUP BY 1; + +-- Aggregate with complex GroupBy expressions. 
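+-- (Hedged note: per the expected results, a select-list expression is only valid here when it
+-- can be derived from the grouping expression itself, e.g. `a + 1 + 1` over GROUP BY `a + 1`,
+-- whereas `a + 2` over GROUP BY `a + 1` is rejected.)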
+SELECT a + b, udaf(b) FROM testData GROUP BY a + b; +SELECT a + 2, udaf(b) FROM testData GROUP BY a + 1; +SELECT a + 1 + 1, udaf(b) FROM testData GROUP BY a + 1; + +-- Aggregate with nulls. +SELECT SKEWNESS(a), KURTOSIS(a), udaf(a), udaf(a), AVG(a), VARIANCE(a), STDDEV(a), SUM(a), udaf(a) +FROM testData; + +-- Aggregate with foldable input and multiple distinct groups. +SELECT udaf(DISTINCT b), udaf(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a; + +-- Aliases in SELECT could be used in GROUP BY +SELECT a AS k, udaf(b) FROM testData GROUP BY k; +SELECT a AS k, udaf(b) FROM testData GROUP BY k HAVING k > 1; + +-- GROUP BY alias with invalid col in SELECT list +SELECT a AS k, udaf(non_existing) FROM testData GROUP BY k; + +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW testDataHasSameNameWithAlias AS SELECT * FROM VALUES +(1, 1, 3), (1, 2, 1) AS testDataHasSameNameWithAlias(k, a, v); +SELECT k AS a, udaf(v) FROM testDataHasSameNameWithAlias GROUP BY a; + +-- turn off group by aliases +set spark.sql.groupByAliases=false; + +-- Check analysis exceptions +SELECT a AS k, udaf(b) FROM testData GROUP BY k; + +-- Aggregate with empty input and non-empty GroupBy expressions. +SELECT a, udaf(1) FROM testData WHERE false GROUP BY a; + +-- Aggregate with empty input and empty GroupBy expressions. +SELECT udaf(1) FROM testData WHERE false; +SELECT 1 FROM (SELECT udaf(1) FROM testData WHERE false) t; + +-- Aggregate with empty GroupBy expressions and filter on top +SELECT 1 from ( + SELECT 1 AS z, + udaf(a.x) + FROM (select 1 as x) a + WHERE false +) b +where b.z != b.z; + +-- SPARK-25708 HAVING without GROUP BY means global aggregate +SELECT 1 FROM range(10) HAVING udaf(id) > 0; + +-- Test data +CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES + (1, true), (1, false), + (2, true), + (3, false), (3, null), + (4, null), (4, null), + (5, null), (5, true), (5, false) AS test_agg(k, v); + +-- empty table +SELECT udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg WHERE 1 = 0; + +-- all null values +SELECT udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg WHERE k = 4; + +-- aggregates are null Filtering +SELECT udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg WHERE k = 5; + +-- group by +SELECT k, udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg GROUP BY k; + +-- having +SELECT k, udaf(v) FROM test_agg GROUP BY k HAVING udaf(v) = false; +SELECT k, udaf(v) FROM test_agg GROUP BY k HAVING udaf(v) IS NULL; + +-- basic subquery path to make sure rewrite happens in both parent and child plans. 
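+-- (Hedged note: the aggregate appears in both the outer query and the IN subquery, so the
+-- rewrite has to apply to the parent and the child plan alike.)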
+SELECT k, + udaf(v) AS count +FROM test_agg +WHERE k = 2 + AND v IN (SELECT Any(v) + FROM test_agg + WHERE k = 1) +GROUP BY k; diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-grouping-set.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-grouping-set.sql new file mode 100644 index 0000000000000..1217b9e5b09db --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf-grouping-set.sql @@ -0,0 +1,47 @@ +CREATE TEMPORARY VIEW grouping AS SELECT * FROM VALUES + ("1", "2", "3", 1), + ("4", "5", "6", 1), + ("7", "8", "9", 1) + as grouping(a, b, c, d); + +-- SPARK-17849: grouping set throws NPE #1 +SELECT a, b, c, udaf(d) FROM grouping GROUP BY a, b, c GROUPING SETS (()); + +-- SPARK-17849: grouping set throws NPE #2 +SELECT a, b, c, udaf(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((a)); + +-- SPARK-17849: grouping set throws NPE #3 +SELECT a, b, c, udaf(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((c)); + +-- Group sets without explicit group by +SELECT c1, udaf(c2) FROM (VALUES ('x', 10, 0), ('y', 20, 0)) AS t (c1, c2, c3) GROUP BY GROUPING SETS (c1); + +-- Group sets without group by and with grouping +SELECT c1, udaf(c2), grouping(c1) FROM (VALUES ('x', 10, 0), ('y', 20, 0)) AS t (c1, c2, c3) GROUP BY GROUPING SETS (c1); + +-- Mutiple grouping within a grouping set +SELECT c1, c2, udaf(c3), grouping__id +FROM (VALUES ('x', 'a', 10), ('y', 'b', 20) ) AS t (c1, c2, c3) +GROUP BY GROUPING SETS ( ( c1 ), ( c2 ) ) +HAVING GROUPING__ID > 1; + +-- complex expression in grouping sets +SELECT a + b, b, udaf(c) FROM (VALUES (1,1,1),(2,2,2)) AS t(a,b,c) GROUP BY GROUPING SETS ( (a + b), (b)); + +-- complex expression in grouping sets +SELECT a + b, b, udaf(c) FROM (VALUES (1,1,1),(2,2,2)) AS t(a,b,c) GROUP BY GROUPING SETS ( (a + b), (b + a), (b)); + +-- negative tests - must have at least one grouping expression +SELECT a, b, c, udaf(d) FROM grouping GROUP BY WITH ROLLUP; + +SELECT a, b, c, udaf(d) FROM grouping GROUP BY WITH CUBE; + +-- duplicate entries in grouping sets +SELECT k1, k2, udaf(v) FROM (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY GROUPING SETS ((k1),(k1,k2),(k2,k1)); + +SELECT grouping__id, k1, k2, udaf(v) FROM (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY GROUPING SETS ((k1),(k1,k2),(k2,k1)); + +SELECT grouping(k1), k1, k2, udaf(v) FROM (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY GROUPING SETS ((k1),(k1,k2),(k2,k1)); + +-- grouping_id function +SELECT grouping_id(k1, k2), udaf(v) from (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY k1, k2 GROUPING SETS ((k2, k1), k1); diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf/udaf.sql similarity index 100% rename from sql/core/src/test/resources/sql-tests/inputs/udaf.sql rename to sql/core/src/test/resources/sql-tests/inputs/udaf/udaf.sql diff --git a/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-analytics.sql.out new file mode 100644 index 0000000000000..4b16407e1a0c0 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-analytics.sql.out @@ -0,0 +1,1150 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES +(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2) +AS testData(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT a + b, b, udaf(a - b) FROM testData GROUP BY a + b, b WITH CUBE +-- 
!query schema +struct<(a + b):int,b:int,udaf((a - b)):int> +-- !query output +2 1 1 +2 NULL 0 +3 1 1 +3 2 1 +3 NULL 0 +4 1 1 +4 2 1 +4 NULL 0 +5 2 1 +5 NULL 0 +NULL 1 3 +NULL 2 3 +NULL NULL 0 + + +-- !query +SELECT a, b, udaf(b) FROM testData GROUP BY a, b WITH CUBE +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 0 +2 1 1 +2 2 1 +2 NULL 0 +3 1 1 +3 2 1 +3 NULL 0 +NULL 1 3 +NULL 2 3 +NULL NULL 0 + + +-- !query +SELECT a + b, b, udaf(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP +-- !query schema +struct<(a + b):int,b:int,udaf((a - b)):int> +-- !query output +2 1 1 +2 NULL 0 +3 1 1 +3 2 1 +3 NULL 0 +4 1 1 +4 2 1 +4 NULL 0 +5 2 1 +5 NULL 0 +NULL NULL 0 + + +-- !query +SELECT a, b, udaf(b) FROM testData GROUP BY a, b WITH ROLLUP +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 0 +2 1 1 +2 2 1 +2 NULL 0 +3 1 1 +3 2 1 +3 NULL 0 +NULL NULL 0 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW courseSales AS SELECT * FROM VALUES +("dotNET", 2012, 10000), ("Java", 2012, 20000), ("dotNET", 2012, 5000), ("dotNET", 2013, 48000), ("Java", 2013, 30000) +AS courseSales(course, year, earnings) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY course, year +-- !query schema +struct +-- !query output +NULL NULL 5 +Java NULL 2 +Java 2012 1 +Java 2013 1 +dotNET NULL 3 +dotNET 2012 2 +dotNET 2013 1 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY ROLLUP(course, year, (course, year)) ORDER BY course, year +-- !query schema +struct +-- !query output +NULL NULL 5 +Java NULL 2 +Java 2012 1 +Java 2012 1 +Java 2013 1 +Java 2013 1 +dotNET NULL 3 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2013 1 +dotNET 2013 1 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY ROLLUP(course, year, (course, year), ()) ORDER BY course, year +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +Empty set in ROLLUP grouping sets is not supported.(line 1, pos 62) + +== SQL == +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY ROLLUP(course, year, (course, year), ()) ORDER BY course, year +--------------------------------------------------------------^^^ + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, year +-- !query schema +struct +-- !query output +NULL NULL 5 +NULL 2012 3 +NULL 2013 2 +Java NULL 2 +Java 2012 1 +Java 2013 1 +dotNET NULL 3 +dotNET 2012 2 +dotNET 2013 1 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year, (course, year)) ORDER BY course, year +-- !query schema +struct +-- !query output +NULL NULL 5 +NULL 2012 3 +NULL 2013 2 +Java NULL 2 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +dotNET NULL 3 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year, (course, year), ()) ORDER BY course, year +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +Empty set in CUBE grouping sets is not supported.(line 1, pos 62) + +== SQL == +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year, (course, year), ()) ORDER BY course, year 
+--------------------------------------------------------------^^^ + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year) +-- !query schema +struct +-- !query output +Java NULL 2 +NULL 2012 3 +NULL 2013 2 +dotNET NULL 3 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year, ()) +-- !query schema +struct +-- !query output +Java NULL 2 +NULL 2012 3 +NULL 2013 2 +NULL NULL 5 +dotNET NULL 3 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course) +-- !query schema +struct +-- !query output +Java NULL 2 +dotNET NULL 3 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year) +-- !query schema +struct +-- !query output +NULL 2012 3 +NULL 2013 2 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY course, CUBE(course, year) ORDER BY course, year +-- !query schema +struct +-- !query output +Java NULL 2 +Java NULL 2 +Java 2012 1 +Java 2012 1 +Java 2013 1 +Java 2013 1 +dotNET NULL 3 +dotNET NULL 3 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2013 1 +dotNET 2013 1 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year), ROLLUP(course, year) ORDER BY course, year +-- !query schema +struct +-- !query output +NULL NULL 5 +NULL 2012 3 +NULL 2013 2 +Java NULL 2 +Java NULL 2 +Java NULL 2 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +dotNET NULL 3 +dotNET NULL 3 +dotNET NULL 3 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 + + +-- !query +SELECT course, year, udaf(earnings) FROM courseSales GROUP BY CUBE(course, year), ROLLUP(course, year), GROUPING SETS(course, year) ORDER BY course, year +-- !query schema +struct +-- !query output +NULL 2012 3 +NULL 2012 3 +NULL 2013 2 +NULL 2013 2 +Java NULL 2 +Java NULL 2 +Java NULL 2 +Java NULL 2 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2012 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +Java 2013 1 +dotNET NULL 3 +dotNET NULL 3 +dotNET NULL 3 +dotNET NULL 3 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2012 2 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 +dotNET 2013 1 + + +-- !query +SELECT course, udaf(earnings) AS sum FROM courseSales +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum +-- !query schema +struct +-- !query output +NULL 0 +Java 0 +Java 1 +Java 1 
+dotNET 0 +dotNET 1 +dotNET 1 +dotNET 1 + + +-- !query +SELECT course, udaf(earnings) AS sum, GROUPING_ID(course, earnings) FROM courseSales +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum +-- !query schema +struct +-- !query output +NULL 0 3 +Java 0 1 +Java 1 0 +Java 1 0 +dotNET 0 1 +dotNET 1 0 +dotNET 1 0 +dotNET 1 0 + + +-- !query +SELECT a + b AS k1, b AS k2, udaf(a - b) FROM testData GROUP BY CUBE(k1, k2) +-- !query schema +struct +-- !query output +2 1 1 +2 NULL 1 +3 1 1 +3 2 1 +3 NULL 2 +4 1 1 +4 2 1 +4 NULL 2 +5 2 1 +5 NULL 1 +NULL 1 3 +NULL 2 3 +NULL NULL 6 + + +-- !query +SELECT a + b AS k, b, udaf(a - b) FROM testData GROUP BY ROLLUP(k, b) +-- !query schema +struct +-- !query output +2 1 1 +2 NULL 0 +3 1 1 +3 2 1 +3 NULL 0 +4 1 1 +4 2 1 +4 NULL 0 +5 2 1 +5 NULL 0 +NULL NULL 0 + + +-- !query +SELECT a + b, b AS k, udaf(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k) +-- !query schema +struct<(a + b):int,k:int,udaf((a - b)):int> +-- !query output +NULL 1 3 +NULL 2 3 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, b, CUBE(a, b) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, b, ROLLUP(a, b) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY CUBE(a, b), ROLLUP(a, b) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +2 NULL 2 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 +3 NULL 2 +NULL 1 3 +NULL 2 3 +NULL NULL 6 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, CUBE(a, b), ROLLUP(b) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b), (a), ()) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, CUBE(a, b), GROUPING SETS((a, b), (a), ()) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, CUBE(a, b), ROLLUP(a, b), GROUPING SETS((a, b), (a), ()) +-- !query schema 
+struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(ROLLUP(a, b)) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(GROUPING SETS((a, b), (a), ())) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b), GROUPING SETS(ROLLUP(a, b))) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b, a, b), (a, b, a), (a, b)) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(GROUPING SETS((a, b, a, b), (a, b, a), (a, b))) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(ROLLUP(a, b), CUBE(a, b)) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS(GROUPING SETS((a, b), (a), ()), GROUPING SETS((a, b), (a), (b), ())) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 
NULL 2 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 + + +-- !query +SELECT a, b, udaf(1) FROM testData GROUP BY a, GROUPING SETS((a, b), (a), (), (a, b), (a), (b), ()) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 1 1 +1 2 1 +1 2 1 +1 2 1 +1 NULL 2 +1 NULL 2 +1 NULL 2 +1 NULL 2 +2 1 1 +2 1 1 +2 1 1 +2 2 1 +2 2 1 +2 2 1 +2 NULL 2 +2 NULL 2 +2 NULL 2 +2 NULL 2 +3 1 1 +3 1 1 +3 1 1 +3 2 1 +3 2 1 +3 2 1 +3 NULL 2 +3 NULL 2 +3 NULL 2 +3 NULL 2 diff --git a/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-by-ordinal.sql.out new file mode 100644 index 0000000000000..d6e3f111001af --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-by-ordinal.sql.out @@ -0,0 +1,405 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +create temporary view data as select * from values + (1, 1), + (1, 2), + (2, 1), + (2, 2), + (3, 1), + (3, 2) + as data(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +select a, udaf(b) from data group by 1 +-- !query schema +struct +-- !query output +1 2 +2 2 +3 2 + + +-- !query +select 1, 2, udaf(b) from data group by 1, 2 +-- !query schema +struct<1:int,2:int,udaf(b):int> +-- !query output +1 2 6 + + +-- !query +select a, 1, udaf(b) from data group by a, 1 +-- !query schema +struct +-- !query output +1 1 2 +2 1 2 +3 1 2 + + +-- !query +select a, 1, udaf(b) from data group by 1, 2 +-- !query schema +struct +-- !query output +1 1 2 +2 1 2 +3 1 2 + + +-- !query +select a, b + 2, udaf(2) from data group by a, 2 +-- !query schema +struct +-- !query output +1 3 1 +1 4 1 +2 3 1 +2 4 1 +3 3 1 +3 4 1 + + +-- !query +select a as aa, b + 2 as bb, udaf(2) from data group by 1, 2 +-- !query schema +struct +-- !query output +1 3 1 +1 4 1 +2 3 1 +2 4 1 +3 3 1 +3 4 1 + + +-- !query +select udaf(b) from data group by 1 + 0 +-- !query schema +struct +-- !query output +6 + + +-- !query +select a, b, udaf(b) from data group by 3 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "COLUMN_NOT_IN_GROUP_BY_CLAUSE", + "sqlState" : "42000", + "messageParameters" : { + "expression" : "\"a\"" + } +} + + +-- !query +select a, b, udaf(b) + 2 from data group by 3 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "COLUMN_NOT_IN_GROUP_BY_CLAUSE", + "sqlState" : "42000", + "messageParameters" : { + "expression" : "\"a\"" + } +} + + +-- !query +select a, rand(0), udaf(b) +from +(select /*+ REPARTITION(1) */ a, b from data) group by a, 2 +-- !query schema +struct +-- !query output +1 0.5234194256885571 1 +1 0.7604953758285915 1 +2 0.0953472826424725 1 +2 0.3163249920547614 1 +3 0.2710259815484829 1 +3 0.7141011170991605 1 + + +-- !query +select a, udaf(a) from (select 1 as a) tmp group by 1 order by 1 +-- !query schema +struct +-- !query output +1 1 + + +-- !query +select udaf(a), a from (select 1 as a) tmp group by 2 having a > 0 +-- !query schema +struct +-- !query output +1 1 + + +-- !query +select a, a AS k, udaf(b) from data group by k, 1 +-- !query schema +struct +-- !query output +1 1 2 +2 2 2 +3 3 2 + + +-- !query +select a, b, udaf(1) from data group by cube(1, 2) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL 1 3 +NULL 2 3 +NULL NULL 6 + + +-- !query +select a, b, udaf(1) from data group by cube(1, b) +-- !query 
schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL 1 3 +NULL 2 3 +NULL NULL 6 + + +-- !query +select a, b, udaf(1) from data group by 1, 2 with cube +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL 1 3 +NULL 2 3 +NULL NULL 6 + + +-- !query +select a, b, udaf(1) from data group by rollup(1, 2) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL NULL 6 + + +-- !query +select a, b, udaf(1) from data group by rollup(1, b) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL NULL 6 + + +-- !query +select a, b, udaf(1) from data group by 1, 2 with rollup +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL NULL 6 + + +-- !query +select a, b, udaf(1) from data group by grouping sets((1), (2), (1, 2)) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL 1 3 +NULL 2 3 + + +-- !query +select a, b, udaf(1) from data group by grouping sets((1), (b), (a, 2)) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL 1 3 +NULL 2 3 + + +-- !query +select a, b, udaf(1) from data group by a, 2 grouping sets((1), (b), (a, 2)) +-- !query schema +struct +-- !query output +1 1 1 +1 2 1 +1 NULL 2 +2 1 1 +2 2 1 +2 NULL 2 +3 1 1 +3 2 1 +3 NULL 2 +NULL 1 3 +NULL 2 3 + + +-- !query +select a, b, udaf(1) from data group by a, -1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "-1", + "size" : "3" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 44, + "stopIndex" : 45, + "fragment" : "-1" + } ] +} + + +-- !query +select a, b, udaf(1) from data group by a, 3 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "COLUMN_NOT_IN_GROUP_BY_CLAUSE", + "sqlState" : "42000", + "messageParameters" : { + "expression" : "\"b\"" + } +} + + +-- !query +select a, b, udaf(1) from data group by cube(-1, 2) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "-1", + "size" : "3" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 46, + "stopIndex" : 47, + "fragment" : "-1" + } ] +} + + +-- !query +select a, b, udaf(1) from data group by cube(1, 3) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +grouping expressions sequence is empty, and 'data.a' is not an aggregate function. Wrap '()' in windowing function(s) or wrap 'data.a' in first() (or first_value) if you don't care which value you get. 
+ + +-- !query +set spark.sql.groupByOrdinal=false +-- !query schema +struct +-- !query output +spark.sql.groupByOrdinal false + + +-- !query +select udaf(b) from data group by -1 +-- !query schema +struct +-- !query output +6 diff --git a/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-by.sql.out new file mode 100644 index 0000000000000..bb98d244faa12 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-group-by.sql.out @@ -0,0 +1,411 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES +(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (null, 1), (3, null), (null, null) +AS testData(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT a, udaf(b) FROM testData +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +grouping expressions sequence is empty, and 'testdata.a' is not an aggregate function. Wrap '()' in windowing function(s) or wrap 'testdata.a' in first() (or first_value) if you don't care which value you get. + + +-- !query +SELECT udaf(a), udaf(b) FROM testData +-- !query schema +struct +-- !query output +7 7 + + +-- !query +SELECT a, udaf(b) FROM testData GROUP BY a +-- !query schema +struct +-- !query output +1 2 +2 2 +3 2 +NULL 1 + + +-- !query +SELECT a, udaf(b) FROM testData GROUP BY b +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "COLUMN_NOT_IN_GROUP_BY_CLAUSE", + "sqlState" : "42000", + "messageParameters" : { + "expression" : "\"a\"" + } +} + + +-- !query +SELECT udaf(a), udaf(b) FROM testData GROUP BY a +-- !query schema +struct +-- !query output +0 1 +2 2 +2 2 +3 2 + + +-- !query +SELECT 'foo', udaf(a) FROM testData GROUP BY 1 +-- !query schema +struct +-- !query output +foo 7 + + +-- !query +SELECT 'foo', udaf(a) FROM testData WHERE a = 0 GROUP BY 1 +-- !query schema +struct +-- !query output + + + +-- !query +SELECT 'foo', udaf(STRUCT(a)) FROM testData WHERE a = 0 GROUP BY 1 +-- !query schema +struct +-- !query output + + + +-- !query +SELECT a + b, udaf(b) FROM testData GROUP BY a + b +-- !query schema +struct<(a + b):int,udaf(b):int> +-- !query output +2 1 +3 2 +4 2 +5 1 +NULL 1 + + +-- !query +SELECT a + 2, udaf(b) FROM testData GROUP BY a + 1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "COLUMN_NOT_IN_GROUP_BY_CLAUSE", + "sqlState" : "42000", + "messageParameters" : { + "expression" : "\"a\"" + } +} + + +-- !query +SELECT a + 1 + 1, udaf(b) FROM testData GROUP BY a + 1 +-- !query schema +struct<((a + 1) + 1):int,udaf(b):int> +-- !query output +3 2 +4 2 +5 2 +NULL 1 + + +-- !query +SELECT SKEWNESS(a), KURTOSIS(a), udaf(a), udaf(a), AVG(a), VARIANCE(a), STDDEV(a), SUM(a), udaf(a) +FROM testData +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_PANDAS_UDF_PLACEMENT", + "messageParameters" : { + "functionList" : "`udaf`" + } +} + + +-- !query +SELECT udaf(DISTINCT b), udaf(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Function pythonudf does not support DISTINCT; line 1 pos 7 + + +-- !query +SELECT a AS k, udaf(b) FROM testData GROUP BY k +-- !query schema +struct +-- !query output +1 2 +2 2 +3 2 +NULL 1 + + +-- !query +SELECT a AS k, udaf(b) 
FROM testData GROUP BY k HAVING k > 1 +-- !query schema +struct +-- !query output +2 2 +3 2 + + +-- !query +SELECT a AS k, udaf(non_existing) FROM testData GROUP BY k +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN", + "errorSubClass" : "WITH_SUGGESTION", + "sqlState" : "42000", + "messageParameters" : { + "objectName" : "`non_existing`", + "proposal" : "`testdata`.`a`, `testdata`.`b`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 21, + "stopIndex" : 32, + "fragment" : "non_existing" + } ] +} + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW testDataHasSameNameWithAlias AS SELECT * FROM VALUES +(1, 1, 3), (1, 2, 1) AS testDataHasSameNameWithAlias(k, a, v) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT k AS a, udaf(v) FROM testDataHasSameNameWithAlias GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "COLUMN_NOT_IN_GROUP_BY_CLAUSE", + "sqlState" : "42000", + "messageParameters" : { + "expression" : "\"k\"" + } +} + + +-- !query +set spark.sql.groupByAliases=false +-- !query schema +struct +-- !query output +spark.sql.groupByAliases false + + +-- !query +SELECT a AS k, udaf(b) FROM testData GROUP BY k +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN", + "errorSubClass" : "WITH_SUGGESTION", + "sqlState" : "42000", + "messageParameters" : { + "objectName" : "`k`", + "proposal" : "`testdata`.`a`, `testdata`.`b`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 47, + "stopIndex" : 47, + "fragment" : "k" + } ] +} + + +-- !query +SELECT a, udaf(1) FROM testData WHERE false GROUP BY a +-- !query schema +struct +-- !query output + + + +-- !query +SELECT udaf(1) FROM testData WHERE false +-- !query schema +struct +-- !query output + + + +-- !query +SELECT 1 FROM (SELECT udaf(1) FROM testData WHERE false) t +-- !query schema +struct<1:int> +-- !query output +1 + + +-- !query +SELECT 1 from ( + SELECT 1 AS z, + udaf(a.x) + FROM (select 1 as x) a + WHERE false +) b +where b.z != b.z +-- !query schema +struct<1:int> +-- !query output + + + +-- !query +SELECT 1 FROM range(10) HAVING udaf(id) > 0 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN", + "errorSubClass" : "WITH_SUGGESTION", + "sqlState" : "42000", + "messageParameters" : { + "objectName" : "`id`", + "proposal" : "`1`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 37, + "stopIndex" : 38, + "fragment" : "id" + } ] +} + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES + (1, true), (1, false), + (2, true), + (3, false), (3, null), + (4, null), (4, null), + (5, null), (5, true), (5, false) AS test_agg(k, v) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg WHERE 1 = 0 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_PANDAS_UDF_PLACEMENT", + "messageParameters" : { + "functionList" : "`udaf`" + } +} + + +-- !query +SELECT udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg WHERE k = 4 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_PANDAS_UDF_PLACEMENT", + 
"messageParameters" : { + "functionList" : "`udaf`" + } +} + + +-- !query +SELECT udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg WHERE k = 5 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_PANDAS_UDF_PLACEMENT", + "messageParameters" : { + "functionList" : "`udaf`" + } +} + + +-- !query +SELECT k, udaf(v), some(v), any(v), bool_and(v), bool_or(v) FROM test_agg GROUP BY k +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_PANDAS_UDF_PLACEMENT", + "messageParameters" : { + "functionList" : "`udaf`" + } +} + + +-- !query +SELECT k, udaf(v) FROM test_agg GROUP BY k HAVING udaf(v) = false +-- !query schema +struct +-- !query output +4 0 + + +-- !query +SELECT k, udaf(v) FROM test_agg GROUP BY k HAVING udaf(v) IS NULL +-- !query schema +struct +-- !query output + + + +-- !query +SELECT k, + udaf(v) AS count +FROM test_agg +WHERE k = 2 + AND v IN (SELECT Any(v) + FROM test_agg + WHERE k = 1) +GROUP BY k +-- !query schema +struct +-- !query output +2 1 diff --git a/sql/core/src/test/resources/sql-tests/results/udaf/udaf-grouping-set.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-grouping-set.sql.out new file mode 100644 index 0000000000000..c9b27dca0c03c --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udaf/udaf-grouping-set.sql.out @@ -0,0 +1,175 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMPORARY VIEW grouping AS SELECT * FROM VALUES + ("1", "2", "3", 1), + ("4", "5", "6", 1), + ("7", "8", "9", 1) + as grouping(a, b, c, d) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT a, b, c, udaf(d) FROM grouping GROUP BY a, b, c GROUPING SETS (()) +-- !query schema +struct +-- !query output +NULL NULL NULL 3 + + +-- !query +SELECT a, b, c, udaf(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((a)) +-- !query schema +struct +-- !query output +1 NULL NULL 1 +4 NULL NULL 1 +7 NULL NULL 1 + + +-- !query +SELECT a, b, c, udaf(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((c)) +-- !query schema +struct +-- !query output +NULL NULL 3 1 +NULL NULL 6 1 +NULL NULL 9 1 + + +-- !query +SELECT c1, udaf(c2) FROM (VALUES ('x', 10, 0), ('y', 20, 0)) AS t (c1, c2, c3) GROUP BY GROUPING SETS (c1) +-- !query schema +struct +-- !query output +x 1 +y 1 + + +-- !query +SELECT c1, udaf(c2), grouping(c1) FROM (VALUES ('x', 10, 0), ('y', 20, 0)) AS t (c1, c2, c3) GROUP BY GROUPING SETS (c1) +-- !query schema +struct +-- !query output +x 1 0 +y 1 0 + + +-- !query +SELECT c1, c2, udaf(c3), grouping__id +FROM (VALUES ('x', 'a', 10), ('y', 'b', 20) ) AS t (c1, c2, c3) +GROUP BY GROUPING SETS ( ( c1 ), ( c2 ) ) +HAVING GROUPING__ID > 1 +-- !query schema +struct +-- !query output +NULL a 1 2 +NULL b 1 2 + + +-- !query +SELECT a + b, b, udaf(c) FROM (VALUES (1,1,1),(2,2,2)) AS t(a,b,c) GROUP BY GROUPING SETS ( (a + b), (b)) +-- !query schema +struct<(a + b):int,b:int,udaf(c):int> +-- !query output +2 NULL 1 +4 NULL 1 +NULL 1 1 +NULL 2 1 + + +-- !query +SELECT a + b, b, udaf(c) FROM (VALUES (1,1,1),(2,2,2)) AS t(a,b,c) GROUP BY GROUPING SETS ( (a + b), (b + a), (b)) +-- !query schema +struct<(a + b):int,b:int,udaf(c):int> +-- !query output +2 NULL 1 +2 NULL 1 +4 NULL 1 +4 NULL 1 +NULL 1 1 +NULL 2 1 + + +-- !query +SELECT a, b, c, udaf(d) FROM grouping GROUP BY WITH ROLLUP +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : 
"PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'ROLLUP'", + "hint" : ": extra input 'ROLLUP'" + } +} + + +-- !query +SELECT a, b, c, udaf(d) FROM grouping GROUP BY WITH CUBE +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'CUBE'", + "hint" : ": extra input 'CUBE'" + } +} + + +-- !query +SELECT k1, k2, udaf(v) FROM (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY GROUPING SETS ((k1),(k1,k2),(k2,k1)) +-- !query schema +struct +-- !query output +1 1 1 +1 1 1 +1 NULL 1 +2 2 1 +2 2 1 +2 NULL 1 + + +-- !query +SELECT grouping__id, k1, k2, udaf(v) FROM (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY GROUPING SETS ((k1),(k1,k2),(k2,k1)) +-- !query schema +struct +-- !query output +0 1 1 1 +0 1 1 1 +0 2 2 1 +0 2 2 1 +1 1 NULL 1 +1 2 NULL 1 + + +-- !query +SELECT grouping(k1), k1, k2, udaf(v) FROM (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY GROUPING SETS ((k1),(k1,k2),(k2,k1)) +-- !query schema +struct +-- !query output +0 1 1 1 +0 1 1 1 +0 1 NULL 1 +0 2 2 1 +0 2 2 1 +0 2 NULL 1 + + +-- !query +SELECT grouping_id(k1, k2), udaf(v) from (VALUES (1,1,1),(2,2,2)) AS t(k1,k2,v) GROUP BY k1, k2 GROUPING SETS ((k2, k1), k1) +-- !query schema +struct +-- !query output +0 1 +0 1 +1 1 +1 1 diff --git a/sql/core/src/test/resources/sql-tests/results/udaf/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf/udaf.sql.out new file mode 100644 index 0000000000000..0605af1c808db --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udaf/udaf.sql.out @@ -0,0 +1,67 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +(1), (2), (3), (4) +as t1(int_col1) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg' +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT default.myDoubleAvg(int_col1) as my_avg from t1 +-- !query schema +struct +-- !query output +102.5 + + +-- !query +SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Invalid number of arguments for function spark_catalog.default.mydoubleavg. Expected: 1; Found: 2; line 1 pos 7 + + +-- !query +CREATE FUNCTION udaf1 AS 'test.non.existent.udaf' +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT default.udaf1(int_col1) as udaf1 from t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Can not load class 'test.non.existent.udaf' when registering the function 'spark_catalog.default.udaf1', please make sure it is on the classpath; line 1 pos 7 + + +-- !query +DROP FUNCTION myDoubleAvg +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP FUNCTION udaf1 +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index cca9bb6741f68..5af7fa3898d90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -216,6 +216,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper resultFile: String, udf: TestUDF) extends TestCase with UDFTest + /** A UDAF test case. 
*/ + protected case class UDAFTestCase( + name: String, + inputFile: String, + resultFile: String, + udf: TestUDF) extends TestCase with UDFTest + /** A UDF PostgreSQL test case. */ protected case class UDFPgSQLTestCase( name: String, @@ -436,6 +443,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper if udfTestCase.udf.isInstanceOf[TestScalarPandasUDF] && shouldTestScalarPandasUDFs => s"${testCase.name}${System.lineSeparator()}" + s"Python: $pythonVer Pandas: $pandasVer PyArrow: $pyarrowVer${System.lineSeparator()}" + case udfTestCase: UDFTest + if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] && + shouldTestGroupedAggPandasUDFs => + s"${testCase.name}${System.lineSeparator()}" + + s"Python: $pythonVer Pandas: $pandasVer PyArrow: $pyarrowVer${System.lineSeparator()}" case _ => s"${testCase.name}${System.lineSeparator()}" } @@ -495,6 +507,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper UDFTestCase( s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf) } + } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udaf")) { + Seq(TestGroupedAggPandasUDF("udaf")).map { udf => + UDAFTestCase( + s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf) + } } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}postgreSQL")) { PgSQLTestCase(testCaseName, absPath, resultFile) :: Nil } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}ansi")) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 11f4fe0649be4..7a97efe088c63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2089,33 +2089,18 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT } test("View commands are not supported in v2 catalogs") { - def validateViewCommand( - sql: String, - catalogName: String, - viewName: String, - cmdName: String): Unit = { - assertAnalysisError( - sql, - s"Cannot specify catalog `$catalogName` for view $viewName because view support " + - s"in v2 catalog has not been implemented yet. $cmdName expects a view.") - } - - validateViewCommand("DROP VIEW testcat.v", "testcat", "v", "DROP VIEW") - validateViewCommand( - "ALTER VIEW testcat.v SET TBLPROPERTIES ('key' = 'val')", - "testcat", - "v", - "ALTER VIEW ... SET TBLPROPERTIES") - validateViewCommand( - "ALTER VIEW testcat.v UNSET TBLPROPERTIES ('key')", - "testcat", - "v", - "ALTER VIEW ... UNSET TBLPROPERTIES") - validateViewCommand( - "ALTER VIEW testcat.v AS SELECT 1", - "testcat", - "v", - "ALTER VIEW ... AS") + def validateViewCommand(sqlStatement: String): Unit = { + val e = intercept[AnalysisException](sql(sqlStatement)) + checkError( + e, + errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION", + parameters = Map("catalogName" -> "`testcat`", "operation" -> "views")) + } + + validateViewCommand("DROP VIEW testcat.v") + validateViewCommand("ALTER VIEW testcat.v SET TBLPROPERTIES ('key' = 'val')") + validateViewCommand("ALTER VIEW testcat.v UNSET TBLPROPERTIES ('key')") + validateViewCommand("ALTER VIEW testcat.v AS SELECT 1") } test("SPARK-33924: INSERT INTO .. 
PARTITION preserves the partition location") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c7fa365abbdeb..14af2b8241125 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -962,7 +962,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       sql("DROP VIEW dbx.tab1")
     }
     assert(e.getMessage.contains(
-      "dbx.tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead."))
+      "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead"))
   }
 
   protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
index 60c7cd8dd6f8b..7e81ad66436af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -29,31 +29,26 @@ class DropTableParserSuite extends AnalysisTest with SharedSparkSession {
   test("drop table") {
     parseCompare("DROP TABLE testcat.ns1.ns2.tbl",
-      DropTable(
-        UnresolvedTableOrView(Seq("testcat", "ns1", "ns2", "tbl"), "DROP TABLE", true),
+      DropTable(UnresolvedIdentifier(Seq("testcat", "ns1", "ns2", "tbl"), true),
         ifExists = false, purge = false))
     parseCompare(s"DROP TABLE db.tab",
       DropTable(
-        UnresolvedTableOrView(Seq("db", "tab"), "DROP TABLE", true),
+        UnresolvedIdentifier(Seq("db", "tab"), true),
         ifExists = false, purge = false))
     parseCompare(s"DROP TABLE IF EXISTS db.tab",
       DropTable(
-        UnresolvedTableOrView(Seq("db", "tab"), "DROP TABLE", true),
+        UnresolvedIdentifier(Seq("db", "tab"), true),
         ifExists = true, purge = false))
     parseCompare(s"DROP TABLE tab",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = false, purge = false))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = false, purge = false))
     parseCompare(s"DROP TABLE IF EXISTS tab",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = true, purge = false))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = true, purge = false))
     parseCompare(s"DROP TABLE tab PURGE",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = false, purge = true))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = false, purge = true))
     parseCompare(s"DROP TABLE IF EXISTS tab PURGE",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = true, purge = true))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = true, purge = true))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
index 3c9b39af8ef22..c26022addf0c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
@@ -57,7 +57,7 @@ trait DropTableSuiteBase extends QueryTest with DDLCommandTestUtils {
       val errMsg = intercept[AnalysisException] {
         sql(s"DROP TABLE $catalog.ns.tbl")
       }.getMessage
-      assert(errMsg.contains("Table or view not found"))
+      assert(errMsg.contains(s"Table $catalog.ns.tbl not found"))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 7e8816553499d..e678b866bfe4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -713,13 +713,13 @@ class PlanResolutionSuite extends AnalysisTest {
     val tableIdent2 = Identifier.of(Array.empty, "tab")
 
     parseResolveCompare(s"DROP TABLE $tableName1",
-      DropTable(ResolvedTable.create(testCat, tableIdent1, table), ifExists = false, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent1), ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName1",
-      DropTable(ResolvedTable.create(testCat, tableIdent1, table), ifExists = true, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent1), ifExists = true, purge = false))
     parseResolveCompare(s"DROP TABLE $tableName2",
-      DropTable(ResolvedTable.create(testCat, tableIdent2, table), ifExists = false, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent2), ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2",
-      DropTable(ResolvedTable.create(testCat, tableIdent2, table), ifExists = true, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent2), ifExists = true, purge = false))
   }
 
   test("drop view") {
@@ -728,7 +728,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val viewName2 = "view"
     val viewIdent2 = TableIdentifier("view", Option("default"), Some(SESSION_CATALOG_NAME))
     val tempViewName = "v"
-    val tempViewIdent = TableIdentifier("v")
+    val tempViewIdent = Identifier.of(Array.empty, "v")
 
     parseResolveCompare(s"DROP VIEW $viewName1",
       DropTableCommand(viewIdent1, ifExists = false, isView = true, purge = false))
@@ -739,16 +739,19 @@ class PlanResolutionSuite extends AnalysisTest {
     parseResolveCompare(s"DROP VIEW IF EXISTS $viewName2",
       DropTableCommand(viewIdent2, ifExists = true, isView = true, purge = false))
     parseResolveCompare(s"DROP VIEW $tempViewName",
-      DropTableCommand(tempViewIdent, ifExists = false, isView = true, purge = false))
+      DropTempViewCommand(tempViewIdent))
     parseResolveCompare(s"DROP VIEW IF EXISTS $tempViewName",
-      DropTableCommand(tempViewIdent, ifExists = true, isView = true, purge = false))
+      DropTempViewCommand(tempViewIdent))
   }
 
   test("drop view in v2 catalog") {
-    intercept[AnalysisException] {
+    val e = intercept[AnalysisException] {
       parseAndResolve("DROP VIEW testcat.db.view", checkAnalysis = true)
-    }.getMessage.toLowerCase(Locale.ROOT).contains(
-      "view support in catalog has not been implemented")
+    }
+    checkError(
+      e,
+      errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
+      parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
   }
 
   // ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 2c56adbab94d5..8909fe49aac1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Ro
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
@@ -518,16 +519,19 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     withTempDir { dir =>
       df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
 
-      val stream = spark.readStream.format("json")
+      val streamDf = spark.readStream.format("json")
         .schema(schema)
         .load(dir.getCanonicalPath + "/source/new-streaming-data")
         .select("*", "_metadata")
+
+      val streamQuery0 = streamDf
         .writeStream.format("json")
         .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .trigger(Trigger.AvailableNow())
         .start(dir.getCanonicalPath + "/target/new-streaming-data")
 
-      stream.processAllAvailable()
-      stream.stop()
+      streamQuery0.awaitTermination()
+      assert(streamQuery0.lastProgress.numInputRows == 2L)
 
       val newDF = spark.read.format("json")
         .load(dir.getCanonicalPath + "/target/new-streaming-data")
@@ -565,6 +569,34 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
             sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
         )
       )
+
+      // Verify self-union
+      val streamQuery1 = streamDf.union(streamDf)
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint_union")
+        .trigger(Trigger.AvailableNow())
+        .start(dir.getCanonicalPath + "/target/new-streaming-data-union")
+      streamQuery1.awaitTermination()
+      val df1 = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data-union")
+      // Verify self-union results
+      assert(streamQuery1.lastProgress.numInputRows == 4L)
+      assert(df1.count() == 4L)
+      assert(df1.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
+
+      // Verify self-join
+      val streamQuery2 = streamDf.join(streamDf, Seq("name", "age", "info", "_metadata"))
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint_join")
+        .trigger(Trigger.AvailableNow())
+        .start(dir.getCanonicalPath + "/target/new-streaming-data-join")
+      streamQuery2.awaitTermination()
+      val df2 = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data-join")
+      // Verify self-join results
+      assert(streamQuery2.lastProgress.numInputRows == 4L)
+      assert(df2.count() == 2L)
+      assert(df2.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 7aa8adc07edd3..7371b6cf0bc5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -82,9 +82,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people", false)))
     Seq(
       "h2.test.not_existing_table" ->
-        "Table or view not found: h2.test.not_existing_table",
+        "Table h2.test.not_existing_table not found",
       "h2.bad_test.not_existing_table" ->
-        "Table or view not found: h2.bad_test.not_existing_table"
+        "Table h2.bad_test.not_existing_table not found"
     ).foreach { case (table, expectedMsg) =>
       val msg = intercept[AnalysisException] {
         sql(s"DROP TABLE $table")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 69e01cef5ab1e..b850ffccd4ee3 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -65,6 +65,7 @@ import org.apache.spark.sql.types._
  * 1. Support UDF testing.
  * 2. Support DESC command.
  * 3. Support SHOW command.
+ * 4. Support UDAF testing.
  */
 // scalastyle:on line.size.limit
 class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer {
@@ -247,6 +248,8 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServ
     if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udf")) {
       Seq.empty
+    } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udaf")) {
+      Seq.empty
     } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}postgreSQL")) {
       PgSQLTestCase(testCaseName, absPath, resultFile) :: Nil
     } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}ansi")) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f1bb8d30eed99..ffb6993ccf1d1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1047,7 +1047,7 @@ class HiveDDLSuite
       sql("CREATE TABLE tab1(c1 int)")
       assertAnalysisError(
         "DROP VIEW tab1",
-        "tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead.")
+        "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
index 0ca6184c9469e..8c6d718f18abb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
@@ -26,7 +26,7 @@ class DropTableSuite extends v1.DropTableSuiteBase with CommandSuiteBase {
   test("hive client calls") {
     withNamespaceAndTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id int) $defaultUsing")
-      checkHiveClientCalls(expected = 15) {
+      checkHiveClientCalls(expected = 11) {
         sql(s"DROP TABLE $t")
       }
     }