Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: try drop old udf - DON'T MERGE #17264

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
refactor: try drop old udf
In eariler version, udf is serialize as json. In current version we can not drop/list these udfs.

show user functions;
2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp))
while list UDFs

drop function IF EXISTS plusp;
2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp))

So if drop udf return err, we directly drop the kv.
TCeason committed Jan 14, 2025
commit 13156bd08257e1105a53e1eeb3b0265d69d234dd
21 changes: 16 additions & 5 deletions src/query/management/src/udf/udf_mgr.rs
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
use futures::TryStreamExt;

@@ -155,16 +156,26 @@ impl UdfMgr {
seq: MatchSeq,
) -> Result<Option<SeqV<UserDefinedFunction>>, MetaError> {
let key = UdfIdent::new(&self.tenant, udf_name);
let req = UpsertPB::delete(key).with(seq);
let res = self.kv_api.upsert_pb(&req).await?;

if res.is_changed() {
Ok(res.prev)
let req = UpsertPB::delete(key.clone()).with(seq);
if let Ok(res) = self.kv_api.upsert_pb(&req).await {
if res.is_changed() {
Ok(res.prev)
} else {
Ok(None)
}
} else {
self.try_drop_old_udf(&key).await?;
Ok(None)
}
}

#[async_backtrace::framed]
#[fastrace::trace]
pub async fn try_drop_old_udf(&self, key: &UdfIdent) -> Result<(), MetaError> {
let _res = self.kv_api.upsert_kv(UpsertKV::delete(key)).await?;
Ok(())
}

fn ensure_non_builtin(&self, name: &str) -> Result<(), UdfError> {
if is_builtin_function(name) {
return Err(UdfError::Exists {
29 changes: 29 additions & 0 deletions src/query/management/tests/it/udf.rs
Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@ use databend_common_meta_embedded::MemMeta;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::Operation;
use databend_common_meta_types::UpsertKV;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_add_udf() -> Result<()> {
@@ -109,6 +111,33 @@ async fn test_add_udf() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_drop_old_udf() -> Result<()> {
let (kv_api, udf_api) = new_udf_api().await?;

// lambda udf
let udf = create_test_lambda_udf();
let udf_key = format!("__fd_udfs/admin/{}", udf.name);

let v = serde_json::to_vec("test")?;
let kv_api = kv_api.clone();
let _upsert_kv = kv_api
.upsert_kv(UpsertKV::new(
&udf_key,
MatchSeq::Exact(0),
Operation::Update(v),
None,
))
.await?;
let err = udf_api.list_udf().await.is_err();
assert!(err);

udf_api.drop_udf(&udf.name, MatchSeq::GE(1)).await?;
let udfs = udf_api.list_udf().await?;
assert_eq!(udfs, vec![]);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_already_exists_add_udf() -> Result<()> {
let (_, udf_api) = new_udf_api().await?;
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
@@ -832,9 +832,9 @@ impl AccessChecker for PrivilegeAccess {
}
Plan::DropUDF(plan) => {
let udf_name = &plan.udf;
if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists {
/*if !UserApiProvider::instance().exists_udf(&tenant, udf_name).await? && plan.if_exists {
return Ok(());
}
}*/
if enable_experimental_rbac_check {
let udf = HashSet::from([udf_name]);
self.validate_udf_access(udf).await?;
33 changes: 16 additions & 17 deletions src/query/service/src/interpreters/interpreter_user_udf_drop.rs
Original file line number Diff line number Diff line change
@@ -15,10 +15,10 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_management::RoleApi;
use databend_common_meta_app::principal::OwnershipObject;
// use databend_common_management::RoleApi;
// use databend_common_meta_app::principal::OwnershipObject;
use databend_common_sql::plans::DropUDFPlan;
use databend_common_users::RoleCacheManager;
// use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
use log::debug;

@@ -57,21 +57,20 @@ impl Interpreter for DropUserUDFScript {
let plan = self.plan.clone();
let tenant = self.ctx.get_tenant();

// we should do `drop ownership` after actually drop udf, and udf maybe not exists.
// // we should do `drop ownership` after actually drop udf, and udf maybe not exists.
// drop the ownership
if UserApiProvider::instance()
.exists_udf(&tenant, &self.plan.udf)
.await?
{
let role_api = UserApiProvider::instance().role_api(&tenant);
let owner_object = OwnershipObject::UDF {
name: self.plan.udf.clone(),
};

role_api.revoke_ownership(&owner_object).await?;
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// if UserApiProvider::instance()
// .exists_udf(&tenant, &self.plan.udf)
// .await?
// {
// let role_api = UserApiProvider::instance().role_api(&tenant);
// let owner_object = OwnershipObject::UDF {
// name: self.plan.udf.clone(),
// };
//
// role_api.revoke_ownership(&owner_object).await?;
// RoleCacheManager::instance().invalidate_cache(&tenant);
// }
// TODO: if it is appropriate to return an ErrorCode that contains either meta-service error and UdfNotFound error?

UserApiProvider::instance()