@@ -60,6 +60,7 @@
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -306,13 +307,16 @@ private static BlockLoader numericBlockLoader(WhereAndBaseName w, NumberFieldMap
@Benchmark
@OperationsPerInvocation(INDEX_SIZE)
public void benchmark() {
List<ValuesSourceReaderOperator.FieldInfo> fields = fields(name);
boolean reuseColumnLoaders = fields.size() <= PlannerSettings.REUSE_COLUMN_LOADERS_THRESHOLD.get(Settings.EMPTY);
ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
new DriverContext(BigArrays.NON_RECYCLING_INSTANCE, blockFactory, null),
ByteSizeValue.ofMb(1).getBytes(),
fields(name),
fields,
new IndexedByShardIdFromSingleton<>(new ValuesSourceReaderOperator.ShardContext(reader, (sourcePaths) -> {
throw new UnsupportedOperationException("can't load _source here");
}, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
reuseColumnLoaders,
0
);
long sum = 0;
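The reuseColumnLoaders flag added above gates column-loader reuse on how many fields the operator fetches, compared against an integer cluster setting. A minimal sketch of how such a setting is declared and read follows; the setting key, default value, and scope are assumptions for illustration, and only the final comparison mirrors the benchmark change:

// Hypothetical declaration; the real key, default, and properties of
// PlannerSettings.REUSE_COLUMN_LOADERS_THRESHOLD may differ.
public static final Setting<Integer> REUSE_COLUMN_LOADERS_THRESHOLD =
    Setting.intSetting("esql.reuse_column_loaders_threshold", 10, Setting.Property.NodeScope);

// Read the way the benchmark does: Settings.EMPTY carries no overrides, so the
// default applies, and column loaders are reused only for short field lists.
boolean reuseColumnLoaders = fields.size() <= REUSE_COLUMN_LOADERS_THRESHOLD.get(Settings.EMPTY);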
@@ -13,6 +13,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@@ -292,7 +293,7 @@ interface StoredFields {
* {@code null} or if they can't load column-at-a-time themselves.
*/
@Nullable
ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException;
IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) throws IOException;

/**
* Build a row-by-row reader. Must <strong>never</strong> return {@code null},
@@ -367,7 +368,7 @@ public Builder builder(BlockFactory factory, int expectedCount) {
protected abstract boolean canUsePreferLoaderForDoc(int docId) throws IOException;

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
public IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) throws IOException {
if (canUsePreferLoaderForLeaf(context)) {
return preferLoader.columnAtATimeReader(context);
} else {
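One of the central changes in this diff is the interface above: columnAtATimeReader now hands back a lazy IOSupplier<ColumnAtATimeReader> instead of a fully built reader, while keeping the existing "may be null" contract. A minimal caller-side sketch of the new flow, with illustrative variable names and a fallback path that is assumed rather than copied from this PR:

// Sketch only: loader is a BlockLoader, leafContext a LeafReaderContext.
IOSupplier<ColumnAtATimeReader> columnSupplier = loader.columnAtATimeReader(leafContext);
if (columnSupplier == null) {
    // Column-at-a-time loading is not supported here; fall back to the
    // row-stride reader, which per the Javadoc must never be null.
    RowStrideReader rowReader = loader.rowStrideReader(leafContext);
    // ... fetch values one document at a time ...
} else {
    // Building the reader, and any I/O it needs, is deferred until get(),
    // which may throw IOException.
    ColumnAtATimeReader columnReader = columnSupplier.get();
    // ... fetch whole blocks of documents at once ...
}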
@@ -17,6 +17,7 @@
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.index.mapper.blockloader.ConstantNull;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
@@ -102,7 +103,7 @@ private SourceBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
}

@Override
public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
public final IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return null;
}

@@ -13,6 +13,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.index.mapper.BlockLoader.BytesRefBuilder;
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
@@ -44,7 +45,7 @@ public StoredFieldsBlockLoader(String field) {
}

@Override
public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) {
public final IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return null;
}

@@ -11,6 +11,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -54,7 +55,7 @@ protected FallbackSyntheticSourceBlockLoader(
}

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
public IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return null;
}

@@ -11,6 +11,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

import java.io.IOException;
@@ -26,7 +27,7 @@ public Builder builder(BlockFactory factory, int expectedCount) {
}

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) {
public IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return null;
}

@@ -11,6 +11,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
@@ -37,7 +38,7 @@ public Builder builder(BlockFactory factory, int expectedCount) {
}

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) {
public IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return null;
}

@@ -12,6 +12,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

@@ -32,8 +33,8 @@ public Builder builder(BlockFactory factory, int expectedCount) {
}

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) {
return reader;
public IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return () -> reader;
}

@Override
@@ -11,6 +11,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

@@ -31,8 +32,8 @@ public Builder builder(BlockFactory factory, int expectedCount) {
}

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) {
return READER;
public IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return () -> READER;
}

@Override
@@ -11,6 +11,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

@@ -32,12 +33,12 @@ public Builder builder(BlockFactory factory, int expectedCount) {
}

@Override
public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
ColumnAtATimeReader reader = delegate.columnAtATimeReader(context);
public IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) throws IOException {
IOSupplier<ColumnAtATimeReader> reader = delegate.columnAtATimeReader(context);
if (reader == null) {
return null;
}
return new ColumnReader(reader);
return () -> new ColumnReader(reader.get());
}

private class ColumnReader implements ColumnAtATimeReader {
@@ -12,6 +12,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

@@ -44,8 +45,8 @@ public abstract static class DocValuesBlockLoader implements BlockLoader {
public abstract AllReader reader(LeafReaderContext context) throws IOException;

@Override
public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
return reader(context);
public final IOSupplier<ColumnAtATimeReader> columnAtATimeReader(LeafReaderContext context) {
return () -> reader(context);
}

@Override
@@ -492,7 +492,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException {
assertThat(loader, instanceOf(BooleanScriptBlockDocValuesReader.BooleanScriptBlockLoader.class));

// ignored source doesn't support column at a time loading:
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst());
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get();
assertThat(columnAtATimeLoader, instanceOf(BooleanScriptBlockDocValuesReader.class));

var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst());
@@ -855,7 +855,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception {
LeafReaderContext context = reader.leaves().get(0);
{
// One big doc block
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context);
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get();
assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
var docBlock = TestBlock.docs(IntStream.range(from, to).toArray());
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false);
@@ -867,7 +867,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception {
{
// Smaller doc blocks
int docBlockSize = 1000;
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context);
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get();
assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
for (int i = from; i < to; i += docBlockSize) {
var docBlock = TestBlock.docs(IntStream.range(i, i + docBlockSize).toArray());
@@ -881,7 +881,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception {
}
{
// One smaller doc block:
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context);
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get();
assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
var docBlock = TestBlock.docs(IntStream.range(1010, 2020).toArray());
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false);
@@ -893,7 +893,7 @@ public void testSingletonLongBulkBlockReadingManyValues() throws Exception {
}
{
// Read two tiny blocks:
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context);
var columnReader = (LongsBlockLoader.Singleton) blockLoader.columnAtATimeReader(context).get();
assertThat(columnReader.numericDocValues(), instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
var docBlock = TestBlock.docs(IntStream.range(32, 64).toArray());
var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false);
@@ -126,6 +126,7 @@ public void testNullValueBlockLoader() throws IOException {
iw.close();
try (DirectoryReader reader = DirectoryReader.open(directory)) {
TestBlock block = (TestBlock) loader.columnAtATimeReader(reader.leaves().get(0))
.get()
.read(TestBlock.factory(), new BlockLoader.Docs() {
@Override
public int count() {
@@ -545,7 +545,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException {
assertThat(loader, instanceOf(DateScriptBlockDocValuesReader.DateScriptBlockLoader.class));

// ignored source doesn't support column at a time loading:
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst());
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get();
assertThat(columnAtATimeLoader, instanceOf(DateScriptBlockDocValuesReader.class));

var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst());
@@ -294,7 +294,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException {
BlockLoader loader = fieldType.blockLoader(blContext(Settings.EMPTY, true));
assertThat(loader, instanceOf(DoubleScriptBlockDocValuesReader.DoubleScriptBlockLoader.class));
// ignored source doesn't support column at a time loading:
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst());
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get();
assertThat(columnAtATimeLoader, instanceOf(DoubleScriptBlockDocValuesReader.class));
var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst());
assertThat(rowStrideReader, instanceOf(DoubleScriptBlockDocValuesReader.class));
@@ -325,7 +325,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException {
assertThat(loader, instanceOf(IpScriptBlockDocValuesReader.IpScriptBlockLoader.class));

// ignored source doesn't support column at a time loading:
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst());
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get();
assertThat(columnAtATimeLoader, instanceOf(IpScriptBlockDocValuesReader.class));

var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst());
@@ -446,7 +446,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException {
BlockLoader loader = fieldType.blockLoader(blContext(Settings.EMPTY, true));
assertThat(loader, instanceOf(KeywordScriptBlockDocValuesReader.KeywordScriptBlockLoader.class));
// ignored source doesn't support column at a time loading:
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst());
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get();
assertThat(columnAtATimeLoader, instanceOf(KeywordScriptBlockDocValuesReader.class));
var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst());
assertThat(rowStrideReader, instanceOf(KeywordScriptBlockDocValuesReader.class));
@@ -327,7 +327,7 @@ public void testBlockLoaderSourceOnlyRuntimeField() throws IOException {
BlockLoader loader = fieldType.blockLoader(blContext(Settings.EMPTY, true));
assertThat(loader, instanceOf(LongScriptBlockDocValuesReader.LongScriptBlockLoader.class));
// ignored source doesn't support column at a time loading:
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst());
var columnAtATimeLoader = loader.columnAtATimeReader(reader.leaves().getFirst()).get();
assertThat(columnAtATimeLoader, instanceOf(LongScriptBlockDocValuesReader.class));
var rowStrideReader = loader.rowStrideReader(reader.leaves().getFirst());
assertThat(rowStrideReader, instanceOf(LongScriptBlockDocValuesReader.class));
@@ -2201,12 +2201,8 @@ public void testConditionalBlockLoader() throws IOException {
} else {
var columnReader = blockLoader.columnAtATimeReader(ctx);
assertNotNull(columnReader);
testBlock = (TestBlock) columnReader.read(
TestBlock.factory(),
TestBlock.docs(IntStream.range(0, numDocs).toArray()),
0,
randomBoolean()
);
testBlock = (TestBlock) columnReader.get()
.read(TestBlock.factory(), TestBlock.docs(IntStream.range(0, numDocs).toArray()), 0, randomBoolean());
}
for (int i = 0; i < textValues.size(); i++) {
Object expected = textValues.get(i);
@@ -66,17 +66,9 @@ public void testManyKeywordFieldsWith10UniqueValuesInSubqueryIntermediateResults
*/
public void testManyRandomKeywordFieldsInSubqueryIntermediateResults() throws IOException {
// 500MB random/unique keyword values trigger CBE, should not OOM
if (isServerless()) { // both 100 and 500 docs OOM in serverless
return;
}
int docs = 500;
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true);
// 2 subqueries are enough to trigger CBE, confirmed where this CBE happens in ExchangeService.doFetchPageAsync,
// as a few big pages are loaded into the exchange buffer
// TODO 8 subqueries OOM, because the memory consumed by lucene is not properly tracked in ValuesSourceReaderOperator yet.
// Lucene90DocValuesProducer are on the top of objects list, also BlockSourceReader.scratch is not tracked by circuit breaker yet,
// skip 8 subqueries for now
for (int subquery : List.of(DEFAULT_SUBQUERIES)) {
for (int subquery : List.of(DEFAULT_SUBQUERIES, MAX_SUBQUERIES)) {
assertCircuitBreaks(attempt -> buildSubqueries(subquery, "manybigfields", ""));
}
}
@@ -92,8 +84,7 @@ public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortOneF
}
int docs = 500; // 500MB random/unique keyword values
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true);
// TODO skip 8 subqueries, it OOMs in CI, the same reason as sort many fields
for (int subquery : List.of(DEFAULT_SUBQUERIES)) {
for (int subquery : List.of(DEFAULT_SUBQUERIES, MAX_SUBQUERIES)) {
assertCircuitBreaks(attempt -> buildSubqueriesWithSort(subquery, "manybigfields", " f000 "));
}
}