diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 5a9469ca1013..0eee04b68a6c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -33,7 +33,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -74,7 +73,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
   private final Set<ManifestFile> rewrittenManifests = Sets.newConcurrentHashSet();
   private final Map<Object, WriterWrapper> writers = Maps.newConcurrentMap();
-  private final AtomicInteger manifestSuffix = new AtomicInteger(0);
   private final AtomicLong entryCount = new AtomicLong(0);

   private Function<DataFile, Object> clusterByFunc;
@@ -157,7 +155,7 @@ public RewriteManifests addManifest(ManifestFile manifest) {
   private ManifestFile copyManifest(ManifestFile manifest) {
     try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) {
-      OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement());
+      OutputFile newFile = newManifestOutput();
       return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -336,18 +334,14 @@ class WriterWrapper {
     synchronized void addEntry(ManifestEntry entry) {
       if (writer == null) {
-        writer = newWriter();
+        writer = newManifestWriter(spec);
       } else if (writer.length() >= getManifestTargetSizeBytes()) {
         close();
-        writer = newWriter();
+        writer = newManifestWriter(spec);
       }
       writer.existing(entry);
     }

-    private ManifestWriter newWriter() {
-      return new ManifestWriter(spec, manifestPath(manifestSuffix.getAndIncrement()), snapshotId());
-    }
-
     synchronized void close() {
       if (writer != null) {
         try {
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 246e10fa38c1..333883151787 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -26,7 +26,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.OutputFile;
@@ -48,7 +47,6 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private ManifestFile newManifest = null;
-  private final AtomicInteger manifestCount = new AtomicInteger(0);
   private boolean hasNewFiles = false;

   FastAppend(TableOperations ops) {
@@ -110,7 +108,7 @@ public FastAppend appendManifest(ManifestFile manifest) {
   private ManifestFile copyManifest(ManifestFile manifest) {
     try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
-      OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+      OutputFile newManifestPath = newManifestOutput();
       return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -165,9 +163,7 @@ private ManifestFile writeManifest() throws IOException {
     }

     if (newManifest == null && newFiles.size() > 0) {
-      OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
-      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      ManifestWriter writer = newManifestWriter(spec);
       try {
         writer.addAll(newFiles);
       } finally {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 0271695d32b9..e81d496075a4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -28,10 +28,18 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;

-class ManifestListWriter implements FileAppender<ManifestFile> {
+abstract class ManifestListWriter implements FileAppender<ManifestFile> {
+  static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
+                                  long snapshotId, Long parentSnapshotId) {
+    if (formatVersion == 1) {
+      return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
+    }
+    throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion);
+  }
+
   private final FileAppender<ManifestFile> writer;

-  ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+  private ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
     this.writer = newAppender(snapshotFile, ImmutableMap.of(
         "snapshot-id", String.valueOf(snapshotId),
         "parent-snapshot-id", String.valueOf(parentSnapshotId)));
@@ -80,4 +88,10 @@ private static FileAppender newAppender(OutputFile file, Map
+
+  private static class V1Writer extends ManifestListWriter {
+    V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+      super(snapshotFile, snapshotId, parentSnapshotId);
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
-class ManifestWriter implements FileAppender<DataFile> {
+public abstract class ManifestWriter implements FileAppender<DataFile> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);

   static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
@@ -44,7 +44,7 @@ static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputF
   static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
                                    SnapshotSummary.Builder summaryBuilder, Set allowedEntryStatuses) {
-    ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
+    ManifestWriter writer = new V1Writer(reader.spec(), outputFile, snapshotId);
     boolean threw = true;
     try {
       for (ManifestEntry entry : reader.entries()) {
@@ -93,7 +93,15 @@ static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, l
    * @return a manifest writer
    */
   public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
-    return new ManifestWriter(spec, outputFile, null);
+    // always use a v1 writer for appended manifests because sequence number must be inherited
+    return write(1, spec, outputFile, null);
+  }
+
+  static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+    if (formatVersion == 1) {
+      return new V1Writer(spec, outputFile, snapshotId);
+    }
+    throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
   }

   private final OutputFile file;
@@ -111,7 +119,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
   private int deletedFiles = 0;
   private long deletedRows = 0L;

-  ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+  private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
     this.file = file;
     this.specId = spec.specId();
     this.writer = newAppender(FileFormat.AVRO, spec, file);
@@ -231,4 +239,10 @@ private static FileAppender newAppender(FileFormat format, PartitionSpec
       throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
     }
   }
+
+  private static class V1Writer extends ManifestWriter {
+    V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+      super(spec, file, snapshotId);
+    }
+  }
 }
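Both manifest writers now follow the same version-dispatch shape: the constructor goes private, a static `write(formatVersion, ...)` factory selects the implementation, and `V1Writer` is the only target until v2 writers exist. A minimal standalone sketch of the pattern, with simplified names that are not the Iceberg API:

```java
// Sketch of the version-dispatch pattern shared by ManifestListWriter and
// ManifestWriter in this patch (illustrative only).
abstract class VersionedWriter {
  static VersionedWriter write(int formatVersion) {
    if (formatVersion == 1) {
      return new V1Writer();
    }
    // a v2 branch can be added here later without changing any call sites
    throw new UnsupportedOperationException(
        "Cannot write for table version: " + formatVersion);
  }

  // private constructor: only the nested implementations can subclass
  private VersionedWriter() {
  }

  private static class V1Writer extends VersionedWriter {
  }
}
```

Keeping the constructor private means every new format version is forced through the factory, so unsupported versions fail in one place instead of at scattered call sites.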
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e08ae494787a..4537c232d183 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -33,7 +33,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -87,7 +86,6 @@ public String partition() {
   private final boolean snapshotIdInheritanceEnabled;

   // update data
-  private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final List<DataFile> newFiles = Lists.newArrayList();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
@@ -230,7 +228,7 @@ protected void add(ManifestFile manifest) {
   private ManifestFile copyManifest(ManifestFile manifest) {
     try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
-      OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+      OutputFile newManifestPath = newManifestOutput();
       return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), appendedManifestsSummary);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -542,8 +540,7 @@ private ManifestFile filterManifestWithDeletedFiles(
     // manifest. produce a copy of the manifest with all deleted files removed.
     List<DataFile> deletedFiles = Lists.newArrayList();
     Set deletedPaths = Sets.newHashSet();
-    OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
-    ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
+    ManifestWriter writer = newManifestWriter(reader.spec());
     try {
       reader.entries().forEach(entry -> {
         DataFile file = entry.file();
@@ -655,9 +652,7 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws I
       return mergeManifests.get(bin);
     }

-    OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
-    ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
+    ManifestWriter writer = newManifestWriter(ops.current().spec());
     try {
       for (ManifestFile manifest : bin) {
         try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
@@ -697,9 +692,7 @@ private ManifestFile newFilesAsManifest() throws IOException {
     }

     if (cachedNewManifest == null) {
-      OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
-      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      ManifestWriter writer = newManifestWriter(spec);
       try {
         writer.addAll(newFiles);
       } finally {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 4cf46007873f..0d3239fd29d1 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -75,6 +75,7 @@ public void accept(String file) {
   private final TableOperations ops;
   private final String commitUUID = UUID.randomUUID().toString();
+  private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final AtomicInteger attempt = new AtomicInteger(0);
   private final List<String> manifestLists = Lists.newArrayList();
   private volatile Long snapshotId = null;
@@ -148,8 +149,8 @@ public Snapshot apply() {
     if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
       OutputFile manifestList = manifestListPath();

-      try (ManifestListWriter writer = new ManifestListWriter(
-          manifestList, snapshotId(), parentSnapshotId)) {
+      try (ManifestListWriter writer = ManifestListWriter.write(
+          ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId)) {

         // keep track of the manifest lists created
         manifestLists.add(manifestList.location());
@@ -310,9 +311,13 @@ protected OutputFile manifestListPath() {
             String.format("snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID))));
   }

-  protected OutputFile manifestPath(int manifestNumber) {
+  protected OutputFile newManifestOutput() {
     return ops.io().newOutputFile(
-        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestNumber)));
+        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
+  }
+
+  protected ManifestWriter newManifestWriter(PartitionSpec spec) {
+    return ManifestWriter.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
+  }

   protected long snapshotId() {
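The per-operation `AtomicInteger` counters removed above are replaced by this single counter in `SnapshotProducer`, so every manifest written during one commit attempt gets a unique `<commit-uuid>-m<N>.avro` name no matter which code path created it. A standalone sketch of the naming scheme (a hypothetical class mirroring `newManifestOutput()`, not the actual implementation):

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical rendition of the naming logic in SnapshotProducer.newManifestOutput().
class ManifestNaming {
  private final String commitUUID = UUID.randomUUID().toString();
  private final AtomicInteger manifestCount = new AtomicInteger(0);

  // thread-safe: concurrent writers each draw a distinct suffix
  String nextManifestFileName() {
    return commitUUID + "-m" + manifestCount.getAndIncrement() + ".avro";
  }

  public static void main(String[] args) {
    ManifestNaming naming = new ManifestNaming();
    System.out.println(naming.nextManifestFileName()); // <uuid>-m0.avro
    System.out.println(naming.nextManifestFileName()); // <uuid>-m1.avro
  }
}
```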
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index e44e002bdab4..4d12cf5ce84c 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -43,15 +43,10 @@
  * Metadata for a table.
  */
 public class TableMetadata {
-  static final int TABLE_FORMAT_VERSION = 1;
+  static final int DEFAULT_TABLE_FORMAT_VERSION = 1;
+  static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
   static final int INITIAL_SPEC_ID = 0;

-  public static TableMetadata newTableMetadata(Schema schema,
-                                               PartitionSpec spec,
-                                               String location) {
-    return newTableMetadata(schema, spec, location, ImmutableMap.of());
-  }
-
   public static TableMetadata newTableMetadata(Schema schema,
                                                PartitionSpec spec,
                                                String location,
@@ -73,7 +68,7 @@ public static TableMetadata newTableMetadata(Schema schema,
     }
     PartitionSpec freshSpec = specBuilder.build();

-    return new TableMetadata(null, UUID.randomUUID().toString(), location,
+    return new TableMetadata(null, DEFAULT_TABLE_FORMAT_VERSION, UUID.randomUUID().toString(), location,
         System.currentTimeMillis(), lastColumnId.get(), freshSchema,
         INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
@@ -170,6 +165,7 @@ public String toString() {
   private final InputFile file;

   // stored metadata
+  private final int formatVersion;
   private final String uuid;
   private final String location;
   private final long lastUpdatedMillis;
@@ -186,6 +182,7 @@ public String toString() {
   private final List<MetadataLogEntry> previousFiles;

   TableMetadata(InputFile file,
+                int formatVersion,
                 String uuid,
                 String location,
                 long lastUpdatedMillis,
@@ -198,6 +195,13 @@ public String toString() {
                 List<Snapshot> snapshots,
                 List<SnapshotLogEntry> snapshotLog,
                 List<MetadataLogEntry> previousFiles) {
+    Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+        "Unsupported format version: v%s", formatVersion);
+    if (formatVersion > 1) {
+      Preconditions.checkArgument(uuid != null, "UUID is required in format v%s", formatVersion);
+    }
+
+    this.formatVersion = formatVersion;
     this.file = file;
     this.uuid = uuid;
     this.location = location;
@@ -240,6 +244,10 @@ public String toString() {
         "Invalid table metadata: Cannot find current version");
   }

+  public int formatVersion() {
+    return formatVersion;
+  }
+
   public InputFile file() {
     return file;
   }
@@ -328,24 +336,18 @@ public TableMetadata withUUID() {
     if (uuid != null) {
       return this;
     } else {
-      return new TableMetadata(null, UUID.randomUUID().toString(), location,
+      return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
          lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
          currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
     }
   }

-  public TableMetadata updateTableLocation(String newLocation) {
-    return new TableMetadata(null, uuid, newLocation,
-        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
-        currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
-  }
-
   public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
     PartitionSpec.checkCompatibility(spec(), newSchema);

     // rebuild all of the partition specs for the new current schema
     List<PartitionSpec> updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec));

-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs,
         properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -374,7 +376,7 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
       builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
     }

-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
         builder.build(), properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
@@ -385,7 +387,7 @@ public TableMetadata addStagedSnapshot(Snapshot snapshot) {
         .addAll(snapshots)
         .add(snapshot)
         .build();
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -404,7 +406,7 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
         .addAll(snapshotLog)
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -435,7 +437,7 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
       }
     }

-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog),
         addPreviousFile(file, lastUpdatedMillis));
@@ -456,14 +458,14 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
         .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
         .build();

-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
         snapshot.snapshotId(), snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }

   public TableMetadata replaceProperties(Map<String, String> newProperties) {
     ValidationException.check(newProperties != null, "Cannot set properties to null");
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));
   }
@@ -480,7 +482,7 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
     ValidationException.check(currentSnapshotId < 0 || // not set
         Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
         "Cannot set invalid snapshot log: latest entry is not the current snapshot");
-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
@@ -519,14 +521,30 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
     newProperties.putAll(this.properties);
     newProperties.putAll(updatedProperties);

-    return new TableMetadata(null, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, location,
         System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
         specId, builder.build(), ImmutableMap.copyOf(newProperties),
         -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
   }

   public TableMetadata updateLocation(String newLocation) {
-    return new TableMetadata(null, uuid, newLocation,
+    return new TableMetadata(null, formatVersion, uuid, newLocation,
         System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
         currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
   }
+
+  public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
+    Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+        "Cannot upgrade table to unsupported format version: v%s (supported: v%s)",
+        newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION);
+    Preconditions.checkArgument(newFormatVersion >= formatVersion,
+        "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion);
+
+    if (newFormatVersion == formatVersion) {
+      return this;
+    }
+
+    return new TableMetadata(null, newFormatVersion, uuid, location,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+  }
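`upgradeToFormatVersion` only produces new metadata; making the upgrade durable still requires a commit through `TableOperations`, which is exactly what the new `TestFormatVersions` test below exercises. A hedged sketch of a call site, with error handling omitted:

```java
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

class UpgradeExample {
  // Upgrade a table to v2 in place; a no-op when the table is already v2.
  static void upgradeToV2(TableOperations ops) {
    TableMetadata base = ops.current();
    // throws IllegalArgumentException for downgrades or unsupported versions
    TableMetadata upgraded = base.upgradeToFormatVersion(2);
    if (upgraded != base) {
      ops.commit(base, upgraded);
    }
  }
}
```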
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index fd68338ffb4d..6fca73b2bb44 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -151,7 +151,7 @@ public static String toJson(TableMetadata metadata) {
   private static void toJson(TableMetadata metadata, JsonGenerator generator) throws IOException {
     generator.writeStartObject();

-    generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+    generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion());
     generator.writeStringField(TABLE_UUID, metadata.uuid());
     generator.writeStringField(LOCATION, metadata.location());
     generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
@@ -161,8 +161,10 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
     SchemaParser.toJson(metadata.schema(), generator);

     // for older readers, continue writing the default spec as "partition-spec"
-    generator.writeFieldName(PARTITION_SPEC);
-    PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+    if (metadata.formatVersion() == 1) {
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+    }

     // write the default spec ID and spec list
     generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
@@ -226,7 +228,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
         "Cannot parse metadata from a non-object: %s", node);

     int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node);
-    Preconditions.checkArgument(formatVersion == TableMetadata.TABLE_FORMAT_VERSION,
+    Preconditions.checkArgument(formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
         "Cannot read unsupported version %s", formatVersion);

     String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node);
@@ -295,7 +297,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
       }
     }

-    return new TableMetadata(file, uuid, location,
+    return new TableMetadata(file, formatVersion, uuid, location,
         lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
         currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()),
         ImmutableList.copyOf(metadataEntries.iterator()));
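The parser change is the compatibility pivot of the patch: the writer now stamps whatever version the metadata actually has, while the reader accepts anything up to the library's supported maximum instead of demanding an exact match. A minimal standalone rendition of the read-side gate, assuming Jackson on the classpath (which Iceberg already uses):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class FormatVersionGate {
  static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;

  // Mirrors the fromJson change: accept any formatVersion <= the supported max.
  static int readFormatVersion(String metadataJson) throws Exception {
    JsonNode node = new ObjectMapper().readTree(metadataJson);
    int formatVersion = node.get("format-version").asInt();
    if (formatVersion > SUPPORTED_TABLE_FORMAT_VERSION) {
      throw new IllegalArgumentException("Cannot read unsupported version " + formatVersion);
    }
    return formatVersion;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(readFormatVersion("{\"format-version\": 1}")); // ok: 1
    System.out.println(readFormatVersion("{\"format-version\": 2}")); // ok: 2
    // readFormatVersion("{\"format-version\": 3}") would throw
  }
}
```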
diff --git a/core/src/test/java/org/apache/iceberg/TestFormatVersions.java b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
new file mode 100644
index 000000000000..f58d70beeeb1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFormatVersions extends TableTestBase {
+  @Test
+  public void testDefaultFormatVersion() {
+    Assert.assertEquals("Should default to v1", 1, table.ops().current().formatVersion());
+  }
+
+  @Test
+  public void testFormatVersionUpgrade() {
+    TableOperations ops = table.ops();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.upgradeToFormatVersion(2));
+
+    Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+  }
+
+  @Test
+  public void testFormatVersionDowngrade() {
+    TableOperations ops = table.ops();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.upgradeToFormatVersion(2));
+
+    Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+
+    AssertHelpers.assertThrows("Should reject a version downgrade",
+        IllegalArgumentException.class, "Cannot downgrade",
+        () -> ops.current().upgradeToFormatVersion(1));
+
+    Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+  }
+
+  @Test
+  public void testFormatVersionUpgradeNotSupported() {
+    TableOperations ops = table.ops();
+    TableMetadata base = ops.current();
+    AssertHelpers.assertThrows("Should reject an unsupported version upgrade",
+        IllegalArgumentException.class, "Cannot upgrade table to unsupported format version",
+        () -> ops.commit(base, base.upgradeToFormatVersion(TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1)));
+
+    Assert.assertEquals("Should report v1", 1, ops.current().formatVersion());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 16ad4ee16a75..5e55d66ccd40 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -95,8 +95,7 @@ public void testJsonConversionWithManifestList() throws IOException {
     Assert.assertTrue(manifestList.delete());
     manifestList.deleteOnExit();

-    try (ManifestListWriter writer = new ManifestListWriter(
-        Files.localOutput(manifestList), id, parentId)) {
+    try (ManifestListWriter writer = ManifestListWriter.write(1, Files.localOutput(manifestList), id, parentId)) {
       writer.addAll(manifests);
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index c2603f2613e5..a77530bba8ed 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -55,8 +55,21 @@
 import static org.apache.iceberg.TableMetadataParser.PROPERTIES;
 import static org.apache.iceberg.TableMetadataParser.SCHEMA;
 import static org.apache.iceberg.TableMetadataParser.SNAPSHOTS;
+import static org.apache.iceberg.TableMetadataParser.TABLE_UUID;

 public class TestTableMetadata {
+  private static final String TEST_LOCATION = "s3://bucket/test/location";
+
+  private static final Schema TEST_SCHEMA = new Schema(
+      Types.NestedField.required(1, "x", Types.LongType.get()),
+      Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
+      Types.NestedField.required(3, "z", Types.LongType.get())
+  );
+
+  private static final int LAST_ASSIGNED_COLUMN_ID = 3;
+
+  private static final PartitionSpec SPEC_5 = PartitionSpec.builderFor(TEST_SCHEMA).withSpecId(5).build();
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();

@@ -64,30 +77,22 @@ public class TestTableMetadata {

   @Test
   public void testJsonConversion() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));

     List snapshotLog = ImmutableList.builder()
         .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
         .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
         .build();

-    TableMetadata expected = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());
@@ -95,6 +100,8 @@ public void testJsonConversion() throws Exception {
     TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));

+    Assert.assertEquals("Format version should match",
+        expected.formatVersion(), metadata.formatVersion());
     Assert.assertEquals("Table UUID should match",
         expected.uuid(), metadata.uuid());
     Assert.assertEquals("Table location should match",
@@ -128,27 +135,19 @@ public void testJsonConversion() throws Exception {

   @Test
   public void testFromJsonSortsSnapshotLog() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));

     List reversedSnapshotLog = Lists.newArrayList();

-    TableMetadata expected = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.of());
@@ -173,13 +172,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {

   @Test
   public void testBackwardCompat() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build();
+    PartitionSpec spec = PartitionSpec.builderFor(TEST_SCHEMA).identity("x").withSpecId(6).build();

     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -190,8 +183,8 @@ public void testBackwardCompat() throws Exception {
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));

-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
+    TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());
@@ -199,6 +192,8 @@ public void testBackwardCompat() throws Exception {
     TableMetadata metadata = TableMetadataParser
         .fromJson(ops.io(), null, JsonUtil.mapper().readValue(asJson, JsonNode.class));

+    Assert.assertEquals("Format version should match",
+        expected.formatVersion(), metadata.formatVersion());
     Assert.assertNull("Table UUID should not be assigned", metadata.uuid());
     Assert.assertEquals("Table location should match",
         expected.location(), metadata.location());
@@ -242,7 +237,7 @@ public static String toJsonWithoutSpecList(TableMetadata metadata) {

       generator.writeStartObject(); // start table metadata object

-      generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+      generator.writeNumberField(FORMAT_VERSION, 1);
       generator.writeStringField(LOCATION, metadata.location());
       generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
       generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
@@ -281,22 +276,14 @@ public static String toJsonWithoutSpecList(TableMetadata metadata) {

   @Test
   public void testJsonWithPreviousMetadataLog() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));

     List reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -304,8 +291,8 @@ public void testJsonWithPreviousMetadataLog() throws Exception {
     previousMetadataLog.add(new MetadataLogEntry(currentTimestamp,
         "/tmp/000001-" + UUID.randomUUID().toString() + ".metadata.json"));

-    TableMetadata base = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata base = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+        System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -318,23 +305,15 @@ public void testJsonWithPreviousMetadataLog() throws Exception {
   }

   @Test
-  public void testAddPreviousMetadataRemoveNone() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+  public void testAddPreviousMetadataRemoveNone() {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));

     List reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -347,8 +326,8 @@ public void testAddPreviousMetadataRemoveNone() throws Exception {
     MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 80,
         "/tmp/000003-" + UUID.randomUUID().toString() + ".metadata.json");

-    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
-        "s3://bucket/test/location", currentTimestamp - 80, 3, schema, 5, ImmutableList.of(spec),
+    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+        TEST_LOCATION, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -365,23 +344,15 @@ public void testAddPreviousMetadataRemoveNone() throws Exception {
   }

   @Test
-  public void testAddPreviousMetadataRemoveOne() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+  public void testAddPreviousMetadataRemoveOne() {
    long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
    Snapshot previousSnapshot = new BaseSnapshot(
        ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
    long currentSnapshotId = System.currentTimeMillis();
    Snapshot currentSnapshot = new BaseSnapshot(
        ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));

     List reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -400,9 +371,9 @@ public void testAddPreviousMetadataRemoveOne() throws Exception {
     MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 50,
         "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");

-    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
-        "s3://bucket/test/location", currentTimestamp - 50, 3, schema, 5,
-        ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId,
+    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+        TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
+        ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -423,23 +394,15 @@ public void testAddPreviousMetadataRemoveOne() throws Exception {
   }

   @Test
-  public void testAddPreviousMetadataRemoveMultiple() throws Exception {
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "x", Types.LongType.get()),
-        Types.NestedField.required(2, "y", Types.LongType.get()),
-        Types.NestedField.required(3, "z", Types.LongType.get())
-    );
-
-    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+  public void testAddPreviousMetadataRemoveMultiple() {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
         ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
         ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
-        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));

     List reversedSnapshotLog = Lists.newArrayList();
     long currentTimestamp = System.currentTimeMillis();
@@ -458,9 +421,9 @@ public void testAddPreviousMetadataRemoveMultiple() throws Exception {
     MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 50,
         "/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");

-    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
-        "s3://bucket/test/location", currentTimestamp - 50, 3, schema, 2,
-        ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId,
+    TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+        TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
+        ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
         ImmutableList.copyOf(previousMetadataLog));
@@ -480,4 +443,87 @@ public void testAddPreviousMetadataRemoveMultiple() throws Exception {
         ImmutableList.copyOf(removedPreviousMetadata));
   }

+  @Test
+  public void testV2UUIDValidation() {
+    AssertHelpers.assertThrows("Should reject v2 metadata without a UUID",
+        IllegalArgumentException.class, "UUID is required in format v2",
+        () -> new TableMetadata(null, 2, null, TEST_LOCATION, System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID,
+            TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L, ImmutableList.of(),
+            ImmutableList.of(), ImmutableList.of())
+    );
+  }
+
+  @Test
+  public void testVersionValidation() {
+    int unsupportedVersion = TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1;
+    AssertHelpers.assertThrows("Should reject unsupported metadata",
+        IllegalArgumentException.class, "Unsupported format version: v" + unsupportedVersion,
+        () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, System.currentTimeMillis(),
+            LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L,
+            ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+    );
+  }
+
+  @Test
+  public void testParserVersionValidation() throws Exception {
+    String supportedVersion = toJsonWithVersion(
+        TableMetadata.newTableMetadata(TEST_SCHEMA, SPEC_5, TEST_LOCATION, ImmutableMap.of()),
+        TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION);
+    TableMetadata parsed = TableMetadataParser.fromJson(
+        ops.io(), null, JsonUtil.mapper().readValue(supportedVersion, JsonNode.class));
+    Assert.assertNotNull("Should successfully read supported metadata version", parsed);
+
+    String unsupportedVersion = toJsonWithVersion(
+        TableMetadata.newTableMetadata(TEST_SCHEMA, SPEC_5, TEST_LOCATION, ImmutableMap.of()),
+        TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1);
+    AssertHelpers.assertThrows("Should not read unsupported metadata",
+        IllegalArgumentException.class, "Cannot read unsupported version",
+        () -> TableMetadataParser.fromJson(
+            ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)));
+  }
+
+  public static String toJsonWithVersion(TableMetadata metadata, int version) {
+    StringWriter writer = new StringWriter();
+    try {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+      generator.writeStartObject(); // start table metadata object
+
+      generator.writeNumberField(FORMAT_VERSION, version);
+      generator.writeStringField(TABLE_UUID, metadata.uuid());
+      generator.writeStringField(LOCATION, metadata.location());
+      generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+      generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
+
+      generator.writeFieldName(SCHEMA);
+      SchemaParser.toJson(metadata.schema(), generator);
+
+      // mimic an old writer by writing only partition-spec and not the default ID or spec list
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+      generator.writeObjectFieldStart(PROPERTIES);
+      for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
+        generator.writeStringField(keyValue.getKey(), keyValue.getValue());
+      }
+      generator.writeEndObject();
+
+      generator.writeNumberField(CURRENT_SNAPSHOT_ID,
+          metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1);
+
+      generator.writeArrayFieldStart(SNAPSHOTS);
+      for (Snapshot snapshot : metadata.snapshots()) {
+        SnapshotParser.toJson(snapshot, generator);
+      }
+      generator.writeEndArray();
+      // skip the snapshot log
+
+      generator.writeEndObject(); // end table metadata object
+
+      generator.flush();
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
+    }
+    return writer.toString();
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 20d2f3e67a35..41974e37ac24 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -43,7 +43,7 @@ public static TestTable create(File temp, String name, Schema schema, PartitionS
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
     }
-    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString()));
+    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of()));
     return new TestTable(ops, name);
   }

@@ -53,7 +53,7 @@ public static Transaction beginCreate(File temp, String name, Schema schema, Par
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
     }

-    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, temp.toString());
+    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of());
     return Transactions.createTableTransaction(ops, metadata);
   }
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 921c100fb5a5..332e247e027a 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -677,3 +677,17 @@ This serialization scheme is for storing single values as individual binary valu
 | **`map`** | Not supported |

+## Format version changes
+
+### Version 2
+
+Writing metadata:
+* Table metadata field `sequence-number` is required.
+* Table metadata field `table-uuid` is required.
+* Table metadata field `partition-specs` is required.
+* Table metadata field `default-spec-id` is required.
+* Table metadata field `partition-spec` is no longer required and may be omitted.
+* Snapshot field `manifest-list` is required.
+* Snapshot field `manifests` is not allowed.
+
+Note that these requirements apply when writing data to a v2 table. Tables that are upgraded from v1 may contain metadata that does not follow these requirements. Implementations should remain backward-compatible with v1 metadata requirements.
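The spec additions above are requirements on writers, not readers. Expressed as code, they amount to a small validation over v2 metadata documents; the sketch below is illustrative only, using the field names from the spec section and Jackson for JSON access, and is not part of the patch:

```java
import com.fasterxml.jackson.databind.JsonNode;

class V2WriteRequirements {
  // Checks the v2 "writing metadata" rules listed in the spec section above.
  static void validateTableMetadata(JsonNode metadata) {
    require(metadata.hasNonNull("sequence-number"), "sequence-number is required");
    require(metadata.hasNonNull("table-uuid"), "table-uuid is required");
    require(metadata.hasNonNull("partition-specs"), "partition-specs is required");
    require(metadata.hasNonNull("default-spec-id"), "default-spec-id is required");
    // "partition-spec" may be omitted in v2, so there is nothing to check
  }

  static void validateSnapshot(JsonNode snapshot) {
    require(snapshot.hasNonNull("manifest-list"), "manifest-list is required");
    require(!snapshot.has("manifests"), "manifests is not allowed");
  }

  private static void require(boolean valid, String message) {
    if (!valid) {
      throw new IllegalArgumentException("Invalid v2 metadata: " + message);
    }
  }
}
```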
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
index 879aa16b18cf..b9d6c9c6d125 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.source;

 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.File;
 import java.util.Map;
@@ -49,7 +50,7 @@ static TestTable create(File temp, String name, Schema schema, PartitionSpec spe
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
     }
-    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString()));
+    ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of()));
     return new TestTable(ops, name);
   }