diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 07f23f5b17c6..fc37ebfb4510 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -210,6 +210,10 @@ impl FallbackEncoder {
         }
     }
 
+    /// Returns an estimate of the data page size in bytes
+    ///
+    /// This includes the bytes already encoded in the page buffer plus an
+    /// estimate of the encoded size of any values buffered but not yet encoded
     fn estimated_data_page_size(&self) -> usize {
         match &self.encoder {
             FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
@@ -304,6 +308,12 @@ impl Storage for ByteArrayStorage {
 
         key as u64
     }
+
+    #[allow(dead_code)] // not used in parquet_derive, so is dead there
+    fn estimated_memory_size(&self) -> usize {
+        self.page.capacity() * std::mem::size_of::<u8>()
+            + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
+    }
 }
 
 /// A dictionary encoder for byte array data
@@ -334,6 +344,10 @@ impl DictEncoder {
         num_required_bits(length.saturating_sub(1) as u64)
     }
 
+    fn estimated_memory_size(&self) -> usize {
+        self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
+    }
+
    fn estimated_data_page_size(&self) -> usize {
         let bit_width = self.bit_width();
         1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
@@ -443,10 +457,34 @@ impl ColumnValueEncoder for ByteArrayEncoder {
         self.dict_encoder.is_some()
     }
 
+    fn estimated_memory_size(&self) -> usize {
+        let encoder_size = match &self.dict_encoder {
+            Some(encoder) => encoder.estimated_memory_size(),
+            // For the FallbackEncoder, these unflushed bytes are already encoded.
+            // Therefore, the size should be the same as estimated_data_page_size.
+            None => self.fallback.estimated_data_page_size(),
+        };
+
+        let bloom_filter_size = self
+            .bloom_filter
+            .as_ref()
+            .map(|bf| bf.estimated_memory_size())
+            .unwrap_or_default();
+
+        let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
+            + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();
+
+        encoder_size + bloom_filter_size + stats_size
+    }
+
     fn estimated_dict_page_size(&self) -> Option<usize> {
         Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
     }
 
+    /// Returns an estimate of the data page size in bytes
+    ///
+    /// This includes the bytes already encoded in the page buffer plus an
+    /// estimate of the encoded size of any values buffered but not yet encoded
     fn estimated_data_page_size(&self) -> usize {
         match &self.dict_encoder {
             Some(encoder) => encoder.estimated_data_page_size(),
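Every `estimated_memory_size` implementation in this patch follows the same capacity-based pattern, so it is worth spelling out once. The sketch below is illustrative only (the helper name is hypothetical, not part of the patch): a `Vec`'s memory footprint is driven by its allocated `capacity()`, not its occupied `len()`, which is why these estimates multiply capacity by the element size.

```rust
/// Hypothetical helper showing the capacity-based pattern used in this patch.
fn estimated_vec_memory_size<T>(v: &Vec<T>) -> usize {
    v.capacity() * std::mem::size_of::<T>()
}

fn main() {
    let mut v: Vec<u64> = Vec::with_capacity(1024);
    v.extend([1, 2, 3]);
    // At least 1024 * 8 = 8192 bytes are allocated, even though only
    // 3 values (24 bytes) are actually occupied.
    assert!(estimated_vec_memory_size(&v) >= 8192);
    assert_eq!(v.len() * std::mem::size_of::<u64>(), 24);
}
```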
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 0beb93f80a5f..f83e56c7abe6 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -84,15 +84,20 @@ mod levels;
 ///
 /// ## Memory Limiting
 ///
-/// The nature of parquet forces buffering of an entire row group before it can be flushed
-/// to the underlying writer. Data is buffered in its encoded form, to reduce memory usage,
-/// but if writing rows containing large strings or very nested data, this may still result in
-/// non-trivial memory usage.
+/// The nature of parquet forces buffering of an entire row group before it can
+/// be flushed to the underlying writer. Data is mostly buffered in its encoded
+/// form, reducing memory usage. However, some data, such as dictionary keys,
+/// large strings, or deeply nested data, may still result in non-trivial memory
+/// usage.
 ///
-/// [`ArrowWriter::in_progress_size`] can be used to track the size of the buffered row group,
-/// and potentially trigger an early flush of a row group based on a memory threshold and/or
-/// global memory pressure. However, users should be aware that smaller row groups will result
-/// in higher metadata overheads, and may worsen compression ratios and query performance.
+/// See Also:
+/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
+/// * [`ArrowWriter::in_progress_size`]: the estimated encoded size of the buffered row group.
+///
+/// Call [`Self::flush`] to trigger an early flush of a row group based on a
+/// memory threshold and/or global memory pressure. However, smaller row groups
+/// result in higher metadata overheads, and thus may worsen compression ratios
+/// and query performance.
 ///
 /// ```no_run
 /// # use std::io::Write;
 /// # use arrow_array::RecordBatch;
 /// # use parquet::arrow::ArrowWriter;
@@ -101,7 +106,7 @@ mod levels;
 /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
 /// # let batch: RecordBatch = todo!();
 /// writer.write(&batch).unwrap();
-/// // Trigger an early flush if buffered size exceeds 1_000_000
+/// // Trigger an early flush if the anticipated encoded size exceeds 1_000_000
 /// if writer.in_progress_size() > 1_000_000 {
 ///     writer.flush().unwrap();
 /// }
@@ -203,7 +208,23 @@ impl<W: Write + Send> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Returns the estimated length in bytes of the current in progress row group
+    /// Estimated memory usage, in bytes, of this `ArrowWriter`
+    ///
+    /// This estimate is formed by summing the values of
+    /// [`ArrowColumnWriter::memory_size`] for all in progress columns.
+    pub fn memory_size(&self) -> usize {
+        match &self.in_progress {
+            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
+            None => 0,
+        }
+    }
+
+    /// Anticipated encoded size of the in progress row group.
+    ///
+    /// This estimates the size of the row group once it has been completely
+    /// encoded, and is formed by summing the values of
+    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
+    /// columns.
     pub fn in_progress_size(&self) -> usize {
         match &self.in_progress {
             Some(in_progress) => in_progress
@@ -629,7 +650,30 @@ impl ArrowColumnWriter {
         Ok(ArrowColumnChunk { data, close })
     }
 
-    /// Returns the estimated total bytes for this column writer
+    /// Returns the estimated total memory usage of this column writer.
+    ///
+    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
+    /// of the current memory usage and not the anticipated encoded size.
+    ///
+    /// This includes:
+    /// 1. Data buffered in encoded form
+    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
+    ///
+    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
+    pub fn memory_size(&self) -> usize {
+        match &self.writer {
+            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
+            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
+        }
+    }
+
+    /// Returns the estimated total encoded bytes for this column writer.
+    ///
+    /// This includes:
+    /// 1. Data buffered in encoded form
+    /// 2. An estimate of how large the data buffered in un-encoded form would
+    ///    be once encoded
+    ///
+    /// This value should be less than or equal to [`Self::memory_size`]
     pub fn get_estimated_total_bytes(&self) -> usize {
         match &self.writer {
             ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
@@ -2894,6 +2938,7 @@ mod tests {
         // starts empty
         assert_eq!(writer.in_progress_size(), 0);
         assert_eq!(writer.in_progress_rows(), 0);
+        assert_eq!(writer.memory_size(), 0);
         assert_eq!(writer.bytes_written(), 4); // Initial header
         writer.write(&batch).unwrap();
 
@@ -2901,17 +2946,31 @@ mod tests {
         let initial_size = writer.in_progress_size();
         assert!(initial_size > 0);
         assert_eq!(writer.in_progress_rows(), 5);
+        let initial_memory = writer.memory_size();
+        assert!(initial_memory > 0);
+        // the memory estimate is at least as large as the estimated encoded size
+        assert!(
+            initial_size <= initial_memory,
+            "{initial_size} <= {initial_memory}"
+        );
 
         // updated on second write
         writer.write(&batch).unwrap();
         assert!(writer.in_progress_size() > initial_size);
         assert_eq!(writer.in_progress_rows(), 10);
+        assert!(writer.memory_size() > initial_memory);
+        assert!(
+            writer.in_progress_size() <= writer.memory_size(),
+            "in_progress_size {} <= memory_size {}",
+            writer.in_progress_size(),
+            writer.memory_size()
+        );
 
         // in progress tracking is cleared, but the overall data written is updated
         let pre_flush_bytes_written = writer.bytes_written();
         writer.flush().unwrap();
         assert_eq!(writer.in_progress_size(), 0);
-        assert_eq!(writer.in_progress_rows(), 0);
+        assert_eq!(writer.memory_size(), 0);
         assert!(writer.bytes_written() > pre_flush_bytes_written);
 
         writer.close().unwrap();
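Taken together, `memory_size` and `in_progress_size` support the back-pressure loop hinted at in the docs above. A sketch under stated assumptions: the function name and the 64 MiB budget are illustrative, while the writer methods are the real ones this patch adds or documents.

```rust
use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;

/// Hypothetical helper: write batches, flushing a row group early whenever
/// the writer's buffered memory exceeds a fixed budget.
fn write_with_budget<W: std::io::Write + Send>(
    mut writer: ArrowWriter<W>,
    batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<()> {
    const MEMORY_BUDGET: usize = 64 * 1024 * 1024; // 64 MiB, arbitrary
    for batch in batches {
        writer.write(&batch)?;
        // memory_size() tracks what is actually buffered (encoded pages,
        // un-encoded dictionary keys, bloom filters), so it is the better
        // trigger for memory limiting than in_progress_size()
        if writer.memory_size() > MEMORY_BUDGET {
            writer.flush()?; // trade-off: produces smaller row groups
        }
    }
    writer.close()?;
    Ok(())
}
```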
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 0bedf1fcb731..edeb0fec00b7 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -176,7 +176,16 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         self.sync_writer.flushed_row_groups()
     }
 
-    /// Returns the estimated length in bytes of the current in progress row group
+    /// Estimated memory usage, in bytes, of this `AsyncArrowWriter`
+    ///
+    /// See [`ArrowWriter::memory_size`] for more information.
+    pub fn memory_size(&self) -> usize {
+        self.sync_writer.memory_size()
+    }
+
+    /// Anticipated encoded size of the in progress row group.
+    ///
+    /// See [`ArrowWriter::in_progress_size`] for more information.
     pub fn in_progress_size(&self) -> usize {
         self.sync_writer.in_progress_size()
     }
@@ -419,16 +428,30 @@ mod tests {
         let initial_size = writer.in_progress_size();
         assert!(initial_size > 0);
         assert_eq!(writer.in_progress_rows(), batch.num_rows());
+        let initial_memory = writer.memory_size();
+        // the memory estimate is at least as large as the estimated encoded size
+        assert!(
+            initial_size <= initial_memory,
+            "{initial_size} <= {initial_memory}"
+        );
 
         // updated on second write
         writer.write(&batch).await.unwrap();
         assert!(writer.in_progress_size() > initial_size);
         assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
+        assert!(writer.memory_size() > initial_memory);
+        assert!(
+            writer.in_progress_size() <= writer.memory_size(),
+            "in_progress_size {} <= memory_size {}",
+            writer.in_progress_size(),
+            writer.memory_size()
+        );
 
         // in progress tracking is cleared, but the overall data written is updated
         let pre_flush_bytes_written = writer.bytes_written();
         writer.flush().await.unwrap();
         assert_eq!(writer.in_progress_size(), 0);
+        assert_eq!(writer.memory_size(), 0);
         assert_eq!(writer.in_progress_rows(), 0);
         assert!(writer.bytes_written() > pre_flush_bytes_written);
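Because `AsyncArrowWriter` delegates both metrics to its buffered sync writer, the same budgeting carries over unchanged. A hedged async sketch, assuming the crate's async feature and the `AsyncWrite`-based bounds this version of `AsyncArrowWriter` uses:

```rust
use arrow_array::RecordBatch;
use parquet::arrow::AsyncArrowWriter;
use parquet::errors::Result;
use tokio::io::AsyncWrite;

/// Hypothetical async variant of the budgeted write loop above.
async fn write_with_budget_async<W: AsyncWrite + Unpin + Send>(
    mut writer: AsyncArrowWriter<W>,
    batches: Vec<RecordBatch>,
) -> Result<()> {
    const MEMORY_BUDGET: usize = 64 * 1024 * 1024; // 64 MiB, arbitrary
    for batch in batches {
        writer.write(&batch).await?;
        // memory_size() itself is synchronous; only the I/O is awaited
        if writer.memory_size() > MEMORY_BUDGET {
            writer.flush().await?;
        }
    }
    writer.close().await?;
    Ok(())
}
```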
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index f746d8b9a8d7..d2acdcd0b2b8 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -383,6 +383,11 @@ impl Sbbf {
         let block_index = self.hash_to_block_index(hash);
         self.0[block_index].check(hash as u32)
     }
+
+    /// Return the total in-memory size of this bloom filter, in bytes
+    pub(crate) fn estimated_memory_size(&self) -> usize {
+        self.0.capacity() * std::mem::size_of::<Block>()
+    }
 }
 
 // per spec we use xxHash with seed=0
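For a feel of the numbers: a split-block bloom filter is a vector of 256-bit blocks, so the estimate above is just the block count times 32 bytes. A standalone sketch, with `Block` re-declared locally to mirror the crate-private type:

```rust
// The parquet spec defines an SBBF block as eight 32-bit words (256 bits);
// this local alias stands in for the crate-private `Block` type.
type Block = [u32; 8];

fn main() {
    let filter: Vec<Block> = vec![[0u32; 8]; 128]; // a 128-block filter
    assert_eq!(std::mem::size_of::<Block>(), 32);
    // capacity-based estimate, as in Sbbf::estimated_memory_size
    assert!(filter.capacity() * std::mem::size_of::<Block>() >= 128 * 32); // >= 4096 bytes
}
```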
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 8624f859f4b0..b6c8212608b8 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -93,10 +93,17 @@ pub trait ColumnValueEncoder {
     /// Returns true if this encoder has a dictionary page
     fn has_dictionary(&self) -> bool;
 
-    /// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary
+    /// Returns the estimated total memory usage of the encoder
+    fn estimated_memory_size(&self) -> usize;
+
+    /// Returns an estimate of the encoded size of the dictionary page, in bytes, or `None` if no dictionary
     fn estimated_dict_page_size(&self) -> Option<usize>;
 
-    /// Returns an estimate of the data page size in bytes
+    /// Returns an estimate of the encoded data page size, in bytes
+    ///
+    /// This should include the bytes already encoded in the page buffer plus an
+    /// estimate of the encoded size of any values buffered but not yet encoded
     fn estimated_data_page_size(&self) -> usize;
 
     /// Flush the dictionary page for this column chunk if any. Any subsequent calls to
@@ -227,6 +234,24 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
         self.dict_encoder.is_some()
     }
 
+    fn estimated_memory_size(&self) -> usize {
+        let encoder_size = self.encoder.estimated_memory_size();
+
+        let dict_encoder_size = self
+            .dict_encoder
+            .as_ref()
+            .map(|encoder| encoder.estimated_memory_size())
+            .unwrap_or_default();
+
+        let bloom_filter_size = self
+            .bloom_filter
+            .as_ref()
+            .map(|bf| bf.estimated_memory_size())
+            .unwrap_or_default();
+
+        encoder_size + dict_encoder_size + bloom_filter_size
+    }
+
     fn estimated_dict_page_size(&self) -> Option<usize> {
         Some(self.dict_encoder.as_ref()?.dict_encoded_size())
     }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index e084d9ba41fa..8594e59714dc 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -72,7 +72,13 @@ pub enum ColumnWriter<'a> {
 }
 
 impl<'a> ColumnWriter<'a> {
-    /// Returns the estimated total bytes for this column writer
+    /// Returns the estimated total memory usage
+    #[cfg(feature = "arrow")]
+    pub(crate) fn memory_size(&self) -> usize {
+        downcast_writer!(self, typed, typed.memory_size())
+    }
+
+    /// Returns the estimated total encoded bytes for this column writer
     #[cfg(feature = "arrow")]
     pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
         downcast_writer!(self, typed, typed.get_estimated_total_bytes())
@@ -419,6 +425,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         )
     }
 
+    /// Returns the estimated total memory usage.
+    ///
+    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
+    /// of the current memory usage and not the final anticipated encoded size.
+    #[cfg(feature = "arrow")]
+    pub(crate) fn memory_size(&self) -> usize {
+        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
+    }
+
     /// Returns total number of bytes written by this column writer so far.
     /// This value is also returned when column writer is closed.
     ///
@@ -428,10 +443,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         self.column_metrics.total_bytes_written
     }
 
-    /// Returns the estimated total bytes for this column writer
+    /// Returns the estimated total encoded bytes for this column writer.
     ///
     /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
-    /// of any data that has not yet been flushed to a page
+    /// of any data that has not yet been flushed to a page, based on its
+    /// anticipated encoded size.
     #[cfg(feature = "arrow")]
     pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
         self.column_metrics.total_bytes_written
diff --git a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
index a95487041cee..3d5ba4cc2d0b 100644
--- a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
+++ b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
@@ -90,4 +90,9 @@ impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
         self.buffer.clear();
         Ok(encoded.into())
     }
+
+    /// Return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.buffer.capacity() * std::mem::size_of::<u8>()
+    }
 }
diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs
index dafae064afbf..98283b574ebb 100644
--- a/parquet/src/encodings/encoding/dict_encoder.rs
+++ b/parquet/src/encodings/encoding/dict_encoder.rs
@@ -34,6 +34,7 @@ use crate::util::interner::{Interner, Storage};
 struct KeyStorage<T: DataType> {
     uniques: Vec<T::T>,
 
+    /// Size of the unique values (keys) in the dictionary, in bytes.
     size_in_bytes: usize,
 
     type_length: usize,
@@ -61,6 +62,10 @@ impl<T: DataType> Storage for KeyStorage<T> {
         self.uniques.push(value.clone());
         key
     }
+
+    fn estimated_memory_size(&self) -> usize {
+        self.size_in_bytes + self.uniques.capacity() * std::mem::size_of::<T::T>()
+    }
 }
 
 /// Dictionary encoder.
@@ -143,6 +148,7 @@ impl<T: DataType> DictEncoder<T> {
     fn bit_width(&self) -> u8 {
         num_required_bits(self.num_entries().saturating_sub(1) as u64)
     }
+
 }
 
 impl<T: DataType> Encoder<T> for DictEncoder<T> {
@@ -161,6 +167,10 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
         Encoding::PLAIN_DICTIONARY
     }
 
+    /// Returns an estimate of the encoded data page size, in bytes
+    ///
+    /// This is the maximum buffer size the RLE-encoded indices buffered so
+    /// far could occupy once written
     fn estimated_data_encoded_size(&self) -> usize {
         let bit_width = self.bit_width();
         RleEncoder::max_buffer_size(bit_width, self.indices.len())
@@ -169,4 +179,11 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
     fn flush_buffer(&mut self) -> Result<Bytes> {
         self.write_indices()
     }
+
+    /// Returns the estimated total memory usage
+    ///
+    /// For this encoder, the indices are buffered un-encoded (refer to [`Self::write_indices`]).
+    fn estimated_memory_size(&self) -> usize {
+        self.interner.storage().size_in_bytes + self.indices.len() * std::mem::size_of::<usize>()
+    }
 }
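The gap between `estimated_memory_size` and `estimated_data_encoded_size` for the dictionary encoder is bit packing: each buffered index occupies a full word in memory but encodes at only `bit_width` bits. A back-of-the-envelope sketch, with `num_required_bits` re-implemented locally to mirror the crate-internal helper in `util/bit_util.rs`:

```rust
// Local stand-in for the crate-internal num_required_bits helper:
// the number of bits needed to represent x.
fn num_required_bits(x: u64) -> u8 {
    (64 - x.leading_zeros()) as u8
}

fn main() {
    // A dictionary with 1000 distinct values has keys 0..=999,
    // so each buffered index encodes in ceil(log2(1000)) = 10 bits.
    let num_entries: u64 = 1000;
    let bit_width = num_required_bits(num_entries.saturating_sub(1));
    assert_eq!(bit_width, 10);

    // 10_000 buffered indices: ~80 KB in memory as 8-byte words,
    // but only ~12.5 KB once bit-packed (ignoring RLE framing overhead).
    let in_memory = 10_000 * std::mem::size_of::<u64>();
    let encoded_estimate = 10_000 * bit_width as usize / 8;
    assert_eq!(in_memory, 80_000);
    assert_eq!(encoded_estimate, 12_500);
}
```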
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index 56ca68db7d75..ecc376712490 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -68,6 +68,9 @@ pub trait Encoder<T: DataType>: Send {
     /// Method call must be O(1).
     fn estimated_data_encoded_size(&self) -> usize;
 
+    /// Returns an estimate of the memory used by this encoder, in bytes
+    fn estimated_memory_size(&self) -> usize;
+
     /// Flushes the underlying byte buffer that's being processed by this encoder, and
     /// return the immutable copy of it. This will also reset the internal state.
     fn flush_buffer(&mut self) -> Result<Bytes>;
@@ -158,6 +161,11 @@ impl<T: DataType> Encoder<T> for PlainEncoder<T> {
         T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
         Ok(())
     }
+
+    /// Return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.buffer.capacity() * std::mem::size_of::<u8>() + self.bit_writer.estimated_memory_size()
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -243,6 +251,13 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
 
         Ok(buf.into())
     }
+
+    /// Return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.encoder
+            .as_ref()
+            .map_or(0, |enc| enc.estimated_memory_size())
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -477,6 +492,15 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
 
         Ok(buffer.into())
     }
+
+    /// Return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.page_header_writer.estimated_memory_size()
+            + self.bit_writer.estimated_memory_size()
+            + self.deltas.capacity() * std::mem::size_of::<i64>()
+            + std::mem::size_of::<Self>()
+    }
 }
 
 /// Helper trait to define specific conversions and subtractions when computing deltas
@@ -614,6 +638,14 @@ impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> {
 
         Ok(total_bytes.into())
     }
+
+    /// Return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.len_encoder.estimated_memory_size()
+            + self.data.len()
+            + std::mem::size_of::<Self>()
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -717,6 +749,14 @@ impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
             ),
         }
     }
+
+    /// Return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.prefix_len_encoder.estimated_memory_size()
+            + self.suffix_writer.estimated_memory_size()
+            + (self.previous.capacity() * std::mem::size_of::<u8>())
+    }
 }
 
 #[cfg(test)]
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 5d91c1e53d0f..581f14b3c99a 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -286,6 +286,13 @@ impl RleEncoder {
         }
         self.repeat_count = 0;
     }
+
+    /// Return the estimated memory size of this encoder.
+    pub(crate) fn estimated_memory_size(&self) -> usize {
+        self.bit_writer.estimated_memory_size()
+            + std::mem::size_of::<Self>()
+    }
 }
 
 /// Size, in number of `i32s` of buffer to use for RLE batch reading
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 1ec764e2c869..eaaf3ee10279 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -329,6 +329,11 @@ impl BitWriter {
         let u: u64 = ((v << 1) ^ (v >> 63)) as u64;
         self.put_vlq_int(u)
     }
+
+    /// Returns an estimate of the memory used, in bytes
+    pub fn estimated_memory_size(&self) -> usize {
+        self.buffer.capacity() * size_of::<u8>()
+    }
 }
 
 /// Maximum byte length for a VLQ encoded integer
diff --git a/parquet/src/util/interner.rs b/parquet/src/util/interner.rs
index a59ab8e7a31c..f57fc3a71409 100644
--- a/parquet/src/util/interner.rs
+++ b/parquet/src/util/interner.rs
@@ -32,6 +32,10 @@ pub trait Storage {
 
     /// Adds a new element, returning the key
     fn push(&mut self, value: &Self::Value) -> Self::Key;
+
+    /// Return an estimate of the memory used in this storage, in bytes
+    #[allow(dead_code)] // not used in parquet_derive, so is dead there
+    fn estimated_memory_size(&self) -> usize;
 }
 
 /// A generic value interner supporting various different [`Storage`]
@@ -82,6 +86,14 @@ impl<S: Storage> Interner<S> {
         }
     }
 
+    /// Return an estimate of the memory used, in bytes
+    #[allow(dead_code)] // not used in parquet_derive, so is dead there
+    pub fn estimated_memory_size(&self) -> usize {
+        self.storage.estimated_memory_size() +
+            // estimate the size of the dedup hashmap as just the size of its keys
+            self.dedup.capacity() * std::mem::size_of::<S::Key>()
+    }
+
     /// Returns the storage for this interner
     pub fn storage(&self) -> &S {
         &self.storage