Skip to content

Commit

Permalink
refactor: use NonEmptyString to access UDFMgr.
Browse files Browse the repository at this point in the history
No need to return TenantIsEmpty error anymore
  • Loading branch information
drmingdrmer committed Mar 5, 2024
1 parent dbe6830 commit bbeabf1
Show file tree
Hide file tree
Showing 50 changed files with 249 additions and 247 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/bendpy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ databend-common-expression = { path = "../query/expression" }
databend-common-license = { path = "../common/license" }
databend-common-meta-app = { path = "../meta/app" }
databend-common-meta-embedded = { path = "../meta/embedded" }
databend-common-meta-types = { path = "../meta/types" }
databend-common-users = { path = "../query/users" }
databend-query = { path = "../query/service", features = [
"simd",
Expand Down
5 changes: 4 additions & 1 deletion src/bendpy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::GrantObject;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_types::NonEmptyString;
use databend_common_users::UserApiProvider;
use databend_query::sessions::QueryContext;
use databend_query::sessions::Session;
Expand Down Expand Up @@ -55,12 +56,14 @@ impl PySessionContext {
uuid::Uuid::new_v4().to_string()
};

let tenant = NonEmptyString::new(tenant).unwrap();

let config = GlobalConfig::instance();
UserApiProvider::try_create_simple(config.meta.to_meta_grpc_client_conf(), &tenant)
.await
.unwrap();

session.set_current_tenant(tenant.to_owned());
session.set_current_tenant(tenant.to_string());

let mut user = UserInfo::new_no_auth("root", "%");
user.grants.grant_privileges(
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl GlobalServices {
UserApiProvider::init(
config.meta.to_meta_grpc_client_conf(),
config.query.idm.clone(),
config.query.tenant_id.as_str(),
&config.query.tenant_id,
config.query.tenant_quota.clone(),
)
.await?;
Expand Down
11 changes: 6 additions & 5 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_meta_app::principal::StageType;
use databend_common_meta_app::principal::UserGrantSet;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::optimizer::get_udf_names;
use databend_common_sql::plans::InsertInputSource;
use databend_common_sql::plans::PresignAction;
Expand Down Expand Up @@ -507,7 +508,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), database, None, db_id, table_id, grant_set).await?;
let has_priv = has_priv(&tenant, database, None, db_id, table_id, grant_set).await?;
return if has_priv {
Ok(())
} else {
Expand All @@ -526,7 +527,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), database, None, db_id, table_id, grant_set).await?;
let has_priv = has_priv(&tenant, database, None, db_id, table_id, grant_set).await?;
return if has_priv {
Ok(())
} else {
Expand All @@ -545,7 +546,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), database, Some(table), db_id, table_id, grant_set).await?;
let has_priv = has_priv(&tenant, database, Some(table), db_id, table_id, grant_set).await?;
return if has_priv {
Ok(())
} else {
Expand Down Expand Up @@ -628,7 +629,7 @@ impl AccessChecker for PrivilegeAccess {
ObjectId::Table(db_id, table_id) => { (db_id, Some(table_id)) }
ObjectId::Database(db_id) => { (db_id, None) }
};
let has_priv = has_priv(tenant.as_str(), &plan.database, None, db_id, None, grant_set).await?;
let has_priv = has_priv(&tenant, &plan.database, None, db_id, None, grant_set).await?;

return if has_priv {
Ok(())
Expand Down Expand Up @@ -1002,7 +1003,7 @@ impl AccessChecker for PrivilegeAccess {

// TODO(liyz): replace it with verify_access
async fn has_priv(
tenant: &str,
tenant: &NonEmptyString,
db_name: &str,
table_name: Option<&str>,
db_id: u64,
Expand Down
5 changes: 1 addition & 4 deletions src/query/service/src/interpreters/common/grant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ pub async fn validate_grant_object_exists(
}
}
GrantObject::UDF(udf) => {
if !UserApiProvider::instance()
.exists_udf(tenant.as_str(), udf)
.await?
{
if !UserApiProvider::instance().exists_udf(&tenant, udf).await? {
return Err(databend_common_exception::ErrorCode::UnknownStage(format!(
"udf {udf} not exists"
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Interpreter for CreateDatabaseInterpreter {
let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?;
let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data;
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let databases = catalog.list_databases(&tenant).await?;
let databases = catalog.list_databases(tenant.as_str()).await?;
if quota.max_databases != 0 && databases.len() >= quota.max_databases as usize {
return Err(ErrorCode::TenantQuotaExceeded(format!(
"Max databases quota exceeded {}",
Expand All @@ -121,7 +121,7 @@ impl Interpreter for CreateDatabaseInterpreter {
};
// if create from other tenant, check from share endpoint
if let Some(ref share_name) = self.plan.meta.from_share {
self.check_create_database_from_share(&tenant, share_name)
self.check_create_database_from_share(&tenant.to_string(), share_name)
.await?;
}

Expand All @@ -130,7 +130,7 @@ impl Interpreter for CreateDatabaseInterpreter {

// Grant ownership as the current role. The above create_db_req.meta.owner could be removed in
// the future.
let role_api = UserApiProvider::instance().get_role_api_client(&tenant)?;
let role_api = UserApiProvider::instance().role_api(&tenant);
if let Some(current_role) = self.ctx.get_current_role() {
role_api
.grant_ownership(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ impl Interpreter for DropDatabaseInterpreter {
.get_database(tenant.as_str(), &self.plan.database)
.await;
if let Ok(db) = db {
let role_api = UserApiProvider::instance().get_role_api_client(tenant.as_str())?;
let role_api = UserApiProvider::instance().role_api(&tenant);
let owner_object = OwnershipObject::Database {
catalog_name: self.plan.catalog.clone(),
db_id: db.get_db_info().ident.db_id,
};

role_api.revoke_ownership(&owner_object).await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// actual drop database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_meta_app::principal::OwnershipObject;
use databend_common_meta_app::principal::PrincipalIdentity;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_app::principal::UserPrivilegeType::Ownership;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::plans::GrantPrivilegePlan;
use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
Expand Down Expand Up @@ -111,7 +112,7 @@ impl GrantPrivilegeInterpreter {
async fn grant_ownership(
&self,
ctx: &Arc<QueryContext>,
tenant: &str,
tenant: &NonEmptyString,
owner_object: &OwnershipObject,
new_role: &str,
) -> Result<()> {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl Interpreter for GrantPrivilegeInterpreter {
.convert_to_ownerobject(tenant.as_str(), &plan.on, plan.on.catalog())
.await?;
if self.ctx.get_current_role().is_some() {
self.grant_ownership(&self.ctx, tenant.as_str(), &owner_object, &role)
self.grant_ownership(&self.ctx, &tenant, &owner_object, &role)
.await?;
} else {
return Err(databend_common_exception::ErrorCode::UnknownRole(
Expand All @@ -208,9 +209,9 @@ impl Interpreter for GrantPrivilegeInterpreter {
}
} else {
user_mgr
.grant_privileges_to_role(tenant.as_str(), &role, plan.on, plan.priv_types)
.grant_privileges_to_role(&tenant, &role, plan.on, plan.priv_types)
.await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,7 @@ impl Interpreter for RevokePrivilegeInterpreter {
}
for object in plan.on {
user_mgr
.revoke_privileges_from_role(
tenant.as_str(),
&role,
object,
plan.priv_types,
)
.revoke_privileges_from_role(&tenant, &role, object, plan.priv_types)
.await?;
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/query/service/src/interpreters/interpreter_role_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,9 @@ impl Interpreter for CreateRoleInterpreter {
let tenant = self.ctx.get_tenant();
let user_mgr = UserApiProvider::instance();
user_mgr
.add_role(
tenant.as_str(),
RoleInfo::new(&role_name),
plan.if_not_exists,
)
.await?;
RoleCacheManager::instance()
.force_reload(tenant.as_str())
.add_role(&tenant, RoleInfo::new(&role_name), plan.if_not_exists)
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
6 changes: 2 additions & 4 deletions src/query/service/src/interpreters/interpreter_role_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl Interpreter for DropRoleInterpreter {
}
let tenant = self.ctx.get_tenant();
UserApiProvider::instance()
.drop_role(tenant.as_str(), plan.role_name, plan.if_exists)
.drop_role(&tenant, plan.role_name, plan.if_exists)
.await?;

let session = self.ctx.get_current_session();
Expand All @@ -77,9 +77,7 @@ impl Interpreter for DropRoleInterpreter {
}
}

RoleCacheManager::instance()
.force_reload(tenant.as_str())
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
10 changes: 3 additions & 7 deletions src/query/service/src/interpreters/interpreter_role_grant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ impl Interpreter for GrantRoleInterpreter {
// TODO: check privileges

// Check if the grant role exists.
user_mgr
.get_role(tenant.as_str(), plan.role.clone())
.await?;
user_mgr.get_role(&tenant, plan.role.clone()).await?;
match plan.principal {
PrincipalIdentity::User(user) => {
user_mgr
Expand All @@ -71,14 +69,12 @@ impl Interpreter for GrantRoleInterpreter {
}
PrincipalIdentity::Role(role) => {
user_mgr
.grant_role_to_role(tenant.as_str(), &role, plan.role)
.grant_role_to_role(&tenant, &role, plan.role)
.await?;
}
}

RoleCacheManager::instance()
.force_reload(tenant.as_str())
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
6 changes: 2 additions & 4 deletions src/query/service/src/interpreters/interpreter_role_revoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,12 @@ impl Interpreter for RevokeRoleInterpreter {
}
PrincipalIdentity::Role(role) => {
UserApiProvider::instance()
.revoke_role_from_role(tenant.as_str(), &role, &plan.role)
.revoke_role_from_role(&tenant, &role, &plan.role)
.await?;
}
}

RoleCacheManager::instance()
.force_reload(tenant.as_str())
.await?;
RoleCacheManager::instance().force_reload(&tenant).await?;
Ok(PipelineBuildResult::create())
}
}
5 changes: 4 additions & 1 deletion src/query/service/src/interpreters/interpreter_setting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use chrono_tz::Tz;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::plans::SettingPlan;
use databend_common_sql::plans::VarValue;
use databend_common_users::UserApiProvider;
Expand Down Expand Up @@ -91,7 +92,9 @@ impl Interpreter for SettingInterpreter {
if config.query.internal_enable_sandbox_tenant && !tenant.is_empty() {
UserApiProvider::try_create_simple(
config.meta.to_meta_grpc_client_conf(),
&tenant,
&NonEmptyString::new(tenant).map_err(|_e| {
ErrorCode::TenantIsEmpty("when SettingInterpreter")
})?,
)
.await?;
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/interpreter_show_grants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ impl Interpreter for ShowGrantsInterpreter {
}
PrincipalIdentity::Role(role) => {
let role = UserApiProvider::instance()
.get_role(tenant.as_str(), role.clone())
.get_role(&tenant, role.clone())
.await?;
(format!("ROLE `{}`", role.identity()), role.grants)
}
},
};
// TODO: display roles list instead of the inherited roles
let grant_entries = RoleCacheManager::instance()
.find_related_roles(tenant.as_str(), &grant_set.roles())
.find_related_roles(&tenant, &grant_set.roles())
.await?
.into_iter()
.map(|role| role.grants)
Expand Down
12 changes: 8 additions & 4 deletions src/query/service/src/interpreters/interpreter_table_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_app::schema::TableStatistics;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::NonEmptyString;
use databend_common_sql::field_default_value;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_sql::BloomIndexColumns;
Expand Down Expand Up @@ -100,6 +101,9 @@ impl Interpreter for CreateTableInterpreter {
#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
let tenant = self.plan.tenant.clone();
let tenant = NonEmptyString::new(tenant).map_err(|_e| {
ErrorCode::TenantIsEmpty("tenant is empty when CreateTableInterpreter")
})?;
let has_computed_column = self
.plan
.schema
Expand Down Expand Up @@ -186,7 +190,7 @@ impl CreateTableInterpreter {
.await?;
let db_id = db.get_db_info().ident.db_id;

let role_api = UserApiProvider::instance().get_role_api_client(tenant.as_str())?;
let role_api = UserApiProvider::instance().role_api(&tenant);
role_api
.grant_ownership(
&OwnershipObject::Table {
Expand All @@ -197,7 +201,7 @@ impl CreateTableInterpreter {
&current_role.name,
)
.await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// If the table creation query contains column definitions, like 'CREATE TABLE t1(a int) AS SELECT * from t2',
Expand Down Expand Up @@ -278,7 +282,7 @@ impl CreateTableInterpreter {
.await?;
let db_id = db.get_db_info().ident.db_id;

let role_api = UserApiProvider::instance().get_role_api_client(tenant.as_str())?;
let role_api = UserApiProvider::instance().role_api(&tenant);
role_api
.grant_ownership(
&OwnershipObject::Table {
Expand All @@ -289,7 +293,7 @@ impl CreateTableInterpreter {
&current_role.name,
)
.await?;
RoleCacheManager::instance().invalidate_cache(tenant.as_str());
RoleCacheManager::instance().invalidate_cache(&tenant);
}

// update share spec if needed
Expand Down
Loading

0 comments on commit bbeabf1

Please sign in to comment.