diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java index 43396d43186b4..9b6f48d3b0283 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java @@ -60,6 +60,7 @@ import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -306,13 +307,16 @@ private static BlockLoader numericBlockLoader(WhereAndBaseName w, NumberFieldMap @Benchmark @OperationsPerInvocation(INDEX_SIZE) public void benchmark() { + List fields = fields(name); + boolean reuseColumnLoaders = fields.size() <= PlannerSettings.REUSE_COLUMN_LOADERS_THRESHOLD.get(Settings.EMPTY); ValuesSourceReaderOperator op = new ValuesSourceReaderOperator( new DriverContext(BigArrays.NON_RECYCLING_INSTANCE, blockFactory, null), ByteSizeValue.ofMb(1).getBytes(), - fields(name), + fields, new IndexedByShardIdFromSingleton<>(new ValuesSourceReaderOperator.ShardContext(reader, (sourcePaths) -> { throw new UnsupportedOperationException("can't load _source here"); }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))), + reuseColumnLoaders, 0 ); long sum = 0; diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index 86385eabfe6d9..a4e25c568d720 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ 
b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -292,7 +293,7 @@ interface StoredFields { * {@code null} or if they can't load column-at-a-time themselves. */ @Nullable - ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException; + IOSupplier columnAtATimeReader(LeafReaderContext context) throws IOException; /** * Build a row-by-row reader. Must never return {@code null}, @@ -367,7 +368,7 @@ public Builder builder(BlockFactory factory, int expectedCount) { protected abstract boolean canUsePreferLoaderForDoc(int docId) throws IOException; @Override - public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { + public IOSupplier columnAtATimeReader(LeafReaderContext context) throws IOException { if (canUsePreferLoaderForLeaf(context)) { return preferLoader.columnAtATimeReader(context); } else { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java index 4bdabf8cf5b7c..a47c9dde22fef 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockSourceReader.java @@ -17,6 +17,7 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOSupplier; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.index.mapper.blockloader.ConstantNull; import org.elasticsearch.search.fetch.StoredFieldsSpec; @@ -102,7 +103,7 @@ private 
SourceBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) { } @Override - public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { + public final IOSupplier columnAtATimeReader(LeafReaderContext context) { return null; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java index 12584577b561a..ae67e6540d2b3 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockStoredFieldsReader.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.index.mapper.BlockLoader.BytesRefBuilder; import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader; import org.elasticsearch.search.fetch.StoredFieldsSpec; @@ -44,7 +45,7 @@ public StoredFieldsBlockLoader(String field) { } @Override - public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) { + public final IOSupplier columnAtATimeReader(LeafReaderContext context) { return null; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/FallbackSyntheticSourceBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/FallbackSyntheticSourceBlockLoader.java index 5974e3b55cb95..8bc0c6bb8c76c 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/FallbackSyntheticSourceBlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/FallbackSyntheticSourceBlockLoader.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.search.fetch.StoredFieldsSpec; import 
org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; @@ -54,7 +55,7 @@ protected FallbackSyntheticSourceBlockLoader( } @Override - public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { + public IOSupplier columnAtATimeReader(LeafReaderContext context) { return null; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldBlockLoader.java index 24bb30a3ac6e8..c1989882bedaa 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldBlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldBlockLoader.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.search.fetch.StoredFieldsSpec; import java.io.IOException; @@ -26,7 +27,7 @@ public Builder builder(BlockFactory factory, int expectedCount) { } @Override - public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) { + public IOSupplier columnAtATimeReader(LeafReaderContext context) { return null; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesMetadataFieldBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesMetadataFieldBlockLoader.java index c123b29263827..79379a80f6c99 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesMetadataFieldBlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesMetadataFieldBlockLoader.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.index.IndexMode; import 
org.elasticsearch.search.fetch.StoredFieldsSpec; @@ -37,7 +38,7 @@ public Builder builder(BlockFactory factory, int expectedCount) { } @Override - public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) { + public IOSupplier columnAtATimeReader(LeafReaderContext context) { return null; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantBytes.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantBytes.java index 5558db61ff253..23bc66ededb01 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantBytes.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantBytes.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.search.fetch.StoredFieldsSpec; @@ -32,8 +33,8 @@ public Builder builder(BlockFactory factory, int expectedCount) { } @Override - public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) { - return reader; + public IOSupplier columnAtATimeReader(LeafReaderContext context) { + return () -> reader; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantNull.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantNull.java index edbae56f819eb..8fca8a026c166 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantNull.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/ConstantNull.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.search.fetch.StoredFieldsSpec; @@ -31,8 +32,8 @@ 
public Builder builder(BlockFactory factory, int expectedCount) { } @Override - public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) { - return READER; + public IOSupplier columnAtATimeReader(LeafReaderContext context) { + return () -> READER; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/DelegatingBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/DelegatingBlockLoader.java index 14b5214d30ae3..3686eef97a103 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/DelegatingBlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/DelegatingBlockLoader.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.search.fetch.StoredFieldsSpec; @@ -32,12 +33,12 @@ public Builder builder(BlockFactory factory, int expectedCount) { } @Override - public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { - ColumnAtATimeReader reader = delegate.columnAtATimeReader(context); + public IOSupplier columnAtATimeReader(LeafReaderContext context) throws IOException { + IOSupplier reader = delegate.columnAtATimeReader(context); if (reader == null) { return null; } - return new ColumnReader(reader); + return () -> new ColumnReader(reader.get()); } private class ColumnReader implements ColumnAtATimeReader { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/BlockDocValuesReader.java index 735cf598b502e..44c8412b3f38a 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/BlockDocValuesReader.java +++ 
b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/BlockDocValuesReader.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.search.fetch.StoredFieldsSpec; @@ -44,8 +45,8 @@ public abstract static class DocValuesBlockLoader implements BlockLoader { public abstract AllReader reader(LeafReaderContext context) throws IOException; @Override - public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException { - return reader(context); + public final IOSupplier columnAtATimeReader(LeafReaderContext context) { + return () -> reader(context); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java index cccb045c333dc..299d66a7865b9 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java @@ -492,7 +492,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException { assertThat(loader, instanceOf(BooleanScriptBlockDocValuesReader.BooleanScriptBlockLoader.class)); // ignored source doesn't support column at a time loading: - var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()); + var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get(); assertThat(columnAtATimeLoader, instanceOf(BooleanScriptBlockDocValuesReader.class)); var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst()); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java 
b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java index 033e806b5bed4..00964224c0068 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java @@ -855,7 +855,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception { LeafReaderContext context = reader.leaves().get(0); { // One big doc block - var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context); + var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get(); assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class)); var docBlock = TestBlock.docs(IntStream.range(from, to).toArray()); var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false); @@ -867,7 +867,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception { { // Smaller doc blocks int docBlockSize = 1000; - var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context); + var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get(); assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class)); for (int i = from; i < to; i += docBlockSize) { var docBlock = TestBlock.docs(IntStream.range(i, i + docBlockSize).toArray()); @@ -881,7 +881,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception { } { // One smaller doc block: - var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context); + var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get(); assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class)); var docBlock = TestBlock.docs(IntStream.range(1010, 2020).toArray()); var block = (TestBlock) 
columnReader.read(TestBlock.factory(), docBlock, 0, false); @@ -893,7 +893,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception { } { // Read two tiny blocks: - var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context); + var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get(); assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class)); var docBlock = TestBlock.docs(IntStream.range(32, 64).toArray()); var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DateRangeFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DateRangeFieldMapperTests.java index 7b54fb69922ef..14b04aab532b1 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DateRangeFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DateRangeFieldMapperTests.java @@ -126,6 +126,7 @@ public void testNullValueBlockLoader() throws IOException { iw.close(); try (DirectoryReader reader = DirectoryReader.open(directory)) { TestBlock block = (TestBlock) loader.columnAtATimeReader(reader.leaves().get(0)) + .get() .read(TestBlock.factory(), new BlockLoader.Docs() { @Override public int count() { diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java index 9f20befaf535e..adfecdce3471a 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java @@ -545,7 +545,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException { assertThat(loader, instanceOf(DateScriptBlockDocValuesReader.DateScriptBlockLoader.class)); // ignored source doesn't support column at a time 
loading: - var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()); + var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get(); assertThat(columnAtATimeLoader, instanceOf(DateScriptBlockDocValuesReader.class)); var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst()); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java index 183a71f65b40a..d4d114e464e2b 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java @@ -294,7 +294,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException { BlockLoader loader = fieldType.blockLoader(blContext(Settings.EMPTY, true)); assertThat(loader, instanceOf(DoubleScriptBlockDocValuesReader.DoubleScriptBlockLoader.class)); // ignored source doesn't support column at a time loading: - var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()); + var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get(); assertThat(columnAtATimeLoader, instanceOf(DoubleScriptBlockDocValuesReader.class)); var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst()); assertThat(rowStrideReader, instanceOf(DoubleScriptBlockDocValuesReader.class)); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java index 55dd4f18815e5..d692d565bba1b 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java @@ -325,7 +325,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException { assertThat(loader, 
instanceOf(IpScriptBlockDocValuesReader.IpScriptBlockLoader.class)); // ignored source doesn't support column at a time loading: - var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()); + var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get(); assertThat(columnAtATimeLoader, instanceOf(IpScriptBlockDocValuesReader.class)); var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst()); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java index f3b026c8a13ce..ab24518135f6e 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java @@ -446,7 +446,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException { BlockLoader loader = fieldType.blockLoader(blContext(Settings.EMPTY, true)); assertThat(loader, instanceOf(KeywordScriptBlockDocValuesReader.KeywordScriptBlockLoader.class)); // ignored source doesn't support column at a time loading: - var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()); + var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get(); assertThat(columnAtATimeLoader, instanceOf(KeywordScriptBlockDocValuesReader.class)); var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst()); assertThat(rowStrideReader, instanceOf(KeywordScriptBlockDocValuesReader.class)); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java index 09005682f9e43..e9588aeafbff3 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java @@ -327,7 +327,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException { BlockLoader loader = fieldType.blockLoader(blContext(Settings.EMPTY, true)); assertThat(loader, instanceOf(LongScriptBlockDocValuesReader.LongScriptBlockLoader.class)); // ignored source doesn't support column at a time loading: - var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()); + var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get(); assertThat(columnAtATimeLoader, instanceOf(LongScriptBlockDocValuesReader.class)); var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst()); assertThat(rowStrideReader, instanceOf(LongScriptBlockDocValuesReader.class)); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java index bbce5379d32e0..fe6f460aec782 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java @@ -2201,12 +2201,8 @@ public void testConditionalBlockLoader() throws IOException { } else { var columnReader = blockLoader.columnAtATimeReader(ctx); assertNotNull(columnReader); - testBlock = (TestBlock) columnReader.read( - TestBlock.factory(), - TestBlock.docs(IntStream.range(0, numDocs).toArray()), - 0, - randomBoolean() - ); + testBlock = (TestBlock) columnReader.get() + .read(TestBlock.factory(), TestBlock.docs(IntStream.range(0, numDocs).toArray()), 0, randomBoolean()); } for (int i = 0; i < textValues.size(); i++) { Object expected = textValues.get(i); diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackSubqueryIT.java 
b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackSubqueryIT.java index 5f891806337ab..2fd9a2f0ede12 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackSubqueryIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackSubqueryIT.java @@ -66,17 +66,9 @@ public void testManyKeywordFieldsWith10UniqueValuesInSubqueryIntermediateResults */ public void testManyRandomKeywordFieldsInSubqueryIntermediateResults() throws IOException { // 500MB random/unique keyword values trigger CBE, should not OOM - if (isServerless()) { // both 100 and 500 docs OOM in serverless - return; - } int docs = 500; heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true); - // 2 subqueries are enough to trigger CBE, confirmed where this CBE happens in ExchangeService.doFetchPageAsync, - // as a few big pages are loaded into the exchange buffer - // TODO 8 subqueries OOM, because the memory consumed by lucene is not properly tracked in ValuesSourceReaderOperator yet. 
- // Lucene90DocValuesProducer are on the top of objects list, also BlockSourceReader.scratch is not tracked by circuit breaker yet, - // skip 8 subqueries for now - for (int subquery : List.of(DEFAULT_SUBQUERIES)) { + for (int subquery : List.of(DEFAULT_SUBQUERIES, MAX_SUBQUERIES)) { assertCircuitBreaks(attempt -> buildSubqueries(subquery, "manybigfields", "")); } } @@ -92,8 +84,7 @@ public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortOneF } int docs = 500; // 500MB random/unique keyword values heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true); - // TODO skip 8 subqueries, it OOMs in CI, the same reason as sort many fields - for (int subquery : List.of(DEFAULT_SUBQUERIES)) { + for (int subquery : List.of(DEFAULT_SUBQUERIES, MAX_SUBQUERIES)) { assertCircuitBreaks(attempt -> buildSubqueriesWithSort(subquery, "manybigfields", " f000 ")); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java index 4b2b06763a547..534b514bec851 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java @@ -490,7 +490,9 @@ protected final List blockLoaderReadValuesFromColumnAtATimeReader( BlockLoader loader = fieldType.blockLoader(blContext(settings, true)); List all = new ArrayList<>(); for (LeafReaderContext ctx : reader.leaves()) { - TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx).read(TestBlock.factory(), TestBlock.docs(ctx), offset, false); + TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx) + .get() + .read(TestBlock.factory(), TestBlock.docs(ctx), offset, false); for (int i = 0; i < block.size(); i++) { all.add(block.get(i)); } diff --git 
a/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java index 431902eaa1531..d2f76c80c4f13 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java @@ -146,7 +146,7 @@ private Object load(BlockLoader blockLoader, LeafReaderContext context, MapperSe } } BlockLoader.Docs docs = TestBlock.docs(docArray); - var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(), docs, offset, false); + var block = (TestBlock) columnAtATimeReader.get().read(TestBlock.factory(), docs, offset, false); assertThat(block.size(), equalTo(docArray.length - offset)); return block.get(0); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java index c5de1f272a59b..a383c64476a63 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java @@ -1635,7 +1635,7 @@ private void testSingletonBulkBlockReading(Function supplier, Consumer track) + implements + BlockLoader.ColumnAtATimeReader { + @Override + public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset, boolean nullsFiltered) + throws IOException { + BlockLoader.ColumnAtATimeReader reader = supplier.get(); + track.accept(reader); + return reader.read(factory, docs, offset, nullsFiltered); + } + + @Override + public boolean canReuse(int startingDocID) { + // There's no state preserved to reuse + return true; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java 
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java index d620031141a63..13106843b3ed8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java @@ -9,6 +9,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.util.IOSupplier; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.DocBlock; @@ -53,9 +54,13 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOpe * @param shardContexts per-shard loading information * @param docChannel the channel containing the shard, leaf/segment and doc id */ - public record Factory(ByteSizeValue jumboSize, List fields, IndexedByShardId shardContexts, int docChannel) - implements - OperatorFactory { + public record Factory( + ByteSizeValue jumboSize, + List fields, + IndexedByShardId shardContexts, + boolean reuseColumnLoaders, + int docChannel + ) implements OperatorFactory { public Factory { @@ -66,7 +71,14 @@ public record Factory(ByteSizeValue jumboSize, List fields, IndexedBy @Override public Operator get(DriverContext driverContext) { - return new ValuesSourceReaderOperator(driverContext, jumboSize.getBytes(), fields, shardContexts, docChannel); + return new ValuesSourceReaderOperator( + driverContext, + jumboSize.getBytes(), + fields, + shardContexts, + reuseColumnLoaders, + docChannel + ); } @Override @@ -152,6 +164,7 @@ public record ShardContext( final long jumboBytes; final FieldWork[] fields; final IndexedByShardId shardContexts; + private final boolean reuseColumnLoaders; private final int docChannel; private final Map readersBuilt = new TreeMap<>(); @@ -170,6 +183,7 @@ public ValuesSourceReaderOperator( 
long jumboBytes, List fields, IndexedByShardId shardContexts, + boolean reuseColumnLoaders, int docChannel ) { if (fields.isEmpty()) { @@ -182,6 +196,7 @@ public ValuesSourceReaderOperator( this.fields[i] = new FieldWork(fields.get(i), i); } this.shardContexts = shardContexts; + this.reuseColumnLoaders = reuseColumnLoaders; this.docChannel = docChannel; } @@ -261,7 +276,9 @@ protected class FieldWork { BlockLoader loader; @Nullable ConverterEvaluator converter; + @Nullable BlockLoader.ColumnAtATimeReader columnAtATime; + @Nullable BlockLoader.RowStrideReader rowStride; FieldWork(FieldInfo info, int fieldIdx) { @@ -294,8 +311,17 @@ void newShard(int shard) { BlockLoader.ColumnAtATimeReader columnAtATime(LeafReaderContext ctx) throws IOException { if (columnAtATime == null) { - columnAtATime = loader.columnAtATimeReader(ctx); - trackReader("column_at_a_time", this.columnAtATime); + IOSupplier supplier = loader.columnAtATimeReader(ctx); + if (supplier == null) { + trackReader("column_at_a_time", null); + return null; + } + if (reuseColumnLoaders) { + columnAtATime = supplier.get(); + trackReader("column_at_a_time", columnAtATime); + } else { + columnAtATime = new ColumnAtATimeReaderWithoutReuse(supplier, r -> trackReader("column_at_a_time", r)); + } } return columnAtATime; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 97f672b9041d5..c003834079b99 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -190,6 +190,7 @@ public void testPushRoundToToQuery() throws IOException { new IndexedByShardIdFromSingleton<>(new ValuesSourceReaderOperator.ShardContext(reader, (sourcePaths) -> { throw new UnsupportedOperationException(); }, 0.8)), + randomBoolean(), 0 ); List pages = new ArrayList<>(); 
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 22947128e3eb2..fecf877734bd8 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -211,6 +211,7 @@ private List runQuery(Set values, Query query, boolean shuffleDocs new IndexedByShardIdFromSingleton<>(new ValuesSourceReaderOperator.ShardContext(reader, (sourcePaths) -> { throw new UnsupportedOperationException(); }, 0.2)), + true, 0 ) ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java index 0bd6eefd4ee25..a130b5dc1def9 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java @@ -91,7 +91,7 @@ public int get(int i) { } }; var columnAtATimeReader = blockLoader.columnAtATimeReader(ctx); - try (BlockLoader.Block block = columnAtATimeReader.read(blockFactory, docs, start, false)) { + try (BlockLoader.Block block = columnAtATimeReader.get().read(blockFactory, docs, start, false)) { BytesRefBlock result = (BytesRefBlock) block; BytesRef scratch = new BytesRef(); for (int i = 0; i < result.getPositionCount(); i++) { @@ -255,7 +255,7 @@ public int get(int i) { } }; var columnAtATimeReader = blockLoader.columnAtATimeReader(ctx); - try (BlockLoader.Block block = columnAtATimeReader.read(blockFactory, docs, start, false)) { + try (BlockLoader.Block block = 
columnAtATimeReader.get().read(blockFactory, docs, start, false)) { BytesRefBlock result = (BytesRefBlock) block; assertNotNull(result.asVector()); boolean enclosedInSingleRange = false; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java index 1c450eb3051fc..656719e173991 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java @@ -243,6 +243,7 @@ private static Operator.OperatorFactory factory( return loaderAndConverter; })), new IndexedByShardIdFromList<>(shardContexts), + randomBoolean(), 0 ); } @@ -495,6 +496,7 @@ public void testManySingleDocPages() { ByteSizeValue.ofGb(1), List.of(testCase.info, fieldInfo(mapperService(indexKey).fieldType("key"), ElementType.INT)), new IndexedByShardIdFromList<>(shardContexts), + randomBoolean(), 0 ).get(driverContext) ); @@ -569,6 +571,7 @@ private void loadSimpleAndAssert( fieldInfo(mapperService("index1").fieldType("indexKey"), ElementType.BYTES_REF) ), new IndexedByShardIdFromList<>(shardContexts), + randomBoolean(), 0 ).get(driverContext) ); @@ -582,6 +585,7 @@ private void loadSimpleAndAssert( ByteSizeValue.ofGb(1), b.stream().map(i -> i.info).toList(), new IndexedByShardIdFromList<>(shardContexts), + randomBoolean(), 0 ).get(driverContext) ); @@ -692,6 +696,7 @@ private void testLoadAllStatus(boolean allInOnePage) { ByteSizeValue.ofGb(1), List.of(i.info), new IndexedByShardIdFromList<>(shardContexts), + randomBoolean(), 0 ).get(driverContext) ) @@ -1382,6 +1387,7 @@ public void testNullsShared() { ) ), new IndexedByShardIdFromList<>(shardContexts), + randomBoolean(), 0 ).get(driverContext) ), @@ -1416,6 +1422,7 @@ 
public void testDescriptionOfMany() throws IOException { new IndexedByShardIdFromSingleton<>( new ValuesSourceReaderOperator.ShardContext(reader(indexKey), (sourcePaths) -> SourceLoader.FROM_STORED_SOURCE, 0.2) ), + randomBoolean(), 0 ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); @@ -1466,6 +1473,7 @@ public void testManyShards() throws IOException { return ValuesSourceReaderOperator.load(ft.blockLoader(blContext())); })), new IndexedByShardIdFromList<>(readerShardContexts), + randomBoolean(), 0 ); DriverContext driverContext = driverContext(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java index 49505b2c38f53..52e4f3e9467ec 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.compute.test.CannedSourceOperator; import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.TestDriverFactory; +import org.elasticsearch.compute.test.TestDriverRunner; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; @@ -172,6 +173,7 @@ static Operator.OperatorFactory factory(IndexReader reader, String name, Element STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ); } @@ -485,11 +487,23 @@ public void testLoadAllInOnePage() { } public void testManySingleDocPages() { + testManySingleDocPages(true); + } + + public void testManySingleDocPagesNoReuse() { + testManySingleDocPages(false); + } + + private void testManySingleDocPages(boolean reuseColumnLoaders) { 
DriverContext driverContext = driverContext(); + + boolean shuffle = randomBoolean(); + int sortedPages = 5; int numDocs = between(10, 100); List input = CannedSourceOperator.collectPages(simpleInput(driverContext, numDocs, between(1, numDocs), 1)); - Randomness.shuffle(input); - List operators = new ArrayList<>(); + if (shuffle) { + Randomness.shuffle(input.subList(sortedPages, input.size() - 1)); // Sort some of the list so we reuse some loaders + } Checks checks = new Checks(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING); FieldCase testCase = new FieldCase( new KeywordFieldMapper.KeywordFieldType("kwd"), @@ -497,21 +511,20 @@ public void testManySingleDocPages() { checks::tags, StatusChecks::keywordsFromDocValues ); - operators.add( - new ValuesSourceReaderOperator.Factory( - ByteSizeValue.ofGb(1), - List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)), - new IndexedByShardIdFromSingleton<>( - new ValuesSourceReaderOperator.ShardContext( - reader, - (sourcePaths) -> SourceLoader.FROM_STORED_SOURCE, - STORED_FIELDS_SEQUENTIAL_PROPORTIONS - ) - ), - 0 - ).get(driverContext) - ); - List results = drive(operators, input.iterator(), driverContext); + Operator load = new ValuesSourceReaderOperator.Factory( + ByteSizeValue.ofGb(1), + List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)), + new IndexedByShardIdFromSingleton<>( + new ValuesSourceReaderOperator.ShardContext( + reader, + (sourcePaths) -> SourceLoader.FROM_STORED_SOURCE, + STORED_FIELDS_SEQUENTIAL_PROPORTIONS + ) + ), + reuseColumnLoaders, + 0 + ).get(driverContext); + List results = new TestDriverRunner().numThreads(1).run(load, input.iterator(), driverContext); assertThat(results, hasSize(input.size())); for (Page page : results) { assertThat(page.getBlockCount(), equalTo(3)); @@ -521,6 +534,22 @@ public void testManySingleDocPages() { testCase.checkResults.check(page.getBlock(1), p, key); } 
} + ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) load.status(); + Matcher readersMatcher; + if (reuseColumnLoaders) { + if (shuffle) { + readersMatcher = lessThanOrEqualTo(numDocs - sortedPages + reader.leaves().size()); + } else { + readersMatcher = equalTo(reader.leaves().size()); + } + } else { + readersMatcher = equalTo(numDocs); + } + assertMap( + status.readersBuilt(), + matchesMap().entry("key:column_at_a_time:IntsFromDocValues.Singleton", readersMatcher) + .entry("kwd:column_at_a_time:BytesRefsFromOrds.Singleton", readersMatcher) + ); } public void testEmpty() { @@ -618,6 +647,7 @@ private void loadSimpleAndAssert( STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ).get(driverContext) ); @@ -637,6 +667,7 @@ private void loadSimpleAndAssert( STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ).get(driverContext) ); @@ -736,6 +767,7 @@ private void testLoadAllStatus(boolean allInOnePage) { STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ).get(driverContext) ) @@ -975,6 +1007,7 @@ private void testLoadLong(boolean shuffle, boolean manySegments) throws IOExcept STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ).get(driverContext) ) @@ -1638,6 +1671,7 @@ public void testNullsShared() { STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ).get(driverContext) ), @@ -1690,6 +1724,7 @@ private void testSequentialStoredFields(boolean sequential, int docCount) throws STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ).get(driverContext); List results = drive(op, source.iterator(), driverContext); @@ -1725,6 +1760,7 @@ public void testDescriptionOfMany() throws IOException { STORED_FIELDS_SEQUENTIAL_PROPORTIONS ) ), + randomBoolean(), 0 ); assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]")); @@ -1777,6 +1813,7 @@ public void testManyShards() throws IOException { return 
ValuesSourceReaderOperator.load(ft.blockLoader(blContext())); })), new IndexedByShardIdFromList<>(readerShardContexts), + randomBoolean(), 0 ); DriverContext driverContext = driverContext(); diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java index 0540b50f1a9dd..7804401f9edc6 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java @@ -7,19 +7,15 @@ package org.elasticsearch.compute.test; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.common.Randomness; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.collect.Iterators; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArray; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.aggregation.blockhash.HashImplFactory; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -27,7 +23,6 @@ import org.elasticsearch.compute.operator.AsyncOperator; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.compute.operator.DriverRunner; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.PageConsumerOperator; import org.elasticsearch.compute.operator.SinkOperator; @@ -36,18 
+31,15 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.CrankyCircuitBreakerService; import org.elasticsearch.test.BreakerTestUtil; -import org.elasticsearch.threadpool.FixedExecutorBuilder; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.function.Supplier; -import java.util.stream.LongStream; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; /** * Base tests for {@link Operator}s that are not {@link SourceOperator} or {@link SinkOperator}. @@ -282,71 +274,36 @@ public void testSimpleFinishClose() { } } - protected final List drive(Operator operator, Iterator input, DriverContext driverContext) { - return drive(List.of(operator), input, driverContext); + /** + * @deprecated use {@link TestDriverRunner} + */ + @Deprecated + protected final List drive(Operator operator, Iterator input, DriverContext context) { + return new TestDriverRunner().run(operator, input, context); } - protected final List drive(List operators, Iterator input, DriverContext driverContext) { - List results = new ArrayList<>(); - boolean success = false; - try ( - Driver d = TestDriverFactory.create( - driverContext, - new CannedSourceOperator(input), - operators, - new TestResultPageSinkOperator(results::add) - ) - ) { - runDriver(d); - success = true; - } finally { - if (success == false) { - Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks))); - } - } - return results; + /** + * @deprecated use {@link TestDriverRunner} + */ + @Deprecated + protected final List drive(List operators, Iterator input, DriverContext context) { + return new TestDriverRunner().run(operators, input, context); } + /** + * @deprecated use {@link TestDriverRunner} + */ + @Deprecated public static void 
runDriver(Driver driver) { - runDriver(List.of(driver)); + new TestDriverRunner().run(driver); } + /** + * @deprecated use {@link TestDriverRunner} + */ + @Deprecated public static void runDriver(List drivers) { - drivers = new ArrayList<>(drivers); - int dummyDrivers = between(0, 10); - for (int i = 0; i < dummyDrivers; i++) { - drivers.add( - TestDriverFactory.create( - new DriverContext(BigArrays.NON_RECYCLING_INSTANCE, TestBlockFactory.getNonBreakingInstance(), null), - new SequenceLongBlockSourceOperator( - TestBlockFactory.getNonBreakingInstance(), - LongStream.range(0, between(1, 100)), - between(1, 100) - ), - List.of(), - new PageConsumerOperator(Page::releaseBlocks) - ) - ); - } - Randomness.shuffle(drivers); - int numThreads = between(1, 16); - ThreadPool threadPool = new TestThreadPool( - getTestClass().getSimpleName(), - new FixedExecutorBuilder(Settings.EMPTY, "esql", numThreads, 1024, "esql", EsExecutors.TaskTrackingConfig.DEFAULT) - ); - var driverRunner = new DriverRunner(threadPool.getThreadContext()) { - @Override - protected void start(Driver driver, ActionListener driverListener) { - Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 10000), driverListener); - } - }; - PlainActionFuture future = new PlainActionFuture<>(); - try { - driverRunner.runToCompletion(drivers, future); - future.actionGet(TimeValue.timeValueSeconds(30)); - } finally { - terminate(threadPool); - } + new TestDriverRunner().run(drivers); } public static void assertDriverContext(DriverContext driverContext) { diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TestDriverRunner.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TestDriverRunner.java new file mode 100644 index 0000000000000..78fd5b2b2e432 --- /dev/null +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TestDriverRunner.java @@ -0,0 +1,127 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.test; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.Driver; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.DriverRunner; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.compute.operator.PageConsumerOperator; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.LongStream; + +import static org.elasticsearch.test.ESTestCase.between; +import static org.elasticsearch.test.ESTestCase.terminate; + +public class TestDriverRunner { + private Integer numThreads = null; + + /** + * Set the number of threads used to run the driver. If this isn't called + * we run the driver with a random number of threads. + */ + public TestDriverRunner numThreads(int numThreads) { + this.numThreads = numThreads; + return this; + } + + /** + * Run a driver with a single operator, returning the result. 
+ */ + public List run(Operator operator, Iterator input, DriverContext context) { + return run(List.of(operator), input, context); + } + + /** + * Run a single driver, returning the result. + */ + public List run(List operators, Iterator input, DriverContext driverContext) { + List results = new ArrayList<>(); + boolean success = false; + try ( + Driver d = TestDriverFactory.create( + driverContext, + new CannedSourceOperator(input), + operators, + new TestResultPageSinkOperator(results::add) + ) + ) { + run(d); + success = true; + } finally { + if (success == false) { + Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(results.iterator(), p -> p::releaseBlocks))); + } + } + return results; + } + + /** + * Run a driver. + */ + public void run(Driver driver) { + run(List.of(driver)); + } + + /** + * Run many drivers. + */ + public void run(List drivers) { + drivers = new ArrayList<>(drivers); + int dummyDrivers = between(0, 10); + for (int i = 0; i < dummyDrivers; i++) { + drivers.add( + TestDriverFactory.create( + new DriverContext(BigArrays.NON_RECYCLING_INSTANCE, TestBlockFactory.getNonBreakingInstance(), null), + new SequenceLongBlockSourceOperator( + TestBlockFactory.getNonBreakingInstance(), + LongStream.range(0, between(1, 100)), + between(1, 100) + ), + List.of(), + new PageConsumerOperator(Page::releaseBlocks) + ) + ); + } + Randomness.shuffle(drivers); + int numThreads = this.numThreads == null ? 
between(1, 16) : this.numThreads; + ThreadPool threadPool = new TestThreadPool( + "test", + new FixedExecutorBuilder(Settings.EMPTY, "esql", numThreads, 1024, "esql", EsExecutors.TaskTrackingConfig.DEFAULT) + ); + var driverRunner = new DriverRunner(threadPool.getThreadContext()) { + @Override + protected void start(Driver driver, ActionListener driverListener) { + Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 10000), driverListener); + } + }; + PlainActionFuture future = new PlainActionFuture<>(); + try { + driverRunner.runToCompletion(drivers, future); + future.actionGet(TimeValue.timeValueSeconds(30)); + } finally { + terminate(threadPool); + } + } +} diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 96a1a5ca11fd3..ac1f6fdcd79c5 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -29,8 +29,10 @@ import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; import org.elasticsearch.xpack.esql.tools.ProfileParser; +import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.ClassRule; @@ -74,6 +76,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static 
org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; @@ -1002,6 +1005,120 @@ public void testDateMathInJoin() throws IOException { } } + /** + * Runs enough aggs that we don't reuse column block loaders and + * asserts that we don't. + */ + public void testAggManyFieldsNoReuse() throws IOException { + testAggManyFields(PlannerSettings.REUSE_COLUMN_LOADERS_THRESHOLD.get(Settings.EMPTY) + 1, greaterThan(1)); + } + + /** + * Runs just less than enough aggs that we still reuse column + * block loaders and asserts that we do. + */ + public void testAggManyFieldsReuse() throws IOException { + testAggManyFields(PlannerSettings.REUSE_COLUMN_LOADERS_THRESHOLD.get(Settings.EMPTY) - 1, equalTo(1)); + } + + private void testAggManyFields(int fieldCount, Matcher readerMatcher) throws IOException { + Request createIndex = new Request("PUT", testIndexName()); + createIndex.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1 + } + } + }"""); + Response response = client().performRequest(createIndex); + assertThat( + entityToMap(response.getEntity(), XContentType.JSON), + matchesMap().entry("shards_acknowledged", true).entry("index", testIndexName()).entry("acknowledged", true) + ); + + StringBuilder b = new StringBuilder(); + for (int round = 0; round < 10; round++) { + for (int i = 0; i < 1000; i++) { + b.append(String.format(Locale.ROOT, """ + {"create":{"_index":"%s"}} + {""", testIndexName())); + for (int f = 0; f < fieldCount; f++) { + if (f != 0) { + b.append(", "); + } + b.append(String.format(Locale.ROOT, "\"f%03d\": %s", f, i)); + } + b.append("}\n"); + } + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", "true"); + bulk.addParameter("filter_path", "errors"); + bulk.setJsonEntity(b.toString()); + response = client().performRequest(bulk); + Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + } + + 
Request forceMerge = new Request("POST", '/' + testIndexName() + "/_forcemerge"); + forceMerge.addParameter("max_num_segments", String.valueOf(1)); + assertOK(client().performRequest(forceMerge)); + + StringBuilder query = new StringBuilder(fromIndex()).append(" | STATS "); + for (int f = 0; f < fieldCount; f++) { + if (f != 0) { + query.append(", "); + } + query.append(String.format(Locale.ROOT, "AVG(f%03d)", f)); + } + RequestObjectBuilder builder = requestObjectBuilder().query(query.toString()); + builder.profile(true); + // Lock to shard level partitioning, so we get consistent profile output + builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); + builder.pragmasOk(); + Map result = runEsql(builder); + ListMatcher schemaMatcha = matchesList(); + ListMatcher resultMatcher = matchesList(); + for (int f = 0; f < fieldCount; f++) { + schemaMatcha = schemaMatcha.item(Map.of("name", String.format(Locale.ROOT, "AVG(f%03d)", f), "type", "double")); + resultMatcher = resultMatcher.item(499.5); + } + assertResultMap( + result, + getResultMatcher(result).entry("profile", getProfileMatcher()), + schemaMatcha, + matchesList().item(resultMatcher) + ); + + Map reader = null; + @SuppressWarnings("unchecked") + List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); + for (Map p : profiles) { + String description = p.get("description").toString(); + if (description.equals("data") == false) { + continue; + } + fixTypesOnProfile(p); + @SuppressWarnings("unchecked") + List> operators = (List>) p.get("operators"); + for (Map o : operators) { + String name = (String) o.get("operator"); + if (name.startsWith("ValuesSourceReader")) { + assertThat(reader, nullValue()); + reader = o; + } + } + } + assertNotNull(reader); + MapMatcher readersBuiltMatcher = matchesMap(); + for (int f = 0; f < fieldCount; f++) { + readersBuiltMatcher = readersBuiltMatcher.entry( + String.format(Locale.ROOT, "f%03d:column_at_a_time:LongsFromDocValues.Singleton", f), 
+ readerMatcher + ); + } + assertMap(reader, matchesMap().extraOk().entry("status", matchesMap().extraOk().entry("readers_built", readersBuiltMatcher))); + } + static MapMatcher commonProfile() { return matchesMap() // .entry("description", any(String.class)) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 1f9a58c99b537..4731341b69062 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -595,7 +595,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { 10_000, ByteSizeValue.ofMb(1), 1000, - 0.1 + 0.1, + PlannerSettings.REUSE_COLUMN_LOADERS_THRESHOLD.get(Settings.EMPTY) ); public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices( diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 0e41db3bd39c4..b6947cbb4c229 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -357,6 +357,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices, throw new IllegalStateException("can't load source here"); }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY)) ), + true, 0 ); CancellableTask parentTask = new EsqlQueryTask( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index d5dede2a07aa4..93dd6480c79c5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -466,6 +466,7 @@ private static Operator extractFieldsOperator( EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY) ) ), + true, 0 ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java index 21f729ee36b81..55c44f5f08b42 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java @@ -334,7 +334,7 @@ public Operator get(DriverContext driverContext) { EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(org.elasticsearch.common.settings.Settings.EMPTY) ) ); - return new ValuesSourceReaderOperator(driverContext, jumboSize.getBytes(), fields, shardContexts, docChannel); + return new ValuesSourceReaderOperator(driverContext, jumboSize.getBytes(), fields, shardContexts, true, docChannel); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 9ae955612d7ec..bfef23810c26f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -169,7 +169,11 @@ public EsPhysicalOperationProviders( } @Override - public final PhysicalOperation 
fieldExtractPhysicalOperation(FieldExtractExec fieldExtractExec, PhysicalOperation source) { + public final PhysicalOperation fieldExtractPhysicalOperation( + FieldExtractExec fieldExtractExec, + PhysicalOperation source, + LocalExecutionPlannerContext context + ) { Layout.Builder layout = source.layout.builder(); var sourceAttr = fieldExtractExec.sourceAttribute(); int docChannel = source.layout.get(sourceAttr.id()).channel(); @@ -184,8 +188,16 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi s.storedFieldsSequentialProportion() ) ); + boolean reuseColumnLoaders = fieldExtractExec.attributesToExtract().size() <= context.plannerSettings() + .reuseColumnLoadersThreshold(); return source.with( - new ValuesSourceReaderOperator.Factory(plannerSettings.valuesLoadingJumboSize(), fields, readers, docChannel), + new ValuesSourceReaderOperator.Factory( + plannerSettings.valuesLoadingJumboSize(), + fields, + readers, + reuseColumnLoaders, + docChannel + ), layout.build() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index ef62a37a10ea7..1c621c39d33db 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -388,7 +388,7 @@ private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutio } private PhysicalOperation planFieldExtractNode(FieldExtractExec fieldExtractExec, LocalExecutionPlannerContext context) { - return physicalOperationProviders.fieldExtractPhysicalOperation(fieldExtractExec, plan(fieldExtractExec.child(), context)); + return physicalOperationProviders.fieldExtractPhysicalOperation(fieldExtractExec, plan(fieldExtractExec.child(), context), context); } private PhysicalOperation 
planOutput(OutputExec outputExec, LocalExecutionPlannerContext context) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java index 6353005f44ace..1ce7a3c0e5385 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java @@ -14,7 +14,11 @@ import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation; interface PhysicalOperationProviders { - PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fieldExtractExec, PhysicalOperation source); + PhysicalOperation fieldExtractPhysicalOperation( + FieldExtractExec fieldExtractExec, + PhysicalOperation source, + LocalExecutionPlannerContext context + ); PhysicalOperation sourcePhysicalOperation(EsQueryExec esQuery, LocalExecutionPlannerContext context); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java index 690cd522ae670..31365ada060cf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java @@ -16,6 +16,8 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.monitor.jvm.JvmInfo; +import java.util.List; + /** * Values for cluster level settings used in physical planning. */ @@ -93,6 +95,34 @@ public class PlannerSettings { Setting.Property.Dynamic ); + /** + * If we're loading more than this many fields at a time we discard column loaders after each + * page regardless of whether we can reuse them. 
They have significant per-field memory overhead + * so discarding them between pages allows some queries that would have OOMed to succeed. Usually + * the paths that need very high performance don't load more than a handful of fields at a time, + * so they do reuse fields. + */ + public static final Setting REUSE_COLUMN_LOADERS_THRESHOLD = Setting.intSetting( + "esql.reuse_column_loaders_threshold", + 30, + 0, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static List> settings() { + return List.of( + DEFAULT_DATA_PARTITIONING, + VALUES_LOADING_JUMBO_SIZE, + LUCENE_TOPN_LIMIT, + INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, + REDUCTION_LATE_MATERIALIZATION, + PARTIAL_AGGREGATION_EMIT_KEYS_THRESHOLD, + PARTIAL_AGGREGATION_EMIT_UNIQUENESS_THRESHOLD, + REUSE_COLUMN_LOADERS_THRESHOLD + ); + } + private volatile DataPartitioning defaultDataPartitioning; private volatile ByteSizeValue valuesLoadingJumboSize; private volatile int luceneTopNLimit; @@ -100,6 +130,7 @@ public class PlannerSettings { private volatile int partialEmitKeysThreshold; private volatile double partialEmitUniquenessThreshold; + private volatile int reuseColumnLoadersThreshold; /** * Ctor for prod that listens for updates from the {@link ClusterService}. 
@@ -112,6 +143,7 @@ public PlannerSettings(ClusterService clusterService) { clusterSettings.initializeAndWatch(INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, v -> this.intermediateLocalRelationMaxSize = v); clusterSettings.initializeAndWatch(PARTIAL_AGGREGATION_EMIT_KEYS_THRESHOLD, v -> this.partialEmitKeysThreshold = v); clusterSettings.initializeAndWatch(PARTIAL_AGGREGATION_EMIT_UNIQUENESS_THRESHOLD, v -> this.partialEmitUniquenessThreshold = v); + clusterSettings.initializeAndWatch(REUSE_COLUMN_LOADERS_THRESHOLD, v -> this.reuseColumnLoadersThreshold = v); } /** @@ -123,7 +155,8 @@ public PlannerSettings( int luceneTopNLimit, ByteSizeValue intermediateLocalRelationMaxSize, int partialEmitKeysThreshold, - double partialEmitUniquenessThreshold + double partialEmitUniquenessThreshold, + int reuseColumnLoadersThreshold ) { this.defaultDataPartitioning = defaultDataPartitioning; this.valuesLoadingJumboSize = valuesLoadingJumboSize; @@ -131,6 +164,7 @@ public PlannerSettings( this.intermediateLocalRelationMaxSize = intermediateLocalRelationMaxSize; this.partialEmitKeysThreshold = partialEmitKeysThreshold; this.partialEmitUniquenessThreshold = partialEmitUniquenessThreshold; + this.reuseColumnLoadersThreshold = reuseColumnLoadersThreshold; } public DataPartitioning defaultDataPartitioning() { @@ -170,4 +204,15 @@ public int partialEmitKeysThreshold() { public double partialEmitUniquenessThreshold() { return partialEmitUniquenessThreshold; } + + /** + * If we're loading more than this many fields at a time we discard column loaders after each + * page regardless of whether we can reuse them. They have significant per-field memory overhead + * so discarding them between pages allows some queries that would have OOMed to succeed. Usually + * the paths that need very high performance don't load more than a handful of fields at a time, + * so they do reuse fields. 
+ */ + public int reuseColumnLoadersThreshold() { + return reuseColumnLoadersThreshold; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index ae12c8db8642a..e934f483ff9ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -255,18 +255,12 @@ public List<Setting<?>> getSettings() { ESQL_QUERYLOG_THRESHOLD_INFO_SETTING, ESQL_QUERYLOG_THRESHOLD_WARN_SETTING, ESQL_QUERYLOG_INCLUDE_USER_SETTING, - PlannerSettings.DEFAULT_DATA_PARTITIONING, - PlannerSettings.VALUES_LOADING_JUMBO_SIZE, - PlannerSettings.LUCENE_TOPN_LIMIT, - PlannerSettings.INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, - PlannerSettings.REDUCTION_LATE_MATERIALIZATION, STORED_FIELDS_SEQUENTIAL_PROPORTION, EsqlFlags.ESQL_STRING_LIKE_ON_INDEX, - EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD, - PlannerSettings.PARTIAL_AGGREGATION_EMIT_KEYS_THRESHOLD, - PlannerSettings.PARTIAL_AGGREGATION_EMIT_UNIQUENESS_THRESHOLD + EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD ) ); + settings.addAll(PlannerSettings.settings()); if (ESQL_VIEWS_FEATURE_FLAG.isEnabled()) { settings.add(ViewService.MAX_VIEWS_COUNT_SETTING); settings.add(ViewService.MAX_VIEW_LENGTH_SETTING); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 1eb4d6e67d39d..b7c7c13aa4139 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -284,7 +284,8 @@ public void testTimeSeries() throws IOException { 10_000, ByteSizeValue.ofMb(1), between(1, 10000), - 
randomDoubleBetween(0.1, 1.0, true) + randomDoubleBetween(0.1, 1.0, true), + between(0, 1000) ); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( "test", diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java index 9d6815b74af5e..fe1f53fce3388 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java @@ -105,7 +105,11 @@ private static AnalysisRegistry createAnalysisRegistry() throws IOException { } @Override - public PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fieldExtractExec, PhysicalOperation source) { + public PhysicalOperation fieldExtractPhysicalOperation( + FieldExtractExec fieldExtractExec, + PhysicalOperation source, + LocalExecutionPlannerContext context + ) { Layout.Builder layout = source.layout.builder(); PhysicalOperation op = source; for (Attribute attr : fieldExtractExec.attributesToExtract()) { diff --git a/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java index 7bada615681c6..924059b683d7f 100644 --- a/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java +++ b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java @@ -238,6 +238,7 @@ public void testNullValueBlockLoader() throws IOException { iw.close(); try (DirectoryReader reader = DirectoryReader.open(directory)) { TestBlock block = 
(TestBlock) loader.columnAtATimeReader(reader.leaves().get(0)) + .get() .read(TestBlock.factory(), new BlockLoader.Docs() { @Override public int count() {