diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java index 176bbbd3293c..7c4739a28644 100644 --- a/api/src/main/java/org/apache/iceberg/ManifestFile.java +++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java @@ -30,22 +30,33 @@ * Represents a manifest file that can be scanned to find data files in a table. */ public interface ManifestFile { + Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get()); + Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get()); + Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get()); + Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get()); + Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get()); + Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get()); + Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get()); + Types.StructType PARTITION_SUMMARY_TYPE = Types.StructType.of( + required(509, "contains_null", Types.BooleanType.get()), + optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values + optional(511, "upper_bound", Types.BinaryType.get()) + ); + Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions", + Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE)); + Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get()); + Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get()); + Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get()); + Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get()); + Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get()); + // next ID to assign: 517 + Schema SCHEMA = new Schema( - required(500, "manifest_path", Types.StringType.get()), - required(501, "manifest_length", Types.LongType.get()), - required(502, "partition_spec_id", Types.IntegerType.get()), - optional(503, "added_snapshot_id", Types.LongType.get()), - optional(504, "added_data_files_count", Types.IntegerType.get()), - optional(505, "existing_data_files_count", Types.IntegerType.get()), - optional(506, "deleted_data_files_count", Types.IntegerType.get()), - optional(507, "partitions", Types.ListType.ofRequired(508, Types.StructType.of( - required(509, "contains_null", Types.BooleanType.get()), - optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values - optional(511, "upper_bound", Types.BinaryType.get()) - ))), - optional(512, "added_rows_count", Types.LongType.get()), - optional(513, "existing_rows_count", Types.LongType.get()), - optional(514, "deleted_rows_count", Types.LongType.get())); + PATH, LENGTH, SPEC_ID, + SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID, + ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT, + ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT, + PARTITION_SUMMARIES); static Schema schema() { return SCHEMA; @@ -66,6 +77,16 @@ static Schema schema() { */ int partitionSpecId(); + /** + * @return the sequence number of the commit that added the manifest file + */ + long sequenceNumber(); + + /** + * @return the lowest sequence number of any data file in the manifest + */ + long minSequenceNumber(); + /** * @return ID of the snapshot that added the manifest file to table metadata */ @@ -152,14 +173,8 @@ default boolean hasDeletedFiles() { * Summarizes the values of one partition field stored in a manifest file. */ interface PartitionFieldSummary { - Types.StructType TYPE = ManifestFile.schema() - .findType("partitions") - .asListType() - .elementType() - .asStructType(); - static Types.StructType getType() { - return TYPE; + return PARTITION_SUMMARY_TYPE; } /** diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index 2fc71c40937f..4989281ffe96 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -220,6 +220,16 @@ public int partitionSpecId() { return specId; } + @Override + public long sequenceNumber() { + return 0; + } + + @Override + public long minSequenceNumber() { + return 0; + } + @Override public Long snapshotId() { return snapshotId; diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index 6054d6fde771..22ab7fe84524 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; @@ -125,20 +124,7 @@ public Map summary() { public List manifests() { if (manifests == null) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list - try (CloseableIterable files = Avro.read(manifestList) - .rename("manifest_file", GenericManifestFile.class.getName()) - .rename("partitions", GenericPartitionFieldSummary.class.getName()) - .rename("r508", GenericPartitionFieldSummary.class.getName()) - .classLoader(GenericManifestFile.class.getClassLoader()) - .project(ManifestFile.schema()) - .reuseContainers(false) - .build()) { - - this.manifests = Lists.newLinkedList(files); - - } catch (IOException e) { - throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestList.location()); - } + this.manifests = ManifestLists.read(manifestList); } return manifests; diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 6b6010b19b7f..26f7153f81af 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -46,12 +46,14 @@ public class GenericManifestFile private String manifestPath = null; private Long length = null; private int specId = -1; + private long sequenceNumber = 0; + private long minSequenceNumber = 0; private Long snapshotId = null; private Integer addedFilesCount = null; - private Long addedRowsCount = null; private Integer existingFilesCount = null; - private Long existingRowsCount = null; private Integer deletedFilesCount = null; + private Long addedRowsCount = null; + private Long existingRowsCount = null; private Long deletedRowsCount = null; private List partitions = null; @@ -61,10 +63,7 @@ public class GenericManifestFile public GenericManifestFile(org.apache.avro.Schema avroSchema) { this.avroSchema = avroSchema; - List fields = AvroSchemaUtil.convert(avroSchema) - .asNestedType() - .asStructType() - .fields(); + List fields = AvroSchemaUtil.convert(avroSchema).asStructType().fields(); List allFields = ManifestFile.schema().asStruct().fields(); this.fromProjectionPos = new int[fields.size()]; @@ -89,6 +88,8 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) { this.manifestPath = file.location(); this.length = null; // lazily loaded from file this.specId = specId; + this.sequenceNumber = 0; + this.minSequenceNumber = 0; this.snapshotId = null; this.addedFilesCount = null; this.addedRowsCount = null; @@ -100,25 +101,8 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) { this.fromProjectionPos = null; } - public GenericManifestFile(String path, long length, int specId, Long snapshotId, - int addedFilesCount, int existingFilesCount, int deletedFilesCount, - List partitions) { - this.avroSchema = AVRO_SCHEMA; - this.manifestPath = path; - this.length = length; - this.specId = specId; - this.snapshotId = snapshotId; - this.addedFilesCount = addedFilesCount; - this.addedRowsCount = null; - this.existingFilesCount = existingFilesCount; - this.existingRowsCount = null; - this.deletedFilesCount = deletedFilesCount; - this.deletedRowsCount = null; - this.partitions = partitions; - this.fromProjectionPos = null; - } - - public GenericManifestFile(String path, long length, int specId, Long snapshotId, + public GenericManifestFile(String path, long length, int specId, + long sequenceNumber, long minSequenceNumber, Long snapshotId, int addedFilesCount, long addedRowsCount, int existingFilesCount, long existingRowsCount, int deletedFilesCount, long deletedRowsCount, List partitions) { @@ -126,6 +110,8 @@ public GenericManifestFile(String path, long length, int specId, Long snapshotId this.manifestPath = path; this.length = length; this.specId = specId; + this.sequenceNumber = sequenceNumber; + this.minSequenceNumber = minSequenceNumber; this.snapshotId = snapshotId; this.addedFilesCount = addedFilesCount; this.addedRowsCount = addedRowsCount; @@ -147,6 +133,8 @@ private GenericManifestFile(GenericManifestFile toCopy) { this.manifestPath = toCopy.manifestPath; this.length = toCopy.length; this.specId = toCopy.specId; + this.sequenceNumber = toCopy.sequenceNumber; + this.minSequenceNumber = toCopy.minSequenceNumber; this.snapshotId = toCopy.snapshotId; this.addedFilesCount = toCopy.addedFilesCount; this.addedRowsCount = toCopy.addedRowsCount; @@ -192,6 +180,16 @@ public int partitionSpecId() { return specId; } + @Override + public long sequenceNumber() { + return sequenceNumber; + } + + @Override + public long minSequenceNumber() { + return minSequenceNumber; + } + @Override public Long snapshotId() { return snapshotId; @@ -257,21 +255,25 @@ public Object get(int i) { case 2: return specId; case 3: - return snapshotId; + return sequenceNumber; case 4: - return addedFilesCount; + return minSequenceNumber; case 5: - return existingFilesCount; + return snapshotId; case 6: - return deletedFilesCount; + return addedFilesCount; case 7: - return partitions; + return existingFilesCount; case 8: - return addedRowsCount; + return deletedFilesCount; case 9: - return existingRowsCount; + return addedRowsCount; case 10: + return existingRowsCount; + case 11: return deletedRowsCount; + case 12: + return partitions; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -297,29 +299,35 @@ public void set(int i, T value) { this.specId = (Integer) value; return; case 3: - this.snapshotId = (Long) value; + this.sequenceNumber = value != null ? (Long) value : 0; return; case 4: - this.addedFilesCount = (Integer) value; + this.minSequenceNumber = value != null ? (Long) value : 0; return; case 5: - this.existingFilesCount = (Integer) value; + this.snapshotId = (Long) value; return; case 6: - this.deletedFilesCount = (Integer) value; + this.addedFilesCount = (Integer) value; return; case 7: - this.partitions = (List) value; + this.existingFilesCount = (Integer) value; return; case 8: - this.addedRowsCount = (Long) value; + this.deletedFilesCount = (Integer) value; return; case 9: - this.existingRowsCount = (Long) value; + this.addedRowsCount = (Long) value; return; case 10: + this.existingRowsCount = (Long) value; + return; + case 11: this.deletedRowsCount = (Long) value; return; + case 12: + this.partitions = (List) value; + return; default: // ignore the object, it must be from a newer version of the format } @@ -385,7 +393,8 @@ private CopyBuilder(ManifestFile toCopy) { this.manifestFile = new GenericManifestFile((GenericManifestFile) toCopy); } else { this.manifestFile = new GenericManifestFile( - toCopy.path(), toCopy.length(), toCopy.partitionSpecId(), toCopy.snapshotId(), + toCopy.path(), toCopy.length(), toCopy.partitionSpecId(), + toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(), toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(), toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(), copyList(toCopy.partitions(), PartitionFieldSummary::copy)); diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index 6c999b899b10..fe3afe4f8e69 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.Iterator; @@ -30,43 +29,29 @@ import org.apache.iceberg.io.OutputFile; abstract class ManifestListWriter implements FileAppender { - static ManifestListWriter write(int formatVersion, OutputFile manifestListFile, - long snapshotId, Long parentSnapshotId) { - Preconditions.checkArgument(formatVersion == 1, "Sequence number is required for format v%s", formatVersion); - return new V1Writer(manifestListFile, snapshotId, parentSnapshotId); - } + private final FileAppender writer; - static ManifestListWriter write(int formatVersion, OutputFile manifestListFile, - long snapshotId, Long parentSnapshotId, long sequenceNumber) { - if (formatVersion == 1) { - Preconditions.checkArgument(sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER, - "Invalid sequence number for v1 manifest list: %s", sequenceNumber); - return new V1Writer(manifestListFile, snapshotId, parentSnapshotId); - } - throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion); + private ManifestListWriter(OutputFile file, Map meta) { + this.writer = newAppender(file, meta); } - private final FileAppender writer; + protected abstract ManifestFile prepare(ManifestFile manifest); - private ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) { - this.writer = newAppender(snapshotFile, ImmutableMap.of( - "snapshot-id", String.valueOf(snapshotId), - "parent-snapshot-id", String.valueOf(parentSnapshotId))); - } + protected abstract FileAppender newAppender(OutputFile file, Map meta); @Override - public void add(ManifestFile file) { - writer.add(file); + public void add(ManifestFile manifest) { + writer.add(prepare(manifest)); } @Override public void addAll(Iterator values) { - writer.addAll(values); + values.forEachRemaining(this::add); } @Override public void addAll(Iterable values) { - writer.addAll(values); + values.forEach(this::add); } @Override @@ -84,23 +69,66 @@ public long length() { return writer.length(); } - private static FileAppender newAppender(OutputFile file, Map meta) { - try { - return Avro.write(file) - .schema(ManifestFile.schema()) - .named("manifest_file") - .meta(meta) - .overwrite() - .build(); - - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file); + static class V2Writer extends ManifestListWriter { + private final V2Metadata.IndexedManifestFile wrapper; + + V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + super(snapshotFile, ImmutableMap.of( + "snapshot-id", String.valueOf(snapshotId), + "parent-snapshot-id", String.valueOf(parentSnapshotId), + "sequence-number", String.valueOf(sequenceNumber), + "format-version", "2")); + this.wrapper = new V2Metadata.IndexedManifestFile(sequenceNumber); + } + + @Override + protected ManifestFile prepare(ManifestFile manifest) { + return wrapper.wrap(manifest); + } + + @Override + protected FileAppender newAppender(OutputFile file, Map meta) { + try { + return Avro.write(file) + .schema(V2Metadata.MANIFEST_LIST_SCHEMA) + .named("manifest_file") + .meta(meta) + .overwrite() + .build(); + + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file); + } } } static class V1Writer extends ManifestListWriter { - private V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) { - super(snapshotFile, snapshotId, parentSnapshotId); + private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile(); + + V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) { + super(snapshotFile, ImmutableMap.of( + "snapshot-id", String.valueOf(snapshotId), + "parent-snapshot-id", String.valueOf(parentSnapshotId), + "format-version", "1")); + } + + @Override + protected ManifestFile prepare(ManifestFile manifest) { + return wrapper.wrap(manifest); + } + + protected FileAppender newAppender(OutputFile file, Map meta) { + try { + return Avro.write(file) + .schema(V1Metadata.MANIFEST_LIST_SCHEMA) + .named("manifest_file") + .meta(meta) + .overwrite() + .build(); + + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file); + } } } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java new file mode 100644 index 000000000000..b1b5dc47453d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; + +class ManifestLists { + private ManifestLists() { + } + + static List read(InputFile manifestList) { + try (CloseableIterable files = Avro.read(manifestList) + .rename("manifest_file", GenericManifestFile.class.getName()) + .rename("partitions", GenericPartitionFieldSummary.class.getName()) + .rename("r508", GenericPartitionFieldSummary.class.getName()) + .classLoader(GenericManifestFile.class.getClassLoader()) + .project(ManifestFile.schema()) + .reuseContainers(false) + .build()) { + + return Lists.newLinkedList(files); + + } catch (IOException e) { + throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestList.location()); + } + } + + static ManifestListWriter write(int formatVersion, OutputFile manifestListFile, + long snapshotId, Long parentSnapshotId) { + Preconditions.checkArgument(formatVersion == 1, + "Sequence number is required when writing format v%s", formatVersion); + return new ManifestListWriter.V1Writer(manifestListFile, snapshotId, parentSnapshotId); + } + + static ManifestListWriter write(int formatVersion, OutputFile manifestListFile, + long snapshotId, Long parentSnapshotId, long sequenceNumber) { + switch (formatVersion) { + case 1: + Preconditions.checkArgument(sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER, + "Invalid sequence number for v1 manifest list: %s", sequenceNumber); + return new ManifestListWriter.V1Writer(manifestListFile, snapshotId, parentSnapshotId); + case 2: + return new ManifestListWriter.V2Writer(manifestListFile, snapshotId, parentSnapshotId, sequenceNumber); + } + throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index d1a9fa1c2acb..a6408ce12056 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -35,6 +35,7 @@ */ public abstract class ManifestWriter implements FileAppender { private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class); + static final long UNASSIGNED_SEQ = -1L; static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId, SnapshotSummary.Builder summaryBuilder) { @@ -206,7 +207,7 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); - return new GenericManifestFile(file.location(), writer.length(), specId, snapshotId, + return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, UNASSIGNED_SEQ, snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index fb9509eb7065..3bf8364194b0 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -150,7 +150,7 @@ public Snapshot apply() { if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) { OutputFile manifestList = manifestListPath(); - try (ManifestListWriter writer = ManifestListWriter.write( + try (ManifestListWriter writer = ManifestLists.write( ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) { // keep track of the manifest lists created @@ -379,8 +379,8 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife } return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(), - snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, - stats.summaries()); + manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, addedFiles, addedRows, existingFiles, + existingRows, deletedFiles, deletedRows, stats.summaries()); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path()); diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java new file mode 100644 index 000000000000..f581b718d9ee --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.List; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.avro.AvroSchemaUtil; + +class V1Metadata { + private V1Metadata() { + } + + static final Schema MANIFEST_LIST_SCHEMA = new Schema( + ManifestFile.PATH, ManifestFile.LENGTH, ManifestFile.SPEC_ID, ManifestFile.SNAPSHOT_ID, + ManifestFile.ADDED_FILES_COUNT, ManifestFile.EXISTING_FILES_COUNT, ManifestFile.DELETED_FILES_COUNT, + ManifestFile.PARTITION_SUMMARIES, + ManifestFile.ADDED_ROWS_COUNT, ManifestFile.EXISTING_ROWS_COUNT, ManifestFile.DELETED_ROWS_COUNT); + + /** + * A wrapper class to write any ManifestFile implementation to Avro using the v1 schema. + * + * This is used to maintain compatibility with v1 by writing manifest list files with the old schema, instead of + * writing a sequence number into metadata files in v1 tables. + */ + static class IndexedManifestFile implements ManifestFile, IndexedRecord { + private static final org.apache.avro.Schema AVRO_SCHEMA = + AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file"); + + private ManifestFile wrapped = null; + + public ManifestFile wrap(ManifestFile file) { + this.wrapped = file; + return this; + } + + @Override + public org.apache.avro.Schema getSchema() { + return AVRO_SCHEMA; + } + + @Override + public void put(int i, Object v) { + throw new UnsupportedOperationException("Cannot read using IndexedManifestFile"); + } + + @Override + public Object get(int pos) { + switch (pos) { + case 0: + return path(); + case 1: + return length(); + case 2: + return partitionSpecId(); + case 3: + return snapshotId(); + case 4: + return addedFilesCount(); + case 5: + return existingFilesCount(); + case 6: + return deletedFilesCount(); + case 7: + return partitions(); + case 8: + return addedRowsCount(); + case 9: + return existingRowsCount(); + case 10: + return deletedRowsCount(); + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public String path() { + return wrapped.path(); + } + + @Override + public long length() { + return wrapped.length(); + } + + @Override + public int partitionSpecId() { + return wrapped.partitionSpecId(); + } + + @Override + public long sequenceNumber() { + return wrapped.sequenceNumber(); + } + + @Override + public long minSequenceNumber() { + return wrapped.minSequenceNumber(); + } + + @Override + public Long snapshotId() { + return wrapped.snapshotId(); + } + + @Override + public boolean hasAddedFiles() { + return wrapped.hasAddedFiles(); + } + + @Override + public Integer addedFilesCount() { + return wrapped.addedFilesCount(); + } + + @Override + public Long addedRowsCount() { + return wrapped.addedRowsCount(); + } + + @Override + public boolean hasExistingFiles() { + return wrapped.hasExistingFiles(); + } + + @Override + public Integer existingFilesCount() { + return wrapped.existingFilesCount(); + } + + @Override + public Long existingRowsCount() { + return wrapped.existingRowsCount(); + } + + @Override + public boolean hasDeletedFiles() { + return wrapped.hasDeletedFiles(); + } + + @Override + public Integer deletedFilesCount() { + return wrapped.deletedFilesCount(); + } + + @Override + public Long deletedRowsCount() { + return wrapped.deletedRowsCount(); + } + + @Override + public List partitions() { + return wrapped.partitions(); + } + + @Override + public ManifestFile copy() { + return wrapped.copy(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java new file mode 100644 index 000000000000..600506d47ef4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.List; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types; + +import static org.apache.iceberg.types.Types.NestedField.required; + +class V2Metadata { + private V2Metadata() { + } + + // fields for v2 write schema for required metadata + static final Types.NestedField REQUIRED_SNAPSHOT_ID = + required(503, "added_snapshot_id", Types.LongType.get()); + static final Types.NestedField REQUIRED_ADDED_FILES_COUNT = + required(504, "added_data_files_count", Types.IntegerType.get()); + static final Types.NestedField REQUIRED_EXISTING_FILES_COUNT = + required(505, "existing_data_files_count", Types.IntegerType.get()); + static final Types.NestedField REQUIRED_DELETED_FILES_COUNT = + required(506, "deleted_data_files_count", Types.IntegerType.get()); + static final Types.NestedField REQUIRED_ADDED_ROWS_COUNT = + required(512, "added_rows_count", Types.LongType.get()); + static final Types.NestedField REQUIRED_EXISTING_ROWS_COUNT = + required(513, "existing_rows_count", Types.LongType.get()); + static final Types.NestedField REQUIRED_DELETED_ROWS_COUNT = + required(514, "deleted_rows_count", Types.LongType.get()); + static final Types.NestedField REQUIRED_SEQUENCE_NUMBER = + required(515, "sequence_number", Types.LongType.get()); + static final Types.NestedField REQUIRED_MIN_SEQUENCE_NUMBER = + required(516, "min_sequence_number", Types.LongType.get()); + + static final Schema MANIFEST_LIST_SCHEMA = new Schema( + ManifestFile.PATH, ManifestFile.LENGTH, ManifestFile.SPEC_ID, + REQUIRED_SEQUENCE_NUMBER, REQUIRED_MIN_SEQUENCE_NUMBER, REQUIRED_SNAPSHOT_ID, + REQUIRED_ADDED_FILES_COUNT, REQUIRED_EXISTING_FILES_COUNT, REQUIRED_DELETED_FILES_COUNT, + REQUIRED_ADDED_ROWS_COUNT, REQUIRED_EXISTING_ROWS_COUNT, REQUIRED_DELETED_ROWS_COUNT, + ManifestFile.PARTITION_SUMMARIES); + + + /** + * A wrapper class to write any ManifestFile implementation to Avro using the v2 write schema. + * + * This is used to maintain compatibility with v2 by writing manifest list files with the old schema, instead of + * writing a sequence number into metadata files in v2 tables. + */ + static class IndexedManifestFile implements ManifestFile, IndexedRecord { + private static final org.apache.avro.Schema AVRO_SCHEMA = + AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file"); + + private final long sequenceNumber; + private ManifestFile wrapped = null; + + IndexedManifestFile(long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public ManifestFile wrap(ManifestFile file) { + this.wrapped = file; + return this; + } + + @Override + public org.apache.avro.Schema getSchema() { + return AVRO_SCHEMA; + } + + @Override + public void put(int i, Object v) { + throw new UnsupportedOperationException("Cannot read using IndexedManifestFile"); + } + + @Override + public Object get(int pos) { + switch (pos) { + case 0: + return wrapped.path(); + case 1: + return wrapped.length(); + case 2: + return wrapped.partitionSpecId(); + case 3: + if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) { + return sequenceNumber; + } else { + return wrapped.sequenceNumber(); + } + case 4: + return wrapped.minSequenceNumber(); + case 5: + return wrapped.snapshotId(); + case 6: + return wrapped.addedFilesCount(); + case 7: + return wrapped.existingFilesCount(); + case 8: + return wrapped.deletedFilesCount(); + case 9: + return wrapped.addedRowsCount(); + case 10: + return wrapped.existingRowsCount(); + case 11: + return wrapped.deletedRowsCount(); + case 12: + return wrapped.partitions(); + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public String path() { + return wrapped.path(); + } + + @Override + public long length() { + return wrapped.length(); + } + + @Override + public int partitionSpecId() { + return wrapped.partitionSpecId(); + } + + @Override + public long sequenceNumber() { + return wrapped.sequenceNumber(); + } + + @Override + public long minSequenceNumber() { + return wrapped.minSequenceNumber(); + } + + @Override + public Long snapshotId() { + return wrapped.snapshotId(); + } + + @Override + public boolean hasAddedFiles() { + return wrapped.hasAddedFiles(); + } + + @Override + public Integer addedFilesCount() { + return wrapped.addedFilesCount(); + } + + @Override + public Long addedRowsCount() { + return wrapped.addedRowsCount(); + } + + @Override + public boolean hasExistingFiles() { + return wrapped.hasExistingFiles(); + } + + @Override + public Integer existingFilesCount() { + return wrapped.existingFilesCount(); + } + + @Override + public Long existingRowsCount() { + return wrapped.existingRowsCount(); + } + + @Override + public boolean hasDeletedFiles() { + return wrapped.hasDeletedFiles(); + } + + @Override + public Integer deletedFilesCount() { + return wrapped.deletedFilesCount(); + } + + @Override + public Long deletedRowsCount() { + return wrapped.deletedRowsCount(); + } + + @Override + public List partitions() { + return wrapped.partitions(); + } + + @Override + public ManifestFile copy() { + return wrapped.copy(); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestGenericManifestFile.java b/core/src/test/java/org/apache/iceberg/TestGenericManifestFile.java deleted file mode 100644 index eaceedf83d1c..000000000000 --- a/core/src/test/java/org/apache/iceberg/TestGenericManifestFile.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestGenericManifestFile { - - private static final FileIO FILE_IO = new TestTables.LocalFileIO(); - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - @Test - public void testManifestsWithoutRowStats() throws IOException { - File manifestListFile = temp.newFile("manifest-list.avro"); - Assert.assertTrue(manifestListFile.delete()); - - Collection columnNamesWithoutRowStats = ImmutableList.of( - "manifest_path", "manifest_length", "partition_spec_id", "added_snapshot_id", - "added_data_files_count", "existing_data_files_count", "deleted_data_files_count", - "partitions"); - Schema schemaWithoutRowStats = ManifestFile.schema().select(columnNamesWithoutRowStats); - - OutputFile outputFile = FILE_IO.newOutputFile(manifestListFile.getCanonicalPath()); - try (FileAppender appender = Avro.write(outputFile) - .schema(schemaWithoutRowStats) - .named("manifest_file") - .overwrite() - .build()) { - - appender.add(new GenericManifestFile("path/to/manifest.avro", 1024, 1, 100L, 2, 3, 4, ImmutableList.of())); - } - - InputFile inputFile = FILE_IO.newInputFile(manifestListFile.getCanonicalPath()); - try (CloseableIterable files = Avro.read(inputFile) - .rename("manifest_file", GenericManifestFile.class.getName()) - .rename("partitions", GenericPartitionFieldSummary.class.getName()) - .rename("r508", GenericPartitionFieldSummary.class.getName()) - .classLoader(GenericManifestFile.class.getClassLoader()) - .project(ManifestFile.schema()) - .reuseContainers(false) - .build()) { - - ManifestFile manifest = Iterables.getOnlyElement(files); - - Assert.assertTrue("Added files should be present", manifest.hasAddedFiles()); - Assert.assertEquals("Added files count should match", 2, (int) manifest.addedFilesCount()); - Assert.assertNull("Added rows count should be null", manifest.addedRowsCount()); - - Assert.assertTrue("Existing files should be present", manifest.hasExistingFiles()); - Assert.assertEquals("Existing files count should match", 3, (int) manifest.existingFilesCount()); - Assert.assertNull("Existing rows count should be null", manifest.existingRowsCount()); - - Assert.assertTrue("Deleted files should be present", manifest.hasDeletedFiles()); - Assert.assertEquals("Deleted files count should match", 4, (int) manifest.deletedFilesCount()); - Assert.assertNull("Deleted rows count should be null", manifest.deletedRowsCount()); - } - } -} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFileVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestFileVersions.java new file mode 100644 index 000000000000..0155cca7f777 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestFileVersions.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestManifestFileVersions { + private static final String PATH = "s3://bucket/table/m1.avro"; + private static final long LENGTH = 1024L; + private static final int SPEC_ID = 1; + private static final long SEQ_NUM = 34L; + private static final long MIN_SEQ_NUM = 10L; + private static final long SNAPSHOT_ID = 987134631982734L; + private static final int ADDED_FILES = 2; + private static final long ADDED_ROWS = 5292L; + private static final int EXISTING_FILES = 343; + private static final long EXISTING_ROWS = 857273L; + private static final int DELETED_FILES = 1; + private static final long DELETED_ROWS = 22910L; + private static final List PARTITION_SUMMARIES = ImmutableList.of(); + + private static final ManifestFile TEST_MANIFEST = new GenericManifestFile( + PATH, LENGTH, SPEC_ID, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, + ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, + PARTITION_SUMMARIES); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testV1Write() throws IOException { + ManifestFile manifest = writeAndReadManifestList(1); + + // v2 fields are not written and are defaulted + Assert.assertEquals("Should not contain sequence number, default to 0", 0, manifest.sequenceNumber()); + Assert.assertEquals("Should not contain min sequence number, default to 0", 0, manifest.minSequenceNumber()); + + // v1 fields are read correctly, even though order changed + Assert.assertEquals("Path", PATH, manifest.path()); + Assert.assertEquals("Length", LENGTH, manifest.length()); + Assert.assertEquals("Spec id", SPEC_ID, manifest.partitionSpecId()); + Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) manifest.snapshotId()); + Assert.assertEquals("Added files count", ADDED_FILES, (int) manifest.addedFilesCount()); + Assert.assertEquals("Existing files count", EXISTING_FILES, (int) manifest.existingFilesCount()); + Assert.assertEquals("Deleted files count", DELETED_FILES, (int) manifest.deletedFilesCount()); + Assert.assertEquals("Added rows count", ADDED_ROWS, (long) manifest.addedRowsCount()); + Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) manifest.existingRowsCount()); + Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) manifest.deletedRowsCount()); + } + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeAndReadManifestList(2); + + // all v2 fields should be read correctly + Assert.assertEquals("Path", PATH, manifest.path()); + Assert.assertEquals("Length", LENGTH, manifest.length()); + Assert.assertEquals("Spec id", SPEC_ID, manifest.partitionSpecId()); + Assert.assertEquals("Sequence number", SEQ_NUM, manifest.sequenceNumber()); + Assert.assertEquals("Min sequence number", MIN_SEQ_NUM, manifest.minSequenceNumber()); + Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) manifest.snapshotId()); + Assert.assertEquals("Added files count", ADDED_FILES, (int) manifest.addedFilesCount()); + Assert.assertEquals("Added rows count", ADDED_ROWS, (long) manifest.addedRowsCount()); + Assert.assertEquals("Existing files count", EXISTING_FILES, (int) manifest.existingFilesCount()); + Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) manifest.existingRowsCount()); + Assert.assertEquals("Deleted files count", DELETED_FILES, (int) manifest.deletedFilesCount()); + Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) manifest.deletedRowsCount()); + } + + @Test + public void testV1ForwardCompatibility() throws IOException { + InputFile manifestList = writeManifestList(1); + GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA); + + // v1 metadata should match even though order changed + Assert.assertEquals("Path", PATH, generic.get("manifest_path").toString()); + Assert.assertEquals("Length", LENGTH, generic.get("manifest_length")); + Assert.assertEquals("Spec id", SPEC_ID, generic.get("partition_spec_id")); + Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) generic.get("added_snapshot_id")); + Assert.assertEquals("Added files count", ADDED_FILES, (int) generic.get("added_data_files_count")); + Assert.assertEquals("Existing files count", EXISTING_FILES, (int) generic.get("existing_data_files_count")); + Assert.assertEquals("Deleted files count", DELETED_FILES, (int) generic.get("deleted_data_files_count")); + Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count")); + Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count")); + Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count")); + } + + @Test + public void testV2ForwardCompatibility() throws IOException { + // v2 manifest list files can be read by v1 readers, but the sequence numbers will be ignored. + InputFile manifestList = writeManifestList(2); + GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA); + + // v1 metadata should match even though order changed + Assert.assertEquals("Path", PATH, generic.get("manifest_path").toString()); + Assert.assertEquals("Length", LENGTH, generic.get("manifest_length")); + Assert.assertEquals("Spec id", SPEC_ID, generic.get("partition_spec_id")); + Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) generic.get("added_snapshot_id")); + Assert.assertEquals("Added files count", ADDED_FILES, (int) generic.get("added_data_files_count")); + Assert.assertEquals("Existing files count", EXISTING_FILES, (int) generic.get("existing_data_files_count")); + Assert.assertEquals("Deleted files count", DELETED_FILES, (int) generic.get("deleted_data_files_count")); + Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count")); + Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count")); + Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count")); + } + + @Test + public void testManifestsWithoutRowStats() throws IOException { + File manifestListFile = temp.newFile("manifest-list.avro"); + Assert.assertTrue(manifestListFile.delete()); + + Collection columnNamesWithoutRowStats = ImmutableList.of( + "manifest_path", "manifest_length", "partition_spec_id", "added_snapshot_id", + "added_data_files_count", "existing_data_files_count", "deleted_data_files_count", + "partitions"); + Schema schemaWithoutRowStats = V1Metadata.MANIFEST_LIST_SCHEMA.select(columnNamesWithoutRowStats); + + OutputFile outputFile = Files.localOutput(manifestListFile); + try (FileAppender appender = Avro.write(outputFile) + .schema(schemaWithoutRowStats) + .named("manifest_file") + .overwrite() + .build()) { + + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schemaWithoutRowStats, "manifest_file"); + GenericData.Record withoutRowStats = new GenericRecordBuilder(avroSchema) + .set("manifest_path", "path/to/manifest.avro") + .set("manifest_length", 1024L) + .set("partition_spec_id", 1) + .set("added_snapshot_id", 100L) + .set("added_data_files_count", 2) + .set("existing_data_files_count", 3) + .set("deleted_data_files_count", 4) + .set("partitions", null) + .build(); + appender.add(withoutRowStats); + } + + List files = ManifestLists.read(outputFile.toInputFile()); + ManifestFile manifest = Iterables.getOnlyElement(files); + + Assert.assertTrue("Added files should be present", manifest.hasAddedFiles()); + Assert.assertEquals("Added files count should match", 2, (int) manifest.addedFilesCount()); + Assert.assertNull("Added rows count should be null", manifest.addedRowsCount()); + + Assert.assertTrue("Existing files should be present", manifest.hasExistingFiles()); + Assert.assertEquals("Existing files count should match", 3, (int) manifest.existingFilesCount()); + Assert.assertNull("Existing rows count should be null", manifest.existingRowsCount()); + + Assert.assertTrue("Deleted files should be present", manifest.hasDeletedFiles()); + Assert.assertEquals("Deleted files count should match", 4, (int) manifest.deletedFilesCount()); + Assert.assertNull("Deleted rows count should be null", manifest.deletedRowsCount()); + } + + private InputFile writeManifestList(int formatVersion) throws IOException { + OutputFile manifestList = Files.localOutput(temp.newFile()); + try (FileAppender writer = ManifestLists.write( + formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQ_NUM : 0)) { + writer.add(TEST_MANIFEST); + } + return manifestList.toInputFile(); + } + + private GenericData.Record readGeneric(InputFile manifestList, Schema schema) throws IOException { + try (CloseableIterable files = Avro.read(manifestList) + .project(schema) + .reuseContainers(false) + .build()) { + List records = Lists.newLinkedList(files); + Assert.assertEquals("Should contain one manifest", 1, records.size()); + return records.get(0); + } + } + + private ManifestFile writeAndReadManifestList(int formatVersion) throws IOException { + List manifests = ManifestLists.read(writeManifestList(formatVersion)); + Assert.assertEquals("Should contain one manifest", 1, manifests.size()); + return manifests.get(0); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index 67c1c04fe089..5e57296f4b10 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -97,7 +97,7 @@ public void testJsonConversionWithManifestList() throws IOException { Assert.assertTrue(manifestList.delete()); manifestList.deleteOnExit(); - try (ManifestListWriter writer = ManifestListWriter.write(1, Files.localOutput(manifestList), id, parentId)) { + try (ManifestListWriter writer = ManifestLists.write(1, Files.localOutput(manifestList), id, parentId)) { writer.addAll(manifests); }