Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/145755.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 145755
summary: Per-file filter pushdown awareness
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public PushdownResult pushFilters(List<Expression> filters) {
}

logger.debug("ORC filter pushdown: validated {} of {} expressions for pushdown", pushed.size(), filters.size());
return new PushdownResult(new OrcPushedExpressions(pushed), remainder);
return new PushdownResult(new OrcPushedExpressions(pushed), pushed, remainder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public PushdownResult pushFilters(List<Expression> filters) {
}

logger.debug("Parquet filter pushdown: validated {} of {} expressions for pushdown", pushed.size(), filters.size());
return new PushdownResult(new ParquetPushedExpressions(pushed), remainder);
return new PushdownResult(new ParquetPushedExpressions(pushed), pushed, remainder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.datasources.spi.ErrorPolicy;
import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit;
import org.elasticsearch.xpack.esql.datasources.spi.FileList;
import org.elasticsearch.xpack.esql.datasources.spi.FilterPushdownSupport;
import org.elasticsearch.xpack.esql.datasources.spi.FormatReadContext;
import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
import org.elasticsearch.xpack.esql.datasources.spi.RangeAwareFormatReader;
Expand All @@ -30,6 +34,8 @@
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -77,6 +83,8 @@ public class AsyncExternalSourceOperatorFactory implements SourceOperator.Source
private final ErrorPolicy errorPolicy;
private final int parsingParallelism;
private final TimeValue drainTimeout;
private final List<Expression> pushedExpressions;
private final FilterPushdownSupport pushdownSupport;

public AsyncExternalSourceOperatorFactory(
StorageProvider storageProvider,
Expand All @@ -93,7 +101,9 @@ public AsyncExternalSourceOperatorFactory(
ExternalSliceQueue sliceQueue,
ErrorPolicy errorPolicy,
int parsingParallelism,
TimeValue drainTimeout
TimeValue drainTimeout,
@Nullable List<Expression> pushedExpressions,
@Nullable FilterPushdownSupport pushdownSupport
) {
if (storageProvider == null) {
throw new IllegalArgumentException("storageProvider cannot be null");
Expand Down Expand Up @@ -132,6 +142,46 @@ public AsyncExternalSourceOperatorFactory(
this.errorPolicy = errorPolicy != null ? errorPolicy : formatReader.defaultErrorPolicy();
this.parsingParallelism = Math.max(1, parsingParallelism);
this.drainTimeout = drainTimeout != null ? drainTimeout : ExternalSourceDrainUtils.DEFAULT_DRAIN_TIMEOUT;
this.pushedExpressions = pushedExpressions != null ? pushedExpressions : List.of();
this.pushdownSupport = pushdownSupport;
}

/**
 * Convenience constructor without filter pushdown support. Delegates to the primary
 * constructor with {@code null} pushedExpressions and {@code null} pushdownSupport,
 * so no per-file filter adaptation is performed by {@code readerForFile}.
 */
public AsyncExternalSourceOperatorFactory(
StorageProvider storageProvider,
FormatReader formatReader,
StoragePath path,
List<Attribute> attributes,
int batchSize,
int maxBufferSize,
int rowLimit,
Executor executor,
FileList fileList,
Set<String> partitionColumnNames,
Map<String, Object> partitionValues,
ExternalSliceQueue sliceQueue,
ErrorPolicy errorPolicy,
int parsingParallelism,
TimeValue drainTimeout
) {
this(
storageProvider,
formatReader,
path,
attributes,
batchSize,
maxBufferSize,
rowLimit,
executor,
fileList,
partitionColumnNames,
partitionValues,
sliceQueue,
errorPolicy,
parsingParallelism,
drainTimeout,
// no pushed expressions: filters were not pushed down for this source
null,
// no pushdown support needed when there is nothing to re-validate per file
null
);
}

public AsyncExternalSourceOperatorFactory(
Expand Down Expand Up @@ -165,6 +215,8 @@ public AsyncExternalSourceOperatorFactory(
sliceQueue,
errorPolicy,
parsingParallelism,
null,
null,
null
);
}
Expand Down Expand Up @@ -199,6 +251,8 @@ public AsyncExternalSourceOperatorFactory(
sliceQueue,
errorPolicy,
1,
null,
null,
null
);
}
Expand Down Expand Up @@ -232,6 +286,8 @@ public AsyncExternalSourceOperatorFactory(
sliceQueue,
null,
1,
null,
null,
null
);
}
Expand Down Expand Up @@ -264,6 +320,8 @@ public AsyncExternalSourceOperatorFactory(
sliceQueue,
null,
1,
null,
null,
null
);
}
Expand Down Expand Up @@ -295,6 +353,8 @@ public AsyncExternalSourceOperatorFactory(
null,
null,
1,
null,
null,
null
);
}
Expand Down Expand Up @@ -324,6 +384,8 @@ public AsyncExternalSourceOperatorFactory(
null,
null,
1,
null,
null,
null
);
}
Expand Down Expand Up @@ -352,6 +414,8 @@ public AsyncExternalSourceOperatorFactory(
null,
null,
1,
null,
null,
null
);
}
Expand Down Expand Up @@ -412,6 +476,65 @@ private CloseableIterator<Page> adaptSchema(
return new SchemaAdaptingIterator(pages, dataColumns, mapping, driverContext.blockFactory());
}

/**
* Returns a format reader with an adapted pushed filter for this file, or the original reader
* if no adaptation is needed. Adaptation is needed when the file has missing columns and
* pushed expressions reference those columns.
*/
private FormatReader readerForFile(FileSplit fileSplit) {
if (pushedExpressions.isEmpty() || pushdownSupport == null) {
return formatReader;
}
SchemaReconciliation.ColumnMapping mapping = fileSplit.columnMapping();
if (mapping == null || (mapping.hasMissingColumns() == false && mapping.hasCasts() == false)) {
return formatReader;
}
Set<String> fileColumnNames = new LinkedHashSet<>();
Map<String, DataType> fileColumnTypes = new HashMap<>();
assert mapping.columnCount() <= attributes.size()
: "column mapping count [" + mapping.columnCount() + "] exceeds attributes size [" + attributes.size() + "]";
for (int i = 0; i < mapping.columnCount(); i++) {
if (mapping.localIndex(i) != -1) {
String name = attributes.get(i).name();
fileColumnNames.add(name);
DataType castTarget = mapping.cast(i);
if (castTarget != null) {
DataType fileType = inferFileType(attributes.get(i).dataType(), castTarget);
if (fileType != null) {
fileColumnTypes.put(name, fileType);
}
}
}
}
List<Expression> adapted = FilterAdaptation.adaptFilterForFile(pushedExpressions, fileColumnNames, fileColumnTypes);
if (adapted.isEmpty()) {
return formatReader.withPushedFilter(null);
}
FilterPushdownSupport.PushdownResult result = pushdownSupport.pushFilters(adapted);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would probably be useful if we could cache the resolution at a level higher than per file (at some point).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — we could cache the adapted PushdownResult keyed on the set of missing/widened columns so files with identical schemas share one translation.

if (result.hasPushedFilter()) {
return formatReader.withPushedFilter(result.pushedFilter());
}
return formatReader.withPushedFilter(null);
}

/**
* Infers the file's native type from the unified attribute type and the cast target.
* The cast target is the unified (wider) type; the file has the narrower type.
*/
/**
* Infers the file's native type from the cast target. Only returns a narrower type when
* the adaptation is safe for integral comparisons (LONG→INTEGER). DOUBLE→INTEGER narrowing
* is not supported because literal truncation can cause incorrect predicate semantics.
*/
Comment on lines +520 to +528
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: redundancy

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — merged the two javadoc blocks into one.

private static DataType inferFileType(DataType unifiedType, DataType castTarget) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unifiedType isn't used.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed — the parameter was left over from when DOUBLE→INTEGER was considered.

if (castTarget == DataType.LONG) {
return DataType.INTEGER;
}
// DOUBLE→INTEGER narrowing is intentionally not supported: Number.longValue() truncates
// fractional values, which can change comparison semantics (e.g., col < 2.7 vs col < 2).
Comment on lines +533 to +534
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: if the methods stays, this can be a javadoc comment and the method simplified.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — moved inline comment into the javadoc and removed the redundant one.

return null;
}

private void startSliceQueueRead(AsyncExternalSourceBuffer buffer, DriverContext driverContext) {
executor.execute(() -> {
try {
Expand All @@ -437,9 +560,10 @@ private void startSliceQueueRead(AsyncExternalSourceBuffer buffer, DriverContext
}
List<String> cols = projectedColumns(injector);

FormatReader fileReader = readerForFile(fileSplit);
boolean isRangeSplit = "true".equals(fileSplit.config().get(FileSplitProvider.RANGE_SPLIT_KEY));
CloseableIterator<Page> pages;
if (isRangeSplit && formatReader instanceof RangeAwareFormatReader rangeReader) {
if (isRangeSplit && fileReader instanceof RangeAwareFormatReader rangeReader) {
String fileLengthStr = (String) fileSplit.config().get(FileSplitProvider.FILE_LENGTH_KEY);
StorageObject fullObj = fileLengthStr != null
? storageProvider.newObject(fileSplit.path(), Long.parseLong(fileLengthStr))
Expand Down Expand Up @@ -470,7 +594,7 @@ private void startSliceQueueRead(AsyncExternalSourceBuffer buffer, DriverContext
.firstSplit(firstSplit)
.lastSplit(lastSplit)
.build();
pages = formatReader.read(obj, ctx);
pages = fileReader.read(obj, ctx);
}
CloseableIterator<Page> adaptedPages = adaptSchema(pages, fileSplit.columnMapping(), driverContext);
try (adaptedPages) {
Expand All @@ -493,6 +617,11 @@ private void startSliceQueueRead(AsyncExternalSourceBuffer buffer, DriverContext
});
}

/**
* Multi-file read path (legacy, non-slice-queue). Per-file filter adaptation is not applied
* here because this path does not carry {@link FileSplit} with {@link SchemaReconciliation.ColumnMapping};
* UNION_BY_NAME queries use the slice-queue path ({@link #startSliceQueueRead}) instead.
*/
private void startMultiFileRead(
List<String> projectedColumns,
AsyncExternalSourceBuffer buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
package org.elasticsearch.xpack.esql.datasources;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.util.Check;
import org.elasticsearch.xpack.esql.datasources.spi.ErrorPolicy;
import org.elasticsearch.xpack.esql.datasources.spi.ExternalSourceFactory;
import org.elasticsearch.xpack.esql.datasources.spi.FilterPushdownSupport;
import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
import org.elasticsearch.xpack.esql.datasources.spi.SourceOperatorFactoryProvider;
Expand All @@ -20,6 +22,7 @@
import org.elasticsearch.xpack.esql.datasources.spi.StorageProvider;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -138,6 +141,11 @@ public SourceOperatorFactoryProvider operatorFactory() {
partitionValues = fileSplit.partitionValues();
}

List<Expression> pushedExpressions = context.pushedExpressions();
FilterPushdownSupport pushdownSupport = (pushedExpressions != null && pushedExpressions.isEmpty() == false)
? format.filterPushdownSupport()
: null;

return new AsyncExternalSourceOperatorFactory(
storage,
format,
Expand All @@ -153,7 +161,9 @@ public SourceOperatorFactoryProvider operatorFactory() {
context.sliceQueue(),
errorPolicy,
context.parsingParallelism(),
null
null,
pushedExpressions,
pushdownSupport
);
};
}
Expand Down
Loading
Loading