Skip to content

Commit

Permalink
refactor: parallel write buffer writing for large object disk cache (#…
Browse files Browse the repository at this point in the history
…619)

* refactor: parallel write buffer writing for large object disk cache

Signed-off-by: MrCroxx <[email protected]>

* chore: fix rust docs

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jul 20, 2024
1 parent f927f34 commit 3f98d0d
Show file tree
Hide file tree
Showing 13 changed files with 738 additions and 472 deletions.
12 changes: 4 additions & 8 deletions foyer-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ pub struct Metrics {
pub storage_delete_duration: Histogram,

pub storage_queue_rotate: Counter,
pub storage_queue_leader: Counter,
pub storage_queue_follower: Counter,
pub storage_queue_rotate_duration: Histogram,
pub storage_queue_drop: Counter,

pub storage_disk_write: Counter,
pub storage_disk_read: Counter,
Expand Down Expand Up @@ -124,12 +123,10 @@ impl Metrics {

let storage_queue_rotate =
counter!(format!("foyer_storage_inner_op_total"), "name" => name.to_string(), "op" => "queue_rotate");
let storage_queue_leader =
counter!(format!("foyer_storage_inner_op_total"), "name" => name.to_string(), "op" => "queue_leader");
let storage_queue_follower =
counter!(format!("foyer_storage_inner_op_total"), "name" => name.to_string(), "op" => "queue_follower");
let storage_queue_rotate_duration =
histogram!(format!("foyer_storage_inner_op_duration"), "name" => name.to_string(), "op" => "queue_rotate");
let storage_queue_drop =
counter!(format!("foyer_storage_inner_op_total"), "name" => name.to_string(), "op" => "queue_drop");

let storage_disk_write =
counter!(format!("foyer_storage_disk_io_total"), "name" => name.to_string(), "op" => "write");
Expand Down Expand Up @@ -199,9 +196,8 @@ impl Metrics {
storage_miss_duration,
storage_delete_duration,
storage_queue_rotate,
storage_queue_leader,
storage_queue_follower,
storage_queue_rotate_duration,
storage_queue_drop,
storage_disk_write,
storage_disk_read,
storage_disk_flush,
Expand Down
6 changes: 6 additions & 0 deletions foyer-common/src/wait_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ mod tests {

use super::*;

#[tokio::test]
async fn test_wait_group_empty() {
let wg = WaitGroup::default();
wg.wait().await;
}

#[tokio::test]
async fn test_wait_group_basic() {
let v = Arc::new(AtomicUsize::new(0));
Expand Down
86 changes: 83 additions & 3 deletions foyer-storage/src/device/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,71 @@ use std::{
};

use super::{allocator::AlignedAllocator, ALIGN, IO_BUFFER_ALLOCATOR};
use allocator_api2::vec::Vec as VecA;
use allocator_api2::{boxed::Box as BoxA, vec::Vec as VecA};
use bytes::{buf::UninitSlice, Buf, BufMut};
use foyer_common::bits;

/// A capacity-fixed 4K-aligned u8 buffer.
pub struct IoBuffer {
inner: BoxA<[u8], &'static AlignedAllocator<ALIGN>>,
}

impl Debug for IoBuffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&*self.inner, f)
}
}

impl IoBuffer {
/// Constructs a new 4K-aligned [`IoBuffer`] with at least the specified capacity.
///
/// The buffer is filled with random data.
pub fn new(size: usize) -> Self {
let size = bits::align_up(ALIGN, size);
let mut v = VecA::with_capacity_in(size, &IO_BUFFER_ALLOCATOR);
let aligned = bits::align_down(ALIGN, v.capacity());
unsafe { v.set_len(aligned) };
let inner = v.into_boxed_slice();
Self { inner }
}
}

impl Deref for IoBuffer {
type Target = BoxA<[u8], &'static AlignedAllocator<ALIGN>>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl DerefMut for IoBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl PartialEq for IoBuffer {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner
}
}

impl Eq for IoBuffer {}

impl Clone for IoBuffer {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}

/// A 4K-aligend u8 vector.
///
/// # Growth
///
/// [`IoBytesMut`] will implicitly grow its buffer as necessary.
/// However, explicitly reserving the required space up-front before a series of inserts will be more efficient.
pub struct IoBytesMut {
inner: VecA<u8, &'static AlignedAllocator<ALIGN>>,
}
Expand Down Expand Up @@ -216,6 +276,15 @@ impl IoBytesMut {
Self { inner }
}

/// Align the length of the vector to 4K.
///
/// The extended part of the vector can be filled with any data without any guarantees.
pub fn align_to(&mut self) {
let aligned = bits::align_up(ALIGN, self.inner.len());
self.inner.reserve_exact(aligned - self.inner.len());
unsafe { self.inner.set_len(aligned) };
}

/// Convert [`IoBytesMut`] to [`IoBytes`].
pub fn freeze(self) -> IoBytes {
self.into()
Expand All @@ -224,7 +293,7 @@ impl IoBytesMut {

/// A 4K-aligned, shared, immutable u8 vector.
pub struct IoBytes {
inner: Arc<VecA<u8, &'static AlignedAllocator<ALIGN>>>,
inner: Arc<BoxA<[u8], &'static AlignedAllocator<ALIGN>>>,
offset: usize,
len: usize,
}
Expand All @@ -244,7 +313,8 @@ impl From<VecA<u8, &'static AlignedAllocator<ALIGN>>> for IoBytes {
value.reserve_exact(aligned - value.len());
unsafe { value.set_len(aligned) };

let inner = Arc::new(value);
let inner = value.into_boxed_slice();
let inner = Arc::new(inner);

Self { inner, offset, len }
}
Expand All @@ -256,6 +326,16 @@ impl From<IoBytesMut> for IoBytes {
}
}

impl From<IoBuffer> for IoBytes {
fn from(value: IoBuffer) -> Self {
assert!(bits::is_aligned(ALIGN, value.len()));
let offset = 0;
let len = value.len();
let inner = Arc::new(value.inner);
Self { inner, offset, len }
}
}

impl Default for IoBytes {
fn default() -> Self {
Self::new()
Expand Down
Loading

0 comments on commit 3f98d0d

Please sign in to comment.