From 180fe0a14409043e22d2eb51c17b60a27f678eb7 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 25 Dec 2019 17:09:09 +0800 Subject: [PATCH 1/5] [SPARK-30352][SQL] DataSourceV2: Add CURRENT_CATALOG function --- .../catalyst/analysis/FunctionRegistry.scala | 1 + .../spark/sql/catalyst/expressions/misc.scala | 18 ++++++++++++++++++ .../sql/catalyst/optimizer/Optimizer.scala | 2 ++ .../catalyst/optimizer/finishAnalysis.scala | 11 +++++++++++ .../sql-tests/inputs/misc-functions.sql | 3 +++ .../sql-tests/results/misc-functions.sql.out | 10 +++++++++- 6 files changed, 44 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 7a8b88e5264a3..5a68573b1ab49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -490,6 +490,7 @@ object FunctionRegistry { expression[InputFileBlockLength]("input_file_block_length"), expression[MonotonicallyIncreasingID]("monotonically_increasing_id"), expression[CurrentDatabase]("current_database"), + expression[CurrentCatalog]("current_catalog"), expression[CallMethodViaReflection]("reflect"), expression[CallMethodViaReflection]("java_method"), expression[SparkVersion]("version"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index f576873829f27..cb66c31f6f090 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -116,6 +116,24 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable { override def prettyName: String = "current_database" } +/** + * Returns the current catalog. + */ +@ExpressionDescription( + usage = "_FUNC_() - Returns the current catalog.", + examples = """ + Examples: + > SELECT _FUNC_(); + spark_catalog + """, + since = "3.0.0") +case class CurrentCatalog() extends LeafExpression with Unevaluable { + override def dataType: DataType = StringType + override def foldable: Boolean = true + override def nullable: Boolean = false + override def prettyName: String = "current_catalog" +} + // scalastyle:off line.size.limit @ExpressionDescription( usage = """_FUNC_() - Returns an universally unique identifier (UUID) string. The value is returned as a canonical UUID 36-character string.""", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 810c28116de47..74aef3e0c10d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -130,6 +130,7 @@ abstract class Optimizer(catalogManager: CatalogManager) ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase(catalogManager), + GetCurrentCatalog(catalogManager), RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// @@ -220,6 +221,7 @@ abstract class Optimizer(catalogManager: CatalogManager) ReplaceExpressions.ruleName :: ComputeCurrentTime.ruleName :: GetCurrentDatabase(catalogManager).ruleName :: + GetCurrentCatalog(catalogManager).ruleName :: RewriteDistinctAggregates.ruleName :: ReplaceDeduplicateWithAggregate.ruleName :: ReplaceIntersectWithSemiJoin.ruleName :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index f64b6e00373f6..989132fbcfc63 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -88,3 +88,14 @@ case class GetCurrentDatabase(catalogManager: CatalogManager) extends Rule[Logic } } } + +/** Replaces the expression of [[CurrentCatalog]] with the current catalog name. */ +case class GetCurrentCatalog(catalogManager: CatalogManager) extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + val currentCatalog = catalogManager.currentCatalog.name() + + plan transformAllExpressions { + case CurrentCatalog() => Literal.create(currentCatalog, StringType) + } + } +} diff --git a/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql index 95f71925e9294..1792506206494 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql @@ -8,3 +8,6 @@ select typeof(cast(1.0 as float)), typeof(1.0D), typeof(1.2); select typeof(date '1986-05-23'), typeof(timestamp '1986-05-23'), typeof(interval '23 days'); select typeof(x'ABCD'), typeof('SPARK'); select typeof(array(1, 2)), typeof(map(1, 2)), typeof(named_struct('a', 1, 'b', 'spark')); + +-- get current_catalog +select current_catalog(); diff --git a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out index cd0818a5189b5..2800e430813f2 100644 --- a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 7 +-- Number of queries: 8 -- !query 0 @@ -56,3 +56,11 @@ select typeof(array(1, 2)), typeof(map(1, 2)), typeof(named_struct('a', 1, 'b', struct -- !query 6 output array map struct + + +-- !query 7 +select current_catalog() +-- !query 7 schema +struct +-- !query 7 output +spark_catalog From c7b82d5250950dc74299899fe1d8bea9ae189223 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 20 May 2020 17:48:58 +0800 Subject: [PATCH 2/5] address comments --- .../sql/catalyst/optimizer/Optimizer.scala | 6 ++--- .../catalyst/optimizer/finishAnalysis.scala | 23 ++++++++----------- .../sql-functions/sql-expression-schema.md | 3 ++- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 332fa4302aad3..6c5d69611f7c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -133,8 +133,7 @@ abstract class Optimizer(catalogManager: CatalogManager) ReplaceExpressions, RewriteNonCorrelatedExists, ComputeCurrentTime, - GetCurrentDatabase(catalogManager), - GetCurrentCatalog(catalogManager), + GetCurrentDatabaseOrCatalog(catalogManager), RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// @@ -224,8 +223,7 @@ abstract class Optimizer(catalogManager: CatalogManager) EliminateView.ruleName :: ReplaceExpressions.ruleName :: ComputeCurrentTime.ruleName :: - GetCurrentDatabase(catalogManager).ruleName :: - GetCurrentCatalog(catalogManager).ruleName :: + GetCurrentDatabaseOrCatalog(catalogManager).ruleName :: RewriteDistinctAggregates.ruleName :: ReplaceDeduplicateWithAggregate.ruleName :: ReplaceIntersectWithSemiJoin.ruleName :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index c7731500ce628..40586e1d8665e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -91,26 +91,21 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { } -/** Replaces the expression of CurrentDatabase with the current database name. */ -case class GetCurrentDatabase(catalogManager: CatalogManager) extends Rule[LogicalPlan] { +/** + * Replaces the expression of CurrentDatabase with the current database name. + * Replaces the expression of CurrentCatalog with the current catalog name. + */ +case class GetCurrentDatabaseOrCatalog(catalogManager: CatalogManager) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - val currentNamespace = catalogManager.currentNamespace.quoted plan transformAllExpressions { case CurrentDatabase() => + val currentNamespace = catalogManager.currentNamespace.quoted Literal.create(currentNamespace, StringType) - } - } -} - -/** Replaces the expression of [[CurrentCatalog]] with the current catalog name. */ -case class GetCurrentCatalog(catalogManager: CatalogManager) extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = { - val currentCatalog = catalogManager.currentCatalog.name() - - plan transformAllExpressions { - case CurrentCatalog() => Literal.create(currentCatalog, StringType) + case CurrentCatalog() => + val currentCatalog = catalogManager.currentCatalog.name() + Literal.create(currentCatalog, StringType) } } } diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 3570fb61e2880..870b096683d2c 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -1,6 +1,6 @@ ## Summary - - Number of queries: 333 + - Number of queries: 334 - Number of expressions that missing example: 34 - Expressions missing examples: and,string,tinyint,double,smallint,date,decimal,boolean,float,binary,bigint,int,timestamp,cume_dist,dense_rank,input_file_block_length,input_file_block_start,input_file_name,lag,lead,monotonically_increasing_id,ntile,struct,!,not,or,percent_rank,rank,row_number,spark_partition_id,version,window,positive,count_min_sketch ## Schema of Built-in Functions @@ -82,6 +82,7 @@ | org.apache.spark.sql.catalyst.expressions.CsvToStructs | from_csv | SELECT from_csv('1, 0.8', 'a INT, b DOUBLE') | struct> | | org.apache.spark.sql.catalyst.expressions.Cube | cube | SELECT name, age, count(*) FROM VALUES (2, 'Alice'), (5, 'Bob') people(age, name) GROUP BY cube(name, age) | struct | | org.apache.spark.sql.catalyst.expressions.CumeDist | cume_dist | N/A | N/A | +| org.apache.spark.sql.catalyst.expressions.CurrentCatalog | current_catalog | SELECT current_catalog() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_database | SELECT current_database() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentDate | current_date | SELECT current_date() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentTimestamp | current_timestamp | SELECT current_timestamp() | struct | From 2c32f98aa334ddcd1c2f44f2efaf4b58219b29db Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 22 May 2020 17:37:05 +0800 Subject: [PATCH 3/5] address comments --- .../org/apache/spark/sql/catalyst/expressions/misc.scala | 2 +- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- .../spark/sql/catalyst/optimizer/finishAnalysis.scala | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index d0f2412e4e4d0..617ddcb69eab0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -126,7 +126,7 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable { > SELECT _FUNC_(); spark_catalog """, - since = "3.0.0") + since = "3.1.0") case class CurrentCatalog() extends LeafExpression with Unevaluable { override def dataType: DataType = StringType override def foldable: Boolean = true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6c5d69611f7c2..f1a307b1c2cc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -133,7 +133,7 @@ abstract class Optimizer(catalogManager: CatalogManager) ReplaceExpressions, RewriteNonCorrelatedExists, ComputeCurrentTime, - GetCurrentDatabaseOrCatalog(catalogManager), + GetCurrentDatabaseAndCatalog(catalogManager), RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// @@ -223,7 +223,7 @@ abstract class Optimizer(catalogManager: CatalogManager) EliminateView.ruleName :: ReplaceExpressions.ruleName :: ComputeCurrentTime.ruleName :: - GetCurrentDatabaseOrCatalog(catalogManager).ruleName :: + GetCurrentDatabaseAndCatalog(catalogManager).ruleName :: RewriteDistinctAggregates.ruleName :: ReplaceDeduplicateWithAggregate.ruleName :: ReplaceIntersectWithSemiJoin.ruleName :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 40586e1d8665e..6c9bb6db06d86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -95,16 +95,16 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { * Replaces the expression of CurrentDatabase with the current database name. * Replaces the expression of CurrentCatalog with the current catalog name. */ -case class GetCurrentDatabaseOrCatalog(catalogManager: CatalogManager) extends Rule[LogicalPlan] { +case class GetCurrentDatabaseAndCatalog(catalogManager: CatalogManager) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val currentNamespace = catalogManager.currentNamespace.quoted + val currentCatalog = catalogManager.currentCatalog.name() plan transformAllExpressions { case CurrentDatabase() => - val currentNamespace = catalogManager.currentNamespace.quoted Literal.create(currentNamespace, StringType) case CurrentCatalog() => - val currentCatalog = catalogManager.currentCatalog.name() Literal.create(currentCatalog, StringType) } } From f4f8fbec028b7c44c4f3e93912fe526a9560b8fd Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 22 May 2020 19:18:26 +0800 Subject: [PATCH 4/5] mv tests --- .../sql-tests/inputs/current_database_catalog.sql | 2 ++ .../test/resources/sql-tests/inputs/misc-functions.sql | 3 --- .../resources/sql-tests/results/misc-functions.sql.out | 10 +--------- 3 files changed, 3 insertions(+), 12 deletions(-) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/current_database_catalog.sql diff --git a/sql/core/src/test/resources/sql-tests/inputs/current_database_catalog.sql b/sql/core/src/test/resources/sql-tests/inputs/current_database_catalog.sql new file mode 100644 index 0000000000000..4406f1bc2e6e3 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/current_database_catalog.sql @@ -0,0 +1,2 @@ +-- get current_datebase and current_catalog +select current_database(), current_catalog(); diff --git a/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql index 1792506206494..95f71925e9294 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/misc-functions.sql @@ -8,6 +8,3 @@ select typeof(cast(1.0 as float)), typeof(1.0D), typeof(1.2); select typeof(date '1986-05-23'), typeof(timestamp '1986-05-23'), typeof(interval '23 days'); select typeof(x'ABCD'), typeof('SPARK'); select typeof(array(1, 2)), typeof(map(1, 2)), typeof(named_struct('a', 1, 'b', 'spark')); - --- get current_catalog -select current_catalog(); diff --git a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out index c8f16e9b05d69..bd8ffb82ee129 100644 --- a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 8 +-- Number of queries: 7 -- !query @@ -56,11 +56,3 @@ select typeof(array(1, 2)), typeof(map(1, 2)), typeof(named_struct('a', 1, 'b', struct -- !query output array map struct - - --- !query 7 -select current_catalog() --- !query 7 schema -struct --- !query 7 output -spark_catalog From fa520d1797e91434e0236d5034d204e1878d8b0a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 22 May 2020 19:18:32 +0800 Subject: [PATCH 5/5] mv tests --- .../sql-tests/results/current_database_catalog.sql.out | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 sql/core/src/test/resources/sql-tests/results/current_database_catalog.sql.out diff --git a/sql/core/src/test/resources/sql-tests/results/current_database_catalog.sql.out b/sql/core/src/test/resources/sql-tests/results/current_database_catalog.sql.out new file mode 100644 index 0000000000000..b714463a0aa0c --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/current_database_catalog.sql.out @@ -0,0 +1,10 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 1 + + +-- !query +select current_database(), current_catalog() +-- !query schema +struct +-- !query output +default spark_catalog