diff --git a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
index da13f8c06a2d..b17e25b6edc4 100644
--- a/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
+++ b/api/src/main/java/com/netflix/iceberg/PartitionSpec.java
@@ -50,14 +50,16 @@ public class PartitionSpec implements Serializable {
   private final Schema schema;
 
   // this is ordered so that DataFile has a consistent schema
+  private final int specId;
   private final PartitionField[] fields;
   private transient Map<Integer, PartitionField> fieldsBySourceId = null;
   private transient Map<String, PartitionField> fieldsByName = null;
   private transient Class<?>[] javaClasses = null;
   private transient List<PartitionField> fieldList = null;
 
-  private PartitionSpec(Schema schema, List<PartitionField> fields) {
+  private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
     this.schema = schema;
+    this.specId = specId;
     this.fields = new PartitionField[fields.size()];
     for (int i = 0; i < this.fields.length; i += 1) {
       this.fields[i] = fields.get(i);
@@ -71,6 +73,13 @@ public Schema schema() {
     return schema;
   }
 
+  /**
+   * @return the ID of this spec
+   */
+  public int specId() {
+    return specId;
+  }
+
   /**
    * @return the list of {@link PartitionField partition fields} for this spec.
    */
@@ -146,6 +155,13 @@ public String partitionToPath(StructLike data) {
     return sb.toString();
   }
 
+  /**
+   * Returns true if this spec is equivalent to the other, with field names ignored. That is, if
+   * both specs have the same number of fields, field order, source columns, and transforms.
+   *
+   * @param other another PartitionSpec
+   * @return true if the specs have the same fields, source columns, and transforms.
+   */
   public boolean compatibleWith(PartitionSpec other) {
     if (equals(other)) {
       return true;
@@ -177,6 +193,9 @@ public boolean equals(Object other) {
     }
 
     PartitionSpec that = (PartitionSpec) other;
+    if (this.specId != that.specId) {
+      return false;
+    }
     return Arrays.equals(fields, that.fields);
   }
 
@@ -250,7 +269,7 @@ public String toString() {
   }
 
   private static final PartitionSpec UNPARTITIONED_SPEC =
-      new PartitionSpec(new Schema(), ImmutableList.of());
+      new PartitionSpec(new Schema(), 0, ImmutableList.of());
 
   /**
    * Returns a spec for unpartitioned tables.
@@ -280,6 +299,7 @@ public static class Builder {
     private final Schema schema;
     private final List<PartitionField> fields = Lists.newArrayList();
     private final Set<String> partitionNames = Sets.newHashSet();
+    private int specId = 0;
 
     private Builder(Schema schema) {
       this.schema = schema;
@@ -293,6 +313,11 @@ private void checkAndAddPartitionName(String name) {
       partitionNames.add(name);
     }
 
+    public Builder withSpecId(int specId) {
+      this.specId = specId;
+      return this;
+    }
+
     private Types.NestedField findSourceColumn(String sourceName) {
       Types.NestedField sourceColumn = schema.findField(sourceName);
       Preconditions.checkNotNull(sourceColumn, "Cannot find source column: %s", sourceName);
@@ -371,7 +396,7 @@ public Builder add(int sourceId, String name, String transform) {
     }
 
     public PartitionSpec build() {
-      PartitionSpec spec = new PartitionSpec(schema, fields);
+      PartitionSpec spec = new PartitionSpec(schema, specId, fields);
      checkCompatibility(spec, schema);
      return spec;
    }
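
Note (an illustration, not part of the patch): with the hunks above, the spec ID participates in equals() but not in compatibleWith(), so the same fields under two different IDs compare as compatible but unequal. A minimal sketch using only APIs that appear in this diff:

    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()));

    // same source column and transform, different spec IDs
    PartitionSpec v0 = PartitionSpec.builderFor(schema).withSpecId(0).bucket("data", 16).build();
    PartitionSpec v1 = PartitionSpec.builderFor(schema).withSpecId(1).bucket("data", 16).build();

    Assert.assertTrue(v0.compatibleWith(v1));  // fields, source columns, transforms match
    Assert.assertFalse(v0.equals(v1));         // spec IDs differ, so equals() is now false
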
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestReader.java b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
index fa6ffbfac58e..065210eb7b13 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestReader.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestReader.java
@@ -99,7 +99,12 @@ private ManifestReader(InputFile file) {
       throw new RuntimeIOException(e);
     }
     this.schema = SchemaParser.fromJson(metadata.get("schema"));
-    this.spec = PartitionSpecParser.fromJson(schema, metadata.get("partition-spec"));
+    int specId = TableMetadata.INITIAL_SPEC_ID;
+    String specProperty = metadata.get("partition-spec-id");
+    if (specProperty != null) {
+      specId = Integer.parseInt(specProperty);
+    }
+    this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
     this.entries = null;
   }
 
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index e787b68a1619..28ba83158af6 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -108,7 +108,8 @@ private static FileAppender<ManifestEntry> newAppender(FileFormat format, PartitionSpec
             .schema(manifestSchema)
             .named("manifest_entry")
             .meta("schema", SchemaParser.toJson(spec.schema()))
-            .meta("partition-spec", PartitionSpecParser.toJson(spec))
+            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+            .meta("partition-spec-id", String.valueOf(spec.specId()))
             .build();
       default:
         throw new IllegalArgumentException("Unsupported format: " + format);
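
For orientation (an illustration, not part of the patch): with the two changes above, a newly written manifest's Avro key-value metadata carries three entries, and ManifestReader falls back to TableMetadata.INITIAL_SPEC_ID (0) when the third is absent, which is the case for manifests written before this change:

    "schema"            -> the table schema as JSON
    "partition-spec"    -> the partition fields only, as a JSON list (no spec ID)
    "partition-spec-id" -> the spec ID as a string, e.g. "1"
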
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
index fe20f65bac46..4df7c5516a80 100644
--- a/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
@@ -37,20 +37,18 @@ public class PartitionSpecParser {
   private PartitionSpecParser() {
   }
 
+  private static final String SPEC_ID = "spec-id";
+  private static final String FIELDS = "fields";
   private static final String SOURCE_ID = "source-id";
   private static final String TRANSFORM = "transform";
   private static final String NAME = "name";
 
   public static void toJson(PartitionSpec spec, JsonGenerator generator) throws IOException {
-    generator.writeStartArray();
-    for (PartitionField field : spec.fields()) {
-      generator.writeStartObject();
-      generator.writeStringField(NAME, field.name());
-      generator.writeStringField(TRANSFORM, field.transform().toString());
-      generator.writeNumberField(SOURCE_ID, field.sourceId());
-      generator.writeEndObject();
-    }
-    generator.writeEndArray();
+    generator.writeStartObject();
+    generator.writeNumberField(SPEC_ID, spec.specId());
+    generator.writeFieldName(FIELDS);
+    toJsonFields(spec, generator);
+    generator.writeEndObject();
   }
 
   public static String toJson(PartitionSpec spec) {
@@ -74,23 +72,10 @@ public static String toJson(PartitionSpec spec, boolean pretty) {
   }
 
   public static PartitionSpec fromJson(Schema schema, JsonNode json) {
-    Preconditions.checkArgument(json.isArray(),
-        "Cannot parse partition spec, not an array: %s", json);
-
-    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-    Iterator<JsonNode> elements = json.elements();
-    while (elements.hasNext()) {
-      JsonNode element = elements.next();
-      Preconditions.checkArgument(element.isObject(),
-          "Cannot parse partition field, not an object: %s", element);
-
-      String name = JsonUtil.getString(NAME, element);
-      String transform = JsonUtil.getString(TRANSFORM, element);
-      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
-
-      builder.add(sourceId, name, transform);
-    }
-
+    Preconditions.checkArgument(json.isObject(), "Cannot parse spec from non-object: %s", json);
+    int specId = JsonUtil.getInt(SPEC_ID, json);
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json.get(FIELDS));
     return builder.build();
   }
 
@@ -113,4 +98,61 @@ public static PartitionSpec fromJson(Schema schema, String json) {
       }
     }
   }
+
+  static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
+    generator.writeStartArray();
+    for (PartitionField field : spec.fields()) {
+      generator.writeStartObject();
+      generator.writeStringField(NAME, field.name());
+      generator.writeStringField(TRANSFORM, field.transform().toString());
+      generator.writeNumberField(SOURCE_ID, field.sourceId());
+      generator.writeEndObject();
+    }
+    generator.writeEndArray();
+  }
+
+  static String toJsonFields(PartitionSpec spec) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      toJsonFields(spec, generator);
+      generator.flush();
+      return writer.toString();
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, JsonNode json) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
+    buildFromJsonFields(builder, json);
+    return builder.build();
+  }
+
+  static PartitionSpec fromJsonFields(Schema schema, int specId, String json) {
+    try {
+      return fromJsonFields(schema, specId, JsonUtil.mapper().readValue(json, JsonNode.class));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to parse partition spec fields: " + json);
+    }
+  }
+
+  private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
+    Preconditions.checkArgument(json.isArray(),
+        "Cannot parse partition spec fields, not an array: %s", json);
+
+    Iterator<JsonNode> elements = json.elements();
+    while (elements.hasNext()) {
+      JsonNode element = elements.next();
+      Preconditions.checkArgument(element.isObject(),
+          "Cannot parse partition field, not an object: %s", element);
+
+      String name = JsonUtil.getString(NAME, element);
+      String transform = JsonUtil.getString(TRANSFORM, element);
+      int sourceId = JsonUtil.getInt(SOURCE_ID, element);
+
+      builder.add(sourceId, name, transform);
+    }
+  }
 }
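
The parser now produces two shapes: toJsonFields keeps the old bare list of fields, and toJson wraps that list in an object carrying the spec ID. A sketch of both, with illustrative field values (the "data_bucket" name follows the builder's default bucket naming):

    // toJsonFields(spec) - legacy form, still written as "partition-spec"
    [ { "name": "data_bucket", "transform": "bucket[16]", "source-id": 2 } ]

    // toJson(spec) - new form, used for entries in the "partition-specs" list
    { "spec-id": 1,
      "fields": [ { "name": "data_bucket", "transform": "bucket[16]", "source-id": 2 } ] }
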
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 38fda2e3bf8e..05c3392c1f2c 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.ValidationException;
 import com.netflix.iceberg.io.InputFile;
@@ -40,6 +41,7 @@
  */
 public class TableMetadata {
   static final int TABLE_FORMAT_VERSION = 1;
+  static final int INITIAL_SPEC_ID = 0;
 
   public static TableMetadata newTableMetadata(TableOperations ops,
                                                Schema schema,
@@ -58,7 +60,8 @@ public static TableMetadata newTableMetadata(TableOperations ops,
     Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
 
     // rebuild the partition spec using the new column ids
-    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema);
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema)
+        .withSpecId(INITIAL_SPEC_ID);
     for (PartitionField field : spec.fields()) {
       // look up the name of the source field in the old schema to get the new schema's id
       String sourceName = schema.findColumnName(field.sourceId());
@@ -71,7 +74,7 @@ public static TableMetadata newTableMetadata(TableOperations ops,
 
     return new TableMetadata(ops, null, location, System.currentTimeMillis(),
-        lastColumnId.get(), freshSchema, freshSpec,
+        lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
   }
 
@@ -126,11 +129,13 @@ public String toString() {
   private final long lastUpdatedMillis;
   private final int lastColumnId;
   private final Schema schema;
-  private final PartitionSpec spec;
+  private final int defaultSpecId;
+  private final List<PartitionSpec> specs;
   private final Map<String, String> properties;
   private final long currentSnapshotId;
   private final List<Snapshot> snapshots;
   private final Map<Long, Snapshot> snapshotsById;
+  private final Map<Integer, PartitionSpec> specsById;
   private final List<SnapshotLogEntry> snapshotLog;
 
   TableMetadata(TableOperations ops,
@@ -139,7 +144,8 @@ public String toString() {
                 long lastUpdatedMillis,
                 int lastColumnId,
                 Schema schema,
-                PartitionSpec spec,
+                int defaultSpecId,
+                List<PartitionSpec> specs,
                 Map<String, String> properties,
                 long currentSnapshotId,
                 List<Snapshot> snapshots,
@@ -150,17 +156,15 @@ public String toString() {
     this.lastUpdatedMillis = lastUpdatedMillis;
     this.lastColumnId = lastColumnId;
     this.schema = schema;
-    this.spec = spec;
+    this.specs = specs;
+    this.defaultSpecId = defaultSpecId;
     this.properties = properties;
     this.currentSnapshotId = currentSnapshotId;
     this.snapshots = snapshots;
     this.snapshotLog = snapshotLog;
 
-    ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
-    for (Snapshot version : snapshots) {
-      builder.put(version.snapshotId(), version);
-    }
-    this.snapshotsById = builder.build();
+    this.snapshotsById = indexSnapshots(snapshots);
+    this.specsById = indexSpecs(specs);
 
     SnapshotLogEntry last = null;
     for (SnapshotLogEntry logEntry : snapshotLog) {
@@ -194,7 +198,19 @@ public Schema schema() {
   }
 
   public PartitionSpec spec() {
-    return spec;
+    return specsById.get(defaultSpecId);
+  }
+
+  public int defaultSpecId() {
+    return defaultSpecId;
+  }
+
+  public PartitionSpec spec(int id) {
+    return specsById.get(id);
+  }
+
+  public List<PartitionSpec> specs() {
+    return specs;
   }
 
   public String location() {
@@ -239,15 +255,45 @@ public List<SnapshotLogEntry> snapshotLog() {
 
   public TableMetadata updateTableLocation(String newLocation) {
     return new TableMetadata(ops, null, newLocation,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots, snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata updateSchema(Schema schema, int lastColumnId) {
-    PartitionSpec.checkCompatibility(spec, schema);
+    PartitionSpec.checkCompatibility(spec(), schema);
+    return new TableMetadata(ops, null, location,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
+  }
+
+  public TableMetadata updatePartitionSpec(PartitionSpec partitionSpec) {
+    PartitionSpec.checkCompatibility(partitionSpec, schema);
+
+    // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID.
+    int newDefaultSpecId = INITIAL_SPEC_ID;
+    for (PartitionSpec spec : specs) {
+      if (partitionSpec.compatibleWith(spec)) {
+        newDefaultSpecId = spec.specId();
+        break;
+      } else if (newDefaultSpecId <= spec.specId()) {
+        newDefaultSpecId = spec.specId() + 1;
+      }
+    }
+
+    Preconditions.checkArgument(defaultSpecId != newDefaultSpecId,
+        "Cannot set default partition spec to the current default");
+
+    ImmutableList.Builder<PartitionSpec> builder = ImmutableList.<PartitionSpec>builder()
+        .addAll(specs);
+    if (!specsById.containsKey(newDefaultSpecId)) {
+      // get a fresh spec to ensure the spec ID is set to the new default
+      builder.add(freshSpec(newDefaultSpecId, schema, partitionSpec));
+    }
+
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        snapshots,snapshotLog);
+        System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
+        builder.build(), properties,
+        currentSnapshotId, snapshots, snapshotLog);
   }
 
   public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
@@ -260,8 +306,8 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
     return new TableMetadata(ops, null, location,
-        snapshot.timestampMillis(), lastColumnId, schema, spec, properties, snapshot.snapshotId(),
-        newSnapshots, newSnapshotLog);
+        snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), newSnapshots, newSnapshotLog);
   }
 
   public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
@@ -291,8 +337,8 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
     }
 
     return new TableMetadata(ops, null, location,
-        System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId,
-        filtered, ImmutableList.copyOf(newSnapshotLog));
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog));
   }
 
   public TableMetadata rollbackTo(Snapshot snapshot) {
@@ -306,15 +352,15 @@ public TableMetadata rollbackTo(Snapshot snapshot) {
         .build();
 
     return new TableMetadata(ops, null, location,
-        nowMillis, lastColumnId, schema, spec, properties, snapshot.snapshotId(), snapshots,
-        newSnapshotLog);
+        nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
+        snapshot.snapshotId(), snapshots, newSnapshotLog);
   }
 
   public TableMetadata replaceProperties(Map<String, String> newProperties) {
     ValidationException.check(newProperties != null, "Cannot set properties to null");
null"); return new TableMetadata(ops, null, location, - System.currentTimeMillis(), lastColumnId, schema, spec, newProperties, currentSnapshotId, - snapshots, snapshotLog); + System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties, + currentSnapshotId, snapshots, snapshotLog); } public TableMetadata removeSnapshotLogEntries(Set snapshotIds) { @@ -330,8 +376,8 @@ public TableMetadata removeSnapshotLogEntries(Set snapshotIds) { Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId, "Cannot set invalid snapshot log: latest entry is not the current snapshot"); return new TableMetadata(ops, null, location, - System.currentTimeMillis(), lastColumnId, schema, spec, properties, currentSnapshotId, - snapshots, newSnapshotLog); + System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties, + currentSnapshotId, snapshots, newSnapshotLog); } public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec, @@ -339,24 +385,70 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec AtomicInteger lastColumnId = new AtomicInteger(0); Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet); + int nextSpecId = TableMetadata.INITIAL_SPEC_ID; + for (Integer specId : specsById.keySet()) { + if (nextSpecId <= specId) { + nextSpecId = specId + 1; + } + } + // rebuild the partition spec using the new column ids - PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(freshSchema); + PartitionSpec freshSpec = freshSpec(nextSpecId, freshSchema, partitionSpec); + + // if the spec already exists, use the same ID. otherwise, use 1 more than the highest ID. + int specId = nextSpecId; + for (PartitionSpec spec : specs) { + if (freshSpec.compatibleWith(spec)) { + specId = spec.specId(); + break; + } + } + + ImmutableList.Builder builder = ImmutableList.builder() + .addAll(specs); + if (!specsById.containsKey(specId)) { + builder.add(freshSpec); + } + + Map newProperties = Maps.newHashMap(); + newProperties.putAll(this.properties); + newProperties.putAll(properties); + + return new TableMetadata(ops, null, location, + System.currentTimeMillis(), lastColumnId.get(), freshSchema, + specId, builder.build(), ImmutableMap.copyOf(newProperties), + -1, snapshots, ImmutableList.of()); + } + + private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) { + PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema) + .withSpecId(specId); + for (PartitionField field : partitionSpec.fields()) { // look up the name of the source field in the old schema to get the new schema's id - String sourceName = schema.findColumnName(field.sourceId()); + String sourceName = partitionSpec.schema().findColumnName(field.sourceId()); specBuilder.add( - freshSchema.findField(sourceName).fieldId(), + schema.findField(sourceName).fieldId(), field.name(), field.transform().toString()); } - PartitionSpec freshSpec = specBuilder.build(); - ImmutableMap.Builder builder = ImmutableMap.builder(); - builder.putAll(this.properties); - builder.putAll(properties); + return specBuilder.build(); + } - return new TableMetadata(ops, null, location, - System.currentTimeMillis(), lastColumnId.get(), freshSchema, freshSpec, properties, -1, - snapshots, ImmutableList.of()); + private static Map indexSnapshots(List snapshots) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Snapshot version : snapshots) { + builder.put(version.snapshotId(), version); + } + return 
+    return builder.build();
+  }
+
+  private static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
+    ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
+    for (PartitionSpec spec : specs) {
+      builder.put(spec.specId(), spec);
+    }
+    return builder.build();
+  }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
index f0508b36e9a2..0961d8c32456 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadataParser.java
@@ -46,18 +46,21 @@ public class TableMetadataParser {
 
-  private static final String FORMAT_VERSION = "format-version";
-  private static final String LOCATION = "location";
-  private static final String LAST_UPDATED_MILLIS = "last-updated-ms";
-  private static final String LAST_COLUMN_ID = "last-column-id";
-  private static final String SCHEMA = "schema";
-  private static final String PARTITION_SPEC = "partition-spec";
-  private static final String PROPERTIES = "properties";
-  private static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
-  private static final String SNAPSHOTS = "snapshots";
-  private static final String SNAPSHOT_ID = "snapshot-id";
-  private static final String TIMESTAMP_MS = "timestamp-ms";
-  private static final String SNAPSHOT_LOG = "snapshot-log";
+  // visible for testing
+  static final String FORMAT_VERSION = "format-version";
+  static final String LOCATION = "location";
+  static final String LAST_UPDATED_MILLIS = "last-updated-ms";
+  static final String LAST_COLUMN_ID = "last-column-id";
+  static final String SCHEMA = "schema";
+  static final String PARTITION_SPEC = "partition-spec";
+  static final String PARTITION_SPECS = "partition-specs";
+  static final String DEFAULT_SPEC_ID = "default-spec-id";
+  static final String PROPERTIES = "properties";
+  static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+  static final String SNAPSHOTS = "snapshots";
+  static final String SNAPSHOT_ID = "snapshot-id";
+  static final String TIMESTAMP_MS = "timestamp-ms";
+  static final String SNAPSHOT_LOG = "snapshot-log";
 
   public static String toJson(TableMetadata metadata) {
     StringWriter writer = new StringWriter();
@@ -100,8 +103,17 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
     generator.writeFieldName(SCHEMA);
     SchemaParser.toJson(metadata.schema(), generator);
 
+    // for older readers, continue writing the default spec as "partition-spec"
     generator.writeFieldName(PARTITION_SPEC);
-    PartitionSpecParser.toJson(metadata.spec(), generator);
+    PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+    // write the default spec ID and spec list
+    generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
+    generator.writeArrayFieldStart(PARTITION_SPECS);
+    for (PartitionSpec spec : metadata.specs()) {
+      PartitionSpecParser.toJson(spec, generator);
+    }
+    generator.writeEndArray();
 
     generator.writeObjectFieldStart(PROPERTIES);
     for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
@@ -150,7 +162,32 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
     String location = JsonUtil.getString(LOCATION, node);
     int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
     Schema schema = SchemaParser.fromJson(node.get(SCHEMA));
-    PartitionSpec spec = PartitionSpecParser.fromJson(schema, node.get(PARTITION_SPEC));
+
+    JsonNode specArray = node.get(PARTITION_SPECS);
+    List<PartitionSpec> specs;
+    int defaultSpecId;
+    if (specArray != null) {
+      Preconditions.checkArgument(specArray.isArray(),
+          "Cannot parse partition specs from non-array: %s", specArray);
+      // default spec ID is required when the spec array is present
+      defaultSpecId = JsonUtil.getInt(DEFAULT_SPEC_ID, node);
+
+      // parse the spec array
+      ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
+      for (JsonNode spec : specArray) {
+        builder.add(PartitionSpecParser.fromJson(schema, spec));
+      }
+      specs = builder.build();
+
+    } else {
+      // partition spec is required for older readers, but is always set to the default if the spec
+      // array is set. it is only used when the spec array is missing, which indicates that the
+      // table metadata was written by an older writer.
+      defaultSpecId = TableMetadata.INITIAL_SPEC_ID;
+      specs = ImmutableList.of(PartitionSpecParser.fromJsonFields(
+          schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
+    }
+
     Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
     long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
     long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);
@@ -177,8 +214,7 @@ static TableMetadata fromJson(TableOperations ops, InputFile file, JsonNode node
     }
 
     return new TableMetadata(ops, file, location,
-        lastUpdatedMillis, lastAssignedColumnId, schema, spec, properties, currentVersionId,
-        snapshots, ImmutableList.copyOf(entries.iterator()));
+        lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
+        currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()));
   }
-
 }
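
Putting the parser changes together, the table metadata JSON now carries both representations; a sketch with illustrative values and elisions:

    {
      "format-version": 1,
      ...
      "partition-spec": [ ... fields of the default spec, for older readers ... ],
      "default-spec-id": 1,
      "partition-specs": [
        { "spec-id": 0, "fields": [ ... ] },
        { "spec-id": 1, "fields": [ ... ] }
      ],
      ...
    }

On read, "partition-specs" wins when present; otherwise the reader reconstructs a single-spec list from "partition-spec" with spec ID 0.
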
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index dd7a3cf23521..a1e28bc377aa 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.ManifestEntry.Status;
@@ -236,16 +235,14 @@ public void testChangedPartitionSpec() {
         1, base.currentSnapshot().manifests().size());
     String initialManifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
         .bucket("id", 4)
         .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
@@ -284,16 +281,14 @@ public void testChangedPartitionSpecMergeExisting() {
         2, base.currentSnapshot().manifests().size());
     String manifest = base.currentSnapshot().manifests().get(0);
 
-    PartitionSpec newSpec = PartitionSpec.builderFor(SCHEMA)
+    // build the new spec using the table's schema, which uses fresh IDs
+    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
         .bucket("data", 16)
        .bucket("id", 4)
        .build();
 
     // commit the new partition spec to the table manually
-    TableMetadata updated = new TableMetadata(table.ops(), null, base.location(),
-        System.currentTimeMillis(), base.lastColumnId(), base.schema(), newSpec, base.properties(),
-        base.currentSnapshot().snapshotId(), base.snapshots(), ImmutableList.of());
-    table.ops().commit(base, updated);
+    table.ops().commit(base, base.updatePartitionSpec(newSpec));
 
     DataFile newFileC = DataFiles.builder(newSpec)
         .copy(FILE_C)
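
The tests above now go through the public path instead of hand-assembling a TableMetadata. Condensed, the new flow is (a sketch; `table` is the test's table handle, as in the diff):

    TableMetadata base = table.ops().current();

    // build the new spec against the table's schema so field IDs line up
    PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
        .bucket("data", 16)
        .bucket("id", 4)
        .build();

    // updatePartitionSpec assigns the next spec ID, or reuses a compatible spec's ID
    table.ops().commit(base, base.updatePartitionSpec(newSpec));
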
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 53df5db614e3..21acdbd07794 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -19,19 +19,34 @@
 
 package com.netflix.iceberg;
 
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.TableMetadata.SnapshotLogEntry;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.types.Types;
 import com.netflix.iceberg.util.JsonUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
+import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
+import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
+import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
+import static com.netflix.iceberg.TableMetadataParser.LAST_UPDATED_MILLIS;
+import static com.netflix.iceberg.TableMetadataParser.LOCATION;
+import static com.netflix.iceberg.TableMetadataParser.PARTITION_SPEC;
+import static com.netflix.iceberg.TableMetadataParser.PROPERTIES;
+import static com.netflix.iceberg.TableMetadataParser.SCHEMA;
+import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
+
 public class TestTableMetadataJson {
   @Test
   public void testJsonConversion() throws Exception {
@@ -41,7 +56,7 @@ public void testJsonConversion() throws Exception {
         Types.NestedField.required(3, "z", Types.LongType.get())
     );
 
-    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
@@ -56,8 +71,9 @@ public void testJsonConversion() throws Exception {
         .build();
 
     TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
-        System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"),
-        currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
+        System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+        ImmutableMap.of("property", "value"), currentSnapshotId,
+        Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
 
     String asJson = TableMetadataParser.toJson(expected);
     TableMetadata metadata = TableMetadataParser.fromJson(null, null,
@@ -71,6 +87,10 @@ public void testJsonConversion() throws Exception {
         expected.schema().asStruct(), metadata.schema().asStruct());
     Assert.assertEquals("Partition spec should match",
         expected.spec().toString(), metadata.spec().toString());
+    Assert.assertEquals("Default spec ID should match",
+        expected.defaultSpecId(), metadata.defaultSpecId());
+    Assert.assertEquals("PartitionSpec map should match",
+        expected.specs(), metadata.specs());
Assert.assertEquals("Properties should match", expected.properties(), metadata.properties()); Assert.assertEquals("Snapshot logs should match", @@ -96,7 +116,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception { Types.NestedField.required(3, "z", Types.LongType.get()) ); - PartitionSpec spec = PartitionSpec.builderFor(schema).build(); + PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build(); long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( @@ -108,8 +128,9 @@ public void testFromJsonSortsSnapshotLog() throws Exception { List reversedSnapshotLog = Lists.newArrayList(); TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location", - System.currentTimeMillis(), 3, schema, spec, ImmutableMap.of("property", "value"), - currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog); + System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec), + ImmutableMap.of("property", "value"), currentSnapshotId, + Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog); // add the entries after creating TableMetadata to avoid the sorted check reversedSnapshotLog.add( @@ -129,4 +150,108 @@ public void testFromJsonSortsSnapshotLog() throws Exception { Assert.assertEquals("Snapshot logs should match", expectedSnapshotLog, metadata.snapshotLog()); } + + @Test + public void testBackwardCompatMissingPartitionSpecList() throws Exception { + Schema schema = new Schema( + Types.NestedField.required(1, "x", Types.LongType.get()), + Types.NestedField.required(2, "y", Types.LongType.get()), + Types.NestedField.required(3, "z", Types.LongType.get()) + ); + + PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build(); + + long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); + Snapshot previousSnapshot = new BaseSnapshot( + null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro")); + long currentSnapshotId = System.currentTimeMillis(); + Snapshot currentSnapshot = new BaseSnapshot( + null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro")); + + TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location", + System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec), + ImmutableMap.of("property", "value"), currentSnapshotId, + Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of()); + + String asJson = toJsonWithoutSpecList(expected); + TableMetadata metadata = TableMetadataParser.fromJson(null, null, + JsonUtil.mapper().readValue(asJson, JsonNode.class)); + + Assert.assertEquals("Table location should match", + expected.location(), metadata.location()); + Assert.assertEquals("Last column ID should match", + expected.lastColumnId(), metadata.lastColumnId()); + Assert.assertEquals("Schema should match", + expected.schema().asStruct(), metadata.schema().asStruct()); + Assert.assertEquals("Partition spec should be the default", + expected.spec().toString(), metadata.spec().toString()); + Assert.assertEquals("Default spec ID should default to TableMetadata.INITIAL_SPEC_ID", + TableMetadata.INITIAL_SPEC_ID, metadata.defaultSpecId()); + Assert.assertEquals("PartitionSpec should contain the spec", + 1, metadata.specs().size()); + Assert.assertTrue("PartitionSpec should contain the spec", + metadata.specs().get(0).compatibleWith(spec)); + 
Assert.assertEquals("PartitionSpec should have ID TableMetadata.INITIAL_SPEC_ID", + TableMetadata.INITIAL_SPEC_ID, metadata.specs().get(0).specId()); + Assert.assertEquals("Properties should match", + expected.properties(), metadata.properties()); + Assert.assertEquals("Snapshot logs should match", + expected.snapshotLog(), metadata.snapshotLog()); + Assert.assertEquals("Current snapshot ID should match", + currentSnapshotId, metadata.currentSnapshot().snapshotId()); + Assert.assertEquals("Parent snapshot ID should match", + (Long) previousSnapshotId, metadata.currentSnapshot().parentId()); + Assert.assertEquals("Current snapshot files should match", + currentSnapshot.manifests(), metadata.currentSnapshot().manifests()); + Assert.assertEquals("Previous snapshot ID should match", + previousSnapshotId, metadata.snapshot(previousSnapshotId).snapshotId()); + Assert.assertEquals("Previous snapshot files should match", + previousSnapshot.manifests(), + metadata.snapshot(previousSnapshotId).manifests()); + } + + public static String toJsonWithoutSpecList(TableMetadata metadata) { + StringWriter writer = new StringWriter(); + try { + JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + + generator.writeStartObject(); // start table metadata object + + generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION); + generator.writeStringField(LOCATION, metadata.location()); + generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis()); + generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId()); + + generator.writeFieldName(SCHEMA); + SchemaParser.toJson(metadata.schema(), generator); + + // mimic an old writer by writing only partition-spec and not the default ID or spec list + generator.writeFieldName(PARTITION_SPEC); + PartitionSpecParser.toJsonFields(metadata.spec(), generator); + + generator.writeObjectFieldStart(PROPERTIES); + for (Map.Entry keyValue : metadata.properties().entrySet()) { + generator.writeStringField(keyValue.getKey(), keyValue.getValue()); + } + generator.writeEndObject(); + + generator.writeNumberField(CURRENT_SNAPSHOT_ID, + metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1); + + generator.writeArrayFieldStart(SNAPSHOTS); + for (Snapshot snapshot : metadata.snapshots()) { + SnapshotParser.toJson(snapshot, generator); + } + generator.writeEndArray(); + + // skip the snapshot log + + generator.writeEndObject(); // end table metadata object + + generator.flush(); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to write json for: %s", metadata); + } + return writer.toString(); + } }