-
Notifications
You must be signed in to change notification settings - Fork 3k
Add v2 manifests #913
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add v2 manifests #913
Changes from all commits
1119159
ed23f65
2e53e31
da63373
285da49
662eb6a
d4eb3b4
53e3119
b61fb6a
af1d77f
4b7c8f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg; | ||
|
|
||
| import com.google.common.base.MoreObjects; | ||
| import org.apache.avro.generic.IndexedRecord; | ||
| import org.apache.avro.specific.SpecificData; | ||
| import org.apache.iceberg.avro.AvroSchemaUtil; | ||
| import org.apache.iceberg.types.Types; | ||
|
|
||
| class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable { | ||
| private final org.apache.avro.Schema schema; | ||
| private final V1Metadata.IndexedDataFile fileWrapper; | ||
| private Status status = Status.EXISTING; | ||
| private Long snapshotId = null; | ||
| private Long sequenceNumber = null; | ||
| private DataFile file = null; | ||
|
|
||
| GenericManifestEntry(org.apache.avro.Schema schema) { | ||
| this.schema = schema; | ||
| this.fileWrapper = null; // do not use the file wrapper to read | ||
| } | ||
|
|
||
| GenericManifestEntry(Types.StructType partitionType) { | ||
| this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry"); | ||
| this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema()); | ||
| } | ||
|
|
||
| private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) { | ||
| this.schema = toCopy.schema; | ||
| this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema()); | ||
| this.status = toCopy.status; | ||
| this.snapshotId = toCopy.snapshotId; | ||
| if (fullCopy) { | ||
| this.file = toCopy.file().copy(); | ||
| } else { | ||
| this.file = toCopy.file().copyWithoutStats(); | ||
| } | ||
| } | ||
|
|
||
| ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { | ||
| this.status = Status.EXISTING; | ||
| this.snapshotId = newSnapshotId; | ||
| this.sequenceNumber = newSequenceNumber; | ||
| this.file = newFile; | ||
| return this; | ||
| } | ||
|
|
||
| ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { | ||
| this.status = Status.ADDED; | ||
| this.snapshotId = newSnapshotId; | ||
| this.sequenceNumber = null; | ||
| this.file = newFile; | ||
| return this; | ||
| } | ||
|
|
||
| ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { | ||
| this.status = Status.DELETED; | ||
| this.snapshotId = newSnapshotId; | ||
| this.sequenceNumber = null; | ||
| this.file = newFile; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * @return the status of the file, whether EXISTING, ADDED, or DELETED | ||
| */ | ||
| public Status status() { | ||
| return status; | ||
| } | ||
|
|
||
| /** | ||
| * @return id of the snapshot in which the file was added to the table | ||
| */ | ||
| public Long snapshotId() { | ||
| return snapshotId; | ||
| } | ||
|
|
||
| @Override | ||
| public Long sequenceNumber() { | ||
| return sequenceNumber; | ||
| } | ||
|
|
||
| /** | ||
| * @return a file | ||
| */ | ||
| public DataFile file() { | ||
| return file; | ||
| } | ||
|
|
||
| public ManifestEntry copy() { | ||
| return new GenericManifestEntry(this, true /* full copy */); | ||
| } | ||
|
|
||
| public ManifestEntry copyWithoutStats() { | ||
| return new GenericManifestEntry(this, false /* drop stats */); | ||
| } | ||
|
|
||
| @Override | ||
| public void setSnapshotId(long newSnapshotId) { | ||
| this.snapshotId = newSnapshotId; | ||
| } | ||
|
|
||
| @Override | ||
| public void setSequenceNumber(long newSequenceNumber) { | ||
| this.sequenceNumber = newSequenceNumber; | ||
| } | ||
|
|
||
| @Override | ||
| public void put(int i, Object v) { | ||
| switch (i) { | ||
| case 0: | ||
| this.status = Status.values()[(Integer) v]; | ||
| return; | ||
| case 1: | ||
| this.snapshotId = (Long) v; | ||
| return; | ||
| case 2: | ||
| this.sequenceNumber = (Long) v; | ||
| return; | ||
| case 3: | ||
| this.file = (DataFile) v; | ||
| return; | ||
| default: | ||
| // ignore the object, it must be from a newer version of the format | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Object get(int i) { | ||
| switch (i) { | ||
| case 0: | ||
| return status.id(); | ||
| case 1: | ||
| return snapshotId; | ||
| case 2: | ||
| return sequenceNumber; | ||
| case 3: | ||
| if (fileWrapper == null || file instanceof GenericDataFile) { | ||
| return file; | ||
| } else { | ||
| return fileWrapper.wrap(file); | ||
| } | ||
| default: | ||
| throw new UnsupportedOperationException("Unknown field ordinal: " + i); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public org.apache.avro.Schema getSchema() { | ||
| return schema; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return MoreObjects.toStringHelper(this) | ||
| .add("status", status) | ||
| .add("snapshot_id", snapshotId) | ||
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .add("sequence_number", sequenceNumber) | ||
| .add("file", file) | ||
| .toString(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg; | ||
|
|
||
| import org.apache.avro.generic.IndexedRecord; | ||
|
|
||
| /** | ||
| * Adapter that presents a {@link StructLike} as an Avro {@link IndexedRecord} so it can be written to Avro. | ||
| * <p> | ||
| * The adapter is reusable: call {@link #wrap(StructLike)} to point it at the struct to expose. | ||
| * Every accessor forwards to the currently wrapped struct. | ||
| */ | ||
| class IndexedStructLike implements StructLike, IndexedRecord { | ||
| private final org.apache.avro.Schema avroSchema; | ||
| private StructLike delegate = null; | ||
|
|
||
| IndexedStructLike(org.apache.avro.Schema avroSchema) { | ||
| this.avroSchema = avroSchema; | ||
| } | ||
|
|
||
| /** | ||
| * Points this adapter at the given struct. | ||
| * | ||
| * @return this adapter | ||
| */ | ||
| IndexedStructLike wrap(StructLike struct) { | ||
| this.delegate = struct; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public int size() { | ||
| return delegate.size(); | ||
| } | ||
|
|
||
| @Override | ||
| public <T> T get(int pos, Class<T> javaClass) { | ||
| return delegate.get(pos, javaClass); | ||
| } | ||
|
|
||
| // IndexedRecord access: read the value at pos as a plain Object | ||
| @Override | ||
| public Object get(int pos) { | ||
| return delegate.get(pos, Object.class); | ||
| } | ||
|
|
||
| @Override | ||
| public <T> void set(int pos, T value) { | ||
| delegate.set(pos, value); | ||
| } | ||
|
|
||
| // IndexedRecord access: write the value at pos into the wrapped struct | ||
| @Override | ||
| public void put(int pos, Object value) { | ||
| delegate.set(pos, value); | ||
| } | ||
|
|
||
| @Override | ||
| public org.apache.avro.Schema getSchema() { | ||
| return avroSchema; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| class InheritableMetadataFactory { | ||
|
|
||
| private static final InheritableMetadata EMPTY = new EmptyInheritableMetadata(); | ||
| private static final InheritableMetadata NOOP = new NullInheritableMetadata(); | ||
|
|
||
| private InheritableMetadataFactory() {} | ||
|
|
||
|
|
@@ -30,22 +31,40 @@ static InheritableMetadata empty() { | |
| } | ||
|
|
||
| static InheritableMetadata fromManifest(ManifestFile manifest) { | ||
| return new BaseInheritableMetadata(manifest.snapshotId()); | ||
| if (manifest.snapshotId() != null) { | ||
| return new BaseInheritableMetadata(manifest.snapshotId(), manifest.sequenceNumber()); | ||
| } else { | ||
| return NOOP; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure in which case we would encounter snapshot == null. Would you mind explaining the case? Thanks.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Snapshot id is null when a manifest is appended to a table and the snapshot id hasn't been assigned yet. If the manifest is committed to table metadata, then it will be set when writing and will always be present. That's why it is required in the v2 schema for manifest list files. Some appended manifests are rewritten before committing. When reading those to rewrite them, this path is used. I think I'm going to update how this works, but as a separate commit. The problem is that this allows reading a manifest without filling in the snapshot id. But the rewrite method, where the reader that is configured this way is used, has the snapshot id. So we should add the snapshot id to the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A couple of questions here:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
My comment about changing this wasn't about removing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I asked about Yeah, I like the idea of setting the snapshot id in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll look into removing it entirely, although I don't mind it very much. As long as it is baked into the reader and unavoidable, it seems like a clean way to apply the inherited values. |
||
| } | ||
| } | ||
|
|
||
| static class BaseInheritableMetadata implements InheritableMetadata { | ||
| private final long snapshotId; | ||
| private final long sequenceNumber; | ||
|
|
||
| private final Long snapshotId; | ||
|
|
||
| private BaseInheritableMetadata(Long snapshotId) { | ||
| private BaseInheritableMetadata(long snapshotId, long sequenceNumber) { | ||
| this.snapshotId = snapshotId; | ||
| this.sequenceNumber = sequenceNumber; | ||
| } | ||
|
|
||
| @Override | ||
| public ManifestEntry apply(ManifestEntry manifestEntry) { | ||
| if (manifestEntry.snapshotId() == null) { | ||
| manifestEntry.setSnapshotId(snapshotId); | ||
| } | ||
| if (manifestEntry.sequenceNumber() == null) { | ||
| manifestEntry.setSequenceNumber(sequenceNumber); | ||
| } | ||
| return manifestEntry; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Inheritable metadata that inherits nothing: {@link #apply(ManifestEntry)} returns the entry it was given. | ||
| */ | ||
| static class NullInheritableMetadata implements InheritableMetadata { | ||
| private NullInheritableMetadata() {} | ||
|
|
||
||
| @Override | ||
| public ManifestEntry apply(ManifestEntry manifestEntry) { | ||
| // no values to fill in, so hand the entry back untouched | ||
| return manifestEntry; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't the append/delete operations attach the sequence number to their entries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a file is appended, it must have the sequence number of the snapshot where it is committed. The problem is that the sequence number isn't determined until the snapshot's commit is successful. Two committers may be racing to add a commit with the same sequence number.
That's why sequence numbers are assigned initially through inheritance. Every snapshot commit attempt writes a new root manifest list with a new sequence number based on the table metadata's last sequence number. If two writers are trying to commit different snapshots as sequence number 5, one will win and the other will retry with 6. To avoid rewriting the metadata tree below the manifest list, sequence numbers that might change are inherited instead of written into the initial files.