
Conversation

@gszadovszky
Contributor

No description provided.

@gszadovszky gszadovszky requested a review from zivanfi February 20, 2019 13:31
@gszadovszky
Contributor Author

@liupc, could you take a look?

@liupc

liupc commented Feb 20, 2019

@gszadovszky
How would the extra preallocated 128MiB cause the OOM? I don't get it.

@gszadovszky
Contributor Author

@liupc,

The relation between -Xmx and direct memory allocation is not clear to me. By definition, -Xmx should not directly limit the direct memory size (there is the specific config -XX:MaxDirectMemorySize for that); however, I can confirm that TestSnappyCodec fails with OOM: Direct buffer memory if I decrease the -Xmx from 512M to 256M. Also, @shangxinli can confirm that the same test fails with the same error on a Windows environment with the original 512M -Xmx value.

The memory consumption of parquet-mr can be estimated quite precisely based on the page and row-group size. For reading/writing, parquet-mr has to keep the current row-group in memory, so the memory consumption is around the row-group size, which defaults to 128M. Even if we use larger block sizes on HDFS we are usually around 512M or 1G. An additional 128M is still a significant amount of extra memory.
Parquet is usually used in multi-node clusters where the different components that use the parquet-mr library run in containers. The memory configuration of these containers is usually pretty strict. In our internal environment (where I've already cherry-picked your change) we had some OOM issues, so I can confirm that the additional 128M direct memory allocations cause issues in a production-like environment as well.

Parquet compression works at the page level, and the default page size is 1M. Therefore, it does not make sense to create such large buffers by default. Meanwhile, I don't have a good number for the default size because the page size can vary based on the current write configuration or the configuration used to write the file we are about to read.

It is also my mistake that I did not discover this problem during the code review.
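
For reference, the sizes quoted above are the library defaults; a quick way to confirm them is shown below (the constant names are taken from org.apache.parquet.hadoop.ParquetWriter and may differ slightly between parquet-mr versions):

import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetDefaults {
  public static void main(String[] args) {
    // Row-group ("block") size: 128 MiB by default, i.e. what has to fit in memory per open file.
    System.out.println("default block size: " + ParquetWriter.DEFAULT_BLOCK_SIZE); // 134217728
    // Page size: 1 MiB by default, i.e. the unit the Snappy codec actually compresses.
    System.out.println("default page size:  " + ParquetWriter.DEFAULT_PAGE_SIZE);  // 1048576
  }
}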

@liupc

liupc commented Feb 21, 2019

@gszadovszky Thanks for explaining. I didn't realize that the extra 128MiB could cause OOMs, because in our Spark cluster the default memoryOverhead (i.e., the direct memory) minimum is 384M.

Yes, it can cause such memory issues when the environment limits direct memory. As far as I know, the JVM's maximum direct memory is 0.8 * the -Xmx size if no -XX:MaxDirectMemorySize is specified, so if -Xmx is lower than 160M, OOM: Direct buffer memory may be thrown.

The original idea of this code was to preallocate memory for compression/decompression, since setInput may be called many times. Also, even without the preallocation, the code tries to reset the inputBuffer and outputBuffer back to 64MiB if they grew large during the previous compression/decompression.

So I think maybe we can keep the reset logic and remove the preallocation code.


@Override
public synchronized void reset() {
  if (inputBuffer.capacity() > initialBufferSize) {
@liupc liupc Feb 21, 2019

Maybe we can keep this code in the reset call and lower the initialBufferSize to 4MiB or so, because even without the preallocation of 128MiB, the inputBuffer and writeBuffer can grow very large if we judge just from the API itself, since setInput can be called many times.

@gszadovszky
Contributor Author

I cannot see why the buffers would grow that much. We compress/decompress one page at a time, so they should not grow much larger than the largest page, which is usually much smaller than 64M. By default it is 1M.

@shangxinli
Contributor

I still don't know whether 4M is a good number. If multiple compressors/decompressors are instantiated in parallel, the total can go up: say 20 decompressors, each initialized with 2*4M; that results in 160M of direct memory. That could be a problem for some applications. I agree with liupc to "keep the reset logic and remove the preallocation code" unless we see a huge penalty for not doing that.

@gszadovszky
Contributor Author

If you instantiate 20 decompressors then you are probably trying to read 20 parquet files at the same time in the same JVM. That means you will need tons of memory to keep 20 row-groups in memory; compared to that, 160M would not be too much, I guess.
Meanwhile, I am not against keeping the reset logic, but what size should it be reset to? 0?

@shangxinli
Contributor

What about the vectorized parquet reader? Does it instantiate the decompressor per column? I am not an expert on Parquet, so I could be wrong.

@liupc

@shangxinli
I think it's up to the implementation; it can work either with or without instantiating the decompressor per column. If you keep that many column compressors/decompressors in memory, then the memory consumed by the column data itself is much larger than 4MiB.
What's more, I suggest 4MiB here because the compress/decompress buffer size is 4MiB now, so I think it's OK.

@shangxinli
Contributor

If the default page size is 1M (assuming typical usage sticks with the default), 4M still seems too large. It will be adjusted later anyway if it is smaller than needed. I would prefer low over high.
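
For illustration, here is roughly what the variant being debated in this thread (no 128MiB preallocation, shrink oversized buffers on reset()) could look like; the class shape, field names, and the MAX_RETAINED_BUFFER_SIZE threshold are placeholders for the sketch, not the code that was eventually merged:

import java.nio.ByteBuffer;

public abstract class ShrinkOnResetSketch {
  // The threshold under discussion: 4 MiB was suggested, lower values were preferred by others.
  private static final int MAX_RETAINED_BUFFER_SIZE = 4 * 1024 * 1024;

  // Start empty and grow on demand instead of preallocating 128 MiB of direct memory.
  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);

  public synchronized void reset() {
    // Only re-allocate when an unusually large page made the buffers grow past the threshold;
    // otherwise keep them to avoid re-allocating direct memory for every page.
    if (inputBuffer.capacity() > MAX_RETAINED_BUFFER_SIZE) {
      inputBuffer = ByteBuffer.allocateDirect(0);
    }
    if (outputBuffer.capacity() > MAX_RETAINED_BUFFER_SIZE) {
      outputBuffer = ByteBuffer.allocateDirect(0);
    }
    inputBuffer.clear();
    outputBuffer.clear();
  }
}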


@Override
public synchronized void reset() {
  if (inputBuffer.capacity() > initialBufferSize) {
@liupc

Same as above comments.

@gszadovszky
Contributor Author

@liupc,
As we compress/decompress one page at a time, the buffer shall not grow much larger than the largest page, which is usually much smaller than 64M. By default it is 1M. I don't think it is large enough to require separate handling at reset().
You mentioned in the original JIRA that your production environment was failing with OOM. Do you have concrete numbers on how much direct memory was allocated when the OOM happened? It would be even better to know how many direct ByteBuffer objects were created and what their average size was.

While re-thinking your code modification (and mine as well), I think there is another potential memory leak: when the SnappyCompressor/SnappyDecompressor objects themselves are no longer used. Maybe the end() method should do the cleanup; however, I cannot see it being invoked anywhere.

@gszadovszky gszadovszky reopened this Feb 21, 2019
@shangxinli
Contributor

shangxinli commented Feb 21, 2019

A small correction: my OOM was on a MacBook Pro with OSX 10.14.1, not on Windows.

Regarding the relation between -Xmx and direct memory size: I think it is initialized to 64m arbitrarily and then reset, but the reset is tricky. If -XX:MaxDirectMemorySize is set, then it is reset to that size; otherwise it will be Runtime.getRuntime().maxMemory(), which is heavily influenced by -Xmx. Here is the JDK code: https://github.com/frohoff/jdk8u-dev-jdk/blob/master/src/share/classes/sun/misc/VM.java#L186. You can check the comments in the code between lines 180-188.

In production, I have only seen one out of 50+ Java applications/services explicitly set MaxDirectMemorySize. In most cases, developers don't set it in the arguments.
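
To make the sizing behavior described above concrete (this is an illustration of the linked sun.misc.VM logic, not parquet-mr code): when -XX:MaxDirectMemorySize is not given, the cap for direct allocations ends up roughly equal to Runtime.getRuntime().maxMemory(), which -Xmx drives, and exceeding it is what throws OutOfMemoryError: Direct buffer memory.

public class DirectMemoryCapDemo {
  public static void main(String[] args) {
    // With no explicit -XX:MaxDirectMemorySize this is approximately the direct-memory ceiling.
    long approxCap = Runtime.getRuntime().maxMemory();
    System.out.println("approximate direct-memory cap: " + approxCap + " bytes");

    // Allocating past the cap fails; e.g. run with -Xmx256m and request far more than that:
    // java.nio.ByteBuffer.allocateDirect(Integer.MAX_VALUE); // OutOfMemoryError: Direct buffer memory
  }
}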

@gszadovszky
Contributor Author

Thanks, @shangxinli, for the explanation. However, whether you set the maximum direct memory size or leave it at the default, I think 128M is large enough to cause failures in any environment.

PS. I've no idea where I got "Windows" :)

@shangxinli
Contributor

Yes. I agree.

@liupc

liupc commented Feb 22, 2019

@gszadovszky
In our production environment, the snappy off-heap consumption grows very large, about 1-3G, due to the leak.
I agree that the 128MiB is a bit aggressive, but in practice the -Xmx we see is usually larger than 200MiB:

  • MR defaults to 200MiB
  • Spark defaults to 1GiB
  • ...

As @shangxinli also mentioned, it's related to the -Xmx setting, and in real deployments -Xmx will be much larger than the values above, so it can hardly cause issues in a production environment. But we can still be more conservative and remove the preallocation, since the gain from pre-allocating these buffers may not be obvious (I haven't got a benchmark result).

@liupc

liupc commented Feb 22, 2019

While re-thinking your code modification (and mine as well), I think there is another potential memory leak: when the SnappyCompressor/SnappyDecompressor objects themselves are no longer used. Maybe the end() method should do the cleanup; however, I cannot see it being invoked anywhere.

@gszadovszky Yes, I think it's a potential leak. If end() is not called, the direct memory cannot be released in time unless GC collections happen.
But it seems it will not grow rapidly, because it only leaks when the compression/decompression ends, so it's not a big problem, though it is worth fixing.
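
As a caller-side illustration of the lifecycle being discussed (the codec class and stream wiring here are assumptions for the sketch, not a prescription of how parquet-mr itself drives the codec): end() is part of the Hadoop Compressor/Decompressor contract, and calling it when a non-pooled instance is done is what releases the buffers without waiting for GC.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;

public class DecompressorLifecycleSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed for the sketch: Parquet's own Snappy codec, which implements the Hadoop codec API.
    CompressionCodec codec = ReflectionUtils.newInstance(
        org.apache.parquet.hadoop.codec.SnappyCodec.class, conf);

    Decompressor decompressor = codec.createDecompressor();
    try {
      // ... decompress page data, e.g. via codec.createInputStream(compressedIn, decompressor) ...
    } finally {
      // Without this, the direct buffers are only reclaimed when the decompressor object itself
      // is garbage collected, which is the slow leak mentioned above.
      decompressor.end();
    }
  }
}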

@gszadovszky
Contributor Author

About the initial size: we are handling pages here, meaning the buffer size should never be much larger than the largest page (reading or writing). A page is usually about 1M, so it does not make sense to make the initial size so much larger. The problem is that I don't have a good guess in this range. If the actual page size is usually larger than the initial size, then it would work just as with a 0 initial size. If the actual page size is usually less than the initial size, then we would consume more memory than needed. That's why I would keep the 0 initial size.

About the reset: it might make sense to have a maximum value, and if the buffers have grown larger than this value, we could reset them back to some size. Based on the logic above, this size might be 0 as well. However, it would not solve any possible OOM: if we need several compressors/decompressors at the same time (e.g. writing several parquet files at the same time, or vectorization, which I don't have deep knowledge about as it is implemented outside of parquet-mr), we would not reset them until all the reading/writing is done. So we have to consume enough memory to at least fit the largest pages of the files.

What do you think? (I'll add the cleanup to end() at least, to fulfill the contract.)
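
A minimal sketch of what the cleanup in end() could look like, assuming the codec holds two direct buffers as discussed above; this shows the shape of the idea only, not necessarily the exact change that was merged (an implementation may additionally use a platform-specific cleaner to free the native memory eagerly):

import java.nio.ByteBuffer;

public abstract class EndCleanupSketch {
  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);

  public synchronized void end() {
    // Drop the large direct buffers so their native memory can be reclaimed once the
    // ByteBuffer objects are collected, instead of living as long as the codec instance.
    inputBuffer = ByteBuffer.allocateDirect(0);
    outputBuffer = ByteBuffer.allocateDirect(0);
  }
}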

@liupc

liupc commented Feb 22, 2019

Yes, I agree with adding the cleanup to end(), and I also think it's good to reset to 0 to be more memory friendly.
Thanks for your explanation @gszadovszky

@zivanfi
Contributor

zivanfi commented Feb 22, 2019

I am concerned about the suggestion of resetting the size to 0 every time, because it leads to a guaranteed allocation the next time the buffer needs to be used. This is radically different from how things used to work, as originally an allocation only happened when the buffer needed to grow. This change would mean a lot more allocation, which may affect performance badly. It also doesn't seem necessary to me, because it would not reduce the peak memory usage of the buffer; it would only shorten the duration of that peak to a single use of the buffer instead of the whole use of the compressor, which does not seem to be a large gain.

My suggestion would be to remove the deallocation from reset(), since it is not strictly necessary for fixing the memory leak. Later it can be added in a follow-up change accompanied by benchmarks ensuring that it does not cause significant performance regression.

@gszadovszky
Contributor Author

I agree with @zivanfi.
I have now added the cleanup to end() and kept reset() as it was before the fix of PARQUET-1485 (not doing any cleanup).

@shangxinli
Contributor

I agree with 1) initializing to 0; 2) adding the cleanup to end().

@gszadovszky gszadovszky merged commit f799893 into apache:master Feb 25, 2019
@gudladona

We are investigating a memory leak issue in our environment where the snappy off-heap consumption has grown to 3-4 GB. Even after using the version of parquet-mr that has this fix, we see similar growth. We suspect there is still an open leak here: we do not see end() being invoked anywhere to clean up the buffers. Is there something missing here?

LantaoJin pushed a commit to LantaoJin/parquet-mr that referenced this pull request Jun 15, 2021
shangxinli added a commit to shangxinli/parquet-mr that referenced this pull request Mar 1, 2023
Summary:
Merge two commits from upstream

Revert "PARQUET-1381: Add merge blocks command to parquet-tools (apache#512)  apache#621
PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change apache#622

Reviewers: pavi, leisun

Reviewed By: leisun

Differential Revision: https://code.uberinternal.com/D2544359