Skip to content

Commit

Permalink
refactor: try drop old udf
Browse files Browse the repository at this point in the history
In eariler version, udf is serialize as json. In current version we can not drop/list these udfs.

show user functions;
2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp))
while list UDFs

drop function IF EXISTS plusp;
2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp))

So if drop udf return err, we directly drop the kv.
  • Loading branch information
TCeason committed Jan 14, 2025
1 parent 00f4bd2 commit 13156bd
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 24 deletions.
21 changes: 16 additions & 5 deletions src/query/management/src/udf/udf_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
use futures::TryStreamExt;

Expand Down Expand Up @@ -155,16 +156,26 @@ impl UdfMgr {
seq: MatchSeq,
) -> Result<Option<SeqV<UserDefinedFunction>>, MetaError> {
let key = UdfIdent::new(&self.tenant, udf_name);
let req = UpsertPB::delete(key).with(seq);
let res = self.kv_api.upsert_pb(&req).await?;

if res.is_changed() {
Ok(res.prev)
let req = UpsertPB::delete(key.clone()).with(seq);
if let Ok(res) = self.kv_api.upsert_pb(&req).await {
if res.is_changed() {
Ok(res.prev)
} else {
Ok(None)
}
} else {
self.try_drop_old_udf(&key).await?;
Ok(None)
}
}

#[async_backtrace::framed]
#[fastrace::trace]
pub async fn try_drop_old_udf(&self, key: &UdfIdent) -> Result<(), MetaError> {
let _res = self.kv_api.upsert_kv(UpsertKV::delete(key)).await?;
Ok(())
}

fn ensure_non_builtin(&self, name: &str) -> Result<(), UdfError> {
if is_builtin_function(name) {
return Err(UdfError::Exists {
Expand Down
29 changes: 29 additions & 0 deletions src/query/management/tests/it/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use databend_common_meta_embedded::MemMeta;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::Operation;
use databend_common_meta_types::UpsertKV;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_add_udf() -> Result<()> {
Expand Down Expand Up @@ -109,6 +111,33 @@ async fn test_add_udf() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_drop_old_udf() -> Result<()> {
let (kv_api, udf_api) = new_udf_api().await?;

// lambda udf
let udf = create_test_lambda_udf();
let udf_key = format!("__fd_udfs/admin/{}", udf.name);

let v = serde_json::to_vec("test")?;
let kv_api = kv_api.clone();
let _upsert_kv = kv_api
.upsert_kv(UpsertKV::new(
&udf_key,
MatchSeq::Exact(0),
Operation::Update(v),
None,
))
.await?;
let err = udf_api.list_udf().await.is_err();
assert!(err);

udf_api.drop_udf(&udf.name, MatchSeq::GE(1)).await?;
let udfs = udf_api.list_udf().await?;
assert_eq!(udfs, vec![]);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_already_exists_add_udf() -> Result<()> {
let (_, udf_api) = new_udf_api().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,9 +832,9 @@ impl AccessChecker for PrivilegeAccess {
}
Plan::DropUDF(plan) => {
let udf_name = &plan.udf;
if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists {
/*if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists {
return Ok(());
}
}*/
if enable_experimental_rbac_check {
let udf = HashSet::from([udf_name]);
self.validate_udf_access(udf).await?;
Expand Down
33 changes: 16 additions & 17 deletions src/query/service/src/interpreters/interpreter_user_udf_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_management::RoleApi;
use databend_common_meta_app::principal::OwnershipObject;
// use databend_common_management::RoleApi;
// use databend_common_meta_app::principal::OwnershipObject;
use databend_common_sql::plans::DropUDFPlan;
use databend_common_users::RoleCacheManager;
// use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
use log::debug;

Expand Down Expand Up @@ -57,21 +57,20 @@ impl Interpreter for DropUserUDFScript {
let plan = self.plan.clone();
let tenant = self.ctx.get_tenant();

// we should do `drop ownership` after actually drop udf, and udf maybe not exists.
// // we should do `drop ownership` after actually drop udf, and udf maybe not exists.
// drop the ownership
if UserApiProvider::instance()
.exists_udf(&tenant, &self.plan.udf)
.await?
{
let role_api = UserApiProvider::instance().role_api(&tenant);
let owner_object = OwnershipObject::UDF {
name: self.plan.udf.clone(),
};

role_api.revoke_ownership(&owner_object).await?;
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// if UserApiProvider::instance()
// .exists_udf(&tenant, &self.plan.udf)
// .await?
// {
// let role_api = UserApiProvider::instance().role_api(&tenant);
// let owner_object = OwnershipObject::UDF {
// name: self.plan.udf.clone(),
// };
//
// role_api.revoke_ownership(&owner_object).await?;
// RoleCacheManager::instance().invalidate_cache(&tenant);
// }
// TODO: if it is appropriate to return an ErrorCode that contains either meta-service error and UdfNotFound error?

UserApiProvider::instance()
Expand Down

0 comments on commit 13156bd

Please sign in to comment.