From aee1678e6c03717fdc27ca98a9ffd0c690740d2e Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Wed, 16 Jul 2025 07:34:17 +0200 Subject: [PATCH 1/9] Overflow --- accounts-db/src/buffered_reader.rs | 64 ++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index e038f700b78..35a12d6ff47 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -276,6 +276,70 @@ impl BufRead for BufferedReader<'_, T> { } } +struct BufReaderWithOverflow { + reader: R, + overflow_buf: Vec, +} + +impl BufReaderWithOverflow { + pub fn new(reader: R) -> Self { + Self { + reader, + overflow_buf: Vec::new(), + } + } +} + +impl BufRead for BufReaderWithOverflow { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + if self.overflow_buf.is_empty() { + self.reader.fill_buf() + } else { + Ok(&self.overflow_buf) + } + } + + fn consume(&mut self, mut amt: usize) { + if !self.overflow_buf.is_empty() { + amt = amt + .checked_sub(self.overflow_buf.len()) + .expect("all overflow bytes are required to be consumed"); + self.overflow_buf.clear(); + } + self.reader.consume(amt); + } +} + +impl ContiguousBufFileRead for BufReaderWithOverflow { + fn get_file_offset(&self) -> usize { + todo!() + } + + fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]> { + let buf = self.reader.fill_buf()?; + let available_len = buf.len(); + if available_len >= required_len || available_len == 0 { + return Ok(buf); + } + self.overflow_buf.resize(required_len, 0); + self.overflow_buf[..available_len].copy_from_slice(buf); + self.reader.consume(available_len); + self.reader.read(&mut self.overflow_buf[available_len..]); + Ok(&self.overflow_buf.as_slice()) + } + + fn fill_buf_required_or_overflow<'b>( + &'b mut self, + required_len: usize, + overflow_buffer: &'b mut Vec, + ) -> io::Result<&'b [u8]> + where + 'a: 'b, + { + todo!() + } +} + /// Open file at `path` with buffering reader using `buf_size` memory and doing /// read-ahead IO reads (if `io_uring` is supported by the host) pub fn large_file_buf_reader( From 7fa63830d9ee2e7ea842795acd395e0bd5ea6969 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Wed, 16 Jul 2025 16:16:37 +0200 Subject: [PATCH 2/9] Overflow wrapper. --- accounts-db/src/append_vec.rs | 34 ++--- accounts-db/src/buffered_reader.rs | 211 +++++++++++++---------------- 2 files changed, 109 insertions(+), 136 deletions(-) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index 2ab20b0599c..c5bdb892a23 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -10,10 +10,12 @@ pub mod test_utils; // Used all over the accounts-db crate. Probably should be minimized. pub(crate) use meta::StoredAccountMeta; // Some tests/benches use AccountMeta/StoredMeta +use crate::buffered_reader::BufReaderWithOverflow; #[cfg(feature = "dev-context-only-utils")] pub use meta::{AccountMeta, StoredMeta}; #[cfg(not(feature = "dev-context-only-utils"))] use meta::{AccountMeta, StoredMeta}; + use { crate::{ account_info::Offset, @@ -1049,17 +1051,20 @@ impl AppendVec { {} } AppendVecFileBacking::File(file) => { + // // 128KiB covers a reasonably large distribution of typical account sizes. + // // In a recent sample, 99.98% of accounts' data lengths were less than or equal to 128KiB. + const MIN_CAPACITY: usize = 1024 * 128; + const MAX_CAPACITY: usize = + STORE_META_OVERHEAD + MAX_PERMITTED_DATA_LENGTH as usize; const BUFFER_SIZE: usize = PAGE_SIZE * 8; - let mut reader = BufferedReader::>::new_stack(self.len(), file); + let mut reader = BufReaderWithOverflow::new( + BufferedReader::>::new_stack(self.len(), file), + MIN_CAPACITY..(MAX_CAPACITY + 1), + ); let mut min_buf_len = STORE_META_OVERHEAD; - // Buffer for account data that doesn't fit within the stack allocated buffer. - // This will be re-used for each account that doesn't fit within the stack allocated buffer. - let mut data_overflow_buffer = vec![]; loop { let offset = reader.get_file_offset(); - let bytes = match reader - .fill_buf_required_or_overflow(min_buf_len, &mut data_overflow_buffer) - { + let bytes = match reader.fill_buf_required(min_buf_len) { Ok([]) => break, Ok(bytes) => ValidSlice::new(bytes), Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break, @@ -1090,21 +1095,6 @@ impl AppendVec { } else { // repeat loop with required buffer size holding whole account data min_buf_len = STORE_META_OVERHEAD + data_len; - - if min_buf_len > BUFFER_SIZE { - const MAX_CAPACITY: usize = - STORE_META_OVERHEAD + MAX_PERMITTED_DATA_LENGTH as usize; - // 128KiB covers a reasonably large distribution of typical account sizes. - // In a recent sample, 99.98% of accounts' data lengths were less than or equal to 128KiB. - const MIN_CAPACITY: usize = 1024 * 128; - if min_buf_len > data_overflow_buffer.capacity() { - let next_cap = min_buf_len - .next_power_of_two() - .clamp(MIN_CAPACITY, MAX_CAPACITY); - data_overflow_buffer - .reserve_exact(next_cap - data_overflow_buffer.len()); - } - } } } } diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index 35a12d6ff47..8285335a906 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -71,6 +71,8 @@ impl Backing for Stack { /// - Fall back to an overflow buffer if the internal buffer cannot satisfy the request. /// - Retrieve the current file offset corresponding to the start of the next buffer. pub(crate) trait ContiguousBufFileRead<'a>: BufRead { + fn contiguous_capacity(&self) -> usize; + /// Returns the current file offset corresponding to the start of the buffer /// that will be returned by the next call to `fill_buf_*`. /// @@ -84,24 +86,6 @@ pub(crate) trait ContiguousBufFileRead<'a>: BufRead { /// Returns `Err(io::ErrorKind::UnexpectedEof)` if the end of file is reached /// before the required number of bytes is available. fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]>; - - /// Attempts to provide at least `required_len` contiguous bytes by using - /// the internal buffer or the provided `overflow_buffer` if needed. - /// - /// If the internal buffer alone does not satisfy the requirement, additional - /// bytes are read and appended to `overflow_buffer`, which is resized to fit the data. - /// - /// Returns a slice containing all the required data (may point to either buffer). - /// - /// Returns `Err(io::ErrorKind::UnexpectedEof)` if the end of file is reached - /// before the required number of bytes can be read. - fn fill_buf_required_or_overflow<'b>( - &'b mut self, - required_len: usize, - overflow_buffer: &'b mut Vec, - ) -> io::Result<&'b [u8]> - where - 'a: 'b; } /// read a file a large buffer at a time and provide access to a slice in that buffer @@ -137,6 +121,10 @@ impl<'a, T> BufferedReader<'a, T> { } impl<'a, T: Backing> ContiguousBufFileRead<'a> for BufferedReader<'a, T> { + fn contiguous_capacity(&self) -> usize { + self.buf.capacity() + } + #[inline(always)] fn get_file_offset(&self) -> usize { if self.buf_valid_bytes.is_empty() { @@ -158,49 +146,6 @@ impl<'a, T: Backing> ContiguousBufFileRead<'a> for BufferedReader<'a, T> { } Ok(self.valid_slice()) } - - fn fill_buf_required_or_overflow<'b>( - &'b mut self, - required_len: usize, - overflow_buffer: &'b mut Vec, - ) -> io::Result<&'b [u8]> - where - 'a: 'b, - { - if required_len <= self.buf.capacity() { - return self.fill_buf_required(required_len); - } - - if required_len > overflow_buffer.capacity() { - overflow_buffer.reserve_exact(required_len - overflow_buffer.len()); - } - // SAFETY: We only write to the uninitialized portion of the buffer via `copy_from_slice` and `read_into_buffer`. - // Later, we ensure we only read from the initialized portion of the buffer. - unsafe { - overflow_buffer.set_len(required_len); - } - - // Copy already read data to overflow buffer. - let available_valid_data = self.valid_slice(); - let leftover = available_valid_data.len(); - overflow_buffer[..leftover].copy_from_slice(available_valid_data); - - // Read remaining data into overflow buffer. - let read_dst = &mut overflow_buffer[leftover..]; - let bytes_read = read_into_buffer( - self.file, - self.file_len_valid, - self.file_offset_of_next_read, - read_dst, - )?; - if bytes_read < read_dst.len() { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "unable to read required amount of data", - )); - } - Ok(overflow_buffer.as_slice()) - } } impl BufferedReader<'_, T> @@ -238,15 +183,30 @@ impl<'a, const N: usize> BufferedReader<'a, Stack> { } impl io::Read for BufferedReader<'_, T> { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let available = self.fill_buf()?; - if available.is_empty() { - return Ok(0); + fn read(&mut self, mut buf: &mut [u8]) -> io::Result { + let available_len = self.buf_valid_bytes.len(); + if available_len > 0 { + // Copy already read data to buf. + let available_valid_data = self.valid_slice(); + if available_len >= buf.len() { + buf.copy_from_slice(&available_valid_data[..buf.len()]); + self.consume(buf.len()); + return Ok(buf.len()); + } + buf[..available_len].copy_from_slice(available_valid_data); + buf = &mut buf[available_len..]; } - let bytes_to_read = available.len().min(buf.len()); - buf[..bytes_to_read].copy_from_slice(&available[..bytes_to_read]); - self.consume(bytes_to_read); - Ok(bytes_to_read) + + // Read directly from file into any space still left in the buf. + let bytes_read = read_into_buffer( + self.file, + self.file_len_valid, + self.file_offset_of_next_read, + buf, + )?; + let filled_len = bytes_read + available_len; + self.consume(filled_len); + Ok(filled_len) } } @@ -276,16 +236,40 @@ impl BufRead for BufferedReader<'_, T> { } } -struct BufReaderWithOverflow { +pub struct BufReaderWithOverflow { reader: R, overflow_buf: Vec, + overflow_capacity_range: Range, } impl BufReaderWithOverflow { - pub fn new(reader: R) -> Self { + pub fn new(reader: R, overflow_capacity_range: Range) -> Self { Self { reader, overflow_buf: Vec::new(), + overflow_capacity_range, + } + } +} + +impl io::Read for BufReaderWithOverflow { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let available_len = self.overflow_buf.len(); + if available_len == 0 { + self.reader.read(buf) + } else { + assert!( + buf.len() >= available_len, + "should read all previously required bytes" + ); + buf[..available_len].copy_from_slice(&self.overflow_buf); + self.overflow_buf.clear(); + if buf.len() > available_len { + let bytes_read = self.reader.read(&mut buf[available_len..])?; + Ok(available_len + bytes_read) + } else { + Ok(available_len) + } } } } @@ -295,48 +279,56 @@ impl BufRead for BufReaderWithOverflow { if self.overflow_buf.is_empty() { self.reader.fill_buf() } else { - Ok(&self.overflow_buf) + Ok(self.overflow_buf.as_slice()) } } fn consume(&mut self, mut amt: usize) { - if !self.overflow_buf.is_empty() { + let overflow_len = self.overflow_buf.len(); + if overflow_len > 0 { amt = amt - .checked_sub(self.overflow_buf.len()) - .expect("all overflow bytes are required to be consumed"); + .checked_sub(overflow_len) + .expect("should consume all previously required bytes"); self.overflow_buf.clear(); } self.reader.consume(amt); } } -impl ContiguousBufFileRead for BufReaderWithOverflow { +impl<'a, R: ContiguousBufFileRead<'a>> ContiguousBufFileRead<'a> for BufReaderWithOverflow { + fn contiguous_capacity(&self) -> usize { + usize::MAX + } + fn get_file_offset(&self) -> usize { - todo!() + self.reader.get_file_offset() - self.overflow_buf.len() } fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]> { - let buf = self.reader.fill_buf()?; - let available_len = buf.len(); - if available_len >= required_len || available_len == 0 { - return Ok(buf); + let available_len = self.overflow_buf.len(); + if available_len == 0 { + if self.reader.contiguous_capacity() >= required_len { + return self.reader.fill_buf_required(required_len); + } } - self.overflow_buf.resize(required_len, 0); - self.overflow_buf[..available_len].copy_from_slice(buf); - self.reader.consume(available_len); - self.reader.read(&mut self.overflow_buf[available_len..]); - Ok(&self.overflow_buf.as_slice()) - } - - fn fill_buf_required_or_overflow<'b>( - &'b mut self, - required_len: usize, - overflow_buffer: &'b mut Vec, - ) -> io::Result<&'b [u8]> - where - 'a: 'b, - { - todo!() + assert!( + available_len <= required_len, + "fill_buf_required should not decrease the required_len" + ); + if self.overflow_buf.capacity() < required_len { + let target_capacity = required_len.clamp( + self.overflow_capacity_range.start, + self.overflow_capacity_range.end - 1, + ); + self.overflow_buf + .reserve(target_capacity - self.overflow_buf.len()); + } + // Safety: we have reserved capacity and all of it will be filled by read + unsafe { self.overflow_buf.set_len(required_len) }; + self.reader + .read_exact(&mut self.overflow_buf[available_len..]) + .inspect_err(|_| self.overflow_buf.clear())?; + Ok(self.overflow_buf.as_slice()) } } @@ -636,48 +628,39 @@ mod tests { sample_file.write_all(&bytes).unwrap(); let file_len_valid = 32; - let mut reader = BufferedReader::new(backing, file_len_valid, &sample_file); + let mut reader = BufReaderWithOverflow::new( + BufferedReader::new(backing, file_len_valid, &sample_file), + 0..usize::MAX, + ); // Case 1: required_len <= buffer_size (no overflow needed) - let mut overflow = Vec::new(); let required_len = 8; - let slice = reader - .fill_buf_required_or_overflow(required_len, &mut overflow) - .unwrap(); + let slice = reader.fill_buf_required(required_len).unwrap(); assert_eq!(&slice[..required_len], &bytes[..required_len]); - assert!(overflow.is_empty()); // Consume part of the buffer to simulate partial reading reader.consume(required_len); // Case 2: required_len > buffer_size (overflow required) - let mut overflow = Vec::new(); let required_len = buffer_size + 8; - let slice = reader - .fill_buf_required_or_overflow(required_len, &mut overflow) - .unwrap(); + let slice = reader.fill_buf_required(required_len).unwrap(); // Internal buffer is size `buffer_size`, overflow should extend with the remaining `8` bytes assert_eq!(slice.len(), required_len); assert_eq!(slice, &bytes[8..8 + required_len]); - assert_eq!(overflow.len(), required_len); // Consume everything to reach EOF reader.consume(required_len); // Case 3: required_len larger than remaining data (expect UnexpectedEof) - let mut overflow = Vec::new(); let required_len = 64; - let result = reader.fill_buf_required_or_overflow(required_len, &mut overflow); + let result = reader.fill_buf_required(required_len); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); // Case 4: required_len = 0 (should return empty slice) - let mut overflow = Vec::new(); let required_len = 0; let offset_before = reader.get_file_offset(); - let slice = reader - .fill_buf_required_or_overflow(required_len, &mut overflow) - .unwrap(); + let slice = reader.fill_buf_required(required_len).unwrap(); assert_eq!(slice.len(), 0); let offset_after = reader.get_file_offset(); assert_eq!(offset_before, offset_after); From ec8f6df86996fe40abea172e908c94056b510f37 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Fri, 18 Jul 2025 12:14:23 +0200 Subject: [PATCH 3/9] Slight refactor. --- accounts-db/src/buffered_reader.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index 8285335a906..c75c1103bc1 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -315,13 +315,19 @@ impl<'a, R: ContiguousBufFileRead<'a>> ContiguousBufFileRead<'a> for BufReaderWi available_len <= required_len, "fill_buf_required should not decrease the required_len" ); - if self.overflow_buf.capacity() < required_len { - let target_capacity = required_len.clamp( + if required_len > self.overflow_buf.capacity() { + let target_capacity = required_len.next_power_of_two().clamp( self.overflow_capacity_range.start, self.overflow_capacity_range.end - 1, ); + if required_len > target_capacity { + return Err(io::Error::new( + io::ErrorKind::QuotaExceeded, + "requested more bytes than allowed capacity range", + )); + } self.overflow_buf - .reserve(target_capacity - self.overflow_buf.len()); + .reserve_exact(target_capacity - available_len); } // Safety: we have reserved capacity and all of it will be filled by read unsafe { self.overflow_buf.set_len(required_len) }; From 9dabf5ac44f7c44758965872e301ad0c867888a8 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 21 Jul 2025 17:43:22 +0200 Subject: [PATCH 4/9] Refactors and cleanups. --- accounts-db/src/append_vec.rs | 13 ++- accounts-db/src/buffered_reader.rs | 164 ++++++++++++++++++----------- 2 files changed, 111 insertions(+), 66 deletions(-) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index c5bdb892a23..a383bb029dd 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -10,7 +10,7 @@ pub mod test_utils; // Used all over the accounts-db crate. Probably should be minimized. pub(crate) use meta::StoredAccountMeta; // Some tests/benches use AccountMeta/StoredMeta -use crate::buffered_reader::BufReaderWithOverflow; +use crate::buffered_reader::{BufReaderWithOverflow, FileBufRead as _}; #[cfg(feature = "dev-context-only-utils")] pub use meta::{AccountMeta, StoredMeta}; #[cfg(not(feature = "dev-context-only-utils"))] @@ -25,7 +25,7 @@ use { StoredAccountsInfo, }, accounts_hash::AccountHash, - buffered_reader::{BufferedReader, ContiguousBufFileRead, Stack}, + buffered_reader::{BufferedReader, RequiredLenBufRead, Stack}, file_io::read_into_buffer, is_zero_lamport::IsZeroLamport, storable_accounts::StorableAccounts, @@ -1059,7 +1059,8 @@ impl AppendVec { const BUFFER_SIZE: usize = PAGE_SIZE * 8; let mut reader = BufReaderWithOverflow::new( BufferedReader::>::new_stack(self.len(), file), - MIN_CAPACITY..(MAX_CAPACITY + 1), + MIN_CAPACITY, + MAX_CAPACITY, ); let mut min_buf_len = STORE_META_OVERHEAD; loop { @@ -1213,7 +1214,11 @@ impl AppendVec { AppendVecFileBacking::File(file) => { // Heuristic observed in benchmarking that maintains a reasonable balance between syscalls and data waste const BUFFER_SIZE: usize = PAGE_SIZE * 4; - let mut reader = BufferedReader::>::new_stack(self_len, file); + let mut reader = BufReaderWithOverflow::new( + BufferedReader::>::new_stack(self_len, file), + 0, + REQUIRED_READ_LEN, + ); const REQUIRED_READ_LEN: usize = mem::size_of::() + mem::size_of::(); loop { diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index c75c1103bc1..c8b92b4cec1 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -1,13 +1,16 @@ -//! File I/O buffered reader for AppendVec -//! Specialized `BufRead`-like type for reading account data. +//! File I/O buffered readers for AppendVec +//! Specialized `BufRead`-like types for reading account data. //! -//! Callers can use this type to iterate efficiently over append vecs. They can do so by repeatedly -//! calling `fill_buf()`, `consume()` and `set_required_data_len(account_data_len)` once the next account -//! data length is known. +//! Callers can use these types to iterate efficiently over append vecs. They can do so by repeatedly +//! calling: +//! * `fill_buf_required(account_header_len)` to scan the account header and determine the account +//! data size, +//! * optionally extend the obtained buffer to full account data using +//! `fill_buf_required(account_all_bytes_len)` +//! * `consume(account_all_bytes_len)` to move to the next account, //! -//! Unlike BufRead/BufReader, this type guarantees that on the next `fill_buf()` after calling -//! `set_required_data_len(len)`, the whole account data is buffered _linearly_ in memory and available to -//! be returned. +//! When reading full accounts data whose sizes exceed the small stack buffer, the `BufReaderWithOverflow` +//! can be used, which supports dynamically allocated buffer for preparing contiguous data slices. use { crate::file_io::{read_into_buffer, read_more_buffer}, std::{ @@ -62,29 +65,35 @@ impl Backing for Stack { } } -/// An extension of the `BufRead` trait for file readers that require stronger control -/// over returned buffer size and tracking of the file offset. -/// -/// Unlike the standard `fill_buf`, which only guarantees a non-empty buffer, -/// this trait allows callers to: -/// - Enforce a minimum number of contiguous bytes to be made available. -/// - Fall back to an overflow buffer if the internal buffer cannot satisfy the request. -/// - Retrieve the current file offset corresponding to the start of the next buffer. -pub(crate) trait ContiguousBufFileRead<'a>: BufRead { - fn contiguous_capacity(&self) -> usize; - +/// An extension of the `BufRead` trait for file readers that allow tracking file +/// read position offset. +pub(crate) trait FileBufRead: BufRead { /// Returns the current file offset corresponding to the start of the buffer - /// that will be returned by the next call to `fill_buf_*`. + /// that will be returned by the next call to `fill_buf`. /// /// This offset represents the position within the underlying file where data /// will be consumed from. fn get_file_offset(&self) -> usize; +} +/// An extension of the `BufRead` trait for readers that require stronger control +/// over returned buffer size. +/// +/// Unlike the standard `BufRead`, which only guarantees a non-empty buffer, +/// this trait allows callers to enforce a minimum number of contiguous bytes +/// to be made available. +pub(crate) trait RequiredLenBufRead: BufRead { /// Ensures the internal buffer contains at least `required_len` contiguous bytes, - /// and returns a slice to that buffer. + /// and returns a slice of that buffer. + /// + /// Note: subsequent calls with the same or larger `required_len` are allowed, but + /// before requesting smaller length all already provided bytes should be consumed + /// using a single `consume` call. /// /// Returns `Err(io::ErrorKind::UnexpectedEof)` if the end of file is reached /// before the required number of bytes is available. + /// + /// Returns `Err(io::ErrorKind::QuotaExceeded)` if `required_len` exceeds supported limit. fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]>; } @@ -120,11 +129,7 @@ impl<'a, T> BufferedReader<'a, T> { } } -impl<'a, T: Backing> ContiguousBufFileRead<'a> for BufferedReader<'a, T> { - fn contiguous_capacity(&self) -> usize { - self.buf.capacity() - } - +impl FileBufRead for BufferedReader<'_, T> { #[inline(always)] fn get_file_offset(&self) -> usize { if self.buf_valid_bytes.is_empty() { @@ -133,19 +138,6 @@ impl<'a, T: Backing> ContiguousBufFileRead<'a> for BufferedReader<'a, T> { self.file_last_offset + self.buf_valid_bytes.start } } - - fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]> { - if self.buf_valid_bytes.len() < required_len { - self.read_more_bytes()?; - if self.buf_valid_bytes.len() < required_len { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "unable to read enough data", - )); - } - } - Ok(self.valid_slice()) - } } impl BufferedReader<'_, T> @@ -193,11 +185,12 @@ impl io::Read for BufferedReader<'_, T> { self.consume(buf.len()); return Ok(buf.len()); } + // Only part of the buffer can be filled. buf[..available_len].copy_from_slice(available_valid_data); buf = &mut buf[available_len..]; } - // Read directly from file into any space still left in the buf. + // Read directly from file into space still left in the buf. let bytes_read = read_into_buffer( self.file, self.file_len_valid, @@ -205,6 +198,7 @@ impl io::Read for BufferedReader<'_, T> { buf, )?; let filled_len = bytes_read + available_len; + // Buffer was successfully filled, drop buffered data and move offset. self.consume(filled_len); Ok(filled_len) } @@ -236,18 +230,46 @@ impl BufRead for BufferedReader<'_, T> { } } +/// Supported `required_len` is limited by backing buffer size without ability to grow. +impl RequiredLenBufRead for BufferedReader<'_, T> { + fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]> { + if self.buf_valid_bytes.len() < required_len { + self.read_more_bytes()?; + if self.buf_valid_bytes.len() < required_len { + if required_len > self.buf.capacity() { + return Err(io::Error::new( + io::ErrorKind::QuotaExceeded, + "requested more bytes than supported by buffer", + )); + } + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "unable to read enough data", + )); + } + } + Ok(self.valid_slice()) + } +} + +/// A buffered reader that wraps `BufRead` instance and implements `RequiredLenBufRead`. +/// +/// It uses auxiliary overflow buffer when `fill_buf` returns slice that doesn't satisfy +/// the length requirement. pub struct BufReaderWithOverflow { reader: R, overflow_buf: Vec, - overflow_capacity_range: Range, + overflow_min_capacity: usize, + overflow_max_capacity: usize, } impl BufReaderWithOverflow { - pub fn new(reader: R, overflow_capacity_range: Range) -> Self { + pub fn new(reader: R, overflow_min_capacity: usize, overflow_max_capacity: usize) -> Self { Self { reader, overflow_buf: Vec::new(), - overflow_capacity_range, + overflow_min_capacity, + overflow_max_capacity, } } } @@ -295,20 +317,23 @@ impl BufRead for BufReaderWithOverflow { } } -impl<'a, R: ContiguousBufFileRead<'a>> ContiguousBufFileRead<'a> for BufReaderWithOverflow { - fn contiguous_capacity(&self) -> usize { - usize::MAX - } - +impl FileBufRead for BufReaderWithOverflow { fn get_file_offset(&self) -> usize { self.reader.get_file_offset() - self.overflow_buf.len() } +} +/// Support large `required_len` (without configured limits) by using overflow buffer +/// retained during lifetime of the reader. +impl RequiredLenBufRead for BufReaderWithOverflow { fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]> { let available_len = self.overflow_buf.len(); if available_len == 0 { - if self.reader.contiguous_capacity() >= required_len { - return self.reader.fill_buf_required(required_len); + let buf = self.reader.fill_buf()?; + if buf.len() >= required_len { + // Separate fill_buf call is needed due to borrow checker's limitation + // https://rust-lang.github.io/rfcs/2094-nll.html#problem-case-3-conditional-control-flow-across-functions + return self.reader.fill_buf(); } } assert!( @@ -316,10 +341,9 @@ impl<'a, R: ContiguousBufFileRead<'a>> ContiguousBufFileRead<'a> for BufReaderWi "fill_buf_required should not decrease the required_len" ); if required_len > self.overflow_buf.capacity() { - let target_capacity = required_len.next_power_of_two().clamp( - self.overflow_capacity_range.start, - self.overflow_capacity_range.end - 1, - ); + let target_capacity = required_len + .next_power_of_two() + .clamp(self.overflow_min_capacity, self.overflow_max_capacity); if required_len > target_capacity { return Err(io::Error::new( io::ErrorKind::QuotaExceeded, @@ -331,6 +355,9 @@ impl<'a, R: ContiguousBufFileRead<'a>> ContiguousBufFileRead<'a> for BufReaderWi } // Safety: we have reserved capacity and all of it will be filled by read unsafe { self.overflow_buf.set_len(required_len) }; + + // On error overflow buffer is completely cleared to avoid access to + // uninitialized memory. self.reader .read_exact(&mut self.overflow_buf[available_len..]) .inspect_err(|_| self.overflow_buf.clear())?; @@ -393,7 +420,8 @@ mod tests { assert_eq!(slice.len(), buffer_size); assert_eq!(slice.slice(), &bytes[0..buffer_size]); - // Consume the data and attempt to read next 32 bytes, expect to hit EOF + // Consume the data and attempt to read next 32 bytes, which is above supported buffer size, + // so file offset is moved, but call returns quota error. let advance = 16; let mut required_len = 32; reader.consume(advance); @@ -403,9 +431,9 @@ mod tests { assert_eq!( reader .fill_buf_required(required_len) - .expect_err("should hit EOF") + .expect_err("should fail due to required length above buffer size") .kind(), - io::ErrorKind::UnexpectedEof + io::ErrorKind::QuotaExceeded ); // Continue reading should yield EOF. @@ -413,6 +441,7 @@ mod tests { let offset = reader.get_file_offset(); expected_offset += advance; assert_eq!(offset, expected_offset); + required_len = 16; assert_eq!( reader .fill_buf_required(required_len) @@ -452,9 +481,9 @@ mod tests { assert_eq!(slice.len(), buffer_size); assert_eq!(slice.slice(), &bytes[0..buffer_size]); - // Consume the data and attempt read next 32 bytes, expect to hit `valid_len`, and only read 14 bytes + // Consume the data and attempt read next 16 bytes, expect to hit `valid_len`, and only read 14 bytes let mut advance = 16; - let mut required_data_len = 32; + let mut required_data_len = 16; reader.consume(advance); let offset = reader.get_file_offset(); expected_offset += advance; @@ -469,7 +498,7 @@ mod tests { // Continue reading should yield EOF. advance = 14; - required_data_len = 32; + required_data_len = 16; reader.consume(advance); let offset = reader.get_file_offset(); expected_offset += advance; @@ -562,7 +591,7 @@ mod tests { // Continue reading should yield EOF and empty slice. advance = 16; - required_len = 32; + required_len = 16; reader.consume(advance); let offset = reader.get_file_offset(); expected_offset += advance; @@ -574,6 +603,16 @@ mod tests { .kind(), io::ErrorKind::UnexpectedEof ); + + // Attempt to read more than the buffer size + required_len = 32; + assert_eq!( + reader + .fill_buf_required(required_len) + .expect_err("should fail due to too large required length") + .kind(), + io::ErrorKind::QuotaExceeded + ); } #[test_case(Stack::<16>::new(), 16)] @@ -636,7 +675,8 @@ mod tests { let file_len_valid = 32; let mut reader = BufReaderWithOverflow::new( BufferedReader::new(backing, file_len_valid, &sample_file), - 0..usize::MAX, + 0, + usize::MAX, ); // Case 1: required_len <= buffer_size (no overflow needed) From 749f7166b1295c8365a4178c802e56c92fde0472 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 21 Jul 2025 17:55:38 +0200 Subject: [PATCH 5/9] Cleanup. --- accounts-db/src/append_vec.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index a383bb029dd..4bdf4774de2 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -10,7 +10,6 @@ pub mod test_utils; // Used all over the accounts-db crate. Probably should be minimized. pub(crate) use meta::StoredAccountMeta; // Some tests/benches use AccountMeta/StoredMeta -use crate::buffered_reader::{BufReaderWithOverflow, FileBufRead as _}; #[cfg(feature = "dev-context-only-utils")] pub use meta::{AccountMeta, StoredMeta}; #[cfg(not(feature = "dev-context-only-utils"))] @@ -25,7 +24,9 @@ use { StoredAccountsInfo, }, accounts_hash::AccountHash, - buffered_reader::{BufferedReader, RequiredLenBufRead, Stack}, + buffered_reader::{ + BufReaderWithOverflow, BufferedReader, FileBufRead as _, RequiredLenBufRead as _, Stack, + }, file_io::read_into_buffer, is_zero_lamport::IsZeroLamport, storable_accounts::StorableAccounts, @@ -1214,11 +1215,7 @@ impl AppendVec { AppendVecFileBacking::File(file) => { // Heuristic observed in benchmarking that maintains a reasonable balance between syscalls and data waste const BUFFER_SIZE: usize = PAGE_SIZE * 4; - let mut reader = BufReaderWithOverflow::new( - BufferedReader::>::new_stack(self_len, file), - 0, - REQUIRED_READ_LEN, - ); + let mut reader = BufferedReader::>::new_stack(self_len, file); const REQUIRED_READ_LEN: usize = mem::size_of::() + mem::size_of::(); loop { From 860886825bc4d4f1e37ffdc496a1088aff1a8c73 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 21 Jul 2025 18:12:23 +0200 Subject: [PATCH 6/9] Limit overflow capacity by file len. --- accounts-db/src/append_vec.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index 4bdf4774de2..20ad03471cd 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -14,7 +14,6 @@ pub(crate) use meta::StoredAccountMeta; pub use meta::{AccountMeta, StoredMeta}; #[cfg(not(feature = "dev-context-only-utils"))] use meta::{AccountMeta, StoredMeta}; - use { crate::{ account_info::Offset, @@ -1058,10 +1057,11 @@ impl AppendVec { const MAX_CAPACITY: usize = STORE_META_OVERHEAD + MAX_PERMITTED_DATA_LENGTH as usize; const BUFFER_SIZE: usize = PAGE_SIZE * 8; + let self_len = self.len(); let mut reader = BufReaderWithOverflow::new( BufferedReader::>::new_stack(self.len(), file), - MIN_CAPACITY, - MAX_CAPACITY, + MIN_CAPACITY.min(self_len), + MAX_CAPACITY.min(self_len), ); let mut min_buf_len = STORE_META_OVERHEAD; loop { From 7d7062ce2288ac6f5b1e4d3e002e7d794b7dd9ec Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 21 Jul 2025 19:24:42 +0200 Subject: [PATCH 7/9] Update comment / string. --- accounts-db/src/buffered_reader.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index c8b92b4cec1..149dca02399 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -7,7 +7,7 @@ //! data size, //! * optionally extend the obtained buffer to full account data using //! `fill_buf_required(account_all_bytes_len)` -//! * `consume(account_all_bytes_len)` to move to the next account, +//! * `consume(account_all_bytes_len)` to move to the next account //! //! When reading full accounts data whose sizes exceed the small stack buffer, the `BufReaderWithOverflow` //! can be used, which supports dynamically allocated buffer for preparing contiguous data slices. @@ -323,7 +323,7 @@ impl FileBufRead for BufReaderWithOverflow { } } -/// Support large `required_len` (without configured limits) by using overflow buffer +/// Support large `required_len` (within configured limits) by using overflow buffer /// retained during lifetime of the reader. impl RequiredLenBufRead for BufReaderWithOverflow { fn fill_buf_required(&mut self, required_len: usize) -> io::Result<&[u8]> { @@ -338,7 +338,7 @@ impl RequiredLenBufRead for BufReaderWithOverflow { } assert!( available_len <= required_len, - "fill_buf_required should not decrease the required_len" + "fill_buf_required should keep or grow required_len until consume" ); if required_len > self.overflow_buf.capacity() { let target_capacity = required_len From bc6e7ddfd76a52703d16186f0001da5c9f475bc7 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 22 Jul 2025 17:14:36 +0200 Subject: [PATCH 8/9] Add test for Read and BufRead API and mix of access patterns. --- accounts-db/src/buffered_reader.rs | 54 +++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index 149dca02399..fae298deaa6 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -390,7 +390,10 @@ pub fn large_file_buf_reader( #[cfg(test)] mod tests { use { - super::*, crate::append_vec::ValidSlice, std::io::Write, tempfile::tempfile, + super::*, + crate::append_vec::ValidSlice, + std::io::{Read as _, Write}, + tempfile::tempfile, test_case::test_case, }; @@ -711,4 +714,53 @@ mod tests { let offset_after = reader.get_file_offset(); assert_eq!(offset_before, offset_after); } + + #[test_case(Stack::<16>::new(), 16)] + fn test_overflow_reader_read_and_fill_buf(backing: impl Backing, buffer_size: usize) { + // Setup a sample file with 64 bytes of data + const FILE_SIZE: usize = 64; + let mut sample_file = tempfile().unwrap(); + let bytes = rand_bytes::(); + sample_file.write_all(&bytes).unwrap(); + + let mut reader = BufReaderWithOverflow::new( + BufferedReader::new(backing, FILE_SIZE, &sample_file), + 0, + 32, + ); + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, &bytes[0..buffer_size]); + + reader.consume(8); + let mut buf = [0; 8]; + assert_eq!(reader.read(&mut buf).unwrap(), 8); + assert_eq!(buf, &bytes[8..16]); + + assert_eq!( + reader + .fill_buf_required(40) + .expect_err("should exceed len limit") + .kind(), + io::ErrorKind::QuotaExceeded + ); + + let buf = reader.fill_buf_required(32).unwrap(); + assert_eq!(buf, &bytes[16..48]); + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, &bytes[16..48]); + + let mut buf = [0; 48]; + assert_eq!(reader.read(&mut buf).unwrap(), 48); + assert_eq!(buf, &bytes[16..64]); + + assert_eq!(reader.read(&mut buf).unwrap(), 0); + + assert_eq!( + reader + .fill_buf_required(1) + .expect_err("should reach EOF") + .kind(), + io::ErrorKind::UnexpectedEof + ); + } } From 10a2f0a7b251bfcb4d4120f6b99e6fb26af67968 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 22 Jul 2025 18:29:18 +0200 Subject: [PATCH 9/9] Update comments. --- accounts-db/src/append_vec.rs | 4 ++-- accounts-db/src/buffered_reader.rs | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index 20ad03471cd..2ba7babc65c 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -1051,8 +1051,8 @@ impl AppendVec { {} } AppendVecFileBacking::File(file) => { - // // 128KiB covers a reasonably large distribution of typical account sizes. - // // In a recent sample, 99.98% of accounts' data lengths were less than or equal to 128KiB. + // 128KiB covers a reasonably large distribution of typical account sizes. + // In a recent sample, 99.98% of accounts' data lengths were less than or equal to 128KiB. const MIN_CAPACITY: usize = 1024 * 128; const MAX_CAPACITY: usize = STORE_META_OVERHEAD + MAX_PERMITTED_DATA_LENGTH as usize; diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index fae298deaa6..9ce752d9280 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -3,14 +3,14 @@ //! //! Callers can use these types to iterate efficiently over append vecs. They can do so by repeatedly //! calling: -//! * `fill_buf_required(account_header_len)` to scan the account header and determine the account +//! * `fill_buf_required(account_meta_len)` to scan the account metadata parts and determine the account //! data size, //! * optionally extend the obtained buffer to full account data using //! `fill_buf_required(account_all_bytes_len)` //! * `consume(account_all_bytes_len)` to move to the next account //! //! When reading full accounts data whose sizes exceed the small stack buffer, the `BufReaderWithOverflow` -//! can be used, which supports dynamically allocated buffer for preparing contiguous data slices. +//! should be used, which supports dynamically allocated buffer for preparing contiguous data slices. use { crate::file_io::{read_into_buffer, read_more_buffer}, std::{ @@ -717,7 +717,6 @@ mod tests { #[test_case(Stack::<16>::new(), 16)] fn test_overflow_reader_read_and_fill_buf(backing: impl Backing, buffer_size: usize) { - // Setup a sample file with 64 bytes of data const FILE_SIZE: usize = 64; let mut sample_file = tempfile().unwrap(); let bytes = rand_bytes::(); @@ -744,8 +743,10 @@ mod tests { io::ErrorKind::QuotaExceeded ); + // Required buffer is at maximum configured limit. let buf = reader.fill_buf_required(32).unwrap(); assert_eq!(buf, &bytes[16..48]); + // Same buffer should be returned. let buf = reader.fill_buf().unwrap(); assert_eq!(buf, &bytes[16..48]);