[SPARK-31999][SQL] Add REFRESH FUNCTION command #28840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
@@ -0,0 +1,60 @@
---
layout: global
title: REFRESH FUNCTION
displayTitle: REFRESH FUNCTION
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---
### Description

`REFRESH FUNCTION` statement invalidates the cached function entry, which includes the class name
and resource location of the given function. The invalidated cache is populated right away.
Note that `REFRESH FUNCTION` only works for permanent functions; refreshing a native (built-in) function or a temporary function throws an exception.
### Syntax

```sql
REFRESH FUNCTION function_identifier
```
### Parameters

* **function_identifier**

    Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, the current database is used.

    **Syntax:** `[ database_name. ] function_name`
### Examples

```sql
-- The cached entries of the function will be refreshed
-- The function is resolved from the current database as the function name is unqualified.
REFRESH FUNCTION func1;

-- The cached entries of the function will be refreshed
-- The function is resolved from tempDB database as the function name is qualified.
REFRESH FUNCTION tempDB.func1;
```
### Related Statements

* [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html)
* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
Member
Do we need to mention the above three data-related statements? The following ...
Contributor
Author
Just feel they are part of the ...
* [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html)
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
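As a quick illustration of the documented workflow, here is a small, hypothetical driver that exercises the new statement through `spark.sql`; the UDF class `com.example.MyUpper` is a placeholder (not part of this PR) and a Hive-backed catalog is assumed for permanent functions.

```scala
import org.apache.spark.sql.SparkSession

object RefreshFunctionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("refresh-function-example")
      .enableHiveSupport() // permanent functions live in the external catalog
      .getOrCreate()

    // Create a permanent function backed by a UDF class on the classpath
    // ('com.example.MyUpper' is a placeholder, not part of this PR).
    spark.sql("CREATE FUNCTION my_upper AS 'com.example.MyUpper'")

    // If the definition later changes in the metastore (for example it is
    // recreated from another session with a different class or JAR), refresh
    // this session's cached entry instead of restarting the application.
    spark.sql("REFRESH FUNCTION my_upper")

    // A qualified name works as well; unqualified names use the current database.
    spark.sql("REFRESH FUNCTION default.my_upper")

    spark.stop()
  }
}
```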
@@ -1341,6 +1341,16 @@ class SessionCatalog(
    functionRegistry.registerFunction(func, info, builder)
  }

  /**
   * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]].
   */
  def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
    if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) {
      throw new NoSuchFunctionException(
        formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName)
    }
  }

  /**
   * Drop a temporary function.
   */
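For context on how the new `unregisterFunction` helper is meant to be combined with the existing catalog APIs, here is a rough sketch of the re-register pattern that the command later in this diff follows; `RefreshHelper` and `reRegister` are illustrative names, not part of the PR.

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

object RefreshHelper {
  // Sketch only: rebuild the cached entry of a permanent function.
  def reRegister(catalog: SessionCatalog, name: FunctionIdentifier): Unit = {
    // Drop any cached entry; ignoreIfNotExists = true avoids an exception
    // when the registry does not hold the function yet.
    catalog.unregisterFunction(name, ignoreIfNotExists = true)
    // Reload the definition from the external catalog and cache it again
    // (the second argument means "override if it already exists").
    val metadata = catalog.getFunctionMetadata(name)
    catalog.registerFunction(metadata, true)
  }
}
```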
@@ -516,3 +516,8 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Comma
case class CommentOnTable(child: LogicalPlan, comment: String) extends Command {
  override def children: Seq[LogicalPlan] = child :: Nil
}

/**
 * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
 */
case class RefreshFunction(func: Seq[String]) extends Command
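To connect this logical plan with the v1 command added later in this diff, here is a purely illustrative sketch of how the multi-part name could be split into a database and a function name; the PR's actual resolution rule may look different, and the import paths are assumed.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.RefreshFunction
import org.apache.spark.sql.execution.command.RefreshFunctionCommand

object RefreshFunctionResolution {
  // Illustrative only: map the parsed multi-part name onto the v1 command.
  def toV1Command(plan: RefreshFunction): RefreshFunctionCommand = plan.func match {
    case Seq(fn) => RefreshFunctionCommand(None, fn)          // unqualified name
    case Seq(db, fn) => RefreshFunctionCommand(Some(db), fn)  // database-qualified name
    case parts =>
      throw new AnalysisException(s"Unsupported function name: ${parts.mkString(".")}")
  }
}
```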
@@ -236,6 +236,58 @@ case class ShowFunctionsCommand(
  }
}

/**
 * A command for users to refresh the persistent function.
 * The syntax of using this command in SQL is:
 * {{{
 *    REFRESH FUNCTION functionName
 * }}}
 */
case class RefreshFunctionCommand(
    databaseName: Option[String],
    functionName: String)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
Member
We still can create a persistent function with the same name as a built-in function. For example:
CREATE FUNCTION rand AS 'org.apache.spark.sql.catalyst.expressions.Abs'
DESC function default.rand
I think we should still allow this case.

Contributor
Author
It seems there is no point in refreshing a persistent function whose name is the same as a built-in function. Yes, we can create a persistent function with the same name as the built-in one, but it is only created in the metastore; the function actually used is the built-in one, because built-in functions are pre-cached in the registry and we look up the cached function first. BTW, maybe that is also why we create the function and load it lazily, just acting as a Hive client, otherwise we couldn't create such a function at all.

Contributor
How about a qualified name? I think this is similar to tables and temp views. Spark will try to look up the temp view first, so if the name conflicts, the temp view is preferred. But users can still use a qualified table name to read the table explicitly.

Contributor
Author
You are right. I missed the qualified-name case; I will fix this in a follow-up.
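A small sketch of the scenario discussed in this thread, using the reviewer's example; the resolution order described here (built-in first for unqualified names, qualified names for the persistent definition) is taken from the discussion above, not verified independently.

```scala
import org.apache.spark.sql.SparkSession

object BuiltinShadowingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("builtin-shadowing")
      .enableHiveSupport() // persistent functions need an external catalog
      .getOrCreate()

    // A persistent function created with the same name as the built-in rand
    // (the reviewer's example): it only lands in the metastore.
    spark.sql("CREATE FUNCTION rand AS 'org.apache.spark.sql.catalyst.expressions.Abs'")

    // Unqualified lookup resolves to the pre-cached built-in first.
    spark.sql("SELECT rand()").show()

    // A qualified name is the way to address the persistent definition
    // explicitly; handling this in REFRESH FUNCTION is the follow-up above.
    spark.sql("DESC FUNCTION default.rand").show()

    spark.stop()
  }
}
```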
      throw new AnalysisException(s"Cannot refresh native function $functionName")
    } else if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
      throw new AnalysisException(s"Cannot refresh temp function $functionName")
    } else {
      // We only refresh the permanent function. There are 4 cases:
      //   1. registered in the registry, exists in the external catalog
      //   2. registered in the registry, missing from the external catalog
      //   3. not registered in the registry, exists in the external catalog
      //   4. not registered in the registry, missing from the external catalog
      val identifier = FunctionIdentifier(
        functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase)))
      val isRegisteredFunction = catalog.isRegisteredFunction(identifier)
      val isPersistentFunction = catalog.isPersistentFunction(identifier)
      if (isRegisteredFunction && isPersistentFunction) {
        // case 1: re-register the function
        catalog.unregisterFunction(identifier, true)
        val func = catalog.getFunctionMetadata(identifier)
        catalog.registerFunction(func, true)
      } else if (isRegisteredFunction && !isPersistentFunction) {
        // case 2: unregister the stale function and throw NoSuchFunctionException
        catalog.unregisterFunction(identifier, true)
        throw new NoSuchFunctionException(identifier.database.get, functionName)
      } else if (!isRegisteredFunction && isPersistentFunction) {
        // case 3: register the function
        val func = catalog.getFunctionMetadata(identifier)
        catalog.registerFunction(func, true)
      } else {
        // case 4
        throw new NoSuchFunctionException(identifier.database.get, functionName)
      }
    }

    Seq.empty[Row]
  }
}
object FunctionsCommand {
  // operators that do not have corresponding functions.
  // They should be handled `DescribeFunctionCommand`, `ShowFunctionsCommand`
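For quick reference, a sketch of the decision table that the `run` method above implements; the object and method names below are local to this snippet, not part of the PR.

```scala
object RefreshFunctionCases {
  // `registered` = the function is cached in the session FunctionRegistry.
  // `persistent` = the function exists in the external catalog (metastore).
  def action(registered: Boolean, persistent: Boolean): String =
    (registered, persistent) match {
      case (true, true)   => "unregister, reload metadata, register again"
      case (true, false)  => "unregister the stale entry, then throw NoSuchFunctionException"
      case (false, true)  => "load metadata and register"
      case (false, false) => "throw NoSuchFunctionException"
    }
}
```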
@@ -28,8 +28,8 @@ import org.apache.spark.{SparkException, SparkFiles}
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
@@ -3030,6 +3030,49 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
      }
    }
  }
  test("REFRESH FUNCTION") {
    val msg = intercept[AnalysisException] {
      sql("REFRESH FUNCTION md5")
    }.getMessage
    assert(msg.contains("Cannot refresh native function"))

    withUserDefinedFunction("func1" -> true) {
      sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
      val msg = intercept[AnalysisException] {
        sql("REFRESH FUNCTION func1")
      }.getMessage
      assert(msg.contains("Cannot refresh temp function"))
    }

    withUserDefinedFunction("func1" -> false) {
      intercept[NoSuchFunctionException] {
        sql("REFRESH FUNCTION func1")
      }

      val func = FunctionIdentifier("func1", Some("default"))
      sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
      assert(spark.sessionState.catalog.isRegisteredFunction(func) == false)
      sql("REFRESH FUNCTION func1")
Member
This is the only positive test case. Could you think more and try to cover more cases?
      assert(spark.sessionState.catalog.isRegisteredFunction(func) == true)

      spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1")
      assert(spark.sessionState.catalog.isRegisteredFunction(func) == true)
      intercept[NoSuchFunctionException] {
        sql("REFRESH FUNCTION func1")
      }
      assert(spark.sessionState.catalog.isRegisteredFunction(func) == false)

      val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty)
      spark.sessionState.catalog.createFunction(function, false)
Contributor
It's a bit weird if we don't fail invalid functions when creating them, but fail when refreshing them. How hard is it to make ...

Contributor
Author
I don't know why we don't check the function during create. There seems to be no use in creating a function whose class does not exist, and it can cause problems such as typos. For the same command, Hive fails directly.

Contributor
We can fix CREATE FUNCTION later and update this test.

Contributor
Or is it because you are calling the internal API, not the ...

Contributor
Author
If we make ... The difference is that we don't register/check the function; the register/check action happens when the user queries with this function, like ... I think it might be better to do the function check right now.

Contributor
Maybe we should make ...

Contributor
Author
OK, I will try it after this PR is finished.
      assert(spark.sessionState.catalog.isRegisteredFunction(func) == false)
      val err = intercept[AnalysisException] {
        sql("REFRESH FUNCTION func1")
      }.getMessage
      assert(err.contains("Can not load class"))
      assert(spark.sessionState.catalog.isRegisteredFunction(func) == false)
    }
  }
}

object FakeLocalFsFileSystem {
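Finally, a sketch of the lazy-validation behavior debated in the CREATE FUNCTION thread above, assuming the class is only resolved once something forces registration; the function name `broken` and the bogus class name mirror the test's `test.non.exists.udf`.

```scala
import org.apache.spark.sql.SparkSession

object LazyCreateFunctionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lazy-create-function")
      .enableHiveSupport()
      .getOrCreate()

    // Assumed to succeed today: the class is not resolved at creation time,
    // the definition is only written to the metastore.
    spark.sql("CREATE FUNCTION broken AS 'test.non.exists.udf'")

    // The failure surfaces only when something forces registration,
    // e.g. the first use of the function ...
    // spark.sql("SELECT broken(1)")
    // ... or refreshing it, which is what the test above asserts
    // ("Can not load class").
    spark.sql("REFRESH FUNCTION broken")

    spark.stop()
  }
}
```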
cc: @huaxingao