@@ -20,10 +20,11 @@
package org.apache.iceberg;

import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
@@ -94,15 +95,22 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
if (!Objects.equal(currentMetadataLocation, newLocation)) {
LOG.info("Refreshing table metadata from new version: {}", newLocation);

+AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
Tasks.foreach(newLocation)
.retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
.suppressFailureWhenFinished()
-.run(metadataLocation -> {
-this.currentMetadata = TableMetadataParser.read(
-this, HadoopInputFile.fromLocation(metadataLocation, conf));
-this.currentMetadataLocation = metadataLocation;
-this.version = parseVersion(metadataLocation);
-});
+.run(metadataLocation -> newMetadata.set(
+TableMetadataParser.read(this, io().newInputFile(metadataLocation))));
+
+String newUUID = newMetadata.get().uuid();
+if (currentMetadata != null) {
+Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
+"Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
+}
+
+this.currentMetadata = newMetadata.get();
+this.currentMetadataLocation = newLocation;
+this.version = parseVersion(newLocation);
}
this.shouldRefresh = false;
}
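To make the new refresh-time guard concrete, here is a minimal standalone sketch of its semantics (plain Java; the class and method names are invented for illustration and are not the Iceberg API). A refresh that loads metadata carrying a different UUID indicates the table was dropped and recreated, and fails fast; a missing UUID is tolerated so that metadata written before this change still loads:

import java.util.UUID;

public class UuidGuardSketch {
  // UUID of the metadata this process last loaded; null before the first load
  static String currentUuid = null;

  static void refresh(String refreshedUuid) {
    // mirror of the Preconditions.checkState guard in the diff above
    if (currentUuid != null && refreshedUuid != null && !refreshedUuid.equals(currentUuid)) {
      throw new IllegalStateException(String.format(
          "Table UUID does not match: current=%s != refreshed=%s", currentUuid, refreshedUuid));
    }
    if (refreshedUuid != null) {
      currentUuid = refreshedUuid;  // a UUID, once assigned, is never cleared
    }
  }

  public static void main(String[] args) {
    refresh(UUID.randomUUID().toString());   // first load assigns the UUID
    refresh(currentUuid);                    // normal refresh: same UUID, accepted
    refresh(null);                           // pre-UUID metadata: tolerated
    try {
      refresh(UUID.randomUUID().toString()); // table dropped and recreated: new UUID
    } catch (IllegalStateException e) {
      System.out.println("stale operation rejected: " + e.getMessage());
    }
  }
}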
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -231,7 +231,9 @@ public void commit() {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata updated = base.replaceCurrentSnapshot(newSnapshot);
-taskOps.commit(base, updated);
+// if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
+// to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
+taskOps.commit(base, updated.withUUID());

Review discussion on this line:

Contributor: Just to make sure I understand: table UUID isn't supposed to change once assigned and cannot be set back to null, right?

Contributor (author): Correct. This identifies a table so you can't start an operation, drop and recreate the table, and then complete the operation. We had it happen once.

});

} catch (RuntimeException e) {
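The inline comment above notes that the UUID is re-created on each retry. To see why that avoids spurious failures when a concurrent commit assigns the UUID first, here is a small standalone sketch (invented names, not the Iceberg API) of the withUUID() semantics inside a retry loop:

import java.util.UUID;

public class CommitRetrySketch {
  // catalog-side UUID; null until some commit assigns it
  static String tableUuid = null;

  // withUUID() semantics: keep an existing UUID, otherwise propose a fresh one
  static String withUuid(String base) {
    return base != null ? base : UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    // attempt 1: the refreshed base has no UUID, so a fresh one is proposed
    String attempt1 = withUuid(tableUuid);

    // a concurrent operation commits first and assigns its own UUID
    tableUuid = UUID.randomUUID().toString();

    // attempt 2 (retry): the re-refreshed base now carries the concurrent UUID;
    // withUuid() keeps it rather than re-proposing attempt1's stale value,
    // so the retry passes the UUID-match check instead of failing
    String attempt2 = withUuid(tableUuid);
    System.out.println("attempt 1 proposed (discarded): " + attempt1);
    System.out.println("attempt 2 kept concurrent UUID: " + attempt2.equals(tableUuid));
  }
}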
40 changes: 29 additions & 11 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.iceberg.exceptions.ValidationException;
@@ -74,7 +75,7 @@ public static TableMetadata newTableMetadata(TableOperations ops,
}
PartitionSpec freshSpec = specBuilder.build();

-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, UUID.randomUUID().toString(), location,
System.currentTimeMillis(),
lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
@@ -127,6 +128,7 @@ public String toString() {
private final InputFile file;

// stored metadata
+private final String uuid;
private final String location;
private final long lastUpdatedMillis;
private final int lastColumnId;
@@ -142,6 +144,7 @@ public String toString() {

TableMetadata(TableOperations ops,
InputFile file,
+String uuid,
String location,
long lastUpdatedMillis,
int lastColumnId,
@@ -154,6 +157,7 @@ public String toString() {
List<SnapshotLogEntry> snapshotLog) {
this.ops = ops;
this.file = file;
+this.uuid = uuid;
this.location = location;
this.lastUpdatedMillis = lastUpdatedMillis;
this.lastColumnId = lastColumnId;
@@ -187,6 +191,10 @@ public InputFile file() {
return file;
}

+public String uuid() {
+return uuid;
+}
+
public long lastUpdatedMillis() {
return lastUpdatedMillis;
}
@@ -251,8 +259,18 @@ public List<SnapshotLogEntry> snapshotLog() {
return snapshotLog;
}

+public TableMetadata withUUID() {
+if (uuid != null) {
+return this;
+} else {
+return new TableMetadata(ops, null, UUID.randomUUID().toString(), location,
+lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+currentSnapshotId, snapshots, snapshotLog);
+}
+}
+
public TableMetadata updateTableLocation(String newLocation) {
-return new TableMetadata(ops, null, newLocation,
+return new TableMetadata(ops, null, uuid, newLocation,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, snapshotLog);
}
@@ -262,7 +280,7 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
// rebuild all of the partition specs for the new current schema
List<PartitionSpec> updatedSpecs = Lists.transform(specs,
spec -> updateSpecSchema(newSchema, spec));
-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs, properties,
currentSnapshotId, snapshots, snapshotLog);
}
@@ -291,7 +309,7 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
}

-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
builder.build(), properties,
currentSnapshotId, snapshots, snapshotLog);
@@ -306,7 +324,7 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
.addAll(snapshotLog)
.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
.build();
-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), newSnapshots, newSnapshotLog);
}
@@ -337,7 +355,7 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
}
}

-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog));
}
@@ -352,14 +370,14 @@ public TableMetadata rollbackTo(Snapshot snapshot) {
.add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
.build();

-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), snapshots, newSnapshotLog);
}

public TableMetadata replaceProperties(Map<String, String> newProperties) {
ValidationException.check(newProperties != null, "Cannot set properties to null");
-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
currentSnapshotId, snapshots, snapshotLog);
}
@@ -376,7 +394,7 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
ValidationException.check(currentSnapshotId < 0 || // not set
Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
"Cannot set invalid snapshot log: latest entry is not the current snapshot");
-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, newSnapshotLog);
}
@@ -415,14 +433,14 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
newProperties.putAll(this.properties);
newProperties.putAll(updatedProperties);

-return new TableMetadata(ops, null, location,
+return new TableMetadata(ops, null, uuid, location,
System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
specId, builder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of());
}

public TableMetadata updateLocation(String newLocation) {
-return new TableMetadata(ops, null, newLocation,
+return new TableMetadata(ops, null, uuid, newLocation,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, snapshotLog);
}
11 changes: 7 additions & 4 deletions core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -49,6 +49,7 @@ private TableMetadataParser() {}

// visible for testing
static final String FORMAT_VERSION = "format-version";
+static final String TABLE_UUID = "table-uuid";
static final String LOCATION = "location";
static final String LAST_UPDATED_MILLIS = "last-updated-ms";
static final String LAST_COLUMN_ID = "last-column-id";
@@ -65,9 +66,9 @@ private TableMetadataParser() {}

public static void write(TableMetadata metadata, OutputFile outputFile) {
try (OutputStreamWriter writer = new OutputStreamWriter(
outputFile.location().endsWith(".gz") ?
new GZIPOutputStream(outputFile.create()) :
outputFile.create())) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
Expand Down Expand Up @@ -97,6 +98,7 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
generator.writeStartObject();

generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+generator.writeStringField(TABLE_UUID, metadata.uuid());
generator.writeStringField(LOCATION, metadata.location());
generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
@@ -161,6 +163,7 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
Preconditions.checkArgument(formatVersion == TableMetadata.TABLE_FORMAT_VERSION,
"Cannot read unsupported version %d", formatVersion);

+String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node);
String location = JsonUtil.getString(LOCATION, node);
int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
@@ -215,7 +218,7 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
}
}

-return new TableMetadata(ops, file, location,
+return new TableMetadata(ops, file, uuid, location,
lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()));
}
@@ -94,8 +94,15 @@ public TableMetadata refresh() {
throw new RuntimeIOException(e, "Failed to get file system for path: %s", metadataFile);
}
this.version = ver;
-this.currentMetadata = TableMetadataParser.read(this,
-io().newInputFile(metadataFile.toString()));

+TableMetadata newMetadata = TableMetadataParser.read(this, io().newInputFile(metadataFile.toString()));
+String newUUID = newMetadata.uuid();
+if (currentMetadata != null) {
+Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
+"Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
+}
+
+this.currentMetadata = newMetadata;
this.shouldRefresh = false;
return currentMetadata;
}
17 changes: 10 additions & 7 deletions core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.types.Types;
@@ -80,7 +81,7 @@ public void testJsonConversion() throws Exception {
.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
.build();

-TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
+TableMetadata expected = new TableMetadata(ops, null, UUID.randomUUID().toString(), "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
@@ -89,6 +90,8 @@ public void testJsonConversion() throws Exception {
TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
JsonUtil.mapper().readValue(asJson, JsonNode.class));

+Assert.assertEquals("Table UUID should match",
+expected.uuid(), metadata.uuid());
Assert.assertEquals("Table location should match",
expected.location(), metadata.location());
Assert.assertEquals("Last column ID should match",
@@ -139,7 +142,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {

List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();

-TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
+TableMetadata expected = new TableMetadata(ops, null, UUID.randomUUID().toString(), "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
@@ -164,7 +167,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
}

@Test
-public void testBackwardCompatMissingPartitionSpecList() throws Exception {
+public void testBackwardCompat() throws Exception {
Schema schema = new Schema(
Types.NestedField.required(1, "x", Types.LongType.get()),
Types.NestedField.required(2, "y", Types.LongType.get()),
@@ -182,16 +185,16 @@ public void testBackwardCompat() throws Exception {
ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));


-TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
+TableMetadata expected = new TableMetadata(ops, null, null, "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());

String asJson = toJsonWithoutSpecList(expected);
-TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
-JsonUtil.mapper().readValue(asJson, JsonNode.class));
+TableMetadata metadata = TableMetadataParser
+.fromJson(ops, null, JsonUtil.mapper().readValue(asJson, JsonNode.class));

+Assert.assertNull("Table UUID should not be assigned", metadata.uuid());
Assert.assertEquals("Table location should match",
expected.location(), metadata.location());
Assert.assertEquals("Last column ID should match",
4 changes: 3 additions & 1 deletion site/docs/spec.md
@@ -330,7 +330,8 @@ Table metadata consists of the following fields:

| Field | Description |
| ----- | ----------- |
-| **`format-version`** | An integer version number for the format. Currently, this is always 1. |
+| **`format-version`** | An integer version number for the format. Currently, this is always 1. Implementations must throw an exception if a table's version is higher than the supported version. |
+| **`table-uuid`** | A UUID that identifies the table, generated when the table is created. Implementations must throw an exception if a table's UUID does not match the expected UUID after refreshing metadata. |
| **`location`**| The table’s base location. This is used by writers to determine where to store data files, manifest files, and table metadata files. |
| **`last-updated-ms`**| Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing. |
| **`last-column-id`**| An integer; the highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. |
@@ -588,6 +589,7 @@ Table metadata is serialized as a JSON object according to the following table.
|Metadata field|JSON representation|Example|
|--- |--- |--- |
|**`format-version`**|`JSON int`|`1`|
+|**`table-uuid`**|`JSON string`|`"fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"`|
|**`location`**|`JSON string`|`"s3://b/wh/data.db/table"`|
|**`last-updated-ms`**|`JSON long`|`1515100955770`|
|**`last-column-id`**|`JSON int`|`22`|
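For illustration only (not normative), the leading fields of a serialized metadata file might look like the following, combining the example values from the table above; all values are hypothetical:

```json
{
  "format-version": 1,
  "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
  "location": "s3://b/wh/data.db/table",
  "last-updated-ms": 1515100955770,
  "last-column-id": 22
}
```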