Memory account not adding up in SortExec #10073

Open
Tracked by #9846
westonpace opened this issue Apr 14, 2024 · 4 comments · May be fixed by #12288
Labels
bug Something isn't working

Comments

@westonpace
Member

westonpace commented Apr 14, 2024

Describe the bug

This is related / tangential to #9359

My primary problem is that I am trying to sort 100 million strings and always getting errors like:

Resources exhausted: Failed to allocate additional 39698712 bytes for ExternalSorterMerge[0] with 4053021824 bytes already allocated - maximum available is 26915520

After reading through the sort impl a bit I have noticed a few concerns (recorded in the additional context below).

To Reproduce

I don't have a DataFusion-only reproduction, but this reproduces it for me in lance:

import pyarrow as pa
import lance

# Once the dataset has been generated you can comment out this generation code and reproduce the issue very quickly
print("Generating data")
my_strings = [f"string-{i}" * 3 for i in range(100 * 1024 * 1024)]
my_table = pa.table({"my_strings": my_strings})

print("Writing dataset")
ds = lance.write_dataset(
    my_table, "/tmp/big_strings.lance", mode="overwrite", schema=my_table.schema
)
del my_table
# End of generation code

ds = lance.dataset("/tmp/big_strings.lance")
print("Training scalar index")
# To create a scalar index we must sort the column; this is where the error occurs
ds.create_scalar_index("my_strings", "BTREE")

Expected behavior

I can sort any number of strings, as long as I don't run out of disk space.

Additional context

Here is how I understand memory accounting in the sort today:

  • As batches arrive, they are placed in an accumulation queue, and the size of the spillable reservation grows
  • Once the pool is out of space we begin the spill process
    • The first part of the spill process is to sort the accumulation queue (which, at this point, has many batches in it)
    • Each batch becomes an input stream for a SortPreservingMergeStream (this is a LOT of inputs).
    • When the batch input stream is polled, the batch is sorted, the unsorted batch is dropped (and removed from the spillable reservation), and the sorted batch is returned. The SortPreservingMergeStream then puts this batch in the batch builder (which adds it to the non-spillable reservation). This is a problem, as described below
    • As the sort preserving merge stream is polled it polls the various inputs, fills up the batch builder, and then starts to emit output batches
  • Back in the sort exec the sort preserving merge stream is fully drained (try_collect). This is a problem, as described below
  • These collected and sorted batches are then written to the spill file.

The first problem (and the one causing my error) is that a sorted batch of strings (the output of sort_batch) occupies 25% more memory than the unsorted batch. I'm not sure if this is buffer alignment, padding, or some kind of 2x allocation strategy used by the sort, but it seems reasonable that something like this could happen. Unfortunately, this is a problem. We are spilling because we have used up the entire memory pool. We now take X bytes from the memory pool, convert them into 1.25 * X bytes, and try to put the result back in the memory pool. This fails with the error listed above.
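
A minimal sketch of this failure mode, using DataFusion's public MemoryConsumer / MemoryReservation API (the pool size, consumer names, and the 1.25x factor are illustrative, not taken from the failing plan):

use std::sync::Arc;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // Illustrative 100-byte pool; the pool in the failing plan is ~4GB.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
    let mut unsorted = MemoryConsumer::new("unsorted-batches").register(&pool);
    let mut sorted = MemoryConsumer::new("sorted-batches").register(&pool);

    unsorted.try_grow(100).unwrap(); // accumulation fills the pool, triggering a spill
    unsorted.shrink(100);            // the unsorted batch is dropped during the spill
    // The sorted copy is ~25% larger, so re-reserving fails even though we just
    // returned everything we held -- this is the "Failed to allocate" error above.
    assert!(sorted.try_grow(125).is_err());
}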

The second problem is that we are not accounting for the output of the sort preserving merge stream. Each output batch from the sort preserving merge stream is made up of rows from the various input batches. In the degenerate case, where the input data is fully random, this means we will probably require 2 * X bytes. This is because each output batch draws rows from one batch in each input stream, and we can't release any of the input batches until we emit the final output batch.

The solution to this second problem is that we should be streaming into the spill file. We should not collect from the sort preserving merge stream and then write the collected batches into the spill file. This problem is a bit less concerning for me at the moment because it is "datafusion uses more memory than it should" and not "datafusion is failing the plan with an error". We don't do a lot of sorting in lance and so we can work around it reasonably well by halving the size of the spill pool.
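
A hedged sketch of that streaming fix, using futures' StreamExt over DataFusion's SendableRecordBatchStream (the write_batch callback is a stand-in for the IPC spill writer, whose exact API I am not asserting):

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

// Write each merged batch to the spill file as it is produced, instead of
// try_collect'ing the whole sorted run first; peak memory is then bounded by
// a single in-flight batch rather than the entire run.
async fn spill_streaming(
    mut merged: SendableRecordBatchStream,
    mut write_batch: impl FnMut(&RecordBatch) -> Result<()>, // hypothetical spill-writer hook
) -> Result<()> {
    while let Some(batch) = merged.next().await {
        write_batch(&batch?)?;
    }
    Ok(())
}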

westonpace added the bug label Apr 14, 2024
@alamb
Contributor

alamb commented Apr 15, 2024

It sounds like the issue, at a (really) high level, is "additional buffer space is required to actually implement the spill"

And since during spill the plan is under memory pressure, getting this additional memory can and does fail

Some strategies I can think of are:

  1. Simply turn off the memory accounting of the intermediate results above (String batches in your example) during the spilling process (pro: simpler to implement, I think; con: overshoots limits)
  2. Reserve additional buffer space up front to be used during spill (e.g. set aside 50MB) (pro: won't overshoot; cons: not clear how much is "enough", and it will reduce the amount of memory that can be reserved)
  3. Reduce the memory required for intermediate spilling (e.g. maybe use a batch size 1/2 the current size)

Maybe we can do 1 in the short term while figuring out a more sophisticated strategy for 2 or 3.
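
For reference, a minimal sketch of what strategy 2 could look like, assuming the existing MemoryConsumer / MemoryReservation API (the 50MB constant and the consumer name are illustrative):

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

const SPILL_HEADROOM: usize = 50 * 1024 * 1024; // the "50MB" set-aside from point 2

// Reserve the headroom when the sort starts, before the pool fills up...
fn reserve_spill_headroom(pool: &Arc<dyn MemoryPool>) -> Result<MemoryReservation> {
    let mut headroom = MemoryConsumer::new("sort-spill-headroom").register(pool);
    headroom.try_grow(SPILL_HEADROOM)?; // fail fast if even the headroom won't fit
    Ok(headroom)
}

// ...and release it when a spill is triggered, so the merge's transient
// allocations (sorted copies, row encodings) have room to land.
fn begin_spill(headroom: &mut MemoryReservation) {
    headroom.free();
}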

@comphead
Contributor

I'm investigating this as part of #9359

@yjshen
Member

yjshen commented Aug 25, 2024

Through examining the current implementation of the multi-column sort's spill-to-disk strategy, I find we are asking for more memory during the spill itself, which I think is worth discussing:
During the spill, Rows are created for each in-memory RecordBatch for comparison efficiency. Considering why we spill in the first place, does this Rows optimization increase the possibility of execution failure due to memory shortage?

I find this also related: #9528 (comment)
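
A small sketch of how one might quantify that extra pressure with arrow-row's public API, comparing a batch's footprint to the Rows built from it (the sample column is illustrative):

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, StringArray};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), ArrowError> {
    let col: ArrayRef = Arc::new(StringArray::from(vec!["string-0string-0string-0"; 8192]));
    let batch_bytes = col.get_array_memory_size();

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)])?;
    let rows = converter.convert_columns(&[col])?;
    // During a spill this row encoding is allocated *on top of* the batches
    // being spilled, i.e. exactly when the pool has no room left.
    println!("batch: {batch_bytes} bytes, rows: {} bytes", rows.size());
    Ok(())
}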

@yjshen
Member

yjshen commented Aug 25, 2024

Another point of code worth noticing is inside the current sort_batch implementation:

let indices = if is_multi_column_with_lists(&sort_columns) {
    // lex_sort_to_indices doesn't support List with more than one column
    // https://github.com/apache/arrow-rs/issues/5454
    lexsort_to_indices_multi_columns(sort_columns, fetch)?
} else {
    lexsort_to_indices(&sort_columns, fetch)?
};

Performance-wise, I think it would be beneficial to apply the row-format comparison to all multi-column cases. However, sort_batch is used in multiple places where spill is invoked, and creating rows before comparing would introduce more memory pressure.

BTW, I think we should report memory usage inside lexsort_to_indices_multi_columns:

let converter = RowConverter::new(fields)?;
let rows = converter.convert_columns(&columns)?;
let mut sort: Vec<_> = rows.iter().enumerate().collect();
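
Concretely, that could look something like the sketch below. The reservation argument is hypothetical (lexsort_to_indices_multi_columns does not currently take one); rows.size() and converter.size() are arrow-row's own accounting methods:

let converter = RowConverter::new(fields)?;
let rows = converter.convert_columns(&columns)?;
// Report the transient row encoding to the pool before sorting, so this
// allocation is visible to the memory accountant instead of going untracked.
reservation.try_grow(rows.size() + converter.size())?;
let mut sort: Vec<_> = rows.iter().enumerate().collect();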
