Skip to content

Commit

Permalink
refactor: use type Tenant for BackgroundJobIdent.tenant
Browse files Browse the repository at this point in the history
Define BackgroundJobIdent with `TIdent`, make
`BackgroundJobIdent.tenant` a `Tenant` instead of a plain `String`.

- Part of databendlabs#14719
  • Loading branch information
drmingdrmer committed Mar 23, 2024
1 parent 935f57f commit 90714c7
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 105 deletions.
6 changes: 3 additions & 3 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
} else {
Err(KVAppError::AppError(AppError::BackgroundJobAlreadyExists(
BackgroundJobAlreadyExists::new(
&name_key.name,
name_key.name(),
format!("create background job: {:?}", req.job_name),
),
)))
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
let r = get_background_job_by_id(self, &BackgroundJobId { id: job_id.0 }).await?;
// filter none and get the task info
if let Some(task_info) = r.1 {
res.push((r.0, ident.name, task_info));
res.push((r.0, ident.name().to_string(), task_info));
}
}
Ok(res)
Expand Down Expand Up @@ -326,7 +326,7 @@ pub fn background_job_has_to_exist(
if seq == 0 {
debug!(seq = seq, name_ident :? =(name_ident); "background job does not exist");
Err(KVAppError::AppError(AppError::UnknownBackgroundJob(
UnknownBackgroundJob::new(&name_ident.name, format!("{:?}", name_ident)),
UnknownBackgroundJob::new(name_ident.name(), format!("{:?}", name_ident)),
)))
} else {
Ok(())
Expand Down
16 changes: 8 additions & 8 deletions src/meta/api/src/background_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_meta_app::background::ManualTriggerParams;
use databend_common_meta_app::background::UpdateBackgroundJobParamsReq;
use databend_common_meta_app::background::UpdateBackgroundJobStatusReq;
use databend_common_meta_app::background::UpdateBackgroundTaskReq;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::MetaError;
use log::info;
Expand Down Expand Up @@ -199,17 +200,16 @@ impl BackgroundApiTestSuite {
&self,
mt: &MT,
) -> anyhow::Result<()> {
let tenant = "tenant1";
let tenant_name = "tenant1";
let job_name = "uuid1";
let job_ident = BackgroundJobIdent {
tenant: tenant.to_string(),
name: job_name.to_string(),
};

let tenant = Tenant::new_literal(tenant_name);
let job_ident = BackgroundJobIdent::new(tenant.clone(), job_name);

info!("--- list background jobs when their is no tasks");
{
let req = ListBackgroundJobsReq {
tenant: tenant.to_string(),
tenant: tenant_name.to_string(),
};

let res = mt.list_background_jobs(req).await;
Expand Down Expand Up @@ -334,14 +334,14 @@ impl BackgroundApiTestSuite {
info!("--- list background jobs when their is 1 tasks");
{
let req = ListBackgroundJobsReq {
tenant: tenant.to_string(),
tenant: tenant_name.to_string(),
};

let res = mt.list_background_jobs(req).await;
assert!(res.is_ok());
let resp = res.unwrap();
assert_eq!(1, resp.len());
assert_eq!(job_ident.name, resp[0].1, "expect same ident name");
assert_eq!(job_ident.name(), resp[0].1, "expect same ident name");
assert_eq!(
BackgroundJobState::FAILED,
resp[0].2.job_status.clone().unwrap().job_state,
Expand Down
68 changes: 9 additions & 59 deletions src/meta/app/src/background/background_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use chrono::DateTime;
use chrono::Utc;
use cron::Schedule;

use crate::background::BackgroundJobIdent;
use crate::background::BackgroundTaskType;
use crate::principal::UserIdentity;

Expand Down Expand Up @@ -182,21 +183,6 @@ impl BackgroundJobStatus {
}
}

// Serde is required by `BackgroundTaskInfo.creator`
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BackgroundJobIdent {
// The user this job belongs to
pub tenant: String,

pub name: String,
}

impl Display for BackgroundJobIdent {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.tenant, self.name)
}
}

// Info
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct BackgroundJobInfo {
Expand Down Expand Up @@ -243,7 +229,7 @@ impl Display for CreateBackgroundJobReq {
write!(
f,
"create_background_job({}, {}, {:?}, {:?}, {}, {:?})",
self.job_name.name,
self.job_name.name(),
self.job_info.task_type,
self.job_info.job_params,
self.job_info.job_status,
Expand All @@ -265,7 +251,7 @@ pub struct GetBackgroundJobReq {

impl Display for GetBackgroundJobReq {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "get_background_job({})", self.name.name)
write!(f, "get_background_job({})", self.name.name())
}
}

Expand All @@ -286,7 +272,8 @@ impl Display for UpdateBackgroundJobStatusReq {
write!(
f,
"update_background_job_status({}, {})",
self.job_name.name, self.status
self.job_name.name(),
self.status
)
}
}
Expand All @@ -302,7 +289,8 @@ impl Display for UpdateBackgroundJobParamsReq {
write!(
f,
"update_background_job_params({}, {})",
self.job_name.name, self.params
self.job_name.name(),
self.params
)
}
}
Expand All @@ -319,7 +307,7 @@ impl Display for UpdateBackgroundJobReq {
write!(
f,
"update_background_job({}, {}, {:?}, {:?}, {}, {:?})",
self.job_name.name,
self.job_name.name(),
self.info.task_type,
self.info.job_params,
self.info.job_status,
Expand All @@ -341,7 +329,7 @@ pub struct DeleteBackgroundJobReq {

impl Display for DeleteBackgroundJobReq {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "delete_background_job({})", self.name.name)
write!(f, "delete_background_job({})", self.name.name())
}
}

Expand All @@ -361,41 +349,9 @@ impl Display for ListBackgroundJobsReq {

mod kvapi_key_impl {
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;

use crate::background::background_job::BackgroundJobId;
use crate::background::background_job::BackgroundJobIdent;
use crate::background::BackgroundJobInfo;
use crate::tenant::Tenant;

/// <prefix>/<tenant>/<background_job_ident> -> <id>
impl kvapi::Key for BackgroundJobIdent {
const PREFIX: &'static str = "__fd_background_job";

type ValueType = BackgroundJobId;

/// It belongs to a tenant
fn parent(&self) -> Option<String> {
Some(Tenant::new(&self.tenant).to_string_key())
}

fn to_string_key(&self) -> String {
kvapi::KeyBuilder::new_prefixed(Self::PREFIX)
.push_str(&self.tenant)
.push_str(&self.name)
.done()
}

fn from_str_key(s: &str) -> Result<Self, kvapi::KeyError> {
let mut p = kvapi::KeyParser::new_prefixed(s, Self::PREFIX)?;

let tenant = p.next_str()?;
let name = p.next_str()?;
p.done()?;

Ok(BackgroundJobIdent { tenant, name })
}
}

impl kvapi::Key for BackgroundJobId {
const PREFIX: &'static str = "__fd_background_job_by_id";
Expand All @@ -422,12 +378,6 @@ mod kvapi_key_impl {
}
}

impl kvapi::Value for BackgroundJobId {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[self.to_string_key()]
}
}

impl kvapi::Value for BackgroundJobInfo {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[]
Expand Down
75 changes: 75 additions & 0 deletions src/meta/app/src/background/job_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::TIdent;

/// Defines the meta-service key for background job.
pub type BackgroundJobIdent = TIdent<Resource>;

pub use kvapi_impl::Resource;

mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;

use crate::background::BackgroundJobId;
use crate::tenant_key::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_background_job";
type ValueType = BackgroundJobId;
}

impl kvapi::Value for BackgroundJobId {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[self.to_string_key()]
}
}

// impl From<ExistError<Resource>> for ErrorCode {
// fn from(err: ExistError<Resource>) -> Self {
// ErrorCode::ConnectionAlreadyExists(err.to_string())
// }
// }
//
// impl From<UnknownError<Resource>> for ErrorCode {
// fn from(err: UnknownError<Resource>) -> Self {
// // Special case: use customized message to keep backward compatibility.
// // TODO: consider using the default message in the future(`err.to_string()`)
// ErrorCode::UnknownConnection(format!("Connection '{}' does not exist.", err.name()))
// .add_message_back(err.ctx())
// }
// }
}

#[cfg(test)]
mod tests {
use databend_common_meta_kvapi::kvapi::Key;

use super::BackgroundJobIdent;
use crate::tenant::Tenant;

#[test]
fn test_connection_ident() {
let tenant = Tenant::new_literal("test");
let ident = BackgroundJobIdent::new(tenant, "test1");

let key = ident.to_string_key();
assert_eq!(key, "__fd_background_job/test/test1");

assert_eq!(ident, BackgroundJobIdent::from_str_key(&key).unwrap());
}
}
3 changes: 2 additions & 1 deletion src/meta/app/src/background/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

mod background_job;
mod background_task;
pub mod job_ident;
pub mod task_creator;

pub use background_job::BackgroundJobId;
pub use background_job::BackgroundJobIdent;
pub use background_job::BackgroundJobInfo;
pub use background_job::BackgroundJobParams;
pub use background_job::BackgroundJobState;
Expand Down Expand Up @@ -46,3 +46,4 @@ pub use background_task::ListBackgroundTasksReq;
pub use background_task::UpdateBackgroundTaskReply;
pub use background_task::UpdateBackgroundTaskReq;
pub use background_task::VacuumStats;
pub use job_ident::BackgroundJobIdent;
15 changes: 11 additions & 4 deletions src/meta/app/src/background/task_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::fmt;

use crate::background::BackgroundJobIdent;
use crate::KeyWithTenant;

/// Same as [`BackgroundJobIdent`] but provide serde support for use in a record value.
/// [`BackgroundJobIdent`] is a kvapi::Key that does not need to be `serde`
Expand All @@ -26,6 +27,15 @@ pub struct BackgroundTaskCreator {
pub name: String,
}

impl BackgroundTaskCreator {
pub fn new(tenant: impl ToString, name: impl ToString) -> Self {
Self {
tenant: tenant.to_string(),
name: name.to_string(),
}
}
}

impl fmt::Display for BackgroundTaskCreator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.tenant, self.name)
Expand All @@ -34,9 +44,6 @@ impl fmt::Display for BackgroundTaskCreator {

impl From<BackgroundJobIdent> for BackgroundTaskCreator {
fn from(ident: BackgroundJobIdent) -> Self {
Self {
tenant: ident.tenant,
name: ident.name,
}
Self::new(ident.tenant_name(), ident.name())
}
}
Loading

0 comments on commit 90714c7

Please sign in to comment.