feat(5851): ArrowWriter memory usage #5967
Conversation
… ArrowWriter and column writers
…mentations and the DictEncoder
Thanks @wiedld -- this is looking great
My biggest comment / confusion about this PR is that, as written, it was somewhat unclear that `ArrowWriter::memory_size` includes both the size of in-progress buffers (not yet tracked by `in_progress_size`) AND `in_progress_size` itself. It was then hard for me to convince myself that the code correctly accounts for both (the memory use calculation needs to be a superset of `in_progress_size` and other memory).
I think as written and explained this is confusing. I think we should:
- Update the documentation to make it clear that `ArrowWriter::memory_size` includes both calculations (for example, explicitly say that the internal buffer size is `memory_size() - in_progress_size()`)
- Change the implementation of `ArrowWriter::memory_size` to explicitly add the in-progress `estimated_total_bytes` and memory size so it is easier to verify (I left a more specific suggestion inline)
In addition, can we update the docs here:
https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#memory-limiting
to mention `estimated_memory_size` as well?
Something like: "The writer itself has internal buffers which can consume substantial amounts of memory, especially for data that encodes very efficiently. `ArrowWriter::memory_size` can be used to track the size of these internal memory buffers."
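The relationship the suggested docs would spell out can be sketched with illustrative numbers (stdlib-only Rust; the byte counts are hypothetical, not taken from the PR):

```rust
fn main() {
    // Hypothetical values standing in for what a real ArrowWriter would report:
    let memory_size = 10_000_usize; // ArrowWriter::memory_size(): all live buffers
    let in_progress_size = 7_500_usize; // anticipated encoded size after flush

    // Per the suggested docs, the internal buffer size is the difference.
    // saturating_sub guards the (unexpected) case where the encoded-size
    // estimate exceeds the measured memory.
    let internal_buffers = memory_size.saturating_sub(in_progress_size);
    assert_eq!(internal_buffers, 2_500);
    println!("internal buffers: {internal_buffers} bytes");
}
```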
```rust
match &self.in_progress {
    Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
    None => 0,
}
```
It took me a while to understand that this calculation actually also includes the `in_progress_size` too.
What do you think about making this more explicit like
```rust
// Current:
match &self.in_progress {
    Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
    None => 0,
}

// Suggested:
match &self.in_progress {
    Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size() + x.get_estimated_total_bytes()).sum(),
    None => 0,
}
```
And then change code like
```rust
/// Returns the estimated total memory usage.
///
/// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
/// of the current memory usage and not its anticipated encoded size.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
    self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
}
```
to only include the estimated memory size:
```rust
/// Returns the estimated total memory buffer usage.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
    self.encoder.estimated_memory_size()
}
```
The `in_progress_size` includes the flushed_bytes + unflushed_bytes. The `memory_size` includes flushed_bytes + unflushed_bytes + processing_size.
At first glance, it does look like we could do memory_size = in_progress_size + processing_size. But what the calculation actually is:
in_progress_size = flushed_bytes + unflushed_bytes_anticipated_size_after_flush
memory_size = flushed_bytes + unflushed_bytes_current_mem_size + processing_size
Per our approach to memory limiting, we should have unflushed bytes (in buffer) already be encoded. However, I believe that's not the case for the indices on the dict encoder? As such, the accounting in this PR specifically considers unflushed_bytes separately -- and pushes down the new `memory_size()` interface to where we can delineate the DictEncoder vs other encoders.
I added a commit to help explain. Please let me know if I misunderstood. 🙏🏼
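The distinction above can be made concrete with a small stdlib-only sketch (the byte counts are hypothetical, chosen only to show that the two formulas disagree on the unflushed term):

```rust
fn main() {
    // Hypothetical sizes for one in-progress dictionary-encoded column:
    let flushed_bytes = 4096_usize; // pages already encoded and buffered
    let unflushed_anticipated_size_after_flush = 512_usize; // predicted encoded size
    let unflushed_current_mem_size = 2048_usize; // dictionary + indices held right now
    let processing_size = 256_usize; // scratch space used while encoding

    let in_progress_size = flushed_bytes + unflushed_anticipated_size_after_flush;
    let memory_size = flushed_bytes + unflushed_current_mem_size + processing_size;

    // Because the unflushed term differs, memory_size is NOT simply
    // in_progress_size + processing_size:
    assert_eq!(in_progress_size, 4608);
    assert_eq!(memory_size, 6400);
    assert_ne!(memory_size, in_progress_size + processing_size); // 6400 != 4864
}
```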
I am thinking from a user's point of view (e.g. our use case in InfluxDB).
If I want to know how much memory the ArrowWriter is using (so I can track it against a budget, for example), what API should I use?
I was worried that I couldn't use `ArrowWriter::memory_size` because that does not include the estimated data page sizes, AND I couldn't use `ArrowWriter::memory_size() + ArrowWriter::in_progress_size()` because that would double count previously buffered data.
However, after reviewing the code more closely I see the difference is that `ArrowWriter::in_progress_size` includes an estimate of how large the parquet encoded data would be after flush (which is not actually memory currently used), which presumably in most cases will be smaller than the actual bytes used. I will try to update the comments as well to clarify this.
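The budget-tracking pattern described here can be sketched with a stand-in writer (stdlib-only; `MockWriter` is hypothetical and only mimics the shape of the API — `write`, `memory_size`, `flush` — not the parquet crate's actual implementation):

```rust
// MockWriter is a made-up stand-in: it buffers bytes and reports its live
// allocation, roughly like ArrowWriter::memory_size() reports live buffers.
struct MockWriter {
    buffer: Vec<u8>,
}

impl MockWriter {
    fn write(&mut self, bytes: &[u8]) {
        self.buffer.extend_from_slice(bytes);
    }
    /// Live memory held by the internal buffer (capacity, not len).
    fn memory_size(&self) -> usize {
        self.buffer.capacity()
    }
    /// Move buffered bytes to the sink and release the allocation.
    fn flush(&mut self, sink: &mut Vec<u8>) {
        sink.extend_from_slice(&self.buffer);
        self.buffer.clear();
        self.buffer.shrink_to_fit();
    }
}

fn main() {
    const BUDGET: usize = 1024; // e.g. a DataFusion MemoryReservation
    let mut writer = MockWriter { buffer: Vec::new() };
    let mut sink = Vec::new();

    for _ in 0..5 {
        writer.write(&[0u8; 300]);
        // Check the reservation after every batch; flush early if exceeded.
        if writer.memory_size() > BUDGET {
            writer.flush(&mut sink);
        }
    }
    writer.flush(&mut sink);
    assert_eq!(sink.len(), 1500); // no data lost by the early flushes
}
```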
…nce btwn memory_size vs get_estimated_total_byte
…erface, and update impls to account for bloom filter size
parquet/src/column/writer/encoder.rs
Outdated
```rust
// Whereas for all other encoders the buffer contains encoded bytes.
// Therefore, we can use the estimated_data_encoded_size.
```
Maybe the phrase "all other encoders" should be changed to "it's presumed for all other encoders", since this is a moving target.
I reviewed this carefully and I think it is worth not intermixing the encoded estimate with the memory usage, so I took the liberty of implementing `estimated_memory_size` for `Encoder` as well.
For benchmarking, you may want to include allocators that are less prone to memory fragmentation, like snmalloc (rust crate).
```diff
@@ -334,6 +338,10 @@ impl DictEncoder {
         num_required_bits(length.saturating_sub(1) as u64)
     }

+    fn estimated_memory_size(&self) -> usize {
+        self.interner.storage().page.len() + self.indices.len() * 8
```
I think we also need to account for the interner's dedup hash table. I added some code to do this in XX
Also, in general a more accurate estimate of memory usage is capacity() * element_size
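The `capacity() * element_size` point can be illustrated in isolation (stdlib-only; the `Vec<u64>` here stands in for something like the encoder's `indices` buffer): a `Vec`'s allocation is governed by its capacity, so length-based accounting undercounts reserved-but-unused space.

```rust
fn main() {
    let mut indices: Vec<u64> = Vec::with_capacity(1000);
    indices.extend([1u64, 2, 3]); // only 3 elements live, ~1000 slots allocated

    let by_len = indices.len() * std::mem::size_of::<u64>();
    let by_capacity = indices.capacity() * std::mem::size_of::<u64>();

    assert_eq!(by_len, 24); // undercounts: ignores the reserved slots
    assert!(by_capacity >= 8000); // reflects what the allocator actually handed out
}
```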
Sorry I lost some of my in progress comments -- I spent some time trying to clarify the documentation for this PR along with the implementation (I threaded through some additional memory calculations for encoder). Let me know what you think @wiedld
```rust
/// Estimated memory usage, in bytes, of this `ArrowWriter`
///
/// See [ArrowWriter::memory_size] for more information.
pub fn memory_size(&self) -> usize {
```
I also added these wrappers for symmetry with the other AsyncWriter methods
```diff
@@ -32,6 +32,9 @@ pub trait Storage {

     /// Adds a new element, returning the key
     fn push(&mut self, value: &Self::Value) -> Self::Key;

+    /// Return an estimate of the memory used in this storage, in bytes
+    fn estimated_memory_size(&self) -> usize;
```
Despite this being a `pub trait`, it is not pub outside the module: https://docs.rs/parquet/latest/parquet/?search=storage
```diff
@@ -93,10 +93,17 @@ pub trait ColumnValueEncoder {
     /// Returns true if this encoder has a dictionary page
```
Likewise, while this trait is marked `pub`, it is not exposed outside the crate: https://docs.rs/parquet/latest/parquet/?search=ColumnValueEncoder
parquet/src/bloom_filter/mod.rs
Outdated
```rust
/// Return the total in memory size of this bloom filter in bytes
pub(crate) fn memory_size(&self) -> usize {
    self.0.capacity() * std::mem::size_of::<Block>()
}
```
👍🏼
```rust
fn estimated_memory_size(&self) -> usize {
    self.page.capacity() * std::mem::size_of::<u8>()
        + self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
}
```
👍🏼
```rust
pub fn estimated_memory_size(&self) -> usize {
    self.storage.estimated_memory_size() +
    // estimate size of dedup hashmap as just the size of the keys
    self.dedup.capacity() * std::mem::size_of::<S::Key>()
}
```
👍🏼
I plan to merge this PR in an hour or two unless anyone else would like time to review.
I cannot approve my own PR; so consider this an approval of @alamb 's changes 😆 .
Thanks again @wiedld
Which issue does this PR close?
Closes #5851
(Although followup PRs may add bytes to the accounting.)
Rationale for this change
We have several profiling test cases which compare datafusion's tracked MemoryReservations with the actual peak memory usage. The largest single difference was in the tracking of memory used during the parquet encoding (via ArrowWriter). Here is a summary of the discrepancy per test case:
These^^ results provided significant motivation to fulfill the existing upstream feature request, to provide an ArrowWriter API for memory size used during encoding (refer to #5851). Currently, we have been reserving memory bytes based upon the anticipated encoded (compressed) size, as that was the only API available on the ArrowWriter.
This PR introduces a new `memory_size()` API, defined as the already encoded size plus the uncompressed/unflushed bytes in buffer. Next, we limited our accounting of unflushed bytes to the DictEncoder (although future PRs may expand this accounting). This change alone had a significant impact on our test case 3:
Accounting for the DictEncoder unflushed bytes has improved our memory tracking by ~2 GBs in this test case. We anticipate followup PRs which expand this `memory_size()` accounting to cover our other test cases as well.
What changes are included in this PR?
Are there any user-facing changes?
Yes, the new `ArrowWriter::memory_size()` API.