From 1119159033cea2e6e79691a5f1389c327d926e6f Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Apr 2020 14:25:25 -0700 Subject: [PATCH 01/11] Extract ManifestEntry interface and snapshot v1 manifest entries. --- .../org/apache/iceberg/IndexedStructLike.java | 69 +++++ .../iceberg/InheritableMetadataFactory.java | 22 +- .../org/apache/iceberg/ManifestEntry.java | 253 ++++-------------- .../org/apache/iceberg/ManifestReader.java | 4 +- .../org/apache/iceberg/ManifestWriter.java | 59 ++-- .../java/org/apache/iceberg/V1Metadata.java | 222 +++++++++++++++ .../org/apache/iceberg/TableTestBase.java | 2 +- 7 files changed, 392 insertions(+), 239 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/IndexedStructLike.java diff --git a/core/src/main/java/org/apache/iceberg/IndexedStructLike.java b/core/src/main/java/org/apache/iceberg/IndexedStructLike.java new file mode 100644 index 000000000000..cf5d0e6a3e3b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/IndexedStructLike.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.apache.avro.generic.IndexedRecord; + +/** + * IndexedRecord implementation to wrap a StructLike for writing to Avro. + */ +class IndexedStructLike implements StructLike, IndexedRecord { + private final org.apache.avro.Schema avroSchema; + private StructLike wrapped = null; + + IndexedStructLike(org.apache.avro.Schema avroSchema) { + this.avroSchema = avroSchema; + } + + IndexedStructLike wrap(StructLike struct) { + this.wrapped = struct; + return this; + } + + @Override + public int size() { + return wrapped.size(); + } + + @Override + public T get(int pos, Class javaClass) { + return wrapped.get(pos, javaClass); + } + + @Override + public Object get(int pos) { + return get(pos, Object.class); + } + + @Override + public void set(int pos, T value) { + wrapped.set(pos, value); + } + + @Override + public void put(int pos, Object value) { + set(pos, value); + } + + @Override + public org.apache.avro.Schema getSchema() { + return avroSchema; + } +} diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java index 384ea64baa4d..d05c20e8ab8d 100644 --- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java @@ -22,6 +22,7 @@ class InheritableMetadataFactory { private static final InheritableMetadata EMPTY = new EmptyInheritableMetadata(); + private static final InheritableMetadata NOOP = new NullInheritableMetadata(); private InheritableMetadataFactory() {} @@ -30,14 +31,17 @@ static InheritableMetadata empty() { } static InheritableMetadata fromManifest(ManifestFile manifest) { - return new BaseInheritableMetadata(manifest.snapshotId()); + if (manifest.snapshotId() != null) { + return new BaseInheritableMetadata(manifest.snapshotId()); + } else { + return NOOP; + } } static class BaseInheritableMetadata implements InheritableMetadata { + private final long snapshotId; - private final Long snapshotId; - - private BaseInheritableMetadata(Long snapshotId) { + private BaseInheritableMetadata(long snapshotId) { this.snapshotId = snapshotId; } @@ -50,6 +54,16 @@ public ManifestEntry apply(ManifestEntry manifestEntry) { } } + static class NullInheritableMetadata implements InheritableMetadata { + private NullInheritableMetadata() { + } + + @Override + public ManifestEntry apply(ManifestEntry manifestEntry) { + return manifestEntry; + } + } + static class EmptyInheritableMetadata implements InheritableMetadata { private EmptyInheritableMetadata() {} diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java index d0f85d6f5fe5..b5655b5217a9 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -20,21 +20,17 @@ package org.apache.iceberg; import com.google.common.base.MoreObjects; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; -import java.util.Map; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; +import org.apache.iceberg.V1Metadata.IndexedDataFile; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.types.Types.IntegerType; -import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; -class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable { +interface ManifestEntry { enum Status { EXISTING(0), ADDED(1), @@ -51,23 +47,59 @@ public int id() { } } + // ids for data-file columns are assigned from 1000 + Types.NestedField STATUS = required(0, "status", Types.IntegerType.get()); + Types.NestedField SNAPSHOT_ID = optional(1, "snapshot_id", Types.LongType.get()); + int DATA_FILE_ID = 2; + + static Schema getSchema(StructType partitionType) { + return wrapFileSchema(DataFile.getType(partitionType)); + } + + static Schema wrapFileSchema(StructType fileType) { + return new Schema(STATUS, SNAPSHOT_ID, required(DATA_FILE_ID, "data_file", fileType)); + } + + /** + * @return the status of the file, whether EXISTING, ADDED, or DELETED + */ + Status status(); + + /** + * @return id of the snapshot in which the file was added to the table + */ + Long snapshotId(); + + void setSnapshotId(long snapshotId); + + /** + * @return a file + */ + DataFile file(); + + ManifestEntry copy(); + + ManifestEntry copyWithoutStats(); +} + +class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable { private final org.apache.avro.Schema schema; private final IndexedDataFile fileWrapper; private Status status = Status.EXISTING; private Long snapshotId = null; private DataFile file = null; - ManifestEntry(org.apache.avro.Schema schema) { + GenericManifestEntry(org.apache.avro.Schema schema) { this.schema = schema; this.fileWrapper = null; // do not use the file wrapper to read } - ManifestEntry(StructType partitionType) { - this.schema = AvroSchemaUtil.convert(getSchema(partitionType), "manifest_entry"); + GenericManifestEntry(StructType partitionType) { + this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry"); this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema()); } - private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) { + private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) { this.schema = toCopy.schema; this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema()); this.status = toCopy.status; @@ -122,14 +154,15 @@ public DataFile file() { } public ManifestEntry copy() { - return new ManifestEntry(this, true /* full copy */); + return new GenericManifestEntry(this, true /* full copy */); } public ManifestEntry copyWithoutStats() { - return new ManifestEntry(this, false /* drop stats */); + return new GenericManifestEntry(this, false /* drop stats */); } - public void setSnapshotId(Long snapshotId) { + @Override + public void setSnapshotId(long snapshotId) { this.snapshotId = snapshotId; } @@ -173,23 +206,6 @@ public org.apache.avro.Schema getSchema() { return schema; } - static Schema getSchema(StructType partitionType) { - return wrapFileSchema(DataFile.getType(partitionType)); - } - - static Schema projectSchema(StructType partitionType, Collection columns) { - return wrapFileSchema( - new Schema(DataFile.getType(partitionType).fields()).select(columns).asStruct()); - } - - static Schema wrapFileSchema(StructType fileStruct) { - // ids for top-level columns are assigned from 1000 - return new Schema( - required(0, "status", IntegerType.get()), - optional(1, "snapshot_id", LongType.get()), - required(2, "data_file", fileStruct)); - } - @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -198,179 +214,4 @@ public String toString() { .add("file", file) .toString(); } - - private static class IndexedStructLike implements StructLike, IndexedRecord { - private final org.apache.avro.Schema avroSchema; - private StructLike wrapped = null; - - IndexedStructLike(org.apache.avro.Schema avroSchema) { - this.avroSchema = avroSchema; - } - - public IndexedStructLike wrap(StructLike struct) { - this.wrapped = struct; - return this; - } - - @Override - public int size() { - return wrapped.size(); - } - - @Override - public T get(int pos, Class javaClass) { - return wrapped.get(pos, javaClass); - } - - @Override - public Object get(int pos) { - return get(pos, Object.class); - } - - @Override - public void set(int pos, T value) { - wrapped.set(pos, value); - } - - @Override - public void put(int pos, Object value) { - set(pos, value); - } - - @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; - } - } - - private static class IndexedDataFile implements DataFile, IndexedRecord { - private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; - - private final org.apache.avro.Schema avroSchema; - private final IndexedStructLike partitionWrapper; - private DataFile wrapped = null; - - IndexedDataFile(org.apache.avro.Schema avroSchema) { - this.avroSchema = avroSchema; - this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); - } - - public IndexedDataFile wrap(DataFile file) { - this.wrapped = file; - return this; - } - - @Override - public Object get(int pos) { - switch (pos) { - case 0: - return wrapped.path().toString(); - case 1: - return wrapped.format() != null ? wrapped.format().toString() : null; - case 2: - return partitionWrapper.wrap(wrapped.partition()); - case 3: - return wrapped.recordCount(); - case 4: - return wrapped.fileSizeInBytes(); - case 5: - return DEFAULT_BLOCK_SIZE; - case 6: - return wrapped.columnSizes(); - case 7: - return wrapped.valueCounts(); - case 8: - return wrapped.nullValueCounts(); - case 9: - return wrapped.lowerBounds(); - case 10: - return wrapped.upperBounds(); - case 11: - return wrapped.keyMetadata(); - case 12: - return wrapped.splitOffsets(); - } - throw new IllegalArgumentException("Unknown field ordinal: " + pos); - } - - @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot read into IndexedDataFile"); - } - - @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; - } - - @Override - public CharSequence path() { - return wrapped.path(); - } - - @Override - public FileFormat format() { - return wrapped.format(); - } - - @Override - public StructLike partition() { - return wrapped.partition(); - } - - @Override - public long recordCount() { - return wrapped.recordCount(); - } - - @Override - public long fileSizeInBytes() { - return wrapped.fileSizeInBytes(); - } - - @Override - public Map columnSizes() { - return wrapped.columnSizes(); - } - - @Override - public Map valueCounts() { - return wrapped.valueCounts(); - } - - @Override - public Map nullValueCounts() { - return wrapped.nullValueCounts(); - } - - @Override - public Map lowerBounds() { - return wrapped.lowerBounds(); - } - - @Override - public Map upperBounds() { - return wrapped.upperBounds(); - } - - @Override - public ByteBuffer keyMetadata() { - return wrapped.keyMetadata(); - } - - @Override - public List splitOffsets() { - return wrapped.splitOffsets(); - } - - @Override - public DataFile copy() { - return wrapped.copy(); - } - - @Override - public DataFile copyWithoutStats() { - return wrapped.copyWithoutStats(); - } - } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 367c2c38ca4b..8155f05bf1ef 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -220,12 +220,12 @@ CloseableIterable entries(Schema fileProjection) { case AVRO: AvroIterable reader = Avro.read(file) .project(ManifestEntry.wrapFileSchema(fileProjection.asStruct())) - .rename("manifest_entry", ManifestEntry.class.getName()) + .rename("manifest_entry", GenericManifestEntry.class.getName()) .rename("partition", PartitionData.class.getName()) .rename("r102", PartitionData.class.getName()) .rename("data_file", GenericDataFile.class.getName()) .rename("r2", GenericDataFile.class.getName()) - .classLoader(ManifestEntry.class.getClassLoader()) + .classLoader(GenericManifestFile.class.getClassLoader()) .reuseContainers() .build(); diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index e6df3ae257b9..8bd375599566 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -55,7 +55,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private final int specId; private final FileAppender writer; private final Long snapshotId; - private final ManifestEntry reused; + private final GenericManifestEntry reused; private final PartitionSummary stats; private boolean closed = false; @@ -69,12 +69,16 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { this.file = file; this.specId = spec.specId(); - this.writer = newAppender(FileFormat.AVRO, spec, file); + this.writer = newAppender(spec, file); this.snapshotId = snapshotId; - this.reused = new ManifestEntry(spec.partitionType()); + this.reused = new GenericManifestEntry(spec.partitionType()); this.stats = new PartitionSummary(spec); } + protected abstract ManifestEntry prepare(ManifestEntry entry); + + protected abstract FileAppender newAppender(PartitionSpec spec, OutputFile file); + void addEntry(ManifestEntry entry) { switch (entry.status()) { case ADDED: @@ -91,7 +95,7 @@ void addEntry(ManifestEntry entry) { break; } stats.update(entry.file().partition()); - writer.add(entry); + writer.add(prepare(entry)); } /** @@ -163,31 +167,34 @@ public void close() throws IOException { writer.close(); } - private static FileAppender newAppender(FileFormat format, PartitionSpec spec, - OutputFile file) { - Schema manifestSchema = ManifestEntry.getSchema(spec.partitionType()); - try { - switch (format) { - case AVRO: - return Avro.write(file) - .schema(manifestSchema) - .named("manifest_entry") - .meta("schema", SchemaParser.toJson(spec.schema())) - .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) - .meta("partition-spec-id", String.valueOf(spec.specId())) - .overwrite() - .build(); - default: - throw new IllegalArgumentException("Unsupported format: " + format); - } - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file); - } - } - static class V1Writer extends ManifestWriter { + V1Metadata.IndexedManifestEntry entryWrapper; + V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { super(spec, file, snapshotId); + this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType()); + } + + @Override + protected ManifestEntry prepare(ManifestEntry entry) { + return entryWrapper.wrap(entry); + } + + @Override + protected FileAppender newAppender(PartitionSpec spec, OutputFile file) { + Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType()); + try { + return Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .overwrite() + .build(); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file); + } } } } diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index f581b718d9ee..8b3341ad8a09 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -19,9 +19,14 @@ package org.apache.iceberg; +import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import org.apache.avro.generic.IndexedRecord; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types; + +import static org.apache.iceberg.types.Types.NestedField.required; class V1Metadata { private V1Metadata() { @@ -175,4 +180,221 @@ public ManifestFile copy() { return wrapped.copy(); } } + + static Schema entrySchema(Types.StructType partitionType) { + return wrapFileSchema(DataFile.getType(partitionType)); + } + + static Schema wrapFileSchema(Types.StructType fileSchema) { + // this is used to build projection schemas + return new Schema( + ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, + required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema)); + } + + static class IndexedManifestEntry implements ManifestEntry, IndexedRecord { + private final org.apache.avro.Schema avroSchema; + private final IndexedDataFile fileWrapper; + private ManifestEntry wrapped = null; + + IndexedManifestEntry(Types.StructType partitionType) { + this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); + this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema()); + } + + public IndexedManifestEntry wrap(ManifestEntry entry) { + this.wrapped = entry; + return this; + } + + @Override + public org.apache.avro.Schema getSchema() { + return avroSchema; + } + + @Override + public void put(int i, Object v) { + throw new UnsupportedOperationException("Cannot read using IndexedManifestEntry"); + } + + @Override + public Object get(int i) { + switch (i) { + case 0: + return wrapped.status().id(); + case 1: + return wrapped.snapshotId(); + case 2: + DataFile file = wrapped.file(); + if (file == null || file instanceof GenericDataFile) { + return file; + } else { + return fileWrapper.wrap(file); + } + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public Status status() { + return wrapped.status(); + } + + @Override + public Long snapshotId() { + return wrapped.snapshotId(); + } + + @Override + public void setSnapshotId(long snapshotId) { + wrapped.setSnapshotId(snapshotId); + } + + @Override + public DataFile file() { + return wrapped.file(); + } + + @Override + public ManifestEntry copy() { + return wrapped.copy(); + } + + @Override + public ManifestEntry copyWithoutStats() { + return wrapped.copyWithoutStats(); + } + } + + static class IndexedDataFile implements DataFile, IndexedRecord { + private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; + + private final org.apache.avro.Schema avroSchema; + private final IndexedStructLike partitionWrapper; + private DataFile wrapped = null; + + IndexedDataFile(org.apache.avro.Schema avroSchema) { + this.avroSchema = avroSchema; + this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); + } + + IndexedDataFile wrap(DataFile file) { + this.wrapped = file; + return this; + } + + @Override + public Object get(int pos) { + switch (pos) { + case 0: + return wrapped.path().toString(); + case 1: + return wrapped.format() != null ? wrapped.format().toString() : null; + case 2: + return partitionWrapper.wrap(wrapped.partition()); + case 3: + return wrapped.recordCount(); + case 4: + return wrapped.fileSizeInBytes(); + case 5: + return DEFAULT_BLOCK_SIZE; + case 6: + return wrapped.columnSizes(); + case 7: + return wrapped.valueCounts(); + case 8: + return wrapped.nullValueCounts(); + case 9: + return wrapped.lowerBounds(); + case 10: + return wrapped.upperBounds(); + case 11: + return wrapped.keyMetadata(); + case 12: + return wrapped.splitOffsets(); + } + throw new IllegalArgumentException("Unknown field ordinal: " + pos); + } + + @Override + public void put(int i, Object v) { + throw new UnsupportedOperationException("Cannot read into IndexedDataFile"); + } + + @Override + public org.apache.avro.Schema getSchema() { + return avroSchema; + } + + @Override + public CharSequence path() { + return wrapped.path(); + } + + @Override + public FileFormat format() { + return wrapped.format(); + } + + @Override + public StructLike partition() { + return wrapped.partition(); + } + + @Override + public long recordCount() { + return wrapped.recordCount(); + } + + @Override + public long fileSizeInBytes() { + return wrapped.fileSizeInBytes(); + } + + @Override + public Map columnSizes() { + return wrapped.columnSizes(); + } + + @Override + public Map valueCounts() { + return wrapped.valueCounts(); + } + + @Override + public Map nullValueCounts() { + return wrapped.nullValueCounts(); + } + + @Override + public Map lowerBounds() { + return wrapped.lowerBounds(); + } + + @Override + public Map upperBounds() { + return wrapped.upperBounds(); + } + + @Override + public ByteBuffer keyMetadata() { + return wrapped.keyMetadata(); + } + + @Override + public List splitOffsets() { + return wrapped.splitOffsets(); + } + + @Override + public DataFile copy() { + return wrapped.copy(); + } + + @Override + public DataFile copyWithoutStats() { + return wrapped.copyWithoutStats(); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 4398c2420b28..6c9f3dbaa112 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -177,7 +177,7 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOExce } ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) { - ManifestEntry entry = new ManifestEntry(table.spec().partitionType()); + GenericManifestEntry entry = new GenericManifestEntry(table.spec().partitionType()); switch (status) { case ADDED: return entry.wrapAppend(snapshotId, file); From ed23f65c7cc4adc582b2712beae71b6144bbfc20 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Apr 2020 17:01:50 -0700 Subject: [PATCH 02/11] Move GenericManifestEntry to a separate file. --- .../apache/iceberg/GenericManifestEntry.java | 160 ++++++++++++++++++ .../org/apache/iceberg/ManifestEntry.java | 134 --------------- 2 files changed, 160 insertions(+), 134 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/GenericManifestEntry.java diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java new file mode 100644 index 000000000000..d1728bda1bd6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.base.MoreObjects; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types; + +class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable { + private final org.apache.avro.Schema schema; + private final V1Metadata.IndexedDataFile fileWrapper; + private Status status = Status.EXISTING; + private Long snapshotId = null; + private DataFile file = null; + + GenericManifestEntry(org.apache.avro.Schema schema) { + this.schema = schema; + this.fileWrapper = null; // do not use the file wrapper to read + } + + GenericManifestEntry(Types.StructType partitionType) { + this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry"); + this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema()); + } + + private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) { + this.schema = toCopy.schema; + this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema()); + this.status = toCopy.status; + this.snapshotId = toCopy.snapshotId; + if (fullCopy) { + this.file = toCopy.file().copy(); + } else { + this.file = toCopy.file().copyWithoutStats(); + } + } + + ManifestEntry wrapExisting(Long newSnapshotId, DataFile newFile) { + this.status = Status.EXISTING; + this.snapshotId = newSnapshotId; + this.file = newFile; + return this; + } + + ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { + this.status = Status.ADDED; + this.snapshotId = newSnapshotId; + this.file = newFile; + return this; + } + + ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { + this.status = Status.DELETED; + this.snapshotId = newSnapshotId; + this.file = newFile; + return this; + } + + /** + * @return the status of the file, whether EXISTING, ADDED, or DELETED + */ + public Status status() { + return status; + } + + /** + * @return id of the snapshot in which the file was added to the table + */ + public Long snapshotId() { + return snapshotId; + } + + /** + * @return a file + */ + public DataFile file() { + return file; + } + + public ManifestEntry copy() { + return new GenericManifestEntry(this, true /* full copy */); + } + + public ManifestEntry copyWithoutStats() { + return new GenericManifestEntry(this, false /* drop stats */); + } + + @Override + public void setSnapshotId(long snapshotId) { + this.snapshotId = snapshotId; + } + + @Override + public void put(int i, Object v) { + switch (i) { + case 0: + this.status = Status.values()[(Integer) v]; + return; + case 1: + this.snapshotId = (Long) v; + return; + case 2: + this.file = (DataFile) v; + return; + default: + // ignore the object, it must be from a newer version of the format + } + } + + @Override + public Object get(int i) { + switch (i) { + case 0: + return status.id(); + case 1: + return snapshotId; + case 2: + if (fileWrapper == null || file instanceof GenericDataFile) { + return file; + } else { + return fileWrapper.wrap(file); + } + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public org.apache.avro.Schema getSchema() { + return schema; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("status", status) + .add("snapshot_id", snapshotId) + .add("file", file) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java index b5655b5217a9..f60798d9627c 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -81,137 +81,3 @@ static Schema wrapFileSchema(StructType fileType) { ManifestEntry copyWithoutStats(); } - -class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable { - private final org.apache.avro.Schema schema; - private final IndexedDataFile fileWrapper; - private Status status = Status.EXISTING; - private Long snapshotId = null; - private DataFile file = null; - - GenericManifestEntry(org.apache.avro.Schema schema) { - this.schema = schema; - this.fileWrapper = null; // do not use the file wrapper to read - } - - GenericManifestEntry(StructType partitionType) { - this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry"); - this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema()); - } - - private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) { - this.schema = toCopy.schema; - this.fileWrapper = new IndexedDataFile(schema.getField("data_file").schema()); - this.status = toCopy.status; - this.snapshotId = toCopy.snapshotId; - if (fullCopy) { - this.file = toCopy.file().copy(); - } else { - this.file = toCopy.file().copyWithoutStats(); - } - } - - ManifestEntry wrapExisting(Long newSnapshotId, DataFile newFile) { - this.status = Status.EXISTING; - this.snapshotId = newSnapshotId; - this.file = newFile; - return this; - } - - ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { - this.status = Status.ADDED; - this.snapshotId = newSnapshotId; - this.file = newFile; - return this; - } - - ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { - this.status = Status.DELETED; - this.snapshotId = newSnapshotId; - this.file = newFile; - return this; - } - - /** - * @return the status of the file, whether EXISTING, ADDED, or DELETED - */ - public Status status() { - return status; - } - - /** - * @return id of the snapshot in which the file was added to the table - */ - public Long snapshotId() { - return snapshotId; - } - - /** - * @return a file - */ - public DataFile file() { - return file; - } - - public ManifestEntry copy() { - return new GenericManifestEntry(this, true /* full copy */); - } - - public ManifestEntry copyWithoutStats() { - return new GenericManifestEntry(this, false /* drop stats */); - } - - @Override - public void setSnapshotId(long snapshotId) { - this.snapshotId = snapshotId; - } - - @Override - public void put(int i, Object v) { - switch (i) { - case 0: - this.status = Status.values()[(Integer) v]; - return; - case 1: - this.snapshotId = (Long) v; - return; - case 2: - this.file = (DataFile) v; - return; - default: - // ignore the object, it must be from a newer version of the format - } - } - - @Override - public Object get(int i) { - switch (i) { - case 0: - return status.id(); - case 1: - return snapshotId; - case 2: - if (fileWrapper == null || file instanceof GenericDataFile) { - return file; - } else { - return fileWrapper.wrap(file); - } - default: - throw new UnsupportedOperationException("Unknown field ordinal: " + i); - } - } - - @Override - public org.apache.avro.Schema getSchema() { - return schema; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("status", status) - .add("snapshot_id", snapshotId) - .add("file", file) - .toString(); - } -} From 2e53e316970e9bb6263e9612d2179e924bed94d5 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Apr 2020 17:40:25 -0700 Subject: [PATCH 03/11] Add v2 manifest writer and ManifestEntry sequence number. --- .../apache/iceberg/GenericManifestEntry.java | 25 +++- .../iceberg/InheritableMetadataFactory.java | 9 +- .../org/apache/iceberg/ManifestEntry.java | 26 ++++- .../org/apache/iceberg/ManifestFiles.java | 2 + .../apache/iceberg/ManifestListWriter.java | 2 +- .../org/apache/iceberg/ManifestWriter.java | 47 +++++++- .../java/org/apache/iceberg/V1Metadata.java | 10 ++ .../java/org/apache/iceberg/V2Metadata.java | 107 +++++++++++++++++- .../org/apache/iceberg/TableTestBase.java | 2 +- 9 files changed, 211 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java index d1728bda1bd6..d1b1cc2a3060 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java @@ -30,6 +30,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData private final V1Metadata.IndexedDataFile fileWrapper; private Status status = Status.EXISTING; private Long snapshotId = null; + private Long sequenceNumber = null; private DataFile file = null; GenericManifestEntry(org.apache.avro.Schema schema) { @@ -54,9 +55,10 @@ private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) { } } - ManifestEntry wrapExisting(Long newSnapshotId, DataFile newFile) { + ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { this.status = Status.EXISTING; this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; this.file = newFile; return this; } @@ -64,6 +66,7 @@ ManifestEntry wrapExisting(Long newSnapshotId, DataFile newFile) { ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { this.status = Status.ADDED; this.snapshotId = newSnapshotId; + this.sequenceNumber = null; this.file = newFile; return this; } @@ -71,6 +74,7 @@ ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { this.status = Status.DELETED; this.snapshotId = newSnapshotId; + this.sequenceNumber = null; this.file = newFile; return this; } @@ -89,6 +93,11 @@ public Long snapshotId() { return snapshotId; } + @Override + public Long sequenceNumber() { + return sequenceNumber; + } + /** * @return a file */ @@ -105,8 +114,13 @@ public ManifestEntry copyWithoutStats() { } @Override - public void setSnapshotId(long snapshotId) { - this.snapshotId = snapshotId; + public void setSnapshotId(long newSnapshotId) { + this.snapshotId = newSnapshotId; + } + + @Override + public void setSequenceNumber(long newSequenceNumber) { + this.sequenceNumber = newSequenceNumber; } @Override @@ -119,6 +133,9 @@ public void put(int i, Object v) { this.snapshotId = (Long) v; return; case 2: + this.sequenceNumber = (Long) v; + return; + case 3: this.file = (DataFile) v; return; default: @@ -134,6 +151,8 @@ public Object get(int i) { case 1: return snapshotId; case 2: + return sequenceNumber; + case 3: if (fileWrapper == null || file instanceof GenericDataFile) { return file; } else { diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java index d05c20e8ab8d..71e7a485c9be 100644 --- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java @@ -32,7 +32,7 @@ static InheritableMetadata empty() { static InheritableMetadata fromManifest(ManifestFile manifest) { if (manifest.snapshotId() != null) { - return new BaseInheritableMetadata(manifest.snapshotId()); + return new BaseInheritableMetadata(manifest.snapshotId(), manifest.sequenceNumber()); } else { return NOOP; } @@ -40,9 +40,11 @@ static InheritableMetadata fromManifest(ManifestFile manifest) { static class BaseInheritableMetadata implements InheritableMetadata { private final long snapshotId; + private final long sequenceNumber; - private BaseInheritableMetadata(long snapshotId) { + private BaseInheritableMetadata(long snapshotId, long sequenceNumber) { this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; } @Override @@ -50,6 +52,9 @@ public ManifestEntry apply(ManifestEntry manifestEntry) { if (manifestEntry.snapshotId() == null) { manifestEntry.setSnapshotId(snapshotId); } + if (manifestEntry.sequenceNumber() == null) { + manifestEntry.setSequenceNumber(sequenceNumber); + } return manifestEntry; } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java index f60798d9627c..bc03850c3633 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -19,11 +19,6 @@ package org.apache.iceberg; -import com.google.common.base.MoreObjects; -import org.apache.avro.generic.IndexedRecord; -import org.apache.avro.specific.SpecificData; -import org.apache.iceberg.V1Metadata.IndexedDataFile; -import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; @@ -50,14 +45,16 @@ public int id() { // ids for data-file columns are assigned from 1000 Types.NestedField STATUS = required(0, "status", Types.IntegerType.get()); Types.NestedField SNAPSHOT_ID = optional(1, "snapshot_id", Types.LongType.get()); + Types.NestedField SEQUENCE_NUMBER = optional(3, "sequence_number", Types.LongType.get()); int DATA_FILE_ID = 2; + // next ID to assign: 4 static Schema getSchema(StructType partitionType) { return wrapFileSchema(DataFile.getType(partitionType)); } static Schema wrapFileSchema(StructType fileType) { - return new Schema(STATUS, SNAPSHOT_ID, required(DATA_FILE_ID, "data_file", fileType)); + return new Schema(STATUS, SNAPSHOT_ID, SEQUENCE_NUMBER, required(DATA_FILE_ID, "data_file", fileType)); } /** @@ -70,8 +67,25 @@ static Schema wrapFileSchema(StructType fileType) { */ Long snapshotId(); + /** + * Set the snapshot id for this manifest entry. + * + * @param snapshotId a long snapshot id + */ void setSnapshotId(long snapshotId); + /** + * @return the sequence number of the snapshot in which the file was added to the table + */ + Long sequenceNumber(); + + /** + * Set the sequence number for this manifest entry. + * + * @param sequenceNumber a sequence number + */ + void setSequenceNumber(long sequenceNumber); + /** * @return a file */ diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index 90b72cdac35a..2b13acceae27 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -90,6 +90,8 @@ static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile ou switch (formatVersion) { case 1: return new ManifestWriter.V1Writer(spec, outputFile, snapshotId); + case 2: + return new ManifestWriter.V2Writer(spec, outputFile, snapshotId); } throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index fe3afe4f8e69..c2b4e51288c9 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -78,7 +78,7 @@ static class V2Writer extends ManifestListWriter { "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), "format-version", "2")); - this.wrapper = new V2Metadata.IndexedManifestFile(sequenceNumber); + this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 8bd375599566..60a0bb09f340 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -65,6 +65,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private long existingRows = 0L; private int deletedFiles = 0; private long deletedRows = 0L; + private long minSequenceNumber = Long.MAX_VALUE; private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { this.file = file; @@ -95,6 +96,9 @@ void addEntry(ManifestEntry entry) { break; } stats.update(entry.file().partition()); + if (entry.sequenceNumber() < minSequenceNumber) { + this.minSequenceNumber = entry.sequenceNumber(); + } writer.add(prepare(entry)); } @@ -120,12 +124,12 @@ void add(ManifestEntry entry) { * @param existingFile a data file * @param fileSnapshotId snapshot ID when the data file was added to the table */ - public void existing(DataFile existingFile, long fileSnapshotId) { - addEntry(reused.wrapExisting(fileSnapshotId, existingFile)); + public void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber) { + addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile)); } void existing(ManifestEntry entry) { - addEntry(reused.wrapExisting(entry.snapshotId(), entry.file())); + addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file())); } /** @@ -157,8 +161,8 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); - return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, UNASSIGNED_SEQ, snapshotId, - addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); + return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, minSequenceNumber, + snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); } @Override @@ -167,6 +171,38 @@ public void close() throws IOException { writer.close(); } + static class V2Writer extends ManifestWriter { + V2Metadata.IndexedManifestEntry entryWrapper; + + V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { + super(spec, file, snapshotId); + this.entryWrapper = new V2Metadata.IndexedManifestEntry(snapshotId, spec.partitionType()); + } + + @Override + protected ManifestEntry prepare(ManifestEntry entry) { + return entryWrapper.wrap(entry); + } + + @Override + protected FileAppender newAppender(PartitionSpec spec, OutputFile file) { + Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); + try { + return Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "2") + .overwrite() + .build(); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file); + } + } + } + static class V1Writer extends ManifestWriter { V1Metadata.IndexedManifestEntry entryWrapper; @@ -190,6 +226,7 @@ protected FileAppender newAppender(PartitionSpec spec, OutputFile .meta("schema", SchemaParser.toJson(spec.schema())) .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "1") .overwrite() .build(); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index 8b3341ad8a09..a905b4d6ec55 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -251,6 +251,16 @@ public void setSnapshotId(long snapshotId) { wrapped.setSnapshotId(snapshotId); } + @Override + public Long sequenceNumber() { + return wrapped.sequenceNumber(); + } + + @Override + public void setSequenceNumber(long sequenceNumber) { + wrapped.setSequenceNumber(sequenceNumber); + } + @Override public DataFile file() { return wrapped.file(); diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 600506d47ef4..261b58cd7673 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import com.google.common.base.Preconditions; import java.util.List; import org.apache.avro.generic.IndexedRecord; import org.apache.iceberg.avro.AvroSchemaUtil; @@ -68,10 +69,12 @@ static class IndexedManifestFile implements ManifestFile, IndexedRecord { private static final org.apache.avro.Schema AVRO_SCHEMA = AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file"); + private final long snapshotId; private final long sequenceNumber; private ManifestFile wrapped = null; - IndexedManifestFile(long sequenceNumber) { + IndexedManifestFile(long snapshotId, long sequenceNumber) { + this.snapshotId = snapshotId; this.sequenceNumber = sequenceNumber; } @@ -101,6 +104,8 @@ public Object get(int pos) { return wrapped.partitionSpecId(); case 3: if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) { + Preconditions.checkState(snapshotId == wrapped.snapshotId(), + "Found unassigned sequence number for a manifest from snapshot: %s", wrapped.snapshotId()); return sequenceNumber; } else { return wrapped.sequenceNumber(); @@ -213,4 +218,104 @@ public ManifestFile copy() { return wrapped.copy(); } } + + static Schema entrySchema(Types.StructType partitionType) { + return wrapFileSchema(DataFile.getType(partitionType)); + } + + static Schema wrapFileSchema(Types.StructType fileSchema) { + // this is used to build projection schemas + return new Schema( + ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, + required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema)); + } + + static class IndexedManifestEntry implements ManifestEntry, IndexedRecord { + private final org.apache.avro.Schema avroSchema; + private final long snapshotId; + private final V1Metadata.IndexedDataFile fileWrapper; + private ManifestEntry wrapped = null; + + IndexedManifestEntry(long snapshotId, Types.StructType partitionType) { + this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); + this.snapshotId = snapshotId; + // TODO: when v2 data files differ from v1, this should use a v2 wrapper + this.fileWrapper = new V1Metadata.IndexedDataFile(avroSchema.getField("data_file").schema()); + } + + public IndexedManifestEntry wrap(ManifestEntry entry) { + this.wrapped = entry; + return this; + } + + @Override + public org.apache.avro.Schema getSchema() { + return avroSchema; + } + + @Override + public void put(int i, Object v) { + throw new UnsupportedOperationException("Cannot read using IndexedManifestEntry"); + } + + @Override + public Object get(int i) { + switch (i) { + case 0: + return wrapped.status().id(); + case 1: + return wrapped.snapshotId(); + case 2: + if (wrapped.sequenceNumber() == null) { + Preconditions.checkState(wrapped.snapshotId() == null || snapshotId == wrapped.snapshotId(), + "Found unassigned sequence number for an entry from snapshot: %s", wrapped.snapshotId()); + } + return wrapped.sequenceNumber(); + case 3: + return fileWrapper.wrap(wrapped.file()); + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public Status status() { + return wrapped.status(); + } + + @Override + public Long snapshotId() { + return wrapped.snapshotId(); + } + + @Override + public void setSnapshotId(long snapshotId) { + wrapped.setSnapshotId(snapshotId); + } + + @Override + public Long sequenceNumber() { + return wrapped.sequenceNumber(); + } + + @Override + public void setSequenceNumber(long sequenceNumber) { + wrapped.setSequenceNumber(sequenceNumber); + } + + @Override + public DataFile file() { + return wrapped.file(); + } + + @Override + public ManifestEntry copy() { + return wrapped.copy(); + } + + @Override + public ManifestEntry copyWithoutStats() { + return wrapped.copyWithoutStats(); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 6c9f3dbaa112..2ff751d38d2e 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -182,7 +182,7 @@ ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFi case ADDED: return entry.wrapAppend(snapshotId, file); case EXISTING: - return entry.wrapExisting(snapshotId, file); + return entry.wrapExisting(snapshotId, 0L, file); case DELETED: return entry.wrapDelete(snapshotId, file); default: From da63373bcd06e2693d3c7072ec54e88d291329e2 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 14 Apr 2020 18:41:51 -0700 Subject: [PATCH 04/11] Add sequence number to GenericManfiestEntry#toString. --- core/src/main/java/org/apache/iceberg/GenericManifestEntry.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java index d1b1cc2a3060..10037f0a4020 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java @@ -173,6 +173,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("status", status) .add("snapshot_id", snapshotId) + .add("sequence_number", sequenceNumber) .add("file", file) .toString(); } From 285da49dc096565dff1dfc42d9d80d4d3a0175b6 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 14 Apr 2020 18:51:04 -0700 Subject: [PATCH 05/11] Add more comments to explain sequence numbers in v2 metadata. --- .../java/org/apache/iceberg/V2Metadata.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 261b58cd7673..677f70acda1f 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -69,12 +69,12 @@ static class IndexedManifestFile implements ManifestFile, IndexedRecord { private static final org.apache.avro.Schema AVRO_SCHEMA = AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file"); - private final long snapshotId; + private final long commitSnapshotId; private final long sequenceNumber; private ManifestFile wrapped = null; - IndexedManifestFile(long snapshotId, long sequenceNumber) { - this.snapshotId = snapshotId; + IndexedManifestFile(long commitSnapshotId, long sequenceNumber) { + this.commitSnapshotId = commitSnapshotId; this.sequenceNumber = sequenceNumber; } @@ -104,7 +104,9 @@ public Object get(int pos) { return wrapped.partitionSpecId(); case 3: if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) { - Preconditions.checkState(snapshotId == wrapped.snapshotId(), + // if the sequence number is being assigned here, then the manifest must be created by the current + // operation. to validate this, check that the snapshot id matches the current commit + Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(), "Found unassigned sequence number for a manifest from snapshot: %s", wrapped.snapshotId()); return sequenceNumber; } else { @@ -232,13 +234,13 @@ static Schema wrapFileSchema(Types.StructType fileSchema) { static class IndexedManifestEntry implements ManifestEntry, IndexedRecord { private final org.apache.avro.Schema avroSchema; - private final long snapshotId; + private final long currentSnapshotId; private final V1Metadata.IndexedDataFile fileWrapper; private ManifestEntry wrapped = null; - IndexedManifestEntry(long snapshotId, Types.StructType partitionType) { + IndexedManifestEntry(long currentSnapshotId, Types.StructType partitionType) { this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); - this.snapshotId = snapshotId; + this.currentSnapshotId = currentSnapshotId; // TODO: when v2 data files differ from v1, this should use a v2 wrapper this.fileWrapper = new V1Metadata.IndexedDataFile(avroSchema.getField("data_file").schema()); } @@ -267,7 +269,10 @@ public Object get(int i) { return wrapped.snapshotId(); case 2: if (wrapped.sequenceNumber() == null) { - Preconditions.checkState(wrapped.snapshotId() == null || snapshotId == wrapped.snapshotId(), + // if the entry's sequence number is null, then it will inherit the sequence number of the current commit. + // to validate that this is correct, check that the snapshot id is either null (will also be inherited) or + // that it matches the id of the current commit. + Preconditions.checkState(wrapped.snapshotId() == null || currentSnapshotId == wrapped.snapshotId(), "Found unassigned sequence number for an entry from snapshot: %s", wrapped.snapshotId()); } return wrapped.sequenceNumber(); From 662eb6a48a519b127bc220c363dca5b241e50212 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Apr 2020 10:20:25 -0700 Subject: [PATCH 06/11] Fix checkstyle errors. --- core/src/main/java/org/apache/iceberg/ManifestWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 60a0bb09f340..9d19cc58144e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -78,7 +78,7 @@ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { protected abstract ManifestEntry prepare(ManifestEntry entry); - protected abstract FileAppender newAppender(PartitionSpec spec, OutputFile file); + protected abstract FileAppender newAppender(PartitionSpec spec, OutputFile outputFile); void addEntry(ManifestEntry entry) { switch (entry.status()) { @@ -172,7 +172,7 @@ public void close() throws IOException { } static class V2Writer extends ManifestWriter { - V2Metadata.IndexedManifestEntry entryWrapper; + private V2Metadata.IndexedManifestEntry entryWrapper; V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { super(spec, file, snapshotId); @@ -204,7 +204,7 @@ protected FileAppender newAppender(PartitionSpec spec, OutputFile } static class V1Writer extends ManifestWriter { - V1Metadata.IndexedManifestEntry entryWrapper; + private V1Metadata.IndexedManifestEntry entryWrapper; V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { super(spec, file, snapshotId); From d4eb3b41cd019303d6ce760f44ae4696c2bea1ef Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Apr 2020 12:38:39 -0700 Subject: [PATCH 07/11] Handle missing sequence numbers when calculating min. --- .../src/main/java/org/apache/iceberg/ManifestWriter.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 9d19cc58144e..ff57626415ff 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -65,7 +65,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private long existingRows = 0L; private int deletedFiles = 0; private long deletedRows = 0L; - private long minSequenceNumber = Long.MAX_VALUE; + private Long minSequenceNumber = null; private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { this.file = file; @@ -96,7 +96,7 @@ void addEntry(ManifestEntry entry) { break; } stats.update(entry.file().partition()); - if (entry.sequenceNumber() < minSequenceNumber) { + if (entry.sequenceNumber() != null && (minSequenceNumber == null || entry.sequenceNumber() < minSequenceNumber)) { this.minSequenceNumber = entry.sequenceNumber(); } writer.add(prepare(entry)); @@ -161,8 +161,9 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); - return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, minSequenceNumber, - snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); + long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : 0; + return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, minSeqNumber, snapshotId, + addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); } @Override From 53e311974102d5e1534534751698e225a8e412d9 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 16 Apr 2020 16:17:01 -0700 Subject: [PATCH 08/11] Add more comments, fill in min seq number, and other minor fixes. --- .../org/apache/iceberg/ManifestWriter.java | 6 ++++- .../java/org/apache/iceberg/V2Metadata.java | 22 ++++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index ff57626415ff..ece7d510af59 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -33,6 +33,8 @@ */ public abstract class ManifestWriter implements FileAppender { private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class); + // stand-in for the current sequence number that will be assigned when the commit is successful + // this is replaced when writing a manifest list by the ManifestFile wrapper static final long UNASSIGNED_SEQ = -1L; /** @@ -161,7 +163,9 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); - long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : 0; + // if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min + // sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it. + long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ; return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, minSeqNumber, snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); } diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 677f70acda1f..17f693d088d7 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -113,7 +113,16 @@ public Object get(int pos) { return wrapped.sequenceNumber(); } case 4: - return wrapped.minSequenceNumber(); + if (wrapped.minSequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) { + // same sanity check as above + Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(), + "Found unassigned sequence number for a manifest from snapshot: %s", wrapped.snapshotId()); + // if the min sequence number is not determined, then there was no assigned sequence number for any file + // written to the wrapped manifest. replace the unassigned sequence number with the one for this commit + return sequenceNumber; + } else { + return wrapped.minSequenceNumber(); + } case 5: return wrapped.snapshotId(); case 6: @@ -228,19 +237,19 @@ static Schema entrySchema(Types.StructType partitionType) { static Schema wrapFileSchema(Types.StructType fileSchema) { // this is used to build projection schemas return new Schema( - ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, + ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, ManifestEntry.SEQUENCE_NUMBER, required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema)); } static class IndexedManifestEntry implements ManifestEntry, IndexedRecord { private final org.apache.avro.Schema avroSchema; - private final long currentSnapshotId; + private final long commitSnapshotId; private final V1Metadata.IndexedDataFile fileWrapper; private ManifestEntry wrapped = null; - IndexedManifestEntry(long currentSnapshotId, Types.StructType partitionType) { + IndexedManifestEntry(long commitSnapshotId, Types.StructType partitionType) { this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); - this.currentSnapshotId = currentSnapshotId; + this.commitSnapshotId = commitSnapshotId; // TODO: when v2 data files differ from v1, this should use a v2 wrapper this.fileWrapper = new V1Metadata.IndexedDataFile(avroSchema.getField("data_file").schema()); } @@ -272,8 +281,9 @@ public Object get(int i) { // if the entry's sequence number is null, then it will inherit the sequence number of the current commit. // to validate that this is correct, check that the snapshot id is either null (will also be inherited) or // that it matches the id of the current commit. - Preconditions.checkState(wrapped.snapshotId() == null || currentSnapshotId == wrapped.snapshotId(), + Preconditions.checkState(wrapped.snapshotId() == null || commitSnapshotId == wrapped.snapshotId(), "Found unassigned sequence number for an entry from snapshot: %s", wrapped.snapshotId()); + return null; } return wrapped.sequenceNumber(); case 3: From b61fb6a70f8d3dbdf7b15ccea96f759850f36df9 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 16 Apr 2020 18:09:06 -0700 Subject: [PATCH 09/11] Add v1 and v2 format test for manifest files. --- ...ons.java => TestManifestListVersions.java} | 2 +- .../iceberg/TestManifestWriterVersions.java | 186 ++++++++++++++++++ 2 files changed, 187 insertions(+), 1 deletion(-) rename core/src/test/java/org/apache/iceberg/{TestManifestFileVersions.java => TestManifestListVersions.java} (99%) create mode 100644 core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFileVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java similarity index 99% rename from core/src/test/java/org/apache/iceberg/TestManifestFileVersions.java rename to core/src/test/java/org/apache/iceberg/TestManifestListVersions.java index 0155cca7f777..65ec4b24b293 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestFileVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java @@ -39,7 +39,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -public class TestManifestFileVersions { +public class TestManifestListVersions { private static final String PATH = "s3://bucket/table/m1.avro"; private static final long LENGTH = 1024L; private static final int SPEC_ID = 1; diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java new file mode 100644 index 000000000000..af320a739634 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestManifestWriterVersions { + private static final FileIO FILE_IO = new TestTables.LocalFileIO(); + + private static final Schema SCHEMA = new Schema( + required(1, "id", Types.LongType.get()), + required(2, "timestamp", Types.TimestampType.withZone()), + required(3, "category", Types.StringType.get()), + required(4, "data", Types.StringType.get())); + + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .identity("category") + .hour("timestamp") + .bucket("id", 16) + .build(); + + private static final long SEQUENCE_NUMBER = 34L; + private static final long SNAPSHOT_ID = 987134631982734L; + private static final String PATH = "s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro"; + private static final FileFormat FORMAT = FileFormat.AVRO; + private static final PartitionData PARTITION = DataFiles.data(SPEC, "category=cheesy/timestamp_hour=10/id_bucket=3"); + private static final Metrics METRICS = new Metrics( + 1587L, + ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L), // sizes + ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L), // value counts + ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L), // null value counts + ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds + ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds + private static final List OFFSETS = ImmutableList.of(4L); + + private static final DataFile DATA_FILE = new GenericDataFile( + PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testV1Write() throws IOException { + ManifestFile manifest = writeManifest(1); + checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); + checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ); + } + + @Test + public void testV1WriteWithInheritance() throws IOException { + ManifestFile manifest = writeAndReadManifestList(writeManifest(1), 1); + checkManifest(manifest, 0L); + + // v1 should be read using sequence number 0 because it was missing from the manifest list file + checkEntry(readManifest(manifest), 0L); + } + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeManifest(1); + checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); + checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ); + } + + @Test + public void testV2WriteWithInheritance() throws IOException { + ManifestFile manifest = writeAndReadManifestList(writeManifest(2), 2); + checkManifest(manifest, SEQUENCE_NUMBER); + + // v2 should use the correct sequence number by inheriting it + checkEntry(readManifest(manifest), SEQUENCE_NUMBER); + } + + @Test + public void testV2ManifestRewriteWithInheritance() throws IOException { + // write with v1 + ManifestFile manifest = writeAndReadManifestList(writeManifest(1), 1); + checkManifest(manifest, 0L); + + // rewrite existing metadata with v2 + ManifestFile manifest2 = writeAndReadManifestList(manifest, 2); + checkManifest(manifest2, 0L); + + // should not inherit the v2 sequence number because it was a rewrite + checkEntry(readManifest(manifest2), 0L); + } + + void checkEntry(ManifestEntry entry, Long expectedSequenceNumber) { + Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status()); + Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); + Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber()); + checkDataFile(entry.file()); + } + + void checkDataFile(DataFile dataFile) { + Assert.assertEquals("Path", PATH, dataFile.path()); + Assert.assertEquals("Format", FORMAT, dataFile.format()); + Assert.assertEquals("Partition", PARTITION, dataFile.partition()); + Assert.assertEquals("Record count", METRICS.recordCount(), (Long) dataFile.recordCount()); + Assert.assertEquals("Column sizes", METRICS.columnSizes(), dataFile.columnSizes()); + Assert.assertEquals("Value counts", METRICS.valueCounts(), dataFile.valueCounts()); + Assert.assertEquals("Null value counts", METRICS.nullValueCounts(), dataFile.nullValueCounts()); + Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds()); + Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds()); + } + + void checkManifest(ManifestFile manifest, long expectedSequenceNumber) { + Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, manifest.snapshotId()); + Assert.assertEquals("Sequence number", expectedSequenceNumber, manifest.sequenceNumber()); + Assert.assertEquals("Min sequence number", expectedSequenceNumber, manifest.minSequenceNumber()); + Assert.assertEquals("Added files count", (Integer) 1, manifest.addedFilesCount()); + Assert.assertEquals("Existing files count", (Integer) 0, manifest.existingFilesCount()); + Assert.assertEquals("Deleted files count", (Integer) 0, manifest.deletedFilesCount()); + Assert.assertEquals("Added rows count", METRICS.recordCount(), manifest.addedRowsCount()); + Assert.assertEquals("Existing rows count", (Long) 0L, manifest.existingRowsCount()); + Assert.assertEquals("Deleted rows count", (Long) 0L, manifest.deletedRowsCount()); + } + + private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException { + OutputFile manifestList = Files.localOutput(temp.newFile()); + try (FileAppender writer = ManifestLists.write( + formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQUENCE_NUMBER : 0)) { + writer.add(manifest); + } + return manifestList.toInputFile(); + } + + private ManifestFile writeAndReadManifestList(ManifestFile manifest, int formatVersion) throws IOException { + List manifests = ManifestLists.read(writeManifestList(manifest, formatVersion)); + Assert.assertEquals("Should contain one manifest", 1, manifests.size()); + return manifests.get(0); + } + + private ManifestFile writeManifest(int formatVersion) throws IOException { + OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString())); + ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID); + try { + writer.add(DATA_FILE); + } finally { + writer.close(); + } + return writer.toManifestFile(); + } + + private ManifestEntry readManifest(ManifestFile manifest) throws IOException { + try (CloseableIterable reader = ManifestFiles.read(manifest, FILE_IO).entries()) { + List files = Lists.newArrayList(reader); + Assert.assertEquals("Should contain only one data file", 1, files.size()); + return files.get(0); + } + } +} From af1d77f7cd3ccbca164e66472983d1b6f1be9ef3 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 17 Apr 2020 09:57:36 -0700 Subject: [PATCH 10/11] Add another v2 format test. --- .../iceberg/TestManifestWriterVersions.java | 54 ++++++++++++++++++- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index af320a739634..423cd51074dd 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -106,19 +106,39 @@ public void testV2WriteWithInheritance() throws IOException { } @Test - public void testV2ManifestRewriteWithInheritance() throws IOException { + public void testV2ManifestListRewriteWithInheritance() throws IOException { // write with v1 ManifestFile manifest = writeAndReadManifestList(writeManifest(1), 1); checkManifest(manifest, 0L); - // rewrite existing metadata with v2 + // rewrite existing metadata with v2 manifest list ManifestFile manifest2 = writeAndReadManifestList(manifest, 2); + // the ManifestFile did not change and should still have its original sequence number, 0 checkManifest(manifest2, 0L); // should not inherit the v2 sequence number because it was a rewrite checkEntry(readManifest(manifest2), 0L); } + @Test + public void testV2ManifestRewriteWithInheritance() throws IOException { + // write with v1 + ManifestFile manifest = writeAndReadManifestList(writeManifest(1), 1); + checkManifest(manifest, 0L); + + // rewrite the manifest file using a v2 manifest + ManifestFile rewritten = rewriteManifest(manifest, 2); + checkRewrittenManifest(rewritten, ManifestWriter.UNASSIGNED_SEQ, 0L); + + // add the v2 manifest to a v2 manifest list, with a sequence number + ManifestFile manifest2 = writeAndReadManifestList(rewritten, 2); + // the ManifestFile is new so it has a sequence number, but the min sequence number 0 is from the entry + checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L); + + // should not inherit the v2 sequence number because it was written into the v2 manifest + checkRewrittenEntry(readManifest(manifest2), 0L); + } + void checkEntry(ManifestEntry entry, Long expectedSequenceNumber) { Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status()); Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); @@ -126,6 +146,13 @@ void checkEntry(ManifestEntry entry, Long expectedSequenceNumber) { checkDataFile(entry.file()); } + void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber) { + Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status()); + Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); + Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber()); + checkDataFile(entry.file()); + } + void checkDataFile(DataFile dataFile) { Assert.assertEquals("Path", PATH, dataFile.path()); Assert.assertEquals("Format", FORMAT, dataFile.format()); @@ -150,6 +177,18 @@ void checkManifest(ManifestFile manifest, long expectedSequenceNumber) { Assert.assertEquals("Deleted rows count", (Long) 0L, manifest.deletedRowsCount()); } + void checkRewrittenManifest(ManifestFile manifest, long expectedSequenceNumber, long expectedMinSequenceNumber) { + Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, manifest.snapshotId()); + Assert.assertEquals("Sequence number", expectedSequenceNumber, manifest.sequenceNumber()); + Assert.assertEquals("Min sequence number", expectedMinSequenceNumber, manifest.minSequenceNumber()); + Assert.assertEquals("Added files count", (Integer) 0, manifest.addedFilesCount()); + Assert.assertEquals("Existing files count", (Integer) 1, manifest.existingFilesCount()); + Assert.assertEquals("Deleted files count", (Integer) 0, manifest.deletedFilesCount()); + Assert.assertEquals("Added rows count", (Long) 0L, manifest.addedRowsCount()); + Assert.assertEquals("Existing rows count", METRICS.recordCount(), manifest.existingRowsCount()); + Assert.assertEquals("Deleted rows count", (Long) 0L, manifest.deletedRowsCount()); + } + private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException { OutputFile manifestList = Files.localOutput(temp.newFile()); try (FileAppender writer = ManifestLists.write( @@ -165,6 +204,17 @@ private ManifestFile writeAndReadManifestList(ManifestFile manifest, int formatV return manifests.get(0); } + private ManifestFile rewriteManifest(ManifestFile manifest, int formatVersion) throws IOException { + OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString())); + ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID); + try { + writer.existing(readManifest(manifest)); + } finally { + writer.close(); + } + return writer.toManifestFile(); + } + private ManifestFile writeManifest(int formatVersion) throws IOException { OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString())); ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID); From 4b7c8f193e133e50d8f2ca858e50bb5930421adb Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 20 Apr 2020 12:43:34 -0700 Subject: [PATCH 11/11] Update Javadoc. --- core/src/main/java/org/apache/iceberg/ManifestWriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index ece7d510af59..97065bf14a26 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -125,6 +125,7 @@ void add(ManifestEntry entry) { * * @param existingFile a data file * @param fileSnapshotId snapshot ID when the data file was added to the table + * @param sequenceNumber sequence number for the data file */ public void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber) { addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile));