
feat(5851): ArrowWriter memory usage #5967

Merged: 15 commits, Jul 2, 2024

Changes from 4 commits
21 changes: 21 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -210,6 +210,10 @@ impl FallbackEncoder {
}
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize {
match &self.encoder {
FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
@@ -334,6 +338,10 @@ impl DictEncoder {
num_required_bits(length.saturating_sub(1) as u64)
}

fn estimated_memory_size(&self) -> usize {
self.interner.storage().page.len() + self.indices.len() * 8
Contributor:
I think we also need to account for the interner's dedup hash table. I added some code to do this in XX

Also, in general a more accurate estimate of memory usage is capacity() * element_size

}

fn estimated_data_page_size(&self) -> usize {
let bit_width = self.bit_width();
1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
@@ -443,10 +451,23 @@ impl ColumnValueEncoder for ByteArrayEncoder {
self.dict_encoder.is_some()
}

fn estimated_memory_size(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.estimated_memory_size(),
// For the FallbackEncoder, these unflushed bytes are already encoded.
// Therefore, the size should be the same as estimated_data_page_size.
None => self.fallback.estimated_data_page_size(),
}
}

fn estimated_dict_page_size(&self) -> Option<usize> {
Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.estimated_data_page_size(),
45 changes: 42 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -203,7 +203,23 @@ impl<W: Write + Send> ArrowWriter<W> {
self.writer.flushed_row_groups()
}

/// Returns the estimated length in bytes of the current in progress row group
/// Returns the estimated memory usage of the current in progress row group.
///
/// This includes:
/// <already_written_encoded_byte_size> + <current_memory_size_of_unflushed_bytes> + <bytes_associated_with_processing>
///
/// In the vast majority of cases our unflushed bytes are already encoded.
pub fn memory_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
None => 0,
Contributor, commenting on lines +216 to +218:
It took me a while to understand that this calculation actually also includes the in_progress_size too

What do you think about making this more explicit like

Suggested change:

    // before
    match &self.in_progress {
        Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
        None => 0,

    // after
    match &self.in_progress {
        Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size() + x.get_estimated_total_bytes()).sum(),
        None => 0,
And then change code like

    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }

to only include the estimated memory size:

    /// Returns the estimated total memory buffer usage.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.encoder.estimated_memory_size()
    }

wiedld (Contributor, Author), Jun 28, 2024:
The in_progress_size includes the flushed_bytes + unflushed_bytes.
The memory_size includes flushed_bytes + unflushed_bytes + processing_size.

At first glance, it looks like we could do memory_size = in_progress_size + processing_size. But what the calculation actually is:
in_progress_size = flushed_bytes + unflushed_bytes_anticipated_size_after_flush
memory_size = flushed_bytes + unflushed_bytes_current_mem_size + processing_size

Per our approach to memory limiting, unflushed bytes (in buffer) should already be encoded. However, I believe that's not the case for the indices on the dict encoder? As such, the accounting in this PR considers unflushed_bytes separately -- and pushes the new memory_size() interface down to where we can delineate the DictEncoder vs other encoders.

I added a commit to help explain. Please let me know if I misunderstood. 🙏🏼

alamb (Contributor), Jul 1, 2024:
I am thinking from a user's point of view (e.g. our use case in InfluxDB).

If I want to know how much memory the ArrowWriter is using (so I can track it against a budget, for example), what API should I use?

I was worried that I couldn't use:

  1. ArrowWriter::memory_size, because that does not include the estimated data page sizes, AND I couldn't use
  2. ArrowWriter::memory_size() + ArrowWriter::in_progress_size(), because that would double count previously buffered data.

However, after reviewing the code more closely I see the difference is that ArrowWriter::in_progress_size includes an estimate of how large the parquet encoded data would be after flush (which is not actually memory currently used), which presumably in most cases will be smaller than the actual bytes used. I will try to update the comments as well to clarify this.

}
}
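The two formulas from the review thread above can be checked with a toy calculation (std-only sketch; the function names and byte counts are illustrative, not the parquet crate's API):

```rust
/// flushed bytes + anticipated encoded size of unflushed data
fn in_progress_size(flushed: usize, unflushed_encoded_estimate: usize) -> usize {
    flushed + unflushed_encoded_estimate
}

/// flushed bytes + current in-memory size of unflushed data + processing scratch
fn memory_size(flushed: usize, unflushed_current: usize, processing: usize) -> usize {
    flushed + unflushed_current + processing
}

fn main() {
    let (flushed, unflushed_current, unflushed_encoded, processing) =
        (10_000usize, 4_096, 1_024, 2_048);
    let ips = in_progress_size(flushed, unflushed_encoded);
    let ms = memory_size(flushed, unflushed_current, processing);
    // Adding the two together double counts `flushed`, which is why
    // the review cautions against summing them for a budget check.
    assert_eq!(
        ips + ms,
        2 * flushed + unflushed_encoded + unflushed_current + processing
    );
}
```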

/// Returns the estimated length in encoded bytes of the current in progress row group.
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
pub fn in_progress_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress
@@ -629,7 +645,26 @@ impl ArrowColumnWriter {
Ok(ArrowColumnChunk { data, close })
}

/// Returns the estimated total bytes for this column writer
/// Returns the estimated total memory usage.
///
/// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
/// of the current memory usage and not its anticipated encoded size.
///
/// This includes:
/// <already_written_encoded_byte_size> + <current_memory_size_of_unflushed_bytes> + <bytes_associated_with_processing>
///
/// In the vast majority of cases our unflushed bytes are already encoded.
pub fn memory_size(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
ArrowColumnWriterImpl::Column(c) => c.memory_size(),
}
}

/// Returns the estimated total encoded bytes for this column writer.
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
pub fn get_estimated_total_bytes(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
@@ -2894,24 +2929,28 @@ mod tests {
// starts empty
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.memory_size(), 0);
assert_eq!(writer.bytes_written(), 4); // Initial header
writer.write(&batch).unwrap();

// updated on write
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), 5);
let initial_memory = writer.memory_size();
assert!(initial_memory > 0);

// updated on second write
writer.write(&batch).unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), 10);
assert!(writer.memory_size() > initial_memory);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.memory_size(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

writer.close().unwrap();
19 changes: 19 additions & 0 deletions parquet/src/column/writer/encoder.rs
@@ -93,10 +93,19 @@ pub trait ColumnValueEncoder {
/// Returns true if this encoder has a dictionary page
Contributor:
likewise, while this trait is marked pub it is not exposed outside the crate: https://docs.rs/parquet/latest/parquet/?search=ColumnValueEncoder

fn has_dictionary(&self) -> bool;

/// Returns the estimated total memory usage of the encoder
///
/// This should include:
/// <already_written_encoded_byte_size> + <current_memory_size_of_unflushed_bytes> + <bytes_associated_with_processing>
fn estimated_memory_size(&self) -> usize;

/// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary
fn estimated_dict_page_size(&self) -> Option<usize>;

/// Returns an estimate of the data page size in bytes
///
/// This should include:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize;

/// Flush the dictionary page for this column chunk if any. Any subsequent calls to
@@ -227,6 +236,16 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
self.dict_encoder.is_some()
}

fn estimated_memory_size(&self) -> usize {
match &self.dict_encoder {
// For this DictEncoder, we have unencoded bytes in the buffer.
Some(encoder) => encoder.estimated_memory_size(),
// Whereas for all other encoders the buffer contains encoded bytes.
// Therefore, we can use the estimated_data_encoded_size.
Contributor (Author):
Maybe the phrase "all other encoders" should be changed to "it's presumed for all other encoders", since this is a moving target.

Contributor:
I reviewed this carefully and I think it is worth not intermixing the encoded estimate with the memory usage, so I took the liberty of implementing estimated_memory_size for Encoder as well

_ => self.encoder.estimated_data_encoded_size(),
}
}

fn estimated_dict_page_size(&self) -> Option<usize> {
Some(self.dict_encoder.as_ref()?.dict_encoded_size())
}
30 changes: 27 additions & 3 deletions parquet/src/column/writer/mod.rs
@@ -72,7 +72,13 @@ pub enum ColumnWriter<'a> {
}

impl<'a> ColumnWriter<'a> {
/// Returns the estimated total bytes for this column writer
/// Returns the estimated total memory usage
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
downcast_writer!(self, typed, typed.memory_size())
}

/// Returns the estimated total encoded bytes for this column writer
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
downcast_writer!(self, typed, typed.get_estimated_total_bytes())
@@ -419,6 +425,20 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
)
}

/// Returns the estimated total memory usage.
///
/// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
/// of the current memory usage and not its anticipated encoded size.
///
/// This includes:
/// <already_written_encoded_byte_size> + <current_memory_size_of_unflushed_bytes> + <bytes_associated_with_processing>
///
/// In the vast majority of cases our unflushed bytes are already encoded.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
}

/// Returns total number of bytes written by this column writer so far.
/// This value is also returned when column writer is closed.
///
@@ -428,10 +448,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_metrics.total_bytes_written
}

/// Returns the estimated total bytes for this column writer
/// Returns the estimated total encoded bytes for this column writer.
///
/// Unlike [`Self::get_total_bytes_written`] this includes an estimate
/// of any data that has not yet been flushed to a page
/// of any data that has not yet been flushed to a page, based on its
/// anticipated encoded size.
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
self.column_metrics.total_bytes_written
14 changes: 14 additions & 0 deletions parquet/src/encodings/encoding/dict_encoder.rs
@@ -143,6 +143,16 @@ impl<T: DataType> DictEncoder<T> {
fn bit_width(&self) -> u8 {
num_required_bits(self.num_entries().saturating_sub(1) as u64)
}

/// Returns the estimated total memory usage
///
/// For this encoder, the indices are unencoded bytes (refer to [`Self::write_indices`]).
///
/// Therefore, we have a specific memory estimate (during encoding) of:
/// <already_written_encoded_byte_size> + <current_memory_size_of_unflushed_bytes>
pub(crate) fn estimated_memory_size(&self) -> usize {
self.interner.storage().size_in_bytes + self.indices.len() * 8
}
}

impl<T: DataType> Encoder<T> for DictEncoder<T> {
@@ -161,6 +171,10 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
Encoding::PLAIN_DICTIONARY
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_encoded_size(&self) -> usize {
let bit_width = self.bit_width();
RleEncoder::max_buffer_size(bit_width, self.indices.len())
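The bit-width computation that feeds `estimated_data_encoded_size` above can be sketched standalone (a toy reimplementation for illustration, assuming the same ceil(log2) behavior as the crate's `num_required_bits` helper):

```rust
// Indices into a dictionary of `num_entries` values each need enough bits
// to represent the largest index, num_entries - 1.
fn num_required_bits(x: u64) -> u8 {
    (64 - x.leading_zeros()) as u8
}

fn bit_width(num_entries: u64) -> u8 {
    num_required_bits(num_entries.saturating_sub(1))
}

fn main() {
    assert_eq!(bit_width(1), 0); // a single entry: every index is 0
    assert_eq!(bit_width(2), 1);
    assert_eq!(bit_width(256), 8); // indices 0..=255 fit in 8 bits
    assert_eq!(bit_width(257), 9); // one more entry forces a 9th bit
}
```

This is why the estimated data page size grows with both the dictionary cardinality (via `bit_width`) and the number of buffered indices.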