Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public final class LookupQueryOperator implements Operator {
private final IndexSearcher searcher;
private final Warnings warnings;
private final int maxPageSize;
private final boolean emptyResult;

private Page currentInputPage;
private int queryPosition = -1;
Expand All @@ -77,7 +78,8 @@ public LookupQueryOperator(
IndexedByShardId<? extends ShardContext> shardContexts,
int shardId,
SearchExecutionContext searchExecutionContext,
Warnings warnings
Warnings warnings,
boolean emptyResult
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
Expand All @@ -86,6 +88,7 @@ public LookupQueryOperator(
this.shardContext = shardContexts.get(shardId);
this.shardContext.incRef();
this.searchExecutionContext = searchExecutionContext;
this.emptyResult = emptyResult;
try {
if (shardContext.searcher().getIndexReader() instanceof DirectoryReader directoryReader) {
// This optimization is currently disabled for ParallelCompositeReader
Expand All @@ -105,10 +108,14 @@ public void addInput(Page page) {
if (currentInputPage != null) {
throw new IllegalStateException("Operator already has input page, must consume it first");
}
currentInputPage = page;
queryPosition = -1; // Reset query position for new page
pagesReceived++;
rowsReceived += page.getPositionCount();
if (emptyResult) {
page.releaseBlocks();
return;
}
currentInputPage = page;
queryPosition = -1; // Reset query position for new page
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,21 @@ public Operator get(DriverContext driverContext) {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
);
}

@Override
public String describe() {
return "LookupQueryOperator[maxPageSize=256]";
return "LookupQueryOperator[maxPageSize=256, emptyResult=false]";
}
};
}

@Override
protected Matcher<String> expectedDescriptionOfSimple() {
return equalTo("LookupQueryOperator[maxPageSize=256]");
return equalTo("LookupQueryOperator[maxPageSize=256, emptyResult=false]");
}

@Override
Expand Down Expand Up @@ -180,7 +181,8 @@ public void testNoMatchesScenario() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(noMatchDirectory.reader)),
0,
noMatchDirectory.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Create input with non-matching terms
Expand Down Expand Up @@ -237,7 +239,8 @@ public void testGetOutputNeverNullWhileCanProduceMore() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Create input with many matching terms
Expand Down Expand Up @@ -283,7 +286,8 @@ public void testMixedMatchesAndNoMatches() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Mix of matching and non-matching terms
Expand Down Expand Up @@ -325,6 +329,48 @@ public void testMixedMatchesAndNoMatches() throws Exception {
}
}

/**
* Test that when emptyResult=true the operator discards all input pages without producing output.
*/
public void testEmptyResultDiscardsInput() {
DriverContext driverContext = driverContext();
QueryList queryList = QueryList.rawTermQueryList(directoryData.field, AliasFilter.EMPTY, 0, ElementType.BYTES_REF);

try (
LookupQueryOperator operator = new LookupQueryOperator(
driverContext.blockFactory(),
LookupQueryOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings(),
true
)
) {
assertTrue("Should need input initially", operator.needsInput());
assertFalse("Should not be finished before finish() is called", operator.isFinished());

// Feed multiple pages with terms that would normally match
for (int p = 0; p < 3; p++) {
try (BytesRefBlock.Builder builder = driverContext.blockFactory().newBytesRefBlockBuilder(10)) {
for (int i = 0; i < 10; i++) {
builder.appendBytesRef(new BytesRef("term-" + i));
}
operator.addInput(new Page(builder.build()));
}

assertNull("Should never produce output when emptyResult=true", operator.getOutput());
assertFalse("Should not be able to produce more data", operator.canProduceMoreDataWithoutExtraInput());
assertTrue("Should still need input (not finished)", operator.needsInput());
}

operator.finish();
assertTrue("Should be finished after finish()", operator.isFinished());
assertNull("Should return null after finish()", operator.getOutput());
}
}

private static Warnings warnings() {
return Warnings.createWarnings(DriverContext.WarningsMode.COLLECT, new TestWarningsSource("test"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ public enum Config {

private final Map<Config, Set<String>> includes = new HashMap<>();
private final Map<Config, Set<String>> excludes = new HashMap<>();
private final Map<String, String> constantValues = new HashMap<>();

public TestConfigurableSearchStats include(Config key, String... fields) {
// If this method is called with no fields, it is interpreted to mean include none, so we include a dummy field
Expand Down Expand Up @@ -513,6 +514,16 @@ public boolean hasExactSubfield(FieldName field) {
return isConfigationSet(Config.EXACT_SUBFIELD, field.string());
}

public TestConfigurableSearchStats withConstantValue(String field, String value) {
constantValues.put(field, value);
return this;
}

@Override
public String constantValue(FieldName name) {
return constantValues.get(name.string());
}

@Override
public String toString() {
return "TestConfigurableSearchStats{" + "includes=" + includes + ", excludes=" + excludes + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,19 @@ public void testJoinOnFourKeys() throws IOException {
new String[] { "one", "two", "three", "four" },
new Integer[] { 1, 2, 3, 4 }, }
),
buildGreaterThanFilter(1L)
1L
);
}

public void testLongKey() throws IOException {
runLookup(
List.of(DataType.LONG),
new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }),
buildGreaterThanFilter(0L)
);
runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), 0L);
}

/**
* LOOKUP multiple results match.
*/
public void testLookupIndexMultiResults() throws IOException {
runLookup(
List.of(DataType.KEYWORD),
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }),
buildGreaterThanFilter(-1L)
);
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), -1L);
}

public void testJoinOnTwoKeysMultiResults() throws IOException {
Expand Down Expand Up @@ -236,19 +228,28 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
}
}

private PhysicalPlan buildGreaterThanFilter(long value) {
FieldAttribute filterAttribute = new FieldAttribute(
private static PhysicalPlan buildGreaterThanFilter(long value, FieldAttribute filterAttribute) {
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
EsRelation esRelation = new EsRelation(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
"test",
IndexMode.LOOKUP,
Map.of(),
Map.of(),
Map.of(),
List.of(filterAttribute)
);
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of());
Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan);
return new FragmentExec(filter);
}

private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan pushedDownFilter) throws IOException {
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, Long filterValue) throws IOException {
FieldAttribute lAttribute = new FieldAttribute(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
);
PhysicalPlan pushedDownFilter = filterValue != null ? buildGreaterThanFilter(filterValue, lAttribute) : null;
String[] fieldMappers = new String[keyTypes.size() * 2];
for (int i = 0; i < keyTypes.size(); i++) {
fieldMappers[2 * i] = "key" + i;
Expand Down Expand Up @@ -403,13 +404,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(),
"lookup",
"lookup",
List.of(
new FieldAttribute(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
)
),
List.of(lAttribute),
Source.EMPTY,
pushedDownFilter,
Predicates.combineAnd(joinOnConditions),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.compute.lucene.ShardContext;
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator.EvalOperatorFactory;
import org.elasticsearch.compute.operator.FilterOperator;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
Expand All @@ -37,6 +38,7 @@
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
Expand All @@ -45,6 +47,7 @@
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
Expand Down Expand Up @@ -271,6 +274,8 @@ private PhysicalOperation planLookupNode(
);
} else if (node instanceof FieldExtractExec fieldExtractExec) {
return planFieldExtractExec(plannerSettings, fieldExtractExec, source);
} else if (node instanceof EvalExec evalExec) {
return planEvalExec(evalExec, source, foldCtx);
} else if (node instanceof FilterExec filterExec) {
return planFilterExec(filterExec, source, foldCtx);
} else if (node instanceof ProjectExec projectExec) {
Expand Down Expand Up @@ -304,7 +309,8 @@ private PhysicalOperation planParameterizedQueryExec(
parameterizedQueryExec.joinOnConditions(),
parameterizedQueryExec.query(),
lookupSource,
queryListFromPlanFactory
queryListFromPlanFactory,
parameterizedQueryExec.emptyResult()
);

return PhysicalOperation.fromSource(sourceFactory, layout).with(enrichQueryFactory, layout);
Expand Down Expand Up @@ -408,6 +414,16 @@ public String describe() {
}, layout);
}

private PhysicalOperation planEvalExec(EvalExec evalExec, PhysicalOperation source, FoldContext foldCtx) {
for (Alias field : evalExec.fields()) {
var evaluatorSupplier = EvalMapper.toEvaluator(foldCtx, field.child(), source.layout());
Layout.Builder layout = source.layout().builder();
layout.append(field.toAttribute());
source = source.with(new EvalOperatorFactory(evaluatorSupplier), layout.build());
}
return source;
}

private PhysicalOperation planFilterExec(FilterExec filterExec, PhysicalOperation source, FoldContext foldCtx) {
return source.with(
new FilterOperator.FilterOperatorFactory(EvalMapper.toEvaluator(foldCtx, filterExec.condition(), source.layout())),
Expand Down Expand Up @@ -441,7 +457,8 @@ private record LookupQueryOperatorFactory(
@Nullable Expression joinOnConditions,
@Nullable QueryBuilder query,
Source planSource,
QueryListFromPlanFactory queryListFromPlanFactory
QueryListFromPlanFactory queryListFromPlanFactory,
boolean emptyResult
) implements OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
Expand All @@ -468,13 +485,14 @@ public Operator get(DriverContext driverContext) {
shardContexts,
shardId,
searchExecutionContext,
warnings
warnings,
emptyResult
);
}

@Override
public String describe() {
return "LookupQueryOperator[maxPageSize=" + maxPageSize + "]";
return "LookupQueryOperator[maxPageSize=" + maxPageSize + ", emptyResult=" + emptyResult + "]";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LookupLogicalOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LookupPhysicalPlanOptimizer;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
Expand Down Expand Up @@ -849,9 +851,10 @@ public static LogicalPlan buildLocalLogicalPlan(
/**
* Builds the output attributes for a {@link ParameterizedQuery}, mirroring how {@link EsRelation}
* exposes all index fields. This ensures the logical verifier can validate that all field references
* in the plan are satisfied. At the physical level, {@code ReplaceSourceAttributes}
* in the plan are satisfied. At the physical level,
* {@link org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes ReplaceSourceAttributes}
* strips the output back down to just {@code [_doc, _positions]}, and {@code InsertFieldExtraction}
* adds the needed fields back — the same pattern used for {@code EsRelation / ReplaceSourceAttributes}.
* adds the needed fields back — the same pattern used for {@code EsRelation}.
*/
private static List<Attribute> buildParameterizedQueryOutput(
FieldAttribute docAttribute,
Expand Down Expand Up @@ -879,7 +882,7 @@ private static List<Attribute> buildParameterizedQueryOutput(

/**
* Builds the physical plan for the lookup node by running:
* LocalMapper.map -> LookupPhysicalPlanOptimizer.
* LookupLogicalOptimizer -> LocalMapper.map -> LookupPhysicalPlanOptimizer.
* The caller is responsible for building the logical plan via {@link #buildLocalLogicalPlan}.
*/
public static PhysicalPlan createLookupPhysicalPlan(
Expand All @@ -890,7 +893,9 @@ public static PhysicalPlan createLookupPhysicalPlan(
SearchStats searchStats,
EsqlFlags flags
) {
PhysicalPlan physicalPlan = LocalMapper.INSTANCE.map(logicalPlan);
LogicalPlan optimizedLogical = new LookupLogicalOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats))
.localOptimize(logicalPlan);
PhysicalPlan physicalPlan = LocalMapper.INSTANCE.map(optimizedLogical);
LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext(
plannerSettings,
flags,
Expand Down
Loading
Loading