Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ static StructType getType(StructType partitionType) {
optional(128, "upper_bounds", MapType.ofRequired(129, 130,
IntegerType.get(), BinaryType.get())),
optional(131, "key_metadata", BinaryType.get()),
optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
// NEXT ID TO ASSIGN: 134
optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())),
optional(134, "sequence_number", LongType.get())
// NEXT ID TO ASSIGN: 135
);
}

Expand Down Expand Up @@ -152,4 +153,11 @@ static StructType getType(StructType partitionType) {
* are determined by these offsets. The returned list must be sorted in ascending order.
*/
List<Long> splitOffsets();

/**
* @return The sequence number to identify the order in which data files and deletion files are to be processed.
* If the sequence number is not specified it is inherited from the manifest file struct in the manifest list file.
*/
Long sequenceNumber();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this ever return null? I would rather use a valid number and a primitive type to ensure that we always have a sequence number that is valid. For older data, the sequence number should be 0, indicating that it is older than when we started writing sequence numbers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use boxing here because I don't want to rewrite the manifest file. The sequence number is not determined when committing the update, leave it to null so that we can inherit the sequence number from the value in the manifest list file.

If we use the primitive type, we need to check where the sequence number of the manifest is null or not. It should be doable, let me update this.


}
11 changes: 10 additions & 1 deletion api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public interface ManifestFile {
))),
optional(512, "added_rows_count", Types.LongType.get()),
optional(513, "existing_rows_count", Types.LongType.get()),
optional(514, "deleted_rows_count", Types.LongType.get()));
optional(514, "deleted_rows_count", Types.LongType.get()),
optional(515, "sequence_number", Types.LongType.get())
);

static Schema schema() {
return SCHEMA;
Expand Down Expand Up @@ -128,6 +130,13 @@ default boolean hasDeletedFiles() {
*/
Long deletedRowsCount();

/**
* @return the sequence number of this manifest. The sequence number of manifest stores in manifest list file. Since
* The data files' sequence number is optional, it should inherit the manifest's sequence number if the reader reads
* null from manifest file.
*/
Long sequenceNumber();
Copy link
Contributor

@prodeezy prodeezy Apr 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a seq number is available at manifest file level then we assume all datafiles in that manifestfile have the same seq number. But if there are datafile level seq numbers then should we be tracking any metadata at ManifestFile level? Like Min/Max seq number?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the logic is just the first step that if a seq number is not found in data_file, then we try to find it from manifestFile. It doesn't contain the logic about computing the min/max value. I would like to use separated fields to track the min/max sequence number in the next step.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


/**
* Returns a list of {@link PartitionFieldSummary partition field summaries}.
* <p>
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,11 @@ public interface Snapshot {
* @return the location of the manifest list for this Snapshot
*/
String manifestListLocation();

/**
* Return this snapshot's sequence number, or 0 if the table has no snapshot yet.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case will this actually return 0? If there is no snapshot, then there is no Snapshot object, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense to return 0 if the snapshot was written without a sequence number, and to use a primitive, right?

*
* @return the sequence number of this Snapshot
*/
Long sequenceNumber();
}
10 changes: 10 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ public Long deletedRowsCount() {
return deletedRows;
}

@Override
public Long sequenceNumber() {
return null;
}

@Override
public List<PartitionFieldSummary> partitions() {
return partitions;
Expand Down Expand Up @@ -372,5 +377,10 @@ public DataFile copyWithoutStats() {
public List<Long> splitOffsets() {
return null;
}

@Override
public Long sequenceNumber() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public RewriteManifests addManifest(ManifestFile manifest) {
if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
addedManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
// the manifest must be rewritten with this update's snapshot ID and sequence number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rewrite cannot set the sequence number. If it did, then we would have to rewrite the manifest again if the commit fails and has to retry -- because the sequence number would be incremented.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. When the commit fails, it just needs to update the sequence number in the manifest list file and doesn't have to keep sequence number in the manifest file.

ManifestFile copiedManifest = copyManifest(manifest);
rewrittenAddedManifests.add(copiedManifest);
}
Expand Down Expand Up @@ -178,10 +178,11 @@ public List<ManifestFile> apply(TableMetadata base) {

validateFilesCounts();

// TODO: add sequence numbers here
Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform(
Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests),
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId())
.withSequenceNumber(sequenceNumber())
.build());

// put new manifests at the beginning
List<ManifestFile> apply = new ArrayList<>();
Expand Down
36 changes: 36 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class BaseSnapshot implements Snapshot {
private final InputFile manifestList;
private final String operation;
private final Map<String, String> summary;
private Long sequenceNumber;
Copy link
Contributor

@prodeezy prodeezy Apr 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenjunjiedada Can you comment on why the max seq number is not being tracked in Snapshot but instead Snapshot Seq Number is? the Merge-on-Read doc says we use max seq number in the snapshot. Maybe i missed the context. what's the reason for switching to snapshot sequence number?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prodeezy , you may want to check this: #588 (comment).


// lazily initialized
private List<ManifestFile> manifests = null;
Expand Down Expand Up @@ -75,6 +76,31 @@ class BaseSnapshot implements Snapshot {
this.manifestList = manifestList;
}

BaseSnapshot(FileIO io,
long snapshotId,
Long parentId,
long timestampMillis,
String operation,
Map<String, String> summary,
InputFile manifestList,
Long sequenceNumber) {
this(io, snapshotId, parentId, timestampMillis, operation, summary, manifestList);
this.sequenceNumber = sequenceNumber;
}

BaseSnapshot(FileIO io,
long snapshotId,
Long parentId,
long timestampMillis,
String operation,
Map<String, String> summary,
List<ManifestFile> manifests,
Long sequenceNumber) {
this(io, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
this.manifests = manifests;
this.sequenceNumber = sequenceNumber;
}

BaseSnapshot(FileIO io,
long snapshotId,
Long parentId,
Expand Down Expand Up @@ -186,6 +212,15 @@ private void cacheChanges() {
this.cachedDeletes = deletes.build();
}

@Override
public Long sequenceNumber() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, does this mean that all existing snapshots written without sequence numbers will have sequenceNumber as 0?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an initial implementation that needs some discussion. For example, we can generate sequence numbers from the timestamp for existing snapshots.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the timestamp-based approach would work for existing snapshots, but I am not sure how we will propagate this info down to manifests and files. For example, ManifestFile keeps track of the snapshot in which the manifest was added, but that snapshot can be already expired, so we won't be able to fetch its metadata. Maybe, defaulting to 0 won't be an issue, let me think more about this.

@rdblue, any thoughts on keeping backward compatibility?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we start using sequence numbers, anything without a sequence number should be considered older and should default to 0. That will work as long as there is some time at which we always use sequence numbers.

What wouldn't work is if we have some writers that use sequence numbers and some writers that do not because the ones that don't write sequence numbers would be sequenced before writes that do: if some writer creates seq 1 and an older writer commits later, the older writer's new commit would have seq 0. That new writer would probably also drop all sequence numbers from metadata -- the table sequence number and the one for each ManifestFile stored in the ManifestList.

One more thing to note: the older writer would reset sequence numbers, but metadata written by newer clients could still contain explicit sequence numbers. For example, a new writer that performs a compaction with older data will write the older data's sequence numbers into metadata. If the sequence number gets reset to 0 by an older writer after that, then sequence numbers get all mixed up. So we will need to ensure that older writers cannot operate on tables once they move to sequence numbers for correctness. That requires bumping the format version.

Does that reasoning sound correct, @aokolnychyi, @chenjunjiedada?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes sense. Bumping the format version can prevent an old writer from writing data to a table with a new version format. We may still need to consider how to handle the existing data.

So the question is do we want to allow writing from the old writer and new writer at the same time. Maybe a spark and a presto are allowed to write to the table at the same time? If we don't want to support the old writer and new writer at the same time, we can update the format version for existing data when committing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Ryan that old writers should not be allowed to write to tables with sequence numbers. It seems like the right time to bump the format version and include new manifest types and snapshot id inheritance. Then we can safely assign 0 sequence number to all files that were written before we start assigning new ones.

if (sequenceNumber == null) {
return 0L;
}

return sequenceNumber;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -194,6 +229,7 @@ public String toString() {
.add("operation", operation)
.add("summary", summary)
.add("manifests", manifests())
.add("sequence_number", sequenceNumber())
.toString();
}
}
20 changes: 14 additions & 6 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public FastAppend appendManifest(ManifestFile manifest) {
summaryBuilder.addedManifest(manifest);
appendManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
// the manifest must be rewritten with this update's snapshot ID and sequence number
ManifestFile copiedManifest = copyManifest(manifest);
rewrittenAppendManifests.add(copiedManifest);
}
Expand Down Expand Up @@ -130,17 +130,25 @@ public List<ManifestFile> apply(TableMetadata base) {
throw new RuntimeIOException(e, "Failed to write manifest");
}

// TODO: add sequence numbers here
Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform(newManifests,
manifest -> GenericManifestFile.copyOf(manifest).withSequenceNumber(sequenceNumber()).build());

Iterable<ManifestFile> appendManifestsWithMetadata = Iterables.transform(
Iterables.concat(appendManifests, rewrittenAppendManifests),
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
Iterables.addAll(newManifests, appendManifestsWithMetadata);
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId())
.withSequenceNumber(sequenceNumber())
.build());

List<ManifestFile> manifestsWithMetadata = Lists.newArrayList();
Iterables.addAll(manifestsWithMetadata, newManifestsWithMetadata);
Iterables.addAll(manifestsWithMetadata, appendManifestsWithMetadata);


if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().manifests());
manifestsWithMetadata.addAll(base.currentSnapshot().manifests());
}

return newManifests;
return manifestsWithMetadata;
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public PartitionData copy() {
private Map<Integer, ByteBuffer> upperBounds = null;
private List<Long> splitOffsets = null;
private byte[] keyMetadata = null;
private Long sequenceNumber = null;

// cached schema
private transient org.apache.avro.Schema avroSchema = null;
Expand Down Expand Up @@ -270,6 +271,11 @@ public List<Long> splitOffsets() {
return splitOffsets;
}

@Override
public Long sequenceNumber() {
return sequenceNumber;
}

@Override
public org.apache.avro.Schema getSchema() {
if (avroSchema == null) {
Expand Down Expand Up @@ -332,6 +338,9 @@ public void put(int i, Object v) {
case 14:
this.splitOffsets = (List<Long>) v;
return;
case 15:
this.sequenceNumber = (Long) v;
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand Down Expand Up @@ -382,6 +391,8 @@ public Object get(int i) {
return keyMetadata();
case 14:
return splitOffsets;
case 15:
return sequenceNumber;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
Expand Down
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class GenericManifestFile
private Integer deletedFilesCount = null;
private Long deletedRowsCount = null;
private List<PartitionFieldSummary> partitions = null;
private Long sequenceNumber = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also check the similar things in GenericDataFile, such as the constructor ? toString() ? etc.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


/**
* Used by Avro reflection to instantiate this class when reading manifest files.
Expand Down Expand Up @@ -136,6 +137,26 @@ public GenericManifestFile(String path, long length, int specId, Long snapshotId
this.fromProjectionPos = null;
}

public GenericManifestFile(String path, long length, int specId, Long snapshotId, Long sequenceNumber,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an internal class, so it shouldn't be necessary to add a new constructor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, no place uses it actually.

int addedFilesCount, long addedRowsCount, int existingFilesCount,
long existingRowsCount, int deletedFilesCount, long deletedRowsCount,
List<PartitionFieldSummary> partitions) {
this.avroSchema = AVRO_SCHEMA;
this.manifestPath = path;
this.length = length;
this.specId = specId;
this.snapshotId = snapshotId;
this.sequenceNumber = sequenceNumber;
this.addedFilesCount = addedFilesCount;
this.addedRowsCount = addedRowsCount;
this.existingFilesCount = existingFilesCount;
this.existingRowsCount = existingRowsCount;
this.deletedFilesCount = deletedFilesCount;
this.deletedRowsCount = deletedRowsCount;
this.partitions = partitions;
this.fromProjectionPos = null;
}

/**
* Copy constructor.
*
Expand All @@ -155,6 +176,7 @@ private GenericManifestFile(GenericManifestFile toCopy) {
this.deletedRowsCount = toCopy.deletedRowsCount;
this.partitions = ImmutableList.copyOf(Iterables.transform(toCopy.partitions, PartitionFieldSummary::copy));
this.fromProjectionPos = toCopy.fromProjectionPos;
this.sequenceNumber = toCopy.sequenceNumber;
}

/**
Expand Down Expand Up @@ -226,6 +248,11 @@ public Long deletedRowsCount() {
return deletedRowsCount;
}

@Override
public Long sequenceNumber() {
return sequenceNumber;
}

@Override
public List<PartitionFieldSummary> partitions() {
return partitions;
Expand Down Expand Up @@ -271,6 +298,8 @@ public Object get(int i) {
return existingRowsCount;
case 10:
return deletedRowsCount;
case 11:
return sequenceNumber;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
Expand Down Expand Up @@ -319,6 +348,9 @@ public <T> void set(int i, T value) {
case 10:
this.deletedRowsCount = (Long) value;
return;
case 11:
this.sequenceNumber = (Long) value;
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand Down Expand Up @@ -396,6 +428,11 @@ public CopyBuilder withSnapshotId(Long newSnapshotId) {
return this;
}

public CopyBuilder withSequenceNumber(Long newSequenceNumber) {
manifestFile.sequenceNumber = newSequenceNumber;
return this;
}

public ManifestFile build() {
return manifestFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,26 @@ static InheritableMetadata empty() {
}

static InheritableMetadata fromManifest(ManifestFile manifest) {
return new BaseInheritableMetadata(manifest.snapshotId());
return new BaseInheritableMetadata(manifest.snapshotId(), manifest.sequenceNumber());
}

static class BaseInheritableMetadata implements InheritableMetadata {

private final Long snapshotId;
private final Long sequenceNumber;

private BaseInheritableMetadata(Long snapshotId) {
private BaseInheritableMetadata(Long snapshotId, Long sequenceNumber) {
this.snapshotId = snapshotId;
this.sequenceNumber = sequenceNumber;
}

public ManifestEntry apply(ManifestEntry manifestEntry) {
if (manifestEntry.snapshotId() == null) {
manifestEntry.setSnapshotId(snapshotId);
}
if (manifestEntry.sequenceNumber() == null) {
manifestEntry.setSequenceNumber(sequenceNumber);
}
return manifestEntry;
}
}
Expand Down
Loading