Skip to content

Commit

Permalink
feat: Introduced MetaSpec for setting relative expiration (#14041)
Browse files Browse the repository at this point in the history
This is a compatible change with new feature provided.

- This commit introduces a new type, `MetaSpec`, which specifies content
  of the `KVMeta` to be stored for a key. This new type is particularly
  utilized in `upsert` requests to enable the setting of relative
  expiration times.

  Previously, the `KVMeta` type was used directly for this purpose.
  However, to avoid impacting existing storage data types, `MetaSpec` has
  been specifically added for use in `upsert` operations. When applying a
  raft-log, a `KVMeta` is built from `MetaSpec`.

  Designed with backward compatibility, `MetaSpec` maintains a serialized
  format compatible with `KVMeta`, ensuring no disruption to existing
  functionality.

- We introduce two new types `Time` and `Interval` to reprensent
  serde-able time stamp and time interval.

- Tests are added to ensure meta-service works correctly with API with
  ttl support, but databend-query does not use these API yet.
drmingdrmer authored Dec 16, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 629ee7b commit a64551c
Showing 28 changed files with 637 additions and 120 deletions.
4 changes: 2 additions & 2 deletions src/binaries/meta/kvapi.rs
Original file line number Diff line number Diff line change
@@ -16,8 +16,8 @@ use std::sync::Arc;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::With;
use databend_meta::configs::Config;
@@ -40,7 +40,7 @@ impl KvApiCommand {
let req = UpsertKVReq::update(config.key[0].as_str(), config.value.as_bytes());

let req = if let Some(expire_after) = config.expire_after {
req.with(KVMeta::new_expire(SeqV::<()>::now_sec() + expire_after))
req.with(MetaSpec::new_expire(SeqV::<()>::now_sec() + expire_after))
} else {
req
};
4 changes: 2 additions & 2 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
@@ -43,10 +43,10 @@ use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeq::Any;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::TxnRequest;
use log::as_debug;
@@ -243,7 +243,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
name_key.to_string_key().as_str(),
Any,
Operation::Update(serialize_struct(&meta)?),
Some(KVMeta::new_expire(req.expire_at)),
Some(MetaSpec::new_expire(req.expire_at)),
))
.await?;
// confirm a successful update
1 change: 1 addition & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
@@ -4182,6 +4182,7 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
value: 1_i8.to_le_bytes().to_vec(),
prev_value: false,
expire_at,
ttl_ms: None,
})),
}
}
2 changes: 2 additions & 0 deletions src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
@@ -332,6 +332,7 @@ pub fn txn_op_put(key: &impl kvapi::Key, value: Vec<u8>) -> TxnOp {
value,
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}
}
@@ -344,6 +345,7 @@ pub fn txn_op_put_with_expire(key: &impl kvapi::Key, value: Vec<u8>, expire_at:
value,
prev_value: true,
expire_at: Some(expire_at),
ttl_ms: None,
})),
}
}
115 changes: 91 additions & 24 deletions src/meta/kvapi/src/kvapi/test_suite.rs
Original file line number Diff line number Diff line change
@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use std::time::Duration;

use databend_common_meta_types::protobuf as pb;
use databend_common_meta_types::txn_condition;
@@ -22,6 +21,7 @@ use databend_common_meta_types::txn_op_response;
use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnCondition;
@@ -40,6 +40,7 @@ use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::With;
use log::debug;
use log::info;
use minitrace::full_name;
use minitrace::func_name;

use crate::kvapi;
@@ -58,11 +59,13 @@ impl kvapi::TestSuite {
self.kv_delete(&builder.build().await).await?;
self.kv_update(&builder.build().await).await?;
self.kv_timeout(&builder.build().await).await?;
self.kv_upsert_with_ttl(&builder.build().await).await?;
self.kv_meta(&builder.build().await).await?;
self.kv_list(&builder.build().await).await?;
self.kv_mget(&builder.build().await).await?;
self.kv_txn_absent_seq_0(&builder.build().await).await?;
self.kv_transaction(&builder.build().await).await?;
self.kv_transaction_with_ttl(&builder.build().await).await?;
self.kv_transaction_delete_match_seq_none(&builder.build().await)
.await?;
self.kv_transaction_delete_match_seq_some_not_match(&builder.build().await)
@@ -248,13 +251,10 @@ impl kvapi::TestSuite {
// - Test list expired and non-expired.
// - Test update with a new expire value.

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let now_sec = SeqV::<()>::now_sec();

let _res = kv
.upsert_kv(UpsertKVReq::update("k1", b"v1").with(KVMeta::new_expire(now + 2)))
.upsert_kv(UpsertKVReq::update("k1", b"v1").with(MetaSpec::new_expire(now_sec + 2)))
.await?;
// dbg!("upsert non expired k1", _res);

@@ -267,17 +267,14 @@ impl kvapi::TestSuite {

info!("---get expired");
{
tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await;
tokio::time::sleep(Duration::from_millis(3000)).await;
let res = kv.get_kv("k1").await?;
// dbg!("k1 expired", &res);
debug!("got k1:{:?}", res);
assert!(res.is_none(), "got expired");
}

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let now_sec = SeqV::<()>::now_sec();

info!("--- expired entry act as if it does not exist, an ADD op should apply");
{
@@ -286,7 +283,7 @@ impl kvapi::TestSuite {
.upsert_kv(
UpsertKVReq::update("k1", b"v1")
.with(MatchSeq::Exact(0))
.with(KVMeta::new_expire(now - 1)),
.with(MetaSpec::new_expire(now_sec - 1)),
)
.await?;
// dbg!("update expired k1", _res);
@@ -295,7 +292,7 @@ impl kvapi::TestSuite {
.upsert_kv(
UpsertKVReq::update("k2", b"v2")
.with(MatchSeq::Exact(0))
.with(KVMeta::new_expire(now + 10)),
.with(MetaSpec::new_expire(now_sec + 10)),
)
.await?;
// dbg!("update non expired k2", _res);
@@ -306,7 +303,7 @@ impl kvapi::TestSuite {
None,
Some(SeqV::with_meta(
3,
Some(KVMeta::new_expire(now + 10)),
Some(KVMeta::new_expire(now_sec + 10)),
b"v2".to_vec()
))
]);
@@ -325,7 +322,7 @@ impl kvapi::TestSuite {
kv.upsert_kv(
UpsertKVReq::update("k2", b"v2")
.with(MatchSeq::Exact(3))
.with(KVMeta::new_expire(now - 1)),
.with(MetaSpec::new_expire(now_sec - 1)),
)
.await?;

@@ -336,16 +333,42 @@ impl kvapi::TestSuite {
Ok(())
}

#[minitrace::trace]
pub async fn kv_upsert_with_ttl<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
// - Add with ttl

info!("--- {}", full_name!());

let _res = kv
.upsert_kv(
UpsertKVReq::update("k1", b"v1")
.with(MetaSpec::new_ttl(Duration::from_millis(2_000))),
)
.await?;

info!("---get unexpired");
{
let res = kv.get_kv("k1").await?;
assert!(res.is_some(), "got unexpired");
}

info!("---get expired");
{
tokio::time::sleep(Duration::from_millis(2_100)).await;
let res = kv.get_kv("k1").await?;
assert!(res.is_none(), "got expired");
}

Ok(())
}

#[minitrace::trace]
pub async fn kv_meta<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
info!("--- kvapi::KVApiTestSuite::kv_meta() start");

let test_key = "test_key_for_update_meta";

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let now_sec = SeqV::<()>::now_sec();

let r = kv.upsert_kv(UpsertKVReq::update(test_key, b"v1")).await?;
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result);
@@ -358,7 +381,7 @@ impl kvapi::TestSuite {
test_key,
MatchSeq::Exact(seq + 1),
Operation::AsIs,
Some(KVMeta::new_expire(now + 20)),
Some(MetaSpec::new_expire(now_sec + 20)),
))
.await?;
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
@@ -371,14 +394,14 @@ impl kvapi::TestSuite {
test_key,
MatchSeq::Exact(seq),
Operation::AsIs,
Some(KVMeta::new_expire(now + 20)),
Some(MetaSpec::new_expire(now_sec + 20)),
))
.await?;
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
assert_eq!(
Some(SeqV::with_meta(
2,
Some(KVMeta::new_expire(now + 20)),
Some(KVMeta::new_expire(now_sec + 20)),
b"v1".to_vec()
)),
r.result
@@ -388,7 +411,11 @@ impl kvapi::TestSuite {
let key_value = kv.get_kv(test_key).await?;
assert!(key_value.is_some());
assert_eq!(
SeqV::with_meta(seq + 1, Some(KVMeta::new_expire(now + 20)), b"v1".to_vec()),
SeqV::with_meta(
seq + 1,
Some(KVMeta::new_expire(now_sec + 20)),
b"v1".to_vec()
),
key_value.unwrap(),
);

@@ -484,6 +511,7 @@ impl kvapi::TestSuite {
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];

@@ -645,6 +673,7 @@ impl kvapi::TestSuite {
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];

@@ -697,6 +726,7 @@ impl kvapi::TestSuite {
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];

@@ -752,6 +782,7 @@ impl kvapi::TestSuite {
value: val1_new.to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
// change k2
@@ -761,6 +792,7 @@ impl kvapi::TestSuite {
value: b"new_v2".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
// get k1
@@ -870,6 +902,7 @@ impl kvapi::TestSuite {
value: val1_new.to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
// get k1
@@ -915,6 +948,36 @@ impl kvapi::TestSuite {
Ok(())
}

#[minitrace::trace]
pub async fn kv_transaction_with_ttl<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
// - Add a record via transaction with ttl

info!("--- {}", full_name!());

let txn = TxnRequest {
condition: vec![],
if_then: vec![TxnOp::put_with_ttl("k1", b("v1"), Some(2_000))],
else_then: vec![],
};

let _resp = kv.transaction(txn).await?;

info!("---get unexpired");
{
let res = kv.get_kv("k1").await?;
assert!(res.is_some(), "got unexpired");
}

info!("---get expired");
{
tokio::time::sleep(Duration::from_millis(2_100)).await;
let res = kv.get_kv("k1").await?;
assert!(res.is_none(), "got expired");
}

Ok(())
}

/// If `TxnDeleteRequest.match_seq` is not set,
/// the delete operation will always be executed.
pub async fn kv_transaction_delete_match_seq_none<KV: kvapi::KVApi>(
@@ -1086,3 +1149,7 @@ impl kvapi::TestSuite {
Ok(())
}
}

fn b(s: &str) -> Vec<u8> {
s.as_bytes().to_vec()
}
1 change: 1 addition & 0 deletions src/meta/process/src/kv_processor.rs
Original file line number Diff line number Diff line change
@@ -218,6 +218,7 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
value,
prev_value: p.prev_value,
expire_at: p.expire_at,
ttl_ms: p.ttl_ms,
};

Ok(pr)
20 changes: 17 additions & 3 deletions src/meta/raft-store/src/applier.rs
Original file line number Diff line number Diff line change
@@ -23,11 +23,13 @@ use databend_common_meta_types::txn_op_response;
use databend_common_meta_types::AppliedState;
use databend_common_meta_types::Change;
use databend_common_meta_types::Cmd;
use databend_common_meta_types::CmdContext;
use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::Entry;
use databend_common_meta_types::EntryPayload;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::Interval;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Node;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
@@ -61,6 +63,9 @@ use crate::sm_v002::SMV002;
pub struct Applier<'a> {
sm: &'a mut SMV002,

/// The context of the current applying log.
cmd_ctx: CmdContext,

/// The changes has been made by the applying one log entry
changes: Vec<Change<Vec<u8>, String>>,
}
@@ -69,6 +74,7 @@ impl<'a> Applier<'a> {
pub fn new(sm: &'a mut SMV002) -> Self {
Self {
sm,
cmd_ctx: CmdContext::from_millis(0),
changes: Vec::new(),
}
}
@@ -83,6 +89,8 @@ impl<'a> Applier<'a> {
let log_id = &entry.log_id;
let log_time_ms = Self::get_log_time(entry);

self.cmd_ctx = CmdContext::from_millis(log_time_ms);

self.clean_expired_kvs(log_time_ms).await?;

*self.sm.sys_data_mut().last_applied_mut() = Some(*log_id);
@@ -208,7 +216,10 @@ impl<'a> Applier<'a> {
) -> Result<(Option<SeqV>, Option<SeqV>), io::Error> {
debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv");

let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await?;
let (prev, result) = self
.sm
.upsert_kv_primary_index(upsert_kv, &self.cmd_ctx)
.await?;

self.sm
.update_expire_index(&upsert_kv.key, &prev, &result)
@@ -381,7 +392,10 @@ impl<'a> Applier<'a> {
put: &TxnPutRequest,
resp: &mut TxnReply,
) -> Result<(), io::Error> {
let upsert = UpsertKV::update(&put.key, &put.value).with(KVMeta::new(put.expire_at));
let upsert = UpsertKV::update(&put.key, &put.value).with(MetaSpec::new(
put.expire_at,
put.ttl_ms.map(Interval::from_millis),
));

let (prev, _result) = self.upsert_kv(&upsert).await?;

20 changes: 9 additions & 11 deletions src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,9 @@ use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::AppliedState;
use databend_common_meta_types::CmdContext;
use databend_common_meta_types::Entry;
use databend_common_meta_types::EvalExpireTime;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
@@ -385,7 +387,10 @@ impl SMV002 {
pub(crate) async fn upsert_kv_primary_index(
&mut self,
upsert_kv: &UpsertKV,
cmd_ctx: &CmdContext,
) -> Result<(Marked<Vec<u8>>, Marked<Vec<u8>>), io::Error> {
let kv_meta = upsert_kv.value_meta.as_ref().map(|m| m.to_kv_meta(cmd_ctx));

let prev = self.levels.str_map().get(&upsert_kv.key).await?.clone();

if upsert_kv.seq.match_seq(prev.seq()).is_err() {
@@ -395,24 +400,17 @@ impl SMV002 {
let (prev, mut result) = match &upsert_kv.value {
Operation::Update(v) => {
self.levels
.set(
upsert_kv.key.clone(),
Some((v.clone(), upsert_kv.value_meta.clone())),
)
.set(upsert_kv.key.clone(), Some((v.clone(), kv_meta.clone())))
.await?
}
Operation::Delete => self.levels.set(upsert_kv.key.clone(), None).await?,
Operation::AsIs => {
MapApiExt::update_meta(
&mut self.levels,
upsert_kv.key.clone(),
upsert_kv.value_meta.clone(),
)
.await?
MapApiExt::update_meta(&mut self.levels, upsert_kv.key.clone(), kv_meta.clone())
.await?
}
};

let expire_ms = upsert_kv.get_expire_at_ms().unwrap_or(u64::MAX);
let expire_ms = kv_meta.eval_expire_at_ms();
if expire_ms < self.expire_cursor.time_ms {
// The record has expired, delete it at once.
//
30 changes: 24 additions & 6 deletions src/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
@@ -34,17 +34,20 @@ use databend_common_meta_types::txn_op_response;
use databend_common_meta_types::AppliedState;
use databend_common_meta_types::Change;
use databend_common_meta_types::Cmd;
use databend_common_meta_types::CmdContext;
use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::Entry;
use databend_common_meta_types::EntryPayload;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::Interval;
use databend_common_meta_types::LogId;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Node;
use databend_common_meta_types::NodeId;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::StoredMembership;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnDeleteByPrefixRequest;
@@ -532,7 +535,10 @@ impl StateMachine {
) -> Result<(), MetaStorageError> {
let (expired, prev, result) = Self::txn_upsert_kv(
txn_tree,
&UpsertKV::update(&put.key, &put.value).with(KVMeta::new(put.expire_at)),
&UpsertKV::update(&put.key, &put.value).with(MetaSpec::new(
put.expire_at,
put.ttl_ms.map(Interval::from_millis),
)),
log_time_ms,
)?;

@@ -903,6 +909,8 @@ impl StateMachine {
upsert_kv: &UpsertKV,
log_time_ms: u64,
) -> Result<(Option<SeqV>, Option<SeqV>, Option<SeqV>), MetaStorageError> {
let cmd_ctx = CmdContext::from_millis(log_time_ms);

let kvs = txn_tree.key_space::<GenericKV>();

let prev = kvs.get(&upsert_kv.key)?;
@@ -915,16 +923,26 @@ impl StateMachine {
}

let mut new_seq_v = match &upsert_kv.value {
Operation::Update(v) => SeqV::with_meta(0, upsert_kv.value_meta.clone(), v.clone()),
Operation::Update(v) => SeqV::with_meta(
0,
upsert_kv
.value_meta
.as_ref()
.map(|x| x.to_kv_meta(&cmd_ctx)),
v.clone(),
),
Operation::Delete => {
kvs.remove(&upsert_kv.key)?;
return Ok((expired, prev, None));
}
Operation::AsIs => match prev {
None => return Ok((expired, prev, None)),
Some(ref prev_kv_value) => {
prev_kv_value.clone().set_meta(upsert_kv.value_meta.clone())
}
Some(ref prev_kv_value) => prev_kv_value.clone().set_meta(
upsert_kv
.value_meta
.as_ref()
.map(|m| m.to_kv_meta(&cmd_ctx)),
),
},
};

6 changes: 4 additions & 2 deletions src/meta/raft-store/tests/it/state_machine/expire.rs
Original file line number Diff line number Diff line change
@@ -24,8 +24,8 @@ use databend_common_meta_types::new_log_id;
use databend_common_meta_types::Cmd;
use databend_common_meta_types::Entry;
use databend_common_meta_types::EntryPayload;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::LogEntry;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
use test_harness::test;
@@ -154,7 +154,9 @@ fn ent(index: u64, key: &str, expire: Option<u64>, time_ms: Option<u64>) -> Entr
payload: EntryPayload::Normal(LogEntry {
txid: None,
time_ms,
cmd: Cmd::UpsertKV(UpsertKV::update(key, key.as_bytes()).with(KVMeta::new(expire))),
cmd: Cmd::UpsertKV(
UpsertKV::update(key, key.as_bytes()).with(MetaSpec::new(expire, None)),
),
}),
}
}
21 changes: 15 additions & 6 deletions src/meta/raft-store/tests/it/state_machine/mod.rs
Original file line number Diff line number Diff line change
@@ -21,15 +21,18 @@ use databend_common_meta_types::new_log_id;
use databend_common_meta_types::AppliedState;
use databend_common_meta_types::Change;
use databend_common_meta_types::Cmd;
use databend_common_meta_types::CmdContext;
use databend_common_meta_types::Endpoint;
use databend_common_meta_types::Entry;
use databend_common_meta_types::EntryPayload;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::LogEntry;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Node;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
use log::info;
@@ -125,7 +128,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_upsert_get() -> anyhow::Res
key: String,
seq: MatchSeq,
value: Vec<u8>,
value_meta: Option<KVMeta>,
value_meta: Option<MetaSpec>,
// want:
prev: Option<SeqV<Vec<u8>>>,
result: Option<SeqV<Vec<u8>>>,
@@ -139,14 +142,20 @@ async fn test_state_machine_apply_non_dup_generic_kv_upsert_get() -> anyhow::Res
prev: Option<(u64, &'static str)>,
result: Option<(u64, &'static str)>,
) -> T {
let m = meta.map(KVMeta::new_expire);
let m = meta.map(MetaSpec::new_expire);
T {
key: name.to_string(),
seq,
value: value.to_string().into_bytes(),
value_meta: m.clone(),
prev: prev.map(|(a, b)| SeqV::new(a, b.into())),
result: result.map(|(a, b)| SeqV::with_meta(a, m, b.into())),
result: result.map(|(a, b)| {
SeqV::with_meta(
a,
m.map(|m| m.to_kv_meta(&CmdContext::from_millis(0))),
b.into(),
)
}),
}
}

@@ -281,7 +290,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res
key: key.clone(),
seq: MatchSeq::GE(0),
value: Operation::AsIs,
value_meta: Some(KVMeta::new_expire(now + 10)),
value_meta: Some(MetaSpec::new_expire(now + 10)),
}),
&mut t,
None,
@@ -306,7 +315,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res
key: key.clone(),
seq: MatchSeq::GE(0),
value: Operation::Update(b"value_meta_bar".to_vec()),
value_meta: Some(KVMeta::new_expire(now + 10)),
value_meta: Some(MetaSpec::new_expire(now + 10)),
}),
&mut t,
None,
@@ -323,7 +332,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res
key: key.clone(),
seq: MatchSeq::GE(0),
value: Operation::AsIs,
value_meta: Some(KVMeta::new_expire(now + 20)),
value_meta: Some(MetaSpec::new_expire(now + 20)),
}),
&mut t,
None,
4 changes: 2 additions & 2 deletions src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ use databend_common_meta_kvapi::kvapi::MGetKVReq;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf as pb;
use databend_common_meta_types::protobuf::KvMeta;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::With;
use futures::stream::StreamExt;
@@ -80,7 +80,7 @@ async fn initialize_kvs(client: &Arc<ClientHandle>, now_sec: u64) -> anyhow::Res
info!("--- prepare keys: a(meta),c,c1,c2");

let updates = vec![
UpsertKVReq::insert("a", &b("a")).with(KVMeta::new_expire(now_sec + 10)),
UpsertKVReq::insert("a", &b("a")).with(MetaSpec::new_expire(now_sec + 10)),
UpsertKVReq::insert("c", &b("c")),
UpsertKVReq::insert("c1", &b("c1")),
UpsertKVReq::insert("c2", &b("c2")),
1 change: 1 addition & 0 deletions src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs
Original file line number Diff line number Diff line change
@@ -269,6 +269,7 @@ async fn test_watch() -> anyhow::Result<()> {
value: txn_val.clone(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
TxnOp {
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ use databend_common_meta_types::Cmd;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::LogEntry;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
@@ -58,7 +59,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> {

info!("--- write a kv expiring in 3 sec");
{
let upsert = UpsertKV::update(key, key.as_bytes()).with(KVMeta::new_expire(now_sec + 3));
let upsert = UpsertKV::update(key, key.as_bytes()).with(MetaSpec::new_expire(now_sec + 3));

leader.write(LogEntry::new(Cmd::UpsertKV(upsert))).await?;
log_index += 1;
@@ -76,7 +77,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> {
{
let upsert = UpsertKV::update(key, value2.as_bytes())
.with(MatchSeq::Exact(seq))
.with(KVMeta::new_expire(now_sec + 1000));
.with(MetaSpec::new_expire(now_sec + 1000));
leader.write(LogEntry::new(Cmd::UpsertKV(upsert))).await?;
log_index += 1;
}
6 changes: 6 additions & 0 deletions src/meta/types/build.rs
Original file line number Diff line number Diff line change
@@ -37,6 +37,8 @@ fn build_proto() {
println!("cargo:rerun-if-changed={}", proto.to_str().unwrap());
}

println!("cargo:rerun-if-changed=build.rs");

let mut config = prost_build::Config::new();
config.protoc_arg("--experimental_allow_proto3_optional");

@@ -131,6 +133,10 @@ fn build_proto() {
"KVMeta",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.field_attribute(
"TxnPutRequest.ttl_ms",
r#"#[serde(skip_serializing_if = "Option::is_none")]"#,
)
.compile_with_config(config, &protos, &[&proto_dir])
.unwrap();
}
9 changes: 9 additions & 0 deletions src/meta/types/proto/request.proto
Original file line number Diff line number Diff line change
@@ -38,11 +38,20 @@ message TxnGetResponse {
// Put request and response
message TxnPutRequest {
string key = 1;

bytes value = 2;

// if or not return the prev value
bool prev_value = 3;

// expire time
optional uint64 expire_at = 4;

// Time to last in milliseconds.
//
// TTL is the relative expire time, since the raft-log applied.
// If `ttl_ms` is set, `expire_at` is ignored.
optional uint64 ttl_ms = 5;
}

message TxnPutResponse {
38 changes: 38 additions & 0 deletions src/meta/types/src/cmd/cmd_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::Time;

/// A context used when executing a [`Cmd`], to provide additional environment information.
///
/// [`Cmd`]: crate::Cmd
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CmdContext {
time: Time,
}

impl CmdContext {
pub fn from_millis(millis: u64) -> Self {
Self::new(Time::from_millis(millis))
}

pub fn new(time: Time) -> Self {
CmdContext { time }
}

/// Returns the time since 1970-01-01 when this log is proposed by the leader.
pub fn time(&self) -> Time {
self.time
}
}
114 changes: 114 additions & 0 deletions src/meta/types/src/cmd/meta_spec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use crate::cmd::CmdContext;
use crate::seq_value::KVMeta;
use crate::time::Interval;

/// Specifies the metadata associated with a kv record, used in an `upsert` cmd.
///
/// This is similar to [`KVMeta`] but differs, [`KVMeta`] is used in storage,
/// as this instance is employed for transport purposes.
/// When an `upsert` cmd is applied, this instance is evaluated and a `KVMeta` is built.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Clone, Eq, PartialEq)]
pub struct MetaSpec {
/// expiration time in second since 1970
pub(crate) expire_at: Option<u64>,

/// Relative expiration time interval since when the raft log is applied.
///
/// Use this field if possible to avoid the clock skew between client and meta-service.
/// `expire_at` may already be expired when it is applied to state machine.
///
/// If it is not None, once applied, the `expire_at` field will be replaced with the calculated absolute expiration time.
///
/// For backward compatibility, this field is not serialized if it `None`, as if it does not exist.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) ttl: Option<Interval>,
}

impl MetaSpec {
/// Create a new KVMeta
pub fn new(expire_at: Option<u64>, ttl: Option<Interval>) -> Self {
Self { expire_at, ttl }
}

/// Create a KVMeta with a absolute expiration time in second since 1970-01-01.
pub fn new_expire(expire_at_sec: u64) -> Self {
Self {
expire_at: Some(expire_at_sec),
ttl: None,
}
}

/// Create a KVMeta with relative expiration time(ttl).
pub fn new_ttl(ttl: Duration) -> Self {
Self {
expire_at: None,
ttl: Some(Interval::from_duration(ttl)),
}
}

/// Convert meta spec into a [`KVMeta`] to be stored in storage.
pub fn to_kv_meta(&self, cmd_ctx: &CmdContext) -> KVMeta {
// If `ttl` is set, override `expire_at`
if let Some(ttl) = self.ttl {
return KVMeta::new_expire((cmd_ctx.time() + ttl).seconds());
}

// No `ttl`, check if absolute expire time `expire_at` is set.
KVMeta::new(self.expire_at)
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::MetaSpec;
use crate::cmd::CmdContext;
use crate::KVMeta;
use crate::Time;

#[test]
fn test_serde() {
let meta = MetaSpec::new_expire(100);
let s = serde_json::to_string(&meta).unwrap();
assert_eq!(r#"{"expire_at":100}"#, s);

let got: KVMeta = serde_json::from_str(&s).unwrap();
assert_eq!(Some(100), got.expire_at);

let meta = MetaSpec::new_ttl(Duration::from_millis(100));
let s = serde_json::to_string(&meta).unwrap();
assert_eq!(r#"{"expire_at":null,"ttl":{"millis":100}}"#, s);
}

#[test]
fn test_to_kv_meta() {
let cmd_ctx = CmdContext::new(Time::from_millis(2000));

// ttl
let meta = MetaSpec::new_ttl(Duration::from_millis(1000));
let kv_meta = meta.to_kv_meta(&cmd_ctx);
assert_eq!(kv_meta.get_expire_at_ms().unwrap(), 3000);

// expire_at
let meta = MetaSpec::new_expire(5);
let kv_meta = meta.to_kv_meta(&cmd_ctx);
assert_eq!(kv_meta.get_expire_at_ms().unwrap(), 5_000);
}
}
30 changes: 16 additions & 14 deletions src/meta/types/src/cmd.rs → src/meta/types/src/cmd/mod.rs
Original file line number Diff line number Diff line change
@@ -13,18 +13,24 @@
// limitations under the License.

use std::fmt;
use std::time::Duration;

use serde::Deserialize;
use serde::Serialize;

use crate::with::With;
use crate::KVMeta;
use crate::MatchSeq;
use crate::Node;
use crate::NodeId;
use crate::Operation;
use crate::TxnRequest;

mod cmd_context;
mod meta_spec;

pub use cmd_context::CmdContext;
pub use meta_spec::MetaSpec;

/// A Cmd describes what a user want to do to raft state machine
/// and is the essential part of a raft log.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
@@ -63,7 +69,7 @@ pub struct UpsertKV {
pub value: Operation<Vec<u8>>,

/// Meta data of a value.
pub value_meta: Option<KVMeta>,
pub value_meta: Option<MetaSpec>,
}

impl fmt::Display for Cmd {
@@ -108,7 +114,7 @@ impl UpsertKV {
key: &str,
seq: MatchSeq,
value: Operation<Vec<u8>>,
value_meta: Option<KVMeta>,
value_meta: Option<MetaSpec>,
) -> Self {
Self {
key: key.to_string(),
@@ -146,17 +152,13 @@ impl UpsertKV {
}

pub fn with_expire_sec(self, expire_at_sec: u64) -> Self {
self.with(KVMeta {
expire_at: Some(expire_at_sec),
})
self.with(MetaSpec::new_expire(expire_at_sec))
}

pub fn get_expire_at_ms(&self) -> Option<u64> {
if let Some(meta) = &self.value_meta {
meta.get_expire_at_ms()
} else {
None
}
/// Set the time to last for the value.
/// When the ttl is passed, the value is deleted.
pub fn with_ttl(self, ttl: Duration) -> Self {
self.with(MetaSpec::new_ttl(ttl))
}
}

@@ -167,8 +169,8 @@ impl With<MatchSeq> for UpsertKV {
}
}

impl With<KVMeta> for UpsertKV {
fn with(mut self, meta: KVMeta) -> Self {
impl With<MetaSpec> for UpsertKV {
fn with(mut self, meta: MetaSpec) -> Self {
self.value_meta = Some(meta);
self
}
39 changes: 39 additions & 0 deletions src/meta/types/src/eval_expire_time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Evaluate and returns the absolute expire time.
pub trait EvalExpireTime {
/// Evaluate and returns the absolute expire time in millisecond since 1970.
///
/// If there is no expire time, return u64::MAX.
fn eval_expire_at_ms(&self) -> u64;
}

impl<T> EvalExpireTime for &T
where T: EvalExpireTime
{
fn eval_expire_at_ms(&self) -> u64 {
EvalExpireTime::eval_expire_at_ms(*self)
}
}

impl<T> EvalExpireTime for Option<T>
where T: EvalExpireTime
{
fn eval_expire_at_ms(&self) -> u64 {
self.as_ref()
.map(|m| m.eval_expire_at_ms())
.unwrap_or(u64::MAX)
}
}
13 changes: 10 additions & 3 deletions src/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -21,10 +21,11 @@
mod applied_state;
mod change;
mod cluster;
mod cmd;
pub mod cmd;
pub mod config;
mod endpoint;
pub mod errors;
mod eval_expire_time;
mod grpc_config;
mod log_entry;
mod match_seq;
@@ -36,6 +37,7 @@ mod raft_types;
mod seq_errors;
mod seq_num;
mod seq_value;
mod time;
mod with;

mod proto_display;
@@ -56,8 +58,6 @@ pub use applied_state::AppliedState;
pub use change::Change;
pub use cluster::Node;
pub use cluster::NodeInfo;
pub use cmd::Cmd;
pub use cmd::UpsertKV;
pub use endpoint::Endpoint;
pub use errors::meta_api_errors::MetaAPIError;
pub use errors::meta_api_errors::MetaDataError;
@@ -74,6 +74,7 @@ pub use errors::meta_network_errors::MetaNetworkError;
pub use errors::meta_network_errors::MetaNetworkResult;
pub use errors::meta_startup_errors::MetaStartupError;
pub use errors::rpc_errors::ForwardRPCError;
pub use eval_expire_time::EvalExpireTime;
pub use grpc_config::GrpcConfig;
pub use log_entry::LogEntry;
pub use match_seq::MatchSeq;
@@ -104,8 +105,14 @@ pub use seq_value::IntoSeqV;
pub use seq_value::KVMeta;
pub use seq_value::SeqV;
pub use seq_value::SeqValue;
pub use time::Interval;
pub use time::Time;
pub use with::With;

pub use crate::cmd::Cmd;
pub use crate::cmd::CmdContext;
pub use crate::cmd::MetaSpec;
pub use crate::cmd::UpsertKV;
pub use crate::raft_snapshot_data::SnapshotData;
pub use crate::raft_types::compat07;
pub use crate::raft_types::new_log_id;
2 changes: 1 addition & 1 deletion src/meta/types/src/proto_ext/seq_v_ext.rs
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ use crate::SeqV;
impl From<KVMeta> for pb::KvMeta {
fn from(m: KVMeta) -> Self {
Self {
expire_at: m.get_expire_at_sec(),
expire_at: m.get_expire_at_ms().map(|x| x / 1000),
}
}
}
16 changes: 16 additions & 0 deletions src/meta/types/src/proto_ext/txn_ext.rs
Original file line number Diff line number Diff line change
@@ -43,6 +43,22 @@ impl pb::TxnOp {
value,
prev_value: true,
expire_at,
ttl_ms: None,
})),
}
}

/// Create a txn operation that puts a record with ttl.
///
/// `ttl` is relative expire time while `expire_at` is absolute expire time.
pub fn put_with_ttl(key: impl ToString, value: Vec<u8>, ttl_ms: Option<u64>) -> pb::TxnOp {
pb::TxnOp {
request: Some(pb::txn_op::Request::Put(pb::TxnPutRequest {
key: key.to_string(),
value,
prev_value: true,
expire_at: None,
ttl_ms,
})),
}
}
30 changes: 7 additions & 23 deletions src/meta/types/src/seq_value.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@ use std::time::UNIX_EPOCH;
use serde::Deserialize;
use serde::Serialize;

use crate::EvalExpireTime;

pub trait SeqValue<V = Vec<u8>> {
fn seq(&self) -> u64;
fn value(&self) -> Option<&V>;
@@ -36,11 +38,7 @@ pub trait SeqValue<V = Vec<u8>> {

/// Evaluate and returns the absolute expire time in millisecond since 1970.
fn eval_expire_at_ms(&self) -> u64 {
if let Some(meta) = self.meta() {
meta.eval_expire_at_ms()
} else {
u64::MAX
}
self.meta().eval_expire_at_ms()
}

/// Return true if the record is expired.
@@ -62,27 +60,21 @@ impl KVMeta {
Self { expire_at }
}

/// Create a new KVMeta with expiration time in second since 1970
/// Create a KVMeta with a absolute expiration time in second since 1970-01-01.
pub fn new_expire(expire_at: u64) -> Self {
Self {
expire_at: Some(expire_at),
}
}

/// Returns expire time in second since 1970.
pub fn get_expire_at_sec(&self) -> Option<u64> {
self.expire_at
}

/// Returns expire time in millisecond since 1970.
pub fn get_expire_at_ms(&self) -> Option<u64> {
self.expire_at.map(|t| t * 1000)
}
}

/// Evaluate and returns the absolute expire time in millisecond since 1970.
///
/// If there is no expire time, return u64::MAX.
pub fn eval_expire_at_ms(&self) -> u64 {
impl EvalExpireTime for KVMeta {
fn eval_expire_at_ms(&self) -> u64 {
match self.expire_at {
None => u64::MAX,
Some(exp_at_sec) => exp_at_sec * 1000,
@@ -195,14 +187,6 @@ impl<T> SeqV<T> {
Self { seq, meta, data }
}

/// Returns millisecond since 1970-01-01
pub fn eval_expire_at_ms(&self) -> u64 {
match self.meta {
None => u64::MAX,
Some(ref m) => m.eval_expire_at_ms(),
}
}

#[must_use]
pub fn set_seq(mut self, seq: u64) -> SeqV<T> {
self.seq = seq;
194 changes: 194 additions & 0 deletions src/meta/types/src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Add;
use std::ops::Sub;
use std::time::Duration;

/// A interval of time.
///
/// As a replacement of [`Duration`], which is not `serde`-able.
///
/// `Interval` implements: `Interval +- Interval`.
#[derive(
serde::Serialize,
serde::Deserialize,
Debug,
Default,
Clone,
Copy,
Hash,
Eq,
PartialEq,
PartialOrd,
Ord,
)]
pub struct Interval {
pub(crate) millis: u64,
}

impl Interval {
pub fn from_duration(duration: Duration) -> Self {
Self {
millis: duration.as_millis() as u64,
}
}

pub fn from_millis(millis: u64) -> Self {
Self::from_duration(Duration::from_millis(millis))
}

pub fn from_secs(secs: u64) -> Self {
Self::from_duration(Duration::from_secs(secs))
}

pub fn millis(&self) -> u64 {
self.millis
}

pub fn seconds(&self) -> u64 {
self.millis / 1000
}
}

impl Add for Interval {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
Self {
millis: self.millis.saturating_add(rhs.millis),
}
}
}

impl Sub for Interval {
type Output = Self;

fn sub(self, rhs: Self) -> Self::Output {
Self {
millis: self.millis.saturating_sub(rhs.millis),
}
}
}

/// A time point since 1970-01-01.
///
/// As a replacement of [`Instant`](std::time::Instant), which is not `serde`-able.
/// `Time` implements: `Time +- Interval = Time` and `Time - Time = Interval`.
#[derive(
serde::Serialize,
serde::Deserialize,
Debug,
Default,
Clone,
Copy,
Hash,
Eq,
PartialEq,
PartialOrd,
Ord,
)]
pub struct Time {
pub(crate) time: Interval,
}

impl Time {
pub fn from_millis(millis: u64) -> Self {
Self {
time: Interval::from_millis(millis),
}
}

pub fn from_secs(secs: u64) -> Self {
Self {
time: Interval::from_secs(secs),
}
}

pub fn millis(&self) -> u64 {
self.time.millis()
}

pub fn seconds(&self) -> u64 {
self.time.seconds()
}
}

impl Add<Interval> for Time {
type Output = Self;

fn add(self, rhs: Interval) -> Self::Output {
Self {
time: self.time + rhs,
}
}
}

impl Sub<Interval> for Time {
type Output = Self;

fn sub(self, rhs: Interval) -> Self::Output {
Self {
time: self.time - rhs,
}
}
}

impl Sub for Time {
type Output = Interval;

fn sub(self, rhs: Self) -> Self::Output {
self.time - rhs.time
}
}

#[cfg(test)]
mod tests {
use super::Interval;
use crate::time::Time;

#[test]
fn test_interval() {
let interval = Interval::from_millis(1000);
assert_eq!(interval.millis(), 1000);
assert_eq!(interval.seconds(), 1);

let interval = Interval::from_secs(1);
assert_eq!(interval.millis(), 1000);
assert_eq!(interval.seconds(), 1);

assert_eq!(interval + interval, Interval::from_millis(2000));
assert_eq!(interval - interval, Interval::from_millis(0));
assert_eq!(
interval - Interval::from_millis(1500),
Interval::from_millis(0)
);
}

#[test]
fn test_time() {
let time = Time::from_millis(1000);
assert_eq!(time.millis(), 1000);
assert_eq!(time.seconds(), 1);

let time = Time::from_secs(1);
assert_eq!(time.millis(), 1000);
assert_eq!(time.seconds(), 1);

assert_eq!(time + Interval::from_millis(1000), Time::from_millis(2000));
assert_eq!(time - Interval::from_millis(500), Time::from_millis(500));
assert_eq!(time - Time::from_millis(500), Interval::from_millis(500));
assert_eq!(time - Time::from_millis(1500), Interval::from_millis(0));
}
}
6 changes: 3 additions & 3 deletions src/query/management/src/cluster/cluster_mgr.rs
Original file line number Diff line number Diff line change
@@ -24,8 +24,8 @@ use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_store::MetaStore;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::NodeInfo;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
@@ -65,14 +65,14 @@ impl ClusterMgr {
})
}

fn new_lift_time(&self) -> KVMeta {
fn new_lift_time(&self) -> MetaSpec {
let now = std::time::SystemTime::now();
let expire_at = now
.add(self.lift_time)
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");

KVMeta::new_expire(expire_at.as_secs())
MetaSpec::new_expire(expire_at.as_secs())
}
}

22 changes: 8 additions & 14 deletions src/query/management/tests/it/cluster.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@

use std::sync::Arc;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use databend_common_base::base::tokio;
use databend_common_exception::Result;
@@ -28,7 +27,7 @@ use databend_common_meta_types::SeqV;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_successfully_add_node() -> Result<()> {
let current_time = current_seconds_time();
let now_ms = SeqV::<()>::now_ms();
let (kv_api, cluster_api) = new_cluster_api().await?;

let node_info = create_test_node_info();
@@ -43,7 +42,7 @@ async fn test_successfully_add_node() -> Result<()> {
meta,
data: value,
}) => {
assert!(meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60);
assert!(meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000);
assert_eq!(value, serde_json::to_vec(&node_info)?);
}
catch => panic!("GetKVActionReply{:?}", catch),
@@ -116,7 +115,7 @@ async fn test_unknown_node_drop_node() -> Result<()> {

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_successfully_heartbeat_node() -> Result<()> {
let current_time = current_seconds_time();
let now_ms = SeqV::<()>::now_ms();
let (kv_api, cluster_api) = new_cluster_api().await?;

let node_info = create_test_node_info();
@@ -126,26 +125,21 @@ async fn test_successfully_heartbeat_node() -> Result<()> {
.get_kv("__fd_clusters/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

assert!(value.unwrap().meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60);
let meta = value.unwrap().meta.unwrap();
let expire_ms = meta.get_expire_at_ms().unwrap();
assert!(expire_ms - now_ms >= 59_000);

let current_time = current_seconds_time();
let now_ms = SeqV::<()>::now_ms();
cluster_api.heartbeat(&node_info, MatchSeq::GE(1)).await?;

let value = kv_api
.get_kv("__fd_clusters/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

assert!(value.unwrap().meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60);
assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000);
Ok(())
}

fn current_seconds_time() -> u64 {
let now = std::time::SystemTime::now();
now.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
}

fn create_test_node_info() -> NodeInfo {
NodeInfo {
id: String::from("test_node"),
4 changes: 2 additions & 2 deletions src/query/storages/result_cache/src/meta_manager.rs
Original file line number Diff line number Diff line change
@@ -17,8 +17,8 @@ use std::sync::Arc;
use databend_common_exception::Result;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_store::MetaStore;
use databend_common_meta_types::KVMeta;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::UpsertKV;
@@ -50,7 +50,7 @@ impl ResultCacheMetaManager {
key,
seq,
value: Operation::Update(value),
value_meta: Some(KVMeta::new_expire(expire_at)),
value_meta: Some(MetaSpec::new_expire(expire_at)),
})
.await?;
Ok(())

0 comments on commit a64551c

Please sign in to comment.