From efb737b7efa8507e7499944ec605ea5d5c26e513 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Mon, 3 Nov 2025 13:28:51 -0500 Subject: [PATCH 01/11] Improve Lookup Join performance with CachedDirectoryReader --- .../lookup/EnrichQuerySourceOperator.java | 237 +++++++++++++++++- 1 file changed, 235 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index 4a1ca22fcfebd..0d4f9331d4c01 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -7,8 +7,18 @@ package org.elasticsearch.compute.operator.lookup; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.ImpactsEnum; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; @@ -16,6 +26,9 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.AttributeSource; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOBooleanSupplier; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.IntBlock; @@ -29,6 +42,10 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.IntFunction; /** * Lookup document IDs for the input queries. @@ -63,11 +80,227 @@ public EnrichQuerySourceOperator( this.shardContexts = shardContexts; this.shardContext = shardContexts.get(shardId); this.shardContext.incRef(); - this.searcher = shardContext.searcher(); - this.indexReader = searcher.getIndexReader(); + try { + this.indexReader = new CachedDirectoryReader((DirectoryReader) shardContext.searcher().getIndexReader()); + this.searcher = new IndexSearcher(this.indexReader); + } catch (IOException e) { + throw new UncheckedIOException(e); + } this.warnings = warnings; } + static class CachedDirectoryReader extends FilterDirectoryReader { + CachedDirectoryReader(DirectoryReader in) throws IOException { + super(in, new FilterDirectoryReader.SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return new CachedLeafReader(reader); + } + }); + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new CachedDirectoryReader(in); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + } + + static class CachedLeafReader extends FilterLeafReader { + final Map docValues = new HashMap<>(); + final Map termEnums = new HashMap<>(); + + CachedLeafReader(LeafReader in) { + super(in); + } + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + NumericDocValues dv = super.getNumericDocValues(field); + if (dv == null) { + return null; + } + return new CachedNumericDocValues(docId -> docValues.compute(field, (k, curr) -> { + if (curr == null || curr.docID() > docId) { + return dv; + } + return curr; + })); + } + + @Override + public Terms terms(String field) throws IOException { + Terms terms = super.terms(field); + if (terms == null) { + return null; + } + return new FilterTerms(terms) { + @Override + public TermsEnum iterator() throws IOException { + return new CachedTermsEnum((reuse) -> { + return termEnums.compute(field, (k, curr) -> { + if (curr == null || reuse == false) { + try { + curr = in.iterator(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return curr; + }); + }); + } + }; + } + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getCoreCacheHelper(); + } + } + + static class CachedNumericDocValues extends NumericDocValues { + private NumericDocValues delegate = null; + private final IntFunction fromCache; + + CachedNumericDocValues(IntFunction fromCache) { + this.fromCache = fromCache; + } + + NumericDocValues getDelegate(int docID) { + if (delegate == null) { + delegate = fromCache.apply(docID); + } + return delegate; + } + + @Override + public long longValue() throws IOException { + return getDelegate(-1).longValue(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + return getDelegate(target).advanceExact(target); + } + + @Override + public int advance(int target) throws IOException { + return getDelegate(target).nextDoc(); + } + + @Override + public int docID() { + return getDelegate(-1).docID(); + } + + @Override + public int nextDoc() throws IOException { + return getDelegate(-1).nextDoc(); + } + + @Override + public long cost() { + return fromCache.apply(DocIdSetIterator.NO_MORE_DOCS).cost(); + } + } + + static class CachedTermsEnum extends TermsEnum { + private TermsEnum delegate = null; + private final Function fromCache; + + CachedTermsEnum(Function fromCache) { + this.fromCache = fromCache; + } + + TermsEnum getDelegate(boolean reuse) { + if (delegate == null) { + delegate = fromCache.apply(reuse); + } + return delegate; + } + + @Override + public AttributeSource attributes() { + return getDelegate(false).attributes(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + return getDelegate(true).seekExact(text); + } + + @Override + public IOBooleanSupplier prepareSeekExact(BytesRef text) throws IOException { + return getDelegate(true).prepareSeekExact(text); + } + + @Override + public void seekExact(long ord) throws IOException { + getDelegate(true).seekExact(ord); + } + + @Override + public void seekExact(BytesRef term, TermState state) throws IOException { + // TODO: when this can be true? + getDelegate(false).seekExact(term, state); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + return getDelegate(false).seekCeil(text); + } + + @Override + public BytesRef term() throws IOException { + return getDelegate(false).term(); + } + + @Override + public long ord() throws IOException { + return getDelegate(false).ord(); + } + + @Override + public int docFreq() throws IOException { + return getDelegate(false).docFreq(); + } + + @Override + public long totalTermFreq() throws IOException { + return getDelegate(false).totalTermFreq(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + return getDelegate(false).postings(reuse, flags); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + return getDelegate(false).impacts(flags); + } + + @Override + public TermState termState() throws IOException { + return getDelegate(false).termState(); + } + + @Override + public BytesRef next() throws IOException { + return getDelegate(false).next(); + } + } + @Override public void finish() {} From f4b48186b18f39adb8d3002743795ee8a0b915d0 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Mon, 3 Nov 2025 13:36:10 -0500 Subject: [PATCH 02/11] Update docs/changelog/137539.yaml --- docs/changelog/137539.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/137539.yaml diff --git a/docs/changelog/137539.yaml b/docs/changelog/137539.yaml new file mode 100644 index 0000000000000..5a0472bdbd8c1 --- /dev/null +++ b/docs/changelog/137539.yaml @@ -0,0 +1,5 @@ +pr: 137539 +summary: Improve Lookup Join performance with `CachedDirectoryReader` +area: ES|QL +type: enhancement +issues: [] From 1f987e031d7ec6afb9d365b84959072063b06d7e Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Wed, 5 Nov 2025 15:50:40 -0500 Subject: [PATCH 03/11] Fix a data issue when a field is used multiple times --- .../lookup/EnrichQuerySourceOperator.java | 122 ++---------------- 1 file changed, 14 insertions(+), 108 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index 0d4f9331d4c01..dfcb12fd4708d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -10,13 +10,10 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.ImpactsEnum; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.TermState; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.ConstantScoreQuery; @@ -26,9 +23,6 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.util.AttributeSource; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.IOBooleanSupplier; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.IntBlock; @@ -44,7 +38,6 @@ import java.io.UncheckedIOException; import java.util.HashMap; import java.util.Map; -import java.util.function.Function; import java.util.function.IntFunction; /** @@ -112,7 +105,7 @@ public CacheHelper getReaderCacheHelper() { static class CachedLeafReader extends FilterLeafReader { final Map docValues = new HashMap<>(); - final Map termEnums = new HashMap<>(); + final Map termsCache = new HashMap<>(); CachedLeafReader(LeafReader in) { super(in); @@ -134,25 +127,25 @@ public NumericDocValues getNumericDocValues(String field) throws IOException { @Override public Terms terms(String field) throws IOException { - Terms terms = super.terms(field); + Terms terms = termsCache.computeIfAbsent(field, k -> { + try { + return super.terms(k); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); if (terms == null) { return null; } + // Return a FilterTerms that always creates a fresh TermsEnum iterator + // We cache the Terms object itself for performance, but always create fresh TermsEnum + // instances because TermsEnum maintains position state and reusing it causes incorrect + // results when the same field is accessed multiple times with different conditions + // (e.g., in OR NOT expressions like: OR NOT (other1 != "omicron" AND other1 != "nu")) return new FilterTerms(terms) { @Override public TermsEnum iterator() throws IOException { - return new CachedTermsEnum((reuse) -> { - return termEnums.compute(field, (k, curr) -> { - if (curr == null || reuse == false) { - try { - curr = in.iterator(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - return curr; - }); - }); + return in.iterator(); } }; } @@ -214,93 +207,6 @@ public long cost() { } } - static class CachedTermsEnum extends TermsEnum { - private TermsEnum delegate = null; - private final Function fromCache; - - CachedTermsEnum(Function fromCache) { - this.fromCache = fromCache; - } - - TermsEnum getDelegate(boolean reuse) { - if (delegate == null) { - delegate = fromCache.apply(reuse); - } - return delegate; - } - - @Override - public AttributeSource attributes() { - return getDelegate(false).attributes(); - } - - @Override - public boolean seekExact(BytesRef text) throws IOException { - return getDelegate(true).seekExact(text); - } - - @Override - public IOBooleanSupplier prepareSeekExact(BytesRef text) throws IOException { - return getDelegate(true).prepareSeekExact(text); - } - - @Override - public void seekExact(long ord) throws IOException { - getDelegate(true).seekExact(ord); - } - - @Override - public void seekExact(BytesRef term, TermState state) throws IOException { - // TODO: when this can be true? - getDelegate(false).seekExact(term, state); - } - - @Override - public SeekStatus seekCeil(BytesRef text) throws IOException { - return getDelegate(false).seekCeil(text); - } - - @Override - public BytesRef term() throws IOException { - return getDelegate(false).term(); - } - - @Override - public long ord() throws IOException { - return getDelegate(false).ord(); - } - - @Override - public int docFreq() throws IOException { - return getDelegate(false).docFreq(); - } - - @Override - public long totalTermFreq() throws IOException { - return getDelegate(false).totalTermFreq(); - } - - @Override - public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { - return getDelegate(false).postings(reuse, flags); - } - - @Override - public ImpactsEnum impacts(int flags) throws IOException { - return getDelegate(false).impacts(flags); - } - - @Override - public TermState termState() throws IOException { - return getDelegate(false).termState(); - } - - @Override - public BytesRef next() throws IOException { - return getDelegate(false).next(); - } - } - @Override public void finish() {} From 1352039f2c6281e4cab01a725def9e82d22878a0 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Wed, 5 Nov 2025 15:57:00 -0500 Subject: [PATCH 04/11] Refactor CachedDirectoryReader to its own file --- .../lookup/CachedDirectoryReader.java | 148 ++++++++++++++++++ .../lookup/EnrichQuerySourceOperator.java | 134 ---------------- 2 files changed, 148 insertions(+), 134 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java new file mode 100644 index 0000000000000..dea35d27e3297 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.lookup; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.search.DocIdSetIterator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.IntFunction; + +class CachedDirectoryReader extends FilterDirectoryReader { + CachedDirectoryReader(DirectoryReader in) throws IOException { + super(in, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return new CachedLeafReader(reader); + } + }); + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return new CachedDirectoryReader(in); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + static class CachedLeafReader extends FilterLeafReader { + final Map docValues = new HashMap<>(); + final Map termsCache = new HashMap<>(); + + CachedLeafReader(LeafReader in) { + super(in); + } + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + NumericDocValues dv = super.getNumericDocValues(field); + if (dv == null) { + return null; + } + return new CachedNumericDocValues(docId -> docValues.compute(field, (k, curr) -> { + if (curr == null || curr.docID() > docId) { + return dv; + } + return curr; + })); + } + + @Override + public Terms terms(String field) throws IOException { + Terms terms = termsCache.computeIfAbsent(field, k -> { + try { + return super.terms(k); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + if (terms == null) { + return null; + } + // Return a FilterTerms that always creates a fresh TermsEnum iterator + // We cache the Terms object itself for performance, but always create fresh TermsEnum + // instances because TermsEnum maintains position state and reusing it causes incorrect + // results when the same field is accessed multiple times with different conditions + // (e.g., in OR NOT expressions like: OR NOT (other1 != "omicron" AND other1 != "nu")) + return new FilterTerms(terms) { + @Override + public TermsEnum iterator() throws IOException { + return in.iterator(); + } + }; + } + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getCoreCacheHelper(); + } + } + + static class CachedNumericDocValues extends NumericDocValues { + private NumericDocValues delegate = null; + private final IntFunction fromCache; + + CachedNumericDocValues(IntFunction fromCache) { + this.fromCache = fromCache; + } + + NumericDocValues getDelegate(int docID) { + if (delegate == null) { + delegate = fromCache.apply(docID); + } + return delegate; + } + + @Override + public long longValue() throws IOException { + return getDelegate(-1).longValue(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + return getDelegate(target).advanceExact(target); + } + + @Override + public int advance(int target) throws IOException { + return getDelegate(target).nextDoc(); + } + + @Override + public int docID() { + return getDelegate(-1).docID(); + } + + @Override + public int nextDoc() throws IOException { + return getDelegate(-1).nextDoc(); + } + + @Override + public long cost() { + return fromCache.apply(DocIdSetIterator.NO_MORE_DOCS).cost(); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index dfcb12fd4708d..a856ca2bbe9ef 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -8,14 +8,8 @@ package org.elasticsearch.compute.operator.lookup; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterDirectoryReader; -import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.Terms; -import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; @@ -36,9 +30,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.HashMap; -import java.util.Map; -import java.util.function.IntFunction; /** * Lookup document IDs for the input queries. @@ -82,131 +73,6 @@ public EnrichQuerySourceOperator( this.warnings = warnings; } - static class CachedDirectoryReader extends FilterDirectoryReader { - CachedDirectoryReader(DirectoryReader in) throws IOException { - super(in, new FilterDirectoryReader.SubReaderWrapper() { - @Override - public LeafReader wrap(LeafReader reader) { - return new CachedLeafReader(reader); - } - }); - } - - @Override - protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return new CachedDirectoryReader(in); - } - - @Override - public CacheHelper getReaderCacheHelper() { - return in.getReaderCacheHelper(); - } - } - - static class CachedLeafReader extends FilterLeafReader { - final Map docValues = new HashMap<>(); - final Map termsCache = new HashMap<>(); - - CachedLeafReader(LeafReader in) { - super(in); - } - - @Override - public NumericDocValues getNumericDocValues(String field) throws IOException { - NumericDocValues dv = super.getNumericDocValues(field); - if (dv == null) { - return null; - } - return new CachedNumericDocValues(docId -> docValues.compute(field, (k, curr) -> { - if (curr == null || curr.docID() > docId) { - return dv; - } - return curr; - })); - } - - @Override - public Terms terms(String field) throws IOException { - Terms terms = termsCache.computeIfAbsent(field, k -> { - try { - return super.terms(k); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - if (terms == null) { - return null; - } - // Return a FilterTerms that always creates a fresh TermsEnum iterator - // We cache the Terms object itself for performance, but always create fresh TermsEnum - // instances because TermsEnum maintains position state and reusing it causes incorrect - // results when the same field is accessed multiple times with different conditions - // (e.g., in OR NOT expressions like: OR NOT (other1 != "omicron" AND other1 != "nu")) - return new FilterTerms(terms) { - @Override - public TermsEnum iterator() throws IOException { - return in.iterator(); - } - }; - } - - @Override - public CacheHelper getCoreCacheHelper() { - return in.getCoreCacheHelper(); - } - - @Override - public CacheHelper getReaderCacheHelper() { - return in.getCoreCacheHelper(); - } - } - - static class CachedNumericDocValues extends NumericDocValues { - private NumericDocValues delegate = null; - private final IntFunction fromCache; - - CachedNumericDocValues(IntFunction fromCache) { - this.fromCache = fromCache; - } - - NumericDocValues getDelegate(int docID) { - if (delegate == null) { - delegate = fromCache.apply(docID); - } - return delegate; - } - - @Override - public long longValue() throws IOException { - return getDelegate(-1).longValue(); - } - - @Override - public boolean advanceExact(int target) throws IOException { - return getDelegate(target).advanceExact(target); - } - - @Override - public int advance(int target) throws IOException { - return getDelegate(target).nextDoc(); - } - - @Override - public int docID() { - return getDelegate(-1).docID(); - } - - @Override - public int nextDoc() throws IOException { - return getDelegate(-1).nextDoc(); - } - - @Override - public long cost() { - return fromCache.apply(DocIdSetIterator.NO_MORE_DOCS).cost(); - } - } - @Override public void finish() {} From 937c78b2442c5e4fc2ed1fb6acbcd13231eaa7f2 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Thu, 20 Nov 2025 12:00:20 -0500 Subject: [PATCH 05/11] Remove termsCache completely --- .../compute/operator/lookup/CachedDirectoryReader.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java index dea35d27e3297..f0005c8e5ae14 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java @@ -17,7 +17,6 @@ import org.apache.lucene.search.DocIdSetIterator; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.HashMap; import java.util.Map; import java.util.function.IntFunction; @@ -44,7 +43,6 @@ public CacheHelper getReaderCacheHelper() { static class CachedLeafReader extends FilterLeafReader { final Map docValues = new HashMap<>(); - final Map termsCache = new HashMap<>(); CachedLeafReader(LeafReader in) { super(in); @@ -66,13 +64,7 @@ public NumericDocValues getNumericDocValues(String field) throws IOException { @Override public Terms terms(String field) throws IOException { - Terms terms = termsCache.computeIfAbsent(field, k -> { - try { - return super.terms(k); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); + Terms terms = super.terms(field); if (terms == null) { return null; } From 3e5e9ff6db2c0d1c651b2db7c4ac7a02b2a0c517 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Mon, 24 Nov 2025 09:14:55 -0500 Subject: [PATCH 06/11] Only apply to fields without multiple queries --- .../lookup/CachedDirectoryReader.java | 133 ++++++++++++- .../lookup/EnrichQuerySourceOperator.java | 2 +- .../lookup/LookupEnrichQueryGenerator.java | 14 ++ .../compute/operator/lookup/QueryList.java | 8 + .../esql/enrich/ExpressionQueryList.java | 175 ++++++++++++++---- 5 files changed, 286 insertions(+), 46 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java index f0005c8e5ae14..f90de970a2703 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java @@ -10,30 +10,42 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.ImpactsEnum; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.TermState; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.AttributeSource; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOBooleanSupplier; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.function.Function; import java.util.function.IntFunction; class CachedDirectoryReader extends FilterDirectoryReader { - CachedDirectoryReader(DirectoryReader in) throws IOException { + LookupEnrichQueryGenerator queryList; + + CachedDirectoryReader(DirectoryReader in, LookupEnrichQueryGenerator queryList) throws IOException { super(in, new SubReaderWrapper() { @Override public LeafReader wrap(LeafReader reader) { - return new CachedLeafReader(reader); + return new CachedLeafReader(reader, queryList); } }); + this.queryList = queryList; } @Override protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return new CachedDirectoryReader(in); + return new CachedDirectoryReader(in, queryList); } @Override @@ -43,9 +55,14 @@ public CacheHelper getReaderCacheHelper() { static class CachedLeafReader extends FilterLeafReader { final Map docValues = new HashMap<>(); + final Map termEnums = new HashMap<>(); + final Set fieldsWithMultipleQueries; - CachedLeafReader(LeafReader in) { + CachedLeafReader(LeafReader in, LookupEnrichQueryGenerator queryList) { super(in); + // Get the precomputed fields with multiple queries from the queryList + // Only ExpressionQueryList can have repeating fields, and it computes this once during initialization + this.fieldsWithMultipleQueries = queryList.fieldsWithMultipleQueries(); } @Override @@ -68,15 +85,27 @@ public Terms terms(String field) throws IOException { if (terms == null) { return null; } - // Return a FilterTerms that always creates a fresh TermsEnum iterator - // We cache the Terms object itself for performance, but always create fresh TermsEnum - // instances because TermsEnum maintains position state and reusing it causes incorrect - // results when the same field is accessed multiple times with different conditions + // If multiple queries use the same field, don't cache TermsEnum + // This avoids a data issue where the same TermsEnum is reused for different queries // (e.g., in OR NOT expressions like: OR NOT (other1 != "omicron" AND other1 != "nu")) + if (fieldsWithMultipleQueries.contains(field)) { + return terms; + } return new FilterTerms(terms) { @Override public TermsEnum iterator() throws IOException { - return in.iterator(); + return new CachedTermsEnum((reuse) -> { + return termEnums.compute(field, (k, curr) -> { + if (curr == null || reuse == false) { + try { + curr = in.iterator(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return curr; + }); + }); } }; } @@ -137,4 +166,90 @@ public long cost() { return fromCache.apply(DocIdSetIterator.NO_MORE_DOCS).cost(); } } + + static class CachedTermsEnum extends TermsEnum { + private TermsEnum delegate = null; + private final Function fromCache; + + CachedTermsEnum(Function fromCache) { + this.fromCache = fromCache; + } + + TermsEnum getDelegate(boolean reuse) { + if (delegate == null) { + delegate = fromCache.apply(reuse); + } + return delegate; + } + + @Override + public AttributeSource attributes() { + return getDelegate(false).attributes(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + return getDelegate(true).seekExact(text); + } + + @Override + public IOBooleanSupplier prepareSeekExact(BytesRef text) throws IOException { + return getDelegate(true).prepareSeekExact(text); + } + + @Override + public void seekExact(long ord) throws IOException { + getDelegate(true).seekExact(ord); + } + + @Override + public void seekExact(BytesRef term, TermState state) throws IOException { + getDelegate(false).seekExact(term, state); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + return getDelegate(false).seekCeil(text); + } + + @Override + public BytesRef term() throws IOException { + return getDelegate(false).term(); + } + + @Override + public long ord() throws IOException { + return getDelegate(false).ord(); + } + + @Override + public int docFreq() throws IOException { + return getDelegate(false).docFreq(); + } + + @Override + public long totalTermFreq() throws IOException { + return getDelegate(false).totalTermFreq(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + return getDelegate(false).postings(reuse, flags); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + return getDelegate(false).impacts(flags); + } + + @Override + public TermState termState() throws IOException { + return getDelegate(false).termState(); + } + + @Override + public BytesRef next() throws IOException { + return getDelegate(false).next(); + } + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index a856ca2bbe9ef..bd903a0e27a86 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -65,7 +65,7 @@ public EnrichQuerySourceOperator( this.shardContext = shardContexts.get(shardId); this.shardContext.incRef(); try { - this.indexReader = new CachedDirectoryReader((DirectoryReader) shardContext.searcher().getIndexReader()); + this.indexReader = new CachedDirectoryReader((DirectoryReader) shardContext.searcher().getIndexReader(), queryList); this.searcher = new IndexSearcher(this.indexReader); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java index cf581d9e83b43..601ca9a3684eb 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java @@ -10,6 +10,8 @@ import org.apache.lucene.search.Query; import org.elasticsearch.core.Nullable; +import java.util.Set; + /** * An interface to generates queries for the lookup and enrich operators. * This interface is used to retrieve queries based on a position index. @@ -27,4 +29,16 @@ public interface LookupEnrichQueryGenerator { */ int getPositionCount(); + /** + * Returns the set of field names that appear multiple times in queries. + * This is used to avoid caching TermsEnum for fields that are used in multiple query clauses, + * which can cause data issues when the same TermsEnum is reused for different queries. + *

+ * By default, returns an empty set since most query generators don't have repeating fields. + * Only {@code ExpressionQueryList} can have repeating fields. + */ + default Set fieldsWithMultipleQueries() { + return Set.of(); + } + } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index f300a400e60af..14d91e37cf5d2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -79,6 +79,14 @@ public int getPositionCount() { return block.getPositionCount(); } + /** + * Returns the field name for this query list, or null if no field is associated. + */ + @Nullable + public String getFieldName() { + return field != null ? field.name() : null; + } + /** * Returns a copy of this query list that only returns queries for single-valued positions. * That is, it returns `null` queries for either multivalued or null positions. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java index 934dd94770e73..69190f272e373 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -10,12 +10,14 @@ import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.Rewriteable; @@ -38,7 +40,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION; import static org.elasticsearch.xpack.esql.enrich.AbstractLookupService.termQueryList; @@ -57,10 +63,8 @@ * 2. Expression-based join: The join conditions are based on a complex expression that can involve multiple fields and operators. */ public class ExpressionQueryList implements LookupEnrichQueryGenerator { - private final List queryLists; - private final List lucenePushableFilters = new ArrayList<>(); + private final QuerySources querySources; private final SearchExecutionContext context; - private final AliasFilter aliasFilter; private final LucenePushdownPredicates lucenePushdownPredicates; private ExpressionQueryList( @@ -70,13 +74,16 @@ private ExpressionQueryList( ClusterService clusterService, AliasFilter aliasFilter ) { - this.queryLists = new ArrayList<>(queryLists); this.context = context; - this.aliasFilter = aliasFilter; + this.querySources = new QuerySources(context, aliasFilter); this.lucenePushdownPredicates = LucenePushdownPredicates.from( SearchContextStats.from(List.of(context)), new EsqlFlags(clusterService.getClusterSettings()) ); + // Initialize with existing queryLists + for (QueryList queryList : queryLists) { + this.querySources.addQueryList(queryList, queryList.getFieldName()); + } buildPreJoinFilter(rightPreJoinPlan, clusterService); } @@ -169,7 +176,10 @@ private boolean applyAsRightSidePushableFilter(Expression filter) { } catch (IOException e) { throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e); } - addToLucenePushableFilters(queryBuilder); + // Extract all field names from the filter expression for tracking + List fieldNames = new ArrayList<>(); + filter.forEachDown(Attribute.class, attr -> fieldNames.add(attr.name())); + querySources.addLucenePushableFilter(queryBuilder, fieldNames); return true; } } @@ -205,23 +215,20 @@ private boolean applyAsLeftRightBinaryComparison( // TermQuery is faster than BinaryComparisonQueryList, as it does less work per row // so here we reuse the existing logic from field based join to build a termQueryList for Equals if (binaryComparison instanceof Equals) { - QueryList termQueryForEquals = termQueryList(rightFieldType, context, aliasFilter, block, dataType).onlySingleValues( - warnings, - "LOOKUP JOIN encountered multi-value" - ); - queryLists.add(termQueryForEquals); + QueryList termQueryForEquals = termQueryList(rightFieldType, context, querySources.getAliasFilter(), block, dataType) + .onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); + querySources.addQueryList(termQueryForEquals, rightAttribute.name()); } else { - queryLists.add( - new BinaryComparisonQueryList( - rightFieldType, - context, - block, - binaryComparison, - clusterService, - aliasFilter, - warnings - ) + QueryList binaryQueryList = new BinaryComparisonQueryList( + rightFieldType, + context, + block, + binaryComparison, + clusterService, + querySources.getAliasFilter(), + warnings ); + querySources.addQueryList(binaryQueryList, rightAttribute.name()); } return true; } @@ -229,23 +236,17 @@ private boolean applyAsLeftRightBinaryComparison( return false; } - private void addToLucenePushableFilters(QueryBuilder query) { - try { - if (query != null) { - lucenePushableFilters.add(query.toQuery(context)); - } - } catch (IOException e) { - throw new UncheckedIOException("Error while building query for Lucene pushable filter", e); - } - } - private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) { if (rightPreJoinPlan instanceof FilterExec filterExec) { List candidateRightHandFilters = Predicates.splitAnd(filterExec.condition()); for (Expression filter : candidateRightHandFilters) { if (filter instanceof TranslationAware translationAware) { if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) { - addToLucenePushableFilters(translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder()); + QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder(); + // Extract all field names from the filter expression for tracking + List fieldNames = new ArrayList<>(); + filter.forEachDown(Attribute.class, attr -> fieldNames.add(attr.name())); + querySources.addLucenePushableFilter(queryBuilder, fieldNames); } } // If the filter is not translatable we will not apply it for now @@ -271,7 +272,7 @@ private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService cl @Override public Query getQuery(int position) { BooleanQuery.Builder builder = new BooleanQuery.Builder(); - for (QueryList queryList : queryLists) { + for (QueryList queryList : querySources.getQueryLists()) { Query q = queryList.getQuery(position); if (q == null) { // if any of the matchFields are null, it means there is no match for this position @@ -281,7 +282,7 @@ public Query getQuery(int position) { builder.add(q, BooleanClause.Occur.FILTER); } // also attach the pre-join filter if it exists - for (Query preJoinFilter : lucenePushableFilters) { + for (Query preJoinFilter : querySources.getLucenePushableFilters()) { builder.add(preJoinFilter, BooleanClause.Occur.FILTER); } return builder.build(); @@ -295,8 +296,8 @@ public Query getQuery(int position) { */ @Override public int getPositionCount() { - int positionCount = queryLists.get(0).getPositionCount(); - for (QueryList queryList : queryLists) { + int positionCount = querySources.getQueryLists().get(0).getPositionCount(); + for (QueryList queryList : querySources.getQueryLists()) { if (queryList.getPositionCount() != positionCount) { throw new IllegalArgumentException( "All QueryLists must have the same position count, expected: " @@ -308,4 +309,106 @@ public int getPositionCount() { } return positionCount; } + + @Override + public Set fieldsWithMultipleQueries() { + return querySources.getFieldsWithMultipleQueries(); + } + + /** + * Manages query sources (QueryLists, Lucene pushable filters, and AliasFilter) and tracks field occurrences. + */ + private static class QuerySources { + private final List queryLists = new ArrayList<>(); + private final List lucenePushableFilters = new ArrayList<>(); + private final AliasFilter aliasFilter; + private final Map fieldCount = new HashMap<>(); + private final SearchExecutionContext context; + private Set fieldsWithMultipleQueries; // Cached, invalidated when sources are added + + QuerySources(SearchExecutionContext context, AliasFilter aliasFilter) { + this.context = context; + this.aliasFilter = aliasFilter; + // Extract fields from aliasFilter query if present + if (aliasFilter != null && aliasFilter != AliasFilter.EMPTY && aliasFilter.getQueryBuilder() != null) { + try { + Query aliasQuery = aliasFilter.getQueryBuilder().toQuery(context); + extractFieldsFromQuery(aliasQuery, fieldCount); + } catch (IOException e) { + throw new UncheckedIOException("Error while converting alias filter to query", e); + } + } + } + + void addQueryList(QueryList queryList, @Nullable String fieldName) { + queryLists.add(queryList); + if (fieldName != null) { + fieldCount.merge(fieldName, 1, Integer::sum); + } + invalidateCache(); + } + + void addLucenePushableFilter(QueryBuilder queryBuilder, List fieldNames) { + try { + if (queryBuilder != null) { + Query luceneQuery = queryBuilder.toQuery(context); + if (luceneQuery != null) { + lucenePushableFilters.add(luceneQuery); + // Track all specified field names from the expression + for (String fieldName : fieldNames) { + fieldCount.merge(fieldName, 1, Integer::sum); + } + invalidateCache(); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Error while converting query builder to query", e); + } + } + + Set getFieldsWithMultipleQueries() { + if (fieldsWithMultipleQueries == null) { + // Compute fields that appear more than once + fieldsWithMultipleQueries = fieldCount.entrySet() + .stream() + .filter(entry -> entry.getValue() > 1) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + return fieldsWithMultipleQueries; + } + + private void invalidateCache() { + fieldsWithMultipleQueries = null; + } + + List getQueryLists() { + return queryLists; + } + + List getLucenePushableFilters() { + return lucenePushableFilters; + } + + AliasFilter getAliasFilter() { + return aliasFilter; + } + + /** + * Extracts field names from a Lucene Query and adds them to the field count map. + */ + private static void extractFieldsFromQuery(Query query, Map fieldCount) { + if (query == null) { + return; + } + QueryVisitor visitor = new QueryVisitor() { + @Override + public boolean acceptField(String field) { + fieldCount.merge(field, 1, Integer::sum); + return true; + } + }; + query.visit(visitor); + } + } } From b0eaa653434faae67f7f0df1a3da7c12726dc8b7 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Mon, 1 Dec 2025 20:10:19 -0500 Subject: [PATCH 07/11] Remove caching of fieldsWithMultipleQueries, it is retrieved just once --- .../esql/enrich/ExpressionQueryList.java | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java index 69190f272e373..9bb958890416f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -324,7 +324,6 @@ private static class QuerySources { private final AliasFilter aliasFilter; private final Map fieldCount = new HashMap<>(); private final SearchExecutionContext context; - private Set fieldsWithMultipleQueries; // Cached, invalidated when sources are added QuerySources(SearchExecutionContext context, AliasFilter aliasFilter) { this.context = context; @@ -345,7 +344,6 @@ void addQueryList(QueryList queryList, @Nullable String fieldName) { if (fieldName != null) { fieldCount.merge(fieldName, 1, Integer::sum); } - invalidateCache(); } void addLucenePushableFilter(QueryBuilder queryBuilder, List fieldNames) { @@ -358,7 +356,6 @@ void addLucenePushableFilter(QueryBuilder queryBuilder, List fieldNames) for (String fieldName : fieldNames) { fieldCount.merge(fieldName, 1, Integer::sum); } - invalidateCache(); } } } catch (IOException e) { @@ -367,19 +364,12 @@ void addLucenePushableFilter(QueryBuilder queryBuilder, List fieldNames) } Set getFieldsWithMultipleQueries() { - if (fieldsWithMultipleQueries == null) { - // Compute fields that appear more than once - fieldsWithMultipleQueries = fieldCount.entrySet() - .stream() - .filter(entry -> entry.getValue() > 1) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - } - return fieldsWithMultipleQueries; - } - - private void invalidateCache() { - fieldsWithMultipleQueries = null; + // Compute fields that appear more than once on demand + return fieldCount.entrySet() + .stream() + .filter(entry -> entry.getValue() > 1) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); } List getQueryLists() { From bf478050e36fcaba9a8f70db1d69739334edd8ee Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Fri, 5 Dec 2025 13:01:58 -0500 Subject: [PATCH 08/11] Patch from Nhat --- .../lookup/CachedDirectoryReader.java | 46 ++++++++++--------- .../lookup/EnrichQuerySourceOperator.java | 7 +-- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java index f90de970a2703..b0f54f754712e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.ImpactsEnum; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.TermState; @@ -26,26 +27,23 @@ import java.io.UncheckedIOException; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.function.Function; import java.util.function.IntFunction; class CachedDirectoryReader extends FilterDirectoryReader { - LookupEnrichQueryGenerator queryList; - CachedDirectoryReader(DirectoryReader in, LookupEnrichQueryGenerator queryList) throws IOException { + CachedDirectoryReader(DirectoryReader in) throws IOException { super(in, new SubReaderWrapper() { @Override public LeafReader wrap(LeafReader reader) { - return new CachedLeafReader(reader, queryList); + return new CachedLeafReader(reader); } }); - this.queryList = queryList; } @Override protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return new CachedDirectoryReader(in, queryList); + return new CachedDirectoryReader(in); } @Override @@ -53,16 +51,19 @@ public CacheHelper getReaderCacheHelper() { return in.getReaderCacheHelper(); } + void resetTermsEnumCache() { + for (LeafReaderContext leafContext : leaves()) { + CachedLeafReader cachedLeafReader = (CachedLeafReader) leafContext.reader(); + cachedLeafReader.termEnums.values().forEach(termEnum -> termEnum.inUsed = false); + } + } + static class CachedLeafReader extends FilterLeafReader { final Map docValues = new HashMap<>(); - final Map termEnums = new HashMap<>(); - final Set fieldsWithMultipleQueries; + final Map termEnums = new HashMap<>(); - CachedLeafReader(LeafReader in, LookupEnrichQueryGenerator queryList) { + CachedLeafReader(LeafReader in) { super(in); - // Get the precomputed fields with multiple queries from the queryList - // Only ExpressionQueryList can have repeating fields, and it computes this once during initialization - this.fieldsWithMultipleQueries = queryList.fieldsWithMultipleQueries(); } @Override @@ -85,24 +86,19 @@ public Terms terms(String field) throws IOException { if (terms == null) { return null; } - // If multiple queries use the same field, don't cache TermsEnum - // This avoids a data issue where the same TermsEnum is reused for different queries - // (e.g., in OR NOT expressions like: OR NOT (other1 != "omicron" AND other1 != "nu")) - if (fieldsWithMultipleQueries.contains(field)) { - return terms; - } return new FilterTerms(terms) { @Override public TermsEnum iterator() throws IOException { return new CachedTermsEnum((reuse) -> { return termEnums.compute(field, (k, curr) -> { - if (curr == null || reuse == false) { + if (curr == null || reuse == false || curr.inUsed) { try { - curr = in.iterator(); + curr = new SharedTermEnum(in.iterator()); } catch (IOException e) { throw new UncheckedIOException(e); } } + curr.inUsed = true; return curr; }); }); @@ -121,6 +117,14 @@ public CacheHelper getReaderCacheHelper() { } } + static final class SharedTermEnum extends FilterLeafReader.FilterTermsEnum { + boolean inUsed = false; + + SharedTermEnum(TermsEnum delegate) { + super(delegate); + } + } + static class CachedNumericDocValues extends NumericDocValues { private NumericDocValues delegate = null; private final IntFunction fromCache; @@ -204,7 +208,7 @@ public void seekExact(long ord) throws IOException { @Override public void seekExact(BytesRef term, TermState state) throws IOException { - getDelegate(false).seekExact(term, state); + getDelegate(true).seekExact(term, state); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index bd903a0e27a86..bfd77a20f184e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -8,7 +8,6 @@ package org.elasticsearch.compute.operator.lookup; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.DocIdSetIterator; @@ -42,7 +41,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator { private int queryPosition = -1; private final IndexedByShardId shardContexts; private final ShardContext shardContext; - private final IndexReader indexReader; + private final CachedDirectoryReader indexReader; private final IndexSearcher searcher; private final Warnings warnings; private final int maxPageSize; @@ -65,7 +64,7 @@ public EnrichQuerySourceOperator( this.shardContext = shardContexts.get(shardId); this.shardContext.incRef(); try { - this.indexReader = new CachedDirectoryReader((DirectoryReader) shardContext.searcher().getIndexReader(), queryList); + this.indexReader = new CachedDirectoryReader((DirectoryReader) shardContext.searcher().getIndexReader()); this.searcher = new IndexSearcher(this.indexReader); } catch (IOException e) { throw new UncheckedIOException(e); @@ -102,6 +101,8 @@ public Page getOutput() { assert isFinished(); break; } + // allow reusing the previous terms enums + indexReader.resetTermsEnumCache(); query = searcher.rewrite(new ConstantScoreQuery(query)); } catch (Exception e) { warnings.registerException(e); From 92dbc74650d052ac25a90d1df725446451ec3c83 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Fri, 5 Dec 2025 13:20:16 -0500 Subject: [PATCH 09/11] Clean up not needed changes --- .../lookup/LookupEnrichQueryGenerator.java | 14 -- .../compute/operator/lookup/QueryList.java | 8 - .../esql/enrich/ExpressionQueryList.java | 165 ++++-------------- 3 files changed, 36 insertions(+), 151 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java index 601ca9a3684eb..cf581d9e83b43 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java @@ -10,8 +10,6 @@ import org.apache.lucene.search.Query; import org.elasticsearch.core.Nullable; -import java.util.Set; - /** * An interface to generates queries for the lookup and enrich operators. * This interface is used to retrieve queries based on a position index. @@ -29,16 +27,4 @@ public interface LookupEnrichQueryGenerator { */ int getPositionCount(); - /** - * Returns the set of field names that appear multiple times in queries. - * This is used to avoid caching TermsEnum for fields that are used in multiple query clauses, - * which can cause data issues when the same TermsEnum is reused for different queries. - *

- * By default, returns an empty set since most query generators don't have repeating fields. - * Only {@code ExpressionQueryList} can have repeating fields. - */ - default Set fieldsWithMultipleQueries() { - return Set.of(); - } - } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index 37bdc2393c69e..c42946ed71777 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -79,14 +79,6 @@ public int getPositionCount() { return block.getPositionCount(); } - /** - * Returns the field name for this query list, or null if no field is associated. - */ - @Nullable - public String getFieldName() { - return field != null ? field.name() : null; - } - /** * Returns a copy of this query list that only returns queries for single-valued positions. * That is, it returns `null` queries for either multivalued or null positions. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java index 9bb958890416f..934dd94770e73 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -10,14 +10,12 @@ import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; -import org.apache.lucene.search.QueryVisitor; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; -import org.elasticsearch.core.Nullable; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.Rewriteable; @@ -40,11 +38,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION; import static org.elasticsearch.xpack.esql.enrich.AbstractLookupService.termQueryList; @@ -63,8 +57,10 @@ * 2. Expression-based join: The join conditions are based on a complex expression that can involve multiple fields and operators. */ public class ExpressionQueryList implements LookupEnrichQueryGenerator { - private final QuerySources querySources; + private final List queryLists; + private final List lucenePushableFilters = new ArrayList<>(); private final SearchExecutionContext context; + private final AliasFilter aliasFilter; private final LucenePushdownPredicates lucenePushdownPredicates; private ExpressionQueryList( @@ -74,16 +70,13 @@ private ExpressionQueryList( ClusterService clusterService, AliasFilter aliasFilter ) { + this.queryLists = new ArrayList<>(queryLists); this.context = context; - this.querySources = new QuerySources(context, aliasFilter); + this.aliasFilter = aliasFilter; this.lucenePushdownPredicates = LucenePushdownPredicates.from( SearchContextStats.from(List.of(context)), new EsqlFlags(clusterService.getClusterSettings()) ); - // Initialize with existing queryLists - for (QueryList queryList : queryLists) { - this.querySources.addQueryList(queryList, queryList.getFieldName()); - } buildPreJoinFilter(rightPreJoinPlan, clusterService); } @@ -176,10 +169,7 @@ private boolean applyAsRightSidePushableFilter(Expression filter) { } catch (IOException e) { throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e); } - // Extract all field names from the filter expression for tracking - List fieldNames = new ArrayList<>(); - filter.forEachDown(Attribute.class, attr -> fieldNames.add(attr.name())); - querySources.addLucenePushableFilter(queryBuilder, fieldNames); + addToLucenePushableFilters(queryBuilder); return true; } } @@ -215,20 +205,23 @@ private boolean applyAsLeftRightBinaryComparison( // TermQuery is faster than BinaryComparisonQueryList, as it does less work per row // so here we reuse the existing logic from field based join to build a termQueryList for Equals if (binaryComparison instanceof Equals) { - QueryList termQueryForEquals = termQueryList(rightFieldType, context, querySources.getAliasFilter(), block, dataType) - .onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value"); - querySources.addQueryList(termQueryForEquals, rightAttribute.name()); + QueryList termQueryForEquals = termQueryList(rightFieldType, context, aliasFilter, block, dataType).onlySingleValues( + warnings, + "LOOKUP JOIN encountered multi-value" + ); + queryLists.add(termQueryForEquals); } else { - QueryList binaryQueryList = new BinaryComparisonQueryList( - rightFieldType, - context, - block, - binaryComparison, - clusterService, - querySources.getAliasFilter(), - warnings + queryLists.add( + new BinaryComparisonQueryList( + rightFieldType, + context, + block, + binaryComparison, + clusterService, + aliasFilter, + warnings + ) ); - querySources.addQueryList(binaryQueryList, rightAttribute.name()); } return true; } @@ -236,17 +229,23 @@ private boolean applyAsLeftRightBinaryComparison( return false; } + private void addToLucenePushableFilters(QueryBuilder query) { + try { + if (query != null) { + lucenePushableFilters.add(query.toQuery(context)); + } + } catch (IOException e) { + throw new UncheckedIOException("Error while building query for Lucene pushable filter", e); + } + } + private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) { if (rightPreJoinPlan instanceof FilterExec filterExec) { List candidateRightHandFilters = Predicates.splitAnd(filterExec.condition()); for (Expression filter : candidateRightHandFilters) { if (filter instanceof TranslationAware translationAware) { if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) { - QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder(); - // Extract all field names from the filter expression for tracking - List fieldNames = new ArrayList<>(); - filter.forEachDown(Attribute.class, attr -> fieldNames.add(attr.name())); - querySources.addLucenePushableFilter(queryBuilder, fieldNames); + addToLucenePushableFilters(translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder()); } } // If the filter is not translatable we will not apply it for now @@ -272,7 +271,7 @@ private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService cl @Override public Query getQuery(int position) { BooleanQuery.Builder builder = new BooleanQuery.Builder(); - for (QueryList queryList : querySources.getQueryLists()) { + for (QueryList queryList : queryLists) { Query q = queryList.getQuery(position); if (q == null) { // if any of the matchFields are null, it means there is no match for this position @@ -282,7 +281,7 @@ public Query getQuery(int position) { builder.add(q, BooleanClause.Occur.FILTER); } // also attach the pre-join filter if it exists - for (Query preJoinFilter : querySources.getLucenePushableFilters()) { + for (Query preJoinFilter : lucenePushableFilters) { builder.add(preJoinFilter, BooleanClause.Occur.FILTER); } return builder.build(); @@ -296,8 +295,8 @@ public Query getQuery(int position) { */ @Override public int getPositionCount() { - int positionCount = querySources.getQueryLists().get(0).getPositionCount(); - for (QueryList queryList : querySources.getQueryLists()) { + int positionCount = queryLists.get(0).getPositionCount(); + for (QueryList queryList : queryLists) { if (queryList.getPositionCount() != positionCount) { throw new IllegalArgumentException( "All QueryLists must have the same position count, expected: " @@ -309,96 +308,4 @@ public int getPositionCount() { } return positionCount; } - - @Override - public Set fieldsWithMultipleQueries() { - return querySources.getFieldsWithMultipleQueries(); - } - - /** - * Manages query sources (QueryLists, Lucene pushable filters, and AliasFilter) and tracks field occurrences. - */ - private static class QuerySources { - private final List queryLists = new ArrayList<>(); - private final List lucenePushableFilters = new ArrayList<>(); - private final AliasFilter aliasFilter; - private final Map fieldCount = new HashMap<>(); - private final SearchExecutionContext context; - - QuerySources(SearchExecutionContext context, AliasFilter aliasFilter) { - this.context = context; - this.aliasFilter = aliasFilter; - // Extract fields from aliasFilter query if present - if (aliasFilter != null && aliasFilter != AliasFilter.EMPTY && aliasFilter.getQueryBuilder() != null) { - try { - Query aliasQuery = aliasFilter.getQueryBuilder().toQuery(context); - extractFieldsFromQuery(aliasQuery, fieldCount); - } catch (IOException e) { - throw new UncheckedIOException("Error while converting alias filter to query", e); - } - } - } - - void addQueryList(QueryList queryList, @Nullable String fieldName) { - queryLists.add(queryList); - if (fieldName != null) { - fieldCount.merge(fieldName, 1, Integer::sum); - } - } - - void addLucenePushableFilter(QueryBuilder queryBuilder, List fieldNames) { - try { - if (queryBuilder != null) { - Query luceneQuery = queryBuilder.toQuery(context); - if (luceneQuery != null) { - lucenePushableFilters.add(luceneQuery); - // Track all specified field names from the expression - for (String fieldName : fieldNames) { - fieldCount.merge(fieldName, 1, Integer::sum); - } - } - } - } catch (IOException e) { - throw new UncheckedIOException("Error while converting query builder to query", e); - } - } - - Set getFieldsWithMultipleQueries() { - // Compute fields that appear more than once on demand - return fieldCount.entrySet() - .stream() - .filter(entry -> entry.getValue() > 1) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - } - - List getQueryLists() { - return queryLists; - } - - List getLucenePushableFilters() { - return lucenePushableFilters; - } - - AliasFilter getAliasFilter() { - return aliasFilter; - } - - /** - * Extracts field names from a Lucene Query and adds them to the field count map. - */ - private static void extractFieldsFromQuery(Query query, Map fieldCount) { - if (query == null) { - return; - } - QueryVisitor visitor = new QueryVisitor() { - @Override - public boolean acceptField(String field) { - fieldCount.merge(field, 1, Integer::sum); - return true; - } - }; - query.visit(visitor); - } - } } From a62d0650aecdd3028b15701a3df753c93431a2fe Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Mon, 8 Dec 2025 09:12:11 -0500 Subject: [PATCH 10/11] Update docs/changelog/137539.yaml --- docs/changelog/137539.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/changelog/137539.yaml b/docs/changelog/137539.yaml index 5a0472bdbd8c1..d54cd8059b19a 100644 --- a/docs/changelog/137539.yaml +++ b/docs/changelog/137539.yaml @@ -2,4 +2,5 @@ pr: 137539 summary: Improve Lookup Join performance with `CachedDirectoryReader` area: ES|QL type: enhancement -issues: [] +issues: + - 137268 From 75efcf00eaa9ccc8c48bc56dad437f152bde1508 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Tue, 9 Dec 2025 14:50:33 -0500 Subject: [PATCH 11/11] Fix assertion error in Lucene --- .../compute/operator/lookup/CachedDirectoryReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java index b0f54f754712e..b82874fb06a12 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/CachedDirectoryReader.java @@ -198,7 +198,7 @@ public boolean seekExact(BytesRef text) throws IOException { @Override public IOBooleanSupplier prepareSeekExact(BytesRef text) throws IOException { - return getDelegate(true).prepareSeekExact(text); + return getDelegate(false).prepareSeekExact(text); } @Override