Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9316000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.4.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_resolve_fields_response_views,9315000
esql_lookup_planning,9316000
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public final class LookupQueryOperator implements Operator {
private final IndexSearcher searcher;
private final Warnings warnings;
private final int maxPageSize;
private final boolean emptyResult;

private Page currentInputPage;
private int queryPosition = -1;
Expand All @@ -77,7 +78,8 @@ public LookupQueryOperator(
IndexedByShardId<? extends ShardContext> shardContexts,
int shardId,
SearchExecutionContext searchExecutionContext,
Warnings warnings
Warnings warnings,
boolean emptyResult
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
Expand All @@ -86,6 +88,7 @@ public LookupQueryOperator(
this.shardContext = shardContexts.get(shardId);
this.shardContext.incRef();
this.searchExecutionContext = searchExecutionContext;
this.emptyResult = emptyResult;
try {
if (shardContext.searcher().getIndexReader() instanceof DirectoryReader directoryReader) {
// This optimization is currently disabled for ParallelCompositeReader
Expand All @@ -105,10 +108,14 @@ public void addInput(Page page) {
if (currentInputPage != null) {
throw new IllegalStateException("Operator already has input page, must consume it first");
}
currentInputPage = page;
queryPosition = -1; // Reset query position for new page
pagesReceived++;
rowsReceived += page.getPositionCount();
if (emptyResult) {
page.releaseBlocks();
return;
}
currentInputPage = page;
queryPosition = -1; // Reset query position for new page
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public Operator get(DriverContext driverContext) {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
);
}

Expand Down Expand Up @@ -180,7 +181,8 @@ public void testNoMatchesScenario() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(noMatchDirectory.reader)),
0,
noMatchDirectory.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Create input with non-matching terms
Expand Down Expand Up @@ -237,7 +239,8 @@ public void testGetOutputNeverNullWhileCanProduceMore() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Create input with many matching terms
Expand Down Expand Up @@ -283,7 +286,8 @@ public void testMixedMatchesAndNoMatches() throws Exception {
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
0,
directoryData.searchExecutionContext,
warnings()
warnings(),
false
)
) {
// Mix of matching and non-matching terms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ public enum Config {

private final Map<Config, Set<String>> includes = new HashMap<>();
private final Map<Config, Set<String>> excludes = new HashMap<>();
private final Map<String, String> constantValues = new HashMap<>();

public TestConfigurableSearchStats include(Config key, String... fields) {
// If this method is called with no fields, it is interpreted to mean include none, so we include a dummy field
Expand Down Expand Up @@ -506,6 +507,16 @@ public boolean hasExactSubfield(FieldName field) {
return isConfigationSet(Config.EXACT_SUBFIELD, field.string());
}

public TestConfigurableSearchStats withConstantValue(String field, String value) {
constantValues.put(field, value);
return this;
}

@Override
public String constantValue(FieldName name) {
return constantValues.get(name.string());
}

@Override
public String toString() {
return "TestConfigurableSearchStats{" + "includes=" + includes + ", excludes=" + excludes + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
Expand Down Expand Up @@ -126,27 +127,19 @@ public void testJoinOnFourKeys() throws IOException {
new String[] { "one", "two", "three", "four" },
new Integer[] { 1, 2, 3, 4 }, }
),
buildGreaterThanFilter(1L)
1L
);
}

public void testLongKey() throws IOException {
runLookup(
List.of(DataType.LONG),
new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }),
buildGreaterThanFilter(0L)
);
runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), 0L);
}

/**
* LOOKUP multiple results match.
*/
public void testLookupIndexMultiResults() throws IOException {
runLookup(
List.of(DataType.KEYWORD),
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }),
buildGreaterThanFilter(-1L)
);
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), -1L);
}

public void testJoinOnTwoKeysMultiResults() throws IOException {
Expand Down Expand Up @@ -235,19 +228,28 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
}
}

private PhysicalPlan buildGreaterThanFilter(long value) {
FieldAttribute filterAttribute = new FieldAttribute(
private static PhysicalPlan buildGreaterThanFilter(long value, FieldAttribute filterAttribute) {
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
EsRelation esRelation = new EsRelation(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
"test",
IndexMode.LOOKUP,
Map.of(),
Map.of(),
Map.of(),
List.of(filterAttribute)
);
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of());
Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan);
return new FragmentExec(filter);
}

private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan pushedDownFilter) throws IOException {
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, Long filterValue) throws IOException {
FieldAttribute lAttribute = new FieldAttribute(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
);
PhysicalPlan pushedDownFilter = filterValue != null ? buildGreaterThanFilter(filterValue, lAttribute) : null;
String[] fieldMappers = new String[keyTypes.size() * 2];
for (int i = 0; i < keyTypes.size(); i++) {
fieldMappers[2 * i] = "key" + i;
Expand Down Expand Up @@ -402,19 +404,14 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(),
"lookup",
"lookup",
List.of(
new FieldAttribute(
Source.EMPTY,
"l",
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
)
),
List.of(lAttribute),
Source.EMPTY,
pushedDownFilter,
Predicates.combineAnd(joinOnConditions),
true, // useStreamingOperator
QueryPragmas.EXCHANGE_BUFFER_SIZE.getDefault(Settings.EMPTY),
false // profile
false, // profile
EsqlTestUtils.TEST_CFG
);
DriverContext driverContext = driverContext();
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
Expand Down Expand Up @@ -57,6 +58,8 @@
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

@ClusterScope(scope = SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
@TestLogging(
Expand Down Expand Up @@ -448,13 +451,28 @@ public void testMultiValueJoinKeyWarnings() throws Exception {
List<String> warnings = capturedWarnings.get();
assertNotNull("Warnings should not be null", warnings);

List<String> warningValues = warnings.stream().map(w -> HeaderWarning.extractWarningValueFromWarningHeader(w, false)).toList();

// Filter warnings for the LOOKUP JOIN multi-value warning
List<String> lookupJoinWarnings = warnings.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList();
List<String> lookupJoinWarnings = warningValues.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList();

assertThat(
"Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warnings,
"Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warningValues,
lookupJoinWarnings.size(),
greaterThanOrEqualTo(1)
);

// Verify the source location is correctly propagated (not Line -1:-1 / empty expression).
// The LOOKUP JOIN is on line 4, column 3 of the query (after "| ").
assertThat(
"Warning should contain correct source location, got: " + warningValues,
warningValues,
hasItem(startsWith("Line 4:3: evaluation of [LOOKUP JOIN languages_lookup ON language_code] failed"))
);
assertThat(
"Warning should contain correct source location, got: " + warningValues,
warningValues,
hasItem(startsWith("Line 4:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value"))
);
}
}
Loading
Loading