diff --git a/src/query/management/src/udf/udf_mgr.rs b/src/query/management/src/udf/udf_mgr.rs index afd2eedf2aa46..1319f7e9a9147 100644 --- a/src/query/management/src/udf/udf_mgr.rs +++ b/src/query/management/src/udf/udf_mgr.rs @@ -27,6 +27,7 @@ use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaError; +use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use futures::TryStreamExt; @@ -155,16 +156,26 @@ impl UdfMgr { seq: MatchSeq, ) -> Result>, MetaError> { let key = UdfIdent::new(&self.tenant, udf_name); - let req = UpsertPB::delete(key).with(seq); - let res = self.kv_api.upsert_pb(&req).await?; - - if res.is_changed() { - Ok(res.prev) + let req = UpsertPB::delete(key.clone()).with(seq); + if let Ok(res) = self.kv_api.upsert_pb(&req).await { + if res.is_changed() { + Ok(res.prev) + } else { + Ok(None) + } } else { + self.try_drop_old_udf(&key).await?; Ok(None) } } + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn try_drop_old_udf(&self, key: &UdfIdent) -> Result<(), MetaError> { + let _res = self.kv_api.upsert_kv(UpsertKV::delete(key)).await?; + Ok(()) + } + fn ensure_non_builtin(&self, name: &str) -> Result<(), UdfError> { if is_builtin_function(name) { return Err(UdfError::Exists { diff --git a/src/query/management/tests/it/udf.rs b/src/query/management/tests/it/udf.rs index adb09f25f2298..0031c5c0b325e 100644 --- a/src/query/management/tests/it/udf.rs +++ b/src/query/management/tests/it/udf.rs @@ -28,6 +28,8 @@ use databend_common_meta_embedded::MemMeta; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::Operation; +use databend_common_meta_types::UpsertKV; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_add_udf() -> Result<()> { @@ -109,6 +111,33 @@ async fn test_add_udf() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_drop_old_udf() -> Result<()> { + let (kv_api, udf_api) = new_udf_api().await?; + + // lambda udf + let udf = create_test_lambda_udf(); + let udf_key = format!("__fd_udfs/admin/{}", udf.name); + + let v = serde_json::to_vec("test")?; + let kv_api = kv_api.clone(); + let _upsert_kv = kv_api + .upsert_kv(UpsertKV::new( + &udf_key, + MatchSeq::Exact(0), + Operation::Update(v), + None, + )) + .await?; + let err = udf_api.list_udf().await.is_err(); + assert!(err); + + udf_api.drop_udf(&udf.name, MatchSeq::GE(1)).await?; + let udfs = udf_api.list_udf().await?; + assert_eq!(udfs, vec![]); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_already_exists_add_udf() -> Result<()> { let (_, udf_api) = new_udf_api().await?; diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 4c5df38fb8068..9a087ed961685 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -832,9 +832,9 @@ impl AccessChecker for PrivilegeAccess { } Plan::DropUDF(plan) => { let udf_name = &plan.udf; - if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists { + /*if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists { return Ok(()); - } + }*/ if enable_experimental_rbac_check { let udf = HashSet::from([udf_name]); self.validate_udf_access(udf).await?; diff --git a/src/query/service/src/interpreters/interpreter_user_udf_drop.rs b/src/query/service/src/interpreters/interpreter_user_udf_drop.rs index a897385b71201..461ca2f015ea5 100644 --- a/src/query/service/src/interpreters/interpreter_user_udf_drop.rs +++ b/src/query/service/src/interpreters/interpreter_user_udf_drop.rs @@ -15,10 +15,10 @@ use std::sync::Arc; use databend_common_exception::Result; -use databend_common_management::RoleApi; -use databend_common_meta_app::principal::OwnershipObject; +// use databend_common_management::RoleApi; +// use databend_common_meta_app::principal::OwnershipObject; use databend_common_sql::plans::DropUDFPlan; -use databend_common_users::RoleCacheManager; +// use databend_common_users::RoleCacheManager; use databend_common_users::UserApiProvider; use log::debug; @@ -57,21 +57,20 @@ impl Interpreter for DropUserUDFScript { let plan = self.plan.clone(); let tenant = self.ctx.get_tenant(); - // we should do `drop ownership` after actually drop udf, and udf maybe not exists. + // // we should do `drop ownership` after actually drop udf, and udf maybe not exists. // drop the ownership - if UserApiProvider::instance() - .exists_udf(&tenant, &self.plan.udf) - .await? - { - let role_api = UserApiProvider::instance().role_api(&tenant); - let owner_object = OwnershipObject::UDF { - name: self.plan.udf.clone(), - }; - - role_api.revoke_ownership(&owner_object).await?; - RoleCacheManager::instance().invalidate_cache(&tenant); - } - + // if UserApiProvider::instance() + // .exists_udf(&tenant, &self.plan.udf) + // .await? + // { + // let role_api = UserApiProvider::instance().role_api(&tenant); + // let owner_object = OwnershipObject::UDF { + // name: self.plan.udf.clone(), + // }; + // + // role_api.revoke_ownership(&owner_object).await?; + // RoleCacheManager::instance().invalidate_cache(&tenant); + // } // TODO: if it is appropriate to return an ErrorCode that contains either meta-service error and UdfNotFound error? UserApiProvider::instance()