diff --git a/docs/changelog/145980.yaml b/docs/changelog/145980.yaml
new file mode 100644
index 0000000000000..f9ce8d4985bb4
--- /dev/null
+++ b/docs/changelog/145980.yaml
@@ -0,0 +1,5 @@
+area: "Machine Learning"
+issues: []
+pr: 145980
+summary: Lookup join and Inline stats support for query approximation
+type: feature
diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java
index bbdbaeb968cc1..03d2bdb3c5b3f 100644
--- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java
+++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/GenerativeApproximationIT.java
@@ -27,7 +27,7 @@ public class GenerativeApproximationIT extends GenerativeApproximationRestTest {
@Before
public void checkCapability() {
- assumeTrue("query approximation should be enabled", EsqlCapabilities.Cap.APPROXIMATION_V6.isEnabled());
+ assumeTrue("query approximation should be enabled", EsqlCapabilities.Cap.APPROXIMATION_V7.isEnabled());
}
@ClassRule
diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java
index 8512fc59c3d19..a47b84a86e2d7 100644
--- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java
+++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java
@@ -1951,7 +1951,7 @@ public void testRandomTimezoneBuckets() throws IOException {
}
public void testApproximationColumnMetadata() throws IOException {
- assumeTrue("approximation support", EsqlCapabilities.Cap.APPROXIMATION_V6.isEnabled());
+ assumeTrue("approximation support", EsqlCapabilities.Cap.APPROXIMATION_V7.isEnabled());
bulkLoadTestData(10);
String query = "SET approximation=true; " + fromIndex() + " | STATS count=COUNT()";
diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java
index 6daf0c20d42a7..5d0a8b5bca877 100644
--- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java
+++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeApproximationRestTest.java
@@ -16,7 +16,7 @@
import java.util.List;
import java.util.Map;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V6;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V7;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FIX_SUM_AGG_LONG_OVERFLOW;
/**
@@ -73,7 +73,7 @@ protected void assertResults(
@Override
protected void shouldSkipTest(String testName) throws IOException {
super.shouldSkipTest(testName);
- assumeFalse("No approximation tests", testCase.requiredCapabilities.contains(APPROXIMATION_V6.capabilityName()));
+ assumeFalse("No approximation tests", testCase.requiredCapabilities.contains(APPROXIMATION_V7.capabilityName()));
assumeFalse(
"Approximation casts integer SUM to double, preventing long overflow",
testCase.requiredCapabilities.contains(FIX_SUM_AGG_LONG_OVERFLOW.capabilityName())
diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java
index 18208a1da0727..1d6c61ff3d4cd 100644
--- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java
+++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeForkRestTest.java
@@ -15,7 +15,7 @@
import java.util.List;
import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V6;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V7;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ESQL_WITHOUT_GROUPING;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METRICS_GROUP_BY_ALL;
@@ -92,7 +92,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse(
"Tests using query approximation are skipped since query approximation is not supported with FORK",
- testCase.requiredCapabilities.contains(APPROXIMATION_V6.capabilityName())
+ testCase.requiredCapabilities.contains(APPROXIMATION_V7.capabilityName())
);
assumeFalse(
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java
index b56e2be1ad953..0364ef884f5a0 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java
@@ -59,7 +59,7 @@ public CommandDescription generate(
if (useUnmappedFields) {
result.append(SET_UNMAPPED_FIELDS_PREFIX);
}
- boolean setQueryApproximation = EsqlCapabilities.Cap.APPROXIMATION_V6.isEnabled()
+ boolean setQueryApproximation = EsqlCapabilities.Cap.APPROXIMATION_V7.isEnabled()
&& randomDouble() < QUERY_APPROXIMATION_SETTING_PROBABILITY;
if (setQueryApproximation) {
result.append(randomQueryApproximationSettings());
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec
index 205a499e3a2be..7c0fd0961baa9 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/approximation.csv-spec
@@ -22,7 +22,7 @@
No approximation
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=false\;
@@ -35,7 +35,7 @@ count:long
No confidence intervals
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000, "confidence_level":null}\;
@@ -48,7 +48,7 @@ count:long | sum:long
Exact total row count
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -61,7 +61,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation_cer
Exact total single-valued field count
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -74,7 +74,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation_cer
Approximate total multi-valued field count
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -87,7 +87,7 @@ count:long | _approximation_confidence_interval(count):long | _approximati
Approximate stats on large single-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -100,7 +100,7 @@ count:long | avg:double | sum:long | _approximation_confidence_i
Approximate stats on large multi-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -115,7 +115,7 @@ count:long | avg:double | sum:long | _approximation_confiden
Exact stats on small single-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -130,7 +130,7 @@ count:long | avg:double | sum:long | _approximation_confidence_interval(count):l
Exact stats on small multi-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -146,7 +146,7 @@ count:long | avg:double | sum:long | _approximation_confidence_interval(count):l
Multiple total counts
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -159,7 +159,7 @@ count:long | count2:long | countValue:long | _approximation_confidence_i
Exact count with where on single-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -174,7 +174,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation_cer
Approximate stats with where on multi-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -190,7 +190,7 @@ count:long | avg:double | sum:long | _approximation_confidenc
Approximate stats with stats where
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000, "confidence_level":0.85}\;
@@ -207,7 +207,7 @@ count:long | avg:double | sum:long | _approximation_confidenc
Approximate stats with sample
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000,"confidence_level":0.85}\;
@@ -222,7 +222,7 @@ count:long | avg:double | sum:long | _approximation_confidence_in
Approximate stats with commands before stats
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -247,7 +247,7 @@ count:long | avg:double | sum:long | _approximation_confiden
Approximate stats with commands after stats
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -266,7 +266,7 @@ avg:double | avg2:double | _approximation_confidence_interval(avg):double | _app
Approximate stats with dependent variables that have confidence interval
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -286,7 +286,7 @@ y:integer | plus1:double | _approximation_confidence_interval(plus1):double
Approximate stats with dependent string variable
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -303,7 +303,7 @@ from_str:double
Approximate stats with dependent multi-valued variable
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -319,7 +319,7 @@ sv:double
Approximate stats by with zero variance
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":100000}\;
@@ -341,7 +341,7 @@ avg:double | median:double | one:double | mv:integer | _approximat
Approximate stats by on large single-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -361,7 +361,7 @@ count:long | sv:integer | _approximation_confidence_interval(count):long | _appr
Approximate stats by on large multi-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":100000}\;
@@ -382,7 +382,7 @@ count:long | mv:integer | _approximation_confidence_interval(count):long | _appr
Exact stats by on small single-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -402,7 +402,7 @@ count:long | sv:integer | _approximation_confidence_interval(count):long | _appr
Exact stats by on small multi-valued data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -423,7 +423,7 @@ count:long | mv:integer | _approximation_confidence_interval(count):long | _appr
Approximate stats by on mixed data
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -436,7 +436,7 @@ count:long | _approximation_confidence_interval(count):long | _approximation
Overwrite approximated column
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -451,7 +451,7 @@ sum:integer
User-generated approximation column name
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -470,7 +470,7 @@ sum:long | _approximation_confidence_interval(sum):long | _approximation_certifi
Rename stats group key
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -484,7 +484,7 @@ count:long | key:integer | _approximation_confidence_interval(count):long | _app
Filter all rows
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -498,7 +498,7 @@ median:double | _approximation_confidence_interval(median):double | _approximati
Row with duplicate aggs
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -513,7 +513,7 @@ a:long | b:long | c:long | x:long | _approximation_confidence_interval(a):long |
Duplicate aggs with grouping
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -528,7 +528,7 @@ median:double | p50:double | key:keyword | _approximation_confidence_interval(me
Sum of nulls
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -544,7 +544,7 @@ null
Approximated column is null
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -557,7 +557,7 @@ null | null | null
Approximated column and null
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -570,7 +570,7 @@ sum:long | sum_null:long | _approximation_confidence_interval(sum):lo
Mv_expand after duplicate aggs
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -582,7 +582,7 @@ a:long | b:long | _approximation_confidence_interval(a):long | _
Exact dense vector count
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation={"rows":10000}\;
@@ -596,7 +596,7 @@ count_vectors:long | _approximation_confidence_interval(count_vectors):long | _a
Count by date bucket
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -614,8 +614,154 @@ c:long | b:date | _approximation_confidence_interval(c):long |
;
+Lookup join before stats
+required_capability: approximation_v7
+request_stored: skip
+
+SET approximation={"rows":10000}\;
+FROM many_numbers
+| EVAL language_code = sv % 4 + 1
+| LOOKUP JOIN languages_lookup ON language_code
+| EVAL length = LENGTH(language_name)
+| STATS AVG(length)
+;
+
+AVG(length):double | _approximation_confidence_interval(AVG(length)):double | _approximation_certified(AVG(length)):boolean
+6.4..6.6 | [6.4..6.6,6.4..6.6] | {any}
+;
+
+
+Lookup join after stats by
+required_capability: approximation_v7
+request_stored: skip
+
+SET approximation={"rows":10000}\;
+FROM many_numbers
+ | EVAL language_code = sv % 4 + 1
+ | STATS count=COUNT() BY language_code
+ | LOOKUP JOIN languages_lookup ON language_code
+ | SORT language_name
+ | LIMIT 5
+;
+
+count:long | language_code:integer | language_name:keyword | _approximation_confidence_interval(count):long | _approximation_certified(count):boolean
+40000..80000 | 1 | English | [40000..80000,40000..80000] | {any}
+40000..80000 | 2 | French | [40000..80000,40000..80000] | {any}
+40000..80000 | 4 | German | [40000..80000,40000..80000] | {any}
+40000..80000 | 3 | Spanish | [40000..80000,40000..80000] | {any}
+;
+
+
+Lookup join before and after stats by
+required_capability: approximation_v7
+request_stored: skip
+
+SET approximation={"rows":10000}\;
+FROM many_numbers
+ | EVAL language_code = sv % 4 + 1
+ | LOOKUP JOIN languages_lookup ON language_code
+ | DROP language_code
+ | STATS avg=AVG(sv) BY language_name
+ | RENAME language_name AS original_language_name
+ | EVAL language_code = LENGTH(original_language_name) % 4 + 1
+ | LOOKUP JOIN languages_lookup ON language_code
+ | SORT original_language_name
+ | DROP language_code
+;
+
+avg:double | original_language_name:keyword | language_name:keyword | _approximation_confidence_interval(avg):double | _approximation_certified(avg):boolean
+440..490 | English | German | [440..490,440..490] | {any}
+440..490 | French | Spanish | [440..490,440..490] | {any}
+440..490 | German | Spanish | [440..490,440..490] | {any}
+440..490 | Spanish | German | [440..490,440..490] | {any}
+;
+
+
+Inline stats
+required_capability: approximation_v7
+request_stored: skip
+
+SET approximation={"rows":10000}\;
+FROM many_numbers
+| KEEP sv
+| INLINE STATS AVG(sv)
+| SORT sv
+| LIMIT 5
+;
+
+sv:integer | AVG(sv):double | _approximation_confidence_interval(AVG(sv)):double | _approximation_certified(AVG(sv)):boolean
+1 | 440..490 | [440..490,440..490] | {any}
+2 | 440..490 | [440..490,440..490] | {any}
+2 | 440..490 | [440..490,440..490] | {any}
+3 | 440..490 | [440..490,440..490] | {any}
+3 | 440..490 | [440..490,440..490] | {any}
+;
+
+
+Inline stats and where
+required_capability: approximation_v7
+request_stored: skip
+
+SET approximation={"rows":10000}\;
+FROM many_numbers
+| KEEP sv
+| WHERE sv >= 400
+| INLINE STATS SUM(sv)
+| SORT sv
+| LIMIT 401
+| SORT sv DESC
+| LIMIT 3
+;
+
+sv:integer | SUM(sv):long | _approximation_confidence_interval(SUM(sv)):long | _approximation_certified(SUM(sv)):boolean
+401 | 80000000..110000000 | [80000000..110000000,80000000..110000000] | {any}
+400 | 80000000..110000000 | [80000000..110000000,80000000..110000000] | {any}
+400 | 80000000..110000000 | [80000000..110000000,80000000..110000000] | {any}
+;
+
+
+Inline stats by
+required_capability: approximation_v7
+request_stored: skip
+
+SET approximation={"rows":10000}\;
+FROM many_numbers
+| KEEP sv
+| INLINE STATS COUNT(), AVG(sv) BY sv
+| SORT sv DESC
+| LIMIT 3
+;
+
+COUNT():long | AVG(sv):double | sv:integer |_approximation_confidence_interval(COUNT()):long | _approximation_certified(COUNT()):boolean | _approximation_confidence_interval(AVG(sv)):double | _approximation_certified(AVG(sv)):boolean
+100..2000 | 699.999999..700.000001 | 700 | [100..2000,100..2000] | {any} | [699.999999..700.000001,699.999999..700.000001] | {any}
+100..2000 | 699.999999..700.000001 | 700 | [100..2000,100..2000] | {any} | [699.999999..700.000001,699.999999..700.000001] | {any}
+100..2000 | 699.999999..700.000001 | 700 | [100..2000,100..2000] | {any} | [699.999999..700.000001,699.999999..700.000001] | {any}
+;
+
+
+Inline stats by and lookup join
+required_capability: approximation_v7
+request_stored: skip
+
+SET approximation={"rows":10000}\;
+FROM many_numbers
+| KEEP sv
+| WHERE sv < 700
+| INLINE STATS SUM(sv) BY sv
+| EVAL language_code = sv % 4 + 1
+| LOOKUP JOIN languages_lookup ON language_code
+| SORT sv DESC
+| LIMIT 2
+;
+
+SUM(sv):long | sv:integer | language_code:integer | language_name:keyword | _approximation_confidence_interval(SUM(sv)):long | _approximation_certified(SUM(sv)):boolean
+70000..1400000 | 699 | 4 | German | [70000..1400000,70000..1400000] | {any}
+70000..1400000 | 699 | 4 | German | [70000..1400000,70000..1400000] | {any}
+;
+
+
Warn on no stats
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -634,7 +780,7 @@ sv:integer
Warn on max
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
SET approximation=true\;
@@ -652,7 +798,7 @@ max:integer | sv:integer
Example boolean for docs
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
// tag::approximationBooleanForDocs[]
@@ -672,7 +818,7 @@ sum:long | _approximation_confidence_interval(sum):long | _approximation_certifi
Example map for docs
-required_capability: approximation_v6
+required_capability: approximation_v7
request_stored: skip
// tag::approximationMapForDocs[]
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java
index a1d9650b67e96..9923ae109504b 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/CsvIT.java
@@ -254,6 +254,7 @@ public final void test() throws Throwable {
Map.of()
);
+ CsvAssert.assertMetadata(expected, actual.columnNames(), actual.columnTypes(), logger);
CsvAssert.assertDataWithValueConverter(
expected,
actual.values(),
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java
index b6250aa749d77..a14a4d255fd21 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java
@@ -96,7 +96,7 @@
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V6;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.APPROXIMATION_V7;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.EXPLAIN;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.allOf;
@@ -2524,7 +2524,7 @@ public void testExplainWithLookupJoin() {
*/
public void testExplainWithApproximation() {
assumeTrue("EXPLAIN requires the capability to be enabled", EXPLAIN.isEnabled());
- assumeTrue("Approximation requires the capability to be enabled", APPROXIMATION_V6.isEnabled());
+ assumeTrue("Approximation requires the capability to be enabled", APPROXIMATION_V7.isEnabled());
String indexName = "explain_approximation_test";
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 73dfeb4a2ab87..f2f2846a9d6ca 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -2179,7 +2179,7 @@ public enum Cap {
/**
* Support query approximation.
*/
- APPROXIMATION_V6,
+ APPROXIMATION_V7,
/**
* Create a ScoreOperator only when shard contexts are available
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java
index a74a3a26e7163..dee4c8939315c 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximation/Approximation.java
@@ -60,10 +60,14 @@
import org.elasticsearch.xpack.esql.plan.logical.SampledAggregate;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.TopNBy;
+import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.UriParts;
import org.elasticsearch.xpack.esql.plan.logical.UserAgent;
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
+import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
+import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.session.Result;
@@ -80,7 +84,7 @@
*
it contains exactly one {@code STATS} command
* the other processing commands are from the supported set
* ({@link Approximation#SUPPORTED_COMMANDS}); this set contains almost all
- * unary commands, but most notably not {@code FORK} or {@code JOIN}.
+ * unary commands, and some non-unary ones; most notably not {@code FORK}.
* the aggregate functions are from the supported set
* ({@link Approximation#SUPPORTED_SINGLE_VALUED_AGGS} and
* {@link Approximation#SUPPORTED_MULTIVALUED_AGGS})
@@ -145,8 +149,10 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount,
Eval.class,
Filter.class,
Grok.class,
+ InlineJoin.class,
Insist.class,
LocalRelation.class,
+ LookupJoin.class,
MvExpand.class,
OrderBy.class,
Project.class,
@@ -156,6 +162,7 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount,
Row.class,
Sample.class,
SampledAggregate.class,
+ StubRelation.class, // Temporary node generated by INLINE STATS
UriParts.class,
UserAgent.class
);
@@ -314,14 +321,7 @@ public record QueryProperties(boolean hasGrouping, boolean canDecreaseRowCount,
private int subPlanIterationCount;
private final SetOnce sourceRowCount;
- /**
- * Creates an Approximation object for a logical plan if it's an approximation plan, and returns null otherwise.
- */
- public static Approximation create(LogicalPlan logicalPlan, ApproximationSettings approximationSettings) {
- return ApproximationPlan.is(logicalPlan) ? new Approximation(logicalPlan, approximationSettings) : null;
- }
-
- Approximation(LogicalPlan logicalPlan, ApproximationSettings settings) {
+ public Approximation(LogicalPlan logicalPlan, ApproximationSettings settings) {
this.queryProperties = verifyPlanOrThrow(logicalPlan);
// The plan is executed multiple times. Use CopyingLocalSupplier to
// make sure the page is not released between executions.
@@ -461,9 +461,11 @@ public LogicalPlan firstSubPlan() {
}
/**
- * Returns the new main plan to execute for approximation after executing a subplan, based on the result of the subplan.
+ * Processes the subplan results.
+ * Returns the sample probability suitable for approximation if possible,
+ * or null if more subplans need to be executed to obtain it.
*/
- public LogicalPlan newMainPlan(Result result) {
+ public Double processResult(Result result) {
if (sourceRowCount.get() == null) {
return processSourceCount(rowCount(result));
} else {
@@ -479,7 +481,7 @@ public LogicalPlan newMainPlan(Result result) {
*
*/
private LogicalPlan sourceCountSubPlan() {
- LogicalPlan leaf = logicalPlan.collectLeaves().getFirst();
+ LogicalPlan leaf = getLeftmostLeaf(logicalPlan);
LogicalPlan sourceCountPlan = new Aggregate(
Source.EMPTY,
leaf,
@@ -490,18 +492,32 @@ private LogicalPlan sourceCountSubPlan() {
return sourceCountPlan;
}
+ /**
+ * Returns the leftmost leaf of a plan, which is the large source index for approximation.
+ */
+ private LogicalPlan getLeftmostLeaf(LogicalPlan plan) {
+ while (plan instanceof LeafPlan == false) {
+ plan = switch (plan) {
+ case UnaryPlan unaryPlan -> unaryPlan.child();
+ case LookupJoin join -> join.left();
+ default -> throw new IllegalStateException("unsupported plan type: " + plan.getClass());
+ };
+ }
+ return plan;
+ }
+
/**
* Determines either the final sample probability or whether more subplans
* need to the executed, based on the total number of rows in the source
* index and the query properties.
*/
- private LogicalPlan processSourceCount(long sourceRowCount) {
+ private Double processSourceCount(long sourceRowCount) {
logger.debug("total number of source rows: [{}] rows", sourceRowCount);
this.sourceRowCount.set(sourceRowCount);
if (sourceRowCount == 0) {
// If there are no rows, run the original query.
nextSubPlanSampleProbability = null;
- return ApproximationPlan.substituteSampleProbability(logicalPlan, 1.0);
+ return 1.0;
}
double sampleProbability = Math.min(1.0, (double) sampleRowCount / sourceRowCount);
if (queryProperties.canIncreaseRowCount == false && sampleProbability >= sampleProbabilityThreshold) {
@@ -509,15 +525,15 @@ private LogicalPlan processSourceCount(long sourceRowCount) {
// we can directly run the original query without sampling.
logger.debug("using original plan (too few rows)");
nextSubPlanSampleProbability = null;
- return ApproximationPlan.substituteSampleProbability(logicalPlan, 1.0);
+ return 1.0;
} else if (queryProperties.canIncreaseRowCount == false && queryProperties.canDecreaseRowCount == false) {
// If the query preserves all rows, we can directly approximate with the sample probability.
nextSubPlanSampleProbability = null;
- return ApproximationPlan.substituteSampleProbability(logicalPlan, sampleProbability);
+ return sampleProbability;
} else {
// Otherwise, we need to sample the number of rows first to obtain a good sample probability.
nextSubPlanSampleProbability = Math.min(1.0, (double) ROW_COUNT_FOR_COUNT_ESTIMATION / sourceRowCount);
- return logicalPlan;
+ return null;
}
}
@@ -553,7 +569,7 @@ private LogicalPlan countSubPlan(double sampleProbability) {
);
}
}
- } else {
+ } else if (plan instanceof LeafPlan == false) {
// Strip everything after the STATS command.
plan = plan.children().getFirst();
}
@@ -610,7 +626,7 @@ private LogicalPlan countSubPlan(double sampleProbability) {
* To be safe, the maximum iteration count is capped at 10, and an exception is thrown
* when this count is exceeded.
*/
- private LogicalPlan processCount(long rowCount) {
+ private Double processCount(long rowCount) {
subPlanIterationCount += 1;
if (subPlanIterationCount > 10) {
throw new IllegalStateException("Approximation count iteration limit exceeded");
@@ -625,15 +641,15 @@ private LogicalPlan processCount(long rowCount) {
// If the new sample probability is large, run the original query.
logger.debug("using original plan (too few rows)");
nextSubPlanSampleProbability = null;
- return ApproximationPlan.substituteSampleProbability(logicalPlan, 1.0);
+ return 1.0;
} else if (rowCount <= ROW_COUNT_FOR_COUNT_ESTIMATION / 2) {
// Not enough rows are sampled yet; increase the sample probability and try again.
nextSubPlanSampleProbability = Math.min(1.0, sampleProbability * ROW_COUNT_FOR_COUNT_ESTIMATION / Math.max(1, rowCount));
- return logicalPlan;
+ return null;
} else {
// A good sample probability is found; run the approximation plan.
nextSubPlanSampleProbability = null;
- return ApproximationPlan.substituteSampleProbability(logicalPlan, newSampleProbability);
+ return newSampleProbability;
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java
index cc6bf3a688748..48ac49970e605 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownJoinPastProject.java
@@ -111,9 +111,7 @@ protected LogicalPlan rule(Join join) {
List renamesForEval = new ArrayList<>(aliasesForReplacedAttributesBuilder.build().values());
Eval eval = new Eval(project.source(), project.child(), renamesForEval);
- Join finalJoin = new Join(join.source(), eval, updatedJoin.right(), updatedJoin.config());
-
- return new Project(project.source(), finalJoin, finalProjections);
+ return new Project(project.source(), updatedJoin.replaceLeft(eval), finalProjections);
}
return join;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLookupWithJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLookupWithJoin.java
index 09ed113de1622..a9ae769603007 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLookupWithJoin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLookupWithJoin.java
@@ -9,7 +9,7 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
-import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
public final class ReplaceLookupWithJoin extends OptimizerRules.OptimizerRule {
@@ -20,6 +20,6 @@ public ReplaceLookupWithJoin() {
@Override
protected LogicalPlan rule(Lookup lookup) {
// left join between the main relation and the local, lookup relation
- return new Join(lookup.source(), lookup.child(), lookup.localRelation(), lookup.joinConfig());
+ return new LookupJoin(lookup.source(), lookup.child(), lookup.localRelation(), lookup.joinConfig());
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java
index 3f709b252c7fd..234fcd1b5e3c3 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStats.java
@@ -16,6 +16,7 @@
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.Foldables;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountApproximate;
@@ -59,7 +60,20 @@ protected PhysicalPlan rule(SampledAggregateExec plan) {
double sampleProbability = (double) Foldables.literalValueOf(plan.sampleProbability());
assert sampleProbability < 1.0;
- PhysicalPlan child = plan.child().transformUp(LeafExec.class, leaf -> new SampleExec(Source.EMPTY, leaf, plan.sampleProbability()));
+ // The only non-unary plans that are currently supported are Joins.
+ // At the moment, the left side of the join is the "expensive" side and
+ // will be sampled, while the right side is just a lookup table.
+ // This will probably change in the future, in which case this logic
+ // must be reconsidered.
+ Holder sampledAdded = new Holder<>(false);
+ PhysicalPlan child = plan.child().transformDown(p -> {
+ if (p instanceof LeafExec && sampledAdded.get() == false) {
+ sampledAdded.set(true);
+ return new SampleExec(Source.EMPTY, p, plan.sampleProbability());
+ } else {
+ return p;
+ }
+ });
List sampleCorrections = new ArrayList<>();
List intermediateAttributes = new ArrayList<>();
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java
index 9cc137c96d6cd..fc8b5f5737f26 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java
@@ -38,7 +38,7 @@
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
-import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.ImmediateLocalSupplier;
@@ -103,8 +103,8 @@ public static List logical() {
Grok.ENTRY,
InlineJoin.ENTRY,
InlineStats.ENTRY,
- Join.ENTRY,
LocalRelation.ENTRY,
+ LookupJoin.ENTRY,
Limit.ENTRY,
LimitBy.ENTRY,
Lookup.ENTRY,
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Lookup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Lookup.java
index 6341c230b2870..471f382e93f8b 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Lookup.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Lookup.java
@@ -22,6 +22,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import java.io.IOException;
@@ -100,7 +101,7 @@ public LocalRelation localRelation() {
@Override
public LogicalPlan surrogate() {
// left join between the main relation and the local, lookup relation
- return new Join(source(), child(), localRelation, joinConfig());
+ return new LookupJoin(source(), child(), localRelation, joinConfig());
}
public JoinConfig joinConfig() {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java
index 67a2f28b0b67d..2bd64823f2137 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java
@@ -8,7 +8,6 @@
package org.elasticsearch.xpack.esql.plan.logical.join;
import org.elasticsearch.TransportVersion;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
@@ -20,7 +19,6 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
-import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
@@ -73,8 +71,12 @@
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.commonType;
-public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic, ExecutesOn, PostOptimizationVerificationAware {
- public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Join", Join::new);
+public abstract class Join extends BinaryPlan
+ implements
+ PostAnalysisVerificationAware,
+ SortAgnostic,
+ ExecutesOn,
+ PostOptimizationVerificationAware {
private static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = TransportVersion.fromName("esql_lookup_join_pre_join_filter");
public static final DataType[] UNSUPPORTED_TYPES = {
TEXT,
@@ -160,31 +162,10 @@ protected LogicalPlan getRightToSerialize(StreamOutput out) {
return rightToSerialize;
}
- @Override
- public String getWriteableName() {
- return ENTRY.name;
- }
-
public JoinConfig config() {
return config;
}
- @Override
- protected NodeInfo info() {
- // Do not just add the JoinConfig as a whole - this would prevent correctly registering the
- // expressions and references.
- return NodeInfo.create(
- this,
- Join::new,
- left(),
- right(),
- config.type(),
- config.leftFields(),
- config.rightFields(),
- config.joinOnConditions()
- );
- }
-
@Override
public List output() {
if (lazyOutput == null) {
@@ -291,15 +272,6 @@ public boolean resolved() {
return childrenResolved() && expressionsResolved();
}
- public Join withConfig(JoinConfig config) {
- return new Join(source(), left(), right(), config, isRemote);
- }
-
- @Override
- public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
- return new Join(source(), left, right, config, isRemote);
- }
-
@Override
public int hashCode() {
return Objects.hash(config, left(), right(), isRemote);
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java
index 726c3752dfa7c..a1bf092fde545 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java
@@ -7,6 +7,8 @@
package org.elasticsearch.xpack.esql.plan.logical.join;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
@@ -17,8 +19,8 @@
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
+import java.io.IOException;
import java.util.List;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
@@ -28,7 +30,13 @@
* Lookup join - specialized LEFT (OUTER) JOIN between the main left side and a lookup index (index_mode = lookup) on the right.
* This is only used during parsing and substituted to a regular {@link Join} during analysis.
*/
-public class LookupJoin extends Join implements SurrogateLogicalPlan, TelemetryAware, PostAnalysisVerificationAware {
+public class LookupJoin extends Join implements TelemetryAware, PostAnalysisVerificationAware {
+
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+ LogicalPlan.class,
+ "LookupJoin",
+ LookupJoin::new
+ );
public LookupJoin(
Source source,
@@ -61,20 +69,24 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig
super(source, left, right, joinConfig, isRemote);
}
- /**
- * Translate the expression into a regular join with a Projection on top, to deal with serialization & co.
- */
- @Override
- public LogicalPlan surrogate() {
- // TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now
- return new Join(source(), left(), right(), config(), isRemote());
+ public LookupJoin(StreamInput in) throws IOException {
+ super(in);
}
@Override
- public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
+ public LookupJoin replaceChildren(LogicalPlan left, LogicalPlan right) {
return new LookupJoin(source(), left, right, config(), isRemote());
}
+ public LookupJoin withConfig(JoinConfig config) {
+ return new LookupJoin(source(), left(), right(), config, isRemote());
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
@Override
protected NodeInfo info() {
return NodeInfo.create(
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index 6aee5bf9c7c59..6458012b9fc82 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -53,6 +53,7 @@
import org.elasticsearch.xpack.esql.analysis.UnmappedResolution;
import org.elasticsearch.xpack.esql.analysis.Verifier;
import org.elasticsearch.xpack.esql.approximation.Approximation;
+import org.elasticsearch.xpack.esql.approximation.ApproximationPlan;
import org.elasticsearch.xpack.esql.approximation.ApproximationSettings;
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
@@ -61,6 +62,7 @@
import org.elasticsearch.xpack.esql.core.expression.function.Function;
import org.elasticsearch.xpack.esql.core.querydsl.QueryDslTimestampBoundsExtractor;
import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.datasources.ExternalSourceResolution;
import org.elasticsearch.xpack.esql.datasources.ExternalSourceResolver;
import org.elasticsearch.xpack.esql.datasources.PartitionFilterHintExtractor;
@@ -369,7 +371,7 @@ public void onResponse(Versioned analyzedPlan) {
p,
finalConfiguration,
foldContext,
- Approximation.create(p, configuration.approximationSettings()),
+ new Holder(),
minimumVersion,
planTimeProfile,
l
@@ -412,7 +414,7 @@ private void executeOptimizedPlan(
LogicalPlan optimizedPlan,
Configuration configuration,
FoldContext foldContext,
- Approximation approximation,
+ Holder approximation,
TransportVersion minimumVersion,
PlanTimeProfile planTimeProfile,
ActionListener listener
@@ -549,7 +551,7 @@ private void executeSubPlans(
LogicalPlan optimizedPlan,
Configuration configuration,
FoldContext foldContext,
- Approximation approximation,
+ Holder approximation,
PlanRunner runner,
EsqlExecutionInfo executionInfo,
EsqlQueryRequest request,
@@ -558,7 +560,7 @@ private void executeSubPlans(
ActionListener listener
) {
var subPlansResults = new HashSet();
- var subPlan = firstSubPlan(optimizedPlan, approximation, subPlansResults);
+ var subPlan = firstSubPlan(optimizedPlan, configuration, approximation, subPlansResults);
// TODO: merge into one method
if (subPlan != null) {
@@ -598,26 +600,46 @@ private record SubPlanAndCallback(
Runnable cleanup
) {};
- private SubPlanAndCallback firstSubPlan(LogicalPlan optimizedPlan, Approximation approximation, Set subPlansResults) {
- if (approximation != null) {
- LogicalPlan subPlan = approximation.firstSubPlan();
+ private SubPlanAndCallback firstSubPlan(
+ LogicalPlan mainPlan,
+ Configuration configuration,
+ Holder approximation,
+ Set subPlansResults
+ ) {
+ SubPlanAndCallback subPlanAndCallback = null;
+
+ // InlineJoin must be first, because approximation may need to approximate a subplan of it.
+ InlineJoin.LogicalPlanTuple subPlans = InlineJoin.firstSubPlan(mainPlan, subPlansResults);
+ if (subPlans != null) {
+ AtomicReference localRelationPage = new AtomicReference<>();
+ subPlanAndCallback = new SubPlanAndCallback(subPlans.stubReplacedSubPlan(), result -> {
+ // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
+ LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan().source(), result);
+ localRelationPage.set(resultWrapper.supplier().get());
+ subPlansResults.add(resultWrapper);
+ return InlineJoin.newMainPlan(mainPlan, subPlans, resultWrapper);
+ }, () -> releaseLocalRelationBlocks(localRelationPage));
+ }
+
+ LogicalPlan plan = subPlanAndCallback != null ? subPlanAndCallback.subPlan : mainPlan;
+ if (ApproximationPlan.is(plan)) {
+ if (approximation.get() == null) {
+ approximation.set(new Approximation(plan, configuration.approximationSettings()));
+ }
+ LogicalPlan subPlan = approximation.get().firstSubPlan();
if (subPlan != null) {
- return new SubPlanAndCallback(subPlan, approximation::newMainPlan, () -> {});
+ subPlanAndCallback = new SubPlanAndCallback(subPlan, result -> {
+ Double sampleProbability = approximation.get().processResult(result);
+ if (sampleProbability != null) {
+ return ApproximationPlan.substituteSampleProbability(mainPlan, sampleProbability);
+ } else {
+ return mainPlan;
+ }
+ }, () -> {});
}
}
- InlineJoin.LogicalPlanTuple subPlans = InlineJoin.firstSubPlan(optimizedPlan, subPlansResults);
- if (subPlans == null) {
- return null;
- }
- AtomicReference localRelationPage = new AtomicReference<>();
- return new SubPlanAndCallback(subPlans.stubReplacedSubPlan(), result -> {
- // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
- LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan().source(), result);
- localRelationPage.set(resultWrapper.supplier().get());
- subPlansResults.add(resultWrapper);
- return InlineJoin.newMainPlan(optimizedPlan, subPlans, resultWrapper);
- }, () -> releaseLocalRelationBlocks(localRelationPage));
+ return subPlanAndCallback;
}
private void executeSubPlan(
@@ -626,7 +648,7 @@ private void executeSubPlan(
SubPlanAndCallback subPlan,
Configuration configuration,
FoldContext foldContext,
- Approximation approximation,
+ Holder approximation,
EsqlExecutionInfo executionInfo,
PlanRunner runner,
EsqlQueryRequest request,
@@ -648,7 +670,7 @@ private void executeSubPlan(
LogicalPlan newMainPlan = subPlan.newMainPlan.apply(result);
// look for the next inlinejoin plan
- var newSubPlan = firstSubPlan(newMainPlan, approximation, subPlansResults);
+ var newSubPlan = firstSubPlan(newMainPlan, configuration, approximation, subPlansResults);
if (newSubPlan == null) {// run the final "main" plan
executionInfo.finishSubPlans();
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java
index 420b3397a4b3c..079a0b04fd59d 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java
@@ -85,10 +85,7 @@
import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse;
import org.elasticsearch.xpack.esql.plan.logical.fuse.FuseScoreEval;
import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan;
-import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
-import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
-import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.ResolvingProject;
import org.elasticsearch.xpack.esql.plan.logical.promql.AcrossSeriesAggregate;
import org.elasticsearch.xpack.esql.plan.logical.promql.PlaceholderRelation;
@@ -146,15 +143,8 @@ public class ApproximationSupportTests extends ESTestCase {
Fork.class,
UnionAll.class,
ViewUnionAll.class,
- Join.class,
- InlineJoin.class,
- LookupJoin.class,
ParameterizedQuery.class,
- // InlineStats is not supported yet.
- // Only a single Stats command is supported.
- InlineStats.class,
-
// Timeseries indices are not supported yet.
// They require chained Stats commands.
TimeSeriesAggregate.class,
@@ -177,9 +167,10 @@ public class ApproximationSupportTests extends ESTestCase {
// These plans don't occur in a correct analyzed query.
UnresolvedRelation.class,
UnresolvedExternalRelation.class,
- StubRelation.class,
Drop.class,
Keep.class,
+ InlineStats.class,
+ Join.class,
Rename.class,
ResolvingProject.class,
SparklineGenerateEmptyBuckets.class,
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java
index 581b35bcbd26a..c8d92fe7fe754 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTestCase.java
@@ -7,23 +7,16 @@
package org.elasticsearch.xpack.esql.approximation;
-import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.TransportVersion;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.esql.TestOptimizer;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
-import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.expression.Foldables;
-import org.elasticsearch.xpack.esql.inference.InferenceService;
-import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext;
-import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -39,33 +32,21 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PARSER;
-import static org.elasticsearch.xpack.esql.EsqlTestUtils.analyzer;
-import static org.mockito.Mockito.mock;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.optimizer;
public abstract class ApproximationTestCase extends ESTestCase {
- private static final LogicalPlanPreOptimizer preOptimizer = new LogicalPlanPreOptimizer(
- new LogicalPreOptimizerContext(FoldContext.small(), mock(InferenceService.class), TransportVersion.current())
- );
private static final BlockFactory blockFactory = BlockFactory.builder(BigArrays.NON_RECYCLING_INSTANCE)
.breaker(new NoopCircuitBreaker("none"))
.build();
- static LogicalPlan getLogicalPlan(String query) throws Exception {
- SetOnce resultHolder = new SetOnce<>();
- SetOnce exceptionHolder = new SetOnce<>();
- LogicalPlan plan = TEST_PARSER.createStatement(query, new QueryParams()).plan();
- plan = analyzer().addEmployees("test").addK8s().addTestLookup().buildAnalyzer().analyze(plan);
- plan.setAnalyzed();
- preOptimizer.preOptimize(plan, ActionListener.wrap(resultHolder::set, exceptionHolder::set));
- if (exceptionHolder.get() != null) {
- throw exceptionHolder.get();
- }
- return resultHolder.get().children().getFirst();
+ private static final TestOptimizer optimizer = optimizer().addDefaultIndex().addTestLookup().addK8s();
+
+ static LogicalPlan getLogicalPlan(String query) {
+ return optimizer.coordinatorPlan(query);
}
- static Approximation.QueryProperties verify(String query) throws Exception {
+ static Approximation.QueryProperties verify(String query) {
return Approximation.verifyPlanOrThrow(getLogicalPlan(query));
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java
index 9732afef2c201..1f3f91b3489e1 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationTests.java
@@ -15,6 +15,7 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.SampledAggregate;
+import org.elasticsearch.xpack.esql.session.Result;
import java.util.List;
@@ -38,6 +39,7 @@ public void testVerify_validQuery() throws Exception {
verify("ROW i=[1,2,3] | EVAL x=TO_STRING(i) | DISSECT x \"%{x}\" | STATS i=10*POW(PERCENTILE(i, 0.5), 2) | LIMIT 10");
verify("FROM test | URI_PARTS parts = last_name | STATS scheme_count = COUNT() BY parts.scheme | LIMIT 10");
verify("FROM test | REGISTERED_DOMAIN rd = last_name | STATS c = COUNT() BY rd.registered_domain | LIMIT 10");
+ verify("FROM test | INLINE STATS COUNT() BY last_name | LIMIT 10");
}
public void testVerify_validQuery_queryProperties() throws Exception {
@@ -65,6 +67,10 @@ public void testVerify_validQuery_queryProperties() throws Exception {
verify("FROM test | MV_EXPAND gender | WHERE emp_no < 3 | STATS COUNT()"),
equalTo(new Approximation.QueryProperties(false, true, true))
);
+ assertThat(
+ verify("FROM test | WHERE emp_no < 3 | INLINE STATS COUNT()"),
+ equalTo(new Approximation.QueryProperties(false, true, false))
+ );
}
public void testVerify_exactlyOneStats() {
@@ -76,6 +82,18 @@ public void testVerify_exactlyOneStats() {
"FROM test | STATS COUNT() BY emp_no | STATS COUNT()",
equalTo("line 1:39: approximation not supported: query with multiple [STATS] cannot be approximated")
);
+ assertError(
+ "FROM test | INLINE STATS count=COUNT() BY emp_no | STATS AVG(count)",
+ equalTo("line 1:52: approximation not supported: query with multiple [STATS] cannot be approximated")
+ );
+ assertError(
+ "FROM test | STATS COUNT() BY emp_no | INLINE STATS COUNT()",
+ equalTo("line 1:39: approximation not supported: query with multiple [STATS] cannot be approximated")
+ );
+ assertError(
+ "FROM test | INLINE STATS count=COUNT() | INLINE STATS SUM(count)",
+ equalTo("line 1:42: approximation not supported: query with multiple [STATS] cannot be approximated")
+ );
}
public void testVerify_incompatibleSourceCommand() {
@@ -84,7 +102,7 @@ public void testVerify_incompatibleSourceCommand() {
equalTo("line 1:1: approximation not supported: query with [SHOW INFO] cannot be approximated")
);
assertError(
- "TS k8s | STATS COUNT(network.cost)",
+ "TS k8s | STATS RATE(network.total_bytes_in)",
equalTo("line 1:1: approximation not supported: query with [TS k8s] cannot be approximated")
);
}
@@ -98,22 +116,6 @@ public void testVerify_incompatibleProcessingCommand() {
"FROM test | STATS COUNT() | FORK (EVAL x=1) (EVAL y=1)",
equalTo("line 1:29: approximation not supported: query with [FORK (EVAL x=1) (EVAL y=1)] cannot be approximated")
);
- assertError(
- "FROM test | INLINE STATS COUNT() | STATS COUNT()",
- equalTo("line 1:13: approximation not supported: query with [INLINE STATS COUNT()] cannot be approximated")
- );
- assertError(
- "FROM test | STATS COUNT() | INLINE STATS COUNT()",
- equalTo("line 1:29: approximation not supported: query with [INLINE STATS COUNT()] cannot be approximated")
- );
- assertError(
- "FROM test | LOOKUP JOIN test_lookup ON emp_no | FORK (EVAL x=1) (EVAL y=1) | STATS COUNT()",
- equalTo("line 1:13: approximation not supported: query with [LOOKUP JOIN test_lookup ON emp_no] cannot be approximated")
- );
- assertError(
- "FROM test | STATS emp_no=COUNT() | LOOKUP JOIN test_lookup ON emp_no | FORK (EVAL x=1) (EVAL y=1)",
- equalTo("line 1:36: approximation not supported: query with [LOOKUP JOIN test_lookup ON emp_no] cannot be approximated")
- );
}
public void testVerify_incompatibleProcessingCommandBeforeStats() {
@@ -161,7 +163,7 @@ public void testPlans_noData() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 0, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(0));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(0));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -183,7 +185,7 @@ public void testPlans_smallDataNoFilters() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 1000, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(1_000));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(1_000));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -205,7 +207,7 @@ public void testPlans_largeDataNoFilters() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 10^9.
- mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000));
// No filtering, so no more subplans.
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -229,7 +231,7 @@ public void testPlans_largeDataAfterFiltering() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 10^12.
- approximation.newMainPlan(newCountResult(1_000_000_000_000L));
+ processResult(approximation, mainPlan, newCountResult(1_000_000_000_000L));
// There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12.
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
@@ -237,14 +239,14 @@ public void testPlans_largeDataAfterFiltering() throws Exception {
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-8), withAggs(CountApproximate.class)));
// Sampled-corrected filtered count of 10^9 (so actual count of 10), so increase the sample probability.
- approximation.newMainPlan(newCountResult(1_000_000_000));
+ processResult(approximation, mainPlan, newCountResult(1_000_000_000));
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
assertThat(subplan, not(hasPlan(Aggregate.class)));
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-5), withAggs(CountApproximate.class)));
// Sampled-corrected filtered count of 10^9 (so actual count of 10_000), so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -268,7 +270,7 @@ public void testPlans_smallDataAfterFiltering() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 10^18.
- approximation.newMainPlan(newCountResult(1_000_000_000_000_000_000L));
+ processResult(approximation, mainPlan, newCountResult(1_000_000_000_000_000_000L));
// There's filtering, so start with a count plan with sampling with probability 10^4 / 10^18.
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
@@ -276,28 +278,28 @@ public void testPlans_smallDataAfterFiltering() throws Exception {
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-14), withAggs(CountApproximate.class)));
// Filtered count of 0, so increase the sample probability.
- approximation.newMainPlan(newCountResult(0));
+ processResult(approximation, mainPlan, newCountResult(0));
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
assertThat(subplan, not(hasPlan(Aggregate.class)));
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-10), withAggs(CountApproximate.class)));
// Filtered count of 0, so increase the sample probability.
- approximation.newMainPlan(newCountResult(0));
+ processResult(approximation, mainPlan, newCountResult(0));
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
assertThat(subplan, not(hasPlan(Aggregate.class)));
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-6), withAggs(CountApproximate.class)));
// Filtered count of 0, so increase the sample probability.
- approximation.newMainPlan(newCountResult(0));
+ processResult(approximation, mainPlan, newCountResult(0));
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
assertThat(subplan, not(hasPlan(Aggregate.class)));
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-2), withAggs(CountApproximate.class)));
// Filtered count of 0, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(0));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(0));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -321,7 +323,7 @@ public void testPlans_smallDataBeforeFiltering() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 1000, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(1_000));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(1_000));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -345,7 +347,7 @@ public void testPlans_smallDataAfterMvExpanding() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 1000.
- approximation.newMainPlan(newCountResult(1_000));
+ processResult(approximation, mainPlan, newCountResult(1_000));
// The next subplan should be the mv_expanded count,
// without sampling, because source count is small.
subplan = approximation.firstSubPlan();
@@ -354,7 +356,7 @@ public void testPlans_smallDataAfterMvExpanding() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// mv_expanded count of 10_000, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(1_000));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(1_000));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -378,7 +380,7 @@ public void testPlans_largeDataAfterMvExpanding() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 1000.
- approximation.newMainPlan(newCountResult(1_000));
+ processResult(approximation, mainPlan, newCountResult(1_000));
// The next subplan should be the mv_expanded count,
// without no sampling because source count is small.
subplan = approximation.firstSubPlan();
@@ -387,7 +389,7 @@ public void testPlans_largeDataAfterMvExpanding() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// mv_expanded count of 10^9, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -411,7 +413,7 @@ public void testPlans_largeDataBeforeMvExpanding() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 10^9.
- approximation.newMainPlan(newCountResult(1_000_000_000));
+ processResult(approximation, mainPlan, newCountResult(1_000_000_000));
// The next subplan should be the mv_expanded count,
// with sampling because source count is large.
subplan = approximation.firstSubPlan();
@@ -420,7 +422,7 @@ public void testPlans_largeDataBeforeMvExpanding() throws Exception {
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-5), withAggs(CountApproximate.class)));
// Sample-corrected mv_expanded count of 10^12 (so actual of 10^7), so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(1_000_000_000_000L));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(1_000_000_000_000L));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -444,7 +446,7 @@ public void testPlans_sampleProbabilityThreshold_noFilter() throws Exception {
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 500_000, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(500_000));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(500_000));
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -469,7 +471,7 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti
assertThat(subplan, hasPlan(Aggregate.class, withAggs(Count.class)));
// Source count of 10^12.
- approximation.newMainPlan(newCountResult(1_000_000_000_000L));
+ processResult(approximation, mainPlan, newCountResult(1_000_000_000_000L));
// There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12.
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
@@ -477,7 +479,7 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti
assertThat(subplan, hasPlan(SampledAggregate.class, withProbability(1e-8), withAggs(CountApproximate.class)));
// Sampled filtered count of 0, so increase the sample probability.
- approximation.newMainPlan(newCountResult(0));
+ processResult(approximation, mainPlan, newCountResult(0));
// There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12.
subplan = approximation.firstSubPlan();
assertThat(subplan, hasPlan(Filter.class));
@@ -486,7 +488,7 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti
// Sampled filtered count of 20, which would next to a sample probability of 0.2,
// which is above the threshold, so no more subplans.
- mainPlan = approximation.newMainPlan(newCountResult(20));
+ mainPlan = processResult(approximation, mainPlan, newCountResult(20));
// There's filtering, so start with a count plan with sampling with probability 10^4 / 10^12.
subplan = approximation.firstSubPlan();
assertThat(subplan, nullValue());
@@ -496,4 +498,9 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti
assertThat(mainPlan, not(hasPlan(SampledAggregate.class)));
assertThat(mainPlan, hasPlan(Aggregate.class, withAggs(Sum.class)));
}
+
+ private LogicalPlan processResult(Approximation approximation, LogicalPlan mainPlan, Result result) {
+ Double sampleProbability = approximation.processResult(result);
+ return sampleProbability == null ? mainPlan : ApproximationPlan.substituteSampleProbability(mainPlan, sampleProbability);
+ }
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java
index 6ae3ad0941b9b..d13bfeeab1da8 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java
@@ -72,9 +72,9 @@
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
-import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
@@ -1231,7 +1231,7 @@ public void testPruneLeftJoinOnNullMatchingFieldAndShadowingAttributes() {
var rightRelation = EsqlTestUtils.relation(IndexMode.LOOKUP).withAttributes(List.of(keyRight, fieldRight1, fieldRight2));
JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(keyLeft), List.of(keyRight), null);
- var join = new Join(EMPTY, leftRelation, rightRelation, joinConfig);
+ var join = new LookupJoin(EMPTY, leftRelation, rightRelation, joinConfig);
var project = new Project(EMPTY, join, List.of(keyLeft, intFieldLeft, fieldRight1, fieldRight2));
var testStats = statsForMissingField("key");
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
index f68caae1df5d6..6cd2a37047aa3 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
@@ -1068,10 +1068,9 @@ public void testPushdownLimitsPastLeftJoin() {
assertNotEquals(leftChild, rightChild);
var joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), null);
- var join = switch (randomIntBetween(0, 2)) {
- case 0 -> new Join(EMPTY, leftChild, rightChild, joinConfig);
- case 1 -> new LookupJoin(EMPTY, leftChild, rightChild, joinConfig, false);
- case 2 -> new InlineJoin(EMPTY, leftChild, rightChild, joinConfig);
+ var join = switch (randomIntBetween(0, 1)) {
+ case 0 -> new LookupJoin(EMPTY, leftChild, rightChild, joinConfig, false);
+ case 1 -> new InlineJoin(EMPTY, leftChild, rightChild, joinConfig);
default -> throw new IllegalArgumentException();
};
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java
index 6b6de5d5c375f..d2218b005c06f 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFiltersTests.java
@@ -60,6 +60,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
@@ -614,7 +615,7 @@ public void testPushDownFilterPastLeftJoinWithComplexMix() {
EsRelation left = relation(List.of(a, getFieldAttribute("b")));
EsRelation right = relation(List.of(c, d, e, f, g));
JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(a), List.of(c), null);
- Join join = new Join(EMPTY, left, right, joinConfig);
+ Join join = new LookupJoin(EMPTY, left, right, joinConfig);
// Predicates
Expression p1 = greaterThanOf(c, ONE); // pushable
@@ -865,7 +866,7 @@ private Join createLeftJoinOnFields() {
EsRelation right = relation(List.of(c, b));
JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(a, b), List.of(b, c), null);
- return new Join(EMPTY, left, right, joinConfig);
+ return new LookupJoin(EMPTY, left, right, joinConfig);
}
private Join createLeftJoinOnExpression() {
@@ -877,7 +878,7 @@ private Join createLeftJoinOnExpression() {
EsRelation right = relation(List.of(c, b2));
Expression joinOnCondition = new GreaterThanOrEqual(Source.EMPTY, b1, b2);
JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(b1), List.of(b2), joinOnCondition);
- return new Join(EMPTY, left, right, joinConfig);
+ return new LookupJoin(EMPTY, left, right, joinConfig);
}
public void testLeftJoinOnExpressionPushable() {
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java
index 74d102774fe93..7cf8e88be5119 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimitsTests.java
@@ -33,6 +33,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import java.util.List;
@@ -229,7 +230,7 @@ public void testNonPushableLimit() {
}),
new PushDownLimitTestCase<>(
Join.class,
- (plan, attr) -> new Join(EMPTY, plan, plan, new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), attr)),
+ (plan, attr) -> new LookupJoin(EMPTY, plan, plan, new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), attr)),
(basePlan, optimizedPlan) -> {
assertEquals(basePlan.source(), optimizedPlan.source());
var limit = as(optimizedPlan.left(), Limit.class);
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java
index b7494c09cbdef..33eb641b91faf 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSampledStatsBySampleAndStatsTests.java
@@ -26,6 +26,7 @@
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
+import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
@@ -36,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -123,6 +125,31 @@ public void testReplace_stdDev() {
assertThat(sampleExec.probability(), is(sampledAgg.sampleProbability()));
}
+ public void testReplace_join() {
+ Alias count = countApproximateAlias(Literal.keyword(Source.EMPTY, "*"));
+ EsQueryExec left = esQueryExec();
+ EsQueryExec right = esQueryExec();
+ LookupJoinExec lookupJoin = lookupJoinExec(left, right);
+ SampledAggregateExec sampledAgg = sampledAggregate(lookupJoin, List.of(count), List.of(), AggregatorMode.INITIAL, 0.5);
+
+ PhysicalPlan result = applyRule(sampledAgg);
+
+ assertThat(result, instanceOf(ProjectExec.class));
+ ProjectExec project = (ProjectExec) result;
+ assertThat(project.child(), instanceOf(EvalExec.class));
+ EvalExec eval = (EvalExec) project.child();
+ // COUNT and its bucket must be sample-corrected.
+ assertThat(eval.fields(), hasSize(2));
+ AggregateExec aggExec = assertAggregate(eval.child(), sampledAgg);
+ assertThat(aggExec.child(), instanceOf(LookupJoinExec.class));
+ LookupJoinExec lookupJoinExec = (LookupJoinExec) aggExec.child();
+ assertThat(lookupJoinExec.left(), instanceOf(SampleExec.class));
+ SampleExec sampleExec = (SampleExec) lookupJoinExec.left();
+ assertThat(sampleExec.probability(), is(sampledAgg.sampleProbability()));
+ assertThat(sampleExec.child(), equalTo(left));
+ assertThat(lookupJoinExec.right(), equalTo(right));
+ }
+
private static PhysicalPlan applyRule(SampledAggregateExec sampledAgg) {
return new ReplaceSampledStatsBySampleAndStats().apply(sampledAgg);
}
@@ -176,6 +203,10 @@ private static EsQueryExec esQueryExec() {
return new EsQueryExec(Source.EMPTY, "test", IndexMode.STANDARD, List.of(), null, null, null, List.of());
}
+ private static LookupJoinExec lookupJoinExec(PhysicalPlan left, PhysicalPlan right) {
+ return new LookupJoinExec(Source.EMPTY, left, right, List.of(), List.of(), List.of(), null);
+ }
+
private static Alias countApproximateAlias(Expression field) {
return new Alias(Source.EMPTY, "count", new CountApproximate(Source.EMPTY, field));
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinSerializationTests.java
index 76d40ccce072c..696b86f9de8fb 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinSerializationTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinSerializationTests.java
@@ -13,6 +13,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinType;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import java.io.IOException;
import java.util.List;
@@ -26,7 +27,7 @@ protected Join createTestInstance() {
LogicalPlan left = randomChild(0);
LogicalPlan right = randomChild(0);
JoinConfig config = randomJoinConfig();
- return new Join(source, left, right, config);
+ return new LookupJoin(source, left, right, config);
}
private static JoinConfig randomJoinConfig() {
@@ -46,6 +47,6 @@ protected Join mutateInstance(Join instance) throws IOException {
case 1 -> right = randomValueOtherThan(right, () -> randomChild(0));
case 2 -> config = randomValueOtherThan(config, JoinSerializationTests::randomJoinConfig);
}
- return new Join(instance.source(), left, right, config);
+ return new LookupJoin(instance.source(), left, right, config);
}
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinTests.java
index 187dcbd07d778..db989ba224cb7 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/JoinTests.java
@@ -18,6 +18,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
+import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import java.util.ArrayList;
import java.util.List;
@@ -46,7 +47,7 @@ public void testExpressionsAndReferences() {
Row right = new Row(Source.EMPTY, rightFields);
JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, leftAttributes, rightAttributes, null);
- Join join = new Join(Source.EMPTY, left, right, joinConfig);
+ Join join = new LookupJoin(Source.EMPTY, left, right, joinConfig);
// matchfields are a subset of the left and right fields, so they don't contribute to the size of the references set.
// assertEquals(2 * numMatchFields, join.references().size());
@@ -82,7 +83,7 @@ public void testTransformExprs() {
Row right = new Row(Source.EMPTY, rightFields);
JoinConfig joinConfig = new JoinConfig(JoinTypes.LEFT, leftAttributes, rightAttributes, null);
- Join join = new Join(Source.EMPTY, left, right, joinConfig);
+ Join join = new LookupJoin(Source.EMPTY, left, right, joinConfig);
assertTrue(join.config().leftFields().stream().allMatch(ref -> ref.dataType().equals(DataType.INTEGER)));
Join transformedJoin = (Join) join.transformExpressionsOnly(Attribute.class, attr -> attr.withDataType(DataType.BOOLEAN));
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/golden_tests/ApproximationGoldenTests/testSimpleApproximation/query.esql b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/golden_tests/ApproximationGoldenTests/testSimpleApproximation/query.esql
new file mode 100644
index 0000000000000..8ef04445f108c
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/golden_tests/ApproximationGoldenTests/testSimpleApproximation/query.esql
@@ -0,0 +1,2 @@
+SET approximation={"rows":10000};
+FROM many_numbers | STATS COUNT(), AVG(sv)
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByFieldIntroducedInTheJoinNotDuplicated/logical_optimization.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByFieldIntroducedInTheJoinNotDuplicated/logical_optimization.expected
index e06ec80c8afc5..1111bf97732f0 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByFieldIntroducedInTheJoinNotDuplicated/logical_optimization.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByFieldIntroducedInTheJoinNotDuplicated/logical_optimization.expected
@@ -1,6 +1,6 @@
Limit[1000[INTEGER],false,false]
\_LimitBy[5[INTEGER],[language_name{f}#0],false]
- \_Join[LEFT,[language_code{r}#1],[language_code{f}#2],null]
+ \_LookupJoin[LEFT,[language_code{r}#1],[language_code{f}#2],false,null]
|_Eval[[languages{f}#3 AS language_code#1]]
| \_EsRelation[employees][avg_worked_seconds{f}#4, birth_date{f}#5, emp_no{f}#6, first_name{f}#7, gender{f}#8, height{f}#9, height.float{f}#10, height.half_float{f}#11, height.scaled_float{f}#12, hire_date{f}#13, is_rehired{f}#14, job_positions{f}#15, languages{f}#3, languages.byte{f}#16, languages.long{f}#17, languages.short{f}#18, last_name{f}#19, salary{f}#20, salary_change{f}#21, salary_change.int{f}#22, salary_change.keyword{f}#23, salary_change.long{f}#24, still_hired{f}#25]
\_EsRelation[languages_lookup][LOOKUP][language_code{f}#2, language_name{f}#0]
\ No newline at end of file
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByOriginalFieldDuplicated/logical_optimization.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByOriginalFieldDuplicated/logical_optimization.expected
index 89efa0d1fb64f..d8ad1373dceb6 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByOriginalFieldDuplicated/logical_optimization.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByOriginalFieldDuplicated/logical_optimization.expected
@@ -1,6 +1,6 @@
Limit[1000[INTEGER],false,false]
\_LimitBy[5[INTEGER],[emp_no{f}#0],true]
- \_Join[LEFT,[language_code{r}#1],[language_code{f}#2],null]
+ \_LookupJoin[LEFT,[language_code{r}#1],[language_code{f}#2],false,null]
|_Eval[[languages{f}#3 AS language_code#1]]
| \_LimitBy[5[INTEGER],[emp_no{f}#0],false]
| \_EsRelation[employees][avg_worked_seconds{f}#4, birth_date{f}#5, emp_no{f}#0, first_name{f}#6, gender{f}#7, height{f}#8, height.float{f}#9, height.half_float{f}#10, height.scaled_float{f}#11, hire_date{f}#12, is_rehired{f}#13, job_positions{f}#14, languages{f}#3, languages.byte{f}#15, languages.long{f}#16, languages.short{f}#17, last_name{f}#18, salary{f}#19, salary_change{f}#20, salary_change.int{f}#21, salary_change.keyword{f}#22, salary_change.long{f}#23, still_hired{f}#24]
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedJoinFieldDuplicated/logical_optimization.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedJoinFieldDuplicated/logical_optimization.expected
index 0fb44a1d99ac3..8356670902df2 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedJoinFieldDuplicated/logical_optimization.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedJoinFieldDuplicated/logical_optimization.expected
@@ -1,6 +1,6 @@
Limit[1000[INTEGER],false,false]
\_LimitBy[5[INTEGER],[language_code{r}#0],true]
- \_Join[LEFT,[language_code{r}#0],[language_code{f}#1],null]
+ \_LookupJoin[LEFT,[language_code{r}#0],[language_code{f}#1],false,null]
|_LimitBy[5[INTEGER],[language_code{r}#0],false]
| \_Project[[avg_worked_seconds{f}#2, birth_date{f}#3, emp_no{f}#4, first_name{f}#5, gender{f}#6, height{f}#7, height.float{f}#8, height.half_float{f}#9, height.scaled_float{f}#10, hire_date{f}#11, is_rehired{f}#12, job_positions{f}#13, languages{f}#14 AS language_code#0, languages.byte{f}#15, languages.long{f}#16, languages.short{f}#17, last_name{f}#18, salary{f}#19, salary_change{f}#20, salary_change.int{f}#21, salary_change.keyword{f}#22, salary_change.long{f}#23, still_hired{f}#24]]
| \_EsRelation[employees][avg_worked_seconds{f}#2, birth_date{f}#3, emp_no{f}#4, first_name{f}#5, gender{f}#6, height{f}#7, height.float{f}#8, height.half_float{f}#9, height.scaled_float{f}#10, hire_date{f}#11, is_rehired{f}#12, job_positions{f}#13, languages{f}#14, languages.byte{f}#15, languages.long{f}#16, languages.short{f}#17, last_name{f}#18, salary{f}#19, salary_change{f}#20, salary_change.int{f}#21, salary_change.keyword{f}#22, salary_change.long{f}#23, still_hired{f}#24]
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedNonJoinFieldNotDuplicated/logical_optimization.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedNonJoinFieldNotDuplicated/logical_optimization.expected
index e06ec80c8afc5..1111bf97732f0 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedNonJoinFieldNotDuplicated/logical_optimization.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/logical/golden_tests/PushDownAndCombineLimitByGoldenTests/testLimitByShadowedNonJoinFieldNotDuplicated/logical_optimization.expected
@@ -1,6 +1,6 @@
Limit[1000[INTEGER],false,false]
\_LimitBy[5[INTEGER],[language_name{f}#0],false]
- \_Join[LEFT,[language_code{r}#1],[language_code{f}#2],null]
+ \_LookupJoin[LEFT,[language_code{r}#1],[language_code{f}#2],false,null]
|_Eval[[languages{f}#3 AS language_code#1]]
| \_EsRelation[employees][avg_worked_seconds{f}#4, birth_date{f}#5, emp_no{f}#6, first_name{f}#7, gender{f}#8, height{f}#9, height.float{f}#10, height.half_float{f}#11, height.scaled_float{f}#12, hire_date{f}#13, is_rehired{f}#14, job_positions{f}#15, languages{f}#3, languages.byte{f}#16, languages.long{f}#17, languages.short{f}#18, last_name{f}#19, salary{f}#20, salary_change{f}#21, salary_change.int{f}#22, salary_change.keyword{f}#23, salary_change.long{f}#24, still_hired{f}#25]
\_EsRelation[languages_lookup][LOOKUP][language_code{f}#2, language_name{f}#0]
\ No newline at end of file
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testLimitByAndProjectNotSwapped/logical_optimization.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testLimitByAndProjectNotSwapped/logical_optimization.expected
index 0fb44a1d99ac3..8356670902df2 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testLimitByAndProjectNotSwapped/logical_optimization.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testLimitByAndProjectNotSwapped/logical_optimization.expected
@@ -1,6 +1,6 @@
Limit[1000[INTEGER],false,false]
\_LimitBy[5[INTEGER],[language_code{r}#0],true]
- \_Join[LEFT,[language_code{r}#0],[language_code{f}#1],null]
+ \_LookupJoin[LEFT,[language_code{r}#0],[language_code{f}#1],false,null]
|_LimitBy[5[INTEGER],[language_code{r}#0],false]
| \_Project[[avg_worked_seconds{f}#2, birth_date{f}#3, emp_no{f}#4, first_name{f}#5, gender{f}#6, height{f}#7, height.float{f}#8, height.half_float{f}#9, height.scaled_float{f}#10, hire_date{f}#11, is_rehired{f}#12, job_positions{f}#13, languages{f}#14 AS language_code#0, languages.byte{f}#15, languages.long{f}#16, languages.short{f}#17, last_name{f}#18, salary{f}#19, salary_change{f}#20, salary_change.int{f}#21, salary_change.keyword{f}#22, salary_change.long{f}#23, still_hired{f}#24]]
| \_EsRelation[employees][avg_worked_seconds{f}#2, birth_date{f}#3, emp_no{f}#4, first_name{f}#5, gender{f}#6, height{f}#7, height.float{f}#8, height.half_float{f}#9, height.scaled_float{f}#10, hire_date{f}#11, is_rehired{f}#12, job_positions{f}#13, languages{f}#14, languages.byte{f}#15, languages.long{f}#16, languages.short{f}#17, last_name{f}#18, salary{f}#19, salary_change{f}#20, salary_change.int{f}#21, salary_change.keyword{f}#22, salary_change.long{f}#23, still_hired{f}#24]
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testProjectAndOrderBySwapped/logical_optimization.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testProjectAndOrderBySwapped/logical_optimization.expected
index 73a1968a99b18..5465f58816dc6 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testProjectAndOrderBySwapped/logical_optimization.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/golden_tests/ReorderLimitProjectAndOrderByGoldenTests/testProjectAndOrderBySwapped/logical_optimization.expected
@@ -1,6 +1,6 @@
Limit[1000[INTEGER],false,false]
\_TopNBy[[Order[salary{f}#0,ASC,LAST]],5[INTEGER],[language_code{r}#1]]
\_Project[[avg_worked_seconds{f}#2, birth_date{f}#3, emp_no{f}#4, first_name{f}#5, gender{f}#6, height{f}#7, height.float{f}#8, height.half_float{f}#9, height.scaled_float{f}#10, hire_date{f}#11, is_rehired{f}#12, job_positions{f}#13, languages{f}#14 AS language_code#1, languages.byte{f}#15, languages.long{f}#16, languages.short{f}#17, last_name{f}#18, salary{f}#0, salary_change{f}#19, salary_change.int{f}#20, salary_change.keyword{f}#21, salary_change.long{f}#22, still_hired{f}#23, language_name{f}#24]]
- \_Join[LEFT,[languages{f}#14],[language_code{f}#25],null]
+ \_LookupJoin[LEFT,[languages{f}#14],[language_code{f}#25],false,null]
|_EsRelation[employees][avg_worked_seconds{f}#2, birth_date{f}#3, emp_no{f}#4, first_name{f}#5, gender{f}#6, height{f}#7, height.float{f}#8, height.half_float{f}#9, height.scaled_float{f}#10, hire_date{f}#11, is_rehired{f}#12, job_positions{f}#13, languages{f}#14, languages.byte{f}#15, languages.long{f}#16, languages.short{f}#17, last_name{f}#18, salary{f}#0, salary_change{f}#19, salary_change.int{f}#20, salary_change.keyword{f}#21, salary_change.long{f}#22, still_hired{f}#23]
\_EsRelation[languages_lookup][LOOKUP][language_code{f}#25, language_name{f}#24]
\ No newline at end of file
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/local_reduce_planned_data_driver.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/local_reduce_planned_data_driver.expected
index 8f1164f38c747..90a8442f76dd9 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/local_reduce_planned_data_driver.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/local_reduce_planned_data_driver.expected
@@ -2,7 +2,7 @@ ExchangeSinkExec[[_doc{f}#0, emp_no{f}#1, languages{f}#2, language_code{r}#3, la
\_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Project[[_doc{f}#0, emp_no{f}#1, languages{f}#2, language_code{r}#3, language_name{f}#4]]
\_TopN[[Order[emp_no{f}#1,ASC,LAST]],20[INTEGER],false]
- \_Join[LEFT,[language_code{r}#3],[language_code{f}#5],null]
+ \_LookupJoin[LEFT,[language_code{r}#3],[language_code{f}#5],false,null]
|_Eval[[languages{f}#2 AS language_code#3]]
| \_Filter[emp_no{f}#1 >= 10091[INTEGER] AND emp_no{f}#1 < 10094[INTEGER]]
| \_EsRelation[employees][_doc{f}#0, avg_worked_seconds{f}#6, birth_date{f}#7, emp_no{f}#1, first_name{f}#8, gender{f}#9, height{f}#10, height.float{f}#11, height.half_float{f}#12, height.scaled_float{f}#13, hire_date{f}#14, is_rehired{f}#15, job_positions{f}#16, languages{f}#2, languages.byte{f}#17, languages.long{f}#18, languages.short{f}#19, last_name{f}#20, salary{f}#21, salary_change{f}#22, salary_change.int{f}#23, salary_change.keyword{f}#24, salary_change.long{f}#25, still_hired{f}#26]
diff --git a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/physical_optimization.expected b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/physical_optimization.expected
index 438c1c6d34382..80975cbb0fdc4 100644
--- a/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/physical_optimization.expected
+++ b/x-pack/plugin/esql/src/test/resources/org/elasticsearch/xpack/esql/plugin/golden_tests/LateMaterializationPlannerGoldenTests/testLookupJoinOnDataNode/physical_optimization.expected
@@ -3,7 +3,7 @@ TopNExec[[Order[emp_no{f}#0,ASC,LAST]],20[INTEGER],null]
\_FragmentExec[filter=null, estimatedRowSize=0, reducer=[], fragment=[<>
Project[[avg_worked_seconds{f}#1, birth_date{f}#2, emp_no{f}#0, first_name{f}#3, gender{f}#4, height{f}#5, height.float{f}#6, height.half_float{f}#7, height.scaled_float{f}#8, hire_date{f}#9, is_rehired{f}#10, job_positions{f}#11, languages{f}#12, languages.byte{f}#13, languages.long{f}#14, languages.short{f}#15, last_name{f}#16, salary{f}#17, salary_change{f}#18, salary_change.int{f}#19, salary_change.keyword{f}#20, salary_change.long{f}#21, still_hired{f}#22, language_code{r}#23, language_name{f}#24]]
\_TopN[[Order[emp_no{f}#0,ASC,LAST]],20[INTEGER],false]
- \_Join[LEFT,[language_code{r}#23],[language_code{f}#25],null]
+ \_LookupJoin[LEFT,[language_code{r}#23],[language_code{f}#25],false,null]
|_Eval[[languages{f}#12 AS language_code#23]]
| \_Filter[emp_no{f}#0 >= 10091[INTEGER] AND emp_no{f}#0 < 10094[INTEGER]]
| \_EsRelation[employees][avg_worked_seconds{f}#1, birth_date{f}#2, emp_no{f}#0, first_name{f}#3, gender{f}#4, height{f}#5, height.float{f}#6, height.half_float{f}#7, height.scaled_float{f}#8, hire_date{f}#9, is_rehired{f}#10, job_positions{f}#11, languages{f}#12, languages.byte{f}#13, languages.long{f}#14, languages.short{f}#15, last_name{f}#16, salary{f}#17, salary_change{f}#18, salary_change.int{f}#19, salary_change.keyword{f}#20, salary_change.long{f}#21, still_hired{f}#22]