diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 84bce08852ac..f0502b643c5c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -20,10 +20,11 @@ package org.apache.iceberg; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.hadoop.HadoopFileIO; -import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; @@ -94,15 +95,22 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) { if (!Objects.equal(currentMetadataLocation, newLocation)) { LOG.info("Refreshing table metadata from new version: {}", newLocation); + AtomicReference newMetadata = new AtomicReference<>(); Tasks.foreach(newLocation) .retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) .suppressFailureWhenFinished() - .run(metadataLocation -> { - this.currentMetadata = TableMetadataParser.read( - this, HadoopInputFile.fromLocation(metadataLocation, conf)); - this.currentMetadataLocation = metadataLocation; - this.version = parseVersion(metadataLocation); - }); + .run(metadataLocation -> newMetadata.set( + TableMetadataParser.read(this, io().newInputFile(metadataLocation)))); + + String newUUID = newMetadata.get().uuid(); + if (currentMetadata != null) { + Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()), + "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID); + } + + this.currentMetadata = newMetadata.get(); + this.currentMetadataLocation = newLocation; + this.version = parseVersion(newLocation); } this.shouldRefresh = false; } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 109fbd9609c8..e8cc29baf7ea 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -231,7 +231,9 @@ public void commit() { Snapshot newSnapshot = apply(); newSnapshotId.set(newSnapshot.snapshotId()); TableMetadata updated = base.replaceCurrentSnapshot(newSnapshot); - taskOps.commit(base, updated); + // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries + // to ensure that if a concurrent operation assigns the UUID, this operation will not fail. + taskOps.commit(base, updated.withUUID()); }); } catch (RuntimeException e) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index a7c0288739c8..612090955864 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import org.apache.iceberg.exceptions.ValidationException; @@ -74,7 +75,7 @@ public static TableMetadata newTableMetadata(TableOperations ops, } PartitionSpec freshSpec = specBuilder.build(); - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, UUID.randomUUID().toString(), location, System.currentTimeMillis(), lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec), ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of()); @@ -127,6 +128,7 @@ public String toString() { private final InputFile file; // stored metadata + private final String uuid; private final String location; private final long lastUpdatedMillis; private final int lastColumnId; @@ -142,6 +144,7 @@ public String toString() { TableMetadata(TableOperations ops, InputFile file, + String uuid, String location, long lastUpdatedMillis, int lastColumnId, @@ -154,6 +157,7 @@ public String toString() { List snapshotLog) { this.ops = ops; this.file = file; + this.uuid = uuid; this.location = location; this.lastUpdatedMillis = lastUpdatedMillis; this.lastColumnId = lastColumnId; @@ -187,6 +191,10 @@ public InputFile file() { return file; } + public String uuid() { + return uuid; + } + public long lastUpdatedMillis() { return lastUpdatedMillis; } @@ -251,8 +259,18 @@ public List snapshotLog() { return snapshotLog; } + public TableMetadata withUUID() { + if (uuid != null) { + return this; + } else { + return new TableMetadata(ops, null, UUID.randomUUID().toString(), location, + lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties, + currentSnapshotId, snapshots, snapshotLog); + } + } + public TableMetadata updateTableLocation(String newLocation) { - return new TableMetadata(ops, null, newLocation, + return new TableMetadata(ops, null, uuid, newLocation, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties, currentSnapshotId, snapshots, snapshotLog); } @@ -262,7 +280,7 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) { // rebuild all of the partition specs for the new current schema List updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec)); - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs, properties, currentSnapshotId, snapshots, snapshotLog); } @@ -291,7 +309,7 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) { builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec)); } - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog); @@ -306,7 +324,7 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { .addAll(snapshotLog) .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId())) .build(); - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog); } @@ -337,7 +355,7 @@ public TableMetadata removeSnapshotsIf(Predicate removeIf) { } } - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties, currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog)); } @@ -352,14 +370,14 @@ public TableMetadata rollbackTo(Snapshot snapshot) { .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId())) .build(); - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, nowMillis, lastColumnId, schema, defaultSpecId, specs, properties, snapshot.snapshotId(), snapshots, newSnapshotLog); } public TableMetadata replaceProperties(Map newProperties) { ValidationException.check(newProperties != null, "Cannot set properties to null"); - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties, currentSnapshotId, snapshots, snapshotLog); } @@ -376,7 +394,7 @@ public TableMetadata removeSnapshotLogEntries(Set snapshotIds) { ValidationException.check(currentSnapshotId < 0 || // not set Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId, "Cannot set invalid snapshot log: latest entry is not the current snapshot"); - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties, currentSnapshotId, snapshots, newSnapshotLog); } @@ -415,14 +433,14 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update newProperties.putAll(this.properties); newProperties.putAll(updatedProperties); - return new TableMetadata(ops, null, location, + return new TableMetadata(ops, null, uuid, location, System.currentTimeMillis(), nextLastColumnId.get(), freshSchema, specId, builder.build(), ImmutableMap.copyOf(newProperties), -1, snapshots, ImmutableList.of()); } public TableMetadata updateLocation(String newLocation) { - return new TableMetadata(ops, null, newLocation, + return new TableMetadata(ops, null, uuid, newLocation, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties, currentSnapshotId, snapshots, snapshotLog); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index bf06c9ef38d7..96351e6c5249 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -49,6 +49,7 @@ private TableMetadataParser() {} // visible for testing static final String FORMAT_VERSION = "format-version"; + static final String TABLE_UUID = "table-uuid"; static final String LOCATION = "location"; static final String LAST_UPDATED_MILLIS = "last-updated-ms"; static final String LAST_COLUMN_ID = "last-column-id"; @@ -65,9 +66,9 @@ private TableMetadataParser() {} public static void write(TableMetadata metadata, OutputFile outputFile) { try (OutputStreamWriter writer = new OutputStreamWriter( - outputFile.location().endsWith(".gz") ? - new GZIPOutputStream(outputFile.create()) : - outputFile.create())) { + outputFile.location().endsWith(".gz") ? + new GZIPOutputStream(outputFile.create()) : + outputFile.create())) { JsonGenerator generator = JsonUtil.factory().createGenerator(writer); generator.useDefaultPrettyPrinter(); toJson(metadata, generator); @@ -97,6 +98,7 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro generator.writeStartObject(); generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION); + generator.writeStringField(TABLE_UUID, metadata.uuid()); generator.writeStringField(LOCATION, metadata.location()); generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis()); generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId()); @@ -161,6 +163,7 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node Preconditions.checkArgument(formatVersion == TableMetadata.TABLE_FORMAT_VERSION, "Cannot read unsupported version %d", formatVersion); + String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node); String location = JsonUtil.getString(LOCATION, node); int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node); Schema schema = SchemaParser.fromJson(node.get(SCHEMA)); @@ -215,7 +218,7 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node } } - return new TableMetadata(ops, file, location, + return new TableMetadata(ops, file, uuid, location, lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties, currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator())); } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index b4fd03cb7eda..256aa808f011 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -94,8 +94,15 @@ public TableMetadata refresh() { throw new RuntimeIOException(e, "Failed to get file system for path: %s", metadataFile); } this.version = ver; - this.currentMetadata = TableMetadataParser.read(this, - io().newInputFile(metadataFile.toString())); + + TableMetadata newMetadata = TableMetadataParser.read(this, io().newInputFile(metadataFile.toString())); + String newUUID = newMetadata.uuid(); + if (currentMetadata != null) { + Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()), + "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID); + } + + this.currentMetadata = newMetadata; this.shouldRefresh = false; return currentMetadata; } diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java b/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java index 00f24bd3509d..18231223da34 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import org.apache.iceberg.TableMetadata.SnapshotLogEntry; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.types.Types; @@ -80,7 +81,7 @@ public void testJsonConversion() throws Exception { .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId())) .build(); - TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location", + TableMetadata expected = new TableMetadata(ops, null, UUID.randomUUID().toString(), "s3://bucket/test/location", System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog); @@ -89,6 +90,8 @@ public void testJsonConversion() throws Exception { TableMetadata metadata = TableMetadataParser.fromJson(ops, null, JsonUtil.mapper().readValue(asJson, JsonNode.class)); + Assert.assertEquals("Table UUID should match", + expected.uuid(), metadata.uuid()); Assert.assertEquals("Table location should match", expected.location(), metadata.location()); Assert.assertEquals("Last column ID should match", @@ -139,7 +142,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception { List reversedSnapshotLog = Lists.newArrayList(); - TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location", + TableMetadata expected = new TableMetadata(ops, null, UUID.randomUUID().toString(), "s3://bucket/test/location", System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog); @@ -164,7 +167,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception { } @Test - public void testBackwardCompatMissingPartitionSpecList() throws Exception { + public void testBackwardCompat() throws Exception { Schema schema = new Schema( Types.NestedField.required(1, "x", Types.LongType.get()), Types.NestedField.required(2, "y", Types.LongType.get()), @@ -182,16 +185,16 @@ public void testBackwardCompatMissingPartitionSpecList() throws Exception { ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId()))); - - TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location", + TableMetadata expected = new TableMetadata(ops, null, null, "s3://bucket/test/location", System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of()); String asJson = toJsonWithoutSpecList(expected); - TableMetadata metadata = TableMetadataParser.fromJson(ops, null, - JsonUtil.mapper().readValue(asJson, JsonNode.class)); + TableMetadata metadata = TableMetadataParser + .fromJson(ops, null, JsonUtil.mapper().readValue(asJson, JsonNode.class)); + Assert.assertNull("Table UUID should not be assigned", metadata.uuid()); Assert.assertEquals("Table location should match", expected.location(), metadata.location()); Assert.assertEquals("Last column ID should match", diff --git a/site/docs/spec.md b/site/docs/spec.md index ef99596e402e..5a0a22de28d2 100644 --- a/site/docs/spec.md +++ b/site/docs/spec.md @@ -330,7 +330,8 @@ Table metadata consists of the following fields: | Field | Description | | ----- | ----------- | -| **`format-version`** | An integer version number for the format. Currently, this is always 1. | +| **`format-version`** | An integer version number for the format. Currently, this is always 1. Implementations must throw an exception if a table's version is higher than the supported version. | +| **`table-uuid`** | A UUID that identifies the table, generated when the table is created. Implementations must throw an exception if a table's UUID does not match the expected UUID after refreshing metadata. | | **`location`**| The table’s base location. This is used by writers to determine where to store data files, manifest files, and table metadata files. | | **`last-updated-ms`**| Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing. | | **`last-column-id`**| An integer; the highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. | @@ -588,6 +589,7 @@ Table metadata is serialized as a JSON object according to the following table. |Metadata field|JSON representation|Example| |--- |--- |--- | |**`format-version`**|`JSON int`|`1`| +|**`table-uuid`**|`JSON string`|`"fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"`| |**`location`**|`JSON string`|`"s3://b/wh/data.db/table"`| |**`last-updated-ms`**|`JSON long`|`1515100955770`| |**`last-column-id`**|`JSON int`|`22`|