
feat(5851): ArrowWriter memory usage #5967

Merged — merged 15 commits on Jul 2, 2024
38 changes: 38 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -210,6 +210,10 @@ impl FallbackEncoder {
}
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize {
match &self.encoder {
FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
@@ -304,6 +308,12 @@ impl Storage for ByteArrayStorage {

key as u64
}

#[allow(dead_code)] // not used in parquet_derive, so is dead there
fn estimated_memory_size(&self) -> usize {
self.page.capacity() * std::mem::size_of::<u8>()
+ self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
}
}
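The `capacity() * size_of::<T>()` pattern above estimates allocated heap space, not just bytes in use. A standalone sketch of the same idea (illustrative names, not the crate's actual types):

```rust
use std::mem::size_of;
use std::ops::Range;

// Illustrative stand-in for ByteArrayStorage: a byte page plus value ranges.
struct StorageSketch {
    page: Vec<u8>,
    values: Vec<Range<usize>>,
}

impl StorageSketch {
    // Use capacity (not len): a Vec may have allocated more space
    // than it currently uses, and that allocation is real memory.
    fn estimated_memory_size(&self) -> usize {
        self.page.capacity() * size_of::<u8>()
            + self.values.capacity() * size_of::<Range<usize>>()
    }
}

fn main() {
    let s = StorageSketch {
        page: Vec::with_capacity(1024),
        values: Vec::with_capacity(8),
    };
    // On a typical 64-bit target: at least 1024 * 1 + 8 * 16 = 1152 bytes
    println!("{}", s.estimated_memory_size());
}
```

Note the estimate counts only the heap blocks behind the two `Vec`s, not the struct itself or allocator overhead.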

/// A dictionary encoder for byte array data
@@ -334,6 +344,10 @@ impl DictEncoder {
num_required_bits(length.saturating_sub(1) as u64)
}
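The bit width above is the minimum number of bits needed to represent the largest dictionary key, i.e. `length - 1`. A hedged standalone sketch of that logic (this `num_required_bits` mirrors the crate's helper but is reimplemented here for illustration):

```rust
// Minimum bits required to represent x (0 needs 0 bits).
fn num_required_bits(x: u64) -> u8 {
    (64 - x.leading_zeros()) as u8
}

// Bit width for bit-packing keys of a dictionary with dict_len entries.
fn bit_width(dict_len: usize) -> u8 {
    num_required_bits((dict_len as u64).saturating_sub(1))
}

fn main() {
    assert_eq!(bit_width(1), 0);   // a single entry needs no bits
    assert_eq!(bit_width(2), 1);   // keys 0..=1 fit in 1 bit
    assert_eq!(bit_width(256), 8); // keys 0..=255 fit in 8 bits
    println!("ok");
}
```

This is why dictionary-encoded pages stay small while the dictionary itself is small: each value is stored as a packed index of only `bit_width` bits.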

fn estimated_memory_size(&self) -> usize {
self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
}

fn estimated_data_page_size(&self) -> usize {
let bit_width = self.bit_width();
1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
@@ -443,10 +457,34 @@ impl ColumnValueEncoder for ByteArrayEncoder {
self.dict_encoder.is_some()
}

fn estimated_memory_size(&self) -> usize {
let encoder_size = match &self.dict_encoder {
Some(encoder) => encoder.estimated_memory_size(),
// For the FallbackEncoder, these unflushed bytes are already encoded.
// Therefore, the size should be the same as estimated_data_page_size.
None => self.fallback.estimated_data_page_size(),
};

let bloom_filter_size = self
.bloom_filter
.as_ref()
.map(|bf| bf.estimated_memory_size())
.unwrap_or_default();

let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
+ self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

encoder_size + bloom_filter_size + stats_size
}

fn estimated_dict_page_size(&self) -> Option<usize> {
Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.estimated_data_page_size(),
83 changes: 71 additions & 12 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -84,15 +84,20 @@ mod levels;
///
/// ## Memory Limiting
///
/// The nature of parquet forces buffering of an entire row group before it can be flushed
/// to the underlying writer. Data is buffered in its encoded form, to reduce memory usage,
/// but if writing rows containing large strings or very nested data, this may still result in
/// non-trivial memory usage.
/// The nature of parquet forces buffering of an entire row group before it can
/// be flushed to the underlying writer. Data is mostly buffered in its encoded
/// form, reducing memory usage. However, some data, such as dictionary keys,
/// large strings, or very nested data, may still result in non-trivial memory
/// usage.
///
/// [`ArrowWriter::in_progress_size`] can be used to track the size of the buffered row group,
/// and potentially trigger an early flush of a row group based on a memory threshold and/or
/// global memory pressure. However, users should be aware that smaller row groups will result
/// in higher metadata overheads, and may worsen compression ratios and query performance.
/// See Also:
/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the anticipated encoded size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
/// result in higher metadata overheads, and thus may worsen compression ratios
/// and query performance.
///
/// ```no_run
/// # use std::io::Write;
@@ -101,7 +106,7 @@ mod levels;
/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
/// # let batch: RecordBatch = todo!();
/// writer.write(&batch).unwrap();
/// // Trigger an early flush if buffered size exceeds 1_000_000
/// // Trigger an early flush if anticipated size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
/// writer.flush().unwrap();
/// }
@@ -203,7 +208,23 @@ impl<W: Write + Send> ArrowWriter<W> {
self.writer.flushed_row_groups()
}

/// Returns the estimated length in bytes of the current in progress row group
/// Estimated memory usage, in bytes, of this `ArrowWriter`
///
/// This estimate is formed by summing the values of
/// [`ArrowColumnWriter::memory_size`] for all in progress columns.
pub fn memory_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
None => 0,
Comment on lines +216 to +218 — Contributor:

It took me a while to understand that this calculation actually also includes the in_progress_size too.

What do you think about making this more explicit, like:

    match &self.in_progress {
        Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size() + x.get_estimated_total_bytes()).sum(),
        None => 0,

And then changing code like

    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }

to only include the estimated memory size:

    /// Returns the estimated total memory buffer usage.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.encoder.estimated_memory_size()
    }

Reply — Contributor Author (@wiedld, Jun 28, 2024):

The in_progress_size includes the flushed_bytes + unflushed_bytes.
The memory_size includes flushed_bytes + unflushed_bytes + processing_size.

At first glance, it does look like we could do memory_size = in_progress_size + processing_size. But what the calculation actually is:

    in_progress_size = flushed_bytes + unflushed_bytes_anticipated_size_after_flush
    memory_size = flushed_bytes + unflushed_bytes_current_mem_size + processing_size

Per our approach to memory limiting, unflushed bytes (in the buffer) should already be encoded. However, I believe that's not the case for the indices on the dict encoder. As such, the accounting in this PR considers unflushed bytes separately -- and pushes the new memory_size() interface down to where we can delineate the DictEncoder vs other encoders.

I added a commit to help explain. Please let me know if I misunderstood. 🙏🏼

Reply — Contributor (@alamb, Jul 1, 2024):

I am thinking from a user's point of view (e.g. our use case in InfluxDB).

If I want to know how much memory the ArrowWriter is using (so I can track it against a budget, for example), what API should I use? I was worried that I couldn't use:

1. ArrowWriter::memory_size, because that does not include the estimated data page sizes, AND I couldn't use
2. ArrowWriter::memory_size() + ArrowWriter::in_progress_size(), because that would double count previously buffered data.

However, after reviewing the code more closely I see the difference is that ArrowWriter::in_progress_size includes an estimate of how large the parquet-encoded data would be after flush (which is not actually memory currently used), which presumably in most cases will be smaller than the actual bytes used. I will try to update the comments as well to clarify this.
}
}
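The relationship between the two estimates discussed in this thread can be made concrete with a mock (these types are illustrative, not the parquet API): `in_progress_size()` anticipates the encoded size after flush, `memory_size()` bounds what is currently buffered, and the first should never exceed the second.

```rust
// Mock writer modeling the accounting described in the review thread:
//   in_progress_size = flushed + unflushed_anticipated_encoded_size
//   memory_size      = flushed + unflushed_current_size + processing_size
struct MockWriter {
    flushed: usize,                     // bytes already encoded and buffered
    unflushed_encoded_estimate: usize,  // anticipated size after encoding
    unflushed_current: usize,           // actual buffered bytes (>= estimate)
    processing: usize,                  // un-encoded state, e.g. dictionary keys
}

impl MockWriter {
    fn in_progress_size(&self) -> usize {
        self.flushed + self.unflushed_encoded_estimate
    }
    fn memory_size(&self) -> usize {
        self.flushed + self.unflushed_current + self.processing
    }
}

fn main() {
    let w = MockWriter {
        flushed: 4096,
        unflushed_encoded_estimate: 1000,
        unflushed_current: 1500,
        processing: 512,
    };
    // The invariant the PR's tests check: estimated encoded size
    // never exceeds the current memory footprint.
    assert!(w.in_progress_size() <= w.memory_size());

    // Budget pattern: flush early when the writer exceeds a threshold.
    const BUDGET: usize = 1_048_576;
    if w.memory_size() > BUDGET {
        // writer.flush()?; // with the real ArrowWriter API
    }
    println!("{} {}", w.in_progress_size(), w.memory_size());
}
```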

/// Anticipated encoded size of the in progress row group.
///
/// This estimates the size of the row group after it is completely encoded,
/// formed by summing the values of
/// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
/// columns.
pub fn in_progress_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress
@@ -629,7 +650,30 @@ impl ArrowColumnWriter {
Ok(ArrowColumnChunk { data, close })
}

/// Returns the estimated total bytes for this column writer
/// Returns the estimated total memory usage by the writer.
///
/// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
/// of the current memory usage and not its anticipated encoded size.
///
/// This includes:
/// 1. Data buffered in encoded form
/// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
///
/// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
pub fn memory_size(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
ArrowColumnWriterImpl::Column(c) => c.memory_size(),
}
}

/// Returns the estimated total encoded bytes for this column writer.
///
/// This includes:
/// 1. Data buffered in encoded form
/// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
///
/// This value should be less than or equal to [`Self::memory_size`]
pub fn get_estimated_total_bytes(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
@@ -2894,24 +2938,39 @@ mod tests {
// starts empty
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.memory_size(), 0);
assert_eq!(writer.bytes_written(), 4); // Initial header
writer.write(&batch).unwrap();

// updated on write
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), 5);
let initial_memory = writer.memory_size();
assert!(initial_memory > 0);
// memory estimate is larger than estimated encoded size
assert!(
initial_size <= initial_memory,
"{initial_size} <= {initial_memory}"
);

// updated on second write
writer.write(&batch).unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), 10);
assert!(writer.memory_size() > initial_memory);
assert!(
writer.in_progress_size() <= writer.memory_size(),
"in_progress_size {} <= memory_size {}",
writer.in_progress_size(),
writer.memory_size()
);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.memory_size(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

writer.close().unwrap();
25 changes: 24 additions & 1 deletion parquet/src/arrow/async_writer/mod.rs
@@ -176,7 +176,16 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
self.sync_writer.flushed_row_groups()
}

/// Returns the estimated length in bytes of the current in progress row group
/// Estimated memory usage, in bytes, of this `AsyncArrowWriter`
///
/// See [ArrowWriter::memory_size] for more information.
pub fn memory_size(&self) -> usize {
self.sync_writer.memory_size()
}

Comment — Contributor: I also added these wrappers for symmetry with the other AsyncWriter methods.

/// Anticipated encoded size of the in progress row group.
///
/// See [ArrowWriter::in_progress_size] for more information.
pub fn in_progress_size(&self) -> usize {
self.sync_writer.in_progress_size()
}
@@ -419,16 +428,30 @@ mod tests {
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), batch.num_rows());
let initial_memory = writer.memory_size();
// memory estimate is larger than estimated encoded size
assert!(
initial_size <= initial_memory,
"{initial_size} <= {initial_memory}"
);

// updated on second write
writer.write(&batch).await.unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
assert!(writer.memory_size() > initial_memory);
assert!(
writer.in_progress_size() <= writer.memory_size(),
"in_progress_size {} <= memory_size {}",
writer.in_progress_size(),
writer.memory_size()
);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().await.unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.memory_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

5 changes: 5 additions & 0 deletions parquet/src/bloom_filter/mod.rs
@@ -383,6 +383,11 @@ impl Sbbf {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].check(hash as u32)
}

/// Return the total in memory size of this bloom filter in bytes
pub(crate) fn estimated_memory_size(&self) -> usize {
self.0.capacity() * std::mem::size_of::<Block>()
}
}
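The `Sbbf` estimate works because a split-block bloom filter is stored as a vector of fixed-size blocks — per the parquet spec a block is 8 × u32, i.e. 32 bytes. A standalone sketch (`Block` and `SbbfSketch` here are illustrative stand-ins, not the crate's types):

```rust
use std::mem::size_of;

// A split-block bloom filter block: 8 x u32 = 32 bytes, per the parquet spec.
#[derive(Clone, Copy)]
struct Block([u32; 8]);

// Illustrative stand-in for Sbbf: just a vector of blocks.
struct SbbfSketch(Vec<Block>);

impl SbbfSketch {
    // Memory usage is simply the allocated block count times the block size.
    fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * size_of::<Block>()
    }
}

fn main() {
    let filter = SbbfSketch(vec![Block([0; 8]); 64]);
    assert_eq!(size_of::<Block>(), 32);
    // 64 blocks of 32 bytes each
    println!("{}", filter.estimated_memory_size());
}
```

Because the block size is a compile-time constant, this estimate is exact for the filter's backing allocation.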

// per spec we use xxHash with seed=0
29 changes: 27 additions & 2 deletions parquet/src/column/writer/encoder.rs
@@ -93,10 +93,17 @@ pub trait ColumnValueEncoder {
/// Returns true if this encoder has a dictionary page
fn has_dictionary(&self) -> bool;

Comment — Contributor: likewise, while this trait is marked pub it is not exposed outside the crate: https://docs.rs/parquet/latest/parquet/?search=ColumnValueEncoder

/// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary
/// Returns the estimated total memory usage of the encoder
fn estimated_memory_size(&self) -> usize;

/// Returns an estimate of the encoded dictionary page size in bytes, or `None` if no dictionary
fn estimated_dict_page_size(&self) -> Option<usize>;

/// Returns an estimate of the data page size in bytes
/// Returns an estimate of the encoded data page size in bytes
///
/// This should include:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize;

/// Flush the dictionary page for this column chunk if any. Any subsequent calls to
@@ -227,6 +234,24 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
self.dict_encoder.is_some()
}

fn estimated_memory_size(&self) -> usize {
let encoder_size = self.encoder.estimated_memory_size();

let dict_encoder_size = self
.dict_encoder
.as_ref()
.map(|encoder| encoder.estimated_memory_size())
.unwrap_or_default();

let bloom_filter_size = self
.bloom_filter
.as_ref()
.map(|bf| bf.estimated_memory_size())
.unwrap_or_default();

encoder_size + dict_encoder_size + bloom_filter_size
}
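The implementation above sums one mandatory component and two optional ones, using `Option::map(..).unwrap_or_default()` so absent components contribute zero. A minimal sketch of that pattern with illustrative types:

```rust
// Illustrative stand-in: an encoder whose dictionary and bloom filter
// are optional, each contributing its size only when present.
struct EncoderSketch {
    encoder_size: usize,
    dict_encoder_size: Option<usize>,
    bloom_filter_size: Option<usize>,
}

impl EncoderSketch {
    fn estimated_memory_size(&self) -> usize {
        // unwrap_or_default() yields 0 for usize when the Option is None
        self.encoder_size
            + self.dict_encoder_size.unwrap_or_default()
            + self.bloom_filter_size.unwrap_or_default()
    }
}

fn main() {
    let with_dict = EncoderSketch {
        encoder_size: 100,
        dict_encoder_size: Some(50),
        bloom_filter_size: None,
    };
    assert_eq!(with_dict.estimated_memory_size(), 150);
    println!("{}", with_dict.estimated_memory_size());
}
```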

fn estimated_dict_page_size(&self) -> Option<usize> {
Some(self.dict_encoder.as_ref()?.dict_encoded_size())
}
22 changes: 19 additions & 3 deletions parquet/src/column/writer/mod.rs
@@ -72,7 +72,13 @@ pub enum ColumnWriter<'a> {
}

impl<'a> ColumnWriter<'a> {
/// Returns the estimated total bytes for this column writer
/// Returns the estimated total memory usage
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
downcast_writer!(self, typed, typed.memory_size())
}

/// Returns the estimated total encoded bytes for this column writer
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
downcast_writer!(self, typed, typed.get_estimated_total_bytes())
@@ -419,6 +425,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
)
}

/// Returns the estimated total memory usage.
///
/// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
/// of the current memory usage and not the final anticipated encoded size.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
}

/// Returns total number of bytes written by this column writer so far.
/// This value is also returned when column writer is closed.
///
@@ -428,10 +443,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_metrics.total_bytes_written
}

/// Returns the estimated total bytes for this column writer
/// Returns the estimated total encoded bytes for this column writer.
///
/// Unlike [`Self::get_total_bytes_written`] this includes an estimate
/// of any data that has not yet been flushed to a page
/// of any data that has not yet been flushed to a page, based on its
/// anticipated encoded size.
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
self.column_metrics.total_bytes_written
5 changes: 5 additions & 0 deletions parquet/src/encodings/encoding/byte_stream_split_encoder.rs
@@ -90,4 +90,9 @@ impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
self.buffer.clear();
Ok(encoded.into())
}

/// Returns the estimated memory size of this encoder.
fn estimated_memory_size(&self) -> usize {
self.buffer.capacity() * std::mem::size_of::<u8>()
}
}