Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 210 additions & 8 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,27 @@
package org.apache.iceberg;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

Expand All @@ -43,6 +52,7 @@
* This table may return duplicate rows.
*/
public class AllManifestsTable extends BaseMetadataTable {
private static final int REF_SNAPSHOT_ID = 18;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we keeping this a constant because we refer to it again in the isSnapshotRef function? I just wonder because we don't do this for any of our other fields in the schema.

Copy link
Member Author

@szehon-ho szehon-ho Jun 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that's it actually, its used in two places. Cleaner just to put 18 in both?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side comment / non-blocking: I'm a big fan of using named constants for schema element IDs.

Should we update to use named position constants throughout, particularly for schemas like the metadata tables that are evolving (note that content is listed first but has ID 14). But is the cherry-picking that would be required by a large diff like this considered an unnecessary change?

Copy link
Member Author

@szehon-ho szehon-ho Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I am not sure if there is consensus, Russell made a discussion in #4847 (comment) but didn't finished (and for the other constants, they are never referenced outside). But If so, we should do it in another pr as its unrelated.

private static final Schema MANIFEST_FILE_SCHEMA = new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
Types.NestedField.required(1, "path", Types.StringType.get()),
Expand All @@ -60,7 +70,8 @@ public class AllManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())
)))
))),
Types.NestedField.required(REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get())
);

AllManifestsTable(TableOperations ops, Table table) {
Expand Down Expand Up @@ -112,21 +123,25 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if you want to keep it on one line, you could just use MANIFEST_FILE_SCHEMA in the constructor of the evaluator class (you have access to it there)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might keep it as argument, the class is more re-usable if it takes the schema as an argument, was thinking we could add this for other metadata tables at some point.

isCaseSensitive());
Iterable<Snapshot> filteredSnapshots = Iterables.filter(table().snapshots(), snapshotEvaluator::eval);

return CloseableIterable.withNoopClose(Iterables.transform(filteredSnapshots, snap -> {
if (snap.manifestListLocation() != null) {
DataFile manifestListAsDataFile = DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(io.newInputFile(snap.manifestListLocation()))
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
return new ManifestListReadTask(io, schema(), specs, new BaseFileScanTask(
manifestListAsDataFile, null,
schemaString, specString, residuals));
return new ManifestListReadTask(io, schema(), specs,
new BaseFileScanTask(manifestListAsDataFile, null, schemaString, specString, residuals),
snap.snapshotId());
} else {
return StaticDataTask.of(
io.newInputFile(tableOps().current().metadataFileLocation()),
MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(io),
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest)
manifest -> manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest, snap.snapshotId())
);
}
}));
Expand All @@ -138,12 +153,15 @@ static class ManifestListReadTask implements DataTask {
private final Schema schema;
private final Map<Integer, PartitionSpec> specs;
private final FileScanTask manifestListTask;
private final long referenceSnapshotId;

ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask) {
ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask,
long referenceSnapshotId) {
this.io = io;
this.schema = schema;
this.specs = specs;
this.manifestListTask = manifestListTask;
this.referenceSnapshotId = referenceSnapshotId;
}

@Override
Expand All @@ -164,7 +182,7 @@ public CloseableIterable<StructLike> rows() {
.build()) {

CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest));
manifest -> manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest, referenceSnapshotId));

StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
return CloseableIterable.transform(rowIterable, projection::wrap);
Expand Down Expand Up @@ -204,4 +222,188 @@ public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}
}

static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
return StaticDataTask.Row.of(
manifest.content().id(),
manifest.path(),
manifest.length(),
manifest.partitionSpecId(),
manifest.snapshotId(),
manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 0 may be the wrong thing here, shouldn't it be "null" if we don't have the info?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is in the original schema in ManifestsTable, where the counts are required. I also did not notice that data file counts are optional in AllManifestsTable when I recently added counts for delete files. I think we should continue to use 0 for compatibility but I wonder why we have such a mismatch in nullability between these tables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it is probably because we did not have those counts populated initially so there may be old manifests without that info.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means I have to switch to optional delete counts as well. I'll follow up with a PR for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still use 0 for data files count when reading a delete manifest as null means unknown but we know there is no data file in a delete manifest.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea this is copy + add from ManifestsTable class (should have mentioned), so if we fix it can be in another pr.

manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
referenceSnapshotId
);
}

private static class SnapshotEvaluator {

private final Expression boundExpr;

private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
this.boundExpr = Binder.bind(structType, expr, caseSensitive);
}

private boolean eval(Snapshot snapshot) {
return new SnapshotEvalVisitor().eval(snapshot);
}

private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {

private long snapshotId;
private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;

private boolean eval(Snapshot snapshot) {
this.snapshotId = snapshot.snapshotId();
return ExpressionVisitors.visitEvaluator(boundExpr, this);
}

@Override
public Boolean alwaysTrue() {
return ROWS_MIGHT_MATCH;
}

@Override
public Boolean alwaysFalse() {
return ROWS_CANNOT_MATCH;
}

@Override
public Boolean not(Boolean result) {
return !result;
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
}

@Override
public <T> Boolean isNull(BoundReference<T> ref) {
if (isSnapshotRef(ref)) {
return ROWS_CANNOT_MATCH; // reference_snapshot_id is never null
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNull(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean isNaN(BoundReference<T> ref) {
if (isSnapshotRef(ref)) {
return ROWS_CANNOT_MATCH; // reference_snapshot_id is never nan
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNaN(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
}

@Override
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked about these a bit offline, I think the gt/lt's here probably won't every be useful but I don't see a problem in including them

}

@Override
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
}

@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
}

@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
}

@Override
public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
}

@Override
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
if (isSnapshotRef(ref)) {
Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this with

literalSet.contains(snapshotId)?

Copy link
Member Author

@szehon-ho szehon-ho Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I think its the same in this case, but thought its safer to use the Comparator lookup for Long type to get the official Iceberg way to compare (if it ever differs from Java's comparator). I guess this is for all the comparisons in this class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm late to this review, but the literalSet will use the correct comparator for a given type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK works for me , I will make a follow up pr then!

if (noneMatch) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
if (isSnapshotRef(ref)) {
Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another call of "contains?"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we should eventually update this. The set's contains method will be correct. It's probably not a big deal right now since there are so few snapshots.

if (anyMatch) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

/**
* Comparison of snapshot reference and literal, using long comparator.
*
* @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
* @param lit literal value to compare with snapshot id.
* @param desiredResult function to apply to long comparator result, returns true if result is as expected.
* @return false if comparator does not achieve desired result, true otherwise
*/
private <T> Boolean compareSnapshotRef(BoundReference<T> ref, Literal<T> lit,
Function<Integer, Boolean> desiredResult) {
if (isSnapshotRef(ref)) {
Literal<Long> longLit = lit.to(Types.LongType.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine it’s unlikely to happen as the query should throw before this, but any concern with potential null being passed in forlit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea it seems it's checked when trying to make the predicate:

java.lang.NullPointerException: Cannot create expression literal from null
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
	at org.apache.iceberg.expressions.Literals.from(Literals.java:62)
	at org.apache.iceberg.expressions.UnboundPredicate.<init>(UnboundPredicate.java:40)
	at org.apache.iceberg.expressions.Expressions.equal(Expressions.java:175)

So i think it's not necessary, as it's pretty internal. Also, none of the other Evaluators check this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szehon-ho, once the expression is bound, you are guaranteed that the lit corresponding to the ref will be a LongType because binding coerces the literal types to match the corresponding term. This shouldn't be needed, unless you're doing it to get a Long comparator because you have a long snapshotId.

int cmp = longLit.comparator().compare(snapshotId, longLit.value());
if (!desiredResult.apply(cmp)) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

private <T> boolean isSnapshotRef(BoundReference<T> ref) {
return ref.fieldId() == REF_SNAPSHOT_ID;
}
}
}
}
Loading