From b79698257128f360db1627d0a542c7de543ed956 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 26 Jun 2024 12:16:00 -0700 Subject: [PATCH 01/14] refactor(5851): delineate the different memory estimates APIs for the ArrowWriter and column writers --- parquet/src/arrow/arrow_writer/mod.rs | 32 +++++++++++++++++++++++++-- parquet/src/column/writer/mod.rs | 22 +++++++++++++++--- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 0beb93f80a5f..faaa67e1c8ba 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -203,7 +203,21 @@ impl ArrowWriter { self.writer.flushed_row_groups() } - /// Returns the estimated length in bytes of the current in progress row group + /// Returns the estimated memory usage of the current in progress row group. + /// + /// This includes the current encoded size of written bytes, as well as + /// the size of the unencoded data not yet flushed. + pub fn memory_size(&self) -> usize { + match &self.in_progress { + Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(), + None => 0, + } + } + + /// Returns the estimated length in encoded bytes of the current in progress row group. + /// + /// This includes an estimate of any data that has not yet been flushed to a page, + /// based on it's anticipated encoded size. pub fn in_progress_size(&self) -> usize { match &self.in_progress { Some(in_progress) => in_progress @@ -629,7 +643,21 @@ impl ArrowColumnWriter { Ok(ArrowColumnChunk { data, close }) } - /// Returns the estimated total bytes for this column writer + /// Returns the estimated total memory usage. + /// + /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate + /// of the current memory usage and not it's anticipated encoded size. 
+ pub fn memory_size(&self) -> usize { + match &self.writer { + ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(), + ArrowColumnWriterImpl::Column(c) => c.memory_size(), + } + } + + /// Returns the estimated total encoded bytes for this column writer. + /// + /// This includes an estimate of any data that has not yet been flushed to a page, + /// based on it's anticipated encoded size. pub fn get_estimated_total_bytes(&self) -> usize { match &self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _, diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index e084d9ba41fa..a81f07016695 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -72,7 +72,13 @@ pub enum ColumnWriter<'a> { } impl<'a> ColumnWriter<'a> { - /// Returns the estimated total bytes for this column writer + /// Returns the estimated total memory usage + #[cfg(feature = "arrow")] + pub(crate) fn memory_size(&self) -> usize { + downcast_writer!(self, typed, typed.memory_size()) + } + + /// Returns the estimated total encoded bytes for this column writer #[cfg(feature = "arrow")] pub(crate) fn get_estimated_total_bytes(&self) -> u64 { downcast_writer!(self, typed, typed.get_estimated_total_bytes()) @@ -419,6 +425,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { ) } + /// Returns the estimated total memory usage. + /// + /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate + /// of the current memory usage and not it's anticipated encoded size. + #[cfg(feature = "arrow")] + pub(crate) fn memory_size(&self) -> usize { + todo!("TODO in next commit") + } + /// Returns total number of bytes written by this column writer so far. /// This value is also returned when column writer is closed. 
/// @@ -428,10 +443,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.column_metrics.total_bytes_written } - /// Returns the estimated total bytes for this column writer + /// Returns the estimated total encoded bytes for this column writer. /// /// Unlike [`Self::get_total_bytes_written`] this includes an estimate - /// of any data that has not yet been flushed to a page + /// of any data that has not yet been flushed to a page, based on it's + /// anticipated encoded size. #[cfg(feature = "arrow")] pub(crate) fn get_estimated_total_bytes(&self) -> u64 { self.column_metrics.total_bytes_written From b9a002fab051dfb89aa95c3e03e359fdf794ec45 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 26 Jun 2024 13:12:43 -0700 Subject: [PATCH 02/14] feat(5851): add memory size estimates to the ColumnValueEncoder implementations and the DictEncoder --- parquet/src/arrow/arrow_writer/byte_array.rs | 11 +++++++++++ parquet/src/column/writer/encoder.rs | 12 ++++++++++++ parquet/src/column/writer/mod.rs | 2 +- parquet/src/encodings/encoding/dict_encoder.rs | 5 +++++ 4 files changed, 29 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 07f23f5b17c6..12b9f284a914 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -334,6 +334,10 @@ impl DictEncoder { num_required_bits(length.saturating_sub(1) as u64) } + fn estimated_memory_size(&self) -> usize { + self.interner.storage().page.len() + self.indices.len() * 8 + } + fn estimated_data_page_size(&self) -> usize { let bit_width = self.bit_width(); 1 + RleEncoder::max_buffer_size(bit_width, self.indices.len()) @@ -443,6 +447,13 @@ impl ColumnValueEncoder for ByteArrayEncoder { self.dict_encoder.is_some() } + fn estimated_memory_size(&self) -> usize { + match &self.dict_encoder { + Some(encoder) => encoder.estimated_memory_size(), + None => 
self.fallback.estimated_data_page_size(), + } + } + fn estimated_dict_page_size(&self) -> Option { Some(self.dict_encoder.as_ref()?.estimated_dict_page_size()) } diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 8624f859f4b0..4c983c2dc8ea 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -93,6 +93,9 @@ pub trait ColumnValueEncoder { /// Returns true if this encoder has a dictionary page fn has_dictionary(&self) -> bool; + /// Returns the estimated total memory usage of the encoder + fn estimated_memory_size(&self) -> usize; + /// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary fn estimated_dict_page_size(&self) -> Option; @@ -227,6 +230,15 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { self.dict_encoder.is_some() } + fn estimated_memory_size(&self) -> usize { + match &self.dict_encoder { + Some(encoder) => encoder.estimated_memory_size(), + // TODO: for now, we are ignoring the memory overhead for unflushed data + // in the other encoders (besides DictEncoder) + _ => self.encoder.estimated_data_encoded_size(), + } + } + fn estimated_dict_page_size(&self) -> Option { Some(self.dict_encoder.as_ref()?.dict_encoded_size()) } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index a81f07016695..78ff13ba453e 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -431,7 +431,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// of the current memory usage and not it's anticipated encoded size. #[cfg(feature = "arrow")] pub(crate) fn memory_size(&self) -> usize { - todo!("TODO in next commit") + self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size() } /// Returns total number of bytes written by this column writer so far. 
diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index dafae064afbf..64ad61611dc8 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -143,6 +143,11 @@ impl DictEncoder { fn bit_width(&self) -> u8 { num_required_bits(self.num_entries().saturating_sub(1) as u64) } + + /// Returns the estimated total memory usage + pub(crate) fn estimated_memory_size(&self) -> usize { + self.interner.storage().size_in_bytes + self.indices.len() * 8 + } } impl Encoder for DictEncoder { From 23775bd8909f502024345609d55914bebf0dc08d Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 26 Jun 2024 20:45:57 -0700 Subject: [PATCH 03/14] test(5851): add memory_size() to in-progress test --- parquet/src/arrow/arrow_writer/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index faaa67e1c8ba..0bc00b6a3c3f 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -2922,6 +2922,7 @@ mod tests { // starts empty assert_eq!(writer.in_progress_size(), 0); assert_eq!(writer.in_progress_rows(), 0); + assert_eq!(writer.memory_size(), 0); assert_eq!(writer.bytes_written(), 4); // Initial header writer.write(&batch).unwrap(); @@ -2929,17 +2930,20 @@ mod tests { let initial_size = writer.in_progress_size(); assert!(initial_size > 0); assert_eq!(writer.in_progress_rows(), 5); + let initial_memory = writer.memory_size(); + assert!(initial_memory > 0); // updated on second write writer.write(&batch).unwrap(); assert!(writer.in_progress_size() > initial_size); assert_eq!(writer.in_progress_rows(), 10); + assert!(writer.memory_size() > initial_memory); // in progress tracking is cleared, but the overall data written is updated let pre_flush_bytes_written = writer.bytes_written(); writer.flush().unwrap(); assert_eq!(writer.in_progress_size(), 0); - 
assert_eq!(writer.in_progress_rows(), 0); + assert_eq!(writer.memory_size(), 0); assert!(writer.bytes_written() > pre_flush_bytes_written); writer.close().unwrap(); From 070275697bd4a74faa4ed377140db7d8984163f7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 28 Jun 2024 08:00:14 -0700 Subject: [PATCH 04/14] chore(5851): update docs to make it more explicit what is the difference btwn memory_size vs get_estimated_total_byte --- parquet/src/arrow/arrow_writer/byte_array.rs | 10 ++++++++++ parquet/src/arrow/arrow_writer/mod.rs | 19 +++++++++++++------ parquet/src/column/writer/encoder.rs | 11 +++++++++-- parquet/src/column/writer/mod.rs | 8 ++++++++ .../src/encodings/encoding/dict_encoder.rs | 9 +++++++++ 5 files changed, 49 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 12b9f284a914..e709e5047f59 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -210,6 +210,10 @@ impl FallbackEncoder { } } + /// Returns an estimate of the data page size in bytes + /// + /// This includes: + /// + fn estimated_data_page_size(&self) -> usize { match &self.encoder { FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(), @@ -450,6 +454,8 @@ impl ColumnValueEncoder for ByteArrayEncoder { fn estimated_memory_size(&self) -> usize { match &self.dict_encoder { Some(encoder) => encoder.estimated_memory_size(), + // For the FallbackEncoder, these unflushed bytes are already encoded. + // Therefore, the size should be the same as estimated_data_page_size. 
None => self.fallback.estimated_data_page_size(), } } @@ -458,6 +464,10 @@ impl ColumnValueEncoder for ByteArrayEncoder { Some(self.dict_encoder.as_ref()?.estimated_dict_page_size()) } + /// Returns an estimate of the data page size in bytes + /// + /// This includes: + /// + fn estimated_data_page_size(&self) -> usize { match &self.dict_encoder { Some(encoder) => encoder.estimated_data_page_size(), diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 0bc00b6a3c3f..55c56dc230ea 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -205,8 +205,10 @@ impl ArrowWriter { /// Returns the estimated memory usage of the current in progress row group. /// - /// This includes the current encoded size of written bytes, as well as - /// the size of the unencoded data not yet flushed. + /// This includes: + /// + + + /// + /// In the vast majority of cases our unflushed bytes are already encoded. pub fn memory_size(&self) -> usize { match &self.in_progress { Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(), @@ -216,8 +218,8 @@ impl ArrowWriter { /// Returns the estimated length in encoded bytes of the current in progress row group. /// - /// This includes an estimate of any data that has not yet been flushed to a page, - /// based on it's anticipated encoded size. + /// This includes: + /// + pub fn in_progress_size(&self) -> usize { match &self.in_progress { Some(in_progress) => in_progress @@ -647,6 +649,11 @@ impl ArrowColumnWriter { /// /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate /// of the current memory usage and not it's anticipated encoded size. + /// + /// This includes: + /// + + + /// + /// In the vast majority of cases our unflushed bytes are already encoded. 
pub fn memory_size(&self) -> usize { match &self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(), @@ -656,8 +663,8 @@ impl ArrowColumnWriter { /// Returns the estimated total encoded bytes for this column writer. /// - /// This includes an estimate of any data that has not yet been flushed to a page, - /// based on it's anticipated encoded size. + /// This includes: + /// + pub fn get_estimated_total_bytes(&self) -> usize { match &self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _, diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 4c983c2dc8ea..c8fea3ea9f62 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -94,12 +94,18 @@ pub trait ColumnValueEncoder { fn has_dictionary(&self) -> bool; /// Returns the estimated total memory usage of the encoder + /// + /// This should include: + /// + + fn estimated_memory_size(&self) -> usize; /// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary fn estimated_dict_page_size(&self) -> Option; /// Returns an estimate of the data page size in bytes + /// + /// This should include: + /// + fn estimated_data_page_size(&self) -> usize; /// Flush the dictionary page for this column chunk if any. Any subsequent calls to @@ -232,9 +238,10 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { fn estimated_memory_size(&self) -> usize { match &self.dict_encoder { + // For this DictEncoder, we have unencoded bytes in the buffer. Some(encoder) => encoder.estimated_memory_size(), - // TODO: for now, we are ignoring the memory overhead for unflushed data - // in the other encoders (besides DictEncoder) + // Whereas for all other encoders the buffer contains encoded bytes. + // Therefore, we can use the estimated_data_encoded_size. 
_ => self.encoder.estimated_data_encoded_size(), } } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 78ff13ba453e..4dd12666244f 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -429,6 +429,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate /// of the current memory usage and not it's anticipated encoded size. + /// + /// This includes: + /// + + + /// + /// In the vast majority of cases our unflushed bytes are already encoded. #[cfg(feature = "arrow")] pub(crate) fn memory_size(&self) -> usize { self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size() @@ -448,6 +453,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// Unlike [`Self::get_total_bytes_written`] this includes an estimate /// of any data that has not yet been flushed to a page, based on it's /// anticipated encoded size. + /// + /// This includes: + /// + #[cfg(feature = "arrow")] pub(crate) fn get_estimated_total_bytes(&self) -> u64 { self.column_metrics.total_bytes_written diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 64ad61611dc8..f0444800172f 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -145,6 +145,11 @@ impl DictEncoder { } /// Returns the estimated total memory usage + /// + /// For this encoder, the indices are unencoded bytes (refer to [`Self::write_indices`]). 
+ /// + /// Therefore, we have a specific memory estimate (during encoding) of: + /// + pub(crate) fn estimated_memory_size(&self) -> usize { self.interner.storage().size_in_bytes + self.indices.len() * 8 } @@ -166,6 +171,10 @@ impl Encoder for DictEncoder { Encoding::PLAIN_DICTIONARY } + /// Returns an estimate of the data page size in bytes + /// + /// This includes: + /// + fn estimated_data_encoded_size(&self) -> usize { let bit_width = self.bit_width(); RleEncoder::max_buffer_size(bit_width, self.indices.len()) From 3f8681bddd0070d0802aded9d2ebd795c696047b Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 28 Jun 2024 09:41:32 -0700 Subject: [PATCH 05/14] feat(5851): clarify the ColumnValueEncoder::estimated_memory_size interface, and update impls to account for bloom filter size --- parquet/src/arrow/arrow_writer/byte_array.rs | 12 ++++++++++-- parquet/src/bloom_filter/mod.rs | 5 +++++ parquet/src/column/writer/encoder.rs | 12 ++++++++++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index e709e5047f59..c8dd276f0466 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -452,12 +452,20 @@ impl ColumnValueEncoder for ByteArrayEncoder { } fn estimated_memory_size(&self) -> usize { - match &self.dict_encoder { + let encoder_size = match &self.dict_encoder { Some(encoder) => encoder.estimated_memory_size(), // For the FallbackEncoder, these unflushed bytes are already encoded. // Therefore, the size should be the same as estimated_data_page_size. 
None => self.fallback.estimated_data_page_size(), - } + }; + + let bloom_filter_size = self + .bloom_filter + .as_ref() + .map(|bf| bf.byte_size()) + .unwrap_or_default(); + + encoder_size + bloom_filter_size } fn estimated_dict_page_size(&self) -> Option { diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index f746d8b9a8d7..83ce660e9aca 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -383,6 +383,11 @@ impl Sbbf { let block_index = self.hash_to_block_index(hash); self.0[block_index].check(hash as u32) } + + pub(crate) fn byte_size(&self) -> usize { + // each block = [u32; 8] + self.0.len() * 4 * 8 + } } // per spec we use xxHash with seed=0 diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index c8fea3ea9f62..7329aecee124 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -237,13 +237,21 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } fn estimated_memory_size(&self) -> usize { - match &self.dict_encoder { + let encoder_size = match &self.dict_encoder { // For this DictEncoder, we have unencoded bytes in the buffer. Some(encoder) => encoder.estimated_memory_size(), // Whereas for all other encoders the buffer contains encoded bytes. // Therefore, we can use the estimated_data_encoded_size. 
_ => self.encoder.estimated_data_encoded_size(), - } + }; + + let bloom_filter_size = self + .bloom_filter + .as_ref() + .map(|bf| bf.byte_size()) + .unwrap_or_default(); + + encoder_size + bloom_filter_size } fn estimated_dict_page_size(&self) -> Option { From dcefe9ec7396420a7d25aa8ab4b2e9b9b5f64f55 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 28 Jun 2024 09:48:56 -0700 Subject: [PATCH 06/14] feat(5851): account for stats array size in the ByteArrayEncoder --- parquet/src/arrow/arrow_writer/byte_array.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index c8dd276f0466..4b15a9bb419b 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -465,7 +465,10 @@ impl ColumnValueEncoder for ByteArrayEncoder { .map(|bf| bf.byte_size()) .unwrap_or_default(); - encoder_size + bloom_filter_size + let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default() + + self.max_value.as_ref().map(|v| v.len()).unwrap_or_default(); + + encoder_size + bloom_filter_size + stats_size } fn estimated_dict_page_size(&self) -> Option { From 678766bbbfb0fc85a134cd2abf28c0b388e1e02a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 13:32:45 -0400 Subject: [PATCH 07/14] Refine documentation --- parquet/src/arrow/arrow_writer/mod.rs | 52 +++++++++++++++------------ parquet/src/column/writer/encoder.rs | 6 ++-- parquet/src/column/writer/mod.rs | 10 +----- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 55c56dc230ea..b769dee499ac 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -84,15 +84,20 @@ mod levels; /// /// ## Memory Limiting /// -/// The nature of parquet forces buffering of an entire row group before it can be flushed -/// to the underlying 
writer. Data is buffered in its encoded form, to reduce memory usage, -/// but if writing rows containing large strings or very nested data, this may still result in -/// non-trivial memory usage. +/// The nature of parquet forces buffering of an entire row group before it can +/// be flushed to the underlying writer. Data is mostly buffered in its encoded +/// form, reducing memory usage. However, some data such as dictionary keys or +/// large strings or very nested data may still result in non-trivial memory +/// usage. /// -/// [`ArrowWriter::in_progress_size`] can be used to track the size of the buffered row group, -/// and potentially trigger an early flush of a row group based on a memory threshold and/or -/// global memory pressure. However, users should be aware that smaller row groups will result -/// in higher metadata overheads, and may worsen compression ratios and query performance. +/// See Also: +/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer. +/// * [`ArrowWriter::in_progress_size`]: Estimated size of the buffered row group, +/// +/// Call [`Self::flush`] to trigger an early flush of a row group based on a +/// memory threshold and/or global memory pressure. However, smaller row groups +/// result in higher metadata overheads, and thus may worsen compression ratios +/// and query performance. /// /// ```no_run /// # use std::io::Write; @@ -101,7 +106,7 @@ mod levels; /// # let mut writer: ArrowWriter> = todo!(); /// # let batch: RecordBatch = todo!(); /// writer.write(&batch).unwrap(); -/// // Trigger an early flush if buffered size exceeds 1_000_000 +/// // Trigger an early flush if anticipated size exceeds 1_000_000 /// if writer.in_progress_size() > 1_000_000 { /// writer.flush().unwrap(); /// } @@ -203,12 +208,9 @@ impl ArrowWriter { self.writer.flushed_row_groups() } - /// Returns the estimated memory usage of the current in progress row group. 
- /// - /// This includes: - /// + + + /// Estimated memory usage, in bytes, of this `ArrowWriter` /// - /// In the vast majority of cases our unflushed bytes are already encoded. + /// See [`ArrowColumnWriter::memory_size`] for details pub fn memory_size(&self) -> usize { match &self.in_progress { Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(), @@ -216,10 +218,12 @@ impl ArrowWriter { } } - /// Returns the estimated length in encoded bytes of the current in progress row group. + /// Anticipated encoded size of the in progress row group. /// - /// This includes: - /// + + /// This estimates the size of the row group after being completely encoded, + /// formed by summing the values of + /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress + /// columns. pub fn in_progress_size(&self) -> usize { match &self.in_progress { Some(in_progress) => in_progress @@ -645,15 +649,16 @@ impl ArrowColumnWriter { Ok(ArrowColumnChunk { data, close }) } - /// Returns the estimated total memory usage. + /// Returns the estimated total memory usage by the writer. /// - /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate + /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate /// of the current memory usage and not its anticipated encoded size. /// /// This includes: - /// + + + /// 1. Data buffered in encoded form /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys) /// - /// In the vast majority of cases our unflushed bytes are already encoded. + /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`] pub fn memory_size(&self) -> usize { match &self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(), @@ -664,7 +669,10 @@ impl ArrowColumnWriter { /// Returns the estimated total encoded bytes for this column writer. /// /// This includes: - /// + + /// 1. Data buffered in encoded form + /// 2. 
An estimate of how large the data buffered in un-encoded form would be once encoded + /// + /// This value should be less than or equal to [`Self::memory_size`] pub fn get_estimated_total_bytes(&self) -> usize { match &self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _, diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 7329aecee124..3cb4215e3400 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -95,14 +95,12 @@ pub trait ColumnValueEncoder { /// Returns the estimated total memory usage of the encoder /// - /// This should include: - /// + + fn estimated_memory_size(&self) -> usize; - /// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary + /// Returns an estimate of the encoded size of dictionary page size in bytes, or `None` if no dictionary fn estimated_dict_page_size(&self) -> Option; - /// Returns an estimate of the data page size in bytes + /// Returns an estimate of the encoded data page size in bytes /// /// This should include: /// + diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 4dd12666244f..8594e59714dc 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -428,12 +428,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// Returns the estimated total memory usage. /// /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate - /// of the current memory usage and not it's anticipated encoded size. - /// - /// This includes: - /// + + - /// - /// In the vast majority of cases our unflushed bytes are already encoded. + /// of the current memory usage and not the final anticipated encoded size. 
#[cfg(feature = "arrow")] pub(crate) fn memory_size(&self) -> usize { self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size() @@ -453,9 +448,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// Unlike [`Self::get_total_bytes_written`] this includes an estimate /// of any data that has not yet been flushed to a page, based on its /// anticipated encoded size. - /// - /// This includes: - /// + #[cfg(feature = "arrow")] pub(crate) fn get_estimated_total_bytes(&self) -> u64 { self.column_metrics.total_bytes_written From aced4a1c0cbd4f8df6c8e6f6d13d06b51a2a1b10 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 13:55:51 -0400 Subject: [PATCH 08/14] More accurate memory estimation --- parquet/src/arrow/arrow_writer/byte_array.rs | 9 +++++++-- parquet/src/bloom_filter/mod.rs | 6 +++--- parquet/src/column/writer/encoder.rs | 2 +- parquet/src/encodings/encoding/dict_encoder.rs | 5 +++++ parquet/src/util/interner.rs | 10 ++++++++++ 5 files changed, 26 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 4b15a9bb419b..49714c65f0ee 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -308,6 +308,11 @@ impl Storage for ByteArrayStorage { key as u64 } + + fn estimated_memory_size(&self) -> usize { + self.page.capacity() * std::mem::size_of::() + + self.values.capacity() * std::mem::size_of::>() + } } /// A dictionary encoder for byte array data @@ -339,7 +344,7 @@ impl DictEncoder { } fn estimated_memory_size(&self) -> usize { - self.interner.storage().page.len() + self.indices.len() * 8 + self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::() } fn estimated_data_page_size(&self) -> usize { let bit_width = self.bit_width(); @@ -462,7 +467,7 @@ impl ColumnValueEncoder for ByteArrayEncoder { let bloom_filter_size = self .bloom_filter .as_ref() - 
.map(|bf| bf.memory_size()) .unwrap_or_default(); let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default() diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 83ce660e9aca..a825acf9d977 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -384,9 +384,9 @@ impl Sbbf { self.0[block_index].check(hash as u32) } - pub(crate) fn byte_size(&self) -> usize { - // each block = [u32; 8] - self.0.len() * 4 * 8 + /// Return the total in memory size of this bloom filter in bytes + pub(crate) fn memory_size(&self) -> usize { + self.0.capacity() * std::mem::size_of::() } } diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 3cb4215e3400..2690e5eeab0f 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -246,7 +246,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { let bloom_filter_size = self .bloom_filter .as_ref() - .map(|bf| bf.byte_size()) + .map(|bf| bf.memory_size()) .unwrap_or_default(); encoder_size + bloom_filter_size diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index f0444800172f..7727d561d622 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -34,6 +34,7 @@ use crate::util::interner::{Interner, Storage}; struct KeyStorage { uniques: Vec, + /// size of unique values (keys) in the dictionary, in bytes. size_in_bytes: usize, type_length: usize, @@ -61,6 +62,10 @@ impl Storage for KeyStorage { self.uniques.push(value.clone()); key } + + fn estimated_memory_size(&self) -> usize { + self.size_in_bytes + self.uniques.capacity() * std::mem::size_of::() + } } /// Dictionary encoder. 
diff --git a/parquet/src/util/interner.rs b/parquet/src/util/interner.rs index a59ab8e7a31c..ec31e7e802b4 100644 --- a/parquet/src/util/interner.rs +++ b/parquet/src/util/interner.rs @@ -32,6 +32,9 @@ pub trait Storage { /// Adds a new element, returning the key fn push(&mut self, value: &Self::Value) -> Self::Key; + + /// Return an estimate of the memory used in this storage, in bytes + fn estimated_memory_size(&self) -> usize; } /// A generic value interner supporting various different [`Storage`] @@ -82,6 +85,13 @@ impl Interner { } } + /// Return estimate of the memory used, in bytes + pub fn estimated_memory_size(&self) -> usize { + self.storage.estimated_memory_size() + + // estimate size of dedup hashmap as just the size of the keys + self.dedup.capacity() + std::mem::size_of::() + } + /// Returns the storage for this interner pub fn storage(&self) -> &S { &self.storage From 3ca8839d637434a71e6205c5206c114bb1d91b3f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 14:01:06 -0400 Subject: [PATCH 09/14] Improve tests --- parquet/src/arrow/arrow_writer/mod.rs | 11 +++++++++++ parquet/src/arrow/async_writer/mod.rs | 14 ++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index b769dee499ac..e8ad98f9ea6e 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -2947,12 +2947,23 @@ mod tests { assert_eq!(writer.in_progress_rows(), 5); let initial_memory = writer.memory_size(); assert!(initial_memory > 0); + // memory estimate is larger than estimated encoded size + assert!( + initial_size <= initial_memory, + "{initial_size} <= {initial_memory}" + ); // updated on second write writer.write(&batch).unwrap(); assert!(writer.in_progress_size() > initial_size); assert_eq!(writer.in_progress_rows(), 10); assert!(writer.memory_size() > initial_memory); + assert!( + writer.in_progress_size() <= writer.memory_size(), + "
"in_progress_size {} <= memory_size {}", + writer.in_progress_size(), + writer.memory_size() + ); // in progress tracking is cleared, but the overall data written is updated let pre_flush_bytes_written = writer.bytes_written(); diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 0bedf1fcb731..016a8c907f9e 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -419,16 +419,30 @@ mod tests { let initial_size = writer.in_progress_size(); assert!(initial_size > 0); assert_eq!(writer.in_progress_rows(), batch.num_rows()); + let initial_memory = writer.memory_size(); + // memory estimate is larger than estimated encoded size + assert!( + initial_size <= initial_memory, + "{initial_size} <= {initial_memory}" + ); // updated on second write writer.write(&batch).await.unwrap(); assert!(writer.in_progress_size() > initial_size); assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2); + assert!(writer.memory_size() > initial_memory); + assert!( + writer.in_progress_size() <= writer.memory_size(), + "in_progress_size {} <= memory_size {}", + writer.in_progress_size(), + writer.memory_size() + ); // in progress tracking is cleared, but the overall data written is updated let pre_flush_bytes_written = writer.bytes_written(); writer.flush().await.unwrap(); assert_eq!(writer.in_progress_size(), 0); + assert_eq!(writer.memory_size(), 0); assert_eq!(writer.in_progress_rows(), 0); assert!(writer.bytes_written() > pre_flush_bytes_written); From 81e5efbf11bcebcf42f5b1cd0ad9791d2fd7b9db Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 14:04:15 -0400 Subject: [PATCH 10/14] Update accounting for non dict encoded data --- parquet/src/arrow/arrow_writer/byte_array.rs | 2 +- parquet/src/bloom_filter/mod.rs | 2 +- parquet/src/column/writer/encoder.rs | 14 ++++++-------- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs 
b/parquet/src/arrow/arrow_writer/byte_array.rs index 49714c65f0ee..32786fba68cf 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -467,7 +467,7 @@ impl ColumnValueEncoder for ByteArrayEncoder { let bloom_filter_size = self .bloom_filter .as_ref() - .map(|bf| bf.memory_size()) + .map(|bf| bf.estimated_memory_size()) .unwrap_or_default(); let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default() diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index a825acf9d977..d2acdcd0b2b8 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -385,7 +385,7 @@ impl Sbbf { } /// Return the total in memory size of this bloom filter in bytes - pub(crate) fn memory_size(&self) -> usize { + pub(crate) fn estimated_memory_size(&self) -> usize { self.0.capacity() * std::mem::size_of::() } } diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 2690e5eeab0f..cadadb80e25c 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -235,18 +235,16 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } fn estimated_memory_size(&self) -> usize { - let encoder_size = match &self.dict_encoder { - // For this DictEncoder, we have unencoded bytes in the buffer. - Some(encoder) => encoder.estimated_memory_size(), - // Whereas for all other encoders the buffer contains encoded bytes. - // Therefore, we can use the estimated_data_encoded_size. 
- _ => self.encoder.estimated_data_encoded_size(), - }; + let encoder_size = self + .dict_encoder + .as_ref() + .map(|encoder| encoder.estimated_memory_size()) + .unwrap_or_default(); let bloom_filter_size = self .bloom_filter .as_ref() - .map(|bf| bf.memory_size()) + .map(|bf| bf.estimated_memory_size()) .unwrap_or_default(); encoder_size + bloom_filter_size From 27ceff806ba4ada1469da31b6de3b3f3002cf595 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 14:17:38 -0400 Subject: [PATCH 11/14] Include more memory size calculations --- parquet/src/column/writer/encoder.rs | 6 ++- .../encoding/byte_stream_split_encoder.rs | 5 +++ .../src/encodings/encoding/dict_encoder.rs | 16 ++++---- parquet/src/encodings/encoding/mod.rs | 40 +++++++++++++++++++ parquet/src/encodings/rle.rs | 7 ++++ parquet/src/util/bit_util.rs | 5 +++ 6 files changed, 68 insertions(+), 11 deletions(-) diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index cadadb80e25c..b6c8212608b8 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -235,7 +235,9 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } fn estimated_memory_size(&self) -> usize { - let encoder_size = self + let encoder_size = self.encoder.estimated_memory_size(); + + let dict_encoder_size = self .dict_encoder .as_ref() .map(|encoder| encoder.estimated_memory_size()) @@ -247,7 +249,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { .map(|bf| bf.estimated_memory_size()) .unwrap_or_default(); - encoder_size + bloom_filter_size + encoder_size + dict_encoder_size + bloom_filter_size } fn estimated_dict_page_size(&self) -> Option { diff --git a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs index a95487041cee..3d5ba4cc2d0b 100644 --- a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs +++ b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs @@ 
-90,4 +90,9 @@ impl Encoder for ByteStreamSplitEncoder { self.buffer.clear(); Ok(encoded.into()) } + + /// return the estimated memory size of this encoder. + fn estimated_memory_size(&self) -> usize { + self.buffer.capacity() * std::mem::size_of::() + } } diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 7727d561d622..73fb03fe0cff 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -149,15 +149,6 @@ impl DictEncoder { num_required_bits(self.num_entries().saturating_sub(1) as u64) } - /// Returns the estimated total memory usage - /// - /// For this encoder, the indices are unencoded bytes (refer to [`Self::write_indices`]). - /// - /// Therefore, we have a specific memory estimate (during encoding) of: - /// + - pub(crate) fn estimated_memory_size(&self) -> usize { - self.interner.storage().size_in_bytes + self.indices.len() * 8 - } } impl Encoder for DictEncoder { @@ -188,4 +179,11 @@ impl Encoder for DictEncoder { fn flush_buffer(&mut self) -> Result { self.write_indices() } + + /// Returns the estimated total memory usage + /// + /// For this encoder, the indices are unencoded bytes (refer to [`Self::write_indices`]). + fn estimated_memory_size(&self) -> usize { + self.interner.storage().size_in_bytes + self.indices.len() * 8 + } } diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index 56ca68db7d75..ecc376712490 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -68,6 +68,9 @@ pub trait Encoder: Send { /// Method call must be O(1). fn estimated_data_encoded_size(&self) -> usize; + /// Returns an estimate of the memory use of this encoder, in bytes + fn estimated_memory_size(&self) -> usize; + /// Flushes the underlying byte buffer that's being processed by this encoder, and /// return the immutable copy of it. 
This will also reset the internal state. fn flush_buffer(&mut self) -> Result; @@ -158,6 +161,11 @@ impl Encoder for PlainEncoder { T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?; Ok(()) } + + /// Return the estimated memory size of this encoder. + fn estimated_memory_size(&self) -> usize { + self.buffer.capacity() * std::mem::size_of::() + self.bit_writer.estimated_memory_size() + } } // ---------------------------------------------------------------------- @@ -243,6 +251,13 @@ impl Encoder for RleValueEncoder { Ok(buf.into()) } + + /// return the estimated memory size of this encoder. + fn estimated_memory_size(&self) -> usize { + self.encoder + .as_ref() + .map_or(0, |enc| enc.estimated_memory_size()) + } } // ---------------------------------------------------------------------- @@ -477,6 +492,15 @@ impl Encoder for DeltaBitPackEncoder { Ok(buffer.into()) } + + /// return the estimated memory size of this encoder. + fn estimated_memory_size(&self) -> usize { + self.page_header_writer.estimated_memory_size() + + self.bit_writer.estimated_memory_size() + + self.deltas.capacity() * std::mem::size_of::() + + std::mem::size_of::() + + } } /// Helper trait to define specific conversions and subtractions when computing deltas @@ -614,6 +638,14 @@ impl Encoder for DeltaLengthByteArrayEncoder { Ok(total_bytes.into()) } + + /// return the estimated memory size of this encoder. + fn estimated_memory_size(&self) -> usize { + self.len_encoder.estimated_memory_size() + + self.data.len() + + std::mem::size_of::() + + } } // ---------------------------------------------------------------------- @@ -717,6 +749,14 @@ impl Encoder for DeltaByteArrayEncoder { ), } } + + /// return the estimated memory size of this encoder. 
+ fn estimated_memory_size(&self) -> usize { + self.prefix_len_encoder.estimated_memory_size() + + self.suffix_writer.estimated_memory_size() + + (self.previous.capacity() * std::mem::size_of::()) + + } } #[cfg(test)] diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 5d91c1e53d0f..581f14b3c99a 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -286,6 +286,13 @@ impl RleEncoder { } self.repeat_count = 0; } + + /// return the estimated memory size of this encoder. + pub(crate) fn estimated_memory_size(&self) -> usize { + self.bit_writer.estimated_memory_size() + + std::mem::size_of::() + + } } /// Size, in number of `i32s` of buffer to use for RLE batch reading diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index 1ec764e2c869..eaaf3ee10279 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -329,6 +329,11 @@ impl BitWriter { let u: u64 = ((v << 1) ^ (v >> 63)) as u64; self.put_vlq_int(u) } + + /// Returns an estimate of the memory used, in bytes + pub fn estimated_memory_size(&self) -> usize { + self.buffer.capacity() * size_of::() + } } /// Maximum byte length for a VLQ encoded integer From 9043f20e5d0d0261c16c5fdc92fe7a8d7da9ddd9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 14:22:05 -0400 Subject: [PATCH 12/14] clean up async writer --- parquet/src/arrow/arrow_writer/mod.rs | 3 ++- parquet/src/arrow/async_writer/mod.rs | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index e8ad98f9ea6e..f83e56c7abe6 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -210,7 +210,8 @@ impl ArrowWriter { /// Estimated memory usage, in bytes, of this `ArrowWriter` /// - /// See [`ArrowColumnWriter::memory_size`] for details + /// This estimate is formed bu summing the values of + /// 
[`ArrowColumnWriter::memory_size`] all in progress columns. pub fn memory_size(&self) -> usize { match &self.in_progress { Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(), diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 016a8c907f9e..edeb0fec00b7 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -176,7 +176,16 @@ impl AsyncArrowWriter { self.sync_writer.flushed_row_groups() } - /// Returns the estimated length in bytes of the current in progress row group + /// Estimated memory usage, in bytes, of this `ArrowWriter` + /// + /// See [ArrowWriter::memory_size] for more information. + pub fn memory_size(&self) -> usize { + self.sync_writer.memory_size() + } + + /// Anticipated encoded size of the in progress row group. + /// + /// See [ArrowWriter::memory_size] for more information. pub fn in_progress_size(&self) -> usize { self.sync_writer.in_progress_size() } From 6485b3c7dc19881b8ebd1fcd99a12a81dbe8c8d5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 14:33:40 -0400 Subject: [PATCH 13/14] clippy --- parquet/src/arrow/arrow_writer/byte_array.rs | 1 + parquet/src/encodings/encoding/dict_encoder.rs | 2 +- parquet/src/util/interner.rs | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 32786fba68cf..fe0b69766645 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -309,6 +309,7 @@ impl Storage for ByteArrayStorage { key as u64 } + #[cfg(arrow)] fn estimated_memory_size(&self) -> usize { self.page.capacity() * std::mem::size_of::() + self.values.capacity() * std::mem::size_of::>() diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 73fb03fe0cff..98283b574ebb 100644 --- 
a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -184,6 +184,6 @@ impl Encoder for DictEncoder { /// /// For this encoder, the indices are unencoded bytes (refer to [`Self::write_indices`]). fn estimated_memory_size(&self) -> usize { - self.interner.storage().size_in_bytes + self.indices.len() * 8 + self.interner.storage().size_in_bytes + self.indices.len() * std::mem::size_of::() } } diff --git a/parquet/src/util/interner.rs b/parquet/src/util/interner.rs index ec31e7e802b4..f57fc3a71409 100644 --- a/parquet/src/util/interner.rs +++ b/parquet/src/util/interner.rs @@ -34,6 +34,7 @@ pub trait Storage { fn push(&mut self, value: &Self::Value) -> Self::Key; /// Return an estimate of the memory used in this storage, in bytes + #[allow(dead_code)] // not used in parquet_derive, so is dead there fn estimated_memory_size(&self) -> usize; } @@ -86,6 +87,7 @@ impl Interner { } /// Return estimate of the memory used, in bytes + #[allow(dead_code)] // not used in parquet_derive, so is dead there pub fn estimated_memory_size(&self) -> usize { self.storage.estimated_memory_size() + // estimate size of dedup hashmap as just th size of the keys From 099b00efc93af3eb71a5db4b6ad01e538b42352a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jul 2024 14:38:15 -0400 Subject: [PATCH 14/14] tweak --- parquet/src/arrow/arrow_writer/byte_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index fe0b69766645..fc37ebfb4510 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -309,7 +309,7 @@ impl Storage for ByteArrayStorage { key as u64 } - #[cfg(arrow)] + #[allow(dead_code)] // not used in parquet_derive, so is dead there fn estimated_memory_size(&self) -> usize { self.page.capacity() * std::mem::size_of::() + self.values.capacity() * std::mem::size_of::>()