diff --git a/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java b/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java index 1120a69cc5166..5efe7ffc800a2 100644 --- a/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java @@ -21,7 +21,7 @@ import java.util.List; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V5; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V6; import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.ASYNC; public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase { @@ -96,7 +96,7 @@ protected boolean supportsInferenceTestService() { @Override protected boolean supportsIndexModeLookup() throws IOException { - return hasCapabilities(List.of(JOIN_LOOKUP_V5.capabilityName())); + return hasCapabilities(List.of(JOIN_LOOKUP_V6.capabilityName())); } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 5c7f981c93a97..dd75776973c3d 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -48,7 +48,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V5; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V6; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST; import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC; @@ -124,7 +124,7 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); - assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V5.capabilityName())); + assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V6.capabilityName())); } private TestFeatureService remoteFeaturesService() throws IOException { @@ -283,8 +283,8 @@ protected boolean supportsInferenceTestService() { @Override protected boolean supportsIndexModeLookup() throws IOException { - // CCS does not yet support JOIN_LOOKUP_V5 and clusters falsely report they have this capability - // return hasCapabilities(List.of(JOIN_LOOKUP_V5.capabilityName())); + // CCS does not yet support JOIN_LOOKUP_V6 and clusters falsely report they have this capability + // return hasCapabilities(List.of(JOIN_LOOKUP_V6.capabilityName())); return false; } } diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index 406997b66dbf0..2aae4c94c33fe 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -14,6 +14,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.esql.AssertWarnings; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.junit.After; import org.junit.Assert; @@ -219,6 +220,16 @@ public void testIndicesDontExist() throws IOException { assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); assertThat(e.getMessage(), containsString("index_not_found_exception")); assertThat(e.getMessage(), containsString("no such index [foo]")); + + if (EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()) { + e = expectThrows( + ResponseException.class, + () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test1 | LOOKUP JOIN foo ON id1")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Unknown index [foo]")); + } } private static RestEsqlTestCase.RequestObjectBuilder timestampFilter(String op, String date) throws IOException { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 7fed4f377096f..8b8d24b1bb156 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -3,8 +3,12 @@ // Reuses the sample dataset and commands from enrich.csv-spec // +############################################### +# Tests with languages_lookup index +############################################### + basicOnTheDataNode -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | EVAL language_code = languages @@ -21,7 +25,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; basicRow -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW language_code = 1 | LOOKUP JOIN languages_lookup ON language_code @@ -32,7 +36,7 @@ language_code:integer | language_name:keyword ; basicOnTheCoordinator -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | SORT emp_no @@ -49,7 +53,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; subsequentEvalOnTheDataNode -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | EVAL language_code = languages @@ -67,7 +71,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x ; subsequentEvalOnTheCoordinator -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | SORT emp_no @@ -85,7 +89,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x ; sortEvalBeforeLookup -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | SORT emp_no @@ -102,7 +106,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; nonUniqueLeftKeyOnTheDataNode -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | WHERE emp_no <= 10030 @@ -126,7 +130,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; nonUniqueRightKeyOnTheDataNode -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | EVAL language_code = emp_no % 10 @@ -146,7 +150,7 @@ emp_no:integer | language_code:integer | language_name:keyword | country:k ; nonUniqueRightKeyOnTheCoordinator -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | SORT emp_no @@ -166,7 +170,7 @@ emp_no:integer | language_code:integer | language_name:keyword | country:k ; nonUniqueRightKeyFromRow -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW language_code = 2 | LOOKUP JOIN languages_lookup_non_unique_key ON language_code @@ -178,8 +182,73 @@ language_code:integer | language_name:keyword | country:keyword 2 | [German, German, German] | [Austria, Germany, Switzerland] ; +############################################### +# Filtering tests with languages_lookup index +############################################### + +lookupWithFilterOnLeftSideField +required_capability: join_lookup_v6 + +FROM employees +| EVAL language_code = languages +| LOOKUP JOIN languages_lookup ON language_code +| SORT emp_no +| KEEP emp_no, language_code, language_name +| WHERE emp_no >= 10091 AND emp_no < 10094 +; + +emp_no:integer | language_code:integer | language_name:keyword +10091 | 3 | Spanish +10092 | 1 | English +10093 | 3 | Spanish +; + +lookupMessageWithFilterOnRightSideField-Ignore +required_capability: join_lookup_v6 + +FROM sample_data +| LOOKUP JOIN message_types_lookup ON message +| WHERE type == "Error" +| KEEP @timestamp, client_ip, event_duration, message, type +| SORT @timestamp DESC +; + +@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error +; + +lookupWithFieldAndRightSideAfterStats +required_capability: join_lookup_v6 + +FROM sample_data +| LOOKUP JOIN message_types_lookup ON message +| STATS count = count(message) BY type +| WHERE type == "Error" +; + +count:long | type:keyword +3 | Error +; + +lookupWithFieldOnJoinKey-Ignore +required_capability: join_lookup_v6 + +FROM employees +| EVAL language_code = languages +| LOOKUP JOIN languages_lookup ON language_code +| WHERE language_code > 1 AND language_name IS NOT NULL +| KEEP emp_no, language_code, language_name +; + +emp_no:integer | language_code:integer | language_name:keyword +10001 | 2 | French +10003 | 4 | German +; + nullJoinKeyOnTheDataNode -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | WHERE emp_no < 10004 @@ -197,7 +266,7 @@ emp_no:integer | language_code:integer | language_name:keyword mvJoinKeyOnTheDataNode -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM employees | WHERE 10003 < emp_no AND emp_no < 10008 @@ -215,7 +284,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; mvJoinKeyFromRow -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW language_code = [4, 5, 6, 7] | LOOKUP JOIN languages_lookup_non_unique_key ON language_code @@ -228,7 +297,7 @@ language_code:integer | language_name:keyword | country:keyword ; mvJoinKeyFromRowExpanded -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW language_code = [4, 5, 6, 7, 8] | MV_EXPAND language_code @@ -245,10 +314,26 @@ language_code:integer | language_name:keyword | country:keyword 8 | Mv-Lang2 | Mv-Land2 ; +############################################### +# Tests with clientips_lookup index +############################################### + lookupIPFromRow -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", right = "right" +| LOOKUP JOIN clientips_lookup ON client_ip +; + +left:keyword | client_ip:keyword | right:keyword | env:keyword +left | 172.21.0.5 | right | Development +; + +lookupIPFromKeepRow +required_capability: join_lookup_v6 ROW left = "left", client_ip = "172.21.0.5", right = "right" +| KEEP left, client_ip, right | LOOKUP JOIN clientips_lookup ON client_ip ; @@ -257,7 +342,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowing -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -268,7 +353,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowingKeep -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -281,7 +366,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowingKeepReordered -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -294,7 +379,7 @@ right | Development | 172.21.0.5 ; lookupIPFromIndex -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -313,7 +398,7 @@ ignoreOrder:true ; lookupIPFromIndexKeep -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -332,8 +417,30 @@ ignoreOrder:true 2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | QA ; +lookupIPFromIndexKeepKeep +required_capability: join_lookup_v6 + +FROM sample_data +| KEEP client_ip, event_duration, @timestamp, message +| RENAME @timestamp AS timestamp, message AS msg +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| KEEP timestamp, client_ip, event_duration, msg, env +; +ignoreOrder:true + +timestamp:date | client_ip:keyword | event_duration:long | msg:keyword | env:keyword +2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Production +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Production +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Production +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Production +2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Disconnected | Development +2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | QA +2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | QA +; + lookupIPFromIndexStats -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -349,7 +456,7 @@ count:long | env:keyword ; lookupIPFromIndexStatsKeep -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -365,10 +472,43 @@ count:long | env:keyword 1 | Development ; +statsAndLookupIPFromIndex +required_capability: join_lookup_v6 + +FROM sample_data +| EVAL client_ip = client_ip::keyword +| STATS count = count(client_ip) BY client_ip +| LOOKUP JOIN clientips_lookup ON client_ip +| SORT count DESC, client_ip ASC, env ASC +; + +count:long | client_ip:keyword | env:keyword +4 | 172.21.3.15 | Production +1 | 172.21.0.5 | Development +1 | 172.21.2.113 | QA +1 | 172.21.2.162 | QA +; + +############################################### +# Tests with message_types_lookup index +############################################### + lookupMessageFromRow -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 + +ROW left = "left", message = "Connected to 10.1.0.1", right = "right" +| LOOKUP JOIN message_types_lookup ON message +; + +left:keyword | message:keyword | right:keyword | type:keyword +left | Connected to 10.1.0.1 | right | Success +; + +lookupMessageFromKeepRow +required_capability: join_lookup_v6 ROW left = "left", message = "Connected to 10.1.0.1", right = "right" +| KEEP left, message, right | LOOKUP JOIN message_types_lookup ON message ; @@ -377,7 +517,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromRowWithShadowing -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right" | LOOKUP JOIN message_types_lookup ON message @@ -388,7 +528,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromRowWithShadowingKeep -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right" | LOOKUP JOIN message_types_lookup ON message @@ -400,7 +540,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromIndex -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -418,7 +558,7 @@ ignoreOrder:true ; lookupMessageFromIndexKeep -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -436,8 +576,28 @@ ignoreOrder:true 2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success ; +lookupMessageFromIndexKeepKeep +required_capability: join_lookup_v6 + +FROM sample_data +| KEEP client_ip, event_duration, @timestamp, message +| LOOKUP JOIN message_types_lookup ON message +| KEEP @timestamp, client_ip, event_duration, message, type +; +ignoreOrder:true + +@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword +2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Success +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error +2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Disconnected | Disconnected +2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | Success +2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success +; + lookupMessageFromIndexKeepReordered -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -456,7 +616,7 @@ Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 ; lookupMessageFromIndexStats -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -471,7 +631,7 @@ count:long | type:keyword ; lookupMessageFromIndexStatsKeep -required_capability: join_lookup_v5 +required_capability: join_lookup_v6 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -486,67 +646,333 @@ count:long | type:keyword 1 | Disconnected ; -// -// Filtering tests -// +statsAndLookupMessageFromIndex +required_capability: join_lookup_v6 -lookupWithFilterOnLeftSideField -required_capability: join_lookup_v5 +FROM sample_data +| STATS count = count(message) BY message +| LOOKUP JOIN message_types_lookup ON message +| KEEP count, type, message +| SORT count DESC, message ASC +; -FROM employees -| EVAL language_code = languages -| LOOKUP JOIN languages_lookup ON language_code -| SORT emp_no -| KEEP emp_no, language_code, language_name -| WHERE emp_no >= 10091 AND emp_no < 10094 +count:long | type:keyword | message:keyword +3 | Error | Connection error +1 | Success | Connected to 10.1.0.1 +1 | Success | Connected to 10.1.0.2 +1 | Success | Connected to 10.1.0.3 +1 | Disconnected | Disconnected ; -emp_no:integer | language_code:integer | language_name:keyword -10091 | 3 | Spanish -10092 | 1 | English -10093 | 3 | Spanish +lookupMessageFromIndexTwice +required_capability: join_lookup_v6 + +FROM sample_data +| LOOKUP JOIN message_types_lookup ON message +| RENAME message AS message1, type AS type1 +| EVAL message = client_ip::keyword +| LOOKUP JOIN message_types_lookup ON message +| RENAME message AS message2, type AS type2 ; +ignoreOrder:true -lookupMessageWithFilterOnRightSideField-Ignore -required_capability: join_lookup_v5 +@timestamp:date | client_ip:ip | event_duration:long | message1:keyword | type1:keyword | message2:keyword | type2:keyword +2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Success | 172.21.3.15 | null +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error | 172.21.3.15 | null +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error | 172.21.3.15 | null +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error | 172.21.3.15 | null +2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Disconnected | Disconnected | 172.21.0.5 | null +2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | Success | 172.21.2.113 | null +2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success | 172.21.2.162 | null +; + +lookupMessageFromIndexTwiceKeep +required_capability: join_lookup_v6 FROM sample_data | LOOKUP JOIN message_types_lookup ON message -| WHERE type == "Error" -| KEEP @timestamp, client_ip, event_duration, message, type -| SORT @timestamp DESC +| RENAME message AS message1, type AS type1 +| EVAL message = client_ip::keyword +| LOOKUP JOIN message_types_lookup ON message +| RENAME message AS message2, type AS type2 +| KEEP @timestamp, client_ip, event_duration, message1, type1, message2, type2 ; +ignoreOrder:true -@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword -2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error -2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error -2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error +@timestamp:date | client_ip:ip | event_duration:long | message1:keyword | type1:keyword | message2:keyword | type2:keyword +2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Success | 172.21.3.15 | null +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error | 172.21.3.15 | null +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error | 172.21.3.15 | null +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error | 172.21.3.15 | null +2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Disconnected | Disconnected | 172.21.0.5 | null +2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | Success | 172.21.2.113 | null +2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success | 172.21.2.162 | null ; -lookupWithFieldAndRightSideAfterStats -required_capability: join_lookup_v5 +############################################### +# Tests with clientips_lookup and message_types_lookup indexes +############################################### + +lookupIPAndMessageFromRow +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowKeepBefore +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" +| KEEP left, client_ip, message, right +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowKeepBetween +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" +| LOOKUP JOIN clientips_lookup ON client_ip +| KEEP left, client_ip, message, right, env +| LOOKUP JOIN message_types_lookup ON message +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowKeepAfter +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +| KEEP left, client_ip, message, right, env, type +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowWithShadowing +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", type = "type", right = "right" +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowWithShadowingKeep +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +| KEEP left, client_ip, message, right, env, type +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowWithShadowingKeepKeep +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| KEEP left, client_ip, message, right, env +| LOOKUP JOIN message_types_lookup ON message +| KEEP left, client_ip, message, right, env, type +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowWithShadowingKeepKeepKeep +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" +| EVAL client_ip = client_ip::keyword +| KEEP left, client_ip, message, right, env +| LOOKUP JOIN clientips_lookup ON client_ip +| KEEP left, client_ip, message, right, env +| LOOKUP JOIN message_types_lookup ON message +| KEEP left, client_ip, message, right, env, type +; + +left:keyword | client_ip:keyword | message:keyword | right:keyword | env:keyword | type:keyword +left | 172.21.0.5 | Connected to 10.1.0.1 | right | Development | Success +; + +lookupIPAndMessageFromRowWithShadowingKeepReordered +required_capability: join_lookup_v6 + +ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +| KEEP right, env, type, client_ip +; + +right:keyword | env:keyword | type:keyword | client_ip:keyword +right | Development | Success | 172.21.0.5 +; + +lookupIPAndMessageFromIndex +required_capability: join_lookup_v6 FROM sample_data +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip | LOOKUP JOIN message_types_lookup ON message -| STATS count = count(message) BY type -| WHERE type == "Error" ; +ignoreOrder:true -count:long | type:keyword -3 | Error +@timestamp:date | event_duration:long | message:keyword | client_ip:keyword | env:keyword | type:keyword +2023-10-23T13:55:01.543Z | 1756467 | Connected to 10.1.0.1 | 172.21.3.15 | Production | Success +2023-10-23T13:53:55.832Z | 5033755 | Connection error | 172.21.3.15 | Production | Error +2023-10-23T13:52:55.015Z | 8268153 | Connection error | 172.21.3.15 | Production | Error +2023-10-23T13:51:54.732Z | 725448 | Connection error | 172.21.3.15 | Production | Error +2023-10-23T13:33:34.937Z | 1232382 | Disconnected | 172.21.0.5 | Development | Disconnected +2023-10-23T12:27:28.948Z | 2764889 | Connected to 10.1.0.2 | 172.21.2.113 | QA | Success +2023-10-23T12:15:03.360Z | 3450233 | Connected to 10.1.0.3 | 172.21.2.162 | QA | Success ; -lookupWithFieldOnJoinKey-Ignore -required_capability: join_lookup_v5 +lookupIPAndMessageFromIndexKeep +required_capability: join_lookup_v6 -FROM employees -| EVAL language_code = languages -| LOOKUP JOIN languages_lookup ON language_code -| WHERE language_code > 1 AND language_name IS NOT NULL -| KEEP emp_no, language_code, language_name +FROM sample_data +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +| KEEP @timestamp, client_ip, event_duration, message, env, type ; +ignoreOrder:true -emp_no:integer | language_code:integer | language_name:keyword -10001 | 2 | French -10003 | 4 | German +@timestamp:date | client_ip:keyword | event_duration:long | message:keyword | env:keyword | type:keyword +2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Production | Success +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Production | Error +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Production | Error +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Production | Error +2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Disconnected | Development | Disconnected +2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | QA | Success +2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | QA | Success +; + +lookupIPAndMessageFromIndexStats +required_capability: join_lookup_v6 + +FROM sample_data +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +| STATS count = count(*) BY env, type +| SORT count DESC, env ASC, type ASC +; + +count:long | env:keyword | type:keyword +3 | Production | Error +2 | QA | Success +1 | Development | Disconnected +1 | Production | Success +; + +lookupIPAndMessageFromIndexStatsKeep +required_capability: join_lookup_v6 + +FROM sample_data +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +| KEEP client_ip, env, type +| STATS count = count(*) BY env, type +| SORT count DESC, env ASC, type ASC +; + +count:long | env:keyword | type:keyword +3 | Production | Error +2 | QA | Success +1 | Development | Disconnected +1 | Production | Success +; + +statsAndLookupIPAndMessageFromIndex +required_capability: join_lookup_v6 + +FROM sample_data +| EVAL client_ip = client_ip::keyword +| STATS count = count(*) BY client_ip, message +| LOOKUP JOIN clientips_lookup ON client_ip +| LOOKUP JOIN message_types_lookup ON message +| SORT count DESC, client_ip ASC, message ASC +; + +count:long | client_ip:keyword | message:keyword | env:keyword | type:keyword +3 | 172.21.3.15 | Connection error | Production | Error +1 | 172.21.0.5 | Disconnected | Development | Disconnected +1 | 172.21.2.113 | Connected to 10.1.0.2 | QA | Success +1 | 172.21.2.162 | Connected to 10.1.0.3 | QA | Success +1 | 172.21.3.15 | Connected to 10.1.0.1 | Production | Success +; + +lookupIPAndMessageFromIndexChainedEvalKeep +required_capability: join_lookup_v6 + +FROM sample_data +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| EVAL message = CONCAT(env, " environment") +| LOOKUP JOIN message_types_lookup ON message +| KEEP @timestamp, client_ip, event_duration, message, type +; +ignoreOrder:true + +@timestamp:date | client_ip:keyword | event_duration:long | message:keyword | type:keyword +2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Production environment | Production +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Production environment | Production +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Production environment | Production +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Production environment | Production +2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Development environment | Development +2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | QA environment | null +2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | QA environment | null +; + +lookupIPAndMessageFromIndexChainedRenameKeep +required_capability: join_lookup_v6 + +FROM sample_data +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| RENAME env AS message +| LOOKUP JOIN message_types_lookup ON message +| KEEP @timestamp, client_ip, event_duration, message, type ; +ignoreOrder:true + +@timestamp:date | client_ip:keyword | event_duration:long | message:keyword | type:keyword +2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Production | null +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Production | null +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Production | null +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Production | null +2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Development | null +2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | QA | null +2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | QA | null +; + diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types.csv index 8e00485771445..bb4b58046b843 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types.csv +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/message_types.csv @@ -4,3 +4,5 @@ Disconnected,Disconnected Connected to 10.1.0.1,Success Connected to 10.1.0.2,Success Connected to 10.1.0.3,Success +Production environment,Production +Development environment,Development diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index e9a0f89e4f448..235d0dcbe4164 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -547,7 +547,7 @@ public enum Cap { /** * LOOKUP JOIN */ - JOIN_LOOKUP_V5(Build.current().isSnapshot()), + JOIN_LOOKUP_V6(Build.current().isSnapshot()), /** * Fix for https://github.com/elastic/elasticsearch/issues/117054 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index cf91c7df9a034..d59745f03f608 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -198,12 +198,16 @@ private static class ResolveTable extends ParameterizedAnalyzerRule lookupResolution, EnrichResolution enrichResolution ) { // Currently for tests only, since most do not test lookups @@ -26,12 +28,6 @@ public AnalyzerContext( IndexResolution indexResolution, EnrichResolution enrichResolution ) { - this( - configuration, - functionRegistry, - indexResolution, - IndexResolution.invalid("AnalyzerContext constructed without any lookup join resolution"), - enrichResolution - ); + this(configuration, functionRegistry, indexResolution, Map.of(), enrichResolution); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 83480f6651abf..c0290fa2b1d73 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.compute.data.Block; @@ -77,10 +76,12 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; @@ -282,10 +283,10 @@ public void analyzedPlan( return; } - TriFunction analyzeAction = (indices, lookupIndices, policies) -> { + Function analyzeAction = (l) -> { planningMetrics.gatherPreAnalysisMetrics(parsed); Analyzer analyzer = new Analyzer( - new AnalyzerContext(configuration, functionRegistry, indices, lookupIndices, policies), + new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution), verifier ); LogicalPlan plan = analyzer.analyze(parsed); @@ -301,110 +302,77 @@ public void analyzedPlan( EsqlSessionCCSUtils.checkForCcsLicense(indices, indicesExpressionGrouper, verifier.licenseState()); - // TODO: make a separate call for lookup indices final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new) ).keySet(); - SubscribableListener.newForked(l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)) - .andThen((l, enrichResolution) -> { - // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API - var enrichMatchFields = enrichResolution.resolvedEnrichPolicies() - .stream() - .map(ResolvedEnrichPolicy::matchField) - .collect(Collectors.toSet()); - // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy - var fieldNames = fieldNames(parsed, enrichMatchFields); - ListenerResult listenerResult = new ListenerResult(null, null, enrichResolution, fieldNames); - - // first resolve the lookup indices, then the main indices - preAnalyzeLookupIndices(preAnalysis.lookupIndices, listenerResult, l); - }) - .andThen((l, listenerResult) -> { - // resolve the main indices - preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, requestFilter, l); - }) - .andThen((l, listenerResult) -> { - // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for - // invalid index resolution to updateExecutionInfo - if (listenerResult.indices.isValid()) { - // CCS indices and skip_unavailable cluster values can stop the analysis right here - if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, listenerResult, logicalPlanListener, l)) - return; - } - // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step - l.onResponse(listenerResult); - }) - .andThen((l, listenerResult) -> { - // first attempt (maybe the only one) at analyzing the plan - analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, logicalPlanListener, l); - }) - .andThen((l, listenerResult) -> { - assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; - - // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices - // resolving one more time (the first attempt failed and the query had a filter) - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> null); - } - - // here the requestFilter is set to null, performing the pre-analysis after the first step failed - preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, null, l); - }) - .andThen((l, listenerResult) -> { - assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; - LOGGER.debug("Analyzing the plan (second attempt, without filter)"); - LogicalPlan plan; - try { - plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution); - } catch (Exception e) { - l.onFailure(e); - return; - } - LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); - l.onResponse(plan); - }) - .addListener(logicalPlanListener); - } + var listener = SubscribableListener.newForked( + l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l) + ).andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l)); + // first resolve the lookup indices, then the main indices + for (TableInfo lookupIndex : preAnalysis.lookupIndices) { + listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(lookupIndex, preAnalysisResult, l); }); + } + listener.andThen((l, result) -> { + // resolve the main indices + preAnalyzeIndices(preAnalysis.indices, executionInfo, result, requestFilter, l); + }).andThen((l, result) -> { + // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for + // invalid index resolution to updateExecutionInfo + if (result.indices.isValid()) { + // CCS indices and skip_unavailable cluster values can stop the analysis right here + if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, result, logicalPlanListener, l)) return; + } + // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step + l.onResponse(result); + }).andThen((l, result) -> { + // first attempt (maybe the only one) at analyzing the plan + analyzeAndMaybeRetry(analyzeAction, requestFilter, result, logicalPlanListener, l); + }).andThen((l, result) -> { + assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; + + // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices + // resolving one more time (the first attempt failed and the query had a filter) + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> null); + } - private void preAnalyzeLookupIndices(List indices, ListenerResult listenerResult, ActionListener listener) { - if (indices.size() > 1) { - // Note: JOINs on more than one index are not yet supported - listener.onFailure(new MappingException("More than one LOOKUP JOIN is not supported")); - } else if (indices.size() == 1) { - TableInfo tableInfo = indices.get(0); - TableIdentifier table = tableInfo.id(); - // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - indexResolver.resolveAsMergedMapping( - table.index(), - Set.of("*"), // TODO: for LOOKUP JOIN, this currently declares all lookup index fields relevant and might fetch too many. - null, - listener.map(indexResolution -> listenerResult.withLookupIndexResolution(indexResolution)) - ); - // TODO: Verify that the resolved index actually has indexMode: "lookup" - } else { + // here the requestFilter is set to null, performing the pre-analysis after the first step failed + preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l); + }).andThen((l, result) -> { + assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; + LOGGER.debug("Analyzing the plan (second attempt, without filter)"); + LogicalPlan plan; try { - // No lookup indices specified - listener.onResponse( - new ListenerResult( - listenerResult.indices, - IndexResolution.invalid("[none specified]"), - listenerResult.enrichResolution, - listenerResult.fieldNames - ) - ); - } catch (Exception ex) { - listener.onFailure(ex); + plan = analyzeAction.apply(result); + } catch (Exception e) { + l.onFailure(e); + return; } - } + LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); + l.onResponse(plan); + }).addListener(logicalPlanListener); + } + + private void preAnalyzeLookupIndex(TableInfo tableInfo, PreAnalysisResult result, ActionListener listener) { + TableIdentifier table = tableInfo.id(); + Set fieldNames = result.wildcardJoinIndices().contains(table.index()) ? IndexResolver.ALL_FIELDS : result.fieldNames; + // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types + indexResolver.resolveAsMergedMapping( + table.index(), + fieldNames, + null, + listener.map(indexResolution -> result.addLookupIndexResolution(table.index(), indexResolution)) + ); + // TODO: Verify that the resolved index actually has indexMode: "lookup" } private void preAnalyzeIndices( List indices, EsqlExecutionInfo executionInfo, - ListenerResult listenerResult, + PreAnalysisResult result, QueryBuilder requestFilter, - ActionListener listener + ActionListener listener ) { // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one if (indices.size() > 1) { @@ -412,7 +380,7 @@ private void preAnalyzeIndices( listener.onFailure(new MappingException("Queries with multiple indices are not supported")); } else if (indices.size() == 1) { // known to be unavailable from the enrich policy API call - Map unavailableClusters = listenerResult.enrichResolution.getUnavailableClusters(); + Map unavailableClusters = result.enrichResolution.getUnavailableClusters(); TableInfo tableInfo = indices.get(0); TableIdentifier table = tableInfo.id(); @@ -445,34 +413,20 @@ private void preAnalyzeIndices( String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution - listener.onResponse( - new ListenerResult( - IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())), - listenerResult.lookupIndices, - listenerResult.enrichResolution, - listenerResult.fieldNames - ) - ); + listener.onResponse(result.withIndexResolution(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())))); } else { // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( indexExpressionToResolve, - listenerResult.fieldNames, + result.fieldNames, requestFilter, - listener.map(indexResolution -> listenerResult.withIndexResolution(indexResolution)) + listener.map(indexResolution -> result.withIndexResolution(indexResolution)) ); } } else { try { // occurs when dealing with local relations (row a = 1) - listener.onResponse( - new ListenerResult( - IndexResolution.invalid("[none specified]"), - listenerResult.lookupIndices, - listenerResult.enrichResolution, - listenerResult.fieldNames - ) - ); + listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]"))); } catch (Exception ex) { listener.onFailure(ex); } @@ -483,11 +437,11 @@ private boolean analyzeCCSIndices( EsqlExecutionInfo executionInfo, Set targetClusters, Set unresolvedPolicies, - ListenerResult listenerResult, + PreAnalysisResult result, ActionListener logicalPlanListener, - ActionListener l + ActionListener l ) { - IndexResolution indexResolution = listenerResult.indices; + IndexResolution indexResolution = result.indices; EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { @@ -509,7 +463,7 @@ private boolean analyzeCCSIndices( enrichPolicyResolver.resolvePolicies( newClusters, unresolvedPolicies, - l.map(enrichResolution -> listenerResult.withEnrichResolution(enrichResolution)) + l.map(enrichResolution -> result.withEnrichResolution(enrichResolution)) ); return true; } @@ -517,11 +471,11 @@ private boolean analyzeCCSIndices( } private static void analyzeAndMaybeRetry( - TriFunction analyzeAction, + Function analyzeAction, QueryBuilder requestFilter, - ListenerResult listenerResult, + PreAnalysisResult result, ActionListener logicalPlanListener, - ActionListener l + ActionListener l ) { LogicalPlan plan = null; var filterPresentMessage = requestFilter == null ? "without" : "with"; @@ -529,7 +483,7 @@ private static void analyzeAndMaybeRetry( LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); try { - plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution); + plan = analyzeAction.apply(result); } catch (Exception e) { if (e instanceof VerificationException ve) { LOGGER.debug( @@ -544,7 +498,7 @@ private static void analyzeAndMaybeRetry( } else { // interested only in a VerificationException, but this time we are taking out the index filter // to try and make the index resolution work without any index filtering. In the next step... to be continued - l.onResponse(listenerResult); + l.onResponse(result); } } else { // if the query failed with any other type of exception, then just pass the exception back to the user @@ -557,10 +511,24 @@ private static void analyzeAndMaybeRetry( logicalPlanListener.onResponse(plan); } - static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields) { + private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution, ActionListener l) { + try { + // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API + var enrichMatchFields = enrichResolution.resolvedEnrichPolicies() + .stream() + .map(ResolvedEnrichPolicy::matchField) + .collect(Collectors.toSet()); + // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy + l.onResponse(fieldNames(parsed, enrichMatchFields, new PreAnalysisResult(enrichResolution))); + } catch (Exception ex) { + l.onFailure(ex); + } + } + + static PreAnalysisResult fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields, PreAnalysisResult result) { if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) { // no explicit columns selection, for example "from employees" - return IndexResolver.ALL_FIELDS; + return result.withFieldNames(IndexResolver.ALL_FIELDS); } Holder projectAll = new Holder<>(false); @@ -571,7 +539,7 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF projectAll.set(true); }); if (projectAll.get()) { - return IndexResolver.ALL_FIELDS; + return result.withFieldNames(IndexResolver.ALL_FIELDS); } AttributeSet references = new AttributeSet(); @@ -579,6 +547,7 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF // ie "from test | eval lang = languages + 1 | keep *l" should consider both "languages" and "*l" as valid fields to ask for AttributeSet keepCommandReferences = new AttributeSet(); AttributeSet keepJoinReferences = new AttributeSet(); + Set wildcardJoinIndices = new java.util.HashSet<>(); parsed.forEachDown(p -> {// go over each plan top-down if (p instanceof RegexExtract re) { // for Grok and Dissect @@ -596,10 +565,16 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF enrichRefs.removeIf(attr -> attr instanceof EmptyAttribute); references.addAll(enrichRefs); } else if (p instanceof LookupJoin join) { - keepJoinReferences.addAll(join.config().matchFields()); // TODO: why is this empty if (join.config().type() instanceof JoinTypes.UsingJoinType usingJoinType) { keepJoinReferences.addAll(usingJoinType.columns()); } + if (keepCommandReferences.isEmpty()) { + // No KEEP commands after the JOIN, so we need to mark this index for "*" field resolution + wildcardJoinIndices.add(((UnresolvedRelation) join.right()).table().index()); + } else { + // Keep commands can reference the join columns with names that shadow aliases, so we block their removal + keepJoinReferences.addAll(keepCommandReferences); + } } else { references.addAll(p.references()); if (p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES) { @@ -634,6 +609,10 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF }); // Add JOIN ON column references afterward to avoid Alias removal references.addAll(keepJoinReferences); + // If any JOIN commands need wildcard field-caps calls, persist the index names + if (wildcardJoinIndices.isEmpty() == false) { + result = result.withWildcardJoinIndices(wildcardJoinIndices); + } // remove valid metadata attributes because they will be filtered out by the IndexResolver anyway // otherwise, in some edge cases, we will fail to ask for "*" (all fields) instead @@ -642,12 +621,12 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) { // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index - return IndexResolver.INDEX_METADATA_FIELD; + return result.withFieldNames(IndexResolver.INDEX_METADATA_FIELD); } else { fieldNames.addAll(subfields(fieldNames)); fieldNames.addAll(enrichPolicyMatchFields); fieldNames.addAll(subfields(enrichPolicyMatchFields)); - return fieldNames; + return result.withFieldNames(fieldNames); } } @@ -706,22 +685,36 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { return plan; } - private record ListenerResult( + record PreAnalysisResult( IndexResolution indices, - IndexResolution lookupIndices, + Map lookupIndices, EnrichResolution enrichResolution, - Set fieldNames + Set fieldNames, + Set wildcardJoinIndices ) { - ListenerResult withEnrichResolution(EnrichResolution newEnrichResolution) { - return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames()); + PreAnalysisResult(EnrichResolution newEnrichResolution) { + this(null, new HashMap<>(), newEnrichResolution, Set.of(), Set.of()); } - ListenerResult withIndexResolution(IndexResolution newIndexResolution) { - return new ListenerResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames()); + PreAnalysisResult withEnrichResolution(EnrichResolution newEnrichResolution) { + return new PreAnalysisResult(indices(), lookupIndices(), newEnrichResolution, fieldNames(), wildcardJoinIndices()); } - ListenerResult withLookupIndexResolution(IndexResolution newIndexResolution) { - return new ListenerResult(indices(), newIndexResolution, enrichResolution(), fieldNames()); + PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) { + return new PreAnalysisResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices()); } - }; + + PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) { + lookupIndices.put(index, newIndexResolution); + return this; + } + + PreAnalysisResult withFieldNames(Set newFields) { + return new PreAnalysisResult(indices(), lookupIndices(), enrichResolution(), newFields, wildcardJoinIndices()); + } + + public PreAnalysisResult withWildcardJoinIndices(Set wildcardJoinIndices) { + return new PreAnalysisResult(indices(), lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices); + } + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index c11ef8615eb72..f553c15ef69fa 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -263,7 +263,7 @@ public final void test() throws Throwable { ); assumeFalse( "lookup join disabled for csv tests", - testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V5.capabilityName()) + testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V6.capabilityName()) ); assumeFalse( "can't use TERM function in csv tests", diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java index 5e79e40b7e938..85dd36ba0aaa5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java @@ -123,8 +123,8 @@ public static IndexResolution expandedDefaultIndexResolution() { return loadMapping("mapping-default.json", "test"); } - public static IndexResolution defaultLookupResolution() { - return loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP); + public static Map defaultLookupResolution() { + return Map.of("languages_lookup", loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP)); } public static EnrichResolution defaultEnrichResolution() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index cfff245b19244..4e02119b31744 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2139,7 +2139,7 @@ public void testLookupMatchTypeWrong() { } public void testLookupJoinUnknownIndex() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); String errorMessage = "Unknown index [foobar]"; IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage); @@ -2149,7 +2149,7 @@ public void testLookupJoinUnknownIndex() { EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), analyzerDefaultMapping(), - missingLookupIndex, + Map.of("foobar", missingLookupIndex), defaultEnrichResolution() ), TEST_VERIFIER @@ -2168,7 +2168,7 @@ public void testLookupJoinUnknownIndex() { } public void testLookupJoinUnknownField() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name"; String errorMessage = "1:45: Unknown column [last_name] in right side of join"; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index 4b916106165fb..58180aafedc0b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -1964,7 +1964,7 @@ public void testSortByAggregate() { } public void testLookupJoinDataTypeMismatch() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); query("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 1d10ebab267ce..c4d7b30115c2d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -219,11 +219,6 @@ public static void init() { enrichResolution = new EnrichResolution(); AnalyzerTestUtils.loadEnrichPolicyResolution(enrichResolution, "languages_idx", "id", "languages_idx", "mapping-languages.json"); - var lookupMapping = loadMapping("mapping-languages.json"); - IndexResolution lookupResolution = IndexResolution.valid( - new EsIndex("language_code", lookupMapping, Map.of("language_code", IndexMode.LOOKUP)) - ); - // Most tests used data from the test index, so we load it here, and use it in the plan() function. mapping = loadMapping("mapping-basic.json"); EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD)); @@ -4911,7 +4906,7 @@ public void testPlanSanityCheck() throws Exception { } public void testPlanSanityCheckWithBinaryPlans() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); var plan = optimizedPlan(""" FROM test @@ -5913,15 +5908,15 @@ public void testLookupStats() { * | \_Limit[1000[INTEGER]] * | \_Filter[languages{f}#10 > 1[INTEGER]] * | \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] - * \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); String query = """ FROM test | RENAME languages AS language_code - | LOOKUP JOIN language_code ON language_code + | LOOKUP JOIN languages_lookup ON language_code | WHERE language_code > 1 """; var plan = optimizedPlan(query); @@ -5956,15 +5951,15 @@ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() { * | \_Limit[1000[INTEGER]] * | \_Filter[emp_no{f}#7 > 1[INTEGER]] * | \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] - * \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownFilterOnLeftSideField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); String query = """ FROM test | RENAME languages AS language_code - | LOOKUP JOIN language_code ON language_code + | LOOKUP JOIN languages_lookup ON language_code | WHERE emp_no > 1 """; @@ -6000,15 +5995,15 @@ public void testLookupJoinPushDownFilterOnLeftSideField() { * |_EsqlProject[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang * uages{f}#10 AS language_code, last_name{f}#11, long_noidx{f}#17, salary{f}#12]] * | \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] - * \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownDisabledForLookupField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); String query = """ FROM test | RENAME languages AS language_code - | LOOKUP JOIN language_code ON language_code + | LOOKUP JOIN languages_lookup ON language_code | WHERE language_name == "English" """; @@ -6045,15 +6040,15 @@ public void testLookupJoinPushDownDisabledForLookupField() { * guages{f}#11 AS language_code, last_name{f}#12, long_noidx{f}#18, salary{f}#13]] * | \_Filter[emp_no{f}#8 > 1[INTEGER]] * | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..] - * \_EsRelation[language_code][LOOKUP][language_code{f}#19, language_name{f}#20] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20] */ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); String query = """ FROM test | RENAME languages AS language_code - | LOOKUP JOIN language_code ON language_code + | LOOKUP JOIN languages_lookup ON language_code | WHERE language_name == "English" AND emp_no > 1 """; @@ -6098,15 +6093,15 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel * |_EsqlProject[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan * guages{f}#11 AS language_code, last_name{f}#12, long_noidx{f}#18, salary{f}#13]] * | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..] - * \_EsRelation[language_code][LOOKUP][language_code{f}#19, language_name{f}#20] + * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20] */ public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); String query = """ FROM test | RENAME languages AS language_code - | LOOKUP JOIN language_code ON language_code + | LOOKUP JOIN languages_lookup ON language_code | WHERE language_name == "English" OR emp_no > 1 """; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index d0c7a1cd61010..9f6ef89008a24 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -285,7 +285,7 @@ TestDataSource makeTestDataSource( String indexName, String mappingFileName, EsqlFunctionRegistry functionRegistry, - IndexResolution lookupResolution, + Map lookupResolution, EnrichResolution enrichResolution, SearchStats stats ) { @@ -2331,7 +2331,7 @@ public void testVerifierOnMissingReferences() { } public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V6.isEnabled()); // Do not assert serialization: // This will have a LookupJoinExec, which is not serializable because it doesn't leave the coordinator. diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java index 0fe89b24dfc6a..e4271a0a6ddd5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java @@ -1316,25 +1316,25 @@ public void testCountStar() { } public void testEnrichOnDefaultFieldWithKeep() { - Set fieldNames = EsqlSession.fieldNames(parser.createStatement(""" + Set fieldNames = fieldNames(""" from employees | enrich languages_policy - | keep emp_no"""), Set.of("language_name")); + | keep emp_no""", Set.of("language_name")); assertThat(fieldNames, equalTo(Set.of("emp_no", "emp_no.*", "language_name", "language_name.*"))); } public void testDissectOverwriteName() { - Set fieldNames = EsqlSession.fieldNames(parser.createStatement(""" + Set fieldNames = fieldNames(""" from employees | dissect first_name "%{first_name} %{more}" - | keep emp_no, first_name, more"""), Set.of()); + | keep emp_no, first_name, more""", Set.of()); assertThat(fieldNames, equalTo(Set.of("emp_no", "emp_no.*", "first_name", "first_name.*"))); } public void testEnrichOnDefaultField() { - Set fieldNames = EsqlSession.fieldNames(parser.createStatement(""" + Set fieldNames = fieldNames(""" from employees - | enrich languages_policy"""), Set.of("language_name")); + | enrich languages_policy""", Set.of("language_name")); assertThat(fieldNames, equalTo(ALL_FIELDS)); } @@ -1345,7 +1345,7 @@ public void testMetrics() { assertThat(e.getMessage(), containsString("line 1:1: mismatched input 'METRICS' expecting {")); return; } - Set fieldNames = EsqlSession.fieldNames(parser.createStatement(query), Set.of()); + Set fieldNames = fieldNames(query, Set.of()); assertThat( fieldNames, equalTo( @@ -1363,8 +1363,218 @@ public void testMetrics() { ); } + public void testLookupJoin() { + assertFieldNames( + "FROM employees | KEEP languages | RENAME languages AS language_code | LOOKUP JOIN languages_lookup ON language_code", + Set.of("languages", "languages.*", "language_code", "language_code.*"), + Set.of("languages_lookup") // Since we have KEEP before the LOOKUP JOIN we need to wildcard the lookup index + ); + } + + public void testLookupJoinKeep() { + assertFieldNames( + """ + FROM employees + | KEEP languages + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | KEEP languages, language_code, language_name""", + Set.of("languages", "languages.*", "language_code", "language_code.*", "language_name", "language_name.*"), + Set.of() // Since we have KEEP after the LOOKUP, we can use the global field names instead of wildcarding the lookup index + ); + } + + public void testLookupJoinKeepWildcard() { + assertFieldNames( + """ + FROM employees + | KEEP languages + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | KEEP language*""", + Set.of("language*", "languages", "languages.*", "language_code", "language_code.*"), + Set.of() // Since we have KEEP after the LOOKUP, we can use the global field names instead of wildcarding the lookup index + ); + } + + public void testMultiLookupJoin() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | LOOKUP JOIN clientips_lookup ON client_ip + | LOOKUP JOIN message_types_lookup ON message""", + Set.of("*"), // With no KEEP we should keep all fields + Set.of() // since global field names are wildcarded, we don't need to wildcard any indices + ); + } + + public void testMultiLookupJoinKeepBefore() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | KEEP @timestamp, client_ip, event_duration, message + | LOOKUP JOIN clientips_lookup ON client_ip + | LOOKUP JOIN message_types_lookup ON message""", + Set.of("@timestamp", "@timestamp.*", "client_ip", "client_ip.*", "event_duration", "event_duration.*", "message", "message.*"), + Set.of("clientips_lookup", "message_types_lookup") // Since the KEEP is before both JOINS we need to wildcard both indices + ); + } + + public void testMultiLookupJoinKeepBetween() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | LOOKUP JOIN clientips_lookup ON client_ip + | KEEP @timestamp, client_ip, event_duration, message, env + | LOOKUP JOIN message_types_lookup ON message""", + Set.of( + "@timestamp", + "@timestamp.*", + "client_ip", + "client_ip.*", + "event_duration", + "event_duration.*", + "message", + "message.*", + "env", + "env.*" + ), + Set.of("message_types_lookup") // Since the KEEP is before the second JOIN, we need to wildcard the second index + ); + } + + public void testMultiLookupJoinKeepAfter() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | LOOKUP JOIN clientips_lookup ON client_ip + | LOOKUP JOIN message_types_lookup ON message + | KEEP @timestamp, client_ip, event_duration, message, env, type""", + Set.of( + "@timestamp", + "@timestamp.*", + "client_ip", + "client_ip.*", + "event_duration", + "event_duration.*", + "message", + "message.*", + "env", + "env.*", + "type", + "type.*" + ), + Set.of() // Since the KEEP is after both JOINs, we can use the global field names + ); + } + + public void testMultiLookupJoinKeepAfterWildcard() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | LOOKUP JOIN clientips_lookup ON client_ip + | LOOKUP JOIN message_types_lookup ON message + | KEEP *env*, *type*""", + Set.of("*env*", "*type*", "client_ip", "client_ip.*", "message", "message.*"), + Set.of() // Since the KEEP is after both JOINs, we can use the global field names + ); + } + + public void testMultiLookupJoinSameIndex() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | LOOKUP JOIN clientips_lookup ON client_ip + | EVAL client_ip = message + | LOOKUP JOIN clientips_lookup ON client_ip""", + Set.of("*"), // With no KEEP we should keep all fields + Set.of() // since global field names are wildcarded, we don't need to wildcard any indices + ); + } + + public void testMultiLookupJoinSameIndexKeepBefore() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | KEEP @timestamp, client_ip, event_duration, message + | LOOKUP JOIN clientips_lookup ON client_ip + | EVAL client_ip = message + | LOOKUP JOIN clientips_lookup ON client_ip""", + Set.of("@timestamp", "@timestamp.*", "client_ip", "client_ip.*", "event_duration", "event_duration.*", "message", "message.*"), + Set.of("clientips_lookup") // Since there is no KEEP after the last JOIN, we need to wildcard the index + ); + } + + public void testMultiLookupJoinSameIndexKeepBetween() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | LOOKUP JOIN clientips_lookup ON client_ip + | KEEP @timestamp, client_ip, event_duration, message, env + | EVAL client_ip = message + | LOOKUP JOIN clientips_lookup ON client_ip""", + Set.of( + "@timestamp", + "@timestamp.*", + "client_ip", + "client_ip.*", + "event_duration", + "event_duration.*", + "message", + "message.*", + "env", + "env.*" + ), + Set.of("clientips_lookup") // Since there is no KEEP after the last JOIN, we need to wildcard the index + ); + } + + public void testMultiLookupJoinSameIndexKeepAfter() { + assertFieldNames( + """ + FROM sample_data + | EVAL client_ip = client_ip::keyword + | LOOKUP JOIN clientips_lookup ON client_ip + | EVAL client_ip = message + | LOOKUP JOIN clientips_lookup ON client_ip + | KEEP @timestamp, client_ip, event_duration, message, env""", + Set.of( + "@timestamp", + "@timestamp.*", + "client_ip", + "client_ip.*", + "event_duration", + "event_duration.*", + "message", + "message.*", + "env", + "env.*" + ), + Set.of() // Since the KEEP is after both JOINs, we can use the global field names + ); + } + + private Set fieldNames(String query, Set enrichPolicyMatchFields) { + var preAnalysisResult = new EsqlSession.PreAnalysisResult(null); + return EsqlSession.fieldNames(parser.createStatement(query), enrichPolicyMatchFields, preAnalysisResult).fieldNames(); + } + private void assertFieldNames(String query, Set expected) { - Set fieldNames = EsqlSession.fieldNames(parser.createStatement(query), Collections.emptySet()); + Set fieldNames = fieldNames(query, Collections.emptySet()); assertThat(fieldNames, equalTo(expected)); } + + private void assertFieldNames(String query, Set expected, Set wildCardIndices) { + var preAnalysisResult = EsqlSession.fieldNames(parser.createStatement(query), Set.of(), new EsqlSession.PreAnalysisResult(null)); + assertThat("Query-wide field names", preAnalysisResult.fieldNames(), equalTo(expected)); + assertThat("Lookup Indices that expect wildcard lookups", preAnalysisResult.wildcardJoinIndices(), equalTo(wildCardIndices)); + } }