Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9316000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.4.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_resolve_fields_response_views,9315000
esql_lookup_planning,9316000
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
Expand Down Expand Up @@ -414,7 +415,8 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
Predicates.combineAnd(joinOnConditions),
true, // useStreamingOperator
QueryPragmas.EXCHANGE_BUFFER_SIZE.getDefault(Settings.EMPTY),
false // profile
false, // profile
EsqlTestUtils.TEST_CFG
);
DriverContext driverContext = driverContext();
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
Expand Down Expand Up @@ -57,6 +58,8 @@
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

@ClusterScope(scope = SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
@TestLogging(
Expand Down Expand Up @@ -448,13 +451,28 @@ public void testMultiValueJoinKeyWarnings() throws Exception {
List<String> warnings = capturedWarnings.get();
assertNotNull("Warnings should not be null", warnings);

List<String> warningValues = warnings.stream().map(w -> HeaderWarning.extractWarningValueFromWarningHeader(w, false)).toList();

// Filter warnings for the LOOKUP JOIN multi-value warning
List<String> lookupJoinWarnings = warnings.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList();
List<String> lookupJoinWarnings = warningValues.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList();

assertThat(
"Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warnings,
"Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warningValues,
lookupJoinWarnings.size(),
greaterThanOrEqualTo(1)
);

// Verify the source location is correctly propagated (not Line -1:-1 / empty expression).
// The LOOKUP JOIN is on line 4, column 3 of the query (after "| ").
assertThat(
"Warning should contain correct source location, got: " + warningValues,
warningValues,
hasItem(startsWith("Line 4:3: evaluation of [LOOKUP JOIN languages_lookup ON language_code] failed"))
);
assertThat(
"Warning should contain correct source location, got: " + warningValues,
warningValues,
hasItem(startsWith("Line 4:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
import org.elasticsearch.compute.operator.lookup.QueryList;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.Rewriteable;
Expand All @@ -29,9 +30,6 @@
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
Expand All @@ -54,7 +52,7 @@
* The query is then used to fetch the matching rows from the right dataset.
* The class supports two types of joins:
* 1. Field-based join: The join conditions are based on the equality of fields from the left and right datasets.
* It is used for field-based join when the join is on more than one field or there is a preJoinFilter
* It is used for field-based join when the join is on more than one field or there is a rightOnlyFilter
* 2. Expression-based join: The join conditions are based on a complex expression that can involve multiple fields and operators.
*/
public class ExpressionQueryList implements LookupEnrichQueryGenerator {
Expand All @@ -66,18 +64,22 @@ public class ExpressionQueryList implements LookupEnrichQueryGenerator {
private ExpressionQueryList(
List<QueryList> queryLists,
SearchExecutionContext context,
PhysicalPlan rightPreJoinPlan,
@Nullable Expression rightOnlyFilter,
@Nullable QueryBuilder pushedQuery,
ClusterService clusterService,
AliasFilter aliasFilter
) {
this.queryLists = new ArrayList<>(queryLists);
this.aliasFilter = aliasFilter;
this.clusterService = clusterService;
if (pushedQuery != null) {
lucenePushableFilterBuilders.add(pushedQuery);
}
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
SearchContextStats.from(List.of(context)),
new EsqlFlags(clusterService.getClusterSettings())
);
buildPreJoinFilter(rightPreJoinPlan, context, lucenePushdownPredicates);
buildPreJoinFilter(rightOnlyFilter, context, lucenePushdownPredicates);
}

/**
Expand All @@ -90,14 +92,15 @@ private ExpressionQueryList(
public static ExpressionQueryList fieldBasedJoin(
List<QueryList> queryLists,
SearchExecutionContext context,
PhysicalPlan rightPreJoinPlan,
@Nullable Expression rightOnlyFilter,
@Nullable QueryBuilder pushedQuery,
ClusterService clusterService,
AliasFilter aliasFilter
) {
if (queryLists.size() < 2 && (rightPreJoinPlan instanceof FilterExec == false)) {
if (queryLists.size() < 2 && rightOnlyFilter == null && pushedQuery == null) {
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists or a pre-join filter");
}
return new ExpressionQueryList(queryLists, context, rightPreJoinPlan, clusterService, aliasFilter);
return new ExpressionQueryList(queryLists, context, rightOnlyFilter, pushedQuery, clusterService, aliasFilter);
}

/**
Expand All @@ -110,37 +113,33 @@ public static ExpressionQueryList fieldBasedJoin(
*/
public static ExpressionQueryList expressionBasedJoin(
SearchExecutionContext context,
PhysicalPlan rightPreJoinPlan,
@Nullable Expression rightOnlyFilter,
@Nullable QueryBuilder pushedQuery,
ClusterService clusterService,
LookupFromIndexService.TransportRequest request,
List<MatchConfig> matchFields,
Expression joinOnConditions,
AliasFilter aliasFilter,
Warnings warnings
) {
if (LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() == false) {
throw new UnsupportedOperationException("Lookup Join on Boolean Expression capability is not enabled");
}
if (request.getJoinOnConditions() == null) {
if (joinOnConditions == null) {
throw new IllegalStateException("expressionBasedJoin must have join conditions");
}
ExpressionQueryList expressionQueryList = new ExpressionQueryList(
new ArrayList<>(),
context,
rightPreJoinPlan,
rightOnlyFilter,
pushedQuery,
clusterService,
aliasFilter
);
// Build join-on conditions using the context from planning (this is safe as conditions are static)
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
SearchContextStats.from(List.of(context)),
new EsqlFlags(clusterService.getClusterSettings())
);
expressionQueryList.buildJoinOnForExpressionJoin(
request.getJoinOnConditions(),
request.getMatchFields(),
context,
lucenePushdownPredicates,
warnings
);
expressionQueryList.buildJoinOnForExpressionJoin(joinOnConditions, matchFields, context, lucenePushdownPredicates, warnings);
return expressionQueryList;
}

Expand Down Expand Up @@ -242,37 +241,33 @@ private void addToLucenePushableFilters(QueryBuilder queryBuilder) {
}

private void buildPreJoinFilter(
PhysicalPlan rightPreJoinPlan,
@Nullable Expression rightOnlyFilter,
SearchExecutionContext context,
LucenePushdownPredicates lucenePushdownPredicates
) {
if (rightPreJoinPlan instanceof FilterExec filterExec) {
List<Expression> candidateRightHandFilters = Predicates.splitAnd(filterExec.condition());
for (Expression filter : candidateRightHandFilters) {
if (filter instanceof TranslationAware translationAware) {
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder();
// Rewrite the query builder to ensure doIndexMetadataRewrite is called
// Some functions, such as KQL require rewriting to work properly
try {
queryBuilder = Rewriteable.rewrite(queryBuilder, context, true);
} catch (IOException e) {
throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e);
}
// Store QueryBuilder instead of Query to avoid caching IndexReader references
addToLucenePushableFilters(queryBuilder);
if (rightOnlyFilter == null) {
return;
}
List<Expression> candidateRightHandFilters = Predicates.splitAnd(rightOnlyFilter);
for (Expression filter : candidateRightHandFilters) {
if (filter instanceof TranslationAware translationAware) {
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder();
// Rewrite the query builder to ensure doIndexMetadataRewrite is called
// Some functions, such as KQL require rewriting to work properly
try {
queryBuilder = Rewriteable.rewrite(queryBuilder, context, true);
} catch (IOException e) {
throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e);
}
// Store QueryBuilder instead of Query to avoid caching IndexReader references
addToLucenePushableFilters(queryBuilder);
}
// If the filter is not translatable we will not apply it for now
// as performance testing showed no performance improvement.
// We can revisit this in the future if needed, once we have more optimized workflow in place.
// The filter is optional, so it is OK to ignore it if it cannot be translated.
}
} else if (rightPreJoinPlan != null && rightPreJoinPlan instanceof EsSourceExec == false) {
throw new IllegalStateException(
"The right side of a LookupJoinExec can only be a FilterExec on top of an EsSourceExec or an EsSourceExec, but got: "
+ rightPreJoinPlan
);
// If the filter is not translatable we will not apply it for now
// as performance testing showed no performance improvement.
// We can revisit this in the future if needed, once we have more optimized workflow in place.
// The filter is optional, so it is OK to ignore it if it cannot be translated.
}
}

Expand Down
Loading
Loading