Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,13 @@ public interface Snapshot extends Serializable {
* @return the location of the manifest list for this Snapshot
*/
String manifestListLocation();

/**
* Return the id of the schema used when this snapshot was created, or null if this information is not available.
*
* @return the schema id associated with this snapshot, or null if it is not available
*/
default Integer schemaId() {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what case will this information be null? And if it is null, how can people read the correct schema for the snapshot?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I think you mean if people read the old metadata, its schema id from snapshots will be null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, schemaId() returns null in the case where the snapshot was written before this change. Note though, that even after this change, new metadata can have snapshots without schema id (so schemaId() for those snapshots will return null), if it is metadata for a table existing before this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a PR (#1508) that reads previous metadata to get the schema for the snapshot in case Snapshot#schemaId() returns null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay in response, and thank you @openinx for the review! And thank you @wypoon for responding!

}
}
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ default String name() {
*/
Schema schema();

/**
 * Return a map of schema id to {@link Schema schema} for this table.
 *
 * @return this table's schemas, keyed by schema id
 */
Map<Integer, Schema> schemas();

/**
* Return the {@link PartitionSpec partition spec} for this table.
*
Expand Down
15 changes: 15 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@ public static void assertSerializedAndLoadedMetadata(Table expected, Table actua
Assert.assertEquals("History must match", expected.history(), actual.history());
}

/**
 * Asserts that two schema maps hold the same schema ids and that the schemas stored under each id match.
 *
 * <p>Two schemas match when their schema ids are equal and {@code sameSchema} returns true.
 *
 * @param map1 the first map of schema id to schema
 * @param map2 the second map of schema id to schema
 */
public static void assertSameSchemaMap(Map<Integer, Schema> map1, Map<Integer, Schema> map2) {
  // assertEquals reports both sizes on failure, which a bare Assert.fail would hide
  Assert.assertEquals("Should have same number of schemas in both maps", map1.size(), map2.size());

  // with equal sizes, finding every id of map1 in map2 means both maps have the same key set
  map1.forEach((schemaId, schema1) -> {
    Schema schema2 = map2.get(schemaId);
    Assert.assertNotNull(String.format("Schema ID %s does not exist in map: %s", schemaId, map2), schema2);

    Assert.assertEquals("Should have matching schema id", schema1.schemaId(), schema2.schemaId());
    Assert.assertTrue(String.format("Should be the same schema. Schema 1: %s, schema 2: %s", schema1, schema2),
        schema1.sameSchema(schema2));
  });
}

private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor<Void> {
private final String message;

Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void refresh() {
table().refresh();
}

@Override
public Map<Integer, Schema> schemas() {
  // this table exposes a single schema, registered under the initial schema id
  Schema onlySchema = schema();
  return ImmutableMap.of(TableMetadata.INITIAL_SCHEMA_ID, onlySchema);
}

@Override
public PartitionSpec spec() {
return spec;
Expand Down
21 changes: 17 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class BaseSnapshot implements Snapshot {
private final String manifestListLocation;
private final String operation;
private final Map<String, String> summary;
private final Integer schemaId;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -56,9 +57,10 @@ class BaseSnapshot implements Snapshot {
*/
BaseSnapshot(FileIO io,
long snapshotId,
Integer schemaId,
String... manifestFiles) {
this(io, snapshotId, null, System.currentTimeMillis(), null, null,
Lists.transform(Arrays.asList(manifestFiles),
schemaId, Lists.transform(Arrays.asList(manifestFiles),
path -> new GenericManifestFile(io.newInputFile(path), 0)));
}

Expand All @@ -69,6 +71,7 @@ class BaseSnapshot implements Snapshot {
long timestampMillis,
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList) {
this.io = io;
this.sequenceNumber = sequenceNumber;
Expand All @@ -77,6 +80,7 @@ class BaseSnapshot implements Snapshot {
this.timestampMillis = timestampMillis;
this.operation = operation;
this.summary = summary;
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
}

Expand All @@ -86,8 +90,9 @@ class BaseSnapshot implements Snapshot {
long timestampMillis,
String operation,
Map<String, String> summary,
Integer schemaId,
List<ManifestFile> dataManifests) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null);
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, schemaId, null);
this.allManifests = dataManifests;
}

Expand Down Expand Up @@ -121,6 +126,11 @@ public Map<String, String> summary() {
return summary;
}

@Override
public Integer schemaId() {
  // may be null: the field is a boxed Integer and is passed through from the constructor as-is
  return this.schemaId;
}

private void cacheManifests() {
if (allManifests == null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list
Expand Down Expand Up @@ -222,7 +232,8 @@ public boolean equals(Object o) {
return this.snapshotId == other.snapshotId() &&
Objects.equal(this.parentId, other.parentId()) &&
this.sequenceNumber == other.sequenceNumber() &&
this.timestampMillis == other.timestampMillis();
this.timestampMillis == other.timestampMillis() &&
Objects.equal(this.schemaId, other.schemaId());
}

return false;
Expand All @@ -234,7 +245,8 @@ public int hashCode() {
this.snapshotId,
this.parentId,
this.sequenceNumber,
this.timestampMillis
this.timestampMillis,
this.schemaId
);
}

Expand All @@ -246,6 +258,7 @@ public String toString() {
.add("operation", operation)
.add("summary", summary)
.add("manifest-list", manifestListLocation)
.add("schema-id", schemaId)
.toString();
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public Schema schema() {
return ops.current().schema();
}

@Override
public Map<Integer, Schema> schemas() {
  // read from the current metadata on every call rather than caching the map
  Map<Integer, Schema> schemasById = ops.current().schemasById();
  return schemasById;
}

@Override
public PartitionSpec spec() {
return ops.current().spec();
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ public Schema schema() {
return current.schema();
}

@Override
public Map<Integer, Schema> schemas() {
  // served from the transaction's current metadata
  Map<Integer, Schema> schemasById = current.schemasById();
  return schemasById;
}

@Override
public PartitionSpec spec() {
return current.spec();
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ public Schema schema() {
return lazySchema;
}

@Override
public Map<Integer, Schema> schemas() {
  // not held in a local field here; delegate to the table returned by lazyTable()
  Map<Integer, Schema> delegated = lazyTable().schemas();
  return delegated;
}

@Override
public PartitionSpec spec() {
if (lazySpec == null) {
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private SnapshotParser() {
private static final String OPERATION = "operation";
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
private static final String SCHEMA_ID = "schema-id";

static void toJson(Snapshot snapshot, JsonGenerator generator)
throws IOException {
Expand Down Expand Up @@ -88,6 +89,11 @@ static void toJson(Snapshot snapshot, JsonGenerator generator)
generator.writeEndArray();
}

// schema ID might be null for snapshots written by old writers
if (snapshot.schemaId() != null) {
generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());
}

generator.writeEndObject();
}

Expand Down Expand Up @@ -139,17 +145,20 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
summary = builder.build();
}

Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node);

if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
return new BaseSnapshot(io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, manifestList);
return new BaseSnapshot(
io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, schemaId, manifestList);

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
// loaded lazily, if it is needed
List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
location -> new GenericManifestFile(io.newInputFile(location), 0));
return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests);
return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, schemaId, manifests);
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ public Snapshot apply() {

return new BaseSnapshot(ops.io(),
sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
manifestList.location());
base.currentSchemaId(), manifestList.location());

} else {
return new BaseSnapshot(ops.io(),
snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
manifests);
base.currentSchemaId(), manifests);
}
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile
}

Assert.assertFalse("Should find all files in the manifest", newPaths.hasNext());

Assert.assertEquals("Schema ID should match", table.schema().schemaId(), (int) snap.schemaId());
}

void validateTableFiles(Table tbl, DataFile... expectedFiles) {
Expand Down
Loading