Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 31 additions & 41 deletions src/db/car/forest/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@

#[cfg_vis(feature = "benchmark-private", pub)]
use self::util::NonMaximalU64;
use byteorder::{LittleEndian, ReadBytesExt as _, WriteBytesExt as _};
use byteorder::{LittleEndian, ReadBytesExt as _};
use cfg_vis::cfg_vis;
use cid::Cid;
use itertools::Itertools as _;
use positioned_io::{ReadAt, Size};
use smallvec::{SmallVec, smallvec};
use std::{
cmp,
io::{self, Read, Write},
io::{self, Read},
iter,
num::NonZeroUsize,
pin::pin,
Expand Down Expand Up @@ -395,30 +395,19 @@ impl Writer {
.chain(iter::once(Slot::Empty))
}
pub async fn write_into(self, writer: impl AsyncWrite) -> io::Result<()> {
let mut buf = vec![];
let mut writer = pin!(writer);
let Self {
version,
header,
slots,
} = self;
/// Bridge between our sync [`Writeable`] trait, and async writing code
async fn write_via_buf(
buf: &mut Vec<u8>,
writer: impl AsyncWrite,
data: impl Writeable,
) -> io::Result<()> {
buf.clear();
data.write_to(&mut *buf)?;
pin!(writer).write_all(buf).await
}
write_via_buf(&mut buf, &mut writer, version).await?;
write_via_buf(&mut buf, &mut writer, &header).await?;
version.write_to(&mut writer).await?;
header.write_to(&mut writer).await?;
for slot in Self::slots(
header.initial_buckets.try_into().unwrap(),
slots.iter().copied(),
) {
write_via_buf(&mut buf, &mut writer, slot).await?;
slot.write_to(&mut writer).await?;
}
Ok(())
}
Expand Down Expand Up @@ -519,9 +508,9 @@ impl Readable for Version {
}
}

impl Writeable for Version {
fn write_to(&self, mut writer: impl Write) -> io::Result<()> {
writer.write_u64::<LittleEndian>(*self as u64)
impl Writable for Version {
async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
writer.write_u64_le(*self as u64).await
}
const LEN: u64 = std::mem::size_of::<u64>() as u64;
}
Expand All @@ -545,9 +534,9 @@ impl Readable for Slot {
}
}

impl Writeable for Slot {
fn write_to(&self, writer: impl Write) -> io::Result<()> {
self.into_raw().write_to(writer)
impl Writable for Slot {
async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
self.into_raw().write_to(writer).await
}
const LEN: u64 = RawSlot::LEN;
}
Expand All @@ -564,11 +553,11 @@ impl Readable for RawSlot {
}
}

impl Writeable for RawSlot {
fn write_to(&self, mut writer: impl Write) -> io::Result<()> {
impl Writable for RawSlot {
async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
let Self { hash, frame_offset } = *self;
writer.write_u64::<LittleEndian>(hash)?;
writer.write_u64::<LittleEndian>(frame_offset)?;
writer.write_u64_le(hash).await?;
writer.write_u64_le(frame_offset).await?;
Ok(())
}
const LEN: u64 = std::mem::size_of::<u64>() as u64 * 2;
Expand All @@ -587,16 +576,16 @@ impl Readable for V1Header {
}
}

impl Writeable for V1Header {
fn write_to(&self, mut writer: impl Write) -> io::Result<()> {
impl Writable for V1Header {
async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
let Self {
longest_distance,
collisions,
initial_buckets,
} = *self;
writer.write_u64::<LittleEndian>(longest_distance)?;
writer.write_u64::<LittleEndian>(collisions)?;
writer.write_u64::<LittleEndian>(initial_buckets)?;
writer.write_u64_le(longest_distance).await?;
writer.write_u64_le(collisions).await?;
writer.write_u64_le(initial_buckets).await?;
Ok(())
}
const LEN: u64 = std::mem::size_of::<u64>() as u64 * 3;
Expand All @@ -608,26 +597,26 @@ trait Readable {
Self: Sized;
}

trait Writeable {
trait Writable {
/// Must only return [`Err(_)`] if the underlying io fails.
fn write_to(&self, writer: impl Write) -> io::Result<()>;
/// The number of bytes that will be written on a call to [`Writeable::write_to`].
async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()>;
/// The number of bytes that will be written on a call to [`Writable::write_to`].
///
/// Implementations may panic if this is incorrect.
const LEN: u64;
}

/// Useful for exhaustiveness checking
fn written_len<T: Writeable>(_: T) -> u64 {
fn written_len<T: Writable>(_: T) -> u64 {
T::LEN
}

impl<T> Writeable for &T
impl<T> Writable for &T
where
T: Writeable,
T: Writable,
{
fn write_to(&self, writer: impl Write) -> io::Result<()> {
T::write_to(self, writer)
async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
T::write_to(self, writer).await
}
const LEN: u64 = T::LEN;
}
Expand Down Expand Up @@ -767,8 +756,9 @@ mod tests {
}

#[track_caller]
fn round_trip<T: PartialEq + std::fmt::Debug + Readable + Writeable>(original: &T) {
let serialized = write_to_vec(|v| original.write_to(v));
fn round_trip<T: PartialEq + std::fmt::Debug + Readable + Writable>(original: &T) {
let serialized =
write_to_vec(|v| tokio::runtime::Runtime::new()?.block_on(original.write_to(v)));
assert_eq!(
serialized.len(),
usize::try_from(written_len(original)).unwrap()
Expand Down
Loading