Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: try drop old udf - DON'T MERGE #17264

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/query/management/src/udf/udf_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
use futures::TryStreamExt;

Expand Down Expand Up @@ -155,16 +156,26 @@ impl UdfMgr {
seq: MatchSeq,
) -> Result<Option<SeqV<UserDefinedFunction>>, MetaError> {
let key = UdfIdent::new(&self.tenant, udf_name);
let req = UpsertPB::delete(key).with(seq);
let res = self.kv_api.upsert_pb(&req).await?;

if res.is_changed() {
Ok(res.prev)
let req = UpsertPB::delete(key.clone()).with(seq);
if let Ok(res) = self.kv_api.upsert_pb(&req).await {
if res.is_changed() {
Ok(res.prev)
} else {
Ok(None)
}
} else {
self.try_drop_old_udf(&key).await?;
Ok(None)
}
}

#[async_backtrace::framed]
#[fastrace::trace]
pub async fn try_drop_old_udf(&self, key: &UdfIdent) -> Result<(), MetaError> {
let _res = self.kv_api.upsert_kv(UpsertKV::delete(key)).await?;
Ok(())
}

fn ensure_non_builtin(&self, name: &str) -> Result<(), UdfError> {
if is_builtin_function(name) {
return Err(UdfError::Exists {
Expand Down
29 changes: 29 additions & 0 deletions src/query/management/tests/it/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use databend_common_meta_embedded::MemMeta;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::Operation;
use databend_common_meta_types::UpsertKV;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_add_udf() -> Result<()> {
Expand Down Expand Up @@ -109,6 +111,33 @@ async fn test_add_udf() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_drop_old_udf() -> Result<()> {
let (kv_api, udf_api) = new_udf_api().await?;

// lambda udf
let udf = create_test_lambda_udf();
let udf_key = format!("__fd_udfs/admin/{}", udf.name);

let v = serde_json::to_vec("test")?;
let kv_api = kv_api.clone();
let _upsert_kv = kv_api
.upsert_kv(UpsertKV::new(
&udf_key,
MatchSeq::Exact(0),
Operation::Update(v),
None,
))
.await?;
let err = udf_api.list_udf().await.is_err();
assert!(err);

udf_api.drop_udf(&udf.name, MatchSeq::GE(1)).await?;
let udfs = udf_api.list_udf().await?;
assert_eq!(udfs, vec![]);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_already_exists_add_udf() -> Result<()> {
let (_, udf_api) = new_udf_api().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,9 +832,9 @@ impl AccessChecker for PrivilegeAccess {
}
Plan::DropUDF(plan) => {
let udf_name = &plan.udf;
if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists {
/*if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists {
return Ok(());
}
}*/
if enable_experimental_rbac_check {
let udf = HashSet::from([udf_name]);
self.validate_udf_access(udf).await?;
Expand Down
33 changes: 16 additions & 17 deletions src/query/service/src/interpreters/interpreter_user_udf_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_management::RoleApi;
use databend_common_meta_app::principal::OwnershipObject;
// use databend_common_management::RoleApi;
// use databend_common_meta_app::principal::OwnershipObject;
use databend_common_sql::plans::DropUDFPlan;
use databend_common_users::RoleCacheManager;
// use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
use log::debug;

Expand Down Expand Up @@ -57,21 +57,20 @@ impl Interpreter for DropUserUDFScript {
let plan = self.plan.clone();
let tenant = self.ctx.get_tenant();

// we should do `drop ownership` after actually drop udf, and udf maybe not exists.
// // we should do `drop ownership` after actually drop udf, and udf maybe not exists.
// drop the ownership
if UserApiProvider::instance()
.exists_udf(&tenant, &self.plan.udf)
.await?
{
let role_api = UserApiProvider::instance().role_api(&tenant);
let owner_object = OwnershipObject::UDF {
name: self.plan.udf.clone(),
};

role_api.revoke_ownership(&owner_object).await?;
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// if UserApiProvider::instance()
// .exists_udf(&tenant, &self.plan.udf)
// .await?
// {
// let role_api = UserApiProvider::instance().role_api(&tenant);
// let owner_object = OwnershipObject::UDF {
// name: self.plan.udf.clone(),
// };
//
// role_api.revoke_ownership(&owner_object).await?;
// RoleCacheManager::instance().invalidate_cache(&tenant);
// }
// TODO: if it is appropriate to return an ErrorCode that contains either meta-service error and UdfNotFound error?

UserApiProvider::instance()
Expand Down
Loading