Make Decompressor release memory buffer #11987

Closed
luyuncheng wants to merge 3 commits

Conversation

luyuncheng
Contributor

Description

We have an ES cluster (31G heap, 96G memory, 30 instance nodes) with many shards per node (about 4,000 per node). When nodes handle many bulk and search requests concurrently, JVM memory usage goes high and stays high: the memory is not released even with frequent GC and after stopping all write/search requests. We have to restart the node to recover the heap, as the following GC metrics show:
[screenshot: GC metrics]

A heap dump shows CompressingStoredFieldsReader occupying 70% of the heap:
[screenshot: heap dump]

All of these readers' paths to GC roots look like the following (in search or write threads):
[screenshot: paths to GC roots]

Root cause

I think the root cause is that these thread-locals hold the references: SegmentReader#getFieldsReader goes through the following code, and Elasticsearch always uses fixed thread pools and never calls CloseableThreadLocal#purge.

In `lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java`, `fieldsReaderLocal` is defined as:
  final CloseableThreadLocal<StoredFieldsReader> fieldsReaderLocal =
      new CloseableThreadLocal<StoredFieldsReader>() {
        @Override
        protected StoredFieldsReader initialValue() {
          return fieldsReaderOrig.clone();
        }
      };
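
To illustrate the retention pattern, here is a minimal, self-contained sketch; it uses a plain JDK ThreadLocal as a stand-in for Lucene's CloseableThreadLocal, and the class names and numbers are illustrative only. A fixed thread pool keeps its threads alive indefinitely, so every pool thread ends up pinning one cloned reader (and its buffer) per open segment until the reader is closed or the thread dies:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadLocalRetentionSketch {
      static class FakeSegmentReader {
        // stands in for a cloned StoredFieldsReader holding a ~100KB decompression buffer
        final byte[] buffer = new byte[100 * 1024];
      }

      public static void main(String[] args) {
        int segments = 40, threads = 8;
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        // one per-thread holder per open segment, like SegmentCoreReaders#fieldsReaderLocal
        @SuppressWarnings("unchecked")
        ThreadLocal<FakeSegmentReader>[] perSegment = new ThreadLocal[segments];
        for (int i = 0; i < segments; i++) {
          perSegment[i] = ThreadLocal.withInitial(FakeSegmentReader::new);
        }

        // every pool thread touches every segment once (bulk/search traffic); afterwards
        // each thread pins segments * 100KB that GC cannot reclaim while the pool lives
        for (int t = 0; t < threads; t++) {
          pool.submit(() -> {
            for (ThreadLocal<FakeSegmentReader> tl : perSegment) {
              tl.get();
            }
          });
        }
        pool.shutdown();
        System.out.printf("retained ~ %d KB across the pool%n", threads * segments * 100);
      }
    }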

We have searched related issues like LUCENE-9959 and LUCENE-10419, but there is no answer to this problem.


I compared heap dumps across different JVM heaps and different Lucene versions, and I think the root cause is that LZ4WithPresetDictDecompressor allocates a buffer as an instance field at construction time:

    LZ4WithPresetDictDecompressor() {
      compressedLengths = new int[0];
      buffer = new byte[0];
    }

When the Elasticsearch instance performs Stored-Fields-Read operations, this buffer gets reallocated on the JVM heap but is never released, because Elasticsearch's currentEngineReference keeps the reference alive:
[screenshot: reference chain]

Proposal

I think we can release this buffer memory when decompression is done; with that, the JVM can hold many more segment readers in the heap. When these buffers can be released, the heap metrics look as follows:
[screenshot: heap metrics after the change]

@rmuir
Member

rmuir commented Nov 30, 2022

too many shards. need to make sure this doesn't cause performance regression for normal use-cases.

@rmuir
Member

rmuir commented Nov 30, 2022

fwiw, assigning the 0-length array just makes even more waste. Keeping the logic that uses ArrayUtil.grow to oversize arrays that won't be reused adds even more waste.

better to assign null and create array of the correct size, if it won't be reused.
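
To make the trade-off concrete, here is a minimal sketch (hypothetical names, not the actual Lucene patch) contrasting a long-lived, oversized reusable buffer with an exactly-sized throwaway allocation:

    import java.util.Arrays;

    // Illustrative only: the two allocation strategies discussed above.
    public class BufferStrategySketch {

      // Strategy A: long-lived field, oversized (in the spirit of ArrayUtil.grow) so
      // future calls rarely reallocate; the largest-ever buffer stays reachable for
      // as long as the owning object does.
      private byte[] reusable = new byte[0];

      byte[] reusingBuffer(int needed) {
        if (reusable.length < needed) {
          reusable = Arrays.copyOf(reusable, needed + (needed >> 1)); // oversize by ~50%
        }
        return reusable;
      }

      // Strategy B (assigning an exactly-sized array when it won't be reused): the
      // array becomes garbage as soon as the caller drops it, so nothing is retained
      // between calls.
      static byte[] exactBuffer(int needed) {
        return new byte[needed];
      }
    }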

@luyuncheng
Contributor Author

@rmuir Thanks for replying to this issue. I ran some benchmarks:

better to assign null and create array of the correct size, if it won't be reused.

LGTM, I assigned null in commit 0dfb7c.

need to make sure this doesn't cause performance regression for normal use-cases.

I ran the benchmark; it shows little regression, as follows:

runStoredFieldsBenchmark.py

                      Baseline     Candidate
BEST_SPEED           313241.00     313355.00
BEST_COMPRESSION     606150.00     605147.00

localrun.py -source wikimedium1m:

                            TaskQPS baseline      StdDevQPS my_modified_version      StdDev                Pct diff p-value
               HighTermMonthSort     1166.94      (9.6%)     1110.96      (6.9%)   -4.8% ( -19% -   13%) 0.070
           BrowseMonthSSDVFacets       70.78     (13.1%)       68.11      (3.3%)   -3.8% ( -17% -   14%) 0.212
                         MedTerm      473.43      (6.0%)      461.52      (5.5%)   -2.5% ( -13% -    9%) 0.166
                          IntNRQ      205.75     (17.3%)      200.82     (19.4%)   -2.4% ( -33% -   41%) 0.681
       BrowseDayOfYearSSDVFacets       63.52     (13.0%)       62.16     (10.8%)   -2.1% ( -22% -   24%) 0.570
           BrowseMonthTaxoFacets       57.51     (28.9%)       56.46     (22.4%)   -1.8% ( -41% -   69%) 0.823
                     AndHighHigh      166.48      (4.4%)      164.56      (4.6%)   -1.2% (  -9% -    8%) 0.418
                        HighTerm      429.10      (5.4%)      425.86      (4.5%)   -0.8% ( -10% -    9%) 0.633
                      AndHighMed      225.64      (4.6%)      224.60      (5.0%)   -0.5% (  -9% -    9%) 0.759
                     LowSpanNear      114.76      (4.1%)      114.25      (3.5%)   -0.4% (  -7% -    7%) 0.714
             MedIntervalsOrdered       78.89      (5.9%)       78.60      (5.1%)   -0.4% ( -10% -   11%) 0.832
            HighIntervalsOrdered       21.65      (9.1%)       21.57      (7.9%)   -0.3% ( -15% -   18%) 0.902
                     MedSpanNear      186.08      (3.5%)      185.51      (4.9%)   -0.3% (  -8% -    8%) 0.821
             LowIntervalsOrdered       24.31      (5.7%)       24.28      (5.6%)   -0.1% ( -10% -   11%) 0.950
                        PKLookup      135.52      (7.1%)      135.50      (4.9%)   -0.0% ( -11% -   12%) 0.993
                        Wildcard       49.84     (11.7%)       49.85     (12.3%)    0.0% ( -21% -   27%) 0.996
                          Fuzzy1       79.17      (4.2%)       79.21      (4.3%)    0.0% (  -8% -    8%) 0.971
                       LowPhrase      116.62      (3.4%)      116.86      (4.1%)    0.2% (  -7% -    7%) 0.859
                          Fuzzy2       98.02      (4.3%)       98.23      (4.3%)    0.2% (  -8% -    9%) 0.872
           HighTermDayOfYearSort      198.72      (9.3%)      199.23      (5.3%)    0.3% ( -13% -   16%) 0.913
                         LowTerm      771.21      (5.1%)      773.32      (4.3%)    0.3% (  -8% -   10%) 0.854
                      OrHighHigh       62.86      (4.2%)       63.07      (6.0%)    0.3% (  -9% -   10%) 0.845
                    HighSpanNear       30.21      (5.4%)       30.33      (5.2%)    0.4% (  -9% -   11%) 0.810
            BrowseDateSSDVFacets       10.94     (14.0%)       10.99     (16.1%)    0.4% ( -26% -   35%) 0.925
                       MedPhrase      317.58      (4.7%)      319.13      (6.3%)    0.5% ( -10% -   12%) 0.781
            BrowseDateTaxoFacets       53.51     (26.8%)       53.81     (20.1%)    0.6% ( -36% -   64%) 0.941
                         Respell       95.92      (5.3%)       96.55      (4.3%)    0.7% (  -8% -   10%) 0.671
     BrowseRandomLabelTaxoFacets       45.89     (23.3%)       46.20     (18.8%)    0.7% ( -33% -   55%) 0.918
       BrowseDayOfYearTaxoFacets       53.49     (26.8%)       53.89     (20.0%)    0.8% ( -36% -   64%) 0.920
                      HighPhrase        7.87      (4.3%)        7.94      (6.1%)    0.9% (  -9% -   11%) 0.612
                         Prefix3      414.44      (4.6%)      419.19      (7.4%)    1.1% ( -10% -   13%) 0.557
                       OrHighMed      155.31      (4.2%)      157.71      (3.9%)    1.5% (  -6% -   10%) 0.226
                      AndHighLow     1053.26      (8.5%)     1069.91      (7.1%)    1.6% ( -12% -   18%) 0.523
                 MedSloppyPhrase       24.43      (3.8%)       24.83      (4.6%)    1.6% (  -6% -   10%) 0.220
                 LowSloppyPhrase      133.87      (3.3%)      136.89      (5.0%)    2.3% (  -5% -   10%) 0.091
                HighSloppyPhrase       20.51      (4.5%)       20.99      (4.4%)    2.3% (  -6% -   11%) 0.102
     BrowseRandomLabelSSDVFacets       44.18      (3.0%)       45.95      (6.1%)    4.0% (  -5% -   13%) 0.009
                       OrHighLow      331.66      (5.7%)      345.34      (5.9%)    4.1% (  -7% -   16%) 0.024

@rmuir
Member

rmuir commented Nov 30, 2022

Thanks for running the stored fields benchmark: are you able to report the retrieval time as well? That's my first concern. Maybe, the StoredFieldsBenchmark.java needs to be run standalone to report it, here is the relevant code: https://github.com/mikemccand/luceneutil/blob/master/src/main/perf/StoredFieldsBenchmark.java#L89-L101

My other concern would be if we create too much pressure on GC for unoptimized merges. The StoredFieldsBenchmark uses geonames and does not delete/update documents, so it would never exercise this path much.
We could run the StoredFieldsBenchmark before and after the change with -Dorg.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsWriter.enableBulkMerge=false to force the slow merge path.

In all cases when running the benchmark, we may want to explicitly supply smaller heap (-Xmx), since the dataset is not very big and otherwise jvm may allocate a huge heap, dodging any GC impacts that we want to see.

Thank you again for benchmarking, if you run into trouble I can try to help run these benchmarks too.

@luyuncheng
Contributor Author

are you able to report the retrieval time as well?

runStoredFieldsBenchmark.py

retrieved_time_msec    Baseline    Candidate
BEST_SPEED               260.71       283.29
BEST_COMPRESSION        2586.35      2553.10

if we create too much pressure on GC for unoptimized merges.

I will do this benchmark.

we may want to explicitly supply smaller heap

We have hit this situation: an ES node with the default max_shards_per_node of 1000, normally about 40 segments per shard, and one buffer of roughly 100KB retained heap per segment, which adds up to about 1000 × 40 × 100KB ≈ 4G of resident heap memory.
I think with a smaller heap, this PR would mainly cause more frequent GC when the segment count is not big enough to influence the resident heap.

@rmuir
Member

rmuir commented Nov 30, 2022

Thanks, yeah my remaining concern is the non-optimized merge... especially for those that delete and update documents (as it prevents them from getting optimized merges).

Alternative solution to this issue might be, instead of removing the reuse from Decompressor, to instead try removing the stored fields/term vectors CloseableThreadLocals from SegmentCoreReaders.... this is more difficult as we'd have to change APIs around IndexReader to no longer call document(i) but instead call e.g. getFieldsReader() and then pull documents from that.

It might alleviate the pressure, while still allowing merge to reuse stuff efficiently and queries to reuse stuff efficiently when pulling their top N, but it would require bigger changes.
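
A rough sketch of that direction (the accessor name here is a hypothetical placeholder, not a committed Lucene API): the caller pulls a stored-fields accessor once per search and reuses it for a batch of lookups, so decode state lives on that short-lived object instead of in a per-segment thread-local.

    import java.nio.file.Paths;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.MatchAllDocsQuery;
    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;
    import org.apache.lucene.store.FSDirectory;

    // Hypothetical API shape: reader.storedFields() (or getFieldsReader()) hands back
    // a per-use accessor instead of the stateless reader.document(i).
    public class StoredFieldsAccessSketch {
      public static void main(String[] args) throws Exception {
        try (IndexReader reader = DirectoryReader.open(FSDirectory.open(Paths.get(args[0])))) {
          IndexSearcher searcher = new IndexSearcher(reader);
          TopDocs top = searcher.search(new MatchAllDocsQuery(), 10);

          var storedFields = reader.storedFields(); // hypothetical accessor
          for (ScoreDoc sd : top.scoreDocs) {
            Document doc = storedFields.document(sd.doc); // buffers/file pointers reused across these calls
            System.out.println(doc);
          }
        } // the accessor goes out of scope here; nothing stays pinned by pool threads
      }
    }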

cc @jpountz

@uschindler
Contributor

Yes, before we work around all that stuff here, I'd also suggest to remove those ThreadLocals.

@luyuncheng
Contributor Author

We could run the StoredFieldsBenchmark before and after the change with -Dorg.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsWriter.enableBulkMerge=false to force the slow merge path.
In all cases when running the benchmark, we may want to explicitly supply smaller heap (-Xmx),

@rmuir I just modified https://github.com/mikemccand/luceneutil/blob/master/src/python/runStoredFieldsBenchmark.py#L43 with
command = f'{localconstants.JAVA_EXE} -Xmx256m -Dorg.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsWriter.enableBulkMerge=false -cp {lucene_core_jar}:build perf.StoredFieldsBenchmark {geonames_csv_in} {localconstants.INDEX_DIR_BASE}/geonames-stored-fields {mode} {doc_limit}'

I ran 4 different runStoredFieldsBenchmark configurations; the following tables show little performance regression:

runStoredFieldsBenchmark.py enableBulkMerge=false

                         Baseline    Candidate
indexing_time_msec
  BEST_SPEED            365665.00    372287.00
  BEST_COMPRESSION      849157.00    848813.00
retrieved_time_msec
  BEST_SPEED               246.62       269.32
  BEST_COMPRESSION        2606.98      2634.53

runStoredFieldsBenchmark.py enableBulkMerge=false -Xmx1g

                         Baseline    Candidate
indexing_time_msec
  BEST_SPEED            372457.00    366094.00
  BEST_COMPRESSION      850273.00    852397.00
retrieved_time_msec
  BEST_SPEED               247.70       279.11
  BEST_COMPRESSION        2585.59      2633.83

runStoredFieldsBenchmark.py enableBulkMerge=false -Xmx512m

                         Baseline    Candidate
indexing_time_msec
  BEST_SPEED            368389.00    370878.00
  BEST_COMPRESSION      851277.00    850121.00
retrieved_time_msec
  BEST_SPEED               256.80       280.52
  BEST_COMPRESSION        2576.36      2645.32

runStoredFieldsBenchmark.py enableBulkMerge=false -Xmx256m

                         Baseline    Candidate
indexing_time_msec
  BEST_SPEED            366735.00    368407.00
  BEST_COMPRESSION      849980.00    852214.00
retrieved_time_msec
  BEST_SPEED               256.10       278.06
  BEST_COMPRESSION        2584.96      2632.69

@rmuir
Member

rmuir commented Dec 1, 2022

thanks for running. somehow i think bulk merge didn't get disabled. without the bulk merge optimization, indexing time should be significantly higher; the benchmark should be very, very slow.

@luyuncheng
Contributor Author

luyuncheng commented Dec 1, 2022

thanks for running. somehow i think bulk merge didn't get disabled. without the bulk merge optimization, indexing time should be significantly higher; the benchmark should be very, very slow.

@rmuir I am also curious about that.
But I used the arthas vmtool command to verify that BULK_MERGE_ENABLED is false:
[screenshot: arthas output]

I also manually modified lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsWriter.java#BULK_MERGE_ENABLED to false.

With enableBulkMerge=false and -Xmx256m it shows almost the same numbers:

                         Baseline    Candidate
indexing_time_msec
  BEST_SPEED            366735.00    368407.00
  BEST_COMPRESSION      849980.00    852214.00
retrieved_time_msec
  BEST_SPEED               256.10       278.06
  BEST_COMPRESSION        2584.96      2632.69

I also checked that the threads run into Lucene90CompressingStoredFieldsWriter#copyOneDoc:
[screenshot: thread stack]

BTW, my runStoredFieldsBenchmark geonames.txt dataset is about 1.5GB, the allCountries data with 12,489,745 lines.

@luyuncheng
Contributor Author

Yes, before we work around all that stuff here, I'd also suggest to remove those ThreadLocals.

@uschindler I think the ThreadLocals are just one GC path for this issue. But, for instance in ES, on a node with 1000 shards and normally about 40 segments per shard, each opened segment allocates one buffer with a retained heap of about 100KB, so this would use roughly 1000 × 40 × 100KB ≈ 4G of resident heap memory, even though some segments are rarely used.

@rmuir
Member

rmuir commented Dec 2, 2022

@uschindler I think the ThreadLocals are just one GC path for this issue. But, for instance in ES, on a node with 1000 shards and normally about 40 segments per shard, each opened segment allocates one buffer with a retained heap of about 100KB, so this would use roughly 1000 × 40 × 100KB ≈ 4G of resident heap memory, even though some segments are rarely used.

You can use fewer shards as a workaround for now.

@rmuir
Member

rmuir commented Dec 2, 2022

In fact there is no situation where thousands of shards makes sense on a single node. That's bad design.

@rmuir
Member

rmuir commented Dec 2, 2022

I will investigate the document API and try to make a proposal so that threadlocal is no longer needed. I'm really concerned about the merge case here causing regressions for folks that delete/update documents, I'll also try to run the benchmark myself for the unoptimized merge case.

It requires a bit of time, as document api will impact 100% of lucene users so we need to get it right.

@luyuncheng
Contributor Author

In fact there is no situation where thousands of shards makes sense on a single node. That's bad design.

@rmuir I have another proposal:
What do you think of making the decompressed output a ByteBuffersDataInput that is not fixed-size (since you mentioned in LUCENE-10627 that we should not introduce another ByteBlockPool)?

Today, every time we call decompress we wrap the decompressed BytesRef in a ByteArrayDataInput.

Instead, decompress could build a list of ByteBuffers and only copy the dictionary each time.
This would reduce memory copies out of the buffer, avoid ArrayUtil.grow's copies,
and also solve the buffer retention problem.

Simple code like the following:

      // Read blocks that intersect with the interval we need
      ArrayList<ByteBuffer> bufferList = new ArrayList<>();
      while (offsetInBlock < offset + length) {
        final int bytesToDecompress = Math.min(blockLength, offset + length - offsetInBlock);
        byte[] block = new byte[dictLength + blockLength];
        // only the preset dictionary is copied into the head of each block
        System.arraycopy(dictBuffer, 0, block, 0, dictLength);
        LZ4.decompress(in, bytesToDecompress, block, dictLength);
        offsetInBlock += blockLength;

        bufferList.add(ByteBuffer.wrap(block, dictLength, bytesToDecompress).slice());
      }
      return new ByteBuffersDataInput(bufferList);
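
For context, a rough usage sketch of how such a result could be consumed (illustrative only; it assumes decompress returned a ByteBuffersDataInput built as above), showing that the blocks never need to be concatenated into one large array:

    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.lucene.store.ByteBuffersDataInput;

    // Illustrative only: reading across per-block ByteBuffers without one big copy.
    public class ByteBuffersDataInputSketch {
      public static void main(String[] args) throws Exception {
        List<ByteBuffer> blocks =
            List.of(ByteBuffer.wrap(new byte[] {1, 2, 3}), ByteBuffer.wrap(new byte[] {4, 5}));
        ByteBuffersDataInput in = new ByteBuffersDataInput(blocks);

        byte[] out = new byte[(int) in.size()];
        in.readBytes(out, 0, out.length); // reads across block boundaries transparently
      }
    }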

@jpountz
Contributor

jpountz commented Dec 2, 2022

I had opened a very similar PR to this one at #137 which handled the merge case.

@jpountz
Contributor

jpountz commented Dec 2, 2022

I think I had not merged it because the follow-up discussion about removing thread locals had triggered naming/API concerns, but it should be a good incremental step and we could figure a way to remove threadlocals in Lucene 10 since it will require API changes?

@rmuir
Member

rmuir commented Dec 2, 2022

I think I had not merged it because the follow-up discussion about removing thread locals had triggered naming/API concerns, but it should be a good incremental step and we could figure a way to remove threadlocals in Lucene 10 since it will require API changes?

I don't think so, i tend to lean towards Uwe's thoughts here. I feel like this is "rush in another fix for 10,000 shards".

We can investigate removing the threadlocals in lucene 10. Very possibly the change could be backported with deprecations for IR.document(int) (e.g. which would use threadlocal like now). At least to me, that's the ideal scenario. So you could migrate your lucene 9 code to no longer use deprecated methods and not have the threadlocals.

@jpountz
Contributor

jpountz commented Dec 2, 2022

Sounds good.

@rmuir
Member

rmuir commented Dec 2, 2022

it's just gonna take me some time, i can't get something out there like today. for example nearly 100% of tests would be impacted :) It is fair, I will feel the same pain the users will. But it is some motivation to take care with the API and spend some time to do it well.

@luyuncheng
Contributor Author

luyuncheng commented Dec 2, 2022

ThreadLocals just scale up the StoredFieldsReader heap usage. But even without thread-locals, one instance with only 10K segments would use about 1G of heap in fieldsReaderOrig#buffer (10,000 × ~100KB). My idea is that when the buffer is rarely used, it should be recycled.

@rmuir
Member

rmuir commented Dec 2, 2022

the idea is not to have one instance per segment. there would be zero instances. when you want to retrieve docs from indexreader, the user would call .getFieldsReader() or similar to create one. it would be eligible for GC afterwards.

but it means we can reuse buffers per-search and per-merge rather than creating per-document garbage. basically it would work like every other part of lucene.

@rmuir
Member

rmuir commented Dec 2, 2022

and one idea i have is to try to prototype with the term vectors first (since both stored fields and term vectors have per-segment threadlocals that I'd like to remove). it is just less tests to fix, but still gives us a chance to look at the idea.

@uschindler
Contributor

Hi,
another idea about the threadlocals: Why do we need thread locals "per" indexreader. A single thread local cannot be used from multiple threads, so it does not matter if multiple index readers/writers share the same thread locals. Let's do it like it is done for direct buffers inside the JDK's code, too: Let StoredFieldsReader allocate a single buffer per thread in a conventional ThreadLocal, not a closeable one (which has other problems). That is by the way how ThreadLocals are meant to be used.

Of course if one has thread pools with 10,000 threads like Solr this may still be a resource problem, but that's not Lucene's fault.

So my proposal would be: Use a single ThreadLocal (non-closeable) for decompression buffers in a static final field of the Decompressor class.
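
A minimal sketch of that suggestion (illustrative names only, not a committed design): one static, plain ThreadLocal shared by all decompressors, holding a single per-thread scratch buffer, instead of one CloseableThreadLocal per SegmentReader:

    import java.util.Arrays;

    // Illustrative only: a shared per-thread scratch buffer for decompression.
    public final class SharedDecompressionBuffer {
      private static final ThreadLocal<byte[]> SCRATCH =
          ThreadLocal.withInitial(() -> new byte[0]);

      private SharedDecompressionBuffer() {}

      // Returns a per-thread buffer of at least 'needed' bytes; every decompressor
      // running on this thread reuses the same array, so retained memory scales with
      // the thread count rather than with the number of open segments.
      public static byte[] scratch(int needed) {
        byte[] buf = SCRATCH.get();
        if (buf.length < needed) {
          buf = Arrays.copyOf(buf, needed);
          SCRATCH.set(buf);
        }
        return buf;
      }
    }

As noted above, this still scales with the number of pool threads, so very large thread pools would remain a concern.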

@rmuir
Member

rmuir commented Dec 4, 2022

The threadlocals (closeable or not, i want them out here) only exist because the api is dumb and stateless: indexreader.document(i). So there's no place to store any state to reuse across multiple documents.

hence threadlocal is used to workaround a problematic API.

i propose we fix the api, then no threadlocal is needed anymore.

@rmuir
Member

rmuir commented Dec 4, 2022

Hi, another idea about the threadlocals: Why do we need thread locals "per" indexreader. A single thread local cannot be used from multiple threads, so it does not matter if multiple index readers/writers share the same thread locals. Let's do it like it is done for direct buffers inside the JDK's code, too: Let StoredFieldsReader allocate a single buffer per thread in a conventional ThreadLocal, not a closeable one (which has other problems). That is by the way how ThreadLocals are meant to be used.

Of course if one has thread pools with 10,000 threads like Solr this may still be a resource problem, but that's not Lucene's fault.

So my proposal would be: Use a single ThreadLocal (non-closeable) for decompression buffers in a static final field of the Decompressor class.

I think you are confused. There is much more than just this buffer underneath threadlocal today.

* Move the buffer lifecycle into reader block state
@luyuncheng
Contributor Author

luyuncheng commented Dec 5, 2022

the idea is not to have one instance per segment.
but it means we can reuse buffers per-search and per-merge rather than creating per-document garbage. basically it would work like every other part of lucene.

@rmuir That is absolutely right!! How about decoupling the buffer from the Decompressor? When an instance wants to reuse the buffer per-merge or while retrieving docs, let the reader or BlockState own the buffer's lifecycle rather than every decompressor.

In commit 2f676e6, I moved the buffer into the BytesRef and let the BytesRef decide the buffer's lifecycle.

So when there is a merge, the bytes buffer is reused, as in this code: https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java#L527-L532

And the benchmarks show almost no performance regression with enableBulkMerge=false and -Xmx256m:

                         Baseline    Candidate
indexing_time_msec
  BEST_SPEED            365247.00    370257.00
  BEST_COMPRESSION      850573.00    850017.00
retrieved_time_msec
  BEST_SPEED               256.48       268.60
  BEST_COMPRESSION        2646.73      2593.71

@rmuir
Member

rmuir commented Dec 5, 2022

sorry, we definitely should not be adding arrays to bytesref here. Like i said, we can just remove the threadlocal.

The issue to me has nothing to do with buffers. it has to do with allowing codec to maintain state across multiple calls to IndexReader.document. And due to the api being IndexReader.document(int), the only way it can do that is via threadlocal. That's what i aim to fix.

In the case of this codec, some of that state happens to be a buffer, which annoys you. But that isn't the only kind of state possible (e.g. file pointer locations etc). Basically the IndexReader.document(int) api is out-of-date: in practice, stored fields codecs don't just fetch individual documents in isolation anymore, they do things such as compress them in blocks.
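
To illustrate why such state matters, here is a schematic, hypothetical sketch (not the Lucene90 codec): with block compression, a stateful reader can decompress a block once and serve every document in it, while a stateless per-document call has to redo that work on each call or stash it in a thread-local.

    // Hypothetical block-compressed store: a whole block must be decompressed to
    // read any single document inside it.
    public class BlockStateSketch {
      interface BlockStore {
        byte[] decompressBlock(int blockId); // expensive
        int blockOf(int docId);
        int offsetInBlock(int docId);
      }

      // Stateful reader: remembers the last decompressed block, so fetching top-N
      // hits that share a block decompresses it only once.
      static final class StatefulReader {
        private final BlockStore store;
        private int cachedBlockId = -1;
        private byte[] cachedBlock;

        StatefulReader(BlockStore store) {
          this.store = store;
        }

        byte docByte(int docId) {
          int blockId = store.blockOf(docId);
          if (blockId != cachedBlockId) { // state reused across calls
            cachedBlock = store.decompressBlock(blockId);
            cachedBlockId = blockId;
          }
          return cachedBlock[store.offsetInBlock(docId)];
        }
      }

      // Stateless API (like IndexReader.document(int)): every call starts from
      // scratch, so the only place to hide reusable state is a thread-local.
      static byte statelessDocByte(BlockStore store, int docId) {
        return store.decompressBlock(store.blockOf(docId))[store.offsetInBlock(docId)];
      }
    }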

@luyuncheng
Contributor Author

@rmuir Thanks for your reply! I am looking forward to your no-threadlocal design.

I have no further questions about it; I'll close this PR.
