Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/143997.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 143997
summary: "[ES|QL|DS] Wire parallel parsing into production for text formats"
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.datasources.spi.ErrorPolicy;
import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit;
import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
import org.elasticsearch.xpack.esql.datasources.spi.SegmentableFormatReader;
import org.elasticsearch.xpack.esql.datasources.spi.StorageObject;
import org.elasticsearch.xpack.esql.datasources.spi.StoragePath;
import org.elasticsearch.xpack.esql.datasources.spi.StorageProvider;
Expand Down Expand Up @@ -54,6 +57,8 @@
*/
public class AsyncExternalSourceOperatorFactory implements SourceOperator.SourceOperatorFactory {

private static final Logger logger = LogManager.getLogger(AsyncExternalSourceOperatorFactory.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: not used.


private final StorageProvider storageProvider;
private final FormatReader formatReader;
private final StoragePath path;
Expand All @@ -67,6 +72,7 @@ public class AsyncExternalSourceOperatorFactory implements SourceOperator.Source
private final Map<String, Object> partitionValues;
private final ExternalSliceQueue sliceQueue;
private final ErrorPolicy errorPolicy;
private final int parsingParallelism;

public AsyncExternalSourceOperatorFactory(
StorageProvider storageProvider,
Expand All @@ -81,7 +87,8 @@ public AsyncExternalSourceOperatorFactory(
Set<String> partitionColumnNames,
Map<String, Object> partitionValues,
ExternalSliceQueue sliceQueue,
ErrorPolicy errorPolicy
ErrorPolicy errorPolicy,
int parsingParallelism
) {
if (storageProvider == null) {
throw new IllegalArgumentException("storageProvider cannot be null");
Expand Down Expand Up @@ -118,6 +125,40 @@ public AsyncExternalSourceOperatorFactory(
this.partitionValues = partitionValues != null ? partitionValues : Map.of();
this.sliceQueue = sliceQueue;
this.errorPolicy = errorPolicy != null ? errorPolicy : formatReader.defaultErrorPolicy();
this.parsingParallelism = Math.max(1, parsingParallelism);
}

/**
 * Convenience constructor that uses a parsing parallelism of {@code 1}, i.e. parallel
 * parsing is disabled and files are parsed by a single reader at a time.
 * <p>
 * Delegates to the primary constructor, which performs all argument validation
 * (e.g. the null checks on {@code storageProvider}) and default substitution
 * (e.g. falling back to {@code formatReader.defaultErrorPolicy()} when
 * {@code errorPolicy} is {@code null}).
 *
 * @param storageProvider provider used to open storage objects; must not be null
 * @param formatReader reader that parses the underlying format into pages
 * @param path storage path being read
 * @param attributes output attributes (columns) produced by the operator
 * @param batchSize maximum number of rows per emitted page
 * @param maxBufferSize maximum number of buffered pages
 * @param rowLimit row budget, or {@code FormatReader.NO_LIMIT} for unbounded
 * @param executor executor the asynchronous read work runs on
 * @param fileSet set of files to read
 * @param partitionColumnNames names of partition columns, may be null
 * @param partitionValues constant values for partition columns, may be null
 * @param sliceQueue queue of external slices to process
 * @param errorPolicy error handling policy, or {@code null} for the reader's default
 */
public AsyncExternalSourceOperatorFactory(
StorageProvider storageProvider,
FormatReader formatReader,
StoragePath path,
List<Attribute> attributes,
int batchSize,
int maxBufferSize,
int rowLimit,
Executor executor,
FileSet fileSet,
Set<String> partitionColumnNames,
Map<String, Object> partitionValues,
ExternalSliceQueue sliceQueue,
ErrorPolicy errorPolicy
) {
this(
storageProvider,
formatReader,
path,
attributes,
batchSize,
maxBufferSize,
rowLimit,
executor,
fileSet,
partitionColumnNames,
partitionValues,
sliceQueue,
errorPolicy,
// parsingParallelism = 1: no parallel parsing for callers of this overload
1
);
}

public AsyncExternalSourceOperatorFactory(
Expand Down Expand Up @@ -147,7 +188,8 @@ public AsyncExternalSourceOperatorFactory(
partitionColumnNames,
partitionValues,
sliceQueue,
null
null,
1
);
}

Expand Down Expand Up @@ -177,7 +219,8 @@ public AsyncExternalSourceOperatorFactory(
partitionColumnNames,
partitionValues,
sliceQueue,
null
null,
1
);
}

Expand Down Expand Up @@ -206,7 +249,8 @@ public AsyncExternalSourceOperatorFactory(
partitionColumnNames,
partitionValues,
null,
null
null,
1
);
}

Expand All @@ -233,7 +277,8 @@ public AsyncExternalSourceOperatorFactory(
null,
null,
null,
null
null,
1
);
}

Expand All @@ -259,7 +304,8 @@ public AsyncExternalSourceOperatorFactory(
null,
null,
null,
null
null,
1
);
}

Expand Down Expand Up @@ -377,13 +423,29 @@ private void startMultiFileRead(
executor.execute(() -> {
try {
int rowsRemaining = rowLimit;
boolean useParallel = rowLimit == FormatReader.NO_LIMIT && formatReader instanceof SegmentableFormatReader;
for (StorageEntry entry : fileSet.files()) {
if (buffer.noMoreInputs() || (rowLimit != FormatReader.NO_LIMIT && rowsRemaining <= 0)) {
break;
}
StorageObject obj = storageProvider.newObject(entry.path(), entry.length(), entry.lastModified());
int fileBudget = rowLimit == FormatReader.NO_LIMIT ? FormatReader.NO_LIMIT : rowsRemaining;
try (CloseableIterator<Page> pages = formatReader.read(obj, projectedColumns, batchSize, fileBudget, errorPolicy)) {
CloseableIterator<Page> pages;
if (useParallel) {
pages = ParallelParsingCoordinator.parallelRead(
(SegmentableFormatReader) formatReader,
obj,
projectedColumns,
batchSize,
parsingParallelism,
executor,
attributes,
errorPolicy
);
} else {
int fileBudget = rowLimit == FormatReader.NO_LIMIT ? FormatReader.NO_LIMIT : rowsRemaining;
pages = formatReader.read(obj, projectedColumns, batchSize, fileBudget, errorPolicy);
}
try (pages) {
int consumed = drainPagesWithBudget(pages, buffer, injector);
if (rowLimit != FormatReader.NO_LIMIT) {
rowsRemaining -= consumed;
Expand Down Expand Up @@ -432,7 +494,20 @@ private void startSyncWrapperRead(
executor.execute(() -> {
CloseableIterator<Page> pages = null;
try {
pages = formatReader.read(storageObject, projectedColumns, batchSize, rowLimit, errorPolicy);
if (rowLimit == FormatReader.NO_LIMIT && formatReader instanceof SegmentableFormatReader segmentable) {
pages = ParallelParsingCoordinator.parallelRead(
segmentable,
storageObject,
projectedColumns,
batchSize,
parsingParallelism,
executor,
attributes,
errorPolicy
);
} else {
pages = formatReader.read(storageObject, projectedColumns, batchSize, rowLimit, errorPolicy);
}
consumePages(pages, buffer, injector);
} catch (Exception e) {
buffer.onFailure(e);
Expand Down Expand Up @@ -540,7 +615,14 @@ private static void closeQuietly(CloseableIterator<?> iterator) {

@Override
public String describe() {
String asyncMode = formatReader.supportsNativeAsync() ? "native-async" : "sync-wrapper";
String asyncMode;
if (formatReader instanceof SegmentableFormatReader && parsingParallelism > 1) {
asyncMode = "parallel-parse(" + parsingParallelism + ")";
} else if (formatReader.supportsNativeAsync()) {
asyncMode = "native-async";
} else {
asyncMode = "sync-wrapper";
}
return "AsyncExternalSourceOperator["
+ "storage="
+ storageProvider.getClass().getSimpleName()
Expand Down Expand Up @@ -608,4 +690,8 @@ public ExternalSliceQueue sliceQueue() {
/**
 * Returns the effective error policy used when parsing
 * (never {@code null}; defaults to the reader's policy when none was supplied).
 */
public ErrorPolicy errorPolicy() {
return errorPolicy;
}

/**
 * Returns the configured parsing parallelism; always {@code >= 1}
 * (values below 1 are clamped in the constructor).
 */
public int parsingParallelism() {
return parsingParallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public SourceOperatorFactoryProvider operatorFactory() {
context.partitionColumnNames(),
partitionValues,
context.sliceQueue(),
errorPolicy
errorPolicy,
context.parsingParallelism()
);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.datasources.spi.ErrorPolicy;
import org.elasticsearch.xpack.esql.datasources.spi.SegmentableFormatReader;
import org.elasticsearch.xpack.esql.datasources.spi.StorageObject;

Expand Down Expand Up @@ -74,21 +75,48 @@ public static CloseableIterator<Page> parallelRead(
int parallelism,
Executor executor,
List<Attribute> resolvedAttributes
) throws IOException {
return parallelRead(reader, storageObject, projectedColumns, batchSize, parallelism, executor, resolvedAttributes, null);
}

/**
 * Creates a parallel-parsing iterator with an explicit error policy.
 * <p>
 * Falls back to a plain single-threaded {@code reader.read(...)} when parallelism
 * is disabled, the file is too small to split (less than two minimum segments),
 * or segmentation collapses to a single segment. Otherwise returns an
 * {@code OrderedParallelIterator} that parses segments concurrently on the given
 * executor while emitting pages in segment order.
 *
 * @param reader segmentable reader used to parse the object
 * @param storageObject object to read
 * @param projectedColumns columns to materialize
 * @param batchSize maximum rows per emitted page
 * @param parallelism desired number of concurrent parsing segments
 * @param executor executor that runs per-segment parsing tasks
 * @param resolvedAttributes resolved output attributes for split reads
 * @param errorPolicy error handling policy for per-segment parsing, or {@code null} for defaults
 * @throws IOException if the read cannot be initiated
 */
public static CloseableIterator<Page> parallelRead(
    SegmentableFormatReader reader,
    StorageObject storageObject,
    List<String> projectedColumns,
    int batchSize,
    int parallelism,
    Executor executor,
    List<Attribute> resolvedAttributes,
    ErrorPolicy errorPolicy
) throws IOException {
    long fileLength = storageObject.length();
    long minSegment = reader.minimumSegmentSize();

    // Not worth splitting: no parallelism requested, or the file cannot hold
    // at least two minimum-size segments.
    if (parallelism <= 1 || fileLength < minSegment * 2) {
        return reader.read(storageObject, projectedColumns, batchSize, errorPolicy);
    }

    List<long[]> segments = computeSegments(reader, storageObject, fileLength, parallelism, minSegment);

    // Segmentation may still collapse to a single segment; use the plain path.
    if (segments.size() <= 1) {
        return reader.read(storageObject, projectedColumns, batchSize, errorPolicy);
    }

    return new OrderedParallelIterator(
        reader,
        storageObject,
        projectedColumns,
        batchSize,
        segments,
        executor,
        resolvedAttributes,
        errorPolicy
    );
}

/**
Expand Down Expand Up @@ -162,6 +190,7 @@ private static final class OrderedParallelIterator implements CloseableIterator<
private final List<String> projectedColumns;
private final int batchSize;
private final List<Attribute> resolvedAttributes;
private final ErrorPolicy errorPolicy;

private final List<BlockingQueue<Page>> segmentQueues;
private final AtomicReference<Throwable> firstError = new AtomicReference<>();
Expand All @@ -178,13 +207,15 @@ private static final class OrderedParallelIterator implements CloseableIterator<
int batchSize,
List<long[]> segments,
Executor executor,
List<Attribute> resolvedAttributes
List<Attribute> resolvedAttributes,
ErrorPolicy errorPolicy
) {
this.reader = reader;
this.storageObject = storageObject;
this.projectedColumns = projectedColumns;
this.batchSize = batchSize;
this.resolvedAttributes = resolvedAttributes;
this.errorPolicy = errorPolicy;
this.allDone = new CountDownLatch(segments.size());

this.segmentQueues = new ArrayList<>(segments.size());
Expand All @@ -211,16 +242,13 @@ private void parseSegment(int segmentIndex, long offset, long length, int totalS
boolean lastSplit = segmentIndex == totalSegments - 1;
StorageObject segObj = new RangeStorageObject(storageObject, offset, length);

try (
CloseableIterator<Page> pages = reader.readSplit(
segObj,
projectedColumns,
batchSize,
false,
lastSplit,
resolvedAttributes
)
) {
CloseableIterator<Page> pages;
if (segmentIndex == 0) {
pages = reader.read(segObj, projectedColumns, batchSize, errorPolicy);
} else {
pages = reader.readSplit(segObj, projectedColumns, batchSize, false, lastSplit, resolvedAttributes, errorPolicy);
}
try (pages) {
while (pages.hasNext()) {
if (firstError.get() != null || closed) {
break;
Expand Down
Loading
Loading