diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index c8dd98f99161..f8462581f948 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -20,18 +20,27 @@ package org.apache.iceberg;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.BoundReference;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionVisitors;
+import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructProjection;
 
@@ -43,6 +52,7 @@
  * This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
+  private static final int REF_SNAPSHOT_ID = 18;
 
   private static final Schema MANIFEST_FILE_SCHEMA = new Schema(
       Types.NestedField.required(14, "content", Types.IntegerType.get()),
       Types.NestedField.required(1, "path", Types.StringType.get()),
@@ -60,7 +70,8 @@ public class AllManifestsTable extends BaseMetadataTable {
           Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
           Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
           Types.NestedField.optional(13, "upper_bound", Types.StringType.get())
-      )))
+      ))),
+      Types.NestedField.required(REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get())
   );
 
   AllManifestsTable(TableOperations ops, Table table) {
@@ -112,21 +123,25 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
     Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
     ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-    return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
+    SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),
+        isCaseSensitive());
+    Iterable<Snapshot> filteredSnapshots = Iterables.filter(table().snapshots(), snapshotEvaluator::eval);
+
+    return CloseableIterable.withNoopClose(Iterables.transform(filteredSnapshots, snap -> {
       if (snap.manifestListLocation() != null) {
         DataFile manifestListAsDataFile = DataFiles.builder(PartitionSpec.unpartitioned())
             .withInputFile(io.newInputFile(snap.manifestListLocation()))
            .withRecordCount(1)
            .withFormat(FileFormat.AVRO)
            .build();
-        return new ManifestListReadTask(io, schema(), specs, new BaseFileScanTask(
-            manifestListAsDataFile, null,
-            schemaString, specString, residuals));
+        return new ManifestListReadTask(io, schema(), specs,
+            new BaseFileScanTask(manifestListAsDataFile, null, schemaString, specString, residuals),
+            snap.snapshotId());
       } else {
         return StaticDataTask.of(
             io.newInputFile(tableOps().current().metadataFileLocation()),
             MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(io),
-            manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest)
+            manifest -> manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest, snap.snapshotId())
         );
       }
     }));
@@ -138,12 +153,15 @@ static class ManifestListReadTask implements DataTask {
     private final Schema schema;
     private final Map<Integer, PartitionSpec> specs;
     private final FileScanTask manifestListTask;
+    private final long referenceSnapshotId;
 
-    ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask) {
+    ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask,
+                         long referenceSnapshotId) {
      this.io = io;
      this.schema = schema;
      this.specs = specs;
      this.manifestListTask = manifestListTask;
+      this.referenceSnapshotId = referenceSnapshotId;
    }
 
    @Override
@@ -164,7 +182,7 @@ public CloseableIterable<StructLike> rows() {
          .build()) {
 
       CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
-          manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest));
+          manifest -> manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest, referenceSnapshotId));
 
       StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
       return CloseableIterable.transform(rowIterable, projection::wrap);
@@ -204,4 +222,188 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id is never null
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, (Long) lit) == 0);
+          if (noneMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
+          boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, (Long) lit) == 0);
+          if (anyMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      /**
+       * Comparison of snapshot reference and literal, using long comparator.
+       *
+       * @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
+       * @param lit literal value to compare with snapshot id.
+       * @param desiredResult function to apply to long comparator result, returns true if result is as expected.
+       * @return false if comparator does not achieve desired result, true otherwise
+       */
+      private <T> Boolean compareSnapshotRef(BoundReference<T> ref, Literal<T> lit,
+                                             Function<Integer, Boolean> desiredResult) {
+        if (isSnapshotRef(ref)) {
+          Literal<Long> longLit = lit.to(Types.LongType.get());
+          int cmp = longLit.comparator().compare(snapshotId, longLit.value());
+          if (!desiredResult.apply(cmp)) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      private <T> boolean isSnapshotRef(BoundReference<T> ref) {
+        return ref.fieldId() == REF_SNAPSHOT_ID;
+      }
+    }
+  }
 }
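For context on the change above: planning now evaluates the scan filter once per snapshot, before any manifest list is opened. A minimal usage sketch (mirroring the tests below, which construct the metadata table directly from TableOperations; `ops` and `table` are assumed to be in scope, and snapshot id 123 is hypothetical):

    Table allManifests = new AllManifestsTable(ops, table);
    try (CloseableIterable<FileScanTask> tasks = allManifests.newScan()
        .filter(Expressions.equal("reference_snapshot_id", 123L))
        .planFiles()) {
      // only the manifest list of snapshot 123 is planned; every other snapshot is pruned
      tasks.forEach(task -> System.out.println(task.file().path()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }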
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index a357487ea1ba..db78d1cde851 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -57,18 +57,7 @@ public TestMetadataTableScans(int formatVersion) {
   }
 
   private void preparePartitionedTable() {
-    table.newFastAppend()
-        .appendFile(FILE_A)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_C)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_D)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_B)
-        .commit();
+    preparePartitionedTableData();
 
     if (formatVersion == 2) {
       table.newRowDelta()
@@ -86,6 +75,21 @@ private void preparePartitionedTable() {
     }
   }
 
+  private void preparePartitionedTableData() {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_D)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+  }
+
   @Test
   public void testManifestsTableWithDroppedPartition() throws IOException {
     table.newFastAppend()
@@ -809,6 +813,180 @@ public void testPartitionsTableScanWithPlanExecutor() {
     Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
   }
 
+  @Test
+  public void testAllManifestsTableSnapshotGt() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.greaterThan("reference_snapshot_id", 2));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 3L, 4L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotGte() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.greaterThanOrEqual("reference_snapshot_id", 3));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 3L, 4L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotLt() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.lessThan("reference_snapshot_id", 3));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 1L, 2L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotLte() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.lessThanOrEqual("reference_snapshot_id", 2));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 1L, 2L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotEq() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.equal("reference_snapshot_id", 2));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 2L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotNotEq() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.notEqual("reference_snapshot_id", 2));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotIn() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.in("reference_snapshot_id", 1, 3));
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 1L, 3L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotNotIn() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.notIn("reference_snapshot_id", 1, 3));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 2L, 4L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotAnd() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.and(
+            Expressions.equal("reference_snapshot_id", 2),
+            Expressions.greaterThan("length", 0)));
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 2L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotOr() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.or(
+            Expressions.equal("reference_snapshot_id", 2),
+            Expressions.equal("reference_snapshot_id", 4)));
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 2L, 4L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  @Test
+  public void testAllManifestsTableSnapshotNot() {
+    // Snapshots 1,2,3,4
+    preparePartitionedTableData();
+
+    Table manifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan manifestsTableScan = manifestsTable.newScan()
+        .filter(Expressions.not(
+            Expressions.equal("reference_snapshot_id", 2)));
+
+    Assert.assertEquals("Expected snapshots do not match",
+        expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L),
+        actualManifestListPaths(manifestsTableScan));
+  }
+
+  private Set<String> actualManifestListPaths(TableScan allManifestsTableScan) {
+    return StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
+        .map(t -> (AllManifestsTable.ManifestListReadTask) t)
+        .map(t -> t.file().path().toString())
+        .collect(Collectors.toSet());
+  }
+
+  private Set<String> expectedManifestListPaths(Iterable<Snapshot> snapshots, Long... snapshotIds) {
+    Set<Long> snapshotIdSet = Sets.newHashSet(snapshotIds);
+    return StreamSupport.stream(snapshots.spliterator(), false)
+        .filter(s -> snapshotIdSet.contains(s.snapshotId()))
+        .map(Snapshot::manifestListLocation)
+        .collect(Collectors.toSet());
+  }
+
   private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException {
     try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
       Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
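The Spark suites below exercise the same pruning through DataSource filter pushdown, so plain SQL predicates on reference_snapshot_id benefit as well. A sketch (hypothetical `spark` session and `db.table` identifier, assuming the predicate is pushed down as it is in these tests):

    List<Row> rows = spark.read()
        .format("iceberg")
        .load("db.table.all_manifests")
        .filter("reference_snapshot_id in (1, 2)") // pruned at planning time, not after the scan
        .collectAsList();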
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index f147016ebbd6..4d2cde2f9d4a 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -19,26 +19,38 @@
 package org.apache.iceberg.spark.source;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -50,12 +62,14 @@
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -991,18 +1005,34 @@ public void testAllManifestsTable() {
     Table manifestTable = loadTable(tableIdentifier, "all_manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
 
-    List<ManifestFile> manifests = Lists.newArrayList();
-
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
+    DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedFiles(table.io()), null);
+    PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
+    StructLike dataFilePartition = dataFile.partition();
+
+    PositionDelete<InternalRow> delete = PositionDelete.create();
+    delete.set(dataFile.path(), 0L, null);
+
+    DeleteFile deleteFile = writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
+
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    Stream<Pair<Long, ManifestFile>> snapshotIdToManifests =
+        StreamSupport.stream(table.snapshots().spliterator(), false)
+            .flatMap(snapshot -> snapshot.allManifests(table.io()).stream().map(
+                manifest -> Pair.of(snapshot.snapshotId(), manifest)));
 
     List<Row> actual = spark.read()
         .format("iceberg")
@@ -1012,37 +1042,12 @@
     table.refresh();
 
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema(), "manifests"));
-    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder
-            .set("content", manifest.content().id())
-            .set("path", manifest.path())
-            .set("length", manifest.length())
-            .set("partition_spec_id", manifest.partitionSpecId())
-            .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
-            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
-            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
-            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
-            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
-            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
-            .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                summaryBuilder
-                    .set("contains_null", false)
-                    .set("contains_nan", false)
-                    .set("lower_bound", "1")
-                    .set("upper_bound", "1")
-                    .build()
-            ))
-            .build()
-    ));
-
+    List<GenericData.Record> expected = snapshotIdToManifests
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
     expected.sort(Comparator.comparing(o -> o.get("path").toString()));
 
-    Assert.assertEquals("Manifests table should have two manifest rows", 2, actual.size());
+    Assert.assertEquals("Manifests table should have 5 manifest rows", 5, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
       TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
@@ -1427,8 +1432,127 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testAllManifestTableSnapshotFiltering() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table manifestTable = loadTable(tableIdentifier, "all_manifests");
+    Dataset<Row> df = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+
+    List<Pair<Long, ManifestFile>> snapshotIdToManifests = Lists.newArrayList();
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot1 = table.currentSnapshot();
+    snapshotIdToManifests.addAll(snapshot1.allManifests().stream()
+        .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
+        .collect(Collectors.toList()));
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot2 = table.currentSnapshot();
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    snapshotIdToManifests.addAll(snapshot2.allManifests().stream()
+        .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
+        .collect(Collectors.toList()));
+
+    // Add manifests that will not be selected
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    StringJoiner snapshotIds = new StringJoiner(",", "(", ")");
+    snapshotIds.add(String.valueOf(snapshot1.snapshotId()));
+    snapshotIds.add(String.valueOf(snapshot2.snapshotId()));
+    snapshotIds.toString();
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "all_manifests"))
+        .filter("reference_snapshot_id in " + snapshotIds)
+        .orderBy("path")
+        .collectAsList();
+    table.refresh();
+
+    List<GenericData.Record> expected = snapshotIdToManifests.stream()
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
+    expected.sort(Comparator.comparing(o -> o.get("path").toString()));
+
+    Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+  }
+
+  private GenericData.Record manifestRecord(Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
+    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema(), "manifests"));
+    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
+    return builder
+        .set("content", manifest.content().id())
+        .set("path", manifest.path())
+        .set("length", manifest.length())
+        .set("partition_spec_id", manifest.partitionSpecId())
+        .set("added_snapshot_id", manifest.snapshotId())
+        .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+        .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+        .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+        .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+        .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+        .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
+        .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
+            summaryBuilder
+                .set("contains_null", false)
+                .set("contains_nan", false)
+                .set("lower_bound", "1")
+                .set("upper_bound", "1")
+                .build()
+        ))
+        .set("reference_snapshot_id", referenceSnapshotId)
+        .build();
+  }
+
   private void asMetadataRecord(GenericData.Record file) {
     file.put(0, FileContent.DATA.id());
     file.put(3, 0); // specId
   }
+
+  private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(Table table, PartitionSpec spec,
+                                                                    StructLike partition) {
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
+    EncryptedOutputFile outputFile = fileFactory.newOutputFile(spec, partition);
+
+    SparkFileWriterFactory fileWriterFactory = SparkFileWriterFactory.builderFor(table).build();
+    return fileWriterFactory.newPositionDeleteWriter(outputFile, spec, partition);
+  }
+
+  private DeleteFile writePositionDeletes(Table table, PartitionSpec spec, StructLike partition,
+                                          Iterable<PositionDelete<InternalRow>> deletes) {
+    PositionDeleteWriter<InternalRow> positionDeleteWriter = newPositionDeleteWriter(table, spec, partition);
+
+    try (PositionDeleteWriter<InternalRow> writer = positionDeleteWriter) {
+      for (PositionDelete<InternalRow> delete : deletes) {
+        writer.write(delete);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return positionDeleteWriter.toDeleteFile();
+  }
 }
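The v2.4 test above first upgrades the table to format version 2 because position deletes only exist in v2 tables; each PositionDelete names a data file path, a row ordinal within that file, and optionally the deleted row itself. A compact sketch of the record the writePositionDeletes helper persists (the file path is hypothetical):

    PositionDelete<InternalRow> delete = PositionDelete.create();
    // delete row 0 of the given data file; the deleted row payload is omitted (null)
    delete.set("s3://bucket/db/table/data/00000-0-data.parquet", 0L, null);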
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index d6f65af4d88b..b3ceb192bbb6 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -19,27 +19,38 @@
 package org.apache.iceberg.spark.source;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.StringJoiner;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -51,12 +62,14 @@
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -995,18 +1008,34 @@ public void testAllManifestsTable() {
     Table manifestTable = loadTable(tableIdentifier, "all_manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
 
-    List<ManifestFile> manifests = Lists.newArrayList();
-
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
+    DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedFiles(table.io()), null);
+    PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
+    StructLike dataFilePartition = dataFile.partition();
+
+    PositionDelete<InternalRow> delete = PositionDelete.create();
+    delete.set(dataFile.path(), 0L, null);
+
+    DeleteFile deleteFile = writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
+
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
+    Stream<Pair<Long, ManifestFile>> snapshotIdToManifests =
+        StreamSupport.stream(table.snapshots().spliterator(), false)
+            .flatMap(snapshot -> snapshot.allManifests(table.io()).stream().map(
+                manifest -> Pair.of(snapshot.snapshotId(), manifest)));
 
     List<Row> actual = spark.read()
         .format("iceberg")
@@ -1016,37 +1045,12 @@
     table.refresh();
 
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema(), "manifests"));
-    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder
-            .set("content", manifest.content().id())
-            .set("path", manifest.path())
-            .set("length", manifest.length())
-            .set("partition_spec_id", manifest.partitionSpecId())
-            .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
-            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
-            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
-            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
-            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
-            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
-            .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                summaryBuilder
-                    .set("contains_null", false)
-                    .set("contains_nan", false)
-                    .set("lower_bound", "1")
-                    .set("upper_bound", "1")
-                    .build()
-            ))
-            .build()
-    ));
-
+    List<GenericData.Record> expected = snapshotIdToManifests
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
     expected.sort(Comparator.comparing(o -> o.get("path").toString()));
 
-    Assert.assertEquals("Manifests table should have two manifest rows", 2, actual.size());
+    Assert.assertEquals("Manifests table should have 5 manifest rows", 5, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
       TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
@@ -1466,8 +1470,127 @@ public void testFilesTablePartitionId() throws Exception {
     Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual);
   }
 
+  @Test
+  public void testAllManifestTableSnapshotFiltering() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table manifestTable = loadTable(tableIdentifier, "all_manifests");
+    Dataset<Row> df = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+
+    List<Pair<Long, ManifestFile>> snapshotIdToManifests = Lists.newArrayList();
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot1 = table.currentSnapshot();
+    snapshotIdToManifests.addAll(snapshot1.allManifests().stream()
+        .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
+        .collect(Collectors.toList()));
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot2 = table.currentSnapshot();
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    snapshotIdToManifests.addAll(snapshot2.allManifests().stream()
+        .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
+        .collect(Collectors.toList()));
+
+    // Add manifests that will not be selected
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    StringJoiner snapshotIds = new StringJoiner(",", "(", ")");
+    snapshotIds.add(String.valueOf(snapshot1.snapshotId()));
+    snapshotIds.add(String.valueOf(snapshot2.snapshotId()));
+    snapshotIds.toString();
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "all_manifests"))
+        .filter("reference_snapshot_id in " + snapshotIds)
+        .orderBy("path")
+        .collectAsList();
+    table.refresh();
+
+    List<GenericData.Record> expected = snapshotIdToManifests.stream()
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
+    expected.sort(Comparator.comparing(o -> o.get("path").toString()));
+
+    Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+  }
+
+  private GenericData.Record manifestRecord(Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
+    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema(), "manifests"));
+    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
+    return builder
+        .set("content", manifest.content().id())
+        .set("path", manifest.path())
+        .set("length", manifest.length())
+        .set("partition_spec_id", manifest.partitionSpecId())
+        .set("added_snapshot_id", manifest.snapshotId())
+        .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+        .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+        .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+        .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+        .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+        .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
+        .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
+            summaryBuilder
+                .set("contains_null", false)
+                .set("contains_nan", false)
+                .set("lower_bound", "1")
+                .set("upper_bound", "1")
+                .build()
+        ))
+        .set("reference_snapshot_id", referenceSnapshotId)
+        .build();
+  }
+
   private void asMetadataRecord(GenericData.Record file) {
     file.put(0, FileContent.DATA.id());
     file.put(3, 0); // specId
   }
+
+  private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(Table table, PartitionSpec spec,
+                                                                    StructLike partition) {
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
+    EncryptedOutputFile outputFile = fileFactory.newOutputFile(spec, partition);
+
+    SparkFileWriterFactory fileWriterFactory = SparkFileWriterFactory.builderFor(table).build();
+    return fileWriterFactory.newPositionDeleteWriter(outputFile, spec, partition);
+  }
+
+  private DeleteFile writePositionDeletes(Table table, PartitionSpec spec, StructLike partition,
+                                          Iterable<PositionDelete<InternalRow>> deletes) {
+    PositionDeleteWriter<InternalRow> positionDeleteWriter = newPositionDeleteWriter(table, spec, partition);
+
+    try (PositionDeleteWriter<InternalRow> writer = positionDeleteWriter) {
+      for (PositionDelete<InternalRow> delete : deletes) {
+        writer.write(delete);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return positionDeleteWriter.toDeleteFile();
+  }
 }
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index d6f65af4d88b..72f8ea38f54c 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -19,27 +19,38 @@
 package org.apache.iceberg.spark.source;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.StringJoiner;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -51,12 +62,14 @@
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -995,18 +1008,34 @@ public void testAllManifestsTable() {
     Table manifestTable = loadTable(tableIdentifier, "all_manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
 
-    List<ManifestFile> manifests = Lists.newArrayList();
-
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
+    DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedFiles(table.io()), null);
+    PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
+    StructLike dataFilePartition = dataFile.partition();
+
+    PositionDelete<InternalRow> delete = PositionDelete.create();
+    delete.set(dataFile.path(), 0L, null);
+
+    DeleteFile deleteFile = writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
+
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
+    Stream<Pair<Long, ManifestFile>> snapshotIdToManifests =
+        StreamSupport.stream(table.snapshots().spliterator(), false)
+            .flatMap(snapshot -> snapshot.allManifests(table.io()).stream().map(
+                manifest -> Pair.of(snapshot.snapshotId(), manifest)));
 
     List<Row> actual = spark.read()
         .format("iceberg")
@@ -1016,37 +1045,12 @@
     table.refresh();
 
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema(), "manifests"));
-    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder
-            .set("content", manifest.content().id())
-            .set("path", manifest.path())
-            .set("length", manifest.length())
-            .set("partition_spec_id", manifest.partitionSpecId())
-            .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
-            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
-            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
-            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
-            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
-            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
-            .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                summaryBuilder
-                    .set("contains_null", false)
-                    .set("contains_nan", false)
-                    .set("lower_bound", "1")
-                    .set("upper_bound", "1")
-                    .build()
-            ))
-            .build()
-    ));
-
+    List<GenericData.Record> expected = snapshotIdToManifests
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
     expected.sort(Comparator.comparing(o -> o.get("path").toString()));
 
-    Assert.assertEquals("Manifests table should have two manifest rows", 2, actual.size());
+    Assert.assertEquals("Manifests table should have 5 manifest rows", 5, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
       TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
@@ -1431,6 +1435,7 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
   public void testFilesTablePartitionId() throws Exception {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
@@ -1466,8 +1471,127 @@ public void testFilesTablePartitionId() throws Exception {
     Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual);
   }
 
+  @Test
+  public void testAllManifestTableSnapshotFiltering() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table manifestTable = loadTable(tableIdentifier, "all_manifests");
+    Dataset<Row> df = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+
+    List<Pair<Long, ManifestFile>> snapshotIdToManifests = Lists.newArrayList();
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot1 = table.currentSnapshot();
+    snapshotIdToManifests.addAll(snapshot1.allManifests().stream()
+        .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
+        .collect(Collectors.toList()));
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot2 = table.currentSnapshot();
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    snapshotIdToManifests.addAll(snapshot2.allManifests().stream()
+        .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
+        .collect(Collectors.toList()));
+
+    // Add manifests that will not be selected
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    StringJoiner snapshotIds = new StringJoiner(",", "(", ")");
+    snapshotIds.add(String.valueOf(snapshot1.snapshotId()));
+    snapshotIds.add(String.valueOf(snapshot2.snapshotId()));
+    snapshotIds.toString();
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "all_manifests"))
+        .filter("reference_snapshot_id in " + snapshotIds)
+        .orderBy("path")
+        .collectAsList();
+    table.refresh();
+
+    List<GenericData.Record> expected = snapshotIdToManifests.stream()
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
+    expected.sort(Comparator.comparing(o -> o.get("path").toString()));
+
+    Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+  }
+
+  private GenericData.Record manifestRecord(Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
+    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema(), "manifests"));
+    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
+    return builder
+        .set("content", manifest.content().id())
+        .set("path", manifest.path())
+        .set("length", manifest.length())
+        .set("partition_spec_id", manifest.partitionSpecId())
+        .set("added_snapshot_id", manifest.snapshotId())
+        .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+        .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+        .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+        .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+        .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+        .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
+        .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
+            summaryBuilder
+                .set("contains_null", false)
+                .set("contains_nan", false)
+                .set("lower_bound", "1")
+                .set("upper_bound", "1")
+                .build()
+        ))
+        .set("reference_snapshot_id", referenceSnapshotId)
+        .build();
+  }
+
   private void asMetadataRecord(GenericData.Record file) {
     file.put(0, FileContent.DATA.id());
     file.put(3, 0); // specId
   }
+
+  private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(Table table, PartitionSpec spec,
+                                                                    StructLike partition) {
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
+    EncryptedOutputFile outputFile = fileFactory.newOutputFile(spec, partition);
+
+    SparkFileWriterFactory fileWriterFactory = SparkFileWriterFactory.builderFor(table).build();
+    return fileWriterFactory.newPositionDeleteWriter(outputFile, spec, partition);
+  }
+
+  private DeleteFile writePositionDeletes(Table table, PartitionSpec spec, StructLike partition,
+                                          Iterable<PositionDelete<InternalRow>> deletes) {
+    PositionDeleteWriter<InternalRow> positionDeleteWriter = newPositionDeleteWriter(table, spec, partition);
+
+    try (PositionDeleteWriter<InternalRow> writer = positionDeleteWriter) {
+      for (PositionDelete<InternalRow> delete : deletes) {
+        writer.write(delete);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return positionDeleteWriter.toDeleteFile();
+  }
 }
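In the v3.2 and v3.3 diffs below, `.dropDuplicates("path")` is removed and the expected row count rises from 4 to 5: with reference_snapshot_id in the schema, a manifest reachable from several snapshots legitimately produces one row per referencing snapshot rather than a duplicate. A sketch of collapsing back to one row per manifest file when that view is wanted (hypothetical `spark` session and table identifier):

    Dataset<Row> byPath = spark.read()
        .format("iceberg")
        .load("db.table.all_manifests")
        .groupBy("path")
        .agg(org.apache.spark.sql.functions.collect_set("reference_snapshot_id").alias("referencing_snapshot_ids"));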
java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.iceberg.AssertHelpers; @@ -59,6 +62,7 @@ import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.spark.data.TestHelpers; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.apache.spark.SparkException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -1023,8 +1027,6 @@ public void testAllManifestsTable() { Table manifestTable = loadTable(tableIdentifier, "all_manifests"); Dataset df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class); - List manifests = Lists.newArrayList(); - df1.select("id", "data").write() .format("iceberg") .mode("append") @@ -1047,52 +1049,27 @@ public void testAllManifestsTable() { .addDeletes(deleteFile) .commit(); - manifests.addAll(table.currentSnapshot().allManifests(table.io())); - table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); - manifests.addAll(table.currentSnapshot().allManifests(table.io())); + Stream> snapshotIdToManifests = + StreamSupport.stream(table.snapshots().spliterator(), false) + .flatMap(snapshot -> snapshot.allManifests(table.io()).stream().map( + manifest -> Pair.of(snapshot.snapshotId(), manifest))); List actual = spark.read() .format("iceberg") .load(loadLocation(tableIdentifier, "all_manifests")) - .dropDuplicates("path") .orderBy("path") .collectAsList(); table.refresh(); - GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert( - manifestTable.schema(), "manifests")); - GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert( - manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary")); - List expected = Lists.newArrayList(Iterables.transform(manifests, manifest -> - builder - .set("content", manifest.content().id()) - .set("path", manifest.path()) - .set("length", manifest.length()) - .set("partition_spec_id", manifest.partitionSpecId()) - .set("added_snapshot_id", manifest.snapshotId()) - .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0) - .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0) - .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0) - .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0) - .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0) - .set("deleted_delete_files_count", manifest.content() == DELETES ? 
manifest.deletedFilesCount() : 0) - .set("partition_summaries", Lists.transform(manifest.partitions(), partition -> - summaryBuilder - .set("contains_null", false) - .set("contains_nan", false) - .set("lower_bound", "1") - .set("upper_bound", "1") - .build() - )) - .build() - )); - + List expected = snapshotIdToManifests + .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second())) + .collect(Collectors.toList()); expected.sort(Comparator.comparing(o -> o.get("path").toString())); - Assert.assertEquals("Manifests table should have 4 manifest rows", 4, actual.size()); + Assert.assertEquals("Manifests table should have 5 manifest rows", 5, actual.size()); for (int i = 0; i < expected.size(); i += 1) { TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i)); } @@ -1513,6 +1490,101 @@ public void testFilesTablePartitionId() throws Exception { Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual); } + @Test + public void testAllManifestTableSnapshotFiltering() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering"); + Table table = createTable(tableIdentifier, SCHEMA, SPEC); + Table manifestTable = loadTable(tableIdentifier, "all_manifests"); + Dataset df = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class); + + List> snapshotIdToManifests = Lists.newArrayList(); + + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + Snapshot snapshot1 = table.currentSnapshot(); + snapshotIdToManifests.addAll(snapshot1.allManifests().stream() + .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest)) + .collect(Collectors.toList())); + + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + Snapshot snapshot2 = table.currentSnapshot(); + Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size()); + snapshotIdToManifests.addAll(snapshot2.allManifests().stream() + .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest)) + .collect(Collectors.toList())); + + // Add manifests that will not be selected + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + StringJoiner snapshotIds = new StringJoiner(",", "(", ")"); + snapshotIds.add(String.valueOf(snapshot1.snapshotId())); + snapshotIds.add(String.valueOf(snapshot2.snapshotId())); + snapshotIds.toString(); + + List actual = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "all_manifests")) + .filter("reference_snapshot_id in " + snapshotIds) + .orderBy("path") + .collectAsList(); + table.refresh(); + + List expected = snapshotIdToManifests.stream() + .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second())) + .collect(Collectors.toList()); + expected.sort(Comparator.comparing(o -> o.get("path").toString())); + + Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size()); + for (int i = 0; i < expected.size(); i += 1) { + TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i)); + } + } + + private GenericData.Record manifestRecord(Table 
+    StringJoiner snapshotIds = new StringJoiner(",", "(", ")");
+    snapshotIds.add(String.valueOf(snapshot1.snapshotId()));
+    snapshotIds.add(String.valueOf(snapshot2.snapshotId()));
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "all_manifests"))
+        .filter("reference_snapshot_id in " + snapshotIds)
+        .orderBy("path")
+        .collectAsList();
+    table.refresh();
+
+    List<GenericData.Record> expected = snapshotIdToManifests.stream()
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
+    expected.sort(Comparator.comparing(o -> o.get("path").toString()));
+
+    Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
+    }
+  }
+
+  private GenericData.Record manifestRecord(Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
+    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema(), "manifests"));
+    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
+    return builder
+        .set("content", manifest.content().id())
+        .set("path", manifest.path())
+        .set("length", manifest.length())
+        .set("partition_spec_id", manifest.partitionSpecId())
+        .set("added_snapshot_id", manifest.snapshotId())
+        .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+        .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+        .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+        .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+        .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+        .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
+        .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
+            summaryBuilder
+                .set("contains_null", false)
+                .set("contains_nan", false)
+                .set("lower_bound", "1")
+                .set("upper_bound", "1")
+                .build()
+        ))
+        .set("reference_snapshot_id", referenceSnapshotId)
+        .build();
+  }
+
   private void asMetadataRecord(GenericData.Record file) {
     file.put(0, FileContent.DATA.id());
     file.put(3, 0); // specId
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index d055c70cea1f..a80e5ee79f9f 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -23,7 +23,10 @@
 import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.StringJoiner;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
@@ -59,6 +62,7 @@
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -1023,8 +1027,6 @@ public void testAllManifestsTable() {
     Table manifestTable = loadTable(tableIdentifier, "all_manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);

-    List<ManifestFile> manifests = Lists.newArrayList();
-
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
@@ -1047,52 +1049,27 @@
         .addDeletes(deleteFile)
         .commit();

-    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
-
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();

-    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
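+    // Pair each snapshot with the manifests it references so every expected row carries its reference_snapshot_id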
+    Stream<Pair<Long, ManifestFile>> snapshotIdToManifests =
+        StreamSupport.stream(table.snapshots().spliterator(), false)
+            .flatMap(snapshot -> snapshot.allManifests(table.io()).stream().map(
+                manifest -> Pair.of(snapshot.snapshotId(), manifest)));

     List<Row> actual = spark.read()
         .format("iceberg")
         .load(loadLocation(tableIdentifier, "all_manifests"))
-        .dropDuplicates("path")
         .orderBy("path")
         .collectAsList();

     table.refresh();

-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema(), "manifests"));
-    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder
-            .set("content", manifest.content().id())
-            .set("path", manifest.path())
-            .set("length", manifest.length())
-            .set("partition_spec_id", manifest.partitionSpecId())
-            .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
-            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
-            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
-            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
-            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
-            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
-            .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                summaryBuilder
-                    .set("contains_null", false)
-                    .set("contains_nan", false)
-                    .set("lower_bound", "1")
-                    .set("upper_bound", "1")
-                    .build()
-            ))
-            .build()
-    ));
-
+    List<GenericData.Record> expected = snapshotIdToManifests
+        .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second()))
+        .collect(Collectors.toList());
     expected.sort(Comparator.comparing(o -> o.get("path").toString()));

-    Assert.assertEquals("Manifests table should have 4 manifest rows", 4, actual.size());
+    Assert.assertEquals("Manifests table should have 5 manifest rows", 5, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
       TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
@@ -1513,6 +1490,101 @@ public void testFilesTablePartitionId() throws Exception {
     Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual);
   }

+  @Test
+  public void testAllManifestTableSnapshotFiltering() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "all_manifest_snapshot_filtering");
+    Table table = createTable(tableIdentifier, SCHEMA, SPEC);
+    Table manifestTable = loadTable(tableIdentifier, "all_manifests");
+    Dataset<Row> df = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+
+    List<Pair<Long, ManifestFile>> snapshotIdToManifests = Lists.newArrayList();
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot1 = table.currentSnapshot();
+    snapshotIdToManifests.addAll(snapshot1.allManifests(table.io()).stream()
+        .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
+        .collect(Collectors.toList()));
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    Snapshot snapshot2 = table.currentSnapshot();
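+    // snapshot2's manifest list contains its newly written manifest plus the manifest inherited from snapshot1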
Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size()); + snapshotIdToManifests.addAll(snapshot2.allManifests().stream() + .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest)) + .collect(Collectors.toList())); + + // Add manifests that will not be selected + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + df.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + StringJoiner snapshotIds = new StringJoiner(",", "(", ")"); + snapshotIds.add(String.valueOf(snapshot1.snapshotId())); + snapshotIds.add(String.valueOf(snapshot2.snapshotId())); + snapshotIds.toString(); + + List actual = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "all_manifests")) + .filter("reference_snapshot_id in " + snapshotIds) + .orderBy("path") + .collectAsList(); + table.refresh(); + + List expected = snapshotIdToManifests.stream() + .map(snapshotManifest -> manifestRecord(manifestTable, snapshotManifest.first(), snapshotManifest.second())) + .collect(Collectors.toList()); + expected.sort(Comparator.comparing(o -> o.get("path").toString())); + + Assert.assertEquals("Manifests table should have 3 manifest rows", 3, actual.size()); + for (int i = 0; i < expected.size(); i += 1) { + TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i)); + } + } + + private GenericData.Record manifestRecord(Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) { + GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert( + manifestTable.schema(), "manifests")); + GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert( + manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary")); + return builder + .set("content", manifest.content().id()) + .set("path", manifest.path()) + .set("length", manifest.length()) + .set("partition_spec_id", manifest.partitionSpecId()) + .set("added_snapshot_id", manifest.snapshotId()) + .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0) + .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0) + .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0) + .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0) + .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0) + .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0) + .set("partition_summaries", Lists.transform(manifest.partitions(), partition -> + summaryBuilder + .set("contains_null", false) + .set("contains_nan", false) + .set("lower_bound", "1") + .set("upper_bound", "1") + .build() + )) + .set("reference_snapshot_id", referenceSnapshotId) + .build(); + } + private void asMetadataRecord(GenericData.Record file) { file.put(0, FileContent.DATA.id()); file.put(3, 0); // specId