From a64551ccdef8f380176051842c0695a1991bc42a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?=
Date: Sat, 16 Dec 2023 18:37:43 +0800
Subject: [PATCH 1/3] feat: Introduced `MetaSpec` for setting relative expiration (#14041)

This is a backward-compatible change that adds a new feature.

- This commit introduces a new type, `MetaSpec`, which specifies the content
  of the `KVMeta` to be stored for a key. This new type is used in `upsert`
  requests to enable setting relative expiration times.

  Previously, the `KVMeta` type was used directly for this purpose. However,
  to avoid impacting existing storage data types, `MetaSpec` has been added
  specifically for use in `upsert` operations. When a raft-log is applied, a
  `KVMeta` is built from the `MetaSpec`.

  Designed for backward compatibility, `MetaSpec` maintains a serialized
  format compatible with `KVMeta`, ensuring no disruption to existing
  functionality.

- We introduce two new types, `Time` and `Interval`, to represent serde-able
  timestamps and time intervals.

- Tests are added to ensure the meta-service works correctly with the
  TTL-enabled APIs, but databend-query does not use these APIs yet.
---
 src/binaries/meta/kvapi.rs | 4 +-
 src/meta/api/src/background_api_impl.rs | 4 +-
 src/meta/api/src/schema_api_impl.rs | 1 +
 src/meta/api/src/util.rs | 2 +
 src/meta/kvapi/src/kvapi/test_suite.rs | 115 ++++++++---
 src/meta/process/src/kv_processor.rs | 1 +
 src/meta/raft-store/src/applier.rs | 20 +-
 src/meta/raft-store/src/sm_v002/sm_v002.rs | 20 +-
 src/meta/raft-store/src/state_machine/sm.rs | 30 ++-
 .../tests/it/state_machine/expire.rs | 6 +-
 .../raft-store/tests/it/state_machine/mod.rs | 21 +-
 .../tests/it/grpc/metasrv_grpc_kv_read_v1.rs | 4 +-
 .../tests/it/grpc/metasrv_grpc_watch.rs | 1 +
 .../it/meta_node/meta_node_kv_api_expire.rs | 5 +-
 src/meta/types/build.rs | 6 +
 src/meta/types/proto/request.proto | 9 +
 src/meta/types/src/cmd/cmd_context.rs | 38 ++++
 src/meta/types/src/cmd/meta_spec.rs | 114 ++++++++++
 src/meta/types/src/{cmd.rs => cmd/mod.rs} | 30 +--
 src/meta/types/src/eval_expire_time.rs | 39 ++++
 src/meta/types/src/lib.rs | 13 +-
 src/meta/types/src/proto_ext/seq_v_ext.rs | 2 +-
 src/meta/types/src/proto_ext/txn_ext.rs | 16 ++
 src/meta/types/src/seq_value.rs | 30 +--
 src/meta/types/src/time.rs | 194 ++++++++++++++++++
 .../management/src/cluster/cluster_mgr.rs | 6 +-
 src/query/management/tests/it/cluster.rs | 22 +-
 .../storages/result_cache/src/meta_manager.rs | 4 +-
 28 files changed, 637 insertions(+), 120 deletions(-)
 create mode 100644 src/meta/types/src/cmd/cmd_context.rs
 create mode 100644 src/meta/types/src/cmd/meta_spec.rs
 rename src/meta/types/src/{cmd.rs => cmd/mod.rs} (89%)
 create mode 100644 src/meta/types/src/eval_expire_time.rs
 create mode 100644 src/meta/types/src/time.rs

diff --git a/src/binaries/meta/kvapi.rs b/src/binaries/meta/kvapi.rs
index 62e3a3db03441..aa2c3446b7c24 100644
--- a/src/binaries/meta/kvapi.rs
+++ b/src/binaries/meta/kvapi.rs
@@ -16,8 +16,8 @@ use std::sync::Arc;
 
 use databend_common_meta_kvapi::kvapi;
 use databend_common_meta_kvapi::kvapi::UpsertKVReq;
-use databend_common_meta_types::KVMeta;
 use databend_common_meta_types::MetaError;
+use databend_common_meta_types::MetaSpec;
 use databend_common_meta_types::SeqV;
 use databend_common_meta_types::With;
 use databend_meta::configs::Config;
@@ -40,7 +40,7 @@ impl KvApiCommand {
         let req = UpsertKVReq::update(config.key[0].as_str(), config.value.as_bytes());
 
         let req = if let Some(expire_after) = config.expire_after {
-
req.with(KVMeta::new_expire(SeqV::<()>::now_sec() + expire_after)) + req.with(MetaSpec::new_expire(SeqV::<()>::now_sec() + expire_after)) } else { req }; diff --git a/src/meta/api/src/background_api_impl.rs b/src/meta/api/src/background_api_impl.rs index cb590812654d5..d15e2e7a1b96d 100644 --- a/src/meta/api/src/background_api_impl.rs +++ b/src/meta/api/src/background_api_impl.rs @@ -43,10 +43,10 @@ use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::ConditionResult::Eq; use databend_common_meta_types::InvalidReply; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeq::Any; use databend_common_meta_types::MetaError; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Operation; use databend_common_meta_types::TxnRequest; use log::as_debug; @@ -243,7 +243,7 @@ impl> BackgroundApi for KV { name_key.to_string_key().as_str(), Any, Operation::Update(serialize_struct(&meta)?), - Some(KVMeta::new_expire(req.expire_at)), + Some(MetaSpec::new_expire(req.expire_at)), )) .await?; // confirm a successful update diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 0dcfa7b4d478c..1c7f45e7dd592 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -4182,6 +4182,7 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp { value: 1_i8.to_le_bytes().to_vec(), prev_value: false, expire_at, + ttl_ms: None, })), } } diff --git a/src/meta/api/src/util.rs b/src/meta/api/src/util.rs index d152cc87415ea..d8cb58d683568 100644 --- a/src/meta/api/src/util.rs +++ b/src/meta/api/src/util.rs @@ -332,6 +332,7 @@ pub fn txn_op_put(key: &impl kvapi::Key, value: Vec) -> TxnOp { value, prev_value: true, expire_at: None, + ttl_ms: None, })), } } @@ -344,6 +345,7 @@ pub fn txn_op_put_with_expire(key: &impl kvapi::Key, value: Vec, expire_at: value, prev_value: true, expire_at: Some(expire_at), + ttl_ms: None, })), } } diff --git a/src/meta/kvapi/src/kvapi/test_suite.rs b/src/meta/kvapi/src/kvapi/test_suite.rs index 49b0e7a4e2a73..e7e14b55e2111 100644 --- a/src/meta/kvapi/src/kvapi/test_suite.rs +++ b/src/meta/kvapi/src/kvapi/test_suite.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::time::SystemTime; -use std::time::UNIX_EPOCH; +use std::time::Duration; use databend_common_meta_types::protobuf as pb; use databend_common_meta_types::txn_condition; @@ -22,6 +21,7 @@ use databend_common_meta_types::txn_op_response; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::KVMeta; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Operation; use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnCondition; @@ -40,6 +40,7 @@ use databend_common_meta_types::TxnRequest; use databend_common_meta_types::With; use log::debug; use log::info; +use minitrace::full_name; use minitrace::func_name; use crate::kvapi; @@ -58,11 +59,13 @@ impl kvapi::TestSuite { self.kv_delete(&builder.build().await).await?; self.kv_update(&builder.build().await).await?; self.kv_timeout(&builder.build().await).await?; + self.kv_upsert_with_ttl(&builder.build().await).await?; self.kv_meta(&builder.build().await).await?; self.kv_list(&builder.build().await).await?; self.kv_mget(&builder.build().await).await?; self.kv_txn_absent_seq_0(&builder.build().await).await?; self.kv_transaction(&builder.build().await).await?; + self.kv_transaction_with_ttl(&builder.build().await).await?; self.kv_transaction_delete_match_seq_none(&builder.build().await) .await?; self.kv_transaction_delete_match_seq_some_not_match(&builder.build().await) @@ -248,13 +251,10 @@ impl kvapi::TestSuite { // - Test list expired and non-expired. // - Test update with a new expire value. - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + let now_sec = SeqV::<()>::now_sec(); let _res = kv - .upsert_kv(UpsertKVReq::update("k1", b"v1").with(KVMeta::new_expire(now + 2))) + .upsert_kv(UpsertKVReq::update("k1", b"v1").with(MetaSpec::new_expire(now_sec + 2))) .await?; // dbg!("upsert non expired k1", _res); @@ -267,17 +267,14 @@ impl kvapi::TestSuite { info!("---get expired"); { - tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; + tokio::time::sleep(Duration::from_millis(3000)).await; let res = kv.get_kv("k1").await?; // dbg!("k1 expired", &res); debug!("got k1:{:?}", res); assert!(res.is_none(), "got expired"); } - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + let now_sec = SeqV::<()>::now_sec(); info!("--- expired entry act as if it does not exist, an ADD op should apply"); { @@ -286,7 +283,7 @@ impl kvapi::TestSuite { .upsert_kv( UpsertKVReq::update("k1", b"v1") .with(MatchSeq::Exact(0)) - .with(KVMeta::new_expire(now - 1)), + .with(MetaSpec::new_expire(now_sec - 1)), ) .await?; // dbg!("update expired k1", _res); @@ -295,7 +292,7 @@ impl kvapi::TestSuite { .upsert_kv( UpsertKVReq::update("k2", b"v2") .with(MatchSeq::Exact(0)) - .with(KVMeta::new_expire(now + 10)), + .with(MetaSpec::new_expire(now_sec + 10)), ) .await?; // dbg!("update non expired k2", _res); @@ -306,7 +303,7 @@ impl kvapi::TestSuite { None, Some(SeqV::with_meta( 3, - Some(KVMeta::new_expire(now + 10)), + Some(KVMeta::new_expire(now_sec + 10)), b"v2".to_vec() )) ]); @@ -325,7 +322,7 @@ impl kvapi::TestSuite { kv.upsert_kv( UpsertKVReq::update("k2", b"v2") .with(MatchSeq::Exact(3)) - .with(KVMeta::new_expire(now - 1)), + .with(MetaSpec::new_expire(now_sec - 1)), ) .await?; @@ -336,16 +333,42 @@ impl kvapi::TestSuite { Ok(()) } + #[minitrace::trace] + pub async fn kv_upsert_with_ttl(&self, kv: &KV) -> anyhow::Result<()> { + // - Add with ttl + + info!("--- {}", 
full_name!()); + + let _res = kv + .upsert_kv( + UpsertKVReq::update("k1", b"v1") + .with(MetaSpec::new_ttl(Duration::from_millis(2_000))), + ) + .await?; + + info!("---get unexpired"); + { + let res = kv.get_kv("k1").await?; + assert!(res.is_some(), "got unexpired"); + } + + info!("---get expired"); + { + tokio::time::sleep(Duration::from_millis(2_100)).await; + let res = kv.get_kv("k1").await?; + assert!(res.is_none(), "got expired"); + } + + Ok(()) + } + #[minitrace::trace] pub async fn kv_meta(&self, kv: &KV) -> anyhow::Result<()> { info!("--- kvapi::KVApiTestSuite::kv_meta() start"); let test_key = "test_key_for_update_meta"; - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + let now_sec = SeqV::<()>::now_sec(); let r = kv.upsert_kv(UpsertKVReq::update(test_key, b"v1")).await?; assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result); @@ -358,7 +381,7 @@ impl kvapi::TestSuite { test_key, MatchSeq::Exact(seq + 1), Operation::AsIs, - Some(KVMeta::new_expire(now + 20)), + Some(MetaSpec::new_expire(now_sec + 20)), )) .await?; assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev); @@ -371,14 +394,14 @@ impl kvapi::TestSuite { test_key, MatchSeq::Exact(seq), Operation::AsIs, - Some(KVMeta::new_expire(now + 20)), + Some(MetaSpec::new_expire(now_sec + 20)), )) .await?; assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev); assert_eq!( Some(SeqV::with_meta( 2, - Some(KVMeta::new_expire(now + 20)), + Some(KVMeta::new_expire(now_sec + 20)), b"v1".to_vec() )), r.result @@ -388,7 +411,11 @@ impl kvapi::TestSuite { let key_value = kv.get_kv(test_key).await?; assert!(key_value.is_some()); assert_eq!( - SeqV::with_meta(seq + 1, Some(KVMeta::new_expire(now + 20)), b"v1".to_vec()), + SeqV::with_meta( + seq + 1, + Some(KVMeta::new_expire(now_sec + 20)), + b"v1".to_vec() + ), key_value.unwrap(), ); @@ -484,6 +511,7 @@ impl kvapi::TestSuite { value: b"new_v1".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }]; @@ -645,6 +673,7 @@ impl kvapi::TestSuite { value: b"new_v1".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }]; @@ -697,6 +726,7 @@ impl kvapi::TestSuite { value: b"new_v1".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }]; @@ -752,6 +782,7 @@ impl kvapi::TestSuite { value: val1_new.to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, // change k2 @@ -761,6 +792,7 @@ impl kvapi::TestSuite { value: b"new_v2".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, // get k1 @@ -870,6 +902,7 @@ impl kvapi::TestSuite { value: val1_new.to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, // get k1 @@ -915,6 +948,36 @@ impl kvapi::TestSuite { Ok(()) } + #[minitrace::trace] + pub async fn kv_transaction_with_ttl(&self, kv: &KV) -> anyhow::Result<()> { + // - Add a record via transaction with ttl + + info!("--- {}", full_name!()); + + let txn = TxnRequest { + condition: vec![], + if_then: vec![TxnOp::put_with_ttl("k1", b("v1"), Some(2_000))], + else_then: vec![], + }; + + let _resp = kv.transaction(txn).await?; + + info!("---get unexpired"); + { + let res = kv.get_kv("k1").await?; + assert!(res.is_some(), "got unexpired"); + } + + info!("---get expired"); + { + tokio::time::sleep(Duration::from_millis(2_100)).await; + let res = kv.get_kv("k1").await?; + assert!(res.is_none(), "got expired"); + } + + Ok(()) + } + /// If `TxnDeleteRequest.match_seq` is not set, /// the delete operation will always be executed. 
pub async fn kv_transaction_delete_match_seq_none( @@ -1086,3 +1149,7 @@ impl kvapi::TestSuite { Ok(()) } } + +fn b(s: &str) -> Vec { + s.as_bytes().to_vec() +} diff --git a/src/meta/process/src/kv_processor.rs b/src/meta/process/src/kv_processor.rs index ad5b65c4b20b9..37f5789096ea9 100644 --- a/src/meta/process/src/kv_processor.rs +++ b/src/meta/process/src/kv_processor.rs @@ -218,6 +218,7 @@ where F: Fn(&str, Vec) -> Result, anyhow::Error> value, prev_value: p.prev_value, expire_at: p.expire_at, + ttl_ms: p.ttl_ms, }; Ok(pr) diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index bd077b4bef13e..ece95c1ffdba8 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -23,11 +23,13 @@ use databend_common_meta_types::txn_op_response; use databend_common_meta_types::AppliedState; use databend_common_meta_types::Change; use databend_common_meta_types::Cmd; +use databend_common_meta_types::CmdContext; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::Entry; use databend_common_meta_types::EntryPayload; -use databend_common_meta_types::KVMeta; +use databend_common_meta_types::Interval; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Node; use databend_common_meta_types::SeqV; use databend_common_meta_types::SeqValue; @@ -61,6 +63,9 @@ use crate::sm_v002::SMV002; pub struct Applier<'a> { sm: &'a mut SMV002, + /// The context of the current applying log. + cmd_ctx: CmdContext, + /// The changes has been made by the applying one log entry changes: Vec, String>>, } @@ -69,6 +74,7 @@ impl<'a> Applier<'a> { pub fn new(sm: &'a mut SMV002) -> Self { Self { sm, + cmd_ctx: CmdContext::from_millis(0), changes: Vec::new(), } } @@ -83,6 +89,8 @@ impl<'a> Applier<'a> { let log_id = &entry.log_id; let log_time_ms = Self::get_log_time(entry); + self.cmd_ctx = CmdContext::from_millis(log_time_ms); + self.clean_expired_kvs(log_time_ms).await?; *self.sm.sys_data_mut().last_applied_mut() = Some(*log_id); @@ -208,7 +216,10 @@ impl<'a> Applier<'a> { ) -> Result<(Option, Option), io::Error> { debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv"); - let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await?; + let (prev, result) = self + .sm + .upsert_kv_primary_index(upsert_kv, &self.cmd_ctx) + .await?; self.sm .update_expire_index(&upsert_kv.key, &prev, &result) @@ -381,7 +392,10 @@ impl<'a> Applier<'a> { put: &TxnPutRequest, resp: &mut TxnReply, ) -> Result<(), io::Error> { - let upsert = UpsertKV::update(&put.key, &put.value).with(KVMeta::new(put.expire_at)); + let upsert = UpsertKV::update(&put.key, &put.value).with(MetaSpec::new( + put.expire_at, + put.ttl_ms.map(Interval::from_millis), + )); let (prev, _result) = self.upsert_kv(&upsert).await?; diff --git a/src/meta/raft-store/src/sm_v002/sm_v002.rs b/src/meta/raft-store/src/sm_v002/sm_v002.rs index b261102577c56..8266728e5b756 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002.rs @@ -25,7 +25,9 @@ use databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::protobuf::StreamItem; use databend_common_meta_types::AppliedState; +use databend_common_meta_types::CmdContext; use databend_common_meta_types::Entry; +use databend_common_meta_types::EvalExpireTime; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::Operation; use 
databend_common_meta_types::SeqV; @@ -385,7 +387,10 @@ impl SMV002 { pub(crate) async fn upsert_kv_primary_index( &mut self, upsert_kv: &UpsertKV, + cmd_ctx: &CmdContext, ) -> Result<(Marked>, Marked>), io::Error> { + let kv_meta = upsert_kv.value_meta.as_ref().map(|m| m.to_kv_meta(cmd_ctx)); + let prev = self.levels.str_map().get(&upsert_kv.key).await?.clone(); if upsert_kv.seq.match_seq(prev.seq()).is_err() { @@ -395,24 +400,17 @@ impl SMV002 { let (prev, mut result) = match &upsert_kv.value { Operation::Update(v) => { self.levels - .set( - upsert_kv.key.clone(), - Some((v.clone(), upsert_kv.value_meta.clone())), - ) + .set(upsert_kv.key.clone(), Some((v.clone(), kv_meta.clone()))) .await? } Operation::Delete => self.levels.set(upsert_kv.key.clone(), None).await?, Operation::AsIs => { - MapApiExt::update_meta( - &mut self.levels, - upsert_kv.key.clone(), - upsert_kv.value_meta.clone(), - ) - .await? + MapApiExt::update_meta(&mut self.levels, upsert_kv.key.clone(), kv_meta.clone()) + .await? } }; - let expire_ms = upsert_kv.get_expire_at_ms().unwrap_or(u64::MAX); + let expire_ms = kv_meta.eval_expire_at_ms(); if expire_ms < self.expire_cursor.time_ms { // The record has expired, delete it at once. // diff --git a/src/meta/raft-store/src/state_machine/sm.rs b/src/meta/raft-store/src/state_machine/sm.rs index 8b172822c4d74..8610d43af4121 100644 --- a/src/meta/raft-store/src/state_machine/sm.rs +++ b/src/meta/raft-store/src/state_machine/sm.rs @@ -34,17 +34,20 @@ use databend_common_meta_types::txn_op_response; use databend_common_meta_types::AppliedState; use databend_common_meta_types::Change; use databend_common_meta_types::Cmd; +use databend_common_meta_types::CmdContext; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::Entry; use databend_common_meta_types::EntryPayload; -use databend_common_meta_types::KVMeta; +use databend_common_meta_types::Interval; use databend_common_meta_types::LogId; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Node; use databend_common_meta_types::NodeId; use databend_common_meta_types::Operation; use databend_common_meta_types::SeqV; +use databend_common_meta_types::SeqValue; use databend_common_meta_types::StoredMembership; use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnDeleteByPrefixRequest; @@ -532,7 +535,10 @@ impl StateMachine { ) -> Result<(), MetaStorageError> { let (expired, prev, result) = Self::txn_upsert_kv( txn_tree, - &UpsertKV::update(&put.key, &put.value).with(KVMeta::new(put.expire_at)), + &UpsertKV::update(&put.key, &put.value).with(MetaSpec::new( + put.expire_at, + put.ttl_ms.map(Interval::from_millis), + )), log_time_ms, )?; @@ -903,6 +909,8 @@ impl StateMachine { upsert_kv: &UpsertKV, log_time_ms: u64, ) -> Result<(Option, Option, Option), MetaStorageError> { + let cmd_ctx = CmdContext::from_millis(log_time_ms); + let kvs = txn_tree.key_space::(); let prev = kvs.get(&upsert_kv.key)?; @@ -915,16 +923,26 @@ impl StateMachine { } let mut new_seq_v = match &upsert_kv.value { - Operation::Update(v) => SeqV::with_meta(0, upsert_kv.value_meta.clone(), v.clone()), + Operation::Update(v) => SeqV::with_meta( + 0, + upsert_kv + .value_meta + .as_ref() + .map(|x| x.to_kv_meta(&cmd_ctx)), + v.clone(), + ), Operation::Delete => { kvs.remove(&upsert_kv.key)?; return Ok((expired, prev, None)); } Operation::AsIs => match prev { None => return Ok((expired, prev, None)), - 
Some(ref prev_kv_value) => { - prev_kv_value.clone().set_meta(upsert_kv.value_meta.clone()) - } + Some(ref prev_kv_value) => prev_kv_value.clone().set_meta( + upsert_kv + .value_meta + .as_ref() + .map(|m| m.to_kv_meta(&cmd_ctx)), + ), }, }; diff --git a/src/meta/raft-store/tests/it/state_machine/expire.rs b/src/meta/raft-store/tests/it/state_machine/expire.rs index 221e53bc26569..8aea62cccf92d 100644 --- a/src/meta/raft-store/tests/it/state_machine/expire.rs +++ b/src/meta/raft-store/tests/it/state_machine/expire.rs @@ -24,8 +24,8 @@ use databend_common_meta_types::new_log_id; use databend_common_meta_types::Cmd; use databend_common_meta_types::Entry; use databend_common_meta_types::EntryPayload; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::LogEntry; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use test_harness::test; @@ -154,7 +154,9 @@ fn ent(index: u64, key: &str, expire: Option, time_ms: Option) -> Entr payload: EntryPayload::Normal(LogEntry { txid: None, time_ms, - cmd: Cmd::UpsertKV(UpsertKV::update(key, key.as_bytes()).with(KVMeta::new(expire))), + cmd: Cmd::UpsertKV( + UpsertKV::update(key, key.as_bytes()).with(MetaSpec::new(expire, None)), + ), }), } } diff --git a/src/meta/raft-store/tests/it/state_machine/mod.rs b/src/meta/raft-store/tests/it/state_machine/mod.rs index a3b21805fc6a6..2e054343c0eb2 100644 --- a/src/meta/raft-store/tests/it/state_machine/mod.rs +++ b/src/meta/raft-store/tests/it/state_machine/mod.rs @@ -21,15 +21,18 @@ use databend_common_meta_types::new_log_id; use databend_common_meta_types::AppliedState; use databend_common_meta_types::Change; use databend_common_meta_types::Cmd; +use databend_common_meta_types::CmdContext; use databend_common_meta_types::Endpoint; use databend_common_meta_types::Entry; use databend_common_meta_types::EntryPayload; use databend_common_meta_types::KVMeta; use databend_common_meta_types::LogEntry; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Node; use databend_common_meta_types::Operation; use databend_common_meta_types::SeqV; +use databend_common_meta_types::SeqValue; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use log::info; @@ -125,7 +128,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_upsert_get() -> anyhow::Res key: String, seq: MatchSeq, value: Vec, - value_meta: Option, + value_meta: Option, // want: prev: Option>>, result: Option>>, @@ -139,14 +142,20 @@ async fn test_state_machine_apply_non_dup_generic_kv_upsert_get() -> anyhow::Res prev: Option<(u64, &'static str)>, result: Option<(u64, &'static str)>, ) -> T { - let m = meta.map(KVMeta::new_expire); + let m = meta.map(MetaSpec::new_expire); T { key: name.to_string(), seq, value: value.to_string().into_bytes(), value_meta: m.clone(), prev: prev.map(|(a, b)| SeqV::new(a, b.into())), - result: result.map(|(a, b)| SeqV::with_meta(a, m, b.into())), + result: result.map(|(a, b)| { + SeqV::with_meta( + a, + m.map(|m| m.to_kv_meta(&CmdContext::from_millis(0))), + b.into(), + ) + }), } } @@ -281,7 +290,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res key: key.clone(), seq: MatchSeq::GE(0), value: Operation::AsIs, - value_meta: Some(KVMeta::new_expire(now + 10)), + value_meta: Some(MetaSpec::new_expire(now + 10)), }), &mut t, None, @@ -306,7 +315,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> 
anyhow::Res key: key.clone(), seq: MatchSeq::GE(0), value: Operation::Update(b"value_meta_bar".to_vec()), - value_meta: Some(KVMeta::new_expire(now + 10)), + value_meta: Some(MetaSpec::new_expire(now + 10)), }), &mut t, None, @@ -323,7 +332,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res key: key.clone(), seq: MatchSeq::GE(0), value: Operation::AsIs, - value_meta: Some(KVMeta::new_expire(now + 20)), + value_meta: Some(MetaSpec::new_expire(now + 20)), }), &mut t, None, diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs index 9682c827c7e86..2cc106df3e2e0 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs @@ -25,7 +25,7 @@ use databend_common_meta_kvapi::kvapi::MGetKVReq; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::protobuf as pb; use databend_common_meta_types::protobuf::KvMeta; -use databend_common_meta_types::KVMeta; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::SeqV; use databend_common_meta_types::With; use futures::stream::StreamExt; @@ -80,7 +80,7 @@ async fn initialize_kvs(client: &Arc, now_sec: u64) -> anyhow::Res info!("--- prepare keys: a(meta),c,c1,c2"); let updates = vec![ - UpsertKVReq::insert("a", &b("a")).with(KVMeta::new_expire(now_sec + 10)), + UpsertKVReq::insert("a", &b("a")).with(MetaSpec::new_expire(now_sec + 10)), UpsertKVReq::insert("c", &b("c")), UpsertKVReq::insert("c1", &b("c1")), UpsertKVReq::insert("c2", &b("c2")), diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs index 0b6b748cdd52d..1a229bafb9825 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs @@ -269,6 +269,7 @@ async fn test_watch() -> anyhow::Result<()> { value: txn_val.clone(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, TxnOp { diff --git a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs index 6281e3e0172c3..f7decabcc7fee 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs @@ -20,6 +20,7 @@ use databend_common_meta_types::Cmd; use databend_common_meta_types::KVMeta; use databend_common_meta_types::LogEntry; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; @@ -58,7 +59,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> { info!("--- write a kv expiring in 3 sec"); { - let upsert = UpsertKV::update(key, key.as_bytes()).with(KVMeta::new_expire(now_sec + 3)); + let upsert = UpsertKV::update(key, key.as_bytes()).with(MetaSpec::new_expire(now_sec + 3)); leader.write(LogEntry::new(Cmd::UpsertKV(upsert))).await?; log_index += 1; @@ -76,7 +77,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> { { let upsert = UpsertKV::update(key, value2.as_bytes()) .with(MatchSeq::Exact(seq)) - .with(KVMeta::new_expire(now_sec + 1000)); + .with(MetaSpec::new_expire(now_sec + 1000)); leader.write(LogEntry::new(Cmd::UpsertKV(upsert))).await?; log_index += 1; } diff --git a/src/meta/types/build.rs b/src/meta/types/build.rs index 
d58111732b88e..da1cd2cfd7a26 100644 --- a/src/meta/types/build.rs +++ b/src/meta/types/build.rs @@ -37,6 +37,8 @@ fn build_proto() { println!("cargo:rerun-if-changed={}", proto.to_str().unwrap()); } + println!("cargo:rerun-if-changed=build.rs"); + let mut config = prost_build::Config::new(); config.protoc_arg("--experimental_allow_proto3_optional"); @@ -131,6 +133,10 @@ fn build_proto() { "KVMeta", "#[derive(Eq, serde::Serialize, serde::Deserialize)]", ) + .field_attribute( + "TxnPutRequest.ttl_ms", + r#"#[serde(skip_serializing_if = "Option::is_none")]"#, + ) .compile_with_config(config, &protos, &[&proto_dir]) .unwrap(); } diff --git a/src/meta/types/proto/request.proto b/src/meta/types/proto/request.proto index 5acb948567642..e2b21862dbcda 100644 --- a/src/meta/types/proto/request.proto +++ b/src/meta/types/proto/request.proto @@ -38,11 +38,20 @@ message TxnGetResponse { // Put request and response message TxnPutRequest { string key = 1; + bytes value = 2; + // if or not return the prev value bool prev_value = 3; + // expire time optional uint64 expire_at = 4; + + // Time to last in milliseconds. + // + // TTL is the relative expire time, since the raft-log applied. + // If `ttl_ms` is set, `expire_at` is ignored. + optional uint64 ttl_ms = 5; } message TxnPutResponse { diff --git a/src/meta/types/src/cmd/cmd_context.rs b/src/meta/types/src/cmd/cmd_context.rs new file mode 100644 index 0000000000000..68d3c7d063981 --- /dev/null +++ b/src/meta/types/src/cmd/cmd_context.rs @@ -0,0 +1,38 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::Time; + +/// A context used when executing a [`Cmd`], to provide additional environment information. +/// +/// [`Cmd`]: crate::Cmd +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CmdContext { + time: Time, +} + +impl CmdContext { + pub fn from_millis(millis: u64) -> Self { + Self::new(Time::from_millis(millis)) + } + + pub fn new(time: Time) -> Self { + CmdContext { time } + } + + /// Returns the time since 1970-01-01 when this log is proposed by the leader. + pub fn time(&self) -> Time { + self.time + } +} diff --git a/src/meta/types/src/cmd/meta_spec.rs b/src/meta/types/src/cmd/meta_spec.rs new file mode 100644 index 0000000000000..2222b5b0fdfa4 --- /dev/null +++ b/src/meta/types/src/cmd/meta_spec.rs @@ -0,0 +1,114 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Duration; + +use crate::cmd::CmdContext; +use crate::seq_value::KVMeta; +use crate::time::Interval; + +/// Specifies the metadata associated with a kv record, used in an `upsert` cmd. +/// +/// This is similar to [`KVMeta`] but differs, [`KVMeta`] is used in storage, +/// as this instance is employed for transport purposes. +/// When an `upsert` cmd is applied, this instance is evaluated and a `KVMeta` is built. +#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Clone, Eq, PartialEq)] +pub struct MetaSpec { + /// expiration time in second since 1970 + pub(crate) expire_at: Option, + + /// Relative expiration time interval since when the raft log is applied. + /// + /// Use this field if possible to avoid the clock skew between client and meta-service. + /// `expire_at` may already be expired when it is applied to state machine. + /// + /// If it is not None, once applied, the `expire_at` field will be replaced with the calculated absolute expiration time. + /// + /// For backward compatibility, this field is not serialized if it `None`, as if it does not exist. + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) ttl: Option, +} + +impl MetaSpec { + /// Create a new KVMeta + pub fn new(expire_at: Option, ttl: Option) -> Self { + Self { expire_at, ttl } + } + + /// Create a KVMeta with a absolute expiration time in second since 1970-01-01. + pub fn new_expire(expire_at_sec: u64) -> Self { + Self { + expire_at: Some(expire_at_sec), + ttl: None, + } + } + + /// Create a KVMeta with relative expiration time(ttl). + pub fn new_ttl(ttl: Duration) -> Self { + Self { + expire_at: None, + ttl: Some(Interval::from_duration(ttl)), + } + } + + /// Convert meta spec into a [`KVMeta`] to be stored in storage. + pub fn to_kv_meta(&self, cmd_ctx: &CmdContext) -> KVMeta { + // If `ttl` is set, override `expire_at` + if let Some(ttl) = self.ttl { + return KVMeta::new_expire((cmd_ctx.time() + ttl).seconds()); + } + + // No `ttl`, check if absolute expire time `expire_at` is set. + KVMeta::new(self.expire_at) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::MetaSpec; + use crate::cmd::CmdContext; + use crate::KVMeta; + use crate::Time; + + #[test] + fn test_serde() { + let meta = MetaSpec::new_expire(100); + let s = serde_json::to_string(&meta).unwrap(); + assert_eq!(r#"{"expire_at":100}"#, s); + + let got: KVMeta = serde_json::from_str(&s).unwrap(); + assert_eq!(Some(100), got.expire_at); + + let meta = MetaSpec::new_ttl(Duration::from_millis(100)); + let s = serde_json::to_string(&meta).unwrap(); + assert_eq!(r#"{"expire_at":null,"ttl":{"millis":100}}"#, s); + } + + #[test] + fn test_to_kv_meta() { + let cmd_ctx = CmdContext::new(Time::from_millis(2000)); + + // ttl + let meta = MetaSpec::new_ttl(Duration::from_millis(1000)); + let kv_meta = meta.to_kv_meta(&cmd_ctx); + assert_eq!(kv_meta.get_expire_at_ms().unwrap(), 3000); + + // expire_at + let meta = MetaSpec::new_expire(5); + let kv_meta = meta.to_kv_meta(&cmd_ctx); + assert_eq!(kv_meta.get_expire_at_ms().unwrap(), 5_000); + } +} diff --git a/src/meta/types/src/cmd.rs b/src/meta/types/src/cmd/mod.rs similarity index 89% rename from src/meta/types/src/cmd.rs rename to src/meta/types/src/cmd/mod.rs index 00e02899640a3..e2fa082152049 100644 --- a/src/meta/types/src/cmd.rs +++ b/src/meta/types/src/cmd/mod.rs @@ -13,18 +13,24 @@ // limitations under the License. 
use std::fmt; +use std::time::Duration; use serde::Deserialize; use serde::Serialize; use crate::with::With; -use crate::KVMeta; use crate::MatchSeq; use crate::Node; use crate::NodeId; use crate::Operation; use crate::TxnRequest; +mod cmd_context; +mod meta_spec; + +pub use cmd_context::CmdContext; +pub use meta_spec::MetaSpec; + /// A Cmd describes what a user want to do to raft state machine /// and is the essential part of a raft log. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -63,7 +69,7 @@ pub struct UpsertKV { pub value: Operation>, /// Meta data of a value. - pub value_meta: Option, + pub value_meta: Option, } impl fmt::Display for Cmd { @@ -108,7 +114,7 @@ impl UpsertKV { key: &str, seq: MatchSeq, value: Operation>, - value_meta: Option, + value_meta: Option, ) -> Self { Self { key: key.to_string(), @@ -146,17 +152,13 @@ impl UpsertKV { } pub fn with_expire_sec(self, expire_at_sec: u64) -> Self { - self.with(KVMeta { - expire_at: Some(expire_at_sec), - }) + self.with(MetaSpec::new_expire(expire_at_sec)) } - pub fn get_expire_at_ms(&self) -> Option { - if let Some(meta) = &self.value_meta { - meta.get_expire_at_ms() - } else { - None - } + /// Set the time to last for the value. + /// When the ttl is passed, the value is deleted. + pub fn with_ttl(self, ttl: Duration) -> Self { + self.with(MetaSpec::new_ttl(ttl)) } } @@ -167,8 +169,8 @@ impl With for UpsertKV { } } -impl With for UpsertKV { - fn with(mut self, meta: KVMeta) -> Self { +impl With for UpsertKV { + fn with(mut self, meta: MetaSpec) -> Self { self.value_meta = Some(meta); self } diff --git a/src/meta/types/src/eval_expire_time.rs b/src/meta/types/src/eval_expire_time.rs new file mode 100644 index 0000000000000..0b1245adb4080 --- /dev/null +++ b/src/meta/types/src/eval_expire_time.rs @@ -0,0 +1,39 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Evaluate and returns the absolute expire time. +pub trait EvalExpireTime { + /// Evaluate and returns the absolute expire time in millisecond since 1970. + /// + /// If there is no expire time, return u64::MAX. 
+ fn eval_expire_at_ms(&self) -> u64; +} + +impl EvalExpireTime for &T +where T: EvalExpireTime +{ + fn eval_expire_at_ms(&self) -> u64 { + EvalExpireTime::eval_expire_at_ms(*self) + } +} + +impl EvalExpireTime for Option +where T: EvalExpireTime +{ + fn eval_expire_at_ms(&self) -> u64 { + self.as_ref() + .map(|m| m.eval_expire_at_ms()) + .unwrap_or(u64::MAX) + } +} diff --git a/src/meta/types/src/lib.rs b/src/meta/types/src/lib.rs index 9736cba79f26b..8128d70e1039e 100644 --- a/src/meta/types/src/lib.rs +++ b/src/meta/types/src/lib.rs @@ -21,10 +21,11 @@ mod applied_state; mod change; mod cluster; -mod cmd; +pub mod cmd; pub mod config; mod endpoint; pub mod errors; +mod eval_expire_time; mod grpc_config; mod log_entry; mod match_seq; @@ -36,6 +37,7 @@ mod raft_types; mod seq_errors; mod seq_num; mod seq_value; +mod time; mod with; mod proto_display; @@ -56,8 +58,6 @@ pub use applied_state::AppliedState; pub use change::Change; pub use cluster::Node; pub use cluster::NodeInfo; -pub use cmd::Cmd; -pub use cmd::UpsertKV; pub use endpoint::Endpoint; pub use errors::meta_api_errors::MetaAPIError; pub use errors::meta_api_errors::MetaDataError; @@ -74,6 +74,7 @@ pub use errors::meta_network_errors::MetaNetworkError; pub use errors::meta_network_errors::MetaNetworkResult; pub use errors::meta_startup_errors::MetaStartupError; pub use errors::rpc_errors::ForwardRPCError; +pub use eval_expire_time::EvalExpireTime; pub use grpc_config::GrpcConfig; pub use log_entry::LogEntry; pub use match_seq::MatchSeq; @@ -104,8 +105,14 @@ pub use seq_value::IntoSeqV; pub use seq_value::KVMeta; pub use seq_value::SeqV; pub use seq_value::SeqValue; +pub use time::Interval; +pub use time::Time; pub use with::With; +pub use crate::cmd::Cmd; +pub use crate::cmd::CmdContext; +pub use crate::cmd::MetaSpec; +pub use crate::cmd::UpsertKV; pub use crate::raft_snapshot_data::SnapshotData; pub use crate::raft_types::compat07; pub use crate::raft_types::new_log_id; diff --git a/src/meta/types/src/proto_ext/seq_v_ext.rs b/src/meta/types/src/proto_ext/seq_v_ext.rs index c0fb9df92c1f8..3c6dded06e232 100644 --- a/src/meta/types/src/proto_ext/seq_v_ext.rs +++ b/src/meta/types/src/proto_ext/seq_v_ext.rs @@ -19,7 +19,7 @@ use crate::SeqV; impl From for pb::KvMeta { fn from(m: KVMeta) -> Self { Self { - expire_at: m.get_expire_at_sec(), + expire_at: m.get_expire_at_ms().map(|x| x / 1000), } } } diff --git a/src/meta/types/src/proto_ext/txn_ext.rs b/src/meta/types/src/proto_ext/txn_ext.rs index 182e6e3e1cb33..b96fed56078fa 100644 --- a/src/meta/types/src/proto_ext/txn_ext.rs +++ b/src/meta/types/src/proto_ext/txn_ext.rs @@ -43,6 +43,22 @@ impl pb::TxnOp { value, prev_value: true, expire_at, + ttl_ms: None, + })), + } + } + + /// Create a txn operation that puts a record with ttl. + /// + /// `ttl` is relative expire time while `expire_at` is absolute expire time. 
+ pub fn put_with_ttl(key: impl ToString, value: Vec, ttl_ms: Option) -> pb::TxnOp { + pb::TxnOp { + request: Some(pb::txn_op::Request::Put(pb::TxnPutRequest { + key: key.to_string(), + value, + prev_value: true, + expire_at: None, + ttl_ms, })), } } diff --git a/src/meta/types/src/seq_value.rs b/src/meta/types/src/seq_value.rs index 4fb08c2c32f1d..831271b1cc2b1 100644 --- a/src/meta/types/src/seq_value.rs +++ b/src/meta/types/src/seq_value.rs @@ -20,6 +20,8 @@ use std::time::UNIX_EPOCH; use serde::Deserialize; use serde::Serialize; +use crate::EvalExpireTime; + pub trait SeqValue> { fn seq(&self) -> u64; fn value(&self) -> Option<&V>; @@ -36,11 +38,7 @@ pub trait SeqValue> { /// Evaluate and returns the absolute expire time in millisecond since 1970. fn eval_expire_at_ms(&self) -> u64 { - if let Some(meta) = self.meta() { - meta.eval_expire_at_ms() - } else { - u64::MAX - } + self.meta().eval_expire_at_ms() } /// Return true if the record is expired. @@ -62,27 +60,21 @@ impl KVMeta { Self { expire_at } } - /// Create a new KVMeta with expiration time in second since 1970 + /// Create a KVMeta with a absolute expiration time in second since 1970-01-01. pub fn new_expire(expire_at: u64) -> Self { Self { expire_at: Some(expire_at), } } - /// Returns expire time in second since 1970. - pub fn get_expire_at_sec(&self) -> Option { - self.expire_at - } - /// Returns expire time in millisecond since 1970. pub fn get_expire_at_ms(&self) -> Option { self.expire_at.map(|t| t * 1000) } +} - /// Evaluate and returns the absolute expire time in millisecond since 1970. - /// - /// If there is no expire time, return u64::MAX. - pub fn eval_expire_at_ms(&self) -> u64 { +impl EvalExpireTime for KVMeta { + fn eval_expire_at_ms(&self) -> u64 { match self.expire_at { None => u64::MAX, Some(exp_at_sec) => exp_at_sec * 1000, @@ -195,14 +187,6 @@ impl SeqV { Self { seq, meta, data } } - /// Returns millisecond since 1970-01-01 - pub fn eval_expire_at_ms(&self) -> u64 { - match self.meta { - None => u64::MAX, - Some(ref m) => m.eval_expire_at_ms(), - } - } - #[must_use] pub fn set_seq(mut self, seq: u64) -> SeqV { self.seq = seq; diff --git a/src/meta/types/src/time.rs b/src/meta/types/src/time.rs new file mode 100644 index 0000000000000..702a90ec45049 --- /dev/null +++ b/src/meta/types/src/time.rs @@ -0,0 +1,194 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Add; +use std::ops::Sub; +use std::time::Duration; + +/// A interval of time. +/// +/// As a replacement of [`Duration`], which is not `serde`-able. +/// +/// `Interval` implements: `Interval +- Interval`. 
+#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Default, + Clone, + Copy, + Hash, + Eq, + PartialEq, + PartialOrd, + Ord, +)] +pub struct Interval { + pub(crate) millis: u64, +} + +impl Interval { + pub fn from_duration(duration: Duration) -> Self { + Self { + millis: duration.as_millis() as u64, + } + } + + pub fn from_millis(millis: u64) -> Self { + Self::from_duration(Duration::from_millis(millis)) + } + + pub fn from_secs(secs: u64) -> Self { + Self::from_duration(Duration::from_secs(secs)) + } + + pub fn millis(&self) -> u64 { + self.millis + } + + pub fn seconds(&self) -> u64 { + self.millis / 1000 + } +} + +impl Add for Interval { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + millis: self.millis.saturating_add(rhs.millis), + } + } +} + +impl Sub for Interval { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { + millis: self.millis.saturating_sub(rhs.millis), + } + } +} + +/// A time point since 1970-01-01. +/// +/// As a replacement of [`Instant`](std::time::Instant), which is not `serde`-able. +/// `Time` implements: `Time +- Interval = Time` and `Time - Time = Interval`. +#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Default, + Clone, + Copy, + Hash, + Eq, + PartialEq, + PartialOrd, + Ord, +)] +pub struct Time { + pub(crate) time: Interval, +} + +impl Time { + pub fn from_millis(millis: u64) -> Self { + Self { + time: Interval::from_millis(millis), + } + } + + pub fn from_secs(secs: u64) -> Self { + Self { + time: Interval::from_secs(secs), + } + } + + pub fn millis(&self) -> u64 { + self.time.millis() + } + + pub fn seconds(&self) -> u64 { + self.time.seconds() + } +} + +impl Add for Time { + type Output = Self; + + fn add(self, rhs: Interval) -> Self::Output { + Self { + time: self.time + rhs, + } + } +} + +impl Sub for Time { + type Output = Self; + + fn sub(self, rhs: Interval) -> Self::Output { + Self { + time: self.time - rhs, + } + } +} + +impl Sub for Time { + type Output = Interval; + + fn sub(self, rhs: Self) -> Self::Output { + self.time - rhs.time + } +} + +#[cfg(test)] +mod tests { + use super::Interval; + use crate::time::Time; + + #[test] + fn test_interval() { + let interval = Interval::from_millis(1000); + assert_eq!(interval.millis(), 1000); + assert_eq!(interval.seconds(), 1); + + let interval = Interval::from_secs(1); + assert_eq!(interval.millis(), 1000); + assert_eq!(interval.seconds(), 1); + + assert_eq!(interval + interval, Interval::from_millis(2000)); + assert_eq!(interval - interval, Interval::from_millis(0)); + assert_eq!( + interval - Interval::from_millis(1500), + Interval::from_millis(0) + ); + } + + #[test] + fn test_time() { + let time = Time::from_millis(1000); + assert_eq!(time.millis(), 1000); + assert_eq!(time.seconds(), 1); + + let time = Time::from_secs(1); + assert_eq!(time.millis(), 1000); + assert_eq!(time.seconds(), 1); + + assert_eq!(time + Interval::from_millis(1000), Time::from_millis(2000)); + assert_eq!(time - Interval::from_millis(500), Time::from_millis(500)); + assert_eq!(time - Time::from_millis(500), Interval::from_millis(500)); + assert_eq!(time - Time::from_millis(1500), Interval::from_millis(0)); + } +} diff --git a/src/query/management/src/cluster/cluster_mgr.rs b/src/query/management/src/cluster/cluster_mgr.rs index 0dd1d1c2303b6..bc79a22863eae 100644 --- a/src/query/management/src/cluster/cluster_mgr.rs +++ b/src/query/management/src/cluster/cluster_mgr.rs @@ -24,8 +24,8 @@ use databend_common_meta_kvapi::kvapi::KVApi; use 
databend_common_meta_kvapi::kvapi::UpsertKVReply; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_store::MetaStore; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::NodeInfo; use databend_common_meta_types::Operation; use databend_common_meta_types::SeqV; @@ -65,14 +65,14 @@ impl ClusterMgr { }) } - fn new_lift_time(&self) -> KVMeta { + fn new_lift_time(&self) -> MetaSpec { let now = std::time::SystemTime::now(); let expire_at = now .add(self.lift_time) .duration_since(UNIX_EPOCH) .expect("Time went backwards"); - KVMeta::new_expire(expire_at.as_secs()) + MetaSpec::new_expire(expire_at.as_secs()) } } diff --git a/src/query/management/tests/it/cluster.rs b/src/query/management/tests/it/cluster.rs index 45d784267fd81..2f58c8a2c3a5e 100644 --- a/src/query/management/tests/it/cluster.rs +++ b/src/query/management/tests/it/cluster.rs @@ -14,7 +14,6 @@ use std::sync::Arc; use std::time::Duration; -use std::time::UNIX_EPOCH; use databend_common_base::base::tokio; use databend_common_exception::Result; @@ -28,7 +27,7 @@ use databend_common_meta_types::SeqV; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_successfully_add_node() -> Result<()> { - let current_time = current_seconds_time(); + let now_ms = SeqV::<()>::now_ms(); let (kv_api, cluster_api) = new_cluster_api().await?; let node_info = create_test_node_info(); @@ -43,7 +42,7 @@ async fn test_successfully_add_node() -> Result<()> { meta, data: value, }) => { - assert!(meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60); + assert!(meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); assert_eq!(value, serde_json::to_vec(&node_info)?); } catch => panic!("GetKVActionReply{:?}", catch), @@ -116,7 +115,7 @@ async fn test_unknown_node_drop_node() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_successfully_heartbeat_node() -> Result<()> { - let current_time = current_seconds_time(); + let now_ms = SeqV::<()>::now_ms(); let (kv_api, cluster_api) = new_cluster_api().await?; let node_info = create_test_node_info(); @@ -126,26 +125,21 @@ async fn test_successfully_heartbeat_node() -> Result<()> { .get_kv("__fd_clusters/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") .await?; - assert!(value.unwrap().meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60); + let meta = value.unwrap().meta.unwrap(); + let expire_ms = meta.get_expire_at_ms().unwrap(); + assert!(expire_ms - now_ms >= 59_000); - let current_time = current_seconds_time(); + let now_ms = SeqV::<()>::now_ms(); cluster_api.heartbeat(&node_info, MatchSeq::GE(1)).await?; let value = kv_api .get_kv("__fd_clusters/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") .await?; - assert!(value.unwrap().meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60); + assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); Ok(()) } -fn current_seconds_time() -> u64 { - let now = std::time::SystemTime::now(); - now.duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_secs() -} - fn create_test_node_info() -> NodeInfo { NodeInfo { id: String::from("test_node"), diff --git a/src/query/storages/result_cache/src/meta_manager.rs b/src/query/storages/result_cache/src/meta_manager.rs index 40cf4eecb0b93..5d7a58267e74f 100644 --- a/src/query/storages/result_cache/src/meta_manager.rs +++ 
b/src/query/storages/result_cache/src/meta_manager.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_store::MetaStore; -use databend_common_meta_types::KVMeta; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::MetaSpec; use databend_common_meta_types::Operation; use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; @@ -50,7 +50,7 @@ impl ResultCacheMetaManager { key, seq, value: Operation::Update(value), - value_meta: Some(KVMeta::new_expire(expire_at)), + value_meta: Some(MetaSpec::new_expire(expire_at)), }) .await?; Ok(()) From f861e73c877f35c9ec7ae4a4b6f92683b2850202 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sat, 16 Dec 2023 18:51:28 +0800 Subject: [PATCH 2/3] Update PULL_REQUEST_TEMPLATE.md --- .github/PULL_REQUEST_TEMPLATE.md | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 2b8bc5cb9af8d..88b4adb8c434e 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -2,12 +2,23 @@ I hereby agree to the terms of the CLA available at: https://docs.databend.com/d ## Summary -Summary about this PR +_Briefly describe what this PR aims to solve. Include background context that will help reviewers understand the purpose of the PR._ -- Closes #issue +Fixes #[Link the issue here] ## Tests + - [ ] Unit Test - [ ] Logic Test - [ ] Benchmark Test - [ ] No Test - _Explain why_ + +## Type of change + +- [ ] Bug Fix (non-breaking change which fixes an issue) +- [ ] New Feature (non-breaking change which adds functionality) +- [ ] Breaking Change (fix or feature that could cause existing functionality not to work as expected) +- [ ] Documentation Update +- [ ] Refactoring +- [ ] Performance Improvement +- [ ] Other (please describe): From caad7d24ae236f118bd59d7802d179a8d98ab02a Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Sat, 16 Dec 2023 18:54:17 +0800 Subject: [PATCH 3/3] refactor: using borsh instead of bincode for serde agg function state (#13997) * using borsh instead of bincode for serde agg function state * impl borsh serde for other agg funcs * impl borsh serde for other agg funcs * fix sqllogic tests * fix conflict --- Cargo.lock | 161 +++++++++++++----- Cargo.toml | 6 +- src/common/io/Cargo.toml | 3 +- ...ialization.rs => bincode_serialization.rs} | 0 src/common/io/src/borsh_serialization.rs | 41 +++++ src/common/io/src/lib.rs | 3 +- src/common/io/src/prelude.rs | 3 +- ...ialization.rs => bincode_serialization.rs} | 0 src/common/io/tests/it/borsh_serialization.rs | 89 ++++++++++ src/common/io/tests/it/main.rs | 3 +- src/query/expression/Cargo.toml | 7 +- src/query/expression/src/types/decimal.rs | 27 ++- src/query/expression/src/types/number.rs | 14 +- src/query/expression/src/values.rs | 23 ++- src/query/expression/tests/it/serde.rs | 30 ++++ src/query/functions/Cargo.toml | 4 +- .../aggregate_approx_count_distinct.rs | 8 +- .../src/aggregates/aggregate_arg_min_max.rs | 26 ++- .../src/aggregates/aggregate_array_agg.rs | 31 ++-- .../src/aggregates/aggregate_array_moving.rs | 41 +++-- .../functions/src/aggregates/aggregate_avg.rs | 33 ++-- .../src/aggregates/aggregate_count.rs | 8 +- .../src/aggregates/aggregate_covariance.rs | 14 +- .../aggregates/aggregate_distinct_state.rs | 28 +-- .../src/aggregates/aggregate_kurtosis.rs | 14 +- .../src/aggregates/aggregate_min_max_any.rs | 24 ++- 
.../src/aggregates/aggregate_quantile_cont.rs | 14 +- .../src/aggregates/aggregate_quantile_disc.rs | 28 ++- .../aggregates/aggregate_quantile_tdigest.rs | 14 +- .../aggregate_quantile_tdigest_weighted.rs | 8 +- .../src/aggregates/aggregate_retention.rs | 14 +- .../src/aggregates/aggregate_scalar_state.rs | 18 +- .../src/aggregates/aggregate_skewness.rs | 14 +- .../src/aggregates/aggregate_stddev.rs | 25 ++- .../src/aggregates/aggregate_string_agg.rs | 14 +- .../functions/src/aggregates/aggregate_sum.rs | 55 +++--- .../src/aggregates/aggregate_window_funnel.rs | 28 ++- .../src/aggregates/aggregator_common.rs | 14 +- .../02_0000_function_aggregate_state.test | 4 +- 39 files changed, 588 insertions(+), 303 deletions(-) rename src/common/io/src/{serialization.rs => bincode_serialization.rs} (100%) create mode 100644 src/common/io/src/borsh_serialization.rs rename src/common/io/tests/it/{serialization.rs => bincode_serialization.rs} (100%) create mode 100644 src/common/io/tests/it/borsh_serialization.rs diff --git a/Cargo.lock b/Cargo.lock index 458520423a22a..bc40520848dc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,10 +1130,20 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40f9ca3698b2e4cb7c15571db0abc5551dca417a21ae8140460b50309bb2cc62" dependencies = [ - "borsh-derive", + "borsh-derive 0.10.2", "hashbrown 0.13.2", ] +[[package]] +name = "borsh" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9897ef0f1bd2362169de6d7e436ea2237dc1085d7d1e4db75f4be34d86f309d1" +dependencies = [ + "borsh-derive 1.2.1", + "cfg_aliases", +] + [[package]] name = "borsh-derive" version = "0.10.2" @@ -1147,6 +1157,20 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "borsh-derive" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478b41ff04256c5c8330f3dfdaaae2a5cc976a8e75088bafa4625b0d0208de8c" +dependencies = [ + "once_cell", + "proc-macro-crate 2.0.1", + "proc-macro2", + "quote", + "syn 2.0.29", + "syn_derive", +] + [[package]] name = "borsh-derive-internal" version = "0.10.2" @@ -1454,6 +1478,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chrono" version = "0.4.31" @@ -2355,7 +2385,7 @@ dependencies = [ "dyn-clone", "either", "env_logger", - "ethnum", + "ethnum 1.5.0", "fallible-streaming-iterator", "flate2", "foreign_vec", @@ -2395,7 +2425,7 @@ dependencies = [ "databend-common-io", "databend-common-meta-app", "enum-as-inner", - "ethnum", + "ethnum 1.5.0", "fast-float", "goldenfile", "itertools 0.10.5", @@ -2403,7 +2433,7 @@ dependencies = [ "minitrace", "nom", "nom-rule", - "ordered-float 3.7.0", + "ordered-float 4.2.0", "pratt 0.4.0", "pretty", "pretty_assertions", @@ -2628,6 +2658,7 @@ dependencies = [ "arrow-select", "async-backtrace", "base64 0.21.0", + "borsh 1.2.1", "bumpalo", "chrono", "chrono-tz", @@ -2641,7 +2672,7 @@ dependencies = [ "databend-common-io", "educe", "enum-as-inner", - "ethnum", + "ethnum 1.5.0", "futures", "goldenfile", "hex", @@ -2650,9 +2681,9 @@ dependencies = [ "lexical-core", "log", "match-template", - "micromarshal", + "micromarshal 0.5.0", "num-traits", - "ordered-float 3.7.0", + 
"ordered-float 4.2.0", "pretty_assertions", "rand 0.8.5", "roaring", @@ -2684,9 +2715,9 @@ dependencies = [ "jsonb 0.3.0 (git+https://github.com/datafuselabs/jsonb?rev=582c139)", "lexical-core", "match-template", - "micromarshal", + "micromarshal 0.4.0", "num", - "ordered-float 3.7.0", + "ordered-float 4.2.0", "pretty_assertions", "roaring", "serde_json", @@ -2699,6 +2730,7 @@ version = "0.1.0" dependencies = [ "base64 0.21.0", "blake3", + "borsh 1.2.1", "bstr 1.6.2", "bumpalo", "bytes", @@ -2717,7 +2749,7 @@ dependencies = [ "databend-common-io", "databend-common-openai", "databend-common-vector", - "ethnum", + "ethnum 1.5.0", "geo", "geohash", "goldenfile", @@ -2733,11 +2765,10 @@ dependencies = [ "naive-cityhash", "num-traits", "once_cell", - "ordered-float 3.7.0", + "ordered-float 4.2.0", "rand 0.8.5", "regex", "roaring", - "serde", "sha1", "sha2", "simdutf8", @@ -2772,8 +2803,8 @@ dependencies = [ "cfg-if", "databend-common-arrow", "databend-common-base", - "ethnum", - "ordered-float 3.7.0", + "ethnum 1.5.0", + "ordered-float 4.2.0", "rand 0.8.5", ] @@ -2800,14 +2831,15 @@ version = "0.1.0" dependencies = [ "aho-corasick", "bincode 2.0.0-rc.3", + "borsh 1.2.1", "bytes", "chrono", "chrono-tz", "databend-common-exception", - "ethnum", + "ethnum 1.5.0", "lexical-core", - "micromarshal", - "ordered-float 3.7.0", + "micromarshal 0.5.0", + "ordered-float 4.2.0", "rand 0.8.5", "roaring", "serde", @@ -3347,7 +3379,7 @@ dependencies = [ "num-derive", "num-traits", "opendal", - "ordered-float 3.7.0", + "ordered-float 4.2.0", "parking_lot 0.12.1", "percent-encoding", "regex", @@ -3382,7 +3414,7 @@ dependencies = [ "metrics", "once_cell", "opendal", - "ordered-float 3.7.0", + "ordered-float 4.2.0", "parquet", "regex", "reqwest", @@ -3526,7 +3558,7 @@ dependencies = [ "log", "minitrace", "opendal", - "ordered-float 3.7.0", + "ordered-float 4.2.0", "serde", "typetag", "volo-thrift", @@ -3639,7 +3671,7 @@ dependencies = [ "databend-storages-common-index", "databend-storages-common-pruner", "databend-storages-common-table-meta", - "ethnum", + "ethnum 1.5.0", "futures", "log", "opendal", @@ -4190,7 +4222,7 @@ dependencies = [ "databend-storages-common-cache-manager", "databend-storages-common-index", "databend-storages-common-table-meta", - "ethnum", + "ethnum 1.5.0", "futures", "futures-util", "goldenfile", @@ -4216,7 +4248,7 @@ dependencies = [ "once_cell", "opendal", "opensrv-mysql", - "ordered-float 3.7.0", + "ordered-float 4.2.0", "p256 0.13.0", "parking_lot 0.12.1", "parquet", @@ -4335,7 +4367,7 @@ dependencies = [ "databend-common-sql", "databend-driver", "databend-sql", - "ethnum", + "ethnum 1.5.0", "itertools 0.10.5", "jsonb 0.3.0 (git+https://github.com/datafuselabs/jsonb?rev=582c139)", "rand 0.8.5", @@ -5002,17 +5034,16 @@ name = "ethnum" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0198b9d0078e0f30dedc7acbb21c974e838fc8fae3ee170128658a98cb2c1c04" + +[[package]] +name = "ethnum" +version = "1.5.0" +source = "git+https://github.com/ariesdevil/ethnum-rs?rev=4cb05f1#4cb05f1e407f76b193d81eef71b5dd0b73216856" dependencies = [ - "ethnum-macros", + "borsh 1.2.1", "serde", ] -[[package]] -name = "ethnum-macros" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba69ba11ab069d2f946bb46a199630ad6680fa3ecdc87a62c7477f9916095841" - [[package]] name = "event-listener" version = "2.5.3" @@ -7721,7 +7752,7 @@ dependencies = [ "byteorder", "fast-float", "nom", - "ordered-float 4.1.1", + "ordered-float 4.2.0", 
"rand 0.8.5", "serde_json", ] @@ -7734,7 +7765,7 @@ dependencies = [ "byteorder", "fast-float", "nom", - "ordered-float 4.1.1", + "ordered-float 4.2.0", "rand 0.8.5", "serde_json", ] @@ -8388,10 +8419,19 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2777e7006e9b80afb5d2797e2b9e23865451708c9e280a61253586a0dd3a5db1" dependencies = [ - "ethnum", + "ethnum 1.3.2", "ordered-float 3.7.0", ] +[[package]] +name = "micromarshal" +version = "0.5.0" +source = "git+https://github.com/ariesdevil/opensrv?rev=6c96813#6c96813099b49e63719740c216914e2f252321bc" +dependencies = [ + "ethnum 1.5.0", + "ordered-float 4.2.0", +] + [[package]] name = "mime" version = "0.3.16" @@ -8945,7 +8985,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c11e44798ad209ccdd91fc192f0526a369a01234f7373e1b141c96d7cee4f0e" dependencies = [ - "proc-macro-crate 1.3.1", + "proc-macro-crate 2.0.1", "proc-macro2", "quote", "syn 2.0.29", @@ -9233,17 +9273,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" dependencies = [ "num-traits", - "rand 0.8.5", - "serde", ] [[package]] name = "ordered-float" -version = "4.1.1" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "536900a8093134cf9ccf00a27deb3532421099e958d9dd431135d0c7543ca1e8" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" dependencies = [ + "borsh 1.2.1", "num-traits", + "rand 0.8.5", + "serde", ] [[package]] @@ -9986,7 +10027,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" dependencies = [ "once_cell", - "toml_edit", + "toml_edit 0.19.14", +] + +[[package]] +name = "proc-macro-crate" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97dc5fea232fc28d2f597b37c4876b348a40e33f3b02cc975c8d006d78d94b1a" +dependencies = [ + "toml_datetime", + "toml_edit 0.20.2", ] [[package]] @@ -11031,7 +11082,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0446843641c69436765a35a5a77088e28c2e6a12da93e84aa3ab1cd4aa5a042" dependencies = [ "arrayvec 0.7.2", - "borsh", + "borsh 0.10.2", "bytecheck", "byteorder", "bytes", @@ -11972,8 +12023,9 @@ checksum = "2b2231b7c3057d5e4ad0156fb3dc807d900806020c5ffa3ee6ff2c8c76fb8520" [[package]] name = "streaming_algorithms" version = "0.3.0" -source = "git+https://github.com/datafuse-extras/streaming_algorithms?tag=hyperloglog_del_op_fix_overflow_bug#e07d26324e3d5f42bd233231e80522b2659fce5e" +source = "git+https://github.com/ariesdevil/streaming_algorithms?rev=2839d5d#2839d5d066c299bd37abd8679dfeb74f9bfb5093" dependencies = [ + "borsh 1.2.1", "rand 0.7.3", "serde", "twox-hash", @@ -12179,6 +12231,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "syn_derive" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" +dependencies = [ + "proc-macro-error 1.0.4", + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -12535,7 +12599,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit", + "toml_edit 0.19.14", ] [[package]] @@ -12560,6 +12624,17 @@ dependencies = [ "winnow", ] +[[package]] +name = "toml_edit" +version = "0.20.2" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" +dependencies = [ + "indexmap 2.0.0", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.9.2" diff --git a/Cargo.toml b/Cargo.toml index 1e978899cc6bd..fa4271d29ed1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.9.0-alph # Core crates and utilities async-trait = { version = "0.1.57", package = "async-trait-fn" } bincode = { version = "2.0.0-rc.3", features = ["serde", "std", "alloc"] } +borsh = { version = "1.2.1", features = ["derive"] } bytes = "1.5.0" byteorder = "1.4.3" chrono = { version = "0.4.31", features = ["serde"] } @@ -138,7 +139,7 @@ clap = { version = "4.4.2", features = ["derive"] } dashmap = "5.4.0" derive_more = "0.99.17" enumflags2 = { version = "0.7.7", features = ["serde"] } -ethnum = { version = "1.3.2" } +ethnum = { git = "https://github.com/ariesdevil/ethnum-rs", rev = "4cb05f1" } itertools = "0.10.5" log = { version = "0.4.19", features = ["serde", "kv_unstable_std"] } logcall = "0.1.5" @@ -147,7 +148,7 @@ metrics = "0.20.1" minitrace = { version = "0.6", features = ["enable"] } mysql_async = { version = "0.33", default-features = false, features = ["rustls-tls"] } once_cell = "1.15.0" -ordered-float = { version = "3.6.0", default-features = false } +ordered-float = { version = "4.1.0", default-features = false } parking_lot = "0.12.1" poem = { version = "~1.3.57", features = ["rustls", "multipart", "compression"] } prometheus-client = "0.21.2" @@ -254,3 +255,4 @@ parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "b0e6545" metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "fc2ecd1" } icelake = { git = "https://github.com/icelake-io/icelake", rev = "f06cdf3" } sentry = { git = "https://github.com/getsentry/sentry-rust", rev = "6ef6d97" } +micromarshal = { git = "https://github.com/ariesdevil/opensrv", rev = "6c96813" } diff --git a/src/common/io/Cargo.toml b/src/common/io/Cargo.toml index 465f08f6f2189..dc2d421ec4e8f 100644 --- a/src/common/io/Cargo.toml +++ b/src/common/io/Cargo.toml @@ -18,12 +18,13 @@ databend-common-exception = { path = "../exception" } # Crates.io dependencies bincode = { workspace = true } +borsh = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } chrono-tz = { workspace = true } ethnum = { workspace = true } lexical-core = "0.8.5" -micromarshal = "0.4.0" +micromarshal = "0.5.0" ordered-float = { workspace = true } roaring = { version = "0.10.1", features = ["serde"] } serde = { workspace = true } diff --git a/src/common/io/src/serialization.rs b/src/common/io/src/bincode_serialization.rs similarity index 100% rename from src/common/io/src/serialization.rs rename to src/common/io/src/bincode_serialization.rs diff --git a/src/common/io/src/borsh_serialization.rs b/src/common/io/src/borsh_serialization.rs new file mode 100644 index 0000000000000..c6c66c2ec2402 --- /dev/null +++ b/src/common/io/src/borsh_serialization.rs @@ -0,0 +1,41 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
+use databend_common_exception::Result;
+
+/// borsh serialize_into wrapper
+#[inline]
+pub fn borsh_serialize_into_buf<W: std::io::Write, T: BorshSerialize>(
+    writer: &mut W,
+    value: &T,
+) -> Result<()> {
+    borsh::to_writer(writer, value)?;
+    Ok(())
+}
+
+/// borsh deserialize_from wrapper
+#[inline]
+pub fn borsh_deserialize_from_slice<T: BorshDeserialize>(slice: &[u8]) -> Result<T> {
+    let value = borsh::from_slice::<T>(slice)?;
+    Ok(value)
+}
+
+#[inline]
+pub fn borsh_deserialize_from_stream<T: BorshDeserialize>(stream: &mut &[u8]) -> Result<T> {
+    // `T::deserialize` updates the buffer to point at the remaining bytes.
+    let value = T::deserialize(stream)?;
+    Ok(value)
+}
diff --git a/src/common/io/src/lib.rs b/src/common/io/src/lib.rs
index b83a6310cb8e1..f4973235dcbe8 100644
--- a/src/common/io/src/lib.rs
+++ b/src/common/io/src/lib.rs
@@ -30,13 +30,14 @@ pub mod prelude;
 mod binary_read;
 mod binary_write;
+mod bincode_serialization;
 mod bitmap;
+mod borsh_serialization;
 pub mod cursor_ext;
 mod decimal;
 mod escape;
 mod format_settings;
 mod position;
-mod serialization;
 mod stat_buffer;
 pub use bitmap::parse_bitmap;
diff --git a/src/common/io/src/prelude.rs b/src/common/io/src/prelude.rs
index 6fa84ff4ce84c..a8f5aaeb2fe66 100644
--- a/src/common/io/src/prelude.rs
+++ b/src/common/io/src/prelude.rs
@@ -18,7 +18,8 @@ pub use bytes::BytesMut;
 pub use crate::binary_read::BinaryRead;
 pub use crate::binary_write::put_uvarint;
 pub use crate::binary_write::BinaryWrite;
+pub use crate::bincode_serialization::*;
+pub use crate::borsh_serialization::*;
 pub use crate::format_settings::FormatSettings;
 pub use crate::position::*;
-pub use crate::serialization::*;
 pub use crate::stat_buffer::StatBuffer;
diff --git a/src/common/io/tests/it/serialization.rs b/src/common/io/tests/it/bincode_serialization.rs
similarity index 100%
rename from src/common/io/tests/it/serialization.rs
rename to src/common/io/tests/it/bincode_serialization.rs
diff --git a/src/common/io/tests/it/borsh_serialization.rs b/src/common/io/tests/it/borsh_serialization.rs
new file mode 100644
index 0000000000000..f1a43b72f7405
--- /dev/null
+++ b/src/common/io/tests/it/borsh_serialization.rs
@@ -0,0 +1,89 @@
+// Copyright 2021 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io::Cursor;
+
+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
+use databend_common_exception::Result;
+use databend_common_io::prelude::*;
+
+#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
+struct TestStruct {
+    a: i32,
+    b: String,
+}
+
+fn create_test_value() -> TestStruct {
+    TestStruct {
+        a: 42,
+        b: "Hello, world!".to_string(),
+    }
+}
+
+#[test]
+fn test_borsh_serialize_into_buf() {
+    let mut buffer = Cursor::new(Vec::new());
+    let value = create_test_value();
+
+    let serialize_result = borsh_serialize_into_buf(&mut buffer, &value);
+    assert!(serialize_result.is_ok());
+    assert!(!buffer.get_ref().is_empty());
+}
+
+#[test]
+fn test_borsh_deserialize_from_slice() {
+    let value = create_test_value();
+    let mut buffer = Cursor::new(Vec::new());
+    borsh_serialize_into_buf(&mut buffer, &value).unwrap();
+    let slice = buffer.get_ref().as_slice();
+
+    let deserialized: TestStruct = borsh_deserialize_from_slice(slice).unwrap();
+    assert_eq!(value, deserialized);
+}
+
+#[test]
+fn test_borsh_deserialize_updates_slice_position() {
+    let first_value = create_test_value();
+    let second_value = TestStruct {
+        a: 24,
+        b: "Goodbye, world!".to_string(),
+    };
+
+    // Serialize both values into a buffer
+    let mut buffer = Cursor::new(Vec::new());
+    borsh_serialize_into_buf(&mut buffer, &first_value).unwrap();
+    borsh_serialize_into_buf(&mut buffer, &second_value).unwrap();
+
+    // Create a mutable slice pointing to the buffer's content
+    let mut slice = buffer.get_ref().as_slice();
+
+    // Deserialize the first value
+    let deserialized_first: TestStruct = borsh_deserialize_from_stream(&mut slice).unwrap();
+    assert_eq!(first_value, deserialized_first);
+
+    // After deserializing the first value, the slice should have been updated to point to the remainder
+    let deserialized_second: TestStruct = borsh_deserialize_from_stream(&mut slice).unwrap();
+    assert_eq!(second_value, deserialized_second);
+
+    // Check if the slice is at the end (no more data to deserialize)
+    assert!(slice.is_empty());
+}
+
+#[test]
+fn test_borsh_deserialize_from_invalid_slice() {
+    let invalid_slice = &b"invalid data"[..];
+    let result: Result<TestStruct> = borsh_deserialize_from_slice(invalid_slice);
+    assert!(result.is_err());
+}
diff --git a/src/common/io/tests/it/main.rs b/src/common/io/tests/it/main.rs
index 0e774367fc523..9ef129835d915 100644
--- a/src/common/io/tests/it/main.rs
+++ b/src/common/io/tests/it/main.rs
@@ -18,6 +18,7 @@ extern crate core;
 mod binary_read;
 mod binary_write;
+mod bincode_serialization;
+mod borsh_serialization;
 mod cursor_ext;
 mod escape;
-mod serialization;
diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml
index 326ba470a4904..014c8750ce37f 100755
--- a/src/query/expression/Cargo.toml
+++ b/src/query/expression/Cargo.toml
@@ -26,6 +26,7 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 async-backtrace = { workspace = true }
 base64 = "0.21.0"
+borsh = { workspace = true, features = ["derive"] }
 bumpalo = { workspace = true }
 chrono = { workspace = true }
 chrono-tz = { workspace = true }
 comfy-table = "6"
 dashmap = { workspace = true }
 educe = "0.4"
 enum-as-inner = "0.5"
-ethnum = { workspace = true, features = ["serde", "macros"] }
+ethnum = { workspace = true, features = ["serde", "macros", "borsh"] }
 futures = { workspace = true }
 hex = "0.4.3"
 itertools = { workspace = true }
 jsonb = { workspace = true }
 lexical-core = "0.8.5"
 log = { workspace = true }
 match-template = {
workspace = true } -micromarshal = "0.4.0" +micromarshal = "0.5.0" num-traits = "0.2.15" -ordered-float = { workspace = true, features = ["serde", "rand"] } +ordered-float = { workspace = true, features = ["serde", "rand", "borsh"] } rand = { workspace = true } roaring = { version = "0.10.1", features = ["serde"] } rust_decimal = "1.26" diff --git a/src/query/expression/src/types/decimal.rs b/src/query/expression/src/types/decimal.rs index 66ab424220ae3..d27cc86488d04 100644 --- a/src/query/expression/src/types/decimal.rs +++ b/src/query/expression/src/types/decimal.rs @@ -16,6 +16,8 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::ops::Range; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -195,7 +197,17 @@ pub enum DecimalDataType { Decimal256(DecimalSize), } -#[derive(Clone, Copy, PartialEq, Eq, EnumAsInner, Serialize, Deserialize)] +#[derive( + Clone, + Copy, + PartialEq, + Eq, + EnumAsInner, + Serialize, + Deserialize, + BorshSerialize, + BorshDeserialize, +)] pub enum DecimalScalar { Decimal128(i128, DecimalSize), Decimal256(i256, DecimalSize), @@ -240,7 +252,18 @@ pub enum DecimalDomain { Decimal256(SimpleDomain, DecimalSize), } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Hash, + Serialize, + Deserialize, + BorshSerialize, + BorshDeserialize, +)] pub struct DecimalSize { pub precision: u8, pub scale: u8, diff --git a/src/query/expression/src/types/number.rs b/src/query/expression/src/types/number.rs index 10c5acb172cae..58fe6a919dcad 100644 --- a/src/query/expression/src/types/number.rs +++ b/src/query/expression/src/types/number.rs @@ -16,6 +16,8 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::ops::Range; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::buffer::Buffer; use enum_as_inner::EnumAsInner; use itertools::Itertools; @@ -252,7 +254,17 @@ pub enum NumberDataType { Float64, } -#[derive(Clone, Copy, PartialEq, Eq, EnumAsInner, Serialize, Deserialize)] +#[derive( + Clone, + Copy, + PartialEq, + Eq, + EnumAsInner, + Serialize, + Deserialize, + BorshSerialize, + BorshDeserialize, +)] pub enum NumberScalar { UInt8(u8), UInt16(u16), diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index 3deb35c1757f7..200460ad075fe 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -15,10 +15,13 @@ use std::cmp::Ordering; use std::hash::Hash; use std::io::Read; +use std::io::Write; use std::ops::Range; use base64::engine::general_purpose; use base64::prelude::*; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::and; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::bitmap::MutableBitmap; @@ -99,7 +102,9 @@ pub enum ValueRef<'a, T: ValueType> { Column(T::Column), } -#[derive(Debug, Clone, EnumAsInner, Eq, Serialize, Deserialize)] +#[derive( + Debug, Clone, EnumAsInner, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize, +)] pub enum Scalar { Null, EmptyArray, @@ -1978,6 +1983,22 @@ impl<'de> Deserialize<'de> for Column { } } +impl BorshSerialize for Column { + fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + let bytes = serialize_column(self); + BorshSerialize::serialize(&bytes, writer) + } +} + +impl 
BorshDeserialize for Column { + fn deserialize_reader(reader: &mut R) -> std::io::Result { + let bytes: Vec = borsh::BorshDeserialize::deserialize_reader(reader)?; + let column = + deserialize_column(&bytes).expect("expecting an arrow chunk with exactly one column"); + Ok(column) + } +} + impl Eq for Column {} impl ColumnBuilder { diff --git a/src/query/expression/tests/it/serde.rs b/src/query/expression/tests/it/serde.rs index b2df32b3c7393..96ad99eb605ef 100644 --- a/src/query/expression/tests/it/serde.rs +++ b/src/query/expression/tests/it/serde.rs @@ -14,6 +14,8 @@ use std::vec; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_exception::Result; use databend_common_expression::arrow::deserialize_column; use databend_common_expression::arrow::serialize_column; @@ -25,6 +27,8 @@ use databend_common_expression::RemoteExpr; use databend_common_expression::Scalar; use databend_common_io::prelude::bincode_deserialize_from_slice; use databend_common_io::prelude::bincode_serialize_into_buf; +use databend_common_io::prelude::borsh_deserialize_from_slice; +use databend_common_io::prelude::borsh_serialize_into_buf; #[test] fn test_serde_column() -> Result<()> { @@ -82,3 +86,29 @@ fn test_serde_bin_column() -> Result<()> { } Ok(()) } + +#[test] +fn test_borsh_serde_column() -> Result<()> { + #[derive(BorshSerialize, BorshDeserialize, Eq, PartialEq, Debug)] + struct Plan { + column: Column, + } + + let column = StringType::from_data(vec!["SM CASE", "a", "b", "e", "f", "g"]); + let plan = Plan { column }; + + { + let json = borsh::to_vec(&plan).unwrap(); + let new_plan = borsh::from_slice::(&json).unwrap(); + assert!(plan == new_plan); + } + + { + let mut vs = vec![]; + borsh_serialize_into_buf(&mut vs, &plan).unwrap(); + let vs = vs.as_slice(); + let new_plan: Plan = borsh_deserialize_from_slice(vs).unwrap(); + assert!(plan == new_plan); + } + Ok(()) +} diff --git a/src/query/functions/Cargo.toml b/src/query/functions/Cargo.toml index 55bfceab67cc7..702507bf4271e 100644 --- a/src/query/functions/Cargo.toml +++ b/src/query/functions/Cargo.toml @@ -24,6 +24,7 @@ jsonb = { workspace = true } # Crates.io dependencies base64 = "0.21.0" blake3 = "1.3.1" +borsh = { workspace = true, features = ["derive"] } bstr = "1.0.1" bumpalo = { workspace = true } bytes = { workspace = true } @@ -53,12 +54,11 @@ ordered-float = { workspace = true, features = [ rand = { workspace = true } regex = { workspace = true } roaring = "0.10.1" -serde = { workspace = true } sha1 = "0.10.5" sha2 = "0.10.6" simdutf8 = "0.1.4" siphasher = "0.3" -streaming_algorithms = { git = "https://github.com/datafuse-extras/streaming_algorithms", tag = "hyperloglog_del_op_fix_overflow_bug" } +streaming_algorithms = { git = "https://github.com/ariesdevil/streaming_algorithms", rev = "2839d5d" } strength_reduce = "0.2.3" twox-hash = "1.6.3" diff --git a/src/query/functions/src/aggregates/aggregate_approx_count_distinct.rs b/src/query/functions/src/aggregates/aggregate_approx_count_distinct.rs index de8a686742db0..22a293c86b9de 100644 --- a/src/query/functions/src/aggregates/aggregate_approx_count_distinct.rs +++ b/src/query/functions/src/aggregates/aggregate_approx_count_distinct.rs @@ -31,8 +31,8 @@ use streaming_algorithms::HyperLogLog; use super::aggregate_function::AggregateFunction; use super::aggregate_function_factory::AggregateFunctionDescription; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use 
super::AggregateUnaryFunction; use super::FunctionData; use super::UnaryState; @@ -82,12 +82,12 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, &self.hll) + borsh_serialize_state(writer, &self.hll) } fn deserialize(reader: &mut &[u8]) -> Result where Self: Sized { - let hll = deserialize_state(reader)?; + let hll = borsh_deserialize_state(reader)?; Ok(Self { hll }) } } diff --git a/src/query/functions/src/aggregates/aggregate_arg_min_max.rs b/src/query/functions/src/aggregates/aggregate_arg_min_max.rs index 01348bc36ff5d..e671b1a74a216 100644 --- a/src/query/functions/src/aggregates/aggregate_arg_min_max.rs +++ b/src/query/functions/src/aggregates/aggregate_arg_min_max.rs @@ -17,6 +17,8 @@ use std::fmt; use std::marker::PhantomData; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -26,9 +28,6 @@ use databend_common_expression::with_number_mapped_type; use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::Scalar; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; use super::aggregate_function_factory::AggregateFunctionDescription; use super::aggregate_scalar_state::ChangeIf; @@ -38,8 +37,8 @@ use super::aggregate_scalar_state::CmpMin; use super::aggregate_scalar_state::TYPE_ANY; use super::aggregate_scalar_state::TYPE_MAX; use super::aggregate_scalar_state::TYPE_MIN; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::AggregateFunctionRef; use super::StateAddr; use crate::aggregates::assert_binary_arguments; @@ -51,7 +50,7 @@ use crate::with_simple_no_number_mapped_type; // A: ValueType for arg. // V: ValueType for val. 
pub trait AggregateArgMinMaxState: - Serialize + DeserializeOwned + Send + Sync + 'static + BorshSerialize + BorshDeserialize + Send + Sync + 'static { fn new() -> Self; fn add(&mut self, value: V::ScalarRef<'_>, data: Scalar); @@ -66,18 +65,17 @@ pub trait AggregateArgMinMaxState: fn merge_result(&mut self, column: &mut ColumnBuilder) -> Result<()>; } -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] struct ArgMinMaxState where V: ValueType, - V::Scalar: Serialize + DeserializeOwned, + V::Scalar: BorshSerialize + BorshDeserialize, { - #[serde(bound(deserialize = "V::Scalar: DeserializeOwned"))] pub value: Option, pub data: Scalar, - #[serde(skip)] + #[borsh(skip)] _a: PhantomData, - #[serde(skip)] + #[borsh(skip)] _c: PhantomData, } @@ -85,7 +83,7 @@ impl AggregateArgMinMaxState for ArgMinMaxState where A: ValueType + Send + Sync, V: ValueType, - V::Scalar: Send + Sync + Serialize + DeserializeOwned, + V::Scalar: Send + Sync + BorshSerialize + BorshDeserialize, C: ChangeIf + Default, { fn new() -> Self { @@ -283,12 +281,12 @@ where fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let rhs: State = deserialize_state(reader)?; + let rhs: State = borsh_deserialize_state(reader)?; state.merge(&rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_array_agg.rs b/src/query/functions/src/aggregates/aggregate_array_agg.rs index fedee39ba649d..10c29ee104890 100644 --- a/src/query/functions/src/aggregates/aggregate_array_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_array_agg.rs @@ -17,6 +17,8 @@ use std::fmt; use std::marker::PhantomData; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::Result; use databend_common_expression::types::decimal::*; @@ -30,33 +32,29 @@ use databend_common_expression::ColumnBuilder; use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; use ethnum::i256; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; use super::aggregate_function_factory::AggregateFunctionDescription; use super::aggregate_scalar_state::ScalarStateFunc; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::StateAddr; use crate::aggregates::assert_unary_arguments; use crate::aggregates::AggregateFunction; use crate::with_simple_no_number_mapped_type; -#[derive(Serialize, Deserialize, Debug)] +#[derive(BorshSerialize, BorshDeserialize, Debug)] pub struct ArrayAggState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned, + T::Scalar: BorshSerialize + BorshDeserialize, { - #[serde(bound(deserialize = "T::Scalar: DeserializeOwned"))] values: Vec, } impl Default for ArrayAggState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned, + T::Scalar: BorshSerialize + BorshDeserialize, { fn default() -> Self { Self { values: Vec::new() } @@ -66,7 +64,7 @@ where impl ScalarStateFunc for ArrayAggState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned + Send + Sync, + T::Scalar: BorshSerialize + BorshDeserialize + Send + Sync, { fn new() -> Self { Self::default() @@ -126,20 +124,19 @@ where } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(BorshSerialize, 
BorshDeserialize, Debug)] pub struct NullableArrayAggState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned, + T::Scalar: BorshSerialize + BorshDeserialize, { - #[serde(bound(deserialize = "T::Scalar: DeserializeOwned"))] values: Vec>, } impl Default for NullableArrayAggState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned, + T::Scalar: BorshSerialize + BorshDeserialize, { fn default() -> Self { Self { values: Vec::new() } @@ -149,7 +146,7 @@ where impl ScalarStateFunc for NullableArrayAggState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned + Send + Sync, + T::Scalar: BorshSerialize + BorshDeserialize + Send + Sync, { fn new() -> Self { Self::default() @@ -338,12 +335,12 @@ where fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let rhs: State = deserialize_state(reader)?; + let rhs: State = borsh_deserialize_state(reader)?; state.merge(&rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_array_moving.rs b/src/query/functions/src/aggregates/aggregate_array_moving.rs index 2d4dd89ff0905..23efb0eea025a 100644 --- a/src/query/functions/src/aggregates/aggregate_array_moving.rs +++ b/src/query/functions/src/aggregates/aggregate_array_moving.rs @@ -17,6 +17,8 @@ use std::fmt; use std::marker::PhantomData; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::ErrorCode; @@ -43,39 +45,36 @@ use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; use ethnum::i256; use num_traits::AsPrimitive; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; use super::aggregate_function::AggregateFunction; use super::aggregate_function::AggregateFunctionRef; use super::aggregate_function_factory::AggregateFunctionDescription; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::StateAddr; use crate::aggregates::aggregate_sum::SumState; use crate::aggregates::assert_unary_arguments; use crate::aggregates::assert_variadic_params; use crate::BUILTIN_FUNCTIONS; -#[derive(Default, Debug, Deserialize, Serialize)] +#[derive(Default, Debug, BorshDeserialize, BorshSerialize)] pub struct NumberArrayMovingSumState { values: Vec, - #[serde(skip)] + #[borsh(skip)] _t: PhantomData, } impl SumState for NumberArrayMovingSumState where - T: Number + AsPrimitive + Serialize + DeserializeOwned, + T: Number + AsPrimitive + BorshSerialize + BorshDeserialize, TSum: Number + AsPrimitive + std::ops::AddAssign + std::ops::SubAssign, { fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, &self.values) + borsh_serialize_state(writer, &self.values) } fn deserialize(&mut self, reader: &mut &[u8]) -> Result<()> { - self.values = deserialize_state(reader)?; + self.values = borsh_deserialize_state(reader)?; Ok(()) } @@ -203,7 +202,7 @@ where } } -#[derive(Default, Deserialize, Serialize)] +#[derive(Default, BorshDeserialize, BorshSerialize)] pub struct DecimalArrayMovingSumState { pub values: Vec, } @@ -211,8 +210,8 @@ pub struct DecimalArrayMovingSumState { impl DecimalArrayMovingSumState where T: Decimal + std::ops::AddAssign - + Serialize - + 
DeserializeOwned + + BorshSerialize + + BorshDeserialize + Copy + Clone + std::fmt::Debug @@ -236,19 +235,19 @@ impl SumState for DecimalArrayMovingSumState where T: Decimal + std::ops::AddAssign + std::ops::SubAssign - + Serialize - + DeserializeOwned + + BorshSerialize + + BorshDeserialize + Copy + Clone + std::fmt::Debug + std::cmp::PartialOrd { fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, &self.values) + borsh_serialize_state(writer, &self.values) } fn deserialize(&mut self, reader: &mut &[u8]) -> Result<()> { - self.values = deserialize_state(reader)?; + self.values = borsh_deserialize_state(reader)?; Ok(()) } @@ -457,12 +456,12 @@ where State: SumState fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let rhs: State = deserialize_state(reader)?; + let rhs: State = borsh_deserialize_state(reader)?; state.merge(&rhs) } @@ -651,12 +650,12 @@ where State: SumState fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let rhs: State = deserialize_state(reader)?; + let rhs: State = borsh_deserialize_state(reader)?; state.merge(&rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_avg.rs b/src/query/functions/src/aggregates/aggregate_avg.rs index 8988c38eda410..5304f9751ed49 100644 --- a/src/query/functions/src/aggregates/aggregate_avg.rs +++ b/src/query/functions/src/aggregates/aggregate_avg.rs @@ -16,6 +16,8 @@ use std::any::Any; use std::marker::PhantomData; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::decimal::*; @@ -24,13 +26,10 @@ use databend_common_expression::utils::arithmetics_type::ResultTypeOfUnary; use databend_common_expression::with_number_mapped_type; use databend_common_expression::Scalar; use num_traits::AsPrimitive; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; use super::aggregate_sum::DecimalSumState; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::AggregateUnaryFunction; use super::FunctionData; use super::UnaryState; @@ -38,13 +37,13 @@ use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription; use crate::aggregates::aggregator_common::assert_unary_arguments; use crate::aggregates::AggregateFunctionRef; -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] struct NumberAvgState where TSum: ValueType { pub value: TSum::Scalar, pub count: u64, - #[serde(skip)] + #[borsh(skip)] _t: PhantomData, } @@ -53,7 +52,8 @@ where T: ValueType + Sync + Send, TSum: ValueType, T::Scalar: Number + AsPrimitive, - TSum::Scalar: Number + AsPrimitive + Serialize + DeserializeOwned + std::ops::AddAssign, + TSum::Scalar: + Number + AsPrimitive + BorshSerialize + BorshDeserialize + std::ops::AddAssign, { fn default() -> Self { Self { @@ -69,7 +69,8 @@ where T: ValueType + Sync + Send, TSum: ValueType, T::Scalar: Number + AsPrimitive, - TSum::Scalar: Number + AsPrimitive + Serialize + 
DeserializeOwned + std::ops::AddAssign, + TSum::Scalar: + Number + AsPrimitive + BorshSerialize + BorshDeserialize + std::ops::AddAssign, { fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> { self.count += 1; @@ -95,12 +96,12 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, self) + borsh_serialize_state(writer, self) } fn deserialize(reader: &mut &[u8]) -> Result where Self: Sized { - deserialize_state(reader) + borsh_deserialize_state(reader) } } @@ -116,7 +117,7 @@ impl FunctionData for DecimalAvgData { } } -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] struct DecimalAvgState where T: ValueType, @@ -129,7 +130,7 @@ where impl Default for DecimalAvgState where T: ValueType, - T::Scalar: Decimal + std::ops::AddAssign + Serialize + DeserializeOwned, + T::Scalar: Decimal + std::ops::AddAssign + BorshSerialize + BorshDeserialize, { fn default() -> Self { Self { @@ -162,7 +163,7 @@ where impl UnaryState for DecimalAvgState where T: ValueType, - T::Scalar: Decimal + std::ops::AddAssign + Serialize + DeserializeOwned, + T::Scalar: Decimal + std::ops::AddAssign + BorshSerialize + BorshDeserialize, { fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> { self.add_internal(1, other) @@ -203,12 +204,12 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, self) + borsh_serialize_state(writer, self) } fn deserialize(reader: &mut &[u8]) -> Result where Self: Sized { - deserialize_state(reader) + borsh_deserialize_state(reader) } } diff --git a/src/query/functions/src/aggregates/aggregate_count.rs b/src/query/functions/src/aggregates/aggregate_count.rs index 470320407bf3d..c3b9de9ed2445 100644 --- a/src/query/functions/src/aggregates/aggregate_count.rs +++ b/src/query/functions/src/aggregates/aggregate_count.rs @@ -28,8 +28,8 @@ use databend_common_expression::Scalar; use super::aggregate_function::AggregateFunction; use super::aggregate_function_factory::AggregateFunctionDescription; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::StateAddr; use crate::aggregates::aggregator_common::assert_variadic_arguments; @@ -150,12 +150,12 @@ impl AggregateFunction for AggregateCountFunction { fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, &state.count) + borsh_serialize_state(writer, &state.count) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let other: u64 = deserialize_state(reader)?; + let other: u64 = borsh_deserialize_state(reader)?; state.count += other; Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_covariance.rs b/src/query/functions/src/aggregates/aggregate_covariance.rs index a53480fcedcb7..aaeb0f243506c 100644 --- a/src/query/functions/src/aggregates/aggregate_covariance.rs +++ b/src/query/functions/src/aggregates/aggregate_covariance.rs @@ -17,6 +17,8 @@ use std::fmt; use std::marker::PhantomData; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -31,18 +33,16 @@ use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::Scalar; use num_traits::AsPrimitive; -use serde::Deserialize; -use serde::Serialize; -use 
super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::StateAddr; use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription; use crate::aggregates::aggregator_common::assert_binary_arguments; use crate::aggregates::AggregateFunction; use crate::aggregates::AggregateFunctionRef; -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] pub struct AggregateCovarianceState { pub count: u64, pub co_moments: f64, @@ -228,12 +228,12 @@ where fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let rhs: AggregateCovarianceState = deserialize_state(reader)?; + let rhs: AggregateCovarianceState = borsh_deserialize_state(reader)?; state.merge(&rhs); Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_distinct_state.rs b/src/query/functions/src/aggregates/aggregate_distinct_state.rs index f99345e869574..31f57a2071049 100644 --- a/src/query/functions/src/aggregates/aggregate_distinct_state.rs +++ b/src/query/functions/src/aggregates/aggregate_distinct_state.rs @@ -20,6 +20,8 @@ use std::marker::Send; use std::marker::Sync; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use bumpalo::Bump; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; @@ -40,13 +42,11 @@ use databend_common_hashtable::HashtableLike; use databend_common_hashtable::ShortStringHashSet; use databend_common_hashtable::StackHashSet; use databend_common_io::prelude::*; -use serde::de::DeserializeOwned; -use serde::Serialize; use siphasher::sip128::Hasher128; use siphasher::sip128::SipHasher24; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; pub trait DistinctStateFunc: Sized + Send + Sync { fn new() -> Self; @@ -86,11 +86,11 @@ impl DistinctStateFunc for AggregateDistinctState { } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, &self.set) + borsh_serialize_state(writer, &self.set) } fn deserialize(reader: &mut &[u8]) -> Result { - let set = deserialize_state(reader)?; + let set = borsh_deserialize_state(reader)?; Ok(Self { set }) } @@ -108,7 +108,7 @@ impl DistinctStateFunc for AggregateDistinctState { .map(|col| unsafe { AnyType::index_column_unchecked(col, row).to_owned() }) .collect::>(); let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::()); - serialize_state(&mut buffer, &values)?; + borsh_serialize_state(&mut buffer, &values)?; self.set.insert(buffer); Ok(()) } @@ -127,7 +127,7 @@ impl DistinctStateFunc for AggregateDistinctState { .collect::>(); let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::()); - serialize_state(&mut buffer, &values)?; + borsh_serialize_state(&mut buffer, &values)?; self.set.insert(buffer); } } @@ -146,7 +146,7 @@ impl DistinctStateFunc for AggregateDistinctState { for data in self.set.iter() { let mut slice = data.as_slice(); - let scalars: Vec = deserialize_state(&mut slice)?; + let scalars: Vec = borsh_deserialize_state(&mut slice)?; scalars.iter().enumerate().for_each(|(idx, group_value)| { builders[idx].push(group_value.as_ref()); }); @@ -241,7 +241,7 @@ impl DistinctStateFunc for AggregateDistinctStringState { } impl 
DistinctStateFunc for AggregateDistinctNumberState -where T: Number + Serialize + DeserializeOwned + HashtableKeyable +where T: Number + BorshSerialize + BorshDeserialize + HashtableKeyable { fn new() -> Self { AggregateDistinctNumberState { @@ -252,7 +252,7 @@ where T: Number + Serialize + DeserializeOwned + HashtableKeyable fn serialize(&self, writer: &mut Vec) -> Result<()> { writer.write_uvarint(self.set.len() as u64)?; for e in self.set.iter() { - serialize_state(writer, e.key())? + borsh_serialize_state(writer, e.key())? } Ok(()) } @@ -261,7 +261,7 @@ where T: Number + Serialize + DeserializeOwned + HashtableKeyable let size = reader.read_uvarint()?; let mut set = CommonHashSet::with_capacity(size as usize); for _ in 0..size { - let t: T = deserialize_state(reader)?; + let t: T = borsh_deserialize_state(reader)?; let _ = set.set_insert(t).is_ok(); } Ok(Self { set }) @@ -333,7 +333,7 @@ impl DistinctStateFunc for AggregateUniqStringState { fn serialize(&self, writer: &mut Vec) -> Result<()> { writer.write_uvarint(self.set.len() as u64)?; for value in self.set.iter() { - serialize_state(writer, value.key())? + borsh_serialize_state(writer, value.key())? } Ok(()) } @@ -342,7 +342,7 @@ impl DistinctStateFunc for AggregateUniqStringState { let size = reader.read_uvarint()?; let mut set = StackHashSet::with_capacity(size as usize); for _ in 0..size { - let e = deserialize_state(reader)?; + let e = borsh_deserialize_state(reader)?; let _ = set.set_insert(e).is_ok(); } Ok(Self { set }) diff --git a/src/query/functions/src/aggregates/aggregate_kurtosis.rs b/src/query/functions/src/aggregates/aggregate_kurtosis.rs index 563076f3219e6..f7bddcbbbed48 100644 --- a/src/query/functions/src/aggregates/aggregate_kurtosis.rs +++ b/src/query/functions/src/aggregates/aggregate_kurtosis.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::number::*; @@ -19,11 +21,9 @@ use databend_common_expression::types::*; use databend_common_expression::with_number_mapped_type; use databend_common_expression::Scalar; use num_traits::AsPrimitive; -use serde::Deserialize; -use serde::Serialize; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::AggregateUnaryFunction; use super::FunctionData; use super::UnaryState; @@ -31,7 +31,7 @@ use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription; use crate::aggregates::assert_unary_arguments; use crate::aggregates::AggregateFunctionRef; -#[derive(Default, Serialize, Deserialize)] +#[derive(Default, BorshSerialize, BorshDeserialize)] struct KurtosisState { pub n: u64, pub sum: f64, @@ -102,11 +102,11 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, self) + borsh_serialize_state(writer, self) } fn deserialize(reader: &mut &[u8]) -> Result { - deserialize_state::(reader) + borsh_deserialize_state::(reader) } } diff --git a/src/query/functions/src/aggregates/aggregate_min_max_any.rs b/src/query/functions/src/aggregates/aggregate_min_max_any.rs index 6fc5e52d43dfe..cd8f91a8149a5 100644 --- a/src/query/functions/src/aggregates/aggregate_min_max_any.rs +++ b/src/query/functions/src/aggregates/aggregate_min_max_any.rs @@ -15,6 +15,8 @@ use std::marker::PhantomData; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::decimal::*; @@ -23,9 +25,6 @@ use databend_common_expression::types::*; use databend_common_expression::with_number_mapped_type; use databend_common_expression::Scalar; use ethnum::i256; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; use super::aggregate_function_factory::AggregateFunctionDescription; use super::aggregate_scalar_state::need_manual_drop_state; @@ -36,8 +35,8 @@ use super::aggregate_scalar_state::CmpMin; use super::aggregate_scalar_state::TYPE_ANY; use super::aggregate_scalar_state::TYPE_MAX; use super::aggregate_scalar_state::TYPE_MIN; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::AggregateUnaryFunction; use super::FunctionData; use super::UnaryState; @@ -46,22 +45,21 @@ use crate::aggregates::AggregateFunction; use crate::with_compare_mapped_type; use crate::with_simple_no_number_mapped_type; -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] pub struct MinMaxAnyState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned, + T::Scalar: BorshSerialize + BorshDeserialize, { - #[serde(bound(deserialize = "T::Scalar: DeserializeOwned"))] pub value: Option, - #[serde(skip)] + #[borsh(skip)] _c: PhantomData, } impl Default for MinMaxAnyState where T: Send + Sync + ValueType, - T::Scalar: Serialize + DeserializeOwned + Send + Sync, + T::Scalar: BorshSerialize + BorshDeserialize + Send + Sync, C: ChangeIf + Default, { fn default() -> Self { @@ -75,7 +73,7 @@ where impl UnaryState for MinMaxAnyState where T: ValueType + Send + Sync, - T::Scalar: Serialize + DeserializeOwned + Send + Sync, + T::Scalar: BorshSerialize + BorshDeserialize + Send + 
Sync, C: ChangeIf + Default, { fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> { @@ -114,12 +112,12 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, self) + borsh_serialize_state(writer, self) } fn deserialize(reader: &mut &[u8]) -> Result where Self: Sized { - deserialize_state(reader) + borsh_deserialize_state(reader) } } diff --git a/src/query/functions/src/aggregates/aggregate_quantile_cont.rs b/src/query/functions/src/aggregates/aggregate_quantile_cont.rs index 26771eac33a25..605e7e1a50958 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_cont.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_cont.rs @@ -15,6 +15,8 @@ use std::any::Any; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::type_check::check_number; @@ -28,11 +30,9 @@ use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; use num_traits::AsPrimitive; use ordered_float::OrderedFloat; -use serde::Deserialize; -use serde::Serialize; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::AggregateUnaryFunction; use super::FunctionData; use super::UnaryState; @@ -54,7 +54,7 @@ impl FunctionData for QuantileData { self } } -#[derive(Default, Serialize, Deserialize)] +#[derive(Default, BorshSerialize, BorshDeserialize)] struct QuantileContState { pub value: Vec>, } @@ -145,12 +145,12 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, self) + borsh_serialize_state(writer, self) } fn deserialize(reader: &mut &[u8]) -> Result where Self: Sized { - deserialize_state(reader) + borsh_deserialize_state(reader) } } diff --git a/src/query/functions/src/aggregates/aggregate_quantile_disc.rs b/src/query/functions/src/aggregates/aggregate_quantile_disc.rs index 32e7dfee28498..da1712122859c 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_disc.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_disc.rs @@ -14,6 +14,8 @@ use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::array::ArrayColumnBuilder; @@ -23,13 +25,10 @@ use databend_common_expression::types::*; use databend_common_expression::with_number_mapped_type; use databend_common_expression::Scalar; use ethnum::i256; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; -use super::deserialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::get_levels; -use super::serialize_state; use super::AggregateUnaryFunction; use super::FunctionData; use super::QuantileData; @@ -39,20 +38,19 @@ use crate::aggregates::assert_unary_arguments; use crate::aggregates::AggregateFunctionRef; use crate::with_simple_no_number_mapped_type; -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] struct QuantileState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned, + T::Scalar: BorshSerialize + BorshDeserialize, { - #[serde(bound(deserialize = "T::Scalar: DeserializeOwned"))] pub value: Vec, } impl Default for QuantileState where T: ValueType, - T::Scalar: Serialize + DeserializeOwned, + T::Scalar: BorshSerialize + BorshDeserialize, { fn default() -> Self { Self 
{ value: vec![] } @@ -62,7 +60,7 @@ where impl UnaryState> for QuantileState where T: ValueType + Sync + Send, - T::Scalar: Serialize + DeserializeOwned + Sync + Send + Ord, + T::Scalar: BorshSerialize + BorshDeserialize + Sync + Send + Ord, { fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> { self.value.push(T::to_owned_scalar(other)); @@ -111,19 +109,19 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, self) + borsh_serialize_state(writer, self) } fn deserialize(reader: &mut &[u8]) -> Result where Self: Sized { - deserialize_state(reader) + borsh_deserialize_state(reader) } } impl UnaryState for QuantileState where T: ArgType + Sync + Send, - T::Scalar: Serialize + DeserializeOwned + Sync + Send + Ord, + T::Scalar: BorshSerialize + BorshDeserialize + Sync + Send + Ord, { fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> { self.value.push(T::to_owned_scalar(other)); @@ -165,12 +163,12 @@ where } fn serialize(&self, writer: &mut Vec) -> Result<()> { - serialize_state(writer, self) + borsh_serialize_state(writer, self) } fn deserialize(reader: &mut &[u8]) -> Result where Self: Sized { - deserialize_state(reader) + borsh_deserialize_state(reader) } } diff --git a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs index 5736332e87451..8e460c3079660 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs @@ -19,6 +19,8 @@ use std::fmt::Formatter; use std::marker::PhantomData; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -34,11 +36,9 @@ use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; use itertools::Itertools; use num_traits::AsPrimitive; -use serde::Deserialize; -use serde::Serialize; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription; use crate::aggregates::assert_params; use crate::aggregates::assert_unary_arguments; @@ -50,7 +50,7 @@ use crate::BUILTIN_FUNCTIONS; pub(crate) const MEDIAN: u8 = 0; pub(crate) const QUANTILE: u8 = 1; -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] pub(crate) struct QuantileTDigestState { epsilon: u32, max_centroids: usize, @@ -360,12 +360,12 @@ where T: Number + AsPrimitive } fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let mut rhs: QuantileTDigestState = deserialize_state(reader)?; + let mut rhs: QuantileTDigestState = borsh_deserialize_state(reader)?; state.merge(&mut rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs b/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs index a1dd5fa4b8801..830d698fc00c4 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs @@ -33,8 +33,8 @@ use databend_common_expression::FunctionContext; use 
databend_common_expression::Scalar; use num_traits::AsPrimitive; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription; use crate::aggregates::aggregate_quantile_tdigest::QuantileTDigestState; use crate::aggregates::aggregate_quantile_tdigest::MEDIAN; @@ -144,12 +144,12 @@ where } fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let mut rhs: QuantileTDigestState = deserialize_state(reader)?; + let mut rhs: QuantileTDigestState = borsh_deserialize_state(reader)?; state.merge(&mut rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_retention.rs b/src/query/functions/src/aggregates/aggregate_retention.rs index c1cf9be50c561..37e6e3d2088c2 100644 --- a/src/query/functions/src/aggregates/aggregate_retention.rs +++ b/src/query/functions/src/aggregates/aggregate_retention.rs @@ -16,6 +16,8 @@ use std::alloc::Layout; use std::fmt; use std::sync::Arc; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -26,18 +28,16 @@ use databend_common_expression::types::ValueType; use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::Scalar; -use serde::Deserialize; -use serde::Serialize; use super::aggregate_function::AggregateFunction; use super::aggregate_function::AggregateFunctionRef; use super::aggregate_function_factory::AggregateFunctionDescription; -use super::deserialize_state; -use super::serialize_state; +use super::borsh_deserialize_state; +use super::borsh_serialize_state; use super::StateAddr; use crate::aggregates::aggregator_common::assert_variadic_arguments; -#[derive(Serialize, Deserialize)] +#[derive(BorshSerialize, BorshDeserialize)] struct AggregateRetentionState { pub events: u32, } @@ -139,12 +139,12 @@ impl AggregateFunction for AggregateRetentionFunction { fn serialize(&self, place: StateAddr, writer: &mut Vec) -> Result<()> { let state = place.get::(); - serialize_state(writer, state) + borsh_serialize_state(writer, state) } fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); - let rhs: AggregateRetentionState = deserialize_state(reader)?; + let rhs: AggregateRetentionState = borsh_deserialize_state(reader)?; state.merge(&rhs); Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_scalar_state.rs b/src/query/functions/src/aggregates/aggregate_scalar_state.rs index 35fadbd8c572b..d391b77868f6d 100644 --- a/src/query/functions/src/aggregates/aggregate_scalar_state.rs +++ b/src/query/functions/src/aggregates/aggregate_scalar_state.rs @@ -15,14 +15,13 @@ use std::cmp::Ordering; use std::marker::PhantomData; +use borsh::BorshDeserialize; +use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::types::ValueType; use databend_common_expression::ColumnBuilder; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; // These types can downcast their builders successfully. 
 // TODO(@b41sh): Variant => VariantType can't be used because it will use Scalar::String to compare
@@ -110,7 +109,7 @@ impl<T: ValueType> ChangeIf<T> for CmpAny {
 }

 pub trait ScalarStateFunc<T: ValueType>:
-    Serialize + DeserializeOwned + Send + Sync + 'static
+    BorshSerialize + BorshDeserialize + Send + Sync + 'static
 {
     fn new() -> Self;
     fn mem_size() -> Option<usize> {
@@ -122,22 +121,21 @@
     fn merge_result(&mut self, builder: &mut ColumnBuilder) -> Result<()>;
 }

-#[derive(Serialize, Deserialize)]
+#[derive(BorshSerialize, BorshDeserialize)]
 pub struct ScalarState<T, C>
 where
     T: ValueType,
-    T::Scalar: Serialize + DeserializeOwned,
+    T::Scalar: BorshSerialize + BorshDeserialize,
 {
-    #[serde(bound(deserialize = "T::Scalar: DeserializeOwned"))]
     pub value: Option<T::Scalar>,
-    #[serde(skip)]
+    #[borsh(skip)]
     _c: PhantomData<C>,
 }

 impl<T, C> Default for ScalarState<T, C>
 where
     T: ValueType,
-    T::Scalar: Serialize + DeserializeOwned,
+    T::Scalar: BorshSerialize + BorshDeserialize,
 {
     fn default() -> Self {
         Self {
@@ -150,7 +148,7 @@ where
 impl<T, C> ScalarStateFunc<T> for ScalarState<T, C>
 where
     T: ValueType,
-    T::Scalar: Serialize + DeserializeOwned + Send + Sync,
+    T::Scalar: BorshSerialize + BorshDeserialize + Send + Sync,
     C: ChangeIf<T> + Default,
 {
     fn new() -> Self {
diff --git a/src/query/functions/src/aggregates/aggregate_skewness.rs b/src/query/functions/src/aggregates/aggregate_skewness.rs
index 25febf62e50e5..969b9436fd8cd 100644
--- a/src/query/functions/src/aggregates/aggregate_skewness.rs
+++ b/src/query/functions/src/aggregates/aggregate_skewness.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::number::*;
@@ -20,18 +22,16 @@ use databend_common_expression::with_number_mapped_type;
 use databend_common_expression::AggregateFunctionRef;
 use databend_common_expression::Scalar;
 use num_traits::AsPrimitive;
-use serde::Deserialize;
-use serde::Serialize;

 use super::assert_unary_arguments;
-use super::deserialize_state;
-use super::serialize_state;
+use super::borsh_deserialize_state;
+use super::borsh_serialize_state;
 use super::FunctionData;
 use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription;
 use crate::aggregates::aggregate_unary::AggregateUnaryFunction;
 use crate::aggregates::aggregate_unary::UnaryState;

-#[derive(Default, Serialize, Deserialize)]
+#[derive(Default, BorshSerialize, BorshDeserialize)]
 pub struct SkewnessStateV2 {
     pub n: u64,
     pub sum: f64,
@@ -97,11 +97,11 @@ where
     }

     fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
-        serialize_state(writer, self)
+        borsh_serialize_state(writer, self)
     }

     fn deserialize(reader: &mut &[u8]) -> Result<Self> {
-        deserialize_state::<Self>(reader)
+        borsh_deserialize_state::<Self>(reader)
     }
 }

diff --git a/src/query/functions/src/aggregates/aggregate_stddev.rs b/src/query/functions/src/aggregates/aggregate_stddev.rs
index 6e0e7b68b1e89..2800e2423cf01 100644
--- a/src/query/functions/src/aggregates/aggregate_stddev.rs
+++ b/src/query/functions/src/aggregates/aggregate_stddev.rs
@@ -15,6 +15,8 @@
 use std::any::Any;
 use std::sync::Arc;

+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::decimal::Decimal;
@@ -34,12 +36,9 @@ use databend_common_expression::types::ValueType;
 use databend_common_expression::with_number_mapped_type;
 use databend_common_expression::Scalar;
 use num_traits::AsPrimitive;
-use serde::de::DeserializeOwned;
-use serde::Deserialize;
-use serde::Serialize;

-use super::deserialize_state;
-use super::serialize_state;
+use super::borsh_deserialize_state;
+use super::borsh_serialize_state;
 use super::AggregateUnaryFunction;
 use super::FunctionData;
 use super::UnaryState;
@@ -52,7 +51,7 @@ const SAMP: u8 = 1;
 const OVERFLOW_PRECISION: u8 = 18;
 const VARIANCE_PRECISION: u8 = 4;

-#[derive(Default, Serialize, Deserialize)]
+#[derive(Default, BorshSerialize, BorshDeserialize)]
 struct NumberAggregateStddevState {
     pub sum: f64,
     pub count: u64,
@@ -109,12 +108,12 @@ where
     }

     fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
-        serialize_state(writer, self)
+        borsh_serialize_state(writer, self)
     }

     fn deserialize(reader: &mut &[u8]) -> Result<Self>
     where Self: Sized {
-        deserialize_state(reader)
+        borsh_deserialize_state(reader)
     }
 }

@@ -128,7 +127,7 @@ impl FunctionData for DecimalFuncData {
     }
 }

-#[derive(Deserialize, Serialize)]
+#[derive(BorshDeserialize, BorshSerialize)]
 pub struct DecimalNumberAggregateStddevState<T>
 where
     T: ValueType,
@@ -143,7 +142,7 @@
 impl<T> Default for DecimalNumberAggregateStddevState<T>
 where
     T: ValueType,
-    T::Scalar: Decimal + std::ops::AddAssign + Serialize + DeserializeOwned,
+    T::Scalar: Decimal + std::ops::AddAssign + BorshSerialize + BorshDeserialize,
 {
     fn default() -> Self {
         Self {
@@ -158,7 +157,7 @@
 impl<T> UnaryState<T, T> for DecimalNumberAggregateStddevState<T>
 where
     T: ValueType,
-    T::Scalar: Decimal + std::ops::AddAssign + Serialize + DeserializeOwned,
+    T::Scalar: Decimal + std::ops::AddAssign + BorshSerialize + BorshDeserialize,
 {
     fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> {
         let value = T::to_owned_scalar(other);
@@ -342,12 +341,12 @@ where
     }

     fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
-        serialize_state(writer, self)
+        borsh_serialize_state(writer, self)
     }

     fn deserialize(reader: &mut &[u8]) -> Result<Self>
     where Self: Sized {
-        deserialize_state(reader)
+        borsh_deserialize_state(reader)
     }
 }

diff --git a/src/query/functions/src/aggregates/aggregate_string_agg.rs b/src/query/functions/src/aggregates/aggregate_string_agg.rs
index 4a912a7c690a2..29f024770a8b7 100644
--- a/src/query/functions/src/aggregates/aggregate_string_agg.rs
+++ b/src/query/functions/src/aggregates/aggregate_string_agg.rs
@@ -16,6 +16,8 @@ use std::alloc::Layout;
 use std::fmt;
 use std::sync::Arc;

+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
 use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -25,17 +27,15 @@ use databend_common_expression::types::ValueType;
 use databend_common_expression::Column;
 use databend_common_expression::ColumnBuilder;
 use databend_common_expression::Scalar;
-use serde::Deserialize;
-use serde::Serialize;

 use super::aggregate_function_factory::AggregateFunctionDescription;
-use super::deserialize_state;
-use super::serialize_state;
+use super::borsh_deserialize_state;
+use super::borsh_serialize_state;
 use super::StateAddr;
 use crate::aggregates::assert_variadic_arguments;
 use crate::aggregates::AggregateFunction;

-#[derive(Serialize, Deserialize, Debug)]
+#[derive(BorshSerialize, BorshDeserialize, Debug)]
 pub struct StringAggState {
     values: Vec<u8>,
 }
@@ -122,13 +122,13 @@ impl AggregateFunction for AggregateStringAggFunction {

     fn serialize(&self, place: StateAddr, writer: &mut Vec<u8>) -> Result<()> {
         let state = place.get::<StringAggState>();
-        serialize_state(writer, state)?;
+        borsh_serialize_state(writer, state)?;
         Ok(())
     }

     fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
         let state = place.get::<StringAggState>();
-        let rhs: StringAggState = deserialize_state(reader)?;
+        let rhs: StringAggState = borsh_deserialize_state(reader)?;
         state.values.extend_from_slice(rhs.values.as_slice());
         Ok(())
     }
diff --git a/src/query/functions/src/aggregates/aggregate_sum.rs b/src/query/functions/src/aggregates/aggregate_sum.rs
index a829f9e9bd8e0..200edf98f857c 100644
--- a/src/query/functions/src/aggregates/aggregate_sum.rs
+++ b/src/query/functions/src/aggregates/aggregate_sum.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
 use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -26,19 +28,16 @@ use databend_common_expression::ColumnBuilder;
 use databend_common_expression::Scalar;
 use databend_common_expression::StateAddr;
 use num_traits::AsPrimitive;
-use serde::de::DeserializeOwned;
-use serde::Deserialize;
-use serde::Serialize;

 use super::assert_unary_arguments;
-use super::deserialize_state;
-use super::serialize_state;
+use super::borsh_deserialize_state;
+use super::borsh_serialize_state;
 use super::FunctionData;
 use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription;
 use crate::aggregates::aggregate_unary::UnaryState;
 use crate::aggregates::AggregateUnaryFunction;

-pub trait SumState: Serialize + DeserializeOwned + Send + Sync + Default + 'static {
+pub trait SumState: BorshSerialize + BorshDeserialize + Send + Sync + Default + 'static {
     fn merge(&mut self, other: &Self) -> Result<()>;
     fn mem_size() -> Option<usize> {
         None
     }
@@ -65,31 +64,31 @@ pub trait SumState: Serialize + DeserializeOwned + Send + Sync + Default + 'stat
     ) -> Result<()>;
 }

-#[derive(Deserialize, Serialize)]
-pub struct NumberSumState<R>
-where R: ValueType
+#[derive(BorshSerialize, BorshDeserialize)]
+pub struct NumberSumState<N>
+where N: ValueType
 {
-    pub value: R::Scalar,
+    pub value: N::Scalar,
 }

-impl<R> Default for NumberSumState<R>
+impl<N> Default for NumberSumState<N>
 where
-    R: ValueType,
-    R::Scalar: Number + AsPrimitive<f64> + Serialize + DeserializeOwned + std::ops::AddAssign,
+    N: ValueType,
+    N::Scalar: Number + AsPrimitive<f64> + BorshSerialize + BorshDeserialize + std::ops::AddAssign,
 {
     fn default() -> Self {
-        NumberSumState::<R> {
-            value: R::Scalar::default(),
+        NumberSumState::<N> {
+            value: N::Scalar::default(),
         }
     }
 }

-impl<T, R> UnaryState<T, R> for NumberSumState<R>
+impl<T, N> UnaryState<T, N> for NumberSumState<N>
 where
     T: ValueType + Sync + Send,
-    R: ValueType,
-    T::Scalar: Number + AsPrimitive<R::Scalar>,
-    R::Scalar: Number + AsPrimitive<f64> + Serialize + DeserializeOwned + std::ops::AddAssign,
+    N: ValueType,
+    T::Scalar: Number + AsPrimitive<N::Scalar>,
+    N::Scalar: Number + AsPrimitive<f64> + BorshSerialize + BorshDeserialize + std::ops::AddAssign,
 {
     fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> {
         let other = T::to_owned_scalar(other).as_();
@@ -104,24 +103,24 @@ where

     fn merge_result(
         &mut self,
-        builder: &mut R::ColumnBuilder,
+        builder: &mut N::ColumnBuilder,
         _function_data: Option<&dyn FunctionData>,
     ) -> Result<()> {
-        R::push_item(builder, R::to_scalar_ref(&self.value));
+        N::push_item(builder, N::to_scalar_ref(&self.value));
         Ok(())
     }

     fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
-        serialize_state(writer, &self.value)
+        borsh_serialize_state(writer, &self.value)
     }

     fn deserialize(reader: &mut &[u8]) -> Result<Self> {
-        let value = deserialize_state(reader)?;
+        let value = borsh_deserialize_state(reader)?;
         Ok(Self { value })
     }
 }

-#[derive(Deserialize, Serialize)]
+#[derive(BorshDeserialize, BorshSerialize)]
 pub struct DecimalSumState<T>
 where
     T: ValueType,
@@ -133,7 +132,7 @@ where
 impl<T> Default for DecimalSumState<T>
 where
     T: ValueType,
-    T::Scalar: Decimal + std::ops::AddAssign + Serialize + DeserializeOwned,
+    T::Scalar: Decimal + std::ops::AddAssign + BorshSerialize + BorshDeserialize,
 {
     fn default() -> Self {
         Self {
@@ -145,7 +144,7 @@ where
 impl<T> UnaryState<T, T> for DecimalSumState<T>
 where
     T: ValueType,
-    T::Scalar: Decimal + std::ops::AddAssign + Serialize + DeserializeOwned,
+    T::Scalar: Decimal + std::ops::AddAssign + BorshSerialize + BorshDeserialize,
 {
     fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> {
         self.value += T::to_owned_scalar(other);
@@ -174,12 +173,12 @@ where
     }

     fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
-        serialize_state(writer, &self.value)
+        borsh_serialize_state(writer, &self.value)
     }

     fn deserialize(reader: &mut &[u8]) -> Result<Self>
     where Self: Sized {
-        let value = deserialize_state(reader)?;
+        let value = borsh_deserialize_state(reader)?;
         Ok(Self { value })
     }
 }
diff --git a/src/query/functions/src/aggregates/aggregate_window_funnel.rs b/src/query/functions/src/aggregates/aggregate_window_funnel.rs
index 7da08bdd6ecff..9493a8dc5bd4a 100644
--- a/src/query/functions/src/aggregates/aggregate_window_funnel.rs
+++ b/src/query/functions/src/aggregates/aggregate_window_funnel.rs
@@ -19,6 +19,8 @@ use std::marker::PhantomData;
 use std::ops::Sub;
 use std::sync::Arc;

+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
 use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -40,12 +42,9 @@ use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
 use databend_common_expression::Scalar;
 use num_traits::AsPrimitive;
-use serde::de::DeserializeOwned;
-use serde::Deserialize;
-use serde::Serialize;

-use super::deserialize_state;
-use super::serialize_state;
+use super::borsh_deserialize_state;
+use super::borsh_serialize_state;
 use super::AggregateFunctionRef;
 use super::AggregateNullVariadicAdaptor;
 use super::StateAddr;
@@ -55,9 +54,8 @@ use crate::aggregates::assert_variadic_arguments;
 use crate::aggregates::AggregateFunction;
 use crate::BUILTIN_FUNCTIONS;

-#[derive(Serialize, Deserialize)]
+#[derive(BorshSerialize, BorshDeserialize)]
 struct AggregateWindowFunnelState<T> {
-    #[serde(bound(deserialize = "T: DeserializeOwned"))]
     pub events_list: Vec<(T, u8)>,
     pub sorted: bool,
 }
@@ -66,8 +64,8 @@ impl<T> AggregateWindowFunnelState<T>
 where T: Ord
         + Sub<T, Output = T>
         + AsPrimitive<u64>
-        + Serialize
-        + DeserializeOwned
+        + BorshSerialize
+        + BorshDeserialize
         + Clone
         + Send
         + Sync
@@ -167,8 +165,8 @@ where
         + Sub<T, Output = T>
         + AsPrimitive<u64>
         + Clone
-        + Serialize
-        + DeserializeOwned
+        + BorshSerialize
+        + BorshDeserialize
         + 'static,
 {
     fn name(&self) -> &str {
@@ -278,12 +276,12 @@ where

     fn serialize(&self, place: StateAddr, writer: &mut Vec<u8>) -> Result<()> {
         let state = place.get::<AggregateWindowFunnelState<T>>();
-        serialize_state(writer, state)
+        borsh_serialize_state(writer, state)
     }

     fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
         let state = place.get::<AggregateWindowFunnelState<T>>();
-        let mut rhs: AggregateWindowFunnelState<T> = deserialize_state(reader)?;
+        let mut rhs: AggregateWindowFunnelState<T> = borsh_deserialize_state(reader)?;
         state.merge(&mut rhs);
         Ok(())
     }
@@ -338,8 +336,8 @@ where
         + Sub<T, Output = T>
         + AsPrimitive<u64>
         + Clone
-        + Serialize
-        + DeserializeOwned
+        + BorshSerialize
+        + BorshDeserialize
         + 'static,
 {
     pub fn try_create(
diff --git a/src/query/functions/src/aggregates/aggregator_common.rs b/src/query/functions/src/aggregates/aggregator_common.rs
index dab3d78377556..d9e440b82f74c 100644
--- a/src/query/functions/src/aggregates/aggregator_common.rs
+++ b/src/query/functions/src/aggregates/aggregator_common.rs
@@ -14,6 +14,8 @@

 use std::fmt::Display;

+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
 use bumpalo::Bump;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -21,8 +23,8 @@ use databend_common_expression::types::DataType;
 use databend_common_expression::Column;
 use databend_common_expression::ColumnBuilder;
 use databend_common_expression::Scalar;
-use databend_common_io::prelude::bincode_deserialize_from_stream;
-use databend_common_io::prelude::bincode_serialize_into_buf;
+use databend_common_io::prelude::borsh_deserialize_from_stream;
+use databend_common_io::prelude::borsh_serialize_into_buf;

 use super::AggregateFunctionFactory;
 use super::AggregateFunctionRef;
@@ -154,14 +156,14 @@ pub fn eval_aggr(
 }

 #[inline]
-pub fn serialize_state<W: std::io::Write, T: serde::Serialize>(
+pub fn borsh_serialize_state<W: std::io::Write, T: BorshSerialize>(
     writer: &mut W,
     value: &T,
 ) -> Result<()> {
-    bincode_serialize_into_buf(writer, value)
+    borsh_serialize_into_buf(writer, value)
 }

 #[inline]
-pub fn deserialize_state<T: serde::de::DeserializeOwned>(slice: &mut &[u8]) -> Result<T> {
-    bincode_deserialize_from_stream(slice)
+pub fn borsh_deserialize_state<T: BorshDeserialize>(slice: &mut &[u8]) -> Result<T> {
+    borsh_deserialize_from_stream(slice)
 }
diff --git a/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_state.test b/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_state.test
index 09d0baadf7793..c991241e875b8 100644
--- a/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_state.test
+++ b/tests/sqllogictests/suites/query/02_function/02_0000_function_aggregate_state.test
@@ -1,9 +1,9 @@
 query T
 select length(max_state(number)), typeof(max_state(number)) from numbers(100);
 ----
-3 VARCHAR
+10 VARCHAR

 query I
 select length(sum_state(number)), typeof(max_state(number)) from numbers(10000);
 ----
-6 VARCHAR
+9 VARCHAR
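
The expected lengths in the sqllogictest above change because the aggregate-state blobs are now encoded with borsh, which writes integers as fixed-width little-endian bytes rather than the variable-length encoding produced by the previous bincode-based helpers. A minimal round-trip sketch under that assumption follows; `DemoSumState` is an illustrative type, not the actual Databend state, and only the `borsh` derive/trait API is used:

    use borsh::BorshDeserialize;
    use borsh::BorshSerialize;

    // Illustrative stand-in for an aggregate state with a single numeric field.
    #[derive(Debug, Default, PartialEq, BorshSerialize, BorshDeserialize)]
    struct DemoSumState {
        value: u64,
    }

    fn main() -> std::io::Result<()> {
        let state = DemoSumState { value: 99 };

        // Serialize into a Vec<u8>, the same writer shape the aggregate
        // `serialize` methods receive.
        let mut buf: Vec<u8> = Vec::new();
        state.serialize(&mut buf)?;
        // borsh writes the u64 field as 8 fixed little-endian bytes,
        // so the blob length no longer depends on the stored value.
        assert_eq!(buf.len(), 8);

        // Deserialize from an advancing &[u8] cursor, as `merge`/`deserialize` do.
        let restored = DemoSumState::deserialize(&mut buf.as_slice())?;
        assert_eq!(restored, state);
        Ok(())
    }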