Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

perf(metrics): Add metrics of state #397

Merged
merged 7 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions common/apm/src/metrics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ make_auto_flush_static_metric! {
signed_tx,
wal,
hash_height,
state,
}

pub struct StoragePutCfTimeUsageVec: LocalCounter {
Expand Down Expand Up @@ -72,6 +73,20 @@ lazy_static! {
auto_flush_from!(STORAGE_GET_CF_COUNTER_VEC, StorageGetCfTotalVec);
}

pub fn on_storage_get_state(duration: Duration, keys: i64) {
let seconds = duration_to_sec(duration);

STORAGE_GET_CF_TIME_USAGE.state.inc_by(seconds);
STORAGE_GET_CF_COUNTER.state.inc_by(keys);
}

pub fn on_storage_put_state(duration: Duration, size: i64) {
let seconds = duration_to_sec(duration);

STORAGE_PUT_CF_TIME_USAGE.state.inc_by(seconds);
STORAGE_PUT_CF_BYTES_COUNTER.state.inc_by(size);
}

pub fn on_storage_get_cf(sc: StorageCategory, duration: Duration, keys: i64) {
let seconds = duration_to_sec(duration);

Expand Down
22 changes: 21 additions & 1 deletion framework/src/binding/state/trie_db.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use derive_more::{Display, From};
use rocksdb::{Options, WriteBatch, DB};

use common_apm::metrics::storage::{on_storage_get_state, on_storage_put_state};
use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult};

pub struct RocksTrieDB {
Expand Down Expand Up @@ -32,33 +34,51 @@ impl cita_trie::DB for RocksTrieDB {
type Error = RocksTrieDBError;

fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self.db.get(key).map_err(to_store_err)?.map(|v| v.to_vec()))
let inst = Instant::now();

let res = self.db.get(key).map_err(to_store_err)?.map(|v| v.to_vec());

on_storage_get_state(inst.elapsed(), 1i64);
Ok(res)
}

fn contains(&self, key: &[u8]) -> Result<bool, Self::Error> {
Ok(self.db.get(key).map_err(to_store_err)?.is_some())
}

fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), Self::Error> {
let inst = Instant::now();
let size = key.len() + value.len();

self.db
.put(Bytes::from(key), Bytes::from(value))
.map_err(to_store_err)?;

on_storage_put_state(inst.elapsed(), size as i64);
Ok(())
}

fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
let inst = Instant::now();

if keys.len() != values.len() {
return Err(RocksTrieDBError::BatchLengthMismatch);
}

let mut total_size = 0;
let mut batch = WriteBatch::default();
for i in 0..keys.len() {
let key = &keys[i];
let value = &values[i];

total_size += key.len();
total_size += value.len();
batch.put(key, value);
}

self.db.write(batch).map_err(to_store_err)?;

on_storage_put_state(inst.elapsed(), total_size as i64);
Ok(())
}

Expand Down