diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml index 1282766203e15..1177fc41ef610 100644 --- a/gradle/verification-metadata.xml +++ b/gradle/verification-metadata.xml @@ -427,6 +427,16 @@ + + + + + + + + + + diff --git a/x-pack/plugin/esql-datasource-csv/build.gradle b/x-pack/plugin/esql-datasource-csv/build.gradle index 86f14a4de0ad6..c3fa8a8259d40 100644 --- a/x-pack/plugin/esql-datasource-csv/build.gradle +++ b/x-pack/plugin/esql-datasource-csv/build.gradle @@ -32,6 +32,7 @@ dependencies { testImplementation project(':test:framework') testImplementation(testArtifact(project(xpackModule('core')))) + testImplementation project(xpackModule('esql:qa:testFixtures')) } tasks.named("dependencyLicenses").configure { diff --git a/x-pack/plugin/esql-datasource-csv/src/main/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReader.java b/x-pack/plugin/esql-datasource-csv/src/main/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReader.java index 6c029ed15aeb6..eedd12d282ef8 100644 --- a/x-pack/plugin/esql-datasource-csv/src/main/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReader.java +++ b/x-pack/plugin/esql-datasource-csv/src/main/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReader.java @@ -100,6 +100,16 @@ * —{@code Array()} type notation * * + *

Bracket multi-value syntax

+ * When {@code multi_value_syntax} is {@code brackets}, array-like values support: + *
    + *
  • {@code [a,b,c]} — unquoted elements
  • + *
  • {@code ["a","b","c"]} — quoted elements (quotes stripped)
  • + *
  • {@code [a,"b,c"]} — mixed; commas inside quotes are literal
  • + *
+ *

With comma delimiter, a cell like {@code [hello,world]} is treated as one column: + * commas inside {@code [...]} are not column delimiters. + * *

Error handling

* Controlled by {@link ErrorPolicy} and its {@link ErrorPolicy.Mode}: * @@ -444,10 +454,17 @@ private List parseSchema(String schemaLine) { return attributes; } + /** + * Parse CSV type names to ESQL DataType. Small numeric types (SHORT, BYTE, FLOAT, etc.) + * are widened to INTEGER/DOUBLE since the planner expects widened types. + */ private DataType parseDataType(String typeName) { - return switch (typeName) { + String upper = typeName.toUpperCase(Locale.ROOT); + return switch (upper) { + case "SHORT", "BYTE" -> DataType.INTEGER; case "INTEGER", "INT", "I" -> DataType.INTEGER; case "LONG", "L" -> DataType.LONG; + case "FLOAT", "HALF_FLOAT", "SCALED_FLOAT" -> DataType.DOUBLE; case "DOUBLE", "D" -> DataType.DOUBLE; case "KEYWORD", "K", "STRING", "S" -> DataType.KEYWORD; case "TEXT", "TXT" -> DataType.TEXT; @@ -583,29 +600,36 @@ private Page readNextBatch() throws IOException { } initProjection(); - CsvSchema csvSchema = CsvSchema.emptySchema() - .withColumnSeparator(options.delimiter()) - .withQuoteChar(options.quoteChar()) - .withEscapeChar(options.escapeChar()) - .withNullValue(options.nullValue()); - csvIterator = sharedCsvMapper.readerFor(List.class).with(csvSchema).readValues(reader); + boolean useBracketAwareParsing = bracketMultiValues && options.delimiter() == ','; + if (useBracketAwareParsing == false) { + CsvSchema csvSchema = CsvSchema.emptySchema() + .withColumnSeparator(options.delimiter()) + .withQuoteChar(options.quoteChar()) + .withEscapeChar(options.escapeChar()) + .withNullValue(options.nullValue()); + csvIterator = sharedCsvMapper.readerFor(List.class).with(csvSchema).readValues(reader); + } } while (true) { List rows = new ArrayList<>(); - while (rows.size() < batchSize && csvIterator.hasNext()) { - List rowList = csvIterator.next(); - String[] row = new String[rowList.size()]; - for (int i = 0; i < rowList.size(); i++) { - Object val = rowList.get(i); - row[i] = val != null ? val.toString() : null; - } - if (hasCommentFilter && row.length > 0 && row[0] != null) { - String trimmedFirstCell = row[0].trim(); - if (trimmedFirstCell.startsWith(options.commentPrefix())) { - continue; + if (bracketMultiValues && options.delimiter() == ',') { + rows = readRowsBracketAware(batchSize); + } else { + while (rows.size() < batchSize && csvIterator.hasNext()) { + List rowList = csvIterator.next(); + String[] row = new String[rowList.size()]; + for (int i = 0; i < rowList.size(); i++) { + Object val = rowList.get(i); + row[i] = val != null ? val.toString() : null; + } + if (hasCommentFilter && row.length > 0 && row[0] != null) { + String trimmedFirstCell = row[0].trim(); + if (trimmedFirstCell.startsWith(options.commentPrefix())) { + continue; + } } + rows.add(row); } - rows.add(row); } if (rows.isEmpty()) { @@ -619,6 +643,129 @@ private Page readNextBatch() throws IOException { } } + /** + * Reads CSV rows using bracket-aware parsing. When a cell starts with {@code [} after a comma + * and ends with {@code ]} before a comma, commas inside are not column delimiters. + * The cell value is kept as {@code [a,b,c]} so multi-value conversion can parse it. + * Supports multi-line quoted fields. + */ + private List readRowsBracketAware(int batchSize) throws IOException { + List rows = new ArrayList<>(); + String line; + while (rows.size() < batchSize && (line = reader.readLine()) != null) { + line = line.trim(); + if (line.isEmpty() || (hasCommentFilter && line.startsWith(options.commentPrefix()))) { + continue; + } + StringBuilder logicalLine = new StringBuilder(line); + while (hasUnclosedQuote(logicalLine.toString(), options.quoteChar())) { + String next = reader.readLine(); + if (next == null) { + break; + } + logicalLine.append('\n').append(next); + } + String[] row = splitLineBracketAware(logicalLine.toString()); + rows.add(row); + } + return rows; + } + + private static boolean hasUnclosedQuote(String s, char quote) { + boolean inQuotes = false; + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c == quote) { + if (i + 1 < s.length() && s.charAt(i + 1) == quote) { + i++; + continue; + } + inQuotes = !inQuotes; + } + } + return inQuotes; + } + + /** + * Splits a CSV line by delimiter, treating quoted fields and {@code [..,..,..]} as single cells. + * Commas inside quotes or brackets are not delimiters. Escaped commas ({@code \,}) are skipped. + */ + private String[] splitLineBracketAware(String line) { + List entries = new ArrayList<>(); + char delim = options.delimiter(); + char quote = options.quoteChar(); + char esc = options.escapeChar(); + StringBuilder current = new StringBuilder(); + boolean inQuotes = false; + boolean inBrackets = false; + int i = 0; + while (i < line.length()) { + char c = line.charAt(i); + if (inQuotes) { + if (c == quote) { + if (i + 1 < line.length() && line.charAt(i + 1) == quote) { + current.append(quote); + i += 2; + continue; + } + inQuotes = false; + } else if (c == esc && i + 1 < line.length() && line.charAt(i + 1) == delim) { + current.append(delim); + i += 2; + continue; + } else { + current.append(c); + } + i++; + } else if (inBrackets) { + current.append(c); + if (c == ']') { + inBrackets = false; + entries.add(current.toString()); + current = new StringBuilder(); + i++; + while (i < line.length() && line.charAt(i) == ' ') { + i++; + } + if (i < line.length() && line.charAt(i) == delim) { + i++; + continue; + } + continue; + } + i++; + } else if (c == quote) { + inQuotes = true; + i++; + } else if (c == '[' && current.length() == 0) { + inBrackets = true; + current.append(c); + i++; + } else if (c == delim) { + if (i > 0 && line.charAt(i - 1) == esc) { + current.append(c); + } else { + entries.add(current.toString().trim()); + current = new StringBuilder(); + } + i++; + } else { + current.append(c); + i++; + } + } + if (inQuotes) { + throw new EsqlIllegalArgumentException("Unclosed quoted field in line [{}]", line); + } + if (inBrackets) { + throw new EsqlIllegalArgumentException("Unclosed bracket cell in line [{}]", line); + } + if (current.length() > 0) { + entries.add(current.toString().trim()); + } + return entries.toArray(String[]::new); + } + private void initProjection() { int schemaSize = schema.size(); if (projectedColumns == null || projectedColumns.isEmpty()) { @@ -768,16 +915,30 @@ private List splitBracketContent(String content) { List result = new ArrayList<>(); StringBuilder current = new StringBuilder(); char esc = options.escapeChar(); + char quote = options.quoteChar(); + boolean inQuotes = false; int i = 0; while (i < content.length()) { char c = content.charAt(i); - if (c == esc && i + 1 < content.length() && content.charAt(i + 1) == ',') { - current.append(','); - i += 2; - } else if (c == ',') { + if (c == quote) { + if (inQuotes) { + if (i + 1 < content.length() && content.charAt(i + 1) == quote) { + current.append(quote); + i += 2; + continue; + } + inQuotes = false; + } else { + inQuotes = true; + } + i++; + } else if (c == ',' && inQuotes == false) { result.add(current.toString().trim()); current = new StringBuilder(); i++; + } else if (c == esc && inQuotes == false && i + 1 < content.length() && content.charAt(i + 1) == ',') { + current.append(','); + i += 2; } else { current.append(c); i++; @@ -794,6 +955,10 @@ private Object parseElement(String value, DataType dataType) { if (hasCustomNullValue && value.equals(nullValueStr)) { return null; } + value = unquoteElement(value); + if (value.isEmpty()) { + return null; + } return switch (dataType) { case INTEGER -> tryParseInt(value); case LONG -> tryParseLong(value); @@ -809,6 +974,19 @@ private Object parseElement(String value, DataType dataType) { }; } + /** + * Unquotes an element that is wrapped in the configured quote character. + * Removes leading/trailing quotes and replaces {@code ""} with {@code "} in the inner content. + */ + private String unquoteElement(String value) { + char quote = options.quoteChar(); + if (value.length() >= 2 && value.charAt(0) == quote && value.charAt(value.length() - 1) == quote) { + String inner = value.substring(1, value.length() - 1); + return inner.replace(String.valueOf(quote) + quote, String.valueOf(quote)); + } + return value; + } + private Object tryParseInt(String value) { try { return Integer.parseInt(value); @@ -936,6 +1114,9 @@ private Class javaClassForDataType(DataType dataType) { } private static boolean looksNumeric(String value) { + if (value == null || value.isEmpty()) { + return false; + } int start = (value.charAt(0) == '-') ? 1 : 0; if (start >= value.length()) { return false; diff --git a/x-pack/plugin/esql-datasource-csv/src/test/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReaderTests.java b/x-pack/plugin/esql-datasource-csv/src/test/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReaderTests.java index 5309df1f1b66e..723bace930460 100644 --- a/x-pack/plugin/esql-datasource-csv/src/test/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReaderTests.java +++ b/x-pack/plugin/esql-datasource-csv/src/test/java/org/elasticsearch/xpack/esql/datasource/csv/CsvFormatReaderTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.CsvTestsDataLoader; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -1498,6 +1499,43 @@ public void testMultiValuePipeSeparatedElements() throws IOException { } } + /** + * Bracket-aware line splitting: escaped delimiter ({@code \,}) is treated as literal, + * so the comma does not split the column. The cell value preserves the escape sequence. + */ + public void testEscapedDelimiterInLine() throws IOException { + String csv = "id:long,data:keyword\n1,a\\,b\n2,normal\n"; + StorageObject object = createStorageObject(csv); + CsvFormatReader reader = new CsvFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(object, null, 10)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + assertEquals(2, page.getPositionCount()); + assertEquals(1L, ((LongBlock) page.getBlock(0)).getLong(0)); + assertEquals(new BytesRef("a\\,b"), ((BytesRefBlock) page.getBlock(1)).getBytesRef(0, new BytesRef())); + assertEquals(2L, ((LongBlock) page.getBlock(0)).getLong(1)); + assertEquals(new BytesRef("normal"), ((BytesRefBlock) page.getBlock(1)).getBytesRef(1, new BytesRef())); + } + } + + /** + * Escaped delimiter inside quoted field: {@code "a\,b"} yields literal comma in the cell. + */ + public void testEscapedDelimiterInQuotedField() throws IOException { + String csv = "id:long,data:keyword\n1,\"a\\,b\"\n2,normal\n"; + StorageObject object = createStorageObject(csv); + CsvFormatReader reader = new CsvFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(object, null, 10)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + assertEquals(2, page.getPositionCount()); + assertEquals(new BytesRef("a,b"), ((BytesRefBlock) page.getBlock(1)).getBytesRef(0, new BytesRef())); + assertEquals(new BytesRef("normal"), ((BytesRefBlock) page.getBlock(1)).getBytesRef(1, new BytesRef())); + } + } + public void testMultiValueEscapedComma() throws IOException { String csv = "id:integer,data:keyword\n1,\"[a\\\\,b,c]\"\n"; CsvFormatOptions options = new CsvFormatOptions( @@ -1578,11 +1616,105 @@ public void testMultiValueBracketsQuotedStrings() throws IOException { assertEquals(2, page.getPositionCount()); BytesRefBlock namesBlock = (BytesRefBlock) page.getBlock(1); assertEquals(2, namesBlock.getValueCount(0)); - assertEquals(new BytesRef("\"foo\""), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(0), new BytesRef())); - assertEquals(new BytesRef("\"bar\""), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(0) + 1, new BytesRef())); + assertEquals(new BytesRef("foo"), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(0), new BytesRef())); + assertEquals(new BytesRef("bar"), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(0) + 1, new BytesRef())); assertEquals(2, namesBlock.getValueCount(1)); - assertEquals(new BytesRef("\"hello world\""), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(1), new BytesRef())); - assertEquals(new BytesRef("\"test\""), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(1) + 1, new BytesRef())); + assertEquals(new BytesRef("hello world"), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(1), new BytesRef())); + assertEquals(new BytesRef("test"), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(1) + 1, new BytesRef())); + } + } + + /** + * Loads employees.csv from ESQL test fixtures. Verifies 23 columns in header and each row. + * The file has multi-value fields like {@code [Senior Python Developer,Accountant]}. + */ + public void testEmployeesCsvWithMultiValues() throws IOException { + String csv = new String(CsvTestsDataLoader.getResourceStream("/data/employees.csv").readAllBytes(), StandardCharsets.UTF_8); + StorageObject object = createStorageObject(csv); + CsvFormatReader reader = new CsvFormatReader(blockFactory); + + List schema = reader.schema(object); + assertEquals(23, schema.size()); + + int rowCount = 0; + try (CloseableIterator iterator = reader.read(object, null, 100)) { + while (iterator.hasNext()) { + Page page = iterator.next(); + assertEquals(23, page.getBlockCount()); + rowCount += page.getPositionCount(); + } + } + assertEquals(100, rowCount); + } + + /** + * CSV with comma delimiter: {@code a,[hello,world],c} parses the middle cell as one column + * whose value {@code [hello,world]} yields two multi-values: hello and world. + */ + public void testMultiValueBracketsInMultiColumnRow() throws IOException { + String csv = "prefix:keyword,tags:keyword,suffix:keyword\nx,[hello,world],y\n"; + StorageObject object = createStorageObject(csv); + CsvFormatReader reader = new CsvFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(object, null, 10)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + assertEquals(1, page.getPositionCount()); + BytesRefBlock prefixBlock = page.getBlock(0); + BytesRefBlock tagsBlock = page.getBlock(1); + BytesRefBlock suffixBlock = page.getBlock(2); + assertEquals(new BytesRef("x"), prefixBlock.getBytesRef(0, new BytesRef())); + assertEquals(2, tagsBlock.getValueCount(0)); + assertEquals(new BytesRef("hello"), tagsBlock.getBytesRef(tagsBlock.getFirstValueIndex(0), new BytesRef())); + assertEquals(new BytesRef("world"), tagsBlock.getBytesRef(tagsBlock.getFirstValueIndex(0) + 1, new BytesRef())); + assertEquals(new BytesRef("y"), suffixBlock.getBytesRef(0, new BytesRef())); + } + } + + public void testMultiValueBracketsQuotedElements() throws IOException { + String csv = "id:integer,names:keyword\n1,\"[\"\"hello\"\",\"\"world\"\"]\"\n"; + StorageObject object = createStorageObject(csv); + CsvFormatReader reader = new CsvFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(object, null, 10)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + assertEquals(1, page.getPositionCount()); + BytesRefBlock namesBlock = (BytesRefBlock) page.getBlock(1); + assertEquals(2, namesBlock.getValueCount(0)); + assertEquals(new BytesRef("hello"), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(0), new BytesRef())); + assertEquals(new BytesRef("world"), namesBlock.getBytesRef(namesBlock.getFirstValueIndex(0) + 1, new BytesRef())); + } + } + + public void testMultiValueBracketsMixedQuotedUnquoted() throws IOException { + String csv = "id:integer,data:keyword\n1,\"[hello,\"\"world,world\"\"]\"\n"; + StorageObject object = createStorageObject(csv); + CsvFormatReader reader = new CsvFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(object, null, 10)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + assertEquals(1, page.getPositionCount()); + BytesRefBlock dataBlock = (BytesRefBlock) page.getBlock(1); + assertEquals(2, dataBlock.getValueCount(0)); + assertEquals(new BytesRef("hello"), dataBlock.getBytesRef(dataBlock.getFirstValueIndex(0), new BytesRef())); + assertEquals(new BytesRef("world,world"), dataBlock.getBytesRef(dataBlock.getFirstValueIndex(0) + 1, new BytesRef())); + } + } + + public void testMultiValueBracketsQuotedWithEscapedQuote() throws IOException { + String csv = "id:integer,data:keyword\n1,\"[\"\"say \"\"\"\"hi\"\"\"\"\"\"]\"\n"; + StorageObject object = createStorageObject(csv); + CsvFormatReader reader = new CsvFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(object, null, 10)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + assertEquals(1, page.getPositionCount()); + BytesRefBlock dataBlock = (BytesRefBlock) page.getBlock(1); + assertEquals(1, dataBlock.getValueCount(0)); + assertEquals(new BytesRef("say \"hi\""), dataBlock.getBytesRef(dataBlock.getFirstValueIndex(0), new BytesRef())); } } diff --git a/x-pack/plugin/esql-datasource-iceberg/qa/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/iceberg/IcebergSpecIT.java b/x-pack/plugin/esql-datasource-iceberg/qa/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/iceberg/IcebergSpecIT.java index 3554020b3f511..d80405502641b 100644 --- a/x-pack/plugin/esql-datasource-iceberg/qa/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/iceberg/IcebergSpecIT.java +++ b/x-pack/plugin/esql-datasource-iceberg/qa/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/iceberg/IcebergSpecIT.java @@ -14,16 +14,10 @@ import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase; -import org.elasticsearch.xpack.esql.SpecReader; import org.junit.ClassRule; -import java.net.URL; import java.util.List; -import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; -import static org.junit.Assert.assertTrue; - /** Integration tests for Iceberg tables with metadata (loads iceberg-*.csv-spec). */ @ThreadLeakFilters(filters = TestClustersThreadFilter.class) @AwaitsFix(bugUrl = "Iceberg integration tests disabled pending stabilization") @@ -51,8 +45,6 @@ protected String getTestRestCluster() { @ParametersFactory(argumentFormatting = "csv-spec:%2$s.%3$s") public static List readScriptSpec() throws Exception { - List urls = classpathResources("/iceberg-*.csv-spec"); - assertTrue("No iceberg-*.csv-spec files found", urls.size() > 0); - return SpecReader.readScriptSpec(urls, specParser()); + return readExternalSpecTests("/iceberg-basic.csv-spec"); } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/iceberg-basic.csv-spec b/x-pack/plugin/esql-datasource-iceberg/qa/src/javaRestTest/resources/iceberg-basic.csv-spec similarity index 100% rename from x-pack/plugin/esql/qa/testFixtures/src/main/resources/iceberg-basic.csv-spec rename to x-pack/plugin/esql-datasource-iceberg/qa/src/javaRestTest/resources/iceberg-basic.csv-spec diff --git a/x-pack/plugin/esql/build.gradle b/x-pack/plugin/esql/build.gradle index ae8531f7fd67b..8f88fecaed8a5 100644 --- a/x-pack/plugin/esql/build.gradle +++ b/x-pack/plugin/esql/build.gradle @@ -65,6 +65,8 @@ dependencies { testImplementation project(path: xpackModule('kql')) testImplementation project(path: xpackModule('mapper-unsigned-long')) + testImplementation project(path: xpackModule('esql-datasource-csv')) + testImplementation project(path: xpackModule('esql-datasource-http')) testImplementation project(path: xpackModule('esql-datasource-gzip')) testImplementation project(path: xpackModule('esql-datasource-zstd')) testImplementation project(path: xpackModule('esql-datasource-bzip2')) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 398f75f4708be..a3cd492519207 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.logging.HeaderWarning; +import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; @@ -37,6 +38,8 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.compute.querydsl.query.SingleValueMatchQuery; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.PathUtils; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -65,12 +68,26 @@ import org.elasticsearch.xpack.esql.analysis.UnmappedResolution; import org.elasticsearch.xpack.esql.approximation.Approximation; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.core.type.InvalidMappedField; import org.elasticsearch.xpack.esql.core.util.StringUtils; +import org.elasticsearch.xpack.esql.datasource.csv.CsvDataSourcePlugin; +import org.elasticsearch.xpack.esql.datasource.http.HttpDataSourcePlugin; +import org.elasticsearch.xpack.esql.datasources.DataSourceCapabilities; +import org.elasticsearch.xpack.esql.datasources.DataSourceModule; +import org.elasticsearch.xpack.esql.datasources.ExternalSourceResolution; +import org.elasticsearch.xpack.esql.datasources.ExternalSourceResolver; +import org.elasticsearch.xpack.esql.datasources.FileSplit; +import org.elasticsearch.xpack.esql.datasources.OperatorFactoryRegistry; +import org.elasticsearch.xpack.esql.datasources.SplitDiscoveryPhase; +import org.elasticsearch.xpack.esql.datasources.spi.DataSourcePlugin; +import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; +import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService; import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; @@ -93,8 +110,13 @@ import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.SettingsValidationContext; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.UnresolvedExternalRelation; import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec; +import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; +import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.MMRExec; @@ -124,7 +146,13 @@ import org.junit.Before; import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -133,8 +161,12 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static java.util.stream.Collectors.toSet; @@ -145,6 +177,7 @@ import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv; import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET; import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.VIEW_CONFIGS; +import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.getResourceStream; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; @@ -205,6 +238,8 @@ public class CsvTests extends ESTestCase { private static final EsqlCapabilities ENABLED_CAPS = EsqlCapabilities.capabilities(FUNCTION_REGISTRY, false); private static final EsqlCapabilities ALL_CAPS = EsqlCapabilities.capabilities(FUNCTION_REGISTRY, true); + private static final Pattern TEMPLATE_PATTERN = Pattern.compile("\\{\\{(\\w+)}}"); + private final String fileName; private final String groupName; private final String testName; @@ -385,10 +420,6 @@ public final void test() throws Throwable { "CSV tests cannot currently handle views with branching", testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.VIEWS_WITH_BRANCHING.capabilityName()) ); - assumeFalseLogging( - "CSV tests cannot handle EXTERNAL sources (requires QA integration tests)", - testCase.query.trim().toUpperCase(java.util.Locale.ROOT).startsWith("EXTERNAL") - ); assumeFalseLogging( "CSV tests cannot handle replacing approximate count by exact (requires ES filter pushdown)", groupName.equals("approximation") @@ -625,7 +656,8 @@ private LogicalPlan analyzedPlan( UnmappedResolution unmappedResolution, Configuration configuration, Map datasets, - TransportVersion minimumVersion + TransportVersion minimumVersion, + ExternalSourceResolution externalSourceResolution ) { var indexResolution = loadIndexResolution(datasets); var enrichPolicies = loadEnrichPolicies(); @@ -633,10 +665,12 @@ private LogicalPlan analyzedPlan( new AnalyzerContext( configuration, FUNCTION_REGISTRY, + null, indexResolution, Map.of(), enrichPolicies, emptyInferenceResolution(), + externalSourceResolution, minimumVersion, unmappedResolution ), @@ -753,9 +787,10 @@ private static TestPhysicalOperationProviders testOperationProviders( } private ActualResults executePlan(BigArrays bigArrays) throws Exception { + String query = substituteTemplates(testCase.query, csvFileTemplateResolver()); EsqlExecutionInfo esqlExecutionInfo = createEsqlExecutionInfo(randomBoolean()); esqlExecutionInfo.queryProfile().planning().start(); - EsqlStatement statement = EsqlParser.INSTANCE.createStatement(testCase.query); + EsqlStatement statement = EsqlParser.INSTANCE.createStatement(query); LogicalPlan plan = resolveViews(statement.plan()); this.configuration = EsqlTestUtils.configuration( new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build()), @@ -765,7 +800,48 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { var testDatasets = testDatasets(plan); // Specifically use the newest transport version; the csv tests correspond to a single node cluster on the current version. TransportVersion minimumVersion = TransportVersion.current(); - LogicalPlan analyzed = analyzedPlan(plan, statement.setting(UNMAPPED_FIELDS), configuration, testDatasets, minimumVersion); + + boolean hasExternalSources = plan.anyMatch(UnresolvedExternalRelation.class::isInstance); + ExternalSourceResolution externalSourceResolution = ExternalSourceResolution.EMPTY; + DataSourceModule dataSourceModule = null; + OperatorFactoryRegistry operatorFactoryRegistry; + if (hasExternalSources) { + var preAnalysis = new PreAnalyzer().preAnalyze(plan); + if (preAnalysis.icebergPaths().isEmpty() == false) { + List plugins = List.of(new CsvDataSourcePlugin(), new HttpDataSourcePlugin()); + DataSourceCapabilities caps = DataSourceCapabilities.build(plugins); + BlockFactory blockFactory = BlockFactory.builder(bigArrays).build(); + dataSourceModule = new DataSourceModule(plugins, caps, Settings.EMPTY, blockFactory, EsExecutors.DIRECT_EXECUTOR_SERVICE); + ExternalSourceResolver externalSourceResolver = new ExternalSourceResolver( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + dataSourceModule + ); + Map> pathParams = new HashMap<>(); + plan.forEachUp(UnresolvedExternalRelation.class, p -> { + if (p.tablePath() instanceof Literal literal && literal.value() != null) { + String path = BytesRefs.toString(literal.value()); + pathParams.put(path, p.params()); + } + }); + PlainActionFuture resolveFuture = new PlainActionFuture<>(); + externalSourceResolver.resolve(preAnalysis.icebergPaths(), pathParams, resolveFuture); + externalSourceResolution = resolveFuture.actionGet(); + operatorFactoryRegistry = dataSourceModule.createOperatorFactoryRegistry(EsExecutors.DIRECT_EXECUTOR_SERVICE); + } else { + operatorFactoryRegistry = null; + } + } else { + operatorFactoryRegistry = null; + } + + LogicalPlan analyzed = analyzedPlan( + plan, + statement.setting(UNMAPPED_FIELDS), + configuration, + testDatasets, + minimumVersion, + externalSourceResolution + ); FoldContext foldCtx = FoldContext.small(); EsqlSession session = new EsqlSession( @@ -800,7 +876,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { session.executeOptimizedPlan( new EsqlQueryRequest(), esqlExecutionInfo, - planRunner(bigArrays, physicalOperationProviders), + planRunner(bigArrays, physicalOperationProviders, operatorFactoryRegistry), optimizedPlan, configuration, foldCtx, @@ -825,7 +901,13 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { ); })); - return listener.get(); + try { + return listener.get(); + } finally { + if (dataSourceModule != null) { + IOUtils.closeWhileHandlingException(dataSourceModule); + } + } } private Settings randomNodeSettings() { @@ -855,17 +937,24 @@ private void opportunisticallyAssertPlanSerialization(PhysicalPlan plan) { || p instanceof ChangePointExec || p instanceof MergeExec || p instanceof MMRExec + || p instanceof ExternalSourceExec + || p instanceof ExchangeExec )) { return; } SerializationTestUtils.assertSerialization(plan, configuration); } - PlanRunner planRunner(BigArrays bigArrays, TestPhysicalOperationProviders physicalOperationProviders) { + PlanRunner planRunner( + BigArrays bigArrays, + TestPhysicalOperationProviders physicalOperationProviders, + OperatorFactoryRegistry operatorFactoryRegistry + ) { return (physicalPlan, configuration, foldContext, planTimeProfile, listener) -> executeSubPlan( bigArrays, foldContext, physicalOperationProviders, + operatorFactoryRegistry, physicalPlan, listener ); @@ -875,11 +964,14 @@ void executeSubPlan( BigArrays bigArrays, FoldContext foldCtx, TestPhysicalOperationProviders physicalOperationProviders, + OperatorFactoryRegistry operatorFactoryRegistry, PhysicalPlan physicalPlan, ActionListener listener ) { // Keep in sync with ComputeService#execute opportunisticallyAssertPlanSerialization(physicalPlan); + // Discover splits from FragmentExec (plan has FragmentExec, not ExternalSourceExec yet) + List coordinatorSplits = coordinatorSplits(operatorFactoryRegistry, physicalPlan); Tuple coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode( physicalPlan, configuration @@ -912,13 +1004,12 @@ void executeSubPlan( mock(LookupFromIndexService.class), mock(InferenceService.class), physicalOperationProviders, - null // OperatorFactoryRegistry - not needed for CSV tests + operatorFactoryRegistry ); List collectedPages = Collections.synchronizedList(new ArrayList<>()); // replace fragment inside the coordinator plan - List drivers = new ArrayList<>(); LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan( "final", foldCtx, @@ -926,7 +1017,7 @@ void executeSubPlan( new OutputExec(coordinatorPlan, collectedPages::add), EmptyIndexedByShardId.instance() ); - drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(getTestName())); + List drivers = new ArrayList<>(coordinatorNodeExecutionPlan.createDrivers(getTestName())); if (dataNodePlan != null) { var searchStats = new DisabledSearchStats(); var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); @@ -936,6 +1027,12 @@ void executeSubPlan( ); var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer, null); + if (coordinatorSplits.isEmpty() == false) { + csvDataNodePhysicalPlan = csvDataNodePhysicalPlan.transformUp( + ExternalSourceExec.class, + exec -> exec.splits().isEmpty() ? exec.withSplits(coordinatorSplits) : exec + ); + } exchangeSource.addRemoteSink( exchangeSink::fetchPageAsync, Randomness.get().nextBoolean(), @@ -969,4 +1066,137 @@ protected void start(Driver driver, ActionListener driverListener) { listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, configuration, DriverCompletionInfo.EMPTY, null)) ); } + + private static List coordinatorSplits(OperatorFactoryRegistry operatorFactoryRegistry, PhysicalPlan physicalPlan) { + List coordinatorSplits = new ArrayList<>(); + if (operatorFactoryRegistry != null) { + physicalPlan.forEachDown(FragmentExec.class, fragment -> fragment.fragment().forEachDown(ExternalRelation.class, external -> { + ExternalSourceExec tempExec = external.toPhysicalExec(); + PhysicalPlan discovered = SplitDiscoveryPhase.resolveExternalSplits(tempExec, operatorFactoryRegistry.sourceFactories()); + if (discovered instanceof ExternalSourceExec withSplits) { + if (withSplits.splits().isEmpty() == false) { + coordinatorSplits.addAll(withSplits.splits()); + } else { + // Fallback: FileSplitProvider returns empty for FileSet.UNRESOLVED (single-file). + // Create a single split from the path so the operator can read. + String path = withSplits.sourcePath(); + if (path != null && path.startsWith("file://")) { + try { + StoragePath storagePath = StoragePath.of(path); + Path fsPath = PathUtils.get(URI.create(path)); + if (Files.exists(fsPath)) { + long fileLength = Files.size(fsPath); + coordinatorSplits.add( + new FileSplit( + withSplits.sourceType(), + storagePath, + 0, + fileLength, + ".csv", + withSplits.config() != null ? withSplits.config() : Map.of(), + Map.of() + ) + ); + } + } catch (Exception e) { + LOGGER.debug( + () -> org.elasticsearch.core.Strings.format( + "Fallback split creation failed for path [%s]; " + + "operator may still work with path directly when splits empty", + path + ), + e + ); + } + } + } + } + })); + } + return coordinatorSplits; + } + + /** + * Returns a template resolver that maps CSV dataset names (e.g. "employees") to file:// URLs + * for datasets in CSV_DATASET that have a data file. Unknown template names return null. + *

+ * Paths are resolved lazily on first use per template and cached. For jar resources, temp files + * use a file-specific prefix; if a file matching that prefix already exists, it is reused. + */ + public static Function csvFileTemplateResolver() { + ConcurrentHashMap cache = new ConcurrentHashMap<>(); + return templateName -> { + CsvTestsDataLoader.TestDataset dataset = CSV_DATASET.get(templateName); + if (dataset == null || dataset.dataFileName() == null) { + throw new IllegalArgumentException("Failed to resolve CSV path for dataset [" + templateName + "]"); + } + return cache.computeIfAbsent(templateName, k -> { + try { + return resolveCsvFilePathLazy("/data/" + dataset.dataFileName()); + } catch (IOException e) { + throw new UncheckedIOException("Failed to resolve CSV path for dataset [" + templateName + "]", e); + } + }); + }; + } + + private static String resolveCsvFilePathLazy(String resourcePath) throws IOException { + URL resource = CsvTestsDataLoader.class.getResource(resourcePath); + if (resource == null) { + throw new IllegalStateException("Cannot find resource " + resourcePath); + } + String protocol = resource.getProtocol(); + if ("file".equals(protocol)) { + return normalizeFileUri(resource.toExternalForm()); + } + if ("jar".equals(protocol)) { + String fileName = PathUtils.get(resourcePath).getFileName().toString(); + String prefix = CsvTests.class.getCanonicalName() + "-" + fileName + "-"; + Path tempDir = PathUtils.get(System.getProperty("java.io.tmpdir")); + try (var stream = Files.newDirectoryStream(tempDir, prefix + "*")) { + var iterator = stream.iterator(); + if (iterator.hasNext()) { + Path existing = iterator.next(); + return normalizeFileUri(existing.toUri().toURL().toExternalForm()); + } + } + Path tempFile = createTempFile(tempDir, prefix, ".csv"); + try (InputStream in = getResourceStream(resourcePath)) { + Files.copy(in, tempFile, StandardCopyOption.REPLACE_EXISTING); + } + return normalizeFileUri(tempFile.toUri().toURL().toExternalForm()); + } + throw new IllegalStateException("Unsupported resource protocol: " + protocol); + } + + private static Path createTempFile(Path tempDir, String prefix, String suffix) throws IOException { + Path tempFile = Files.createTempFile(tempDir, prefix, suffix); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + Files.deleteIfExists(tempFile); + } catch (IOException ignored) { + // Best-effort cleanup on shutdown + } + })); + return tempFile; + } + + private static String normalizeFileUri(String uri) { + if (uri != null && uri.startsWith("file:/") && uri.startsWith("file:///") == false) { + return "file://" + uri.substring(5); + } + return uri; + } + + public static String substituteTemplates(String query, Function templateResolver) { + Matcher matcher = TEMPLATE_PATTERN.matcher(query); + StringBuilder result = new StringBuilder(); + while (matcher.find()) { + String templateName = matcher.group(1); + String replacement = templateResolver.apply(templateName); + matcher.appendReplacement(result, Matcher.quoteReplacement(replacement != null ? replacement : matcher.group(0))); + } + matcher.appendTail(result); + return result.toString(); + } }