refactor: revert the pre-serialization and parallel buffer #717

Merged · 8 commits · Sep 24, 2024
8 changes: 4 additions & 4 deletions examples/Cargo.toml
@@ -19,10 +19,10 @@ fastrace = { workspace = true }
 fastrace-jaeger = { workspace = true, optional = true }
 fastrace-opentelemetry = { workspace = true, optional = true }
 foyer = { version = "*", path = "../foyer" }
-opentelemetry = { version = "0.24", optional = true }
-opentelemetry-otlp = { version = "0.17", optional = true }
-opentelemetry-semantic-conventions = { version = "0.16", optional = true }
-opentelemetry_sdk = { version = "0.24", features = [
+opentelemetry = { version = "0.25", optional = true }
+opentelemetry-otlp = { version = "0.25", optional = true }
+opentelemetry-semantic-conventions = { version = "0.25", optional = true }
+opentelemetry_sdk = { version = "0.25", features = [
     "rt-tokio",
     "trace",
 ], optional = true }
2 changes: 1 addition & 1 deletion foyer-bench/Cargo.toml
@@ -29,7 +29,7 @@ serde = { workspace = true }
 serde_bytes = "0.11.15"
 tokio = { workspace = true }
 tracing = "0.1"
-tracing-opentelemetry = { version = "0.25", optional = true }
+tracing-opentelemetry = { version = "0.26", optional = true }
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 zipf = "7"

1 change: 1 addition & 0 deletions foyer-storage/Cargo.toml
@@ -24,6 +24,7 @@ bytes = "1"
 clap = { workspace = true }
 either = "1"
 fastrace = { workspace = true }
+flume = "0.11"
 foyer-common = { version = "0.9.3", path = "../foyer-common" }
 foyer-memory = { version = "0.7.3", path = "../foyer-memory" }
 fs4 = "0.9.1"
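For context, `flume` is an MPMC channel crate; adding it here suggests the flusher goes back to a channel-based submission queue, though that wiring isn't visible in this diff, so treat the guess accordingly. A minimal sketch of flume's basic send/receive pattern, with a hypothetical `u64` sequence payload standing in for real batch items:

```rust
fn main() {
    // Unbounded MPMC channel; any number of producers may clone the sender.
    let (tx, rx) = flume::unbounded::<u64>();

    let producer = std::thread::spawn(move || {
        for sequence in 0u64..4 {
            tx.send(sequence).unwrap();
        }
        // `tx` drops here, closing the channel and ending the loop below.
    });

    // A single consumer drains submissions in FIFO order.
    while let Ok(sequence) = rx.recv() {
        println!("flush entry with sequence {sequence}");
    }
    producer.join().unwrap();
}
```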
21 changes: 12 additions & 9 deletions foyer-storage/src/engine.rs
@@ -28,13 +28,12 @@
 use crate::{
     error::Result,
     large::generic::{GenericLargeStorage, GenericLargeStorageConfig},
-    serde::KvInfo,
     small::generic::{GenericSmallStorage, GenericSmallStorageConfig},
     storage::{
         either::{Either, EitherConfig, Selection, Selector},
         noop::Noop,
     },
-    DeviceStats, IoBytes, Storage,
+    DeviceStats, Storage,
 };
 
 pub struct SizeSelector<K, V, S>
@@ -84,8 +83,12 @@
     type Value = V;
     type BuildHasher = S;
 
-    fn select(&self, _entry: &CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, buffer: &IoBytes) -> Selection {
-        if buffer.len() < self.threshold {
+    fn select(
+        &self,
+        _entry: &CacheEntry<Self::Key, Self::Value, Self::BuildHasher>,
+        estimated_size: usize,
+    ) -> Selection {
+        if estimated_size < self.threshold {
             Selection::Left
         } else {
             Selection::Right
@@ -239,12 +242,12 @@
         }
     }
 
-    fn enqueue(&self, entry: CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, buffer: IoBytes, info: KvInfo) {
+    fn enqueue(&self, entry: CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, estimated_size: usize) {
         match self {
-            Engine::Noop(storage) => storage.enqueue(entry, buffer, info),
-            Engine::Large(storage) => storage.enqueue(entry, buffer, info),
-            Engine::Small(storage) => storage.enqueue(entry, buffer, info),
-            Engine::Combined(storage) => storage.enqueue(entry, buffer, info),
+            Engine::Noop(storage) => storage.enqueue(entry, estimated_size),
+            Engine::Large(storage) => storage.enqueue(entry, estimated_size),
+            Engine::Small(storage) => storage.enqueue(entry, estimated_size),
+            Engine::Combined(storage) => storage.enqueue(entry, estimated_size),
         }
     }

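The `engine.rs` change above narrows the `Selector` contract: routing between the small- and large-object engines now happens before serialization, against an estimated entry size rather than the length of an already-serialized `IoBytes` buffer. A standalone sketch of that threshold routing, with the trait and generics stripped down (the real `select` also receives the cache entry):

```rust
enum Selection {
    Left,  // route to the small-object engine
    Right, // route to the large-object engine
}

struct SizeSelector {
    threshold: usize,
}

impl SizeSelector {
    // After this PR, routing keys off an estimated (pre-serialization)
    // size instead of the length of a serialized buffer.
    fn select(&self, estimated_size: usize) -> Selection {
        if estimated_size < self.threshold {
            Selection::Left
        } else {
            Selection::Right
        }
    }
}

fn main() {
    let selector = SizeSelector { threshold: 4096 };
    assert!(matches!(selector.select(128), Selection::Left));
    assert!(matches!(selector.select(1 << 20), Selection::Right));
}
```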
154 changes: 77 additions & 77 deletions foyer-storage/src/large/batch.rs
@@ -12,19 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{
-    fmt::Debug,
-    mem::ManuallyDrop,
-    ops::{Deref, DerefMut, Range},
-    time::Instant,
-};
+use std::{fmt::Debug, ops::Range, sync::Arc, time::Instant};
 
 use foyer_common::{
     bits,
     code::{HashBuilder, StorageKey, StorageValue},
+    metrics::Metrics,
     range::RangeBoundsExt,
     strict_assert_eq,
-    wait_group::{WaitGroup, WaitGroupFuture, WaitGroupGuard},
 };
 use foyer_memory::CacheEntry;
 use itertools::Itertools;
@@ -39,38 +34,12 @@
 use crate::{
     device::{bytes::IoBytes, MonitoredDevice, RegionId},
     io_buffer_pool::IoBufferPool,
-    large::indexer::HashedEntryAddress,
+    large::{indexer::HashedEntryAddress, serde::EntryHeader},
     region::{GetCleanRegionHandle, RegionManager},
-    Dev, DevExt, IoBuffer,
+    serde::{Checksummer, EntrySerializer},
+    Compression, Dev, DevExt, IoBuffer,
 };
 
-pub struct Allocation {
-    _guard: WaitGroupGuard,
-    slice: ManuallyDrop<Box<[u8]>>,
-}
-
-impl Deref for Allocation {
-    type Target = [u8];
-
-    fn deref(&self) -> &Self::Target {
-        self.slice.as_ref()
-    }
-}
-
-impl DerefMut for Allocation {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        self.slice.as_mut()
-    }
-}
-
-impl Allocation {
-    unsafe fn new(buffer: &mut [u8], guard: WaitGroupGuard) -> Self {
-        let fake = Vec::from_raw_parts(buffer.as_mut_ptr(), buffer.len(), buffer.len());
-        let slice = ManuallyDrop::new(fake.into_boxed_slice());
-        Self { _guard: guard, slice }
-    }
-}
-
 pub struct BatchMut<K, V, S>
 where
     K: StorageKey,
@@ -83,14 +52,14 @@
     tombstones: Vec<TombstoneInfo>,
     waiters: Vec<oneshot::Sender<()>>,
     init: Option<Instant>,
-    wait: WaitGroup,
 
     /// Cache write buffer between rotation to reduce page fault.
     buffer_pool: IoBufferPool,
 
     region_manager: RegionManager,
     device: MonitoredDevice,
     indexer: Indexer,
+    metrics: Arc<Metrics>,
 }
 
 impl<K, V, S> Debug for BatchMut<K, V, S>
@@ -116,7 +85,13 @@
     V: StorageValue,
     S: HashBuilder + Debug,
 {
-    pub fn new(capacity: usize, region_manager: RegionManager, device: MonitoredDevice, indexer: Indexer) -> Self {
+    pub fn new(
+        capacity: usize,
+        region_manager: RegionManager,
+        device: MonitoredDevice,
+        indexer: Indexer,
+        metrics: Arc<Metrics>,
+    ) -> Self {
         let capacity = bits::align_up(device.align(), capacity);
         let mut batch = Self {
             buffer: IoBuffer::new(capacity),
@@ -125,62 +100,100 @@
             tombstones: vec![],
             waiters: vec![],
             init: None,
-            wait: WaitGroup::default(),
             buffer_pool: IoBufferPool::new(capacity, 1),
             region_manager,
             device,
             indexer,
+            metrics,
         };
         batch.append_group();
         batch
     }
 
-    pub fn entry(&mut self, size: usize, entry: CacheEntry<K, V, S>, sequence: Sequence) -> Option<Allocation> {
+    pub fn entry(&mut self, entry: CacheEntry<K, V, S>, compression: &Compression, sequence: Sequence) -> bool {
         tracing::trace!("[batch]: append entry with sequence: {sequence}");
 
-        let aligned = bits::align_up(self.device.align(), size);
         self.may_init();
 
-        if entry.is_outdated() || self.len + aligned > self.buffer.len() {
-            return None;
+        if entry.is_outdated() {
+            return false;
         }
 
-        let allocation = self.allocate(aligned);
+        let pos = self.len;
+
+        let info = match EntrySerializer::serialize(
+            entry.key(),
+            entry.value(),
+            compression,
+            &mut self.buffer[pos + EntryHeader::serialized_len()..],
+            &self.metrics,
+        ) {
+            Ok(info) => info,
+            Err(e) => {
+                tracing::warn!("[batch]: serialize entry error: {e}");
+                return false;
+            }
+        };
+
+        let header = EntryHeader {
+            key_len: info.key_len as _,
+            value_len: info.value_len as _,
+            hash: entry.hash(),
+            sequence,
+            checksum: Checksummer::checksum(
+                &self.buffer[pos + EntryHeader::serialized_len()
+                    ..pos + EntryHeader::serialized_len() + info.key_len + info.value_len],
+            ),
+            compression: *compression,
+        };
+        header.write(&mut self.buffer[pos..pos + EntryHeader::serialized_len()]);
+
+        let aligned = bits::align_up(self.device.align(), header.entry_len());
+        self.advance(aligned);
 
         let group = self.groups.last_mut().unwrap();
         group.indices.push(HashedEntryAddress {
             hash: entry.hash(),
             address: EntryAddress {
                 region: RegionId::MAX,
                 offset: group.region.offset as u32 + group.region.len as u32,
-                len: size as _,
+                len: header.entry_len() as _,
                 sequence,
             },
         });
         group.entries.push(entry);
         group.region.len += aligned;
         group.range.end += aligned;
 
-        Some(allocation)
+        true
     }

     pub fn tombstone(&mut self, tombstone: Tombstone, stats: Option<InvalidStats>) {
         tracing::trace!("[batch]: append tombstone");
 
         self.may_init();
 
         self.tombstones.push(TombstoneInfo { tombstone, stats });
     }
 
-    pub fn reinsertion(&mut self, reinsertion: &Reinsertion) -> Option<Allocation> {
+    pub fn reinsertion(&mut self, reinsertion: &Reinsertion) -> bool {
         tracing::trace!("[batch]: submit reinsertion");
 
         self.may_init();
 
         let aligned = bits::align_up(self.device.align(), reinsertion.buffer.len());
 
         // Skip if the entry is no longer in the indexer.
         // Skip if the batch buffer size exceeds the threshold.
         if self.indexer.get(reinsertion.hash).is_none() || self.len + aligned > self.buffer.len() {
-            return None;
+            return false;
         }
 
-        let allocation = self.allocate(aligned);
+        let pos = self.len;
+
+        self.buffer[pos..pos + reinsertion.buffer.len()].copy_from_slice(&reinsertion.buffer);
+
+        self.advance(aligned);
 
         let group = self.groups.last_mut().unwrap();
         // Reserve buffer space for entry.
@@ -196,22 +209,17 @@
         group.region.len += aligned;
         group.range.end += aligned;
 
-        Some(allocation)
+        true
     }
 
     /// Register a waiter to be notified after the batch is finished.
-    pub fn wait(&mut self) -> oneshot::Receiver<()> {
+    pub fn wait(&mut self, tx: oneshot::Sender<()>) {
         tracing::trace!("[batch]: register waiter");
         self.may_init();
-        let (tx, rx) = oneshot::channel();
         self.waiters.push(tx);
-        rx
     }
 
-    // Note: Make sure `rotate` is called after all buffer from the last batch are dropped.
-    //
-    // Otherwise, the page fault caused by the buffer pool will hurt the performance.
-    pub fn rotate(&mut self) -> Option<(Batch<K, V, S>, WaitGroupFuture)> {
+    pub fn rotate(&mut self) -> Option<Batch<K, V, S>> {
         if self.is_empty() {
             return None;
         }
@@ -222,8 +230,6 @@
         let buffer = IoBytes::from(buffer);
         self.buffer_pool.release(buffer.clone());
 
-        let wait = std::mem::take(&mut self.wait);
-
         let init = self.init.take();
 
         let tombstones = std::mem::take(&mut self.tombstones);
@@ -269,20 +275,16 @@
             None => self.append_group(),
         }
 
-        Some((
-            Batch {
-                groups,
-                tombstones,
-                waiters,
-                init,
-            },
-            wait.wait(),
-        ))
+        Some(Batch {
+            groups,
+            tombstones,
+            waiters,
+            init,
+        })
     }
 
-    fn allocate(&mut self, len: usize) -> Allocation {
+    fn advance(&mut self, len: usize) {
         assert!(bits::is_aligned(self.device.align(), len));
-        self.may_init();
         assert!(bits::is_aligned(self.device.align(), self.len));
 
         // Rotate group if the current one is full.
@@ -292,24 +294,22 @@
             self.append_group();
         }
 
-        // Reserve buffer space for entry.
-        let start = self.len;
-        let end = start + len;
-        self.len = end;
-
-        unsafe { Allocation::new(&mut self.buffer[start..end], self.wait.acquire()) }
+        self.len += len;
     }
 
-    fn is_empty(&self) -> bool {
+    #[inline]
+    pub fn is_empty(&self) -> bool {
         self.tombstones.is_empty() && self.groups.iter().all(|group| group.range.is_empty()) && self.waiters.is_empty()
     }
 
+    #[inline]
     fn may_init(&mut self) {
         if self.init.is_none() {
             self.init = Some(Instant::now());
         }
     }
 
+    #[inline]
     fn append_group(&mut self) {
         self.groups.push(GroupMut {
             region: RegionHandle {
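In the rewritten `entry()` above, the batch serializes the key/value directly into its buffer after a reserved `EntryHeader` slot, writes the header (lengths, hash, sequence, checksum, compression) back over that slot, and rounds the whole record up to the device alignment. A sketch of just the size arithmetic; `HEADER_LEN` is a stand-in for `EntryHeader::serialized_len()`, whose real value isn't shown in this diff:

```rust
// Illustrative constant; the real value is EntryHeader::serialized_len().
const HEADER_LEN: usize = 64;

// Round `v` up to a power-of-two alignment, like foyer's bits::align_up.
fn align_up(align: usize, v: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    (v + align - 1) & !(align - 1)
}

// An entry occupies header + serialized key + serialized value, padded to
// the device block alignment.
fn entry_len(key_len: usize, value_len: usize, align: usize) -> usize {
    align_up(align, HEADER_LEN + key_len + value_len)
}

fn main() {
    // With 4 KiB alignment, a 100 B key and a 1000 B value still occupy 4 KiB.
    assert_eq!(entry_len(100, 1000, 4096), 4096);
}
```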
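The `wait()` change is a small API inversion: instead of creating a oneshot channel internally and returning the receiver, the batch now accepts a ready-made sender and pushes it onto `waiters`. A sketch of the handshake, assuming a tokio runtime and `tokio::sync::oneshot` (the diff doesn't show which oneshot implementation foyer uses):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // The caller creates the channel and hands the sender to the batch.
    let (tx, rx) = oneshot::channel::<()>();

    // Stand-in for the flusher: it would keep `tx` in `waiters` and fire it
    // once the batch has been persisted.
    let flusher = tokio::spawn(async move {
        let _ = tx.send(());
    });

    // `wait()` callers then simply await the receiver.
    rx.await.expect("flusher dropped the sender");
    flusher.await.unwrap();
}
```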
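Taken together, `advance()` and the simplified `rotate()` replace the unsafe `Allocation` handle and its `WaitGroup`: writers now serialize under `&mut self` and bump an aligned cursor, so by the time `rotate()` runs there can be no outstanding references into the buffer. A self-contained sketch of that cursor-bump-plus-swap pattern, with simplified types that only echo the shapes in this diff:

```rust
struct Batch {
    buffer: Vec<u8>,
    len: usize,
    align: usize,
}

impl Batch {
    fn new(capacity: usize, align: usize) -> Self {
        Self { buffer: vec![0; capacity], len: 0, align }
    }

    fn align_up(&self, v: usize) -> usize {
        (v + self.align - 1) & !(self.align - 1)
    }

    // Serialize directly into the buffer, then bump the aligned cursor;
    // this is the essence of `advance`.
    fn write(&mut self, payload: &[u8]) -> bool {
        let aligned = self.align_up(payload.len());
        if self.len + aligned > self.buffer.len() {
            return false; // full: the caller should rotate first
        }
        self.buffer[self.len..self.len + payload.len()].copy_from_slice(payload);
        self.len += aligned;
        true
    }

    fn is_empty(&self) -> bool {
        self.len == 0
    }

    // Swap the filled batch out for a fresh one and hand it to the flusher.
    fn rotate(&mut self) -> Option<Batch> {
        if self.is_empty() {
            return None;
        }
        let fresh = Batch::new(self.buffer.len(), self.align);
        Some(std::mem::replace(self, fresh))
    }
}

fn main() {
    let mut batch = Batch::new(8192, 4096);
    assert!(batch.write(b"hello"));
    assert_eq!(batch.len, 4096);
    let full = batch.rotate().expect("batch was non-empty");
    assert_eq!(full.len, 4096);
    assert!(batch.is_empty());
}
```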