Skip to content

Commit

Permalink
[storage][pruner] Enable incremantal pruning (#2004)
Browse files Browse the repository at this point in the history
  • Loading branch information
zcchahaha authored Jul 22, 2022
1 parent b5ad20f commit f587731
Show file tree
Hide file tree
Showing 14 changed files with 590 additions and 212 deletions.
24 changes: 17 additions & 7 deletions config/src/config/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ pub struct StorageConfig {
pub const NO_OP_STORAGE_PRUNER_CONFIG: StoragePrunerConfig = StoragePrunerConfig {
state_store_prune_window: None,
ledger_prune_window: None,
pruning_batch_size: 10_000,
ledger_pruning_batch_size: 10_000,
state_store_pruning_batch_size: 10_000,
};

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
Expand All @@ -84,21 +85,27 @@ pub struct StoragePrunerConfig {
/// being big in size, we might want to configure a smaller window for state store vs other
/// store.
pub ledger_prune_window: Option<u64>,
/// Batch size of the versions to be sent to the pruner - this is to avoid slowdown due to
/// issuing too many DB calls and batch prune instead.
pub pruning_batch_size: usize,
/// Batch size of the versions to be sent to the ledger pruner - this is to avoid slowdown due to
/// issuing too many DB calls and batch prune instead. For ledger pruner, this means the number
/// of versions to prune a time.
pub ledger_pruning_batch_size: usize,
/// Similar to the variable above but for state store pruner. It means the number of stale
/// nodes to prune a time.
pub state_store_pruning_batch_size: usize,
}

impl StoragePrunerConfig {
pub fn new(
state_store_prune_window: Option<u64>,
ledger_store_prune_window: Option<u64>,
pruning_batch_size: usize,
ledger_pruning_batch_size: usize,
state_store_pruning_batch_size: usize,
) -> Self {
StoragePrunerConfig {
state_store_prune_window,
ledger_prune_window: ledger_store_prune_window,
pruning_batch_size,
ledger_pruning_batch_size,
state_store_pruning_batch_size,
}
}
}
Expand All @@ -119,7 +126,10 @@ impl Default for StorageConfig {
storage_pruner_config: StoragePrunerConfig {
state_store_prune_window: Some(1_000_000),
ledger_prune_window: Some(10_000_000),
pruning_batch_size: 500,
ledger_pruning_batch_size: 500,
// A 10k transaction block (touching 60k state values, in the case of the account
// creation benchmark) on a 4B items DB (or 1.33B accounts) yields 300k JMT nodes
state_store_pruning_batch_size: 1_000,
},
data_dir: PathBuf::from("/opt/aptos/data"),
// Default read/write/connection timeout, in milliseconds
Expand Down
8 changes: 6 additions & 2 deletions execution/executor-benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ struct PrunerOpt {
ledger_prune_window: i64,

#[structopt(long, default_value = "500")]
pruning_batch_size: usize,
ledger_pruning_batch_size: usize,

#[structopt(long, default_value = "500")]
state_store_pruning_batch_size: usize,
}

impl PrunerOpt {
Expand All @@ -35,7 +38,8 @@ impl PrunerOpt {
} else {
Some(self.ledger_prune_window as u64)
},
pruning_batch_size: self.pruning_batch_size,
ledger_pruning_batch_size: self.ledger_pruning_batch_size,
state_store_pruning_batch_size: self.state_store_pruning_batch_size,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion storage/aptosdb/src/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ fn test_error_if_version_is_pruned() {
StoragePrunerConfig {
state_store_prune_window: Some(0),
ledger_prune_window: Some(0),
pruning_batch_size: 1,
ledger_pruning_batch_size: 1,
state_store_pruning_batch_size: 1,
},
);
pruner.testonly_update_min_version(&[Some(5), Some(10)]);
Expand Down
15 changes: 13 additions & 2 deletions storage/aptosdb/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,19 @@ pub static PRUNER_LEAST_READABLE_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

pub static PRUNER_BATCH_SIZE: Lazy<IntGauge> =
Lazy::new(|| register_int_gauge!("pruner_batch_size", "Aptos pruner batch size").unwrap());
/// Pruner batch size. For ledger pruner, this means the number of versions to be pruned at a time.
/// For state store pruner, this means the number of stale nodes to be pruned at a time.
pub static PRUNER_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
// metric name
"pruner_batch_size",
// metric description
"Aptos pruner batch size",
// metric labels (dimensions)
&["pruner_name",]
)
.unwrap()
});

pub static API_LATENCY_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/pruner/db_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait DBPruner {

/// Performs the actual pruning, a target version is passed, which is the target the pruner
/// tries to prune.
fn prune(&self, max_versions: u64) -> anyhow::Result<Version>;
fn prune(&self, batch_size: usize) -> anyhow::Result<Version>;

/// Initializes the least readable version stored in underlying DB storage
fn initialize_min_readable_version(&self) -> anyhow::Result<Version>;
Expand Down
6 changes: 4 additions & 2 deletions storage/aptosdb/src/pruner/event_store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ fn verify_event_store_pruner(events: Vec<Vec<ContractEvent>>) {
StoragePrunerConfig {
state_store_prune_window: Some(0),
ledger_prune_window: Some(0),
pruning_batch_size: 1,
ledger_pruning_batch_size: 1,
state_store_pruning_batch_size: 100,
},
);

Expand Down Expand Up @@ -108,7 +109,8 @@ fn verify_event_store_pruner_disabled(events: Vec<Vec<ContractEvent>>) {
StoragePrunerConfig {
state_store_prune_window: Some(0),
ledger_prune_window: None,
pruning_batch_size: 1,
ledger_pruning_batch_size: 1,
state_store_pruning_batch_size: 100,
},
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ impl DBPruner for LedgerPruner {
LEDGER_PRUNER_NAME
}

fn prune(&self, max_versions: u64) -> anyhow::Result<Version> {
fn prune(&self, max_versions: usize) -> anyhow::Result<Version> {
if !self.is_pruning_pending() {
return Ok(self.min_readable_version());
}
let mut db_batch = SchemaBatch::new();
let min_readable_version = self.min_readable_version();
// Current target version might be less than the target version to ensure we don't prune
// more than max_version in one go.
let current_target_version = self.get_currrent_batch_target(max_versions);
let current_target_version = self.get_currrent_batch_target(max_versions as Version);

self.transaction_store_pruner.prune(
&mut db_batch,
Expand Down
14 changes: 12 additions & 2 deletions storage/aptosdb/src/pruner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ impl Pruner {
.with_label_values(&["ledger_pruner"])
.set((storage_pruner_config.ledger_prune_window.unwrap_or(0)) as i64);

PRUNER_BATCH_SIZE.set(storage_pruner_config.pruning_batch_size as i64);
PRUNER_BATCH_SIZE
.with_label_values(&["ledger_pruner"])
.set(storage_pruner_config.ledger_pruning_batch_size as i64);

PRUNER_BATCH_SIZE
.with_label_values(&["state_store_pruner"])
.set(storage_pruner_config.state_store_pruning_batch_size as i64);

let worker = Worker::new(
ledger_rocksdb,
Expand All @@ -113,7 +119,7 @@ impl Pruner {
command_sender: Mutex::new(command_sender),
min_readable_versions: worker_progress_clone,
last_version_sent_to_pruners: Arc::new(Mutex::new(0)),
pruning_batch_size: storage_pruner_config.pruning_batch_size,
pruning_batch_size: storage_pruner_config.ledger_pruning_batch_size,
latest_version: Arc::new(Mutex::new(0)),
}
}
Expand All @@ -139,6 +145,10 @@ impl Pruner {
/// Sends pruning command to the worker thread when necessary.
pub fn maybe_wake_pruner(&self, latest_version: Version) {
*self.latest_version.lock() = latest_version;
// TODO(zcc): Right now we will still wake up the state store pruner once per pruning batch
// size even though state store pruner not necessarily prune one single version of nodes
// each time. We should make ledger pruner and state store separate and wake up them
// separately.
if latest_version
>= *self.last_version_sent_to_pruners.as_ref().lock() + self.pruning_batch_size as u64
{
Expand Down
Loading

0 comments on commit f587731

Please sign in to comment.