Skip to content
Closed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9305000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.4.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
inference_azure_openai_task_settings_headers,9304000
esql_lookup_planning,9305000
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public final class LookupQueryOperator implements Operator {
private final IndexSearcher searcher;
private final Warnings warnings;
private final int maxPageSize;
private final boolean emptyResult;

private Page currentInputPage;
private int queryPosition = -1;
Expand All @@ -77,7 +78,8 @@ public LookupQueryOperator(
IndexedByShardId<? extends ShardContext> shardContexts,
int shardId,
SearchExecutionContext searchExecutionContext,
Warnings warnings
Warnings warnings,
boolean emptyResult
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
Expand All @@ -86,6 +88,7 @@ public LookupQueryOperator(
this.shardContext = shardContexts.get(shardId);
this.shardContext.incRef();
this.searchExecutionContext = searchExecutionContext;
this.emptyResult = emptyResult;
try {
if (shardContext.searcher().getIndexReader() instanceof DirectoryReader directoryReader) {
// This optimization is currently disabled for ParallelCompositeReader
Expand All @@ -105,10 +108,14 @@ public void addInput(Page page) {
if (currentInputPage != null) {
throw new IllegalStateException("Operator already has input page, must consume it first");
}
currentInputPage = page;
queryPosition = -1; // Reset query position for new page
pagesReceived++;
rowsReceived += page.getPositionCount();
if (emptyResult) {
page.releaseBlocks();
return;
}
currentInputPage = page;
queryPosition = -1; // Reset query position for new page
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public Operator get(DriverContext driverContext) {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
);
}

Expand Down Expand Up @@ -180,7 +181,8 @@ public void testNoMatchesScenario() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(noMatchDirectory.reader)),
0,
noMatchDirectory.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Create input with non-matching terms
Expand Down Expand Up @@ -237,7 +239,8 @@ public void testGetOutputNeverNullWhileCanProduceMore() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Create input with many matching terms
Expand Down Expand Up @@ -283,7 +286,8 @@ public void testMixedMatchesAndNoMatches() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Mix of matching and non-matching terms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ public enum Config {

private final Map<Config, Set<String>> includes = new HashMap<>();
private final Map<Config, Set<String>> excludes = new HashMap<>();
private final Map<String, String> constantValues = new HashMap<>();

public TestConfigurableSearchStats include(Config key, String... fields) {
// If this method is called with no fields, it is interpreted to mean include none, so we include a dummy field
Expand Down Expand Up @@ -492,6 +493,16 @@ public boolean hasExactSubfield(FieldName field) {
return isConfigationSet(Config.EXACT_SUBFIELD, field.string());
}

public TestConfigurableSearchStats withConstantValue(String field, String value) {
constantValues.put(field, value);
return this;
}

@Override
public String constantValue(FieldName name) {
return constantValues.get(name.string());
}

@Override
public String toString() {
return "TestConfigurableSearchStats{" + "includes=" + includes + ", excludes=" + excludes + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
Expand Down Expand Up @@ -125,27 +126,19 @@ public void testJoinOnFourKeys() throws IOException {
new String[] { "one", "two", "three", "four" },
new Integer[] { 1, 2, 3, 4 }, }
),
buildGreaterThanFilter(1L)
1L
);
}

public void testLongKey() throws IOException {
runLookup(
List.of(DataType.LONG),
new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }),
buildGreaterThanFilter(0L)
);
runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), 0L);
}

/**
* LOOKUP multiple results match.
*/
public void testLookupIndexMultiResults() throws IOException {
runLookup(
List.of(DataType.KEYWORD),
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }),
buildGreaterThanFilter(-1L)
);
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), -1L);
}

public void testJoinOnTwoKeysMultiResults() throws IOException {
Expand Down Expand Up @@ -234,19 +227,28 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
}
}

private PhysicalPlan buildGreaterThanFilter(long value) {
FieldAttribute filterAttribute = new FieldAttribute(
private static PhysicalPlan buildGreaterThanFilter(long value, FieldAttribute filterAttribute) {
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
EsRelation esRelation = new EsRelation(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
"test",
IndexMode.LOOKUP,
Map.of(),
Map.of(),
Map.of(),
List.of(filterAttribute)
);
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of());
Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan);
return new FragmentExec(filter);
}

private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan pushedDownFilter) throws IOException {
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, Long filterValue) throws IOException {
FieldAttribute lAttribute = new FieldAttribute(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
);
PhysicalPlan pushedDownFilter = filterValue != null ? buildGreaterThanFilter(filterValue, lAttribute) : null;
String[] fieldMappers = new String[keyTypes.size() * 2];
for (int i = 0; i < keyTypes.size(); i++) {
fieldMappers[2 * i] = "key" + i;
Expand Down Expand Up @@ -401,19 +403,14 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(),
"lookup",
"lookup",
List.of(
new FieldAttribute(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
)
),
List.of(lAttribute),
Source.EMPTY,
pushedDownFilter,
Predicates.combineAnd(joinOnConditions),
true, // useStreamingOperator
QueryPragmas.EXCHANGE_BUFFER_SIZE.getDefault(Settings.EMPTY),
false // profile
false, // profile
EsqlTestUtils.TEST_CFG
);
DriverContext driverContext = driverContext();
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
Expand Down Expand Up @@ -58,6 +59,8 @@
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

@ClusterScope(scope = SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
@TestLogging(
Expand Down Expand Up @@ -449,13 +452,28 @@ public void testMultiValueJoinKeyWarnings() throws Exception {
List<String> warnings = capturedWarnings.get();
assertNotNull("Warnings should not be null", warnings);

List<String> warningValues = warnings.stream().map(w -> HeaderWarning.extractWarningValueFromWarningHeader(w, false)).toList();

// Filter warnings for the LOOKUP JOIN multi-value warning
List<String> lookupJoinWarnings = warnings.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList();
List<String> lookupJoinWarnings = warningValues.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList();

assertThat(
"Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warnings,
"Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warningValues,
lookupJoinWarnings.size(),
greaterThanOrEqualTo(1)
);

// Verify the source location is correctly propagated (not Line -1:-1 / empty expression).
// The LOOKUP JOIN is on line 4, column 3 of the query (after "| ").
assertThat(
"Warning should contain correct source location, got: " + warningValues,
warningValues,
hasItem(startsWith("Line 4:3: evaluation of [LOOKUP JOIN languages_lookup ON language_code] failed"))
);
assertThat(
"Warning should contain correct source location, got: " + warningValues,
warningValues,
hasItem(startsWith("Line 4:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value"))
);
}
}
Loading
Loading