diff --git a/docs/changelog/143997.yaml b/docs/changelog/143997.yaml new file mode 100644 index 0000000000000..8406de0836567 --- /dev/null +++ b/docs/changelog/143997.yaml @@ -0,0 +1,5 @@ +area: ES|QL +issues: [] +pr: 143997 +summary: "[ES|QL|DS] Wire parallel parsing into production for text formats" +type: enhancement diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactory.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactory.java index 8bf8971542736..48537b9b97aad 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactory.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactory.java @@ -11,10 +11,13 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.datasources.spi.ErrorPolicy; import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.FormatReader; +import org.elasticsearch.xpack.esql.datasources.spi.SegmentableFormatReader; import org.elasticsearch.xpack.esql.datasources.spi.StorageObject; import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; import org.elasticsearch.xpack.esql.datasources.spi.StorageProvider; @@ -54,6 +57,8 @@ */ public class AsyncExternalSourceOperatorFactory implements SourceOperator.SourceOperatorFactory { + private static final Logger logger = LogManager.getLogger(AsyncExternalSourceOperatorFactory.class); + private final StorageProvider storageProvider; private final FormatReader formatReader; private final StoragePath path; @@ -67,6 +72,7 @@ public class AsyncExternalSourceOperatorFactory implements SourceOperator.Source private final Map partitionValues; private final ExternalSliceQueue sliceQueue; private final ErrorPolicy errorPolicy; + private final int parsingParallelism; public AsyncExternalSourceOperatorFactory( StorageProvider storageProvider, @@ -81,7 +87,8 @@ public AsyncExternalSourceOperatorFactory( Set partitionColumnNames, Map partitionValues, ExternalSliceQueue sliceQueue, - ErrorPolicy errorPolicy + ErrorPolicy errorPolicy, + int parsingParallelism ) { if (storageProvider == null) { throw new IllegalArgumentException("storageProvider cannot be null"); @@ -118,6 +125,40 @@ public AsyncExternalSourceOperatorFactory( this.partitionValues = partitionValues != null ? partitionValues : Map.of(); this.sliceQueue = sliceQueue; this.errorPolicy = errorPolicy != null ? errorPolicy : formatReader.defaultErrorPolicy(); + this.parsingParallelism = Math.max(1, parsingParallelism); + } + + public AsyncExternalSourceOperatorFactory( + StorageProvider storageProvider, + FormatReader formatReader, + StoragePath path, + List attributes, + int batchSize, + int maxBufferSize, + int rowLimit, + Executor executor, + FileSet fileSet, + Set partitionColumnNames, + Map partitionValues, + ExternalSliceQueue sliceQueue, + ErrorPolicy errorPolicy + ) { + this( + storageProvider, + formatReader, + path, + attributes, + batchSize, + maxBufferSize, + rowLimit, + executor, + fileSet, + partitionColumnNames, + partitionValues, + sliceQueue, + errorPolicy, + 1 + ); } public AsyncExternalSourceOperatorFactory( @@ -147,7 +188,8 @@ public AsyncExternalSourceOperatorFactory( partitionColumnNames, partitionValues, sliceQueue, - null + null, + 1 ); } @@ -177,7 +219,8 @@ public AsyncExternalSourceOperatorFactory( partitionColumnNames, partitionValues, sliceQueue, - null + null, + 1 ); } @@ -206,7 +249,8 @@ public AsyncExternalSourceOperatorFactory( partitionColumnNames, partitionValues, null, - null + null, + 1 ); } @@ -233,7 +277,8 @@ public AsyncExternalSourceOperatorFactory( null, null, null, - null + null, + 1 ); } @@ -259,7 +304,8 @@ public AsyncExternalSourceOperatorFactory( null, null, null, - null + null, + 1 ); } @@ -377,13 +423,29 @@ private void startMultiFileRead( executor.execute(() -> { try { int rowsRemaining = rowLimit; + boolean useParallel = rowLimit == FormatReader.NO_LIMIT && formatReader instanceof SegmentableFormatReader; for (StorageEntry entry : fileSet.files()) { if (buffer.noMoreInputs() || (rowLimit != FormatReader.NO_LIMIT && rowsRemaining <= 0)) { break; } StorageObject obj = storageProvider.newObject(entry.path(), entry.length(), entry.lastModified()); - int fileBudget = rowLimit == FormatReader.NO_LIMIT ? FormatReader.NO_LIMIT : rowsRemaining; - try (CloseableIterator pages = formatReader.read(obj, projectedColumns, batchSize, fileBudget, errorPolicy)) { + CloseableIterator pages; + if (useParallel) { + pages = ParallelParsingCoordinator.parallelRead( + (SegmentableFormatReader) formatReader, + obj, + projectedColumns, + batchSize, + parsingParallelism, + executor, + attributes, + errorPolicy + ); + } else { + int fileBudget = rowLimit == FormatReader.NO_LIMIT ? FormatReader.NO_LIMIT : rowsRemaining; + pages = formatReader.read(obj, projectedColumns, batchSize, fileBudget, errorPolicy); + } + try (pages) { int consumed = drainPagesWithBudget(pages, buffer, injector); if (rowLimit != FormatReader.NO_LIMIT) { rowsRemaining -= consumed; @@ -432,7 +494,20 @@ private void startSyncWrapperRead( executor.execute(() -> { CloseableIterator pages = null; try { - pages = formatReader.read(storageObject, projectedColumns, batchSize, rowLimit, errorPolicy); + if (rowLimit == FormatReader.NO_LIMIT && formatReader instanceof SegmentableFormatReader segmentable) { + pages = ParallelParsingCoordinator.parallelRead( + segmentable, + storageObject, + projectedColumns, + batchSize, + parsingParallelism, + executor, + attributes, + errorPolicy + ); + } else { + pages = formatReader.read(storageObject, projectedColumns, batchSize, rowLimit, errorPolicy); + } consumePages(pages, buffer, injector); } catch (Exception e) { buffer.onFailure(e); @@ -540,7 +615,14 @@ private static void closeQuietly(CloseableIterator iterator) { @Override public String describe() { - String asyncMode = formatReader.supportsNativeAsync() ? "native-async" : "sync-wrapper"; + String asyncMode; + if (formatReader instanceof SegmentableFormatReader && parsingParallelism > 1) { + asyncMode = "parallel-parse(" + parsingParallelism + ")"; + } else if (formatReader.supportsNativeAsync()) { + asyncMode = "native-async"; + } else { + asyncMode = "sync-wrapper"; + } return "AsyncExternalSourceOperator[" + "storage=" + storageProvider.getClass().getSimpleName() @@ -608,4 +690,8 @@ public ExternalSliceQueue sliceQueue() { public ErrorPolicy errorPolicy() { return errorPolicy; } + + public int parsingParallelism() { + return parsingParallelism; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/FileSourceFactory.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/FileSourceFactory.java index cc7dfd25b8b36..782728a9daffd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/FileSourceFactory.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/FileSourceFactory.java @@ -149,7 +149,8 @@ public SourceOperatorFactoryProvider operatorFactory() { context.partitionColumnNames(), partitionValues, context.sliceQueue(), - errorPolicy + errorPolicy, + context.parsingParallelism() ); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ParallelParsingCoordinator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ParallelParsingCoordinator.java index 40b47a8ad7d9c..af1c10b829286 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ParallelParsingCoordinator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ParallelParsingCoordinator.java @@ -11,6 +11,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.datasources.spi.ErrorPolicy; import org.elasticsearch.xpack.esql.datasources.spi.SegmentableFormatReader; import org.elasticsearch.xpack.esql.datasources.spi.StorageObject; @@ -74,21 +75,48 @@ public static CloseableIterator parallelRead( int parallelism, Executor executor, List resolvedAttributes + ) throws IOException { + return parallelRead(reader, storageObject, projectedColumns, batchSize, parallelism, executor, resolvedAttributes, null); + } + + /** + * Creates a parallel-parsing iterator with an explicit error policy. + * + * @param errorPolicy error handling policy for per-segment parsing, or {@code null} for defaults + */ + public static CloseableIterator parallelRead( + SegmentableFormatReader reader, + StorageObject storageObject, + List projectedColumns, + int batchSize, + int parallelism, + Executor executor, + List resolvedAttributes, + ErrorPolicy errorPolicy ) throws IOException { long fileLength = storageObject.length(); long minSegment = reader.minimumSegmentSize(); if (parallelism <= 1 || fileLength < minSegment * 2) { - return reader.readSplit(storageObject, projectedColumns, batchSize, false, true, resolvedAttributes); + return reader.read(storageObject, projectedColumns, batchSize, errorPolicy); } List segments = computeSegments(reader, storageObject, fileLength, parallelism, minSegment); if (segments.size() <= 1) { - return reader.readSplit(storageObject, projectedColumns, batchSize, false, true, resolvedAttributes); + return reader.read(storageObject, projectedColumns, batchSize, errorPolicy); } - return new OrderedParallelIterator(reader, storageObject, projectedColumns, batchSize, segments, executor, resolvedAttributes); + return new OrderedParallelIterator( + reader, + storageObject, + projectedColumns, + batchSize, + segments, + executor, + resolvedAttributes, + errorPolicy + ); } /** @@ -162,6 +190,7 @@ private static final class OrderedParallelIterator implements CloseableIterator< private final List projectedColumns; private final int batchSize; private final List resolvedAttributes; + private final ErrorPolicy errorPolicy; private final List> segmentQueues; private final AtomicReference firstError = new AtomicReference<>(); @@ -178,13 +207,15 @@ private static final class OrderedParallelIterator implements CloseableIterator< int batchSize, List segments, Executor executor, - List resolvedAttributes + List resolvedAttributes, + ErrorPolicy errorPolicy ) { this.reader = reader; this.storageObject = storageObject; this.projectedColumns = projectedColumns; this.batchSize = batchSize; this.resolvedAttributes = resolvedAttributes; + this.errorPolicy = errorPolicy; this.allDone = new CountDownLatch(segments.size()); this.segmentQueues = new ArrayList<>(segments.size()); @@ -211,16 +242,13 @@ private void parseSegment(int segmentIndex, long offset, long length, int totalS boolean lastSplit = segmentIndex == totalSegments - 1; StorageObject segObj = new RangeStorageObject(storageObject, offset, length); - try ( - CloseableIterator pages = reader.readSplit( - segObj, - projectedColumns, - batchSize, - false, - lastSplit, - resolvedAttributes - ) - ) { + CloseableIterator pages; + if (segmentIndex == 0) { + pages = reader.read(segObj, projectedColumns, batchSize, errorPolicy); + } else { + pages = reader.readSplit(segObj, projectedColumns, batchSize, false, lastSplit, resolvedAttributes, errorPolicy); + } + try (pages) { while (pages.hasNext()) { if (firstError.get() != null || closed) { break; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/SourceOperatorContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/SourceOperatorContext.java index a9aee66d15b7a..56cf93e224a48 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/SourceOperatorContext.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/SourceOperatorContext.java @@ -53,7 +53,8 @@ public record SourceOperatorContext( FileSet fileSet, @Nullable ExternalSplit split, Set partitionColumnNames, - @Nullable ExternalSliceQueue sliceQueue + @Nullable ExternalSliceQueue sliceQueue, + int parsingParallelism ) { public SourceOperatorContext { Check.notNull(path, "path cannot be null"); @@ -72,6 +73,9 @@ public record SourceOperatorContext( if (maxBufferSize <= 0) { throw new IllegalArgumentException("maxBufferSize must be positive, got: " + maxBufferSize); } + if (parsingParallelism < 1) { + throw new IllegalArgumentException("parsingParallelism must be >= 1, got: " + parsingParallelism); + } } public SourceOperatorContext( @@ -103,7 +107,8 @@ public SourceOperatorContext( fileSet, split, null, - null + null, + 1 ); } @@ -135,7 +140,8 @@ public SourceOperatorContext( fileSet, null, null, - null + null, + 1 ); } @@ -166,7 +172,8 @@ public SourceOperatorContext( null, null, null, - null + null, + 1 ); } @@ -195,7 +202,8 @@ public SourceOperatorContext( null, null, null, - null + null, + 1 ); } @@ -219,6 +227,7 @@ public static class Builder { private ExternalSplit split; private Set partitionColumnNames; private ExternalSliceQueue sliceQueue; + private int parsingParallelism = 1; public Builder sourceType(String sourceType) { this.sourceType = sourceType; @@ -295,6 +304,11 @@ public Builder sliceQueue(ExternalSliceQueue sliceQueue) { return this; } + public Builder parsingParallelism(int parsingParallelism) { + this.parsingParallelism = parsingParallelism; + return this; + } + public SourceOperatorContext build() { return new SourceOperatorContext( sourceType, @@ -311,7 +325,8 @@ public SourceOperatorContext build() { fileSet, split, partitionColumnNames, - sliceQueue + sliceQueue, + parsingParallelism ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 3613b20461ff3..d80b86a13562f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -1298,6 +1298,7 @@ private PhysicalOperation planExternalSource(ExternalSourceExec externalSource, .fileSet(fileSet) .partitionColumnNames(partitionColumnNames) .sliceQueue(sliceQueue) + .parsingParallelism(context.queryPragmas().parsingParallelism()) .build(); SourceOperator.SourceOperatorFactory factory = operatorFactoryRegistry.factory(operatorContext); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index 5cb192ee9b60a..b2b8819b7e46d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -110,6 +110,16 @@ public final class QueryPragmas implements Writeable { public static final Setting FORK_IMPLICIT_LIMIT = Setting.boolSetting("fork_implicit_limit", true); + /** + * Number of parallel parser threads for intra-file text format parsing (CSV, NDJSON). + * Defaults to allocated processors. Set to 1 to disable parallel parsing. + */ + public static final Setting PARSING_PARALLELISM = Setting.intSetting( + "parsing_parallelism", + EsExecutors.allocatedProcessors(Settings.EMPTY), + 1 + ); + /** * When {@code true}, forces all non-single-segment pages through {@code ValuesFromDocSequence} * regardless of the number of {@code BYTES_REF} fields. Intended for testing the correctness @@ -260,6 +270,10 @@ public String externalDistribution() { return EXTERNAL_DISTRIBUTION.get(settings); } + public int parsingParallelism() { + return PARSING_PARALLELISM.get(settings); + } + /** * Returns the effective doc-sequence threshold. When {@link #FORCE_DOC_SEQUENCE} is * {@code true}, returns {@code 0} so that all non-single-segment pages use diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactoryTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactoryTests.java index c2ed608b0bc1c..938c0dc133dcc 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactoryTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/datasources/AsyncExternalSourceOperatorFactoryTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit; import org.elasticsearch.xpack.esql.datasources.spi.FormatReader; +import org.elasticsearch.xpack.esql.datasources.spi.SegmentableFormatReader; import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata; import org.elasticsearch.xpack.esql.datasources.spi.StorageObject; import org.elasticsearch.xpack.esql.datasources.spi.StoragePath; @@ -1051,6 +1052,211 @@ public void testSliceQueueMultipleSplitsWithMixedOffsets() throws Exception { operator.close(); } + // ===== Parallel parsing tests ===== + + public void testParallelParsingUsedForSegmentableReader() throws Exception { + TrackingSegmentableFormatReader formatReader = new TrackingSegmentableFormatReader(); + LargeStorageProvider storageProvider = new LargeStorageProvider(3 * 1024 * 1024); + + StoragePath path = StoragePath.of("file:///data/large.csv"); + List attributes = List.of( + new FieldAttribute( + Source.EMPTY, + "value", + new EsField("value", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ) + ); + + DriverContext driverContext = mock(DriverContext.class); + doAnswer(inv -> null).when(driverContext).addAsyncAction(); + doAnswer(inv -> null).when(driverContext).removeAsyncAction(); + + AsyncExternalSourceOperatorFactory factory = new AsyncExternalSourceOperatorFactory( + storageProvider, + formatReader, + path, + attributes, + 100, + 10, + FormatReader.NO_LIMIT, + (Runnable r) -> r.run(), + null, + null, + null, + null, + null, + 2 + ); + + SourceOperator operator = factory.get(driverContext); + List pages = new ArrayList<>(); + while (operator.isFinished() == false) { + Page page = operator.getOutput(); + if (page != null) { + pages.add(page); + } + } + + assertTrue("readSplit should be called for non-first segments", formatReader.readSplitCount.get() > 0); + assertEquals("read() should be called once for the first segment (handles header)", 1, formatReader.readCount.get()); + + for (Page p : pages) { + p.releaseBlocks(); + } + operator.close(); + } + + public void testParallelParsingSkippedWithRowLimit() throws Exception { + TrackingSegmentableFormatReader formatReader = new TrackingSegmentableFormatReader(); + LargeStorageProvider storageProvider = new LargeStorageProvider(3 * 1024 * 1024); + + StoragePath path = StoragePath.of("file:///data/large.csv"); + List attributes = List.of( + new FieldAttribute( + Source.EMPTY, + "value", + new EsField("value", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ) + ); + + DriverContext driverContext = mock(DriverContext.class); + doAnswer(inv -> null).when(driverContext).addAsyncAction(); + doAnswer(inv -> null).when(driverContext).removeAsyncAction(); + + AsyncExternalSourceOperatorFactory factory = new AsyncExternalSourceOperatorFactory( + storageProvider, + formatReader, + path, + attributes, + 100, + 10, + 10, + (Runnable r) -> r.run(), + null, + null, + null, + null, + null, + 2 + ); + + SourceOperator operator = factory.get(driverContext); + List pages = new ArrayList<>(); + while (operator.isFinished() == false) { + Page page = operator.getOutput(); + if (page != null) { + pages.add(page); + } + } + + assertTrue("read() should be called when row limit is set", formatReader.readCount.get() > 0); + assertEquals("readSplit should not be called when row limit bypasses parallel parsing", 0, formatReader.readSplitCount.get()); + + for (Page p : pages) { + p.releaseBlocks(); + } + operator.close(); + } + + public void testParallelParsingSkippedForNonSegmentableReader() throws Exception { + AtomicInteger readCount = new AtomicInteger(0); + FormatReader formatReader = new PageCountingFormatReader(readCount); + LargeStorageProvider storageProvider = new LargeStorageProvider(3 * 1024 * 1024); + + StoragePath path = StoragePath.of("file:///data/large.parquet"); + List attributes = List.of( + new FieldAttribute( + Source.EMPTY, + "value", + new EsField("value", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ) + ); + + DriverContext driverContext = mock(DriverContext.class); + doAnswer(inv -> null).when(driverContext).addAsyncAction(); + doAnswer(inv -> null).when(driverContext).removeAsyncAction(); + + AsyncExternalSourceOperatorFactory factory = new AsyncExternalSourceOperatorFactory( + storageProvider, + formatReader, + path, + attributes, + 100, + 10, + FormatReader.NO_LIMIT, + (Runnable r) -> r.run(), + null, + null, + null, + null, + null, + 2 + ); + + SourceOperator operator = factory.get(driverContext); + List pages = new ArrayList<>(); + while (operator.isFinished() == false) { + Page page = operator.getOutput(); + if (page != null) { + pages.add(page); + } + } + + assertTrue("read() should be called for non-segmentable reader", readCount.get() > 0); + + for (Page p : pages) { + p.releaseBlocks(); + } + operator.close(); + } + + public void testDescribeShowsParallelParseMode() { + TrackingSegmentableFormatReader formatReader = new TrackingSegmentableFormatReader(); + StorageProvider storageProvider = mock(StorageProvider.class); + + AsyncExternalSourceOperatorFactory factory = new AsyncExternalSourceOperatorFactory( + storageProvider, + formatReader, + StoragePath.of("file:///test.csv"), + List.of( + new FieldAttribute(Source.EMPTY, "x", new EsField("x", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE)) + ), + 100, + 10, + FormatReader.NO_LIMIT, + Runnable::run, + null, + null, + null, + null, + null, + 4 + ); + + String description = factory.describe(); + assertTrue("describe should mention parallel-parse for segmentable readers", description.contains("parallel-parse(4)")); + } + + public void testDescribeShowsSyncWrapperForParallelism1() { + TrackingSegmentableFormatReader formatReader = new TrackingSegmentableFormatReader(); + StorageProvider storageProvider = mock(StorageProvider.class); + + AsyncExternalSourceOperatorFactory factory = new AsyncExternalSourceOperatorFactory( + storageProvider, + formatReader, + StoragePath.of("file:///test.csv"), + List.of( + new FieldAttribute(Source.EMPTY, "x", new EsField("x", DataType.INTEGER, Map.of(), false, EsField.TimeSeriesFieldType.NONE)) + ), + 100, + 10, + Runnable::run + ); + + String description = factory.describe(); + assertTrue("describe should show sync-wrapper when parallelism is 1", description.contains("sync-wrapper")); + } + // ===== Helpers ===== private static CloseableIterator emptyIterator() { @@ -1361,6 +1567,205 @@ public List fileExtensions() { public void close() {} } + /** + * Format reader that implements SegmentableFormatReader and tracks which methods are called. + */ + private static class TrackingSegmentableFormatReader implements SegmentableFormatReader { + final AtomicInteger readCount = new AtomicInteger(0); + final AtomicInteger readSplitCount = new AtomicInteger(0); + + @Override + public long findNextRecordBoundary(InputStream stream) throws IOException { + byte[] buf = new byte[1]; + int total = 0; + while (stream.read(buf) > 0) { + total++; + if (buf[0] == '\n') { + return total; + } + } + return -1; + } + + @Override + public SourceMetadata metadata(StorageObject object) { + return null; + } + + @Override + public CloseableIterator read(StorageObject object, List projectedColumns, int batchSize) { + readCount.incrementAndGet(); + return singleTestPageIterator(); + } + + @Override + public CloseableIterator readSplit( + StorageObject object, + List projectedColumns, + int batchSize, + boolean skipFirstLine, + boolean lastSplit, + List resolvedAttributes + ) { + readSplitCount.incrementAndGet(); + return singleTestPageIterator(); + } + + @Override + public CloseableIterator readSplit( + StorageObject object, + List projectedColumns, + int batchSize, + boolean skipFirstLine, + boolean lastSplit, + List resolvedAttributes, + org.elasticsearch.xpack.esql.datasources.spi.ErrorPolicy errorPolicy + ) { + readSplitCount.incrementAndGet(); + return singleTestPageIterator(); + } + + private static CloseableIterator singleTestPageIterator() { + Page page = createTestPage(); + return new CloseableIterator<>() { + private boolean consumed = false; + + @Override + public boolean hasNext() { + return consumed == false; + } + + @Override + public Page next() { + if (consumed) throw new NoSuchElementException(); + consumed = true; + return page; + } + + @Override + public void close() {} + }; + } + + @Override + public String formatName() { + return "test-segmentable"; + } + + @Override + public List fileExtensions() { + return List.of(".csv"); + } + + @Override + public void close() {} + } + + private static class LargeStorageProvider implements StorageProvider { + private final long fileSize; + + LargeStorageProvider(long fileSize) { + this.fileSize = fileSize; + } + + @Override + public StorageObject newObject(StoragePath path) { + return new LargeStorageObject(path, fileSize); + } + + @Override + public StorageObject newObject(StoragePath path, long length) { + return new LargeStorageObject(path, fileSize); + } + + @Override + public StorageObject newObject(StoragePath path, long length, Instant lastModified) { + return new LargeStorageObject(path, fileSize); + } + + @Override + public StorageIterator listObjects(StoragePath prefix, boolean recursive) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean exists(StoragePath path) { + return true; + } + + @Override + public List supportedSchemes() { + return List.of("file"); + } + + @Override + public void close() {} + } + + private static class LargeStorageObject implements StorageObject { + private final StoragePath path; + private final long size; + + LargeStorageObject(StoragePath path, long size) { + this.path = path; + this.size = size; + } + + @Override + public InputStream newStream() { + return newLineStream(size); + } + + @Override + public InputStream newStream(long position, long length) { + return newLineStream(length); + } + + private static InputStream newLineStream(long length) { + return new InputStream() { + private long remaining = length; + + @Override + public int read() { + if (remaining <= 0) return -1; + remaining--; + return '\n'; + } + + @Override + public int read(byte[] b, int off, int len) { + if (remaining <= 0) return -1; + int toRead = (int) Math.min(len, remaining); + for (int i = 0; i < toRead; i++) { + b[off + i] = '\n'; + } + remaining -= toRead; + return toRead; + } + }; + } + + @Override + public long length() { + return size; + } + + @Override + public Instant lastModified() { + return Instant.EPOCH; + } + + @Override + public boolean exists() { + return true; + } + + @Override + public StoragePath path() { + return path; + } + } + /** * Test async format reader that returns empty pages via async callback. */