From c22fa53fc4d78b045622bd5d0d3e2616ce8d0013 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Sun, 25 Nov 2018 14:07:11 -0800
Subject: [PATCH 1/2] Store multiple partition specs in table metadata.

The purpose of this change is to enable future partition spec changes
and to assign IDs to specs that can be easily encoded in an Avro file
that tracks a snapshot's manifests.

This updates TableMetadata and the metadata parser to support multiple
partition specs. This change is forward-compatible for older readers
because the "partition-spec" field in table metadata is still set to
the default spec.

Multiple specs are now stored in an array in table metadata called
"partition-specs". Each entry in the array is an object with two
fields: a "spec-id" field with an integer ID value, and a
"partition-spec" field with a partition spec value (an array of
partition fields). This also adds "default-spec-id" that points to the
spec that should be used when writing.
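As a sketch of the resulting layout (the field values below are
illustrative, not taken from a real table), metadata now carries both
the legacy field and the new list:

    "partition-spec" : [ {
      "name" : "x_bucket",
      "transform" : "bucket[16]",
      "source-id" : 1
    } ],
    "default-spec-id" : 0,
    "partition-specs" : [ {
      "spec-id" : 0,
      "partition-spec" : [ {
        "name" : "x_bucket",
        "transform" : "bucket[16]",
        "source-id" : 1
      } ]
    } ]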
---
 .../com/netflix/iceberg/PartitionSpec.java    |   7 +
 .../com/netflix/iceberg/TableMetadata.java    | 107 +++++++++++----
 .../netflix/iceberg/TableMetadataParser.java  |  76 +++++++++--
 .../com/netflix/iceberg/TestMergeAppend.java  |  17 +--
 .../iceberg/TestTableMetadataJson.java        | 129 +++++++++++++++++-
 5 files changed, 282 insertions(+), 54 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
index da13f8c06a2d..0896392f0de7 100644
--- a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
+++ b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
@@ -146,6 +146,13 @@ public String partitionToPath(StructLike data) {
     return sb.toString();
   }
 
+  /**
+   * Returns true if this spec is equivalent to the other, with field names ignored. That is, if
+   * both specs have the same number of fields, field order, source columns, and transforms.
+   *
+   * @param other another PartitionSpec
+   * @return true if the specs have the same fields, source columns, and transforms.
+   */
   public boolean compatibleWith(PartitionSpec other) {
     if (equals(other)) {
       return true;
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 38fda2e3bf8e..20f71e275893 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.ValidationException;
 import com.netflix.iceberg.io.InputFile;
@@ -71,7 +72,7 @@ public static TableMetadata newTableMetadata(TableOperations ops,
 
     return new TableMetadata(ops, null, location, System.currentTimeMillis(),
-        lastColumnId.get(), freshSchema, freshSpec,
+        lastColumnId.get(), freshSchema, 0, ImmutableMap.of(0, freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
   }
 
@@ -126,7 +127,8 @@ public String toString() {
   private final long lastUpdatedMillis;
   private final int lastColumnId;
   private final Schema schema;
-  private final PartitionSpec spec;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
   private final Map<String, String> properties;
   private final long currentSnapshotId;
   private final List<Snapshot> snapshots;
@@ -139,7 +141,8 @@ public String toString() {
                 long lastUpdatedMillis,
                 int lastColumnId,
                 Schema schema,
-                PartitionSpec spec,
+                int defaultSpecId,
+                Map<Integer, PartitionSpec> specs,
                 Map<String, String> properties,
                 long currentSnapshotId,
                 List<Snapshot> snapshots,
@@ -150,7 +153,8 @@ public String toString() {
     this.lastUpdatedMillis = lastUpdatedMillis;
     this.lastColumnId = lastColumnId;
     this.schema = schema;
-    this.spec = spec;
+    this.specs = specs;
+    this.defaultSpecId = defaultSpecId;
     this.properties = properties;
     this.currentSnapshotId = currentSnapshotId;
     this.snapshots = snapshots;
@@ -194,7 +198,19 @@ public Schema schema() {
   }
 
   public PartitionSpec spec() {
-    return spec;
+    return specs.get(defaultSpecId);
+  }
+
+  public int defaultSpecId() {
+    return defaultSpecId;
+  }
+
+  public PartitionSpec spec(int id) {
+    return specs.get(id);
+  }
+
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
   }
 
   public String location() {
@@ -239,15 +255,42 @@ public List<SnapshotLogEntry> snapshotLog() {
 
   public TableMetadata updateTableLocation(String newLocation) {
     return new TableMetadata(ops, null, newLocation,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots, snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata updateSchema(Schema schema, int lastColumnId) {
+    PartitionSpec.checkCompatibility(spec(), schema);
+    return new TableMetadata(ops, null, location,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
+  }
+
+  public TableMetadata updatePartitionSpec(PartitionSpec spec) {
     PartitionSpec.checkCompatibility(spec, schema);
+
+    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+    int newDefaultSpecId = 0;
+    for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
+      if (spec.equals(entry.getValue())) {
+        newDefaultSpecId = entry.getKey();
+        break;
+      } else if (newDefaultSpecId <= entry.getKey()) {
+        newDefaultSpecId = entry.getKey() + 1;
+      }
+    }
+
+    Preconditions.checkArgument(defaultSpecId != newDefaultSpecId,
+        "Cannot set default partition spec to the current default");
+
+    Map<Integer, PartitionSpec> newSpecs = Maps.newHashMap();
+    newSpecs.putAll(specs);
+    newSpecs.put(newDefaultSpecId, spec);
+
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots,snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
+        ImmutableMap.copyOf(newSpecs), properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
@@ -260,8 +303,8 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
     return new TableMetadata(ops, null, location,
-        snapshot.timestampMillis(), lastColumnId, schema, spec, properties, snapshot.snapshotId(),
-        newSnapshots, newSnapshotLog);
+        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), newSnapshots, newSnapshotLog);
   }
 
   public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
@@ -291,8 +334,8 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
     }
 
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        filtered, ImmutableList.copyOf(newSnapshotLog));
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog));
   }
 
   public TableMetadata rollbackTo(Snapshot snapshot) {
@@ -306,15 +349,15 @@ public TableMetadata rollbackTo(Snapshot snapshot) {
         .build();
 
     return new TableMetadata(ops, null, location,
-        nowMillis, lastColumnId, schema, spec, properties, snapshot.snapshotId(), snapshots,
-        newSnapshotLog);
+        nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), snapshots, newSnapshotLog);
   }
 
   public TableMetadata replaceProperties(Map<String, String> newProperties) {
     ValidationException.check(newProperties != null, "Cannot set properties to null");
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, newProperties, currentSnapshotId,
-        snapshots, snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
@@ -330,8 +373,8 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
         Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
         "Cannot set invalid snapshot log: latest entry is not the current snapshot");
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots, newSnapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, newSnapshotLog);
   }
 
   public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec,
@@ -351,12 +394,28 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec
     }
     PartitionSpec freshSpec = specBuilder.build();
 
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    builder.putAll(this.properties);
-    builder.putAll(properties);
+    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+    int specId = 0;
+    for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
+      if (freshSpec.equals(entry.getValue())) {
+        specId = entry.getKey();
+        break;
+      } else if (specId <= entry.getKey()) {
+        specId = entry.getKey() + 1;
+      }
+    }
+
+    Map<Integer, PartitionSpec> newSpecs = Maps.newHashMap();
+    newSpecs.putAll(specs);
+    newSpecs.put(specId, freshSpec);
+
+    Map<String, String> newProperties = Maps.newHashMap();
+    newProperties.putAll(this.properties);
+    newProperties.putAll(properties);
 
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId.get(), freshSchema, freshSpec, properties, -1,
-        snapshots, ImmutableList.of());
+        System.currentTimeMillis(), lastColumnId.get(), freshSchema,
+        specId, ImmutableMap.copyOf(newSpecs), ImmutableMap.copyOf(newProperties),
+        -1, snapshots, ImmutableList.of());
   }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
index f0508b36e9a2..9912317169de 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.TableMetadata.SnapshotLogEntry;
@@ -46,18 +47,22 @@
 
 public class TableMetadataParser {
 
-  private static final String FORMAT_VERSION = "format-version";
-  private static final String LOCATION = "location";
-  private static final String LAST_UPDATED_MILLIS = "last-updated-ms";
-  private static final String LAST_COLUMN_ID = "last-column-id";
-  private static final String SCHEMA = "schema";
-  private static final String PARTITION_SPEC = "partition-spec";
-  private static final String PROPERTIES = "properties";
-  private static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
-  private static final String SNAPSHOTS = "snapshots";
-  private static final String SNAPSHOT_ID = "snapshot-id";
-  private static final String TIMESTAMP_MS = "timestamp-ms";
-  private static final String SNAPSHOT_LOG = "snapshot-log";
+  // visible for testing
+  static final String FORMAT_VERSION = "format-version";
+  static final String LOCATION = "location";
+  static final String LAST_UPDATED_MILLIS = "last-updated-ms";
+  static final String LAST_COLUMN_ID = "last-column-id";
+  static final String SCHEMA = "schema";
+  static final String PARTITION_SPEC = "partition-spec";
+  static final String PARTITION_SPECS = "partition-specs";
+  static final String SPEC_ID = "spec-id";
+  static final String DEFAULT_SPEC_ID = "default-spec-id";
+  static final String PROPERTIES = "properties";
+  static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+  static final String SNAPSHOTS = "snapshots";
+  static final String SNAPSHOT_ID = "snapshot-id";
+  static final String TIMESTAMP_MS = "timestamp-ms";
+  static final String SNAPSHOT_LOG = "snapshot-log";
 
   public static String toJson(TableMetadata metadata) {
     StringWriter writer = new StringWriter();
@@ -100,9 +105,22 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
 
     generator.writeFieldName(SCHEMA);
     SchemaParser.toJson(metadata.schema(), generator);
 
+    // for older readers, continue writing the default spec as "partition-spec"
     generator.writeFieldName(PARTITION_SPEC);
     PartitionSpecParser.toJson(metadata.spec(), generator);
 
+    // write the default spec ID and spec list
+    generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
+    generator.writeArrayFieldStart(PARTITION_SPECS);
+    for (Map.Entry<Integer, PartitionSpec> entry : metadata.specs().entrySet()) {
+      generator.writeStartObject();
+      generator.writeNumberField(SPEC_ID, entry.getKey());
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJson(entry.getValue(), generator);
+      generator.writeEndObject();
+    }
+    generator.writeEndArray();
+
     generator.writeObjectFieldStart(PROPERTIES);
     for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
       generator.writeStringField(keyValue.getKey(), keyValue.getValue());
@@ -150,7 +168,35 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
     String location = JsonUtil.getString(LOCATION, node);
     int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
     Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
-    PartitionSpec spec = PartitionSpecParser.fromJson(schema, node.get(PARTITION_SPEC));
+
+    JsonNode specArray = node.get(PARTITION_SPECS);
+    Map<Integer, PartitionSpec> specs;
+    int defaultSpecId = 0;
+    if (specArray != null) {
+      Preconditions.checkArgument(specArray.isArray(),
+          "Cannot parse partition specs from non-array: %s", specArray);
+      // default spec ID is required when the spec array is present
+      defaultSpecId = JsonUtil.getInt(DEFAULT_SPEC_ID, node);
+
+      // parse the spec array
+      ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
+      Iterator<JsonNode> iterator = specArray.iterator();
+      while (iterator.hasNext()) {
+        JsonNode specObject = iterator.next();
+        int specId = JsonUtil.getInt(SPEC_ID, specObject);
+        PartitionSpec spec = PartitionSpecParser.fromJson(schema, specObject.get(PARTITION_SPEC));
+        builder.put(specId, spec);
+      }
+      specs = builder.build();
+
+    } else {
+      // partition spec is required for older readers, but is always set to the default if the
+      // spec array is set. it is only used to build the spec map when the array is missing,
+      // indicating that the table metadata was written by an older writer.
+      PartitionSpec spec = PartitionSpecParser.fromJson(schema, node.get(PARTITION_SPEC));
+      specs = ImmutableMap.of(defaultSpecId, spec);
+    }
+
     Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
     long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
     long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);
@@ -177,8 +223,8 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
     }
 
     return new TableMetadata(ops, file, location,
-        lastUpdatedMillis, lastAssignedColumnId, schema, spec, properties, currentVersionId,
-        snapshots, ImmutableList.copyOf(entries.iterator()));
+        lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
+        currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()));
   }
 
 }
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index dd7a3cf23521..a1e28bc377aa 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.ManifestEntry.Status;
@@ -236,16 +235,14 @@ public void testChangedPartitionSpec() {
         1, base.currentSnapshot().manifests().size());
     String initialManifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
         .bucket("id", 4)
         .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
@@ -284,16 +281,14 @@ public void testChangedPartitionSpecMergeExisting() {
         2, base.currentSnapshot().manifests().size());
     String manifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
         .bucket("id", 4)
         .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 53df5db614e3..a35805a7ce65 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -19,19 +19,34 @@
 
 package com.netflix.iceberg;
 
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.TableMetadata.SnapshotLogEntry;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.types.Types;
 import com.netflix.iceberg.util.JsonUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
+import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
+import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
+import static com.netflix.iceberg.TableMetadataParser.LAST_UPDATED_MILLIS;
+import static com.netflix.iceberg.TableMetadataParser.LOCATION;
+import static com.netflix.iceberg.TableMetadataParser.PARTITION_SPEC;
+import static com.netflix.iceberg.TableMetadataParser.PROPERTIES;
+import static com.netflix.iceberg.TableMetadataParser.SCHEMA;
+import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
+
 public class TestTableMetadataJson {
   @Test
   public void testJsonConversion() throws Exception {
@@ -56,8 +71,9 @@ public void testJsonConversion() throws Exception {
         .build();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
+        System.currentTimeMillis(), 3, schema, 5, ImmutableMap.of(5, spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
 
     String asJson = TableMetadataParser.toJson(expected);
     TableMetadata metadata = TableMetadataParser.fromJson(null, null,
@@ -71,6 +87,10 @@ public void testJsonConversion() throws Exception {
         expected.schema().asStruct(), metadata.schema().asStruct());
     Assert.assertEquals("Partition spec should match",
         expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should match",
+        expected.defaultSpecId(), metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec map should match",
+        expected.specs(), metadata.specs());
     Assert.assertEquals("Properties should match",
         expected.properties(), metadata.properties());
     Assert.assertEquals("Snapshot logs should match",
@@ -108,8 +128,9 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
     List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
+        System.currentTimeMillis(), 3, schema, 5, ImmutableMap.of(5, spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
 
     // add the entries after creating TableMetadata to avoid the sorted check
     reversedSnapshotLog.add(
@@ -129,4 +150,104 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
     Assert.assertEquals("Snapshot logs should match",
         expectedSnapshotLog, metadata.snapshotLog());
   }
+
+  @Test
+  public void testBackwardCompatMissingPartitionSpecList() throws Exception {
+    Schema schema = new Schema(
+        Types.NestedField.required(1, "x", Types.LongType.get()),
+        Types.NestedField.required(2, "y", Types.LongType.get()),
+        Types.NestedField.required(3, "z", Types.LongType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").build();
+
+    long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
+    Snapshot previousSnapshot = new BaseSnapshot(
+        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manifest.1.avro"));
+    long currentSnapshotId = System.currentTimeMillis();
+    Snapshot currentSnapshot = new BaseSnapshot(
+        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manifest.2.avro"));
+
+    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+        System.currentTimeMillis(), 3, schema, 5, ImmutableMap.of(5, spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
+
+    String asJson = toJsonWithoutSpecList(expected);
+    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+        JsonUtil.mapper().readValue(asJson, JsonNode.class));
+
+    Assert.assertEquals("Table location should match",
+        expected.location(), metadata.location());
+    Assert.assertEquals("Last column ID should match",
+        expected.lastColumnId(), metadata.lastColumnId());
+    Assert.assertEquals("Schema should match",
+        expected.schema().asStruct(), metadata.schema().asStruct());
+    Assert.assertEquals("Partition spec should match",
+        expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should default to 0",
+        0, metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec map should contain the spec as the default",
+        ImmutableMap.of(0, spec), metadata.specs());
+    Assert.assertEquals("Properties should match",
+        expected.properties(), metadata.properties());
+    Assert.assertEquals("Snapshot logs should match",
+        expected.snapshotLog(), metadata.snapshotLog());
+    Assert.assertEquals("Current snapshot ID should match",
+        currentSnapshotId, metadata.currentSnapshot().snapshotId());
+    Assert.assertEquals("Parent snapshot ID should match",
+        (Long) previousSnapshotId, metadata.currentSnapshot().parentId());
+    Assert.assertEquals("Current snapshot files should match",
+        currentSnapshot.manifests(), metadata.currentSnapshot().manifests());
+    Assert.assertEquals("Previous snapshot ID should match",
+        previousSnapshotId, metadata.snapshot(previousSnapshotId).snapshotId());
+    Assert.assertEquals("Previous snapshot files should match",
+        previousSnapshot.manifests(),
+        metadata.snapshot(previousSnapshotId).manifests());
+  }
+
+  public static String toJsonWithoutSpecList(TableMetadata metadata) {
+    StringWriter writer = new StringWriter();
+    try {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+      generator.writeStartObject(); // start table metadata object
+
+      generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+      generator.writeStringField(LOCATION, metadata.location());
+      generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+      generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
+
+      generator.writeFieldName(SCHEMA);
+      SchemaParser.toJson(metadata.schema(), generator);
+
+      // mimic an old writer by writing only partition-spec and not the default ID or spec list
+      generator.writeFieldName(PARTITION_SPEC);
+      PartitionSpecParser.toJson(metadata.spec(), generator);
+
+      generator.writeObjectFieldStart(PROPERTIES);
+      for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
+        generator.writeStringField(keyValue.getKey(), keyValue.getValue());
+      }
+      generator.writeEndObject();
+
+      generator.writeNumberField(CURRENT_SNAPSHOT_ID,
+          metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1);
+
+      generator.writeArrayFieldStart(SNAPSHOTS);
+      for (Snapshot snapshot : metadata.snapshots()) {
+        SnapshotParser.toJson(snapshot, generator);
+      }
+      generator.writeEndArray();
+
+      // skip the snapshot log
+
+      generator.writeEndObject(); // end table metadata object
+
+      generator.flush();
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
+    }
+    return writer.toString();
+  }
 }

From cc501323d6368e37317532f08970d96643811bea Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Sun, 25 Nov 2018 16:44:35 -0800
Subject: [PATCH 2/2] Add ID to PartitionSpec.

Spec ID should be part of PartitionSpec so that it doesn't need to be
passed separately. All specs should have an ID, defaulting to 0, the
initial spec ID for all tables.
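As an illustrative sketch (the field values below are made up for this
message), a spec now serializes as an object that carries its own ID
instead of a bare field array:

    {
      "spec-id" : 0,
      "fields" : [ {
        "name" : "x_bucket",
        "transform" : "bucket[16]",
        "source-id" : 1
      } ]
    }

Manifest files keep the bare field array under the "partition-spec"
Avro metadata key and record the ID separately under a new
"partition-spec-id" key.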
---
 .../com/netflix/iceberg/PartitionSpec.java    |  24 +++-
 .../com/netflix/iceberg/ManifestReader.java   |   7 +-
 .../com/netflix/iceberg/ManifestWriter.java   |   3 +-
 .../netflix/iceberg/PartitionSpecParser.java  |  94 ++++++++++----
 .../com/netflix/iceberg/TableMetadata.java    | 121 +++++++++++-------
 .../netflix/iceberg/TableMetadataParser.java  |  32 ++---
 .../iceberg/TestTableMetadataJson.java        |  28 ++--
 7 files changed, 201 insertions(+), 108 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
index 0896392f0de7..b17e25b6edc4 100644
--- a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
+++ b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
@@ -50,14 +50,16 @@ public class PartitionSpec implements Serializable {
 
   private final Schema schema;
 
   // this is ordered so that DataFile has a consistent schema
+  private final int specId;
   private final PartitionField[] fields;
   private transient Map<Integer, PartitionField> fieldsBySourceId = null;
   private transient Map<String, PartitionField> fieldsByName = null;
   private transient Class<?>[] javaClasses = null;
   private transient List<PartitionField> fieldList = null;
 
-  private PartitionSpec(Schema schema, List<PartitionField> fields) {
+  private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
     this.schema = schema;
+    this.specId = specId;
     this.fields = new PartitionField[fields.size()];
     for (int i = 0; i < this.fields.length; i += 1) {
       this.fields[i] = fields.get(i);
@@ -71,6 +73,13 @@ public Schema schema() {
     return schema;
   }
 
+  /**
+   * @return the ID of this spec
+   */
+  public int specId() {
+    return specId;
+  }
+
   /**
    * @return the list of {@link PartitionField partition fields} for this spec.
    */
@@ -184,6 +193,9 @@ public boolean equals(Object other) {
     }
 
     PartitionSpec that = (PartitionSpec) other;
+    if (this.specId != that.specId) {
+      return false;
+    }
     return Arrays.equals(fields, that.fields);
   }
 
@@ -257,7 +269,7 @@ public String toString() {
   }
 
   private static final PartitionSpec UNPARTITIONED_SPEC =
-      new PartitionSpec(new Schema(), ImmutableList.of());
+      new PartitionSpec(new Schema(), 0, ImmutableList.of());
 
   /**
    * Returns a spec for unpartitioned tables.
@@ -287,6 +299,7 @@ public static class Builder {
     private final Schema schema;
     private final List<PartitionField> fields = Lists.newArrayList();
     private final Set<String> partitionNames = Sets.newHashSet();
+    private int specId = 0;
 
     private Builder(Schema schema) {
       this.schema = schema;
@@ -300,6 +313,11 @@ private void checkAndAddPartitionName(String name) {
       partitionNames.add(name);
     }
 
+    public Builder withSpecId(int specId) {
+      this.specId = specId;
+      return this;
+    }
+
     private Types.NestedField findSourceColumn(String sourceName) {
       Types.NestedField sourceColumn = schema.findField(sourceName);
       Preconditions.checkNotNull(sourceColumn, "Cannot find source column: %s", sourceName);
@@ -378,7 +396,7 @@ public Builder add(int sourceId, String name, String transform) {
     }
 
     public PartitionSpec build() {
-      PartitionSpec spec = new PartitionSpec(schema, fields);
+      PartitionSpec spec = new PartitionSpec(schema, specId, fields);
       checkCompatibility(spec, schema);
       return spec;
     }
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestReader.java b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
index fa6ffbfac58e..065210eb7b13 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestReader.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
@@ -99,7 +99,12 @@ private ManifestReader(InputFile file) {
       throw new RuntimeIOException(e);
     }
     this.schema = SchemaParser.fromJson(metadata.get("schema"));
-    this.spec = PartitionSpecParser.fromJson(schema, metadata.get("partition-spec"));
+    int specId = TableMetadata.INITIAL_SPEC_ID;
+    String specProperty = metadata.get("partition-spec-id");
+    if (specProperty != null) {
+      specId = Integer.parseInt(specProperty);
+    }
+    this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
     this.entries = null;
   }
 
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index e787b68a1619..28ba83158af6 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -108,7 +108,8 @@ private static FileAppender<ManifestEntry> newAppender(FileFormat format, PartitionSpec
             .schema(manifestSchema)
             .named("manifest_entry")
             .meta("schema", SchemaParser.toJson(spec.schema()))
-            .meta("partition-spec", PartitionSpecParser.toJson(spec))
+            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+            .meta("partition-spec-id", String.valueOf(spec.specId()))
             .build();
       default:
         throw new IllegalArgumentException("Unsupported format: " + format);
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
index fe20f65bac46..4df7c5516a80 100644
--- a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
@@ -37,20 +37,18 @@ public class PartitionSpecParser {
   private PartitionSpecParser() {
   }
 
+  private static final String SPEC_ID = "spec-id";
+  private static final String FIELDS = "fields";
   private static final String SOURCE_ID = "source-id";
   private static final String TRANSFORM = "transform";
   private static final String NAME = "name";
 
   public static void toJson(PartitionSpec spec, JsonGenerator generator) throws IOException {
-    generator.writeStartArray();
-    for (PartitionField field : spec.fields()) {
-      generator.writeStartObject();
-      generator.writeStringField(NAME, field.name());
-      generator.writeStringField(TRANSFORM, field.transform().toString());
-      generator.writeNumberField(SOURCE_ID, field.sourceId());
-      generator.writeEndObject();
-    }
-    generator.writeEndArray();
+    generator.writeStartObject();
+    generator.writeNumberField(SPEC_ID, spec.specId());
+    generator.writeFieldName(FIELDS);
+    toJsonFields(spec, generator);
+    generator.writeEndObject();
   }
 
   public static String toJson(PartitionSpec spec) {
@@ -74,23 +72,10 @@ public static String toJson(PartitionSpec spec, boolean pretty) {
   }
 
   public static PartitionSpec fromJson(Schema schema, JsonNode json) {
-    Preconditions.checkArgument(json.isArray(),
-        "Cannot parse partition spec, not an array: %s", json);
-
-    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-    Iterator<JsonNode> elements = json.elements();
-    while (elements.hasNext()) {
-      JsonNode element = elements.next();
-      Preconditions.checkArgument(element.isObject(),
-          "Cannot parse partition field, not an object: %s", element);
-
-      String name = JsonUtil.getString(NAME, element);
-      String transform = JsonUtil.getString(TRANSFORM, element);
-      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
-
-      builder.add(sourceId, name, transform);
-    }
-
+    Preconditions.checkArgument(json.isObject(), "Cannot parse spec from non-object: %s", json);
+    int specId = JsonUtil.getInt(SPEC_ID, json);
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json.get(FIELDS));
     return builder.build();
   }
 
@@ -113,4 +98,61 @@ public static PartitionSpec fromJson(Schema schema, String json) {
       }
     }
   }
+
+  static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
+    generator.writeStartArray();
+    for (PartitionField field : spec.fields()) {
+      generator.writeStartObject();
+      generator.writeStringField(NAME, field.name());
+      generator.writeStringField(TRANSFORM, field.transform().toString());
+      generator.writeNumberField(SOURCE_ID, field.sourceId());
+      generator.writeEndObject();
+    }
+    generator.writeEndArray();
+  }
+
+  static String toJsonFields(PartitionSpec spec) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      toJsonFields(spec, generator);
+      generator.flush();
+      return writer.toString();
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, JsonNode json) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json);
+    return builder.build();
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, String json) {
+    try {
+      return fromJsonFields(schema, specId, JsonUtil.mapper().readValue(json, JsonNode.class));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to parse partition spec fields: " + json);
+    }
+  }
+
+  private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
+    Preconditions.checkArgument(json.isArray(),
+        "Cannot parse partition spec fields, not an array: %s", json);
+
+    Iterator<JsonNode> elements = json.elements();
+    while (elements.hasNext()) {
+      JsonNode element = elements.next();
+      Preconditions.checkArgument(element.isObject(),
+          "Cannot parse partition field, not an object: %s", element);
+
+      String name = JsonUtil.getString(NAME, element);
+      String transform = JsonUtil.getString(TRANSFORM, element);
+      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
+
+      builder.add(sourceId, name, transform);
+    }
+  }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 20f71e275893..05c3392c1f2c 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -41,6 +41,7 @@
  */
 public class TableMetadata {
   static final int TABLE_FORMAT_VERSION = 1;
+  static final int INITIAL_SPEC_ID = 0;
 
   public static TableMetadata newTableMetadata(TableOperations ops,
                                                Schema schema,
@@ -59,7 +60,8 @@ public static TableMetadata newTableMetadata(TableOperations ops,
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
 
     // rebuild the partition spec using the new column ids
-    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema)
+        .withSpecId(INITIAL_SPEC_ID);
     for (PartitionField field : spec.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
       String sourceName = schema.findColumnName(field.sourceId());
@@ -72,7 +74,7 @@ public static TableMetadata newTableMetadata(TableOperations ops,
 
     return new TableMetadata(ops, null, location, System.currentTimeMillis(),
-        lastColumnId.get(), freshSchema, 0, ImmutableMap.of(0, freshSpec),
+        lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
   }
 
@@ -128,11 +130,12 @@ public String toString() {
   private final int lastColumnId;
   private final Schema schema;
   private final int defaultSpecId;
-  private final Map<Integer, PartitionSpec> specs;
+  private final List<PartitionSpec> specs;
   private final Map<String, String> properties;
   private final long currentSnapshotId;
   private final List<Snapshot> snapshots;
   private final Map<Long, Snapshot> snapshotsById;
+  private final Map<Integer, PartitionSpec> specsById;
   private final List<SnapshotLogEntry> snapshotLog;
 
   TableMetadata(TableOperations ops,
@@ -142,7 +145,7 @@ public String toString() {
                 int lastColumnId,
                 Schema schema,
                 int defaultSpecId,
-                Map<Integer, PartitionSpec> specs,
+                List<PartitionSpec> specs,
                 Map<String, String> properties,
                 long currentSnapshotId,
                 List<Snapshot> snapshots,
@@ -160,11 +163,8 @@ public String toString() {
     this.snapshots = snapshots;
     this.snapshotLog = snapshotLog;
 
-    ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
-    for (Snapshot version : snapshots) {
-      builder.put(version.snapshotId(), version);
-    }
-    this.snapshotsById = builder.build();
+    this.snapshotsById = indexSnapshots(snapshots);
+    this.specsById = indexSpecs(specs);
 
     SnapshotLogEntry last = null;
     for (SnapshotLogEntry logEntry : snapshotLog) {
@@ -198,7 +198,7 @@ public Schema schema() {
   }
 
   public PartitionSpec spec() {
-    return specs.get(defaultSpecId);
+    return specsById.get(defaultSpecId);
   }
 
   public int defaultSpecId() {
@@ -206,10 +206,10 @@ public int defaultSpecId() {
   }
 
   public PartitionSpec spec(int id) {
-    return specs.get(id);
+    return specsById.get(id);
   }
 
-  public Map<Integer, PartitionSpec> specs() {
+  public List<PartitionSpec> specs() {
     return specs;
   }
 
@@ -266,30 +266,33 @@ public TableMetadata updateSchema(Schema schema, int lastColumnId) {
         currentSnapshotId, snapshots, snapshotLog);
   }
 
-  public TableMetadata updatePartitionSpec(PartitionSpec spec) {
-    PartitionSpec.checkCompatibility(spec, schema);
+  public TableMetadata updatePartitionSpec(PartitionSpec partitionSpec) {
+    PartitionSpec.checkCompatibility(partitionSpec, schema);
 
     // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
-    int newDefaultSpecId = 0;
-    for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
-      if (spec.equals(entry.getValue())) {
-        newDefaultSpecId = entry.getKey();
+    int newDefaultSpecId = INITIAL_SPEC_ID;
+    for (PartitionSpec spec : specs) {
+      if (partitionSpec.compatibleWith(spec)) {
+        newDefaultSpecId = spec.specId();
         break;
-      } else if (newDefaultSpecId <= entry.getKey()) {
-        newDefaultSpecId = entry.getKey() + 1;
+      } else if (newDefaultSpecId <= spec.specId()) {
+        newDefaultSpecId = spec.specId() + 1;
       }
     }
 
     Preconditions.checkArgument(defaultSpecId != newDefaultSpecId,
         "Cannot set default partition spec to the current default");
 
-    Map<Integer, PartitionSpec> newSpecs = Maps.newHashMap();
-    newSpecs.putAll(specs);
-    newSpecs.put(newDefaultSpecId, spec);
+    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+        .addAll(specs);
+    if (!specsById.containsKey(newDefaultSpecId)) {
+      // get a fresh spec to ensure the spec ID is set to the new default
+      builder.add(freshSpec(newDefaultSpecId, schema, partitionSpec));
+    }
 
     return new TableMetadata(ops, null, location,
         System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
-        ImmutableMap.copyOf(newSpecs), properties,
+        builder.build(), properties,
         currentSnapshotId, snapshots, snapshotLog);
   }
 
@@ -382,32 +385,30 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec
     AtomicInteger lastColumnId = new AtomicInteger(0);
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
 
-    // rebuild the partition spec using the new column ids
-    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
-    for (PartitionField field : partitionSpec.fields()) {
-      // look up the name of the source field in the old schema to get the new schema's id
-      String sourceName = schema.findColumnName(field.sourceId());
-      specBuilder.add(
-          freshSchema.findField(sourceName).fieldId(),
-          field.name(),
-          field.transform().toString());
+    int nextSpecId = TableMetadata.INITIAL_SPEC_ID;
+    for (Integer specId : specsById.keySet()) {
+      if (nextSpecId <= specId) {
+        nextSpecId = specId + 1;
+      }
     }
-    PartitionSpec freshSpec = specBuilder.build();
+
+    // rebuild the partition spec using the new column ids
+    PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, partitionSpec);
 
     // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
-    int specId = 0;
-    for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
-      if (freshSpec.equals(entry.getValue())) {
-        specId = entry.getKey();
+    int specId = nextSpecId;
+    for (PartitionSpec spec : specs) {
+      if (freshSpec.compatibleWith(spec)) {
+        specId = spec.specId();
         break;
-      } else if (specId <= entry.getKey()) {
-        specId = entry.getKey() + 1;
       }
     }
 
-    Map<Integer, PartitionSpec> newSpecs = Maps.newHashMap();
-    newSpecs.putAll(specs);
-    newSpecs.put(specId, freshSpec);
+    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+        .addAll(specs);
+    if (!specsById.containsKey(specId)) {
+      builder.add(freshSpec);
+    }
 
     Map<String, String> newProperties = Maps.newHashMap();
     newProperties.putAll(this.properties);
@@ -415,7 +416,39 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec
 
     return new TableMetadata(ops, null, location,
         System.currentTimeMillis(), lastColumnId.get(), freshSchema,
-        specId, ImmutableMap.copyOf(newSpecs), ImmutableMap.copyOf(newProperties),
+        specId, builder.build(), ImmutableMap.copyOf(newProperties),
         -1, snapshots, ImmutableList.of());
   }
+
+  private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
+        .withSpecId(specId);
+
+    for (PartitionField field : partitionSpec.fields()) {
+      // look up the name of the source field in the old schema to get the new schema's id
+      String sourceName = partitionSpec.schema().findColumnName(field.sourceId());
+      specBuilder.add(
+          schema.findField(sourceName).fieldId(),
+          field.name(),
+          field.transform().toString());
+    }
+
+    return specBuilder.build();
+  }
+
+  private static Map<Long, Snapshot> indexSnapshots(List<Snapshot> snapshots) {
+    ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
+    for (Snapshot version : snapshots) {
+      builder.put(version.snapshotId(), version);
+    }
+    return builder.build();
+  }
+
+  private static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
+    ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
+    for (PartitionSpec spec : specs) {
+      builder.put(spec.specId(), spec);
+    }
+    return builder.build();
+  }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
index 9912317169de..0961d8c32456 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.TableMetadata.SnapshotLogEntry;
@@ -55,7 +54,6 @@ public class TableMetadataParser {
   static final String SCHEMA = "schema";
   static final String PARTITION_SPEC = "partition-spec";
   static final String PARTITION_SPECS = "partition-specs";
-  static final String SPEC_ID = "spec-id";
   static final String DEFAULT_SPEC_ID = "default-spec-id";
   static final String PROPERTIES = "properties";
   static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
@@ -107,17 +105,13 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
 
     // for older readers, continue writing the default spec as "partition-spec"
     generator.writeFieldName(PARTITION_SPEC);
-    PartitionSpecParser.toJson(metadata.spec(), generator);
+    PartitionSpecParser.toJsonFields(metadata.spec(), generator);
 
     // write the default spec ID and spec list
     generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
     generator.writeArrayFieldStart(PARTITION_SPECS);
-    for (Map.Entry<Integer, PartitionSpec> entry : metadata.specs().entrySet()) {
-      generator.writeStartObject();
-      generator.writeNumberField(SPEC_ID, entry.getKey());
-      generator.writeFieldName(PARTITION_SPEC);
-      PartitionSpecParser.toJson(entry.getValue(), generator);
-      generator.writeEndObject();
+    for (PartitionSpec spec : metadata.specs()) {
+      PartitionSpecParser.toJson(spec, generator);
     }
     generator.writeEndArray();
 
@@ -170,8 +164,8 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
     Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
 
     JsonNode specArray = node.get(PARTITION_SPECS);
-    Map<Integer, PartitionSpec> specs;
-    int defaultSpecId = 0;
+    List<PartitionSpec> specs;
+    int defaultSpecId;
     if (specArray != null) {
       Preconditions.checkArgument(specArray.isArray(),
           "Cannot parse partition specs from non-array: %s", specArray);
@@ -179,13 +173,9 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
       defaultSpecId = JsonUtil.getInt(DEFAULT_SPEC_ID, node);
 
       // parse the spec array
-      ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
-      Iterator<JsonNode> iterator = specArray.iterator();
-      while (iterator.hasNext()) {
-        JsonNode specObject = iterator.next();
-        int specId = JsonUtil.getInt(SPEC_ID, specObject);
-        PartitionSpec spec = PartitionSpecParser.fromJson(schema, specObject.get(PARTITION_SPEC));
-        builder.put(specId, spec);
+      ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
+      for (JsonNode spec : specArray) {
+        builder.add(PartitionSpecParser.fromJson(schema, spec));
       }
       specs = builder.build();
 
@@ -193,8 +183,9 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
       // partition spec is required for older readers, but is always set to the default if the
       // spec array is set. it is only used to build the spec map when the array is missing,
       // indicating that the table metadata was written by an older writer.
-      PartitionSpec spec = PartitionSpecParser.fromJson(schema, node.get(PARTITION_SPEC));
-      specs = ImmutableMap.of(defaultSpecId, spec);
+      defaultSpecId = TableMetadata.INITIAL_SPEC_ID;
+      specs = ImmutableList.of(PartitionSpecParser.fromJsonFields(
+          schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
     }
 
     Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
@@ -226,5 +217,4 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
         lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
         currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()));
   }
-
 }
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index a35805a7ce65..21acdbd07794 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -56,7 +56,7 @@ public void testJsonConversion() throws Exception {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -71,7 +71,7 @@ public void testJsonConversion() throws Exception {
         .build();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableMap.of(5, spec),
+        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
 
@@ -116,7 +116,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -128,7 +128,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
     List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableMap.of(5, spec),
+        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
 
@@ -159,7 +159,7 @@ public void testBackwardCompatMissingPartitionSpecList() throws Exception {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -169,7 +169,7 @@ public void testBackwardCompatMissingPartitionSpecList() throws Exception {
         null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manifest.2.avro"));
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, 5, ImmutableMap.of(5, spec),
+        System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
 
@@ -183,12 +183,16 @@ public void testBackwardCompatMissingPartitionSpecList() throws Exception {
         expected.lastColumnId(), metadata.lastColumnId());
     Assert.assertEquals("Schema should match",
         expected.schema().asStruct(), metadata.schema().asStruct());
-    Assert.assertEquals("Partition spec should match",
+    Assert.assertEquals("Partition spec should be the default",
         expected.spec().toString(), metadata.spec().toString());
-    Assert.assertEquals("Default spec ID should default to 0",
-        0, metadata.defaultSpecId());
-    Assert.assertEquals("PartitionSpec map should contain the spec as the default",
-        ImmutableMap.of(0, spec), metadata.specs());
+    Assert.assertEquals("Default spec ID should default to TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec should contain the spec",
+        1, metadata.specs().size());
+    Assert.assertTrue("PartitionSpec should contain the spec",
+        metadata.specs().get(0).compatibleWith(spec));
+    Assert.assertEquals("PartitionSpec should have ID TableMetadata.INITIAL_SPEC_ID",
+        TableMetadata.INITIAL_SPEC_ID, metadata.specs().get(0).specId());
     Assert.assertEquals("Properties should match",
         expected.properties(), metadata.properties());
     Assert.assertEquals("Snapshot logs should match",
@@ -223,7 +227,7 @@ public static String toJsonWithoutSpecList(TableMetadata metadata) {
 
       // mimic an old writer by writing only partition-spec and not the default ID or spec list
       generator.writeFieldName(PARTITION_SPEC);
-      PartitionSpecParser.toJson(metadata.spec(), generator);
+      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
 
       generator.writeObjectFieldStart(PROPERTIES);
       for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {