diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 169ec68442f3..84a3d9783213 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -66,7 +66,7 @@ #[cfg_vis(feature = "benchmark-private", pub)] use self::util::NonMaximalU64; -use byteorder::{LittleEndian, ReadBytesExt as _, WriteBytesExt as _}; +use byteorder::{LittleEndian, ReadBytesExt as _}; use cfg_vis::cfg_vis; use cid::Cid; use itertools::Itertools as _; @@ -74,7 +74,7 @@ use positioned_io::{ReadAt, Size}; use smallvec::{SmallVec, smallvec}; use std::{ cmp, - io::{self, Read, Write}, + io::{self, Read}, iter, num::NonZeroUsize, pin::pin, @@ -395,30 +395,19 @@ impl Writer { .chain(iter::once(Slot::Empty)) } pub async fn write_into(self, writer: impl AsyncWrite) -> io::Result<()> { - let mut buf = vec![]; let mut writer = pin!(writer); let Self { version, header, slots, } = self; - /// Bridge between our sync [`Writeable`] trait, and async writing code - async fn write_via_buf( - buf: &mut Vec, - writer: impl AsyncWrite, - data: impl Writeable, - ) -> io::Result<()> { - buf.clear(); - data.write_to(&mut *buf)?; - pin!(writer).write_all(buf).await - } - write_via_buf(&mut buf, &mut writer, version).await?; - write_via_buf(&mut buf, &mut writer, &header).await?; + version.write_to(&mut writer).await?; + header.write_to(&mut writer).await?; for slot in Self::slots( header.initial_buckets.try_into().unwrap(), slots.iter().copied(), ) { - write_via_buf(&mut buf, &mut writer, slot).await?; + slot.write_to(&mut writer).await?; } Ok(()) } @@ -519,9 +508,9 @@ impl Readable for Version { } } -impl Writeable for Version { - fn write_to(&self, mut writer: impl Write) -> io::Result<()> { - writer.write_u64::(*self as u64) +impl Writable for Version { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { + writer.write_u64_le(*self as u64).await } const LEN: u64 = std::mem::size_of::() as u64; } @@ -545,9 +534,9 @@ impl Readable for Slot { } } -impl Writeable for Slot { - fn write_to(&self, writer: impl Write) -> io::Result<()> { - self.into_raw().write_to(writer) +impl Writable for Slot { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { + self.into_raw().write_to(writer).await } const LEN: u64 = RawSlot::LEN; } @@ -564,11 +553,11 @@ impl Readable for RawSlot { } } -impl Writeable for RawSlot { - fn write_to(&self, mut writer: impl Write) -> io::Result<()> { +impl Writable for RawSlot { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { let Self { hash, frame_offset } = *self; - writer.write_u64::(hash)?; - writer.write_u64::(frame_offset)?; + writer.write_u64_le(hash).await?; + writer.write_u64_le(frame_offset).await?; Ok(()) } const LEN: u64 = std::mem::size_of::() as u64 * 2; @@ -587,16 +576,16 @@ impl Readable for V1Header { } } -impl Writeable for V1Header { - fn write_to(&self, mut writer: impl Write) -> io::Result<()> { +impl Writable for V1Header { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { let Self { longest_distance, collisions, initial_buckets, } = *self; - writer.write_u64::(longest_distance)?; - writer.write_u64::(collisions)?; - writer.write_u64::(initial_buckets)?; + writer.write_u64_le(longest_distance).await?; + writer.write_u64_le(collisions).await?; + writer.write_u64_le(initial_buckets).await?; Ok(()) } const LEN: u64 = std::mem::size_of::() as u64 * 3; @@ -608,26 +597,26 @@ trait Readable { Self: Sized; } -trait Writeable { +trait Writable { /// Must only return [`Err(_)`] if the underlying io fails. - fn write_to(&self, writer: impl Write) -> io::Result<()>; - /// The number of bytes that will be written on a call to [`Writeable::write_to`]. + async fn write_to(&self, writer: &mut W) -> io::Result<()>; + /// The number of bytes that will be written on a call to [`Writable::write_to`]. /// /// Implementations may panic if this is incorrect. const LEN: u64; } /// Useful for exhaustiveness checking -fn written_len(_: T) -> u64 { +fn written_len(_: T) -> u64 { T::LEN } -impl Writeable for &T +impl Writable for &T where - T: Writeable, + T: Writable, { - fn write_to(&self, writer: impl Write) -> io::Result<()> { - T::write_to(self, writer) + async fn write_to(&self, writer: &mut W) -> io::Result<()> { + T::write_to(self, writer).await } const LEN: u64 = T::LEN; } @@ -767,8 +756,9 @@ mod tests { } #[track_caller] - fn round_trip(original: &T) { - let serialized = write_to_vec(|v| original.write_to(v)); + fn round_trip(original: &T) { + let serialized = + write_to_vec(|v| tokio::runtime::Runtime::new()?.block_on(original.write_to(v))); assert_eq!( serialized.len(), usize::try_from(written_len(original)).unwrap()