-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-31999][SQL] Add REFRESH FUNCTION command #28840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
69a47a1
a95dcb6
3fc807e
b282348
f677a4a
a6c5d8b
de54470
9e09875
63695c0
9e9d5ce
c434821
35fd44b
f83fd8b
e444943
afd510b
1241bde
93f5d71
dc684b5
0ea7dd6
643969c
6cb2edd
cffc207
4b6408d
5d5fe71
4ba345b
6765395
dc86b82
a38d656
cdea55b
5e227d7
703ad47
a79f72b
a4d144a
3bd8d23
60ac2a0
b36b760
c5937a2
56ec5ea
c129a54
a956144
711656d
5d4c152
94fa132
fc4789f
e83194f
b18437c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| --- | ||
| layout: global | ||
| title: REFRESH FUNCTION | ||
| displayTitle: REFRESH FUNCTION | ||
| license: | | ||
| Licensed to the Apache Software Foundation (ASF) under one or more | ||
| contributor license agreements. See the NOTICE file distributed with | ||
| this work for additional information regarding copyright ownership. | ||
| The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| (the "License"); you may not use this file except in compliance with | ||
| the License. You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| --- | ||
|
|
||
| ### Description | ||
|
|
||
| `REFRESH FUNCTION` statement invalidates the cached function entry, which include class name | ||
| and resource location of the given function. The invalidated cache is populated right away. | ||
| Note that, refresh function only works for permanent function. Refresh native function or temporary function will cause exception. | ||
|
||
|
|
||
| ### Syntax | ||
|
|
||
| ```sql | ||
| REFRESH FUNCTION function_identifier | ||
| ``` | ||
|
|
||
| ### Parameters | ||
|
|
||
| * **function_identifier** | ||
|
|
||
| Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, use the current database. | ||
|
||
|
|
||
| **Syntax:** `[ database_name. ] function_name` | ||
|
|
||
| ### Examples | ||
|
|
||
| ```sql | ||
| -- The cached entries of the function will be refreshed | ||
|
||
| -- The function is resolved from the current database as the function name is unqualified. | ||
| REFRESH FUNCTION func1; | ||
|
|
||
| -- The cached entries of the function will be refreshed | ||
|
||
| -- The function is resolved from tempDB database as the function name is qualified. | ||
| REFRESH FUNCTION tempDB.func1; | ||
| ``` | ||
|
|
||
| ### Related Statements | ||
|
|
||
| * [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html) | ||
| * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) | ||
| * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to mention the above three data-related statement? The following
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just feel they are part of the |
||
| * [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) | ||
| * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -217,6 +217,7 @@ class Analyzer( | |
| ResolveInsertInto :: | ||
| ResolveRelations :: | ||
| ResolveTables :: | ||
| ResolveFunc(catalogManager) :: | ||
| ResolveReferences :: | ||
| ResolveCreateNamedStruct :: | ||
| ResolveDeserializer :: | ||
|
|
@@ -834,6 +835,14 @@ class Analyzer( | |
| } | ||
| } | ||
|
|
||
| case class ResolveFunc(catalogManager: CatalogManager) | ||
|
||
| extends Rule[LogicalPlan] with LookupCatalog { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
| case UnresolvedFunc(CatalogAndFunctionIdentifier(catalog, identifier)) => | ||
| ResolvedFunc(catalog, identifier) | ||
| } | ||
| } | ||
|
|
||
| private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import org.apache.spark.sql.catalyst.FunctionIdentifier | ||
| import org.apache.spark.sql.catalyst.expressions.Attribute | ||
| import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} | ||
| import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsNamespaces, Table, TableCatalog} | ||
|
|
@@ -50,6 +51,11 @@ case class UnresolvedTableOrView(multipartIdentifier: Seq[String]) extends LeafN | |
| override def output: Seq[Attribute] = Nil | ||
| } | ||
|
|
||
| case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode { | ||
| override lazy val resolved: Boolean = false | ||
| override def output: Seq[Attribute] = Nil | ||
| } | ||
|
|
||
| /** | ||
| * A plan containing resolved namespace. | ||
| */ | ||
|
|
@@ -74,3 +80,8 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T | |
| case class ResolvedView(identifier: Identifier) extends LeafNode { | ||
| override def output: Seq[Attribute] = Nil | ||
| } | ||
|
|
||
| case class ResolvedFunc(catalog: CatalogPlugin, functionIdentifier: FunctionIdentifier) | ||
|
||
| extends LeafNode { | ||
| override def output: Seq[Attribute] = Nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1341,6 +1341,16 @@ class SessionCatalog( | |
| functionRegistry.registerFunction(func, info, builder) | ||
| } | ||
|
|
||
| /** | ||
| * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] | ||
| */ | ||
| def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { | ||
|
||
| if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { | ||
| throw new NoSuchFunctionException( | ||
|
||
| formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Drop a temporary function. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog | |
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | ||
| import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} | ||
|
|
||
| /** | ||
|
|
@@ -155,4 +155,37 @@ private[sql] trait LookupCatalog extends Logging { | |
| None | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Extract catalog and function identifier from a multi-part name with the current catalog if | ||
| * needed. | ||
| * | ||
| * Note that: now function is only supported in v1 catalog. | ||
|
||
| */ | ||
| object CatalogAndFunctionIdentifier { | ||
|
||
| def unapply(nameParts: Seq[String]): Some[(CatalogPlugin, FunctionIdentifier)] = { | ||
|
||
|
|
||
| if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { | ||
| return Some(currentCatalog, FunctionIdentifier(nameParts.head)) | ||
| } | ||
|
|
||
| nameParts match { | ||
| case SessionCatalogAndIdentifier(catalog, ident) => | ||
| if (nameParts.length == 1) { | ||
| // If there is only one name part, it means the current catalog is the session catalog. | ||
| // Here we don't fill the default database, to keep the error message unchanged for | ||
| // v1 commands. | ||
| Some(catalog, FunctionIdentifier(nameParts.head, None)) | ||
| } else { | ||
| ident.namespace match { | ||
| case Array(db) => Some(catalog, FunctionIdentifier(ident.name, Some(db))) | ||
| case _ => | ||
| throw new AnalysisException(s"Unsupported function name '$ident'") | ||
| } | ||
| } | ||
|
|
||
| case _ => throw new AnalysisException(s"Function command is only supported in v1 catalog") | ||
|
||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -566,24 +566,19 @@ class ResolveSessionCatalog( | |
| case ShowTableProperties(r: ResolvedView, propertyKey) => | ||
| ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) | ||
|
|
||
| case DescribeFunctionStatement(nameParts, extended) => | ||
| val functionIdent = | ||
| parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION") | ||
| case DescribeFunctionStatement(CatalogAndFunctionIdentifier(_, functionIdent), extended) => | ||
| DescribeFunctionCommand(functionIdent, extended) | ||
|
|
||
| case ShowFunctionsStatement(userScope, systemScope, pattern, fun) => | ||
| val (database, function) = fun match { | ||
| case Some(nameParts) => | ||
| val FunctionIdentifier(fn, db) = | ||
| parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS") | ||
| case Some(CatalogAndFunctionIdentifier(_, FunctionIdentifier(fn, db))) => | ||
| (db, Some(fn)) | ||
| case None => (None, pattern) | ||
| } | ||
| ShowFunctionsCommand(database, function, userScope, systemScope) | ||
|
|
||
| case DropFunctionStatement(nameParts, ifExists, isTemp) => | ||
| val FunctionIdentifier(function, database) = | ||
| parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION") | ||
| case DropFunctionStatement( | ||
| CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)), ifExists, isTemp) => | ||
| DropFunctionCommand(database, function, ifExists, isTemp) | ||
|
|
||
| case CreateFunctionStatement(nameParts, | ||
|
|
@@ -606,38 +601,16 @@ class ResolveSessionCatalog( | |
| ignoreIfExists, | ||
| replace) | ||
| } else { | ||
| val FunctionIdentifier(function, database) = | ||
| parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION") | ||
| CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, | ||
| replace) | ||
| } | ||
| } | ||
|
|
||
| // TODO: move function related v2 statements to the new framework. | ||
| private def parseSessionCatalogFunctionIdentifier( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this method to
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR needs the change? |
||
| nameParts: Seq[String], | ||
| sql: String): FunctionIdentifier = { | ||
| if (nameParts.length == 1 && isTempFunction(nameParts.head)) { | ||
| return FunctionIdentifier(nameParts.head) | ||
| } | ||
|
|
||
| nameParts match { | ||
| case SessionCatalogAndIdentifier(_, ident) => | ||
| if (nameParts.length == 1) { | ||
| // If there is only one name part, it means the current catalog is the session catalog. | ||
| // Here we don't fill the default database, to keep the error message unchanged for | ||
| // v1 commands. | ||
| FunctionIdentifier(nameParts.head, None) | ||
| } else { | ||
| ident.namespace match { | ||
| case Array(db) => FunctionIdentifier(ident.name, Some(db)) | ||
| case _ => | ||
| throw new AnalysisException(s"Unsupported function name '$ident'") | ||
| } | ||
| nameParts match { | ||
| case CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)) => | ||
| CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, | ||
| replace) | ||
| } | ||
| } | ||
|
|
||
| case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") | ||
| } | ||
| case RefreshFunction(ResolvedFunc(_, func)) => | ||
| // Fallback to v1 command | ||
| RefreshFunctionCommand(func.database, func.funcName) | ||
| } | ||
|
|
||
| private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc: @huaxingao