Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/139314.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 139314
summary: Improve Lookup Join performance with `CachedDirectoryReader`
area: ES|QL
type: enhancement
issues:
- 137268
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public int count() {
protected void writeLenAndValues(BytesStreamOutput out) throws IOException {
// sort the ArrayList variant of the collection prior to serializing it into a binary array
if (values instanceof ArrayList<BytesRef> list) {
list.sort(Comparator.naturalOrder());
list.sort(Comparator.naturalOrder());
}

for (BytesRef value : values) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.lookup;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;
import java.util.function.LongSupplier;

/**
* A DirectoryReader that caches NumericDocValues per field.
*/
class CachedDirectoryReader extends FilterDirectoryReader {
CachedDirectoryReader(DirectoryReader in) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
return new CachedLeafReader(reader);
}
});
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new CachedDirectoryReader(in);
}

@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}

static class CachedLeafReader extends FilterLeafReader {
final Map<String, NumericDocValues> docValues = new HashMap<>();

CachedLeafReader(LeafReader in) {
super(in);
}

@Override
public NumericDocValues getNumericDocValues(String field) throws IOException {
NumericDocValues dv = super.getNumericDocValues(field);
if (dv == null) {
// It's important to return null here if the field doesn't have doc values - and the only way
// to get that consistently is to call super.getNumericDocValues. There are other ways to try,
// but I don't believe they'll work consistently. So that means we prepare the reader each time,
// but we don't use it. This still is faster than not caching at all.
return null;
}
return new CachedNumericDocValues(dv::cost, docId -> docValues.compute(field, (k, curr) -> {
if (curr == null || curr.docID() > docId) {
return dv;
}
return curr;
}));
}

@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}

@Override
public CacheHelper getReaderCacheHelper() {
return in.getCoreCacheHelper();
}
}

static class CachedNumericDocValues extends NumericDocValues {
private NumericDocValues delegate = null;
private final LongSupplier cost;
private final IntFunction<NumericDocValues> fromCache;

CachedNumericDocValues(LongSupplier cost, IntFunction<NumericDocValues> fromCache) {
this.cost = cost;
this.fromCache = fromCache;
}

NumericDocValues getOrOverwriteDelegate(int docID) {
if (delegate == null) {
// This will return the cached delegate if present
// However, it could return a new one if the current one is ahead of docID
// Sometimes, we call with -1 docID to specifically request a new one
delegate = fromCache.apply(docID);
}
return delegate;
}

@Override
public long longValue() throws IOException {
return getOrOverwriteDelegate(-1).longValue();
}

@Override
public boolean advanceExact(int target) throws IOException {
return getOrOverwriteDelegate(target).advanceExact(target);
}

@Override
public int advance(int target) throws IOException {
return getOrOverwriteDelegate(target).advance(target);
}

/**
* If there is a delegate, we will return its docId,
* otherwise we return -1 to indicate there is no delegate
*/
@Override
public int docID() {
return delegate == null ? -1 : delegate.docID();
}

@Override
public int nextDoc() throws IOException {
return getOrOverwriteDelegate(-1).nextDoc();
}

@Override
public long cost() {
return cost.getAsLong();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator.lookup;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ConstantScoreQuery;
Expand Down Expand Up @@ -63,8 +64,17 @@ public EnrichQuerySourceOperator(
this.shardContexts = shardContexts;
this.shardContext = shardContexts.get(shardId);
this.shardContext.incRef();
this.searcher = shardContext.searcher();
this.indexReader = searcher.getIndexReader();
try {
if (shardContext.searcher().getIndexReader() instanceof DirectoryReader directoryReader) {
// This optimization is currently disabled for ParallelCompositeReader
this.indexReader = new CachedDirectoryReader(directoryReader);
} else {
this.indexReader = shardContext.searcher().getIndexReader();
}
this.searcher = new IndexSearcher(this.indexReader);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
this.warnings = warnings;
}

Expand Down