Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 183 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -68,6 +72,7 @@ static CloseableIterable<FileScanTask> planFiles(
Expression rowFilter = context.rowFilter();
boolean caseSensitive = context.caseSensitive();
boolean ignoreResiduals = context.ignoreResiduals();
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;

LoadingCache<Integer, ManifestEvaluator> evalCache =
Caffeine.newBuilder()
Expand All @@ -77,14 +82,18 @@ static CloseableIterable<FileScanTask> planFiles(
PartitionSpec transformedSpec = BaseFilesTable.transformSpec(tableSchema, spec);
return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
});
ManifestContentEvaluator manifestContentEvaluator =
new ManifestContentEvaluator(filter, tableSchema.asStruct(), caseSensitive);

CloseableIterable<ManifestFile> filteredManifests =
CloseableIterable.filter(
manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
manifests,
manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest)
&& manifestContentEvaluator.eval(manifest));

String schemaString = SchemaParser.toJson(projectedSchema);
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(
Expand All @@ -94,6 +103,178 @@ static CloseableIterable<FileScanTask> planFiles(
table, manifest, projectedSchema, schemaString, specString, residuals));
}

/**
* Evaluates an {@link Expression} on a {@link ManifestFile} to test whether a given data or
* delete manifests shall be included in the scan
*/
private static class ManifestContentEvaluator {

private final Expression boundExpr;

private ManifestContentEvaluator(
Expression expr, Types.StructType structType, boolean caseSensitive) {
Expression rewritten = Expressions.rewriteNot(expr);
this.boundExpr = Binder.bind(structType, rewritten, caseSensitive);
}

private boolean eval(ManifestFile manifest) {
return new ManifestEvalVisitor().eval(manifest);
}

private class ManifestEvalVisitor extends ExpressionVisitors.BoundExpressionVisitor<Boolean> {

private int manifestContentId;

private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;

private boolean eval(ManifestFile manifestFile) {
this.manifestContentId = manifestFile.content().id();
return ExpressionVisitors.visitEvaluator(boundExpr, this);
}

@Override
public Boolean alwaysTrue() {
return ROWS_MIGHT_MATCH;
}

@Override
public Boolean alwaysFalse() {
return ROWS_CANNOT_MATCH;
}

@Override
public Boolean not(Boolean result) {
return !result;
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
}

@Override
public <T> Boolean isNull(BoundReference<T> ref) {
if (fileContent(ref)) {
return ROWS_CANNOT_MATCH; // date_file.content should not be null
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNull(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean isNaN(BoundReference<T> ref) {
if (fileContent(ref)) {
return ROWS_CANNOT_MATCH; // date_file.content should not be nan
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNaN(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
if (fileContent(ref)) {
Literal<Integer> intLit = lit.to(Types.IntegerType.get());
if (!contentMatch(intLit.value())) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
if (fileContent(ref)) {
Literal<Integer> intLit = lit.to(Types.IntegerType.get());
if (contentMatch(intLit.value())) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
if (fileContent(ref)) {
if (literalSet.stream().noneMatch(lit -> contentMatch((Integer) lit))) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
if (fileContent(ref)) {
if (literalSet.stream().anyMatch(lit -> contentMatch((Integer) lit))) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

private <T> boolean fileContent(BoundReference<T> ref) {
return ref.fieldId() == DataFile.CONTENT.fieldId();
}

private <T> boolean contentMatch(Integer fileContentId) {
if (FileContent.DATA.id() == fileContentId) {
return ManifestContent.DATA.id() == manifestContentId;
} else if (FileContent.EQUALITY_DELETES.id() == fileContentId
|| FileContent.POSITION_DELETES.id() == fileContentId) {
return ManifestContent.DELETES.id() == manifestContentId;
} else {
return false;
}
}
}
}

static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema projection;
private final Schema fileProjection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ protected static List<Object> parameters() {
return Arrays.asList(1, 2);
}

protected Set<String> actualManifestListPaths(TableScan allManifestsTableScan) {
return StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
.map(t -> (AllManifestsTable.ManifestListReadTask) t)
protected Set<String> scannedPaths(TableScan scan) {
return StreamSupport.stream(scan.planFiles().spliterator(), false)
.map(t -> t.file().path().toString())
.collect(Collectors.toSet());
}
Expand Down
Loading