Skip to content

Commit

Permalink
refactor: add BackgroundTaskCreator
Browse files Browse the repository at this point in the history
`BackgroundJobIdent` is a kvapi::Key but not a value.
It does not need to be `serde`.
Thus in this commit we create another struct `BackgroundTaskCreator`
with the same structure as `BackgroundJobIdent` for usage in values.

And remove `serde` declaration from `BackgroundJobIdent`.

- Part of databendlabs#14719
  • Loading branch information
drmingdrmer committed Mar 22, 2024
1 parent 42375d6 commit 2b8650e
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/meta/app/src/background/background_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl BackgroundJobStatus {
}

// Serde is required by `BackgroundTaskInfo.creator`
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BackgroundJobIdent {
// The user this job belongs to
pub tenant: String,
Expand Down
7 changes: 4 additions & 3 deletions src/meta/app/src/background/background_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Duration;
use chrono::DateTime;
use chrono::Utc;

use crate::background::task_creator::BackgroundTaskCreator;
use crate::background::BackgroundJobIdent;
use crate::background::ManualTriggerParams;
use crate::schema::TableStatistics;
Expand Down Expand Up @@ -120,13 +121,13 @@ pub struct BackgroundTaskInfo {
pub vacuum_stats: Option<VacuumStats>,

pub manual_trigger: Option<ManualTriggerParams>,
pub creator: Option<BackgroundJobIdent>,
pub creator: Option<BackgroundTaskCreator>,
pub created_at: DateTime<Utc>,
}

impl BackgroundTaskInfo {
pub fn new_compaction_task(
creator: BackgroundJobIdent,
job_ident: BackgroundJobIdent,
db_id: u64,
tb_id: u64,
tb_stats: TableStatistics,
Expand All @@ -148,7 +149,7 @@ impl BackgroundTaskInfo {
}),
vacuum_stats: None,
manual_trigger,
creator: Some(creator),
creator: Some(job_ident.into()),
created_at: now,
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/app/src/background/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod background_job;
mod background_task;
pub mod task_creator;

pub use background_job::BackgroundJobId;
pub use background_job::BackgroundJobIdent;
Expand Down
42 changes: 42 additions & 0 deletions src/meta/app/src/background/task_creator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use crate::background::BackgroundJobIdent;

/// Same as [`BackgroundJobIdent`] but provide serde support for use in a record value.
/// [`BackgroundJobIdent`] is a kvapi::Key that does not need to be `serde`
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct BackgroundTaskCreator {
/// The user this job belongs to
pub tenant: String,

pub name: String,
}

impl fmt::Display for BackgroundTaskCreator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.tenant, self.name)
}
}

impl From<BackgroundJobIdent> for BackgroundTaskCreator {
fn from(ident: BackgroundJobIdent) -> Self {
Self {
tenant: ident.tenant,
name: ident.name,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ impl FromToProto for BackgroundJobStatus {
}
}

impl FromToProto for mt::background::BackgroundJobIdent {
type PB = pb::BackgroundJobIdent;
impl FromToProto for mt::background::task_creator::BackgroundTaskCreator {
type PB = pb::BackgroundTaskCreator;
fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}
Expand All @@ -180,7 +180,7 @@ impl FromToProto for mt::background::BackgroundJobIdent {
})
}
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
let p = pb::BackgroundJobIdent {
let p = pb::BackgroundTaskCreator {
ver: VER,
min_reader_ver: MIN_READER_VER,
tenant: self.tenant.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl FromToProto for mt::background::BackgroundTaskInfo {
.manual_trigger
.and_then(|t| ManualTriggerParams::from_pb(t).ok()),
creator: match p.creator {
Some(c) => Some(mt::background::BackgroundJobIdent::from_pb(c)?),
Some(c) => Some(mt::background::task_creator::BackgroundTaskCreator::from_pb(c)?),
None => None,
},
created_at: DateTime::<Utc>::from_pb(p.created_at)?,
Expand Down
3 changes: 2 additions & 1 deletion src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(80, "2024-02-01: Add: datatype.proto/DataType Geometry type"),
(81, "2024-03-04: Add: udf.udf_script"),
(82, "2024-03-08: Add: table.inverted_index"),
(83, "2024-03-14: Add: null_if in user.proto/NDJSONFileFormatParams")
(83, "2024-03-14: Add: null_if in user.proto/NDJSONFileFormatParams"),
(84, "2024-03-21: Rename: background.proto/BackgroundJobIdent to BackgroundTaskCreator"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,4 @@ mod v079_udf_created_on;
mod v081_udf_script;
mod v082_table_index;
mod v083_ndjson_format_params;
mod v084_background_task_creator;
4 changes: 2 additions & 2 deletions src/meta/proto-conv/tests/it/v045_background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;

use chrono::TimeZone;
use chrono::Utc;
use databend_common_meta_app::background::BackgroundJobIdent;
use databend_common_meta_app::background::task_creator::BackgroundTaskCreator;
use databend_common_meta_app::background::BackgroundJobParams;
use databend_common_meta_app::background::BackgroundJobState;
use databend_common_meta_app::background::BackgroundJobStatus;
Expand Down Expand Up @@ -107,7 +107,7 @@ fn test_decode_v40_background_task_case_2() -> anyhow::Result<()> {
}),
vacuum_stats: Some(VacuumStats {}),
manual_trigger: None,
creator: Some(BackgroundJobIdent {
creator: Some(BackgroundTaskCreator {
tenant: "test1".to_string(),
name: "compactor_job".to_string(),
}),
Expand Down
4 changes: 2 additions & 2 deletions src/meta/proto-conv/tests/it/v048_background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;

use chrono::TimeZone;
use chrono::Utc;
use databend_common_meta_app::background::BackgroundJobIdent;
use databend_common_meta_app::background::task_creator::BackgroundTaskCreator;
use databend_common_meta_app::background::BackgroundJobParams;
use databend_common_meta_app::background::BackgroundJobState;
use databend_common_meta_app::background::BackgroundJobStatus;
Expand Down Expand Up @@ -90,7 +90,7 @@ fn test_decode_v48_background_task_case_2() -> anyhow::Result<()> {

triggered_at: Default::default(),
}),
creator: Some(BackgroundJobIdent {
creator: Some(BackgroundTaskCreator {
tenant: "test1".to_string(),
name: "compactor_job".to_string(),
}),
Expand Down
145 changes: 145 additions & 0 deletions src/meta/proto-conv/tests/it/v084_background_task_creator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use chrono::TimeZone;
use chrono::Utc;
use databend_common_meta_app::background::task_creator::BackgroundTaskCreator;
use databend_common_meta_app::background::BackgroundJobParams;
use databend_common_meta_app::background::BackgroundJobState;
use databend_common_meta_app::background::BackgroundJobStatus;
use databend_common_meta_app::background::BackgroundJobType;
use databend_common_meta_app::background::BackgroundTaskState;
use databend_common_meta_app::background::BackgroundTaskType;
use databend_common_meta_app::background::CompactionStats;
use databend_common_meta_app::background::ManualTriggerParams;
use databend_common_meta_app::background::VacuumStats;
use databend_common_meta_app::schema::TableStatistics;
use minitrace::func_name;

use crate::common;

// These bytes are built when a new version in introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
// The message bytes are built from the output of `test_pb_from_to()`
#[test]
fn test_decode_v48_background_task_case_2() -> anyhow::Result<()> {
let bytes = vec![
26, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85,
84, 67, 32, 1, 50, 24, 100, 97, 116, 97, 98, 101, 110, 100, 32, 98, 97, 99, 107, 103, 114,
111, 117, 110, 100, 32, 116, 97, 115, 107, 58, 60, 8, 21, 16, 92, 26, 21, 8, 144, 78, 16,
160, 156, 1, 24, 30, 32, 40, 40, 10, 48, 11, 160, 6, 48, 168, 6, 24, 34, 20, 8, 232, 7, 16,
208, 15, 24, 3, 32, 4, 40, 5, 48, 6, 160, 6, 48, 168, 6, 24, 45, 0, 0, 200, 66, 160, 6, 48,
168, 6, 24, 66, 6, 160, 6, 48, 168, 6, 24, 74, 45, 10, 4, 49, 50, 51, 49, 18, 6, 160, 6,
48, 168, 6, 24, 26, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58,
48, 48, 32, 85, 84, 67, 160, 6, 48, 168, 6, 24, 210, 5, 28, 10, 5, 116, 101, 115, 116, 49,
18, 13, 99, 111, 109, 112, 97, 99, 116, 111, 114, 95, 106, 111, 98, 160, 6, 48, 168, 6, 24,
218, 5, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 48, 48, 32,
85, 84, 67, 160, 6, 48, 168, 6, 24,
];

let want = || databend_common_meta_app::background::BackgroundTaskInfo {
last_updated: Some(Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap()),
task_type: BackgroundTaskType::VACUUM,
task_state: BackgroundTaskState::STARTED,
message: "databend background task".to_string(),
compaction_task_stats: Some(CompactionStats {
db_id: 21,
table_id: 92,
before_compaction_stats: Some(TableStatistics {
number_of_rows: 10000,
data_bytes: 20000,
compressed_data_bytes: 30,
index_data_bytes: 40,
number_of_segments: Some(10),
number_of_blocks: Some(11),
}),
after_compaction_stats: Some(TableStatistics {
number_of_rows: 1000,
data_bytes: 2000,
compressed_data_bytes: 3,
index_data_bytes: 4,
number_of_segments: Some(5),
number_of_blocks: Some(6),
}),
total_compaction_time: Some(Duration::from_secs(100)),
}),
vacuum_stats: Some(VacuumStats {}),
manual_trigger: Some(ManualTriggerParams {
id: "1231".to_string(),
trigger: Default::default(),

triggered_at: Default::default(),
}),
creator: Some(BackgroundTaskCreator {
tenant: "test1".to_string(),
name: "compactor_job".to_string(),
}),

created_at: Default::default(),
};

common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), bytes.as_slice(), 48, want())
}

#[test]
fn test_decode_v48_background_job() -> anyhow::Result<()> {
let bytes = vec![
10, 78, 8, 1, 16, 100, 34, 19, 65, 109, 101, 114, 105, 99, 97, 47, 76, 111, 115, 95, 65,
110, 103, 101, 108, 101, 115, 42, 45, 10, 4, 49, 50, 51, 49, 18, 6, 160, 6, 48, 168, 6, 24,
26, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 48, 48, 32, 85,
84, 67, 160, 6, 48, 168, 6, 24, 160, 6, 48, 168, 6, 24, 18, 37, 34, 4, 116, 101, 115, 116,
42, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85,
84, 67, 160, 6, 48, 168, 6, 24, 42, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50,
58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 218, 5, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
32, 48, 48, 58, 48, 48, 58, 48, 48, 32, 85, 84, 67, 160, 6, 48, 168, 6, 24,
];

let want = || databend_common_meta_app::background::BackgroundJobInfo {
job_params: Some(BackgroundJobParams {
job_type: BackgroundJobType::INTERVAL,
scheduled_job_interval: std::time::Duration::from_secs(100),
scheduled_job_cron: "".to_string(),
scheduled_job_timezone: Some(chrono_tz::America::Los_Angeles),
manual_trigger_params: Some(ManualTriggerParams {
id: "1231".to_string(),
trigger: Default::default(),
triggered_at: Default::default(),
}),
}),
last_updated: Some(Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap()),
task_type: BackgroundTaskType::COMPACTION,
message: "".to_string(),
creator: None,
created_at: Default::default(),
job_status: Some(BackgroundJobStatus {
job_state: BackgroundJobState::RUNNING,
last_task_id: Some("test".to_string()),
last_task_run_at: Some(Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap()),
next_task_scheduled_time: None,
}),
};

common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), bytes.as_slice(), 48, want())
}
4 changes: 2 additions & 2 deletions src/meta/protos/proto/background.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ message BackgroundTaskInfo {
optional VacuumStats vacuum_stats = 8;
optional ManualTriggerParams manual_trigger = 9;
// Audit
optional BackgroundJobIdent creator = 90;
optional BackgroundTaskCreator creator = 90;
string created_at = 91;
}

message BackgroundJobIdent {
message BackgroundTaskCreator {
uint64 ver = 100;
uint64 min_reader_ver = 101;

Expand Down

0 comments on commit 2b8650e

Please sign in to comment.