From 0efcc47ca1f3990c34370fed57c829c96c0cedb3 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 19 Aug 2024 14:00:08 +0000 Subject: [PATCH] feat: support disabled, unified, and separated runtime Signed-off-by: MrCroxx --- examples/hybrid_full.rs | 14 +- foyer-bench/src/main.rs | 69 +++++-- foyer-storage/src/engine.rs | 9 - foyer-storage/src/large/generic.rs | 37 ++-- foyer-storage/src/prelude.rs | 2 +- foyer-storage/src/small/generic.rs | 4 - foyer-storage/src/storage/either.rs | 12 -- foyer-storage/src/storage/mod.rs | 6 - foyer-storage/src/storage/noop.rs | 16 +- foyer-storage/src/store.rs | 267 ++++++++++++++++++---------- foyer/src/hybrid/builder.rs | 2 +- foyer/src/hybrid/cache.rs | 3 +- foyer/src/prelude.rs | 3 +- 13 files changed, 272 insertions(+), 172 deletions(-) diff --git a/examples/hybrid_full.rs b/examples/hybrid_full.rs index 573e56f7..14ca0ff1 100644 --- a/examples/hybrid_full.rs +++ b/examples/hybrid_full.rs @@ -18,7 +18,7 @@ use anyhow::Result; use chrono::Datelike; use foyer::{ DirectFsDeviceOptionsBuilder, FifoPicker, HybridCache, HybridCacheBuilder, LruConfig, RateLimitPicker, RecoverMode, - RuntimeConfig, TombstoneLogConfigBuilder, + RuntimeConfig, TokioRuntimeConfig, TombstoneLogConfigBuilder, }; use tempfile::tempdir; @@ -59,9 +59,15 @@ async fn main() -> Result<()> { .with_flush(true) .build(), ) - .with_runtime_config(RuntimeConfig { - worker_threads: 4, - max_blocking_threads: 8, + .with_runtime_config(RuntimeConfig::Separated { + read_runtime_config: TokioRuntimeConfig { + worker_threads: 4, + max_blocking_threads: 8, + }, + write_runtime_config: TokioRuntimeConfig { + worker_threads: 4, + max_blocking_threads: 8, + }, }) .build() .await?; diff --git a/foyer-bench/src/main.rs b/foyer-bench/src/main.rs index fb1bab9d..a64310df 100644 --- a/foyer-bench/src/main.rs +++ b/foyer-bench/src/main.rs @@ -20,7 +20,7 @@ use bytesize::MIB; use foyer::{ Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, FifoConfig, FifoPicker, HybridCache, HybridCacheBuilder, InvalidRatioPicker, LfuConfig, LruConfig, RateLimitPicker, RecoverMode, RuntimeConfig, - S3FifoConfig, TracingConfig, + S3FifoConfig, TokioRuntimeConfig, TracingConfig, }; use metrics_exporter_prometheus::PrometheusBuilder; @@ -37,7 +37,7 @@ use std::{ }; use analyze::{analyze, monitor, Metrics}; -use clap::{ArgGroup, Parser}; +use clap::{builder::PossibleValuesParser, ArgGroup, Parser}; use futures::future::join_all; use itertools::Itertools; @@ -156,16 +156,49 @@ pub struct Args { #[arg(long, default_value_t = false)] metrics: bool, + /// Benchmark user runtime worker threads. #[arg(long, default_value_t = 0)] - benchmark_runtime_worker_threads: usize, + user_runtime_worker_threads: usize, - /// dedicated runtime worker threads + /// Dedicated runtime type. + #[arg(long, value_parser = PossibleValuesParser::new(["disabled", "unified", "separated"]), default_value = "disabled")] + runtime: String, + + /// Dedicated runtime worker threads. + /// + /// Only valid when using unified dedicated runtime. #[arg(long, default_value_t = 0)] runtime_worker_threads: usize, - /// max threads for blocking io + /// Max threads for blocking io. + /// + /// Only valid when using unified dedicated runtime. + #[arg(long, default_value_t = 0)] + runtime_max_blocking_threads: usize, + + /// Dedicated runtime for writes worker threads. + /// + /// Only valid when using separated dedicated runtime. 
+ #[arg(long, default_value_t = 0)] + write_runtime_worker_threads: usize, + + /// Dedicated runtime for writes Max threads for blocking io. + /// + /// Only valid when using separated dedicated runtime. + #[arg(long, default_value_t = 0)] + write_runtime_max_blocking_threads: usize, + + /// Dedicated runtime for reads worker threads. + /// + /// Only valid when using separated dedicated runtime. + #[arg(long, default_value_t = 0)] + read_runtime_worker_threads: usize, + + /// Dedicated runtime for writes max threads for blocking io. + /// + /// Only valid when using separated dedicated runtime. #[arg(long, default_value_t = 0)] - max_blocking_threads: usize, + read_runtime_max_blocking_threads: usize, /// compression algorithm #[arg(long, value_enum, default_value_t = Compression::None)] @@ -379,8 +412,8 @@ fn main() { println!("{:#?}", args); let mut builder = tokio::runtime::Builder::new_multi_thread(); - if args.benchmark_runtime_worker_threads != 0 { - builder.worker_threads(args.benchmark_runtime_worker_threads); + if args.user_runtime_worker_threads != 0 { + builder.worker_threads(args.user_runtime_worker_threads); } builder.thread_name("foyer-bench"); let runtime = builder.enable_all().build().unwrap(); @@ -484,9 +517,23 @@ async fn benchmark(args: Args) { Box::::default(), ]) .with_compression(args.compression) - .with_runtime_config(RuntimeConfig { - worker_threads: args.runtime_worker_threads, - max_blocking_threads: args.max_blocking_threads, + .with_runtime_config(match args.runtime.as_str() { + "disabled" => RuntimeConfig::Disabled, + "unified" => RuntimeConfig::Unified(TokioRuntimeConfig { + worker_threads: args.runtime_worker_threads, + max_blocking_threads: args.runtime_max_blocking_threads, + }), + "separated" => RuntimeConfig::Separated { + read_runtime_config: TokioRuntimeConfig { + worker_threads: args.read_runtime_worker_threads, + max_blocking_threads: args.read_runtime_max_blocking_threads, + }, + write_runtime_config: TokioRuntimeConfig { + worker_threads: args.write_runtime_worker_threads, + max_blocking_threads: args.write_runtime_max_blocking_threads, + }, + }, + _ => unreachable!(), }); if args.admission_rate_limit > 0 { diff --git a/foyer-storage/src/engine.rs b/foyer-storage/src/engine.rs index edea7f38..0f5a1481 100644 --- a/foyer-storage/src/engine.rs +++ b/foyer-storage/src/engine.rs @@ -304,13 +304,4 @@ where Engine::Combined(storage) => StoreFuture::Combined(storage.wait()), } } - - fn runtime(&self) -> &tokio::runtime::Handle { - match self { - Engine::Noop(storage) => storage.runtime(), - Engine::Large(storage) => storage.runtime(), - Engine::Small(storage) => storage.runtime(), - Engine::Combined(storage) => storage.runtime(), - } - } } diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index 9ce361bc..ef620a58 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -83,6 +83,9 @@ where pub reinsertion_picker: Arc>, pub tombstone_log_config: Option, pub statistics: Arc, + pub read_runtime_handle: Handle, + pub write_runtime_handle: Handle, + pub user_runtime_handle: Handle, pub marker: PhantomData<(V, S)>, } @@ -109,6 +112,8 @@ where .field("reinsertion_pickers", &self.reinsertion_picker) .field("tombstone_log_config", &self.tombstone_log_config) .field("statistics", &self.statistics) + .field("read_runtime_handle", &self.read_runtime_handle) + .field("write_runtime_handle", &self.write_runtime_handle) .finish() } } @@ -152,7 +157,9 @@ where sequence: AtomicSequence, - runtime: 
Handle, + _read_runtime_handle: Handle, + write_runtime_handle: Handle, + _user_runtime_handle: Handle, active: AtomicBool, @@ -179,8 +186,6 @@ where S: HashBuilder + Debug, { async fn open(mut config: GenericLargeStorageConfig) -> Result { - let runtime = Handle::current(); - let stats = config.statistics.clone(); let device = config.device.clone(); @@ -223,7 +228,7 @@ where &indexer, ®ion_manager, &tombstones, - runtime.clone(), + config.user_runtime_handle.clone(), ) .await?; @@ -236,7 +241,7 @@ where tombstone_log.clone(), stats.clone(), metrics.clone(), - runtime.clone(), + config.write_runtime_handle.clone(), ) .await })) @@ -251,7 +256,7 @@ where flushers.clone(), stats.clone(), config.flush, - runtime.clone(), + config.write_runtime_handle.clone(), ) .await })) @@ -267,7 +272,9 @@ where statistics: stats, flush: config.flush, sequence, - runtime, + _read_runtime_handle: config.read_runtime_handle, + write_runtime_handle: config.write_runtime_handle, + _user_runtime_handle: config.user_runtime_handle, active: AtomicBool::new(true), metrics, }), @@ -375,7 +382,7 @@ where }); let this = self.clone(); - self.inner.runtime.spawn(async move { + self.inner.write_runtime_handle.spawn(async move { let sequence = this.inner.sequence.fetch_add(1, Ordering::Relaxed); this.inner.flushers[sequence as usize % this.inner.flushers.len()].submit(Submission::Tombstone { tombstone: Tombstone { hash, sequence }, @@ -424,10 +431,6 @@ where Ok(()) } - - fn runtime(&self) -> &Handle { - &self.inner.runtime - } } impl Storage for GenericLargeStorage @@ -476,10 +479,6 @@ where fn wait(&self) -> impl Future + Send + 'static { self.wait() } - - fn runtime(&self) -> &Handle { - self.runtime() - } } #[cfg(test)] @@ -554,6 +553,9 @@ mod tests { tombstone_log_config: None, buffer_threshold: 16 * 1024 * 1024, statistics: Arc::::default(), + read_runtime_handle: Handle::current(), + write_runtime_handle: Handle::current(), + user_runtime_handle: Handle::current(), marker: PhantomData, }; GenericLargeStorage::open(config).await.unwrap() @@ -582,6 +584,9 @@ mod tests { tombstone_log_config: Some(TombstoneLogConfigBuilder::new(path).with_flush(true).build()), buffer_threshold: 16 * 1024 * 1024, statistics: Arc::::default(), + read_runtime_handle: Handle::current(), + write_runtime_handle: Handle::current(), + user_runtime_handle: Handle::current(), marker: PhantomData, }; GenericLargeStorage::open(config).await.unwrap() diff --git a/foyer-storage/src/prelude.rs b/foyer-storage/src/prelude.rs index 8bed91a7..2744dee1 100644 --- a/foyer-storage/src/prelude.rs +++ b/foyer-storage/src/prelude.rs @@ -32,5 +32,5 @@ pub use crate::{ }, statistics::Statistics, storage::{either::Order, Storage}, - store::{CombinedConfig, DeviceConfig, RuntimeConfig, Store, StoreBuilder}, + store::{CombinedConfig, DeviceConfig, RuntimeConfig, RuntimeHandles, Store, StoreBuilder, TokioRuntimeConfig}, }; diff --git a/foyer-storage/src/small/generic.rs b/foyer-storage/src/small/generic.rs index 47c5814b..c3251ce2 100644 --- a/foyer-storage/src/small/generic.rs +++ b/foyer-storage/src/small/generic.rs @@ -121,8 +121,4 @@ where fn wait(&self) -> impl Future + Send + 'static { async { todo!() } } - - fn runtime(&self) -> &tokio::runtime::Handle { - todo!() - } } diff --git a/foyer-storage/src/storage/either.rs b/foyer-storage/src/storage/either.rs index 44853050..7ffa1440 100644 --- a/foyer-storage/src/storage/either.rs +++ b/foyer-storage/src/storage/either.rs @@ -18,7 +18,6 @@ use futures::{ future::{join, ready, select, try_join, Either as 
EitherFuture}, pin_mut, Future, FutureExt, }; -use tokio::runtime::Handle; use crate::{error::Result, serde::KvInfo, storage::Storage, DeviceStats, IoBytes}; @@ -290,15 +289,4 @@ where fn wait(&self) -> impl Future + Send + 'static { join(self.left.wait(), self.right.wait()).map(|_| ()) } - - fn runtime(&self) -> &Handle { - if cfg!(debug_assertions) { - let hleft = self.left.runtime(); - let hright = self.right.runtime(); - assert_eq!(hleft.id(), hright.id()); - hleft - } else { - self.left.runtime() - } - } } diff --git a/foyer-storage/src/storage/mod.rs b/foyer-storage/src/storage/mod.rs index 9f341a13..bbaba9a4 100644 --- a/foyer-storage/src/storage/mod.rs +++ b/foyer-storage/src/storage/mod.rs @@ -19,7 +19,6 @@ use std::{fmt::Debug, future::Future, sync::Arc}; use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; use foyer_memory::CacheEntry; -use tokio::runtime::Handle; use crate::{device::monitor::DeviceStats, error::Result, serde::KvInfo, IoBytes}; @@ -74,9 +73,4 @@ pub trait Storage: Send + Sync + 'static + Clone + Debug { /// Wait for the ongoing flush and reclaim tasks to finish. #[must_use] fn wait(&self) -> impl Future + Send + 'static; - - /// Get disk cache runtime handle. - /// - /// The runtime is determined during the opening phase. - fn runtime(&self) -> &Handle; } diff --git a/foyer-storage/src/storage/noop.rs b/foyer-storage/src/storage/noop.rs index ed8ddab0..9d44d665 100644 --- a/foyer-storage/src/storage/noop.rs +++ b/foyer-storage/src/storage/noop.rs @@ -19,7 +19,6 @@ use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; use foyer_memory::CacheEntry; use futures::future::ready; -use tokio::runtime::Handle; use crate::device::monitor::DeviceStats; use crate::serde::KvInfo; @@ -34,7 +33,6 @@ where V: StorageValue, S: HashBuilder + Debug, { - runtime: Handle, _marker: PhantomData<(K, V, S)>, } @@ -56,10 +54,7 @@ where S: HashBuilder + Debug, { fn clone(&self) -> Self { - Self { - runtime: self.runtime.clone(), - _marker: PhantomData, - } + Self { _marker: PhantomData } } } @@ -75,10 +70,7 @@ where type Config = (); async fn open(_: Self::Config) -> Result { - Ok(Self { - runtime: Handle::current(), - _marker: PhantomData, - }) + Ok(Self { _marker: PhantomData }) } async fn close(&self) -> Result<()> { @@ -108,10 +100,6 @@ where fn wait(&self) -> impl Future + Send + 'static { ready(()) } - - fn runtime(&self) -> &Handle { - &self.runtime - } } #[cfg(test)] diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index 4d8dab5d..7eb714d0 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -20,7 +20,7 @@ use crate::{ DeviceOptions, RegionId, }, engine::{Engine, EngineConfig, SizeSelector}, - error::Result, + error::{Error, Result}, large::{generic::GenericLargeStorageConfig, recover::RecoverMode, tombstone::TombstoneLogConfig}, picker::{ utils::{AdmitAllPicker, FifoPicker, InvalidRatioPicker, RejectAllPicker}, @@ -60,7 +60,12 @@ where compression: Compression, - runtime: Arc, + read_runtime: Option>, + write_runtime: Option>, + + read_runtime_handle: Handle, + write_runtime_handle: Handle, + user_runtime_handle: Handle, statistics: Arc, metrics: Arc, @@ -78,7 +83,11 @@ where .field("engine", &self.engine) .field("admission_picker", &self.admission_picker) .field("compression", &self.compression) - .field("runtime", &self.runtime) + .field("read_runtime", &self.read_runtime) + .field("write_runtime", &self.write_runtime) + .field("read_runtime_handle", &self.read_runtime_handle) + .field("write_runtime_handle", 
&self.write_runtime_handle) + .field("user_runtime_handle", &self.user_runtime_handle) .finish() } } @@ -95,7 +104,11 @@ where engine: self.engine.clone(), admission_picker: self.admission_picker.clone(), compression: self.compression, - runtime: self.runtime.clone(), + read_runtime: self.read_runtime.clone(), + write_runtime: self.write_runtime.clone(), + read_runtime_handle: self.read_runtime_handle.clone(), + write_runtime_handle: self.write_runtime_handle.clone(), + user_runtime_handle: self.user_runtime_handle.clone(), statistics: self.statistics.clone(), metrics: self.metrics.clone(), } @@ -127,7 +140,7 @@ where let compression = self.compression; let this = self.clone(); - self.runtime().spawn(async move { + self.write_runtime_handle.spawn(async move { if force || this.pick(entry.key()) { let mut buffer = IoBytesMut::new(); match EntrySerializer::serialize(entry.key(), entry.value(), &compression, &mut buffer) { @@ -154,7 +167,7 @@ where { let hash = self.memory.hash(key); let future = self.engine.load(hash); - match self.runtime.spawn(future).await.unwrap() { + match self.read_runtime_handle.spawn(future).await.unwrap() { Ok(Some((k, v))) if k.borrow() == key => Ok(Some((k, v))), Ok(_) => Ok(None), Err(e) => Err(e), @@ -193,11 +206,13 @@ where self.engine.stats() } - /// Get disk cache runtime handle. - /// - /// The runtime is determined during the opening phase. - pub fn runtime(&self) -> &Handle { - self.engine.runtime() + /// Get the runtime handles. + pub fn runtimes(&self) -> RuntimeHandles<'_> { + RuntimeHandles { + read_runtime_handle: &self.read_runtime_handle, + write_runtime_handle: &self.write_runtime_handle, + user_runtime_handle: &self.user_runtime_handle, + } } } @@ -272,8 +287,8 @@ impl CombinedConfig { } } -/// Configuration for the dedicated runtime. -pub struct RuntimeConfig { +/// Tokio runtime configuration. +pub struct TokioRuntimeConfig { /// Dedicated runtime worker threads. /// /// If the value is set to `0`, the dedicated will use the default worker threads of tokio. @@ -290,6 +305,31 @@ pub struct RuntimeConfig { pub max_blocking_threads: usize, } +/// Configuration for the dedicated runtime. +pub enum RuntimeConfig { + /// Disable dedicated runtime. The runtime which foyer is built on will be used. + Disabled, + /// Use unified dedicated runtime for both reads and writes. + Unified(TokioRuntimeConfig), + /// Use separated dedicated runtime for reads or writes. + Separated { + /// Dedicated runtime for reads. + read_runtime_config: TokioRuntimeConfig, + /// Dedicated runtime for both foreground and background writes + write_runtime_config: TokioRuntimeConfig, + }, +} + +/// Runtime handles. +pub struct RuntimeHandles<'a> { + /// Runtime handle for reads. + pub read_runtime_handle: &'a Handle, + /// Runtime handle for writes. + pub write_runtime_handle: &'a Handle, + /// User runtime handle. + pub user_runtime_handle: &'a Handle, +} + /// The builder of the disk cache. 
pub struct StoreBuilder where @@ -345,10 +385,7 @@ where compression: Compression::None, tombstone_log_config: None, combined_config: CombinedConfig::default(), - runtime_config: RuntimeConfig { - worker_threads: 0, - max_blocking_threads: 0, - }, + runtime_config: RuntimeConfig::Disabled, } } @@ -522,87 +559,83 @@ where let statistics = Arc::::default(); let compression = self.compression; - let runtime = { + let build_runtime = |config: &TokioRuntimeConfig, suffix: &str| { let mut builder = tokio::runtime::Builder::new_multi_thread(); - if self.runtime_config.worker_threads != 0 { - builder.worker_threads(self.runtime_config.worker_threads); + if config.worker_threads != 0 { + builder.worker_threads(config.worker_threads); } - if self.runtime_config.max_blocking_threads != 0 { - builder.max_blocking_threads(self.runtime_config.max_blocking_threads); + if config.max_blocking_threads != 0 { + builder.max_blocking_threads(config.max_blocking_threads); } - builder.thread_name(&self.name); + builder.thread_name(format!("{}-{}", &self.name, suffix)); let runtime = builder.enable_all().build().map_err(anyhow::Error::from)?; let runtime = BackgroundShutdownRuntime::from(runtime); - Arc::new(runtime) + Ok::<_, Error>(Arc::new(runtime)) + }; + + let (read_runtime, write_runtime, read_runtime_handle, write_runtime_handle) = match self.runtime_config { + RuntimeConfig::Disabled => { + tracing::warn!("[store]: Dedicated runtime is disabled"); + (None, None, Handle::current(), Handle::current()) + } + RuntimeConfig::Unified(runtime_config) => { + let runtime = build_runtime(&runtime_config, "unified")?; + ( + Some(runtime.clone()), + Some(runtime.clone()), + runtime.handle().clone(), + runtime.handle().clone(), + ) + } + RuntimeConfig::Separated { + read_runtime_config, + write_runtime_config, + } => { + let read_runtime = build_runtime(&read_runtime_config, "read")?; + let write_runtime = build_runtime(&write_runtime_config, "write")?; + let read_runtime_handle = read_runtime.handle().clone(); + let write_runtime_handle = write_runtime.handle().clone(); + ( + Some(read_runtime), + Some(write_runtime), + read_runtime_handle, + write_runtime_handle, + ) + } }; + let user_runtime_handle = Handle::current(); let engine = { let statistics = statistics.clone(); let metrics = metrics.clone(); - runtime.spawn(async move { - match self.device_config { - DeviceConfig::None => { - tracing::warn!( - "[store builder]: No device config set. Use `NoneStore` which always returns `None` for queries." 
- ); - Engine::open(EngineConfig::Noop).await - } - DeviceConfig::DeviceOptions(options) => { - let device = match Monitored::open(MonitoredOptions { - options, - metrics: metrics.clone(), - }) - .await { - Ok(device) => device, - Err(e) =>return Err(e), - }; - match self.combined_config { - CombinedConfig::Large => { - let regions = 0..device.regions() as RegionId; - Engine::open(EngineConfig::Large(GenericLargeStorageConfig { - name: self.name, - device, - regions, - compression: self.compression, - flush: self.flush, - indexer_shards: self.indexer_shards, - recover_mode: self.recover_mode, - recover_concurrency: self.recover_concurrency, - flushers: self.flushers, - reclaimers: self.reclaimers, - clean_region_threshold, - eviction_pickers: self.eviction_pickers, - reinsertion_picker: self.reinsertion_picker, - tombstone_log_config: self.tombstone_log_config, - buffer_threshold: self.buffer_threshold, - statistics: statistics.clone(), - marker: PhantomData, - })) - .await - } - CombinedConfig::Small => { - Engine::open(EngineConfig::Small(GenericSmallStorageConfig { - placeholder: PhantomData, - })) - .await - } - CombinedConfig::Combined { - large_object_cache_ratio, - large_object_threshold, - load_order, - } => { - let large_region_count = (device.regions() as f64 * large_object_cache_ratio) as usize; - let large_regions = - (device.regions() - large_region_count) as RegionId..device.regions() as RegionId; - Engine::open(EngineConfig::Combined(EitherConfig { - selector: SizeSelector::new(large_object_threshold), - left: GenericSmallStorageConfig { - placeholder: PhantomData, - }, - right: GenericLargeStorageConfig { + let write_runtime_handle = write_runtime_handle.clone(); + let read_runtime_handle = read_runtime_handle.clone(); + let user_runtime_handle = user_runtime_handle.clone(); + // Use the user runtiem to open engine. + tokio::spawn(async move { + match self.device_config { + DeviceConfig::None => { + tracing::warn!( + "[store builder]: No device config set. Use `NoneStore` which always returns `None` for queries." 
+ ); + Engine::open(EngineConfig::Noop).await + } + DeviceConfig::DeviceOptions(options) => { + let device = match Monitored::open(MonitoredOptions { + options, + metrics: metrics.clone(), + }) + .await { + Ok(device) => device, + Err(e) =>return Err(e), + }; + match self.combined_config { + CombinedConfig::Large => { + let regions = 0..device.regions() as RegionId; + Engine::open(EngineConfig::Large(GenericLargeStorageConfig { name: self.name, device, - regions: large_regions, + regions, compression: self.compression, flush: self.flush, indexer_shards: self.indexer_shards, @@ -616,16 +649,62 @@ where tombstone_log_config: self.tombstone_log_config, buffer_threshold: self.buffer_threshold, statistics: statistics.clone(), + write_runtime_handle, + read_runtime_handle, + user_runtime_handle, marker: PhantomData, - }, + })) + .await + } + CombinedConfig::Small => { + Engine::open(EngineConfig::Small(GenericSmallStorageConfig { + placeholder: PhantomData, + })) + .await + } + CombinedConfig::Combined { + large_object_cache_ratio, + large_object_threshold, load_order, - })) - .await + } => { + let large_region_count = (device.regions() as f64 * large_object_cache_ratio) as usize; + let large_regions = + (device.regions() - large_region_count) as RegionId..device.regions() as RegionId; + Engine::open(EngineConfig::Combined(EitherConfig { + selector: SizeSelector::new(large_object_threshold), + left: GenericSmallStorageConfig { + placeholder: PhantomData, + }, + right: GenericLargeStorageConfig { + name: self.name, + device, + regions: large_regions, + compression: self.compression, + flush: self.flush, + indexer_shards: self.indexer_shards, + recover_mode: self.recover_mode, + recover_concurrency: self.recover_concurrency, + flushers: self.flushers, + reclaimers: self.reclaimers, + clean_region_threshold, + eviction_pickers: self.eviction_pickers, + reinsertion_picker: self.reinsertion_picker, + tombstone_log_config: self.tombstone_log_config, + buffer_threshold: self.buffer_threshold, + statistics: statistics.clone(), + write_runtime_handle, + read_runtime_handle, + user_runtime_handle, + marker: PhantomData, + }, + load_order, + })) + .await + } } } } - } - }).await.unwrap()? + }).await.unwrap()? }; Ok(Store { @@ -633,7 +712,11 @@ where engine, admission_picker, compression, - runtime, + read_runtime, + write_runtime, + read_runtime_handle, + write_runtime_handle, + user_runtime_handle, statistics, metrics, }) diff --git a/foyer/src/hybrid/builder.rs b/foyer/src/hybrid/builder.rs index d08bdebe..53c2dceb 100644 --- a/foyer/src/hybrid/builder.rs +++ b/foyer/src/hybrid/builder.rs @@ -412,7 +412,7 @@ where } } - /// Enable the dedicated runtime for the disk cache store. + /// Configure the dedicated runtime for the disk cache store. 
pub fn with_runtime_config(self, runtime_config: RuntimeConfig) -> Self { let builder = self.builder.with_runtime_config(runtime_config); Self { diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index df3a0fb5..ffc497f7 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -511,7 +511,8 @@ where .await } }, - self.storage().runtime(), + // TODO(MrCroxx): check regression + self.storage().runtimes().user_runtime_handle, ); if inner.state() == FetchState::Hit { diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index 3c1e3933..d0f20768 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -30,7 +30,8 @@ pub use storage::{ AdmissionPicker, AdmitAllPicker, Compression, Dev, DevExt, DevOptions, DeviceStats, DirectFileDevice, DirectFileDeviceOptions, DirectFileDeviceOptionsBuilder, DirectFsDevice, DirectFsDeviceOptions, DirectFsDeviceOptionsBuilder, EvictionPicker, FifoPicker, InvalidRatioPicker, RateLimitPicker, RecoverMode, - ReinsertionPicker, RejectAllPicker, RuntimeConfig, Storage, Store, StoreBuilder, TombstoneLogConfigBuilder, + ReinsertionPicker, RejectAllPicker, RuntimeConfig, RuntimeHandles, Storage, Store, StoreBuilder, + TokioRuntimeConfig, TombstoneLogConfigBuilder, }; pub use crate::hybrid::{
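
A minimal usage sketch for the configuration surface introduced above. It relies only on items this patch re-exports from the foyer prelude (`RuntimeConfig`, `TokioRuntimeConfig`); the helper name, the mode strings, and the concrete thread counts are illustrative and mirror the `--runtime` mapping added to foyer-bench. Worker/blocking thread counts of `0` fall back to tokio's defaults, as the doc comments above state.

use foyer::{RuntimeConfig, TokioRuntimeConfig};

// Choose a dedicated-runtime setup by name, in the same way foyer-bench now does.
fn runtime_config(mode: &str) -> RuntimeConfig {
    match mode {
        // No dedicated runtime; the runtime foyer is built on is reused.
        "disabled" => RuntimeConfig::Disabled,
        // One dedicated runtime shared by reads and writes.
        "unified" => RuntimeConfig::Unified(TokioRuntimeConfig {
            worker_threads: 4,
            max_blocking_threads: 8,
        }),
        // Separate dedicated runtimes for reads and writes.
        "separated" => RuntimeConfig::Separated {
            read_runtime_config: TokioRuntimeConfig {
                worker_threads: 4,
                max_blocking_threads: 8,
            },
            write_runtime_config: TokioRuntimeConfig {
                worker_threads: 4,
                max_blocking_threads: 8,
            },
        },
        _ => unreachable!("unknown runtime mode"),
    }
}

The returned value is passed to the builder via `.with_runtime_config(...)`, as shown in examples/hybrid_full.rs, and the handles the store ends up with can be inspected through `Store::runtimes()`, which returns the new `RuntimeHandles` struct.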