diff --git a/api/src/main/java/org/apache/iceberg/PrimaryKey.java b/api/src/main/java/org/apache/iceberg/PrimaryKey.java new file mode 100644 index 000000000000..1280e711e250 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/PrimaryKey.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * A primary key that defines the set of columns whose values must be unique in this table. + */ +public class PrimaryKey implements Serializable { + + private static final PrimaryKey NON_PRIMARY_KEY = new PrimaryKey(null, 0, false, ImmutableList.of()); + + private final Schema schema; + private final int keyId; + private final boolean enforceUniqueness; + private final Integer[] sourceIds; + + private transient volatile List<Integer> sourceIdList; + + private PrimaryKey(Schema schema, int keyId, boolean enforceUniqueness, List<Integer> sourceIds) { + this.schema = schema; + this.keyId = keyId; + this.enforceUniqueness = enforceUniqueness; + this.sourceIds = sourceIds.toArray(new Integer[0]); + } + + /** + * Returns the {@link Schema} for this primary key. + */ + public Schema schema() { + return schema; + } + + /** + * Returns the ID of this primary key. + */ + public int keyId() { + return keyId; + } + + /** + * Returns true if uniqueness should be enforced when writing to the Iceberg table. + */ + public boolean enforceUniqueness() { + return enforceUniqueness; + } + + /** + * Returns the list of source field ids for this primary key. + */ + public List<Integer> sourceIds() { + if (sourceIdList == null) { + synchronized (this) { + if (sourceIdList == null) { + this.sourceIdList = ImmutableList.copyOf(sourceIds); + } + } + } + return sourceIdList; + } + + /** + * Returns true if the primary key has no columns. + */ + public boolean isNonPrimaryKey() { + return sourceIds.length == 0; + } + + /** + * Returns a dummy primary key that has no columns. + */ + public static PrimaryKey nonPrimaryKey() { + return NON_PRIMARY_KEY; + } + + /** + * Checks whether this primary key is equivalent to another primary key while ignoring the primary key id. + * + * @param other another primary key. + * @return true if this key is equivalent to the given key. 
+ */ + public boolean samePrimaryKey(PrimaryKey other) { + return Arrays.equals(sourceIds, other.sourceIds) && enforceUniqueness == other.enforceUniqueness; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (!(other instanceof PrimaryKey)) { + return false; + } + + PrimaryKey that = (PrimaryKey) other; + if (this.keyId != that.keyId) { + return false; + } + + if (this.enforceUniqueness != that.enforceUniqueness) { + return false; + } + + return Arrays.equals(sourceIds, that.sourceIds); + } + + @Override + public int hashCode() { + return Objects.hashCode(keyId, enforceUniqueness ? 1 : 0, Arrays.hashCode(sourceIds)); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("keyId", keyId) + .add("enforceUniqueness", enforceUniqueness) + .add("sourceIds", sourceIds()) + .toString(); + } + + /** + * Creates a new {@link Builder primary key builder} for the given {@link Schema}. + * + * @param schema a schema + * @return a primary key builder for the given schema. + */ + public static Builder builderFor(Schema schema) { + return new Builder(schema); + } + + /** + * A builder to create valid {@link PrimaryKey primary keys}. Call {@link #builderFor(Schema)} to create a new + * builder. + */ + public static class Builder { + private final Schema schema; + private final List<Integer> sourceIds = Lists.newArrayList(); + // Default ID to 1 as 0 is reserved for non-primary keys. + private int keyId = 1; + private boolean enforceUniqueness = false; + + private Builder(Schema schema) { + this.schema = schema; + } + + public Builder withKeyId(int newKeyId) { + this.keyId = newKeyId; + return this; + } + + public Builder withEnforceUniqueness(boolean enable) { + this.enforceUniqueness = enable; + return this; + } + + public Builder addField(String name) { + Types.NestedField column = schema.findField(name); + + Preconditions.checkNotNull(column, "Cannot find source column: %s", name); + Preconditions.checkArgument(column.isRequired(), "Cannot add optional source field to primary key: %s", name); + + Type sourceType = column.type(); + ValidationException.check(sourceType.isPrimitiveType(), "Cannot add non-primitive field: %s", sourceType); + + sourceIds.add(column.fieldId()); + return this; + } + + public Builder addField(int sourceId) { + Types.NestedField column = schema.findField(sourceId); + Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId); + Preconditions.checkArgument(column.isRequired(), "Cannot add optional source field to primary key: %s", sourceId); + + Type sourceType = column.type(); + ValidationException.check(sourceType.isPrimitiveType(), "Cannot add non-primitive field: %s", sourceType); + + sourceIds.add(sourceId); + return this; + } + + public PrimaryKey build() { + if (keyId == 0 && !sourceIds.isEmpty()) { + throw new IllegalArgumentException("Primary key ID 0 is reserved for non-primary key"); + } + if (sourceIds.isEmpty() && keyId != 0) { + throw new IllegalArgumentException("Non-primary key ID must be 0"); + } + + return new PrimaryKey(schema, keyId, enforceUniqueness, sourceIds); + } + } +}
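As a quick orientation for reviewers, here is a minimal usage sketch of the builder API above. The schema, field ids, and column names are illustrative only, not part of this PR:

import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PrimaryKeyBuilderSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()));

    // a real key: non-zero ID plus at least one required, primitive source column
    PrimaryKey key = PrimaryKey.builderFor(schema)
        .withKeyId(1)
        .withEnforceUniqueness(true)
        .addField("id")
        .build();

    // ID 0 with no fields is the reserved sentinel; it equals nonPrimaryKey()
    PrimaryKey none = PrimaryKey.builderFor(schema).withKeyId(0).build();
    System.out.println(key + " / " + none.isNonPrimaryKey());
  }
}

Any other combination of ID 0 and key fields is rejected by build(), which keeps the sentinel unambiguous.
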
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index a4557c8304de..7dae74b6342c 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -88,6 +88,20 @@ default String name() { */ Map<Integer, SortOrder> sortOrders(); + /** + * Return the {@link PrimaryKey primary key} for this table. + * + * @return this table's primary key. + */ + PrimaryKey primaryKey(); + + /** + * Return a map of primary key IDs to {@link PrimaryKey primary keys} for this table. + * + * @return this table's primary keys map. + */ + Map<Integer, PrimaryKey> primaryKeys(); + /** * Return a map of string properties for this table. * diff --git a/api/src/main/java/org/apache/iceberg/Tables.java b/api/src/main/java/org/apache/iceberg/Tables.java index e88ece0e9369..83ea76a2ccb3 100644 --- a/api/src/main/java/org/apache/iceberg/Tables.java +++ b/api/src/main/java/org/apache/iceberg/Tables.java @@ -38,12 +38,13 @@ default Table create(Schema schema, PartitionSpec spec, String tableIdentifier) } default Table create(Schema schema, PartitionSpec spec, Map<String, String> properties, String tableIdentifier) { - return create(schema, spec, SortOrder.unsorted(), properties, tableIdentifier); + return create(schema, spec, SortOrder.unsorted(), PrimaryKey.nonPrimaryKey(), properties, tableIdentifier); } default Table create(Schema schema, PartitionSpec spec, SortOrder order, + PrimaryKey primaryKey, Map<String, String> properties, String tableIdentifier) { throw new UnsupportedOperationException(this.getClass().getName() + " does not implement create with a sort order"); diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java index 670276282436..84a1aa9c50f5 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java +++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PrimaryKey; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; @@ -360,6 +361,14 @@ interface TableBuilder { */ TableBuilder withSortOrder(SortOrder sortOrder); + /** + * Sets a primary key for this table. + * + * @param primaryKey a primary key + * @return this for method chaining + */ + TableBuilder withPrimaryKey(PrimaryKey primaryKey); + /** * Sets a location for the table. *
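A sketch of how a catalog client would thread a key through the new TableBuilder hook; it assumes Catalog.buildTable(TableIdentifier, Schema) as the entry point, and the identifier and schema (with a required "id" column) are illustrative:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class CatalogPrimaryKeySketch {
  // 'catalog' and 'schema' are assumed to be provided by the caller
  static Table createKeyedTable(Catalog catalog, Schema schema) {
    return catalog.buildTable(TableIdentifier.of("db", "events"), schema)
        .withPartitionSpec(PartitionSpec.unpartitioned())
        .withPrimaryKey(PrimaryKey.builderFor(schema).withKeyId(1).addField("id").build())
        .create();
  }
}
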
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index ec0812050541..70a1d50a0654 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -29,6 +29,7 @@ abstract class BaseMetadataTable implements Table { private final PartitionSpec spec = PartitionSpec.unpartitioned(); private final SortOrder sortOrder = SortOrder.unsorted(); + private final PrimaryKey primaryKey = PrimaryKey.nonPrimaryKey(); abstract Table table(); @@ -77,6 +78,16 @@ public Map<Integer, SortOrder> sortOrders() { return ImmutableMap.of(sortOrder.orderId(), sortOrder); } + @Override + public PrimaryKey primaryKey() { + return primaryKey; + } + + @Override + public Map<Integer, PrimaryKey> primaryKeys() { + return ImmutableMap.of(primaryKey.keyId(), primaryKey); + } + @Override public Map<String, String> properties() { return ImmutableMap.of(); diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 2de5c37b66c8..2a6668f30b05 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -159,6 +159,7 @@ protected class BaseMetastoreCatalogTableBuilder implements TableBuilder { private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder(); private PartitionSpec spec = PartitionSpec.unpartitioned(); private SortOrder sortOrder = SortOrder.unsorted(); + private PrimaryKey primaryKey = PrimaryKey.nonPrimaryKey(); private String location = null; public BaseMetastoreCatalogTableBuilder(TableIdentifier identifier, Schema schema) { @@ -180,6 +181,12 @@ public TableBuilder withSortOrder(SortOrder newSortOrder) { return this; } + @Override + public TableBuilder withPrimaryKey(PrimaryKey newPrimaryKey) { + this.primaryKey = newPrimaryKey != null ? newPrimaryKey : PrimaryKey.nonPrimaryKey(); + return this; + } + @Override public TableBuilder withLocation(String newLocation) { this.location = newLocation; @@ -209,7 +216,8 @@ public Table create() { String baseLocation = location != null ? location : defaultWarehouseLocation(identifier); Map<String, String> properties = propertiesBuilder.build(); - TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties); + TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, primaryKey, + baseLocation, properties); try { ops.commit(null, metadata); @@ -229,7 +237,8 @@ public Transaction createTransaction() { String baseLocation = location != null ? location : defaultWarehouseLocation(identifier); Map<String, String> properties = propertiesBuilder.build(); - TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties); + TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, primaryKey, + baseLocation, properties); return Transactions.createTableTransaction(identifier.toString(), ops, metadata); } @@ -252,10 +261,12 @@ private Transaction newReplaceTableTransaction(boolean orCreate) { TableMetadata metadata; if (ops.current() != null) { String baseLocation = location != null ? 
location : ops.current().location(); - metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation, propertiesBuilder.build()); + metadata = ops.current().buildReplacement(schema, spec, sortOrder, primaryKey, + baseLocation, propertiesBuilder.build()); } else { String baseLocation = location != null ? location : defaultWarehouseLocation(identifier); - metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, propertiesBuilder.build()); + metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, primaryKey, + baseLocation, propertiesBuilder.build()); } if (orCreate) { diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 005fba027eaf..aa8d844b7217 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -84,6 +84,16 @@ public Map<Integer, SortOrder> sortOrders() { return ops.current().sortOrdersById(); } + @Override + public PrimaryKey primaryKey() { + return ops.current().primaryKey(); + } + + @Override + public Map<Integer, PrimaryKey> primaryKeys() { + return ops.current().primaryKeysById(); + } + @Override public Map<String, String> properties() { return ops.current().properties(); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index a00485421ee3..bab6bd37e936 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -540,6 +540,16 @@ public Map<Integer, SortOrder> sortOrders() { return current.sortOrdersById(); } + @Override + public PrimaryKey primaryKey() { + return current.primaryKey(); + } + + @Override + public Map<Integer, PrimaryKey> primaryKeys() { + return current.primaryKeysById(); + } + @Override public Map<String, String> properties() { return current.properties(); diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java index 95c01a5397f1..0b9436250850 100644 --- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java +++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java @@ -186,6 +186,12 @@ public TableBuilder withSortOrder(SortOrder sortOrder) { return this; } + @Override + public TableBuilder withPrimaryKey(PrimaryKey primaryKey) { + innerBuilder.withPrimaryKey(primaryKey); + return this; + } + @Override public TableBuilder withLocation(String location) { innerBuilder.withLocation(location);
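On the read side, every Table implementation above now answers the same two calls. A small hedged sketch of what a caller sees; the table variable is illustrative:

import java.util.List;
import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Table;

public class ReadPrimaryKeySketch {
  static void describe(Table table) {
    PrimaryKey key = table.primaryKey();
    if (key.isNonPrimaryKey()) {
      // no key declared; primaryKeys() still contains the id-0 sentinel
      System.out.println("unkeyed table");
    } else {
      List<Integer> fieldIds = key.sourceIds(); // field ids in the current schema
      System.out.println("key " + key.keyId() + " on fields " + fieldIds);
    }
  }
}
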
diff --git a/core/src/main/java/org/apache/iceberg/PrimaryKeyParser.java b/core/src/main/java/org/apache/iceberg/PrimaryKeyParser.java new file mode 100644 index 000000000000..db95773d4a78 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PrimaryKeyParser.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.util.Iterator; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class PrimaryKeyParser { + private static final String PRIMARY_KEY_ID = "key-id"; + private static final String ENFORCE_UNIQUENESS = "enforce-uniqueness"; + private static final String FIELDS = "fields"; + private static final String SOURCE_ID = "source-id"; + + private PrimaryKeyParser() { + } + + public static void toJson(PrimaryKey primaryKey, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(PRIMARY_KEY_ID, primaryKey.keyId()); + generator.writeBooleanField(ENFORCE_UNIQUENESS, primaryKey.enforceUniqueness()); + generator.writeFieldName(FIELDS); + toJsonFields(primaryKey, generator); + generator.writeEndObject(); + } + + public static String toJson(PrimaryKey primaryKey) { + return toJson(primaryKey, false); + } + + public static String toJson(PrimaryKey primaryKey, boolean pretty) { + try { + StringWriter writer = new StringWriter(); + JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + if (pretty) { + generator.useDefaultPrettyPrinter(); + } + toJson(primaryKey, generator); + generator.flush(); + return writer.toString(); + + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + private static void toJsonFields(PrimaryKey primaryKey, JsonGenerator generator) throws IOException { + generator.writeStartArray(); + for (Integer sourceId : primaryKey.sourceIds()) { + generator.writeStartObject(); + generator.writeNumberField(SOURCE_ID, sourceId); + generator.writeEndObject(); + } + generator.writeEndArray(); + } + + public static PrimaryKey fromJson(Schema schema, String json) { + try { + return fromJson(schema, JsonUtil.mapper().readValue(json, JsonNode.class)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static PrimaryKey fromJson(Schema schema, JsonNode json) { + Preconditions.checkArgument(json.isObject(), "Cannot parse primary key from non-object: %s", json); + int primaryKeyId = JsonUtil.getInt(PRIMARY_KEY_ID, json); + boolean enforceUniqueness = JsonUtil.getBool(ENFORCE_UNIQUENESS, json); + PrimaryKey.Builder builder = PrimaryKey.builderFor(schema) + .withKeyId(primaryKeyId) + .withEnforceUniqueness(enforceUniqueness); + buildFromJsonFields(builder, json.get(FIELDS)); + return builder.build(); + } + + private static void buildFromJsonFields(PrimaryKey.Builder builder, JsonNode json) { + Preconditions.checkArgument(json != null, "Cannot parse null primary key fields."); + Preconditions.checkArgument(json.isArray(), "Cannot parse primary key fields, not an array: %s", json); + + Iterator<JsonNode> elements = json.elements(); + while (elements.hasNext()) { + JsonNode element = elements.next(); + Preconditions.checkArgument(element.isObject(), "Cannot parse primary key field, not an object: %s", element); + + builder.addField(JsonUtil.getInt(SOURCE_ID, element)); + } + } +}
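The JSON shape mirrors PartitionSpecParser: one key object with a fields array of source-id objects. A round-trip sketch under an illustrative one-column schema:

import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.PrimaryKeyParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PrimaryKeyJsonSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    PrimaryKey key = PrimaryKey.builderFor(schema).withKeyId(1).addField("id").build();

    String json = PrimaryKeyParser.toJson(key);
    // {"key-id":1,"enforce-uniqueness":false,"fields":[{"source-id":1}]}
    PrimaryKey parsed = PrimaryKeyParser.fromJson(schema, json);
    System.out.println(parsed.equals(key)); // true: id, flag, and fields all survive
  }
}
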
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 7aa23c87b48a..6d9bc85aea89 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -41,6 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; /** @@ -52,6 +53,7 @@ public class TableMetadata implements Serializable { static final int SUPPORTED_TABLE_FORMAT_VERSION = 2; static final int INITIAL_SPEC_ID = 0; static final int INITIAL_SORT_ORDER_ID = 1; + static final int INITIAL_PRIMARY_KEY_ID = 1; private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1); @@ -64,27 +66,31 @@ public static TableMetadata newTableMetadata(TableOperations ops, PartitionSpec spec, String location, Map<String, String> properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, SortOrder.unsorted(), PrimaryKey.nonPrimaryKey(), location, properties, + DEFAULT_TABLE_FORMAT_VERSION); } public static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, SortOrder sortOrder, + PrimaryKey primaryKey, String location, Map<String, String> properties) { - return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, sortOrder, primaryKey, location, properties, DEFAULT_TABLE_FORMAT_VERSION); } public static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, String location, Map<String, String> properties) { - return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION); + return newTableMetadata(schema, spec, SortOrder.unsorted(), PrimaryKey.nonPrimaryKey(), location, properties, + DEFAULT_TABLE_FORMAT_VERSION); } static TableMetadata newTableMetadata(Schema schema, PartitionSpec spec, SortOrder sortOrder, + PrimaryKey primaryKey, String location, Map<String, String> properties, int formatVersion) { @@ -110,10 +116,15 @@ static TableMetadata newTableMetadata(Schema schema, int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID; SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder); + // rebuild the primary key using the new column ids + int freshPrimaryKeyId = primaryKey.isNonPrimaryKey() ? 
primaryKey.keyId() : INITIAL_PRIMARY_KEY_ID; + PrimaryKey freshPrimaryKey = freshPrimaryKey(freshPrimaryKeyId, freshSchema, primaryKey); + return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location, INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(), lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec), freshSortOrderId, ImmutableList.of(freshSortOrder), + freshPrimaryKeyId, ImmutableList.of(freshPrimaryKey), ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of(), ImmutableList.of()); } @@ -220,12 +231,15 @@ public String toString() { private final List<PartitionSpec> specs; private final int defaultSortOrderId; private final List<SortOrder> sortOrders; + private final int defaultPrimaryKeyId; + private final List<PrimaryKey> primaryKeys; private final Map<String, String> properties; private final long currentSnapshotId; private final List<Snapshot> snapshots; private final Map<Long, Snapshot> snapshotsById; private final Map<Integer, PartitionSpec> specsById; private final Map<Integer, SortOrder> sortOrdersById; + private final Map<Integer, PrimaryKey> primaryKeysById; private final List<HistoryEntry> snapshotLog; private final List<MetadataLogEntry> previousFiles; @@ -242,6 +256,8 @@ public String toString() { List<PartitionSpec> specs, int defaultSortOrderId, List<SortOrder> sortOrders, + int defaultPrimaryKeyId, + List<PrimaryKey> primaryKeys, Map<String, String> properties, long currentSnapshotId, List<Snapshot> snapshots, @@ -249,6 +265,7 @@ List<MetadataLogEntry> previousFiles) { Preconditions.checkArgument(specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); Preconditions.checkArgument(sortOrders != null && !sortOrders.isEmpty(), "Sort orders cannot be null or empty"); + Preconditions.checkArgument(primaryKeys != null && !primaryKeys.isEmpty(), "Primary keys cannot be null or empty"); Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION, "Unsupported format version: v%s", formatVersion); Preconditions.checkArgument(formatVersion == 1 || uuid != null, @@ -269,6 +286,8 @@ public String toString() { this.defaultSpecId = defaultSpecId; this.defaultSortOrderId = defaultSortOrderId; this.sortOrders = sortOrders; + this.defaultPrimaryKeyId = defaultPrimaryKeyId; + this.primaryKeys = primaryKeys; this.properties = properties; this.currentSnapshotId = currentSnapshotId; this.snapshots = snapshots; @@ -278,6 +297,7 @@ public String toString() { this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber); this.specsById = indexSpecs(specs); this.sortOrdersById = indexSortOrders(sortOrders); + this.primaryKeysById = indexPrimaryKeys(primaryKeys); HistoryEntry last = null; for (HistoryEntry logEntry : snapshotLog) { @@ -391,6 +411,22 @@ public Map<Integer, SortOrder> sortOrdersById() { return sortOrdersById; } + public int defaultPrimaryKeyId() { + return defaultPrimaryKeyId; + } + + public PrimaryKey primaryKey() { + return primaryKeysById.get(defaultPrimaryKeyId); + } + + public List<PrimaryKey> primaryKeys() { + return primaryKeys; + } + + public Map<Integer, PrimaryKey> primaryKeysById() { + return primaryKeysById; + } + public String location() { return location; } @@ -441,8 +477,8 @@ public TableMetadata withUUID() { } else { return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location, lastSequenceNumber, lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, + currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); } }
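To make the freshening concrete, a hedged sketch against the public factory above; it matches the behavior testFreshIds exercises later in this PR, and the location and properties are illustrative:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

public class FreshKeyIdsSketch {
  public static void main(String[] args) {
    // caller-assigned column ids 10 and 11...
    Schema schema = new Schema(
        Types.NestedField.required(10, "id", Types.IntegerType.get()),
        Types.NestedField.required(11, "data", Types.StringType.get()));
    PrimaryKey key = PrimaryKey.builderFor(schema).addField("id").addField("data").build();

    TableMetadata meta = TableMetadata.newTableMetadata(
        schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), key,
        "file:/tmp/t", ImmutableMap.of());
    // ...are re-assigned from 1, and the key is rebuilt by column name
    System.out.println(meta.primaryKey().sourceIds()); // [1, 2], keyId == 1
  }
}
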
@@ -452,10 +488,12 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) { // rebuild all of the partition specs and sort orders for the new current schema List<PartitionSpec> updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec)); List<SortOrder> updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order)); + List<PrimaryKey> updatedPrimaryKeys = Lists.transform(primaryKeys, + primaryKey -> updatePrimaryKeySchema(newSchema, primaryKey)); return new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs, - defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, updatedSortOrders, defaultPrimaryKeyId, updatedPrimaryKeys, properties, currentSnapshotId, + snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); } // The caller is responsible to pass a newPartitionSpec with correct partition field IDs @@ -489,7 +527,7 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) { return new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId, - builder.build(), defaultSortOrderId, sortOrders, properties, + builder.build(), defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); } @@ -526,8 +564,43 @@ public TableMetadata replaceSortOrder(SortOrder newOrder) { return new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, - newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + newOrderId, builder.build(), defaultPrimaryKeyId, primaryKeys, properties, currentSnapshotId, + snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); + } + + public TableMetadata updatePrimaryKey(PrimaryKey newKey) { + // determine the next primary key id. + int newPrimaryKeyId = INITIAL_PRIMARY_KEY_ID; + for (PrimaryKey primaryKey : primaryKeys) { + if (primaryKey.samePrimaryKey(newKey)) { + newPrimaryKeyId = primaryKey.keyId(); + break; + } else if (newPrimaryKeyId <= primaryKey.keyId()) { + newPrimaryKeyId = primaryKey.keyId() + 1; + } + } + + if (newPrimaryKeyId == defaultPrimaryKeyId) { + return this; + } + + ImmutableList.Builder<PrimaryKey> builder = ImmutableList.builder(); + builder.addAll(primaryKeys); + + if (!primaryKeysById.containsKey(newPrimaryKeyId)) { + if (newKey.isNonPrimaryKey()) { + newPrimaryKeyId = PrimaryKey.nonPrimaryKey().keyId(); + builder.add(PrimaryKey.nonPrimaryKey()); + } else { + // Rebuild the primary key using new column ids. 
+ builder.add(freshPrimaryKey(newPrimaryKeyId, schema, newKey)); + } + } + + return new TableMetadata(null, formatVersion, uuid, location, + lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, + defaultSortOrderId, sortOrders, newPrimaryKeyId, builder.build(), properties, currentSnapshotId, + snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); } public TableMetadata addStagedSnapshot(Snapshot snapshot) { @@ -542,8 +615,8 @@ public TableMetadata addStagedSnapshot(Snapshot snapshot) { return new TableMetadata(null, formatVersion, uuid, location, snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, currentSnapshotId, + newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); } public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { @@ -567,8 +640,8 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) { return new TableMetadata(null, formatVersion, uuid, location, snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, snapshot.snapshotId(), + newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); } public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) { @@ -599,8 +672,8 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) { return new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered, - ImmutableList.copyOf(newSnapshotLog), addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, currentSnapshotId, + filtered, ImmutableList.copyOf(newSnapshotLog), addPreviousFile(file, lastUpdatedMillis)); } private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) { @@ -623,16 +696,16 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) { return new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, nowMillis, lastColumnId, schema, defaultSpecId, specs, defaultSortOrderId, - sortOrders, properties, snapshot.snapshotId(), snapshots, newSnapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + sortOrders, defaultPrimaryKeyId, primaryKeys, properties, snapshot.snapshotId(), snapshots, + newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); } public TableMetadata replaceProperties(Map<String, String> newProperties) { ValidationException.check(newProperties != null, "Cannot set properties to null"); return new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(file, lastUpdatedMillis, newProperties)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, newProperties, currentSnapshotId, + snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties)); }
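updatePrimaryKey's id assignment in miniature: reuse the id of an equivalent existing key, otherwise take the next id after the current maximum, and keep every historical key in the list. A hedged fragment, where ops is an assumed TableOperations whose table schema has a required "id" column:

// given current metadata whose only key is the id-0 sentinel
TableMetadata base = ops.current();
PrimaryKey key = PrimaryKey.builderFor(base.schema()).addField("id").build();

TableMetadata updated = base.updatePrimaryKey(key);  // assigns key id 1, keeps the sentinel
TableMetadata again = updated.updatePrimaryKey(key); // equivalent key found: returns 'updated' unchanged
// updated.defaultPrimaryKeyId() == 1 and updated.primaryKeys() carries ids {0, 1}
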
public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) { @@ -650,14 +723,14 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) { return new TableMetadata(null, formatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, newSnapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, currentSnapshotId, + snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); } // The caller is responsible to pass a updatedPartitionSpec with correct partition field IDs public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec, - SortOrder updatedSortOrder, String newLocation, - Map<String, String> updatedProperties) { + SortOrder updatedSortOrder, PrimaryKey updatedPrimaryKey, + String newLocation, Map<String, String> updatedProperties) { ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec), "Spec does not use sequential IDs that are required in v1: %s", updatedPartitionSpec); @@ -702,21 +775,42 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update sortOrdersBuilder.add(freshSortOrder); } + // determine the next primary key id + OptionalInt maxKeyId = primaryKeys.stream().mapToInt(PrimaryKey::keyId).max(); + int nextKeyId = maxKeyId.isPresent() ? maxKeyId.getAsInt() + 1 : INITIAL_PRIMARY_KEY_ID; + + // rebuild the primary key using new column ids + int freshKeyId = updatedPrimaryKey.isNonPrimaryKey() ? updatedPrimaryKey.keyId() : nextKeyId; + PrimaryKey freshPrimaryKey = freshPrimaryKey(freshKeyId, freshSchema, updatedPrimaryKey); + + // if an equivalent primary key already exists, use its ID; otherwise, use the fresh key ID. 
+ int primaryKeyId = primaryKeys.stream() + .filter(primaryKey -> primaryKey.samePrimaryKey(freshPrimaryKey)) + .findFirst() + .map(PrimaryKey::keyId) + .orElse(freshKeyId); + + ImmutableList.Builder<PrimaryKey> primaryKeysBuilder = ImmutableList.<PrimaryKey>builder().addAll(primaryKeys); + if (!primaryKeysById.containsKey(primaryKeyId)) { + primaryKeysBuilder.add(freshPrimaryKey); + } + Map<String, String> newProperties = Maps.newHashMap(); newProperties.putAll(this.properties); newProperties.putAll(updatedProperties); return new TableMetadata(null, formatVersion, uuid, newLocation, lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchema, - specId, specListBuilder.build(), orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties), - -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties)); + specId, specListBuilder.build(), orderId, sortOrdersBuilder.build(), primaryKeyId, primaryKeysBuilder.build(), + ImmutableMap.copyOf(newProperties), -1, snapshots, ImmutableList.of(), + addPreviousFile(file, lastUpdatedMillis, newProperties)); } public TableMetadata updateLocation(String newLocation) { return new TableMetadata(null, formatVersion, uuid, newLocation, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, currentSnapshotId, + snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); } public TableMetadata upgradeToFormatVersion(int newFormatVersion) { @@ -732,8 +826,8 @@ public TableMetadata upgradeToFormatVersion(int newFormatVersion) { return new TableMetadata(null, newFormatVersion, uuid, location, lastSequenceNumber, System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, snapshotLog, - addPreviousFile(file, lastUpdatedMillis)); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, currentSnapshotId, + snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis)); } private List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis) { @@ -784,6 +878,19 @@ private static SortOrder updateSortOrderSchema(Schema schema, SortOrde return builder.build(); } + private static PrimaryKey updatePrimaryKeySchema(Schema schema, PrimaryKey primaryKey) { + PrimaryKey.Builder builder = PrimaryKey.builderFor(schema) + .withKeyId(primaryKey.keyId()) + .withEnforceUniqueness(primaryKey.enforceUniqueness()); + + // add all the fields to the builder, IDs should not change. 
+ for (Integer fieldId : primaryKey.sourceIds()) { + builder.addField(fieldId); + } + + return builder.build(); + } + private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) { PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema) .withSpecId(specId); @@ -819,6 +926,28 @@ private static SortOrder freshSortOrder(int orderId, Schema schema, SortOrder so return builder.build(); } + private static PrimaryKey freshPrimaryKey(int keyId, Schema schema, PrimaryKey primaryKey) { + PrimaryKey.Builder builder = PrimaryKey + .builderFor(schema) + .withKeyId(keyId) + .withEnforceUniqueness(primaryKey.enforceUniqueness()); + + for (Integer fieldId : primaryKey.sourceIds()) { + // look up the name of the source field in the old schema to get the new schema's id + String sourceName = primaryKey.schema().findColumnName(fieldId); + Preconditions.checkNotNull(sourceName, + "Cannot find column in the primary key's schema. id: %s, schema: %s", fieldId, primaryKey.schema()); + + // reassign all primary keys with fresh primary field IDs. + Types.NestedField field = schema.findField(sourceName); + Preconditions.checkNotNull(field, + "Cannot find column in the fresh schema. name: %s, schema: %s", sourceName, schema); + builder.addField(field.fieldId()); + } + + return builder.build(); + } + private static Map<Long, Snapshot> indexAndValidateSnapshots(List<Snapshot> snapshots, long lastSequenceNumber) { ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder(); for (Snapshot snap : snapshots) { @@ -845,4 +974,12 @@ private static Map<Integer, SortOrder> indexSortOrders(List<SortOrder> sortOrder } return builder.build(); } + + private static Map<Integer, PrimaryKey> indexPrimaryKeys(List<PrimaryKey> primaryKeys) { + ImmutableMap.Builder<Integer, PrimaryKey> builder = ImmutableMap.builder(); + for (PrimaryKey primaryKey : primaryKeys) { + builder.put(primaryKey.keyId(), primaryKey); + } + return builder.build(); + } }
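The name-based remapping in freshPrimaryKey is what carries a key across re-assigned column ids; the same lookup can be written with public Schema calls. The schemas here are illustrative:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class KeyRemapSketch {
  public static void main(String[] args) {
    // the old schema named the keyed column "id" under field id 7; the
    // replacement schema re-assigns ids from 1, so the mapping is 7 -> "id" -> 1
    Schema oldSchema = new Schema(Types.NestedField.required(7, "id", Types.LongType.get()));
    Schema newSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    String name = oldSchema.findColumnName(7);           // "id"
    int remapped = newSchema.findField(name).fieldId();  // 1, as freshPrimaryKey computes
    System.out.println(name + " -> " + remapped);
    // a dropped or renamed keyed column makes findField return null, so the
    // checkNotNull in freshPrimaryKey fails fast instead of binding silently
  }
}
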
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index 58805a320415..d5b01e9376af 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -93,6 +93,8 @@ private TableMetadataParser() { static final String DEFAULT_SPEC_ID = "default-spec-id"; static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id"; static final String SORT_ORDERS = "sort-orders"; + static final String DEFAULT_PRIMARY_KEY_ID = "default-primary-key-id"; + static final String PRIMARY_KEYS = "primary-keys"; static final String PROPERTIES = "properties"; static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id"; static final String SNAPSHOTS = "snapshots"; @@ -178,6 +180,7 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro } generator.writeEndArray(); + // write the default order ID and sort order list generator.writeNumberField(DEFAULT_SORT_ORDER_ID, metadata.defaultSortOrderId()); generator.writeArrayFieldStart(SORT_ORDERS); for (SortOrder sortOrder : metadata.sortOrders()) { @@ -185,6 +188,14 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro } generator.writeEndArray(); + // write the default primary key ID and primary key list. + generator.writeNumberField(DEFAULT_PRIMARY_KEY_ID, metadata.defaultPrimaryKeyId()); + generator.writeArrayFieldStart(PRIMARY_KEYS); + for (PrimaryKey primaryKey : metadata.primaryKeys()) { + PrimaryKeyParser.toJson(primaryKey, generator); + } + generator.writeEndArray(); + generator.writeObjectFieldStart(PROPERTIES); for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) { generator.writeStringField(keyValue.getKey(), keyValue.getValue()); @@ -242,6 +253,7 @@ public static TableMetadata read(FileIO io, InputFile file) { } } + @SuppressWarnings("checkstyle:CyclomaticComplexity") static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) { Preconditions.checkArgument(node.isObject(), "Cannot parse metadata from a non-object: %s", node); @@ -261,6 +273,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) { int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node); Schema schema = SchemaParser.fromJson(node.get(SCHEMA)); + // Parse the partition specs. JsonNode specArray = node.get(PARTITION_SPECS); List<PartitionSpec> specs; int defaultSpecId; @@ -288,6 +301,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) { schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC))); } + // Parse the sort orders. JsonNode sortOrderArray = node.get(SORT_ORDERS); List<SortOrder> sortOrders; int defaultSortOrderId; @@ -306,6 +320,25 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) { defaultSortOrderId = defaultSortOrder.orderId(); } + // Parse the primary keys. + JsonNode primaryKeyArray = node.get(PRIMARY_KEYS); + List<PrimaryKey> primaryKeys; + int defaultPrimaryKeyId; + if (primaryKeyArray != null) { + defaultPrimaryKeyId = JsonUtil.getInt(DEFAULT_PRIMARY_KEY_ID, node); + ImmutableList.Builder<PrimaryKey> builder = ImmutableList.builder(); + for (JsonNode primaryKey : primaryKeyArray) { + builder.add(PrimaryKeyParser.fromJson(schema, primaryKey)); + } + primaryKeys = builder.build(); + } else { + Preconditions.checkArgument(formatVersion == 1, + "%s must exist in format v%s", PRIMARY_KEYS, formatVersion); + PrimaryKey defaultPrimaryKey = PrimaryKey.nonPrimaryKey(); + primaryKeys = ImmutableList.of(defaultPrimaryKey); + defaultPrimaryKeyId = defaultPrimaryKey.keyId(); + } + Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node); long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node); long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node); @@ -342,7 +375,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) { return new TableMetadata(file, formatVersion, uuid, location, lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, - defaultSortOrderId, sortOrders, properties, currentVersionId, snapshots, entries.build(), - metadataEntries.build()); + defaultSortOrderId, sortOrders, defaultPrimaryKeyId, primaryKeys, properties, currentVersionId, + snapshots, entries.build(), metadataEntries.build()); } }
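For reference, a hand-written sample of the new metadata section the parser writes and reads, embedded as a Java string the way the parser tests do; values are illustrative:

// what the new section looks like inside table metadata JSON
String sample =
    "\"default-primary-key-id\" : 1,\n" +
    "\"primary-keys\" : [ {\n" +
    "  \"key-id\" : 1,\n" +
    "  \"enforce-uniqueness\" : true,\n" +
    "  \"fields\" : [ { \"source-id\" : 1 }, { \"source-id\" : 2 } ]\n" +
    "} ]";
// on read, a missing "primary-keys" section is only legal for format v1 and
// falls back to the id-0 non-primary key, so existing v1 metadata stays readable
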
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java index 9a856b2e24d1..4b797b9c0f29 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java @@ -30,6 +30,7 @@ import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PrimaryKey; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.StaticTableOperations; @@ -125,17 +126,21 @@ private Table loadMetadataTable(String location, String metadataTableName, Metad * Create a table using the FileSystem implementation resolve from * location. * - * @param schema iceberg schema used to create the table - * @param spec partitioning spec, if null the table will be unpartitioned + * @param schema iceberg schema used to create the table + * @param spec partitioning spec, if null the table will be unpartitioned + * @param order Sort order spec, if null the table will be unsorted. + * @param primaryKey Primary key spec, if null the table will have no primary key. * @param properties a string map of table properties, initialized to empty if null - * @param location a path URI (e.g. hdfs:///warehouse/my_table) + * @param location a path URI (e.g. hdfs:///warehouse/my_table) * @return newly created table implementation */ @Override public Table create(Schema schema, PartitionSpec spec, SortOrder order, - Map<String, String> properties, String location) { + PrimaryKey primaryKey, Map<String, String> properties, + String location) { return buildTable(location, schema).withPartitionSpec(spec) .withSortOrder(order) + .withPrimaryKey(primaryKey) .withProperties(properties) .create(); } @@ -194,13 +199,15 @@ TableOperations newTableOps(String location) { } private TableMetadata tableMetadata(Schema schema, PartitionSpec spec, SortOrder order, - Map<String, String> properties, String location) { + PrimaryKey primaryKey, Map<String, String> properties, + String location) { Preconditions.checkNotNull(schema, "A table schema is required"); Map<String, String> tableProps = properties == null ? ImmutableMap.of() : properties; PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() : spec; SortOrder sortOrder = order == null ? SortOrder.unsorted() : order; - return TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, location, tableProps); + PrimaryKey primaryKeySpec = primaryKey == null ? PrimaryKey.nonPrimaryKey() : primaryKey; + return TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, primaryKeySpec, location, tableProps); } /** @@ -254,7 +261,7 @@ private class HadoopTableBuilder implements Catalog.TableBuilder { private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder(); private PartitionSpec spec = PartitionSpec.unpartitioned(); private SortOrder sortOrder = SortOrder.unsorted(); - + private PrimaryKey primaryKey = PrimaryKey.nonPrimaryKey(); HadoopTableBuilder(String location, Schema schema) { this.location = location; @@ -273,6 +280,12 @@ public Catalog.TableBuilder withSortOrder(SortOrder newSortOrder) { return this; } + @Override + public Catalog.TableBuilder withPrimaryKey(PrimaryKey newPrimaryKey) { + this.primaryKey = newPrimaryKey != null ? 
newPrimaryKey : PrimaryKey.nonPrimaryKey(); + return this; + } + @Override public Catalog.TableBuilder withLocation(String newLocation) { Preconditions.checkArgument(newLocation == null || location.equals(newLocation), @@ -303,7 +316,7 @@ public Table create() { } Map<String, String> properties = propertiesBuilder.build(); - TableMetadata metadata = tableMetadata(schema, spec, sortOrder, properties, location); + TableMetadata metadata = tableMetadata(schema, spec, sortOrder, primaryKey, properties, location); ops.commit(null, metadata); return new BaseTable(ops, location); } @@ -316,7 +329,7 @@ public Transaction createTransaction() { } Map<String, String> properties = propertiesBuilder.build(); - TableMetadata metadata = tableMetadata(schema, spec, null, properties, location); + TableMetadata metadata = tableMetadata(schema, spec, null, primaryKey, properties, location); return Transactions.createTableTransaction(location, ops, metadata); } @@ -339,9 +352,9 @@ private Transaction newReplaceTableTransaction(boolean orCreate) { Map<String, String> properties = propertiesBuilder.build(); TableMetadata metadata; if (ops.current() != null) { - metadata = ops.current().buildReplacement(schema, spec, sortOrder, location, properties); + metadata = ops.current().buildReplacement(schema, spec, sortOrder, primaryKey, location, properties); } else { - metadata = tableMetadata(schema, spec, sortOrder, properties, location); + metadata = tableMetadata(schema, spec, sortOrder, primaryKey, properties, location); } if (orCreate) {
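End to end through the Hadoop path, the new create overload looks like this; the warehouse location and Configuration are illustrative, and a null key is coerced to nonPrimaryKey() by tableMetadata():

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class HadoopTablesKeySketch {
  // 'schema' is assumed to contain a required "id" column
  static Table create(Schema schema) {
    HadoopTables tables = new HadoopTables(new Configuration());
    return tables.create(schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(),
        PrimaryKey.builderFor(schema).withKeyId(1).addField("id").build(),
        ImmutableMap.of(), "file:/tmp/warehouse/t");
  }
}
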
+ */ + +package org.apache.iceberg; + +import java.io.File; +import java.io.IOException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +@RunWith(Parameterized.class) +public class TestPrimaryKey { + + private final int formatVersion; + + private static final Schema SCHEMA = new Schema( + required(10, "id", Types.IntegerType.get()), + required(11, "data", Types.StringType.get()), + optional(12, "s", Types.StructType.of( + required(13, "id", Types.IntegerType.get()) + )), + optional(14, "map", Types.MapType.ofOptional( + 15, 16, Types.IntegerType.get(), Types.IntegerType.get() + )), + required(17, "required_list", Types.ListType.ofOptional(18, Types.StringType.get())) + ); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + private File tableDir = null; + + @Parameterized.Parameters + public static Iterable parameters() { + return Lists.newArrayList( + new Object[] {1}, + new Object[] {2} + ); + } + + public TestPrimaryKey(int formatVersion) { + this.formatVersion = formatVersion; + } + + @Before + public void setupTableDir() throws IOException { + this.tableDir = temp.newFolder(); + } + + @After + public void cleanupTables() { + TestTables.clearTables(); + } + + @Test + public void testReservedPrimaryKey() { + Assert.assertEquals("Should be able to build non-primary key", + PrimaryKey.nonPrimaryKey(), + PrimaryKey.builderFor(SCHEMA).withKeyId(0).build()); + + AssertHelpers.assertThrows("Should not allow primary key ID 0", + IllegalArgumentException.class, "Primary key ID 0 is reserved for non-primary key", + () -> PrimaryKey.builderFor(SCHEMA).addField("id").withKeyId(0).build()); + + AssertHelpers.assertThrows("Should not allow non-primary key with arbitrary IDs", + IllegalArgumentException.class, "Non-primary key ID must be 0", + () -> PrimaryKey.builderFor(SCHEMA).withKeyId(1).build()); + } + + @Test + public void testDefaultPrimaryKey() { + PartitionSpec spec = PartitionSpec.unpartitioned(); + SortOrder order = SortOrder.unsorted(); + PrimaryKey key = PrimaryKey.nonPrimaryKey(); + TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, key, formatVersion); + Assert.assertEquals("Expected 1 partition spec", 1, table.specs().size()); + Assert.assertEquals("Expected 1 sort order", 1, table.sortOrders().size()); + Assert.assertEquals("Expected 1 primary key", 1, table.primaryKeys().size()); + + PrimaryKey actualKey = table.primaryKey(); + Assert.assertEquals("Primary key ID must match", 0, actualKey.keyId()); + Assert.assertTrue("Primary key is non-primary key", actualKey.isNonPrimaryKey()); + } + + @Test + public void testFreshIds() { + PrimaryKey key = PrimaryKey.builderFor(SCHEMA) + .withKeyId(1) + .addField("id") + .addField("data") + .build(); + TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, + PartitionSpec.unpartitioned(), SortOrder.unsorted(), key, formatVersion); + + Assert.assertEquals("Expected 1 primary key", 1, table.primaryKeys().size()); + Assert.assertTrue("Primary key ID must be fresh", 
table.primaryKeys().containsKey(1)); + + PrimaryKey actualKey = table.primaryKey(); + Assert.assertEquals("Primary key must be fresh", TableMetadata.INITIAL_PRIMARY_KEY_ID, actualKey.keyId()); + Assert.assertEquals("Primary key must have 2 fields", 2, actualKey.sourceIds().size()); + Assert.assertEquals("Field id must be fresh", Integer.valueOf(1), actualKey.sourceIds().get(0)); + Assert.assertEquals("Field id must be fresh", Integer.valueOf(2), actualKey.sourceIds().get(1)); + } + + @Test + public void testAddField() { + AssertHelpers.assertThrows("Should not allow adding a non-existent field", + NullPointerException.class, "Cannot find source column: data0", + () -> PrimaryKey.builderFor(SCHEMA).addField("data0").build()); + + AssertHelpers.assertThrows("Should not allow adding a non-existent field", + NullPointerException.class, "Cannot find source column: 2147483647", + () -> PrimaryKey.builderFor(SCHEMA).addField(2147483647).build()); + + AssertHelpers.assertThrows("Should not allow adding an optional field", + IllegalArgumentException.class, "Cannot add optional source field to primary key: map", + () -> PrimaryKey.builderFor(SCHEMA).addField("map").build()); + + AssertHelpers.assertThrows("Should not allow adding an optional field", + IllegalArgumentException.class, "Cannot add optional source field to primary key: 14", + () -> PrimaryKey.builderFor(SCHEMA).addField(14).build()); + + AssertHelpers.assertThrows("Should not allow adding a non-primitive field", + ValidationException.class, "Cannot add non-primitive field: list", + () -> PrimaryKey.builderFor(SCHEMA).addField("required_list").build()); + + AssertHelpers.assertThrows("Should not allow adding a non-primitive field", + ValidationException.class, "Cannot add non-primitive field: list", + () -> PrimaryKey.builderFor(SCHEMA).addField(17).build()); + } + + @Test + public void testSamePrimaryKey() { + PrimaryKey pk1 = PrimaryKey.builderFor(SCHEMA) + .withKeyId(1) + .addField("id") + .addField("data") + .withEnforceUniqueness(false) + .build(); + PrimaryKey pk2 = PrimaryKey.builderFor(SCHEMA) + .withKeyId(2) + .addField("id") + .addField("data") + .build(); + PrimaryKey pk3 = PrimaryKey.builderFor(SCHEMA) + .withKeyId(3) + .addField("data") + .addField("id") + .build(); + PrimaryKey pk4 = PrimaryKey.builderFor(SCHEMA) + .withKeyId(1) + .addField("id") + .addField("data") + .withEnforceUniqueness(true) + .build(); + PrimaryKey pk5 = PrimaryKey.builderFor(SCHEMA) + .withKeyId(1) + .addField("id") + .addField("data") + .withEnforceUniqueness(false) + .build(); + + Assert.assertNotEquals("Primary keys must not be equal", pk1, pk2); + Assert.assertTrue("Primary keys must be equivalent", pk1.samePrimaryKey(pk2)); + Assert.assertTrue("Primary keys must be equivalent", pk2.samePrimaryKey(pk1)); + + Assert.assertNotEquals("Primary keys must not be equal", pk1, pk3); + Assert.assertFalse("Primary keys must not be equivalent", pk1.samePrimaryKey(pk3)); + Assert.assertFalse("Primary keys must not be equivalent", pk3.samePrimaryKey(pk1)); + + Assert.assertNotEquals("Primary keys must not be equal", pk1, pk4); + Assert.assertFalse("Primary keys must not be equivalent", pk1.samePrimaryKey(pk4)); + Assert.assertFalse("Primary keys must not be equivalent", pk4.samePrimaryKey(pk1)); + + Assert.assertEquals("Primary keys must be equal", pk1, pk5); + Assert.assertTrue("Primary keys must be equivalent", pk1.samePrimaryKey(pk5)); + Assert.assertTrue("Primary keys must be equivalent", pk5.samePrimaryKey(pk1)); + } +} diff --git 
a/core/src/test/java/org/apache/iceberg/TestPrimaryKeyParser.java b/core/src/test/java/org/apache/iceberg/TestPrimaryKeyParser.java new file mode 100644 index 000000000000..c0a0910c6820 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestPrimaryKeyParser.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestPrimaryKeyParser extends TableTestBase { + + @Parameterized.Parameters(name = "format = {0}") + public static Object[][] parameters() { + return new Object[][] { + {1}, + {2} + }; + } + + public TestPrimaryKeyParser(int formatVersion) { + super(formatVersion); + } + + @Test + public void testToJson() { + String expected = "{\"key-id\":0,\"enforce-uniqueness\":false,\"fields\":[]}"; + Assert.assertEquals(expected, PrimaryKeyParser.toJson(table.primaryKey(), false)); + + PrimaryKey key = PrimaryKey.builderFor(table.schema()) + .addField("id") + .addField("data") + .withEnforceUniqueness(true) + .build(); + + table.ops().commit(table.ops().current(), table.ops().current().updatePrimaryKey(key)); + + expected = "{\n" + + " \"key-id\" : 1,\n" + + " \"enforce-uniqueness\" : true,\n" + + " \"fields\" : [ {\n" + + " \"source-id\" : 1\n" + + " }, {\n" + + " \"source-id\" : 2\n" + + " } ]\n" + + "}"; + Assert.assertEquals(expected, PrimaryKeyParser.toJson(key, true)); + } + + @Test + public void testFromJson() { + String keyString = "{\n" + + " \"key-id\" : 1,\n" + + " \"enforce-uniqueness\" : true,\n" + + " \"fields\" : [ {\n" + + " \"source-id\" : 1\n" + + " } ]\n" + + "}"; + + PrimaryKey expectedKey = PrimaryKey.builderFor(table.schema()) + .addField("id") + .withEnforceUniqueness(true) + .build(); + + PrimaryKey key = PrimaryKeyParser.fromJson(table.schema(), keyString); + Assert.assertEquals(expectedKey, key); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java index b822716fa949..4ebf3c3e304b 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.transforms.Transform; @@ -70,7 +71,8 @@ public void 
testReplaceTransactionWithCustomSortOrder() { .build(); Map<String, String> props = Maps.newHashMap(); - Transaction replace = TestTables.beginReplace(tableDir, "test", schema, unpartitioned(), newSortOrder, props); + Transaction replace = TestTables.beginReplace(tableDir, "test", schema, unpartitioned(), newSortOrder, + PrimaryKey.nonPrimaryKey(), props); replace.commitTransaction(); table.refresh(); @@ -91,6 +93,52 @@ public void testReplaceTransactionWithCustomSortOrder() { Assert.assertEquals("Transform must match", transform, sortOrder.fields().get(0).transform()); } + @Test + public void testReplaceTransactionWithNewPrimaryKey() { + Snapshot start = table.currentSnapshot(); + Schema schema = table.schema(); + + table.newAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertEquals("Version should be 1", 1L, (long) version()); + + validateSnapshot(start, table.currentSnapshot(), FILE_A); + + PrimaryKey newPrimaryKey = PrimaryKey.builderFor(schema) + .withKeyId(1) + .withEnforceUniqueness(true) + .addField("id") + .addField("data") + .build(); + + Map<String, String> props = Maps.newHashMap(); + Transaction replace = TestTables.beginReplace(tableDir, "test", schema, unpartitioned(), + SortOrder.unsorted(), newPrimaryKey, props); + replace.commitTransaction(); + + table.refresh(); + + Assert.assertEquals("Version should be 2", 2L, (long) version()); + Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot()); + Assert.assertEquals("Schema should match previous schema", + schema.asStruct(), table.schema().asStruct()); + Assert.assertEquals("Partition spec should have no fields", + 0, table.spec().fields().size()); + Assert.assertEquals("Table should have 1 sort order", 1, table.sortOrders().size()); + Assert.assertEquals("Sort order should have no fields", 0, table.sortOrder().fields().size()); + + Assert.assertEquals("Table should have 2 primary keys", 2, table.primaryKeys().size()); + PrimaryKey primaryKey = table.primaryKey(); + Assert.assertEquals("Primary key ID must match", 1, primaryKey.keyId()); + Assert.assertEquals("Primary key must have 2 fields", 2, primaryKey.sourceIds().size()); + Assert.assertTrue("Primary key must be enforced", primaryKey.enforceUniqueness()); + Assert.assertEquals("Field IDs must match", + ImmutableList.of(table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()), + primaryKey.sourceIds()); + } + @Test public void testReplaceTransaction() { Schema newSchema = new Schema( diff --git a/core/src/test/java/org/apache/iceberg/TestSortOrder.java b/core/src/test/java/org/apache/iceberg/TestSortOrder.java index d27cddfa7f8d..807f85a1cffe 100644 --- a/core/src/test/java/org/apache/iceberg/TestSortOrder.java +++ b/core/src/test/java/org/apache/iceberg/TestSortOrder.java @@ -118,7 +118,8 @@ public void testFreshIds() { .asc("s.id", NULLS_LAST) .desc(truncate("data", 10), NULLS_FIRST) .build(); - TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, formatVersion); + TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, + PrimaryKey.nonPrimaryKey(), formatVersion); Assert.assertEquals("Expected 1 sort order", 1, table.sortOrders().size()); Assert.assertTrue("Order ID must be fresh", table.sortOrders().containsKey(TableMetadata.INITIAL_SORT_ORDER_ID)); @@ -282,7 +283,8 @@ public void testSchemaEvolutionWithSortOrder() { .asc("s.id") .desc(truncate("data", 10)) .build(); - TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, formatVersion); + 
TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, + PrimaryKey.nonPrimaryKey(), formatVersion); table.updateSchema() .renameColumn("s.id", "s.id2") @@ -303,7 +305,8 @@ public void testIncompatibleSchemaEvolutionWithSortOrder() { .asc("s.id") .desc(truncate("data", 10)) .build(); - TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, formatVersion); + TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, + PrimaryKey.nonPrimaryKey(), formatVersion); AssertHelpers.assertThrows("Should reject deletion of sort columns", ValidationException.class, "Cannot find source column", diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 7509ac4d667a..1191a32ae83f 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -78,6 +78,12 @@ public class TestTableMetadata { .asc("y", NullOrder.NULLS_FIRST) .desc(Expressions.bucket("z", 4), NullOrder.NULLS_LAST) .build(); + private static final PrimaryKey KEY_4 = PrimaryKey.builderFor(TEST_SCHEMA) + .withKeyId(4) + .withEnforceUniqueness(true) + .addField(3) + .addField(1) + .build(); @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -102,7 +108,8 @@ public void testJsonConversion() throws Exception { TableMetadata expected = new TableMetadata(null, 2, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5), - 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId, + 3, ImmutableList.of(SORT_ORDER_3), 4, ImmutableList.of(KEY_4), + ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of()); String asJson = TableMetadataParser.toJson(expected); @@ -127,6 +134,18 @@ public void testJsonConversion() throws Exception { expected.defaultSpecId(), metadata.defaultSpecId()); Assert.assertEquals("PartitionSpec map should match", expected.specs(), metadata.specs()); + Assert.assertEquals("Default sort ID should match", + expected.defaultSortOrderId(), metadata.defaultSortOrderId()); + Assert.assertEquals("Sort order should match", + expected.sortOrder(), metadata.sortOrder()); + Assert.assertEquals("Sort order map should match", + expected.sortOrders(), metadata.sortOrders()); + Assert.assertEquals("Default primary key ID should match", + expected.defaultPrimaryKeyId(), metadata.defaultPrimaryKeyId()); + Assert.assertEquals("Primary key should match", + expected.primaryKey(), metadata.primaryKey()); + Assert.assertEquals("Primary key map should match", + expected.primaryKeys(), metadata.primaryKeys()); Assert.assertEquals("Properties should match", expected.properties(), metadata.properties()); Assert.assertEquals("Snapshot logs should match", @@ -160,8 +179,9 @@ public void testBackwardCompat() throws Exception { TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION, 0, System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec), - TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(sortOrder), ImmutableMap.of("property", "value"), - currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of()); + TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(sortOrder), 4, ImmutableList.of(KEY_4), + ImmutableMap.of("property", "value"), 
currentSnapshotId, + Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of()); String asJson = toJsonWithoutSpecList(expected); TableMetadata metadata = TableMetadataParser @@ -270,7 +290,8 @@ public void testJsonWithPreviousMetadataLog() throws Exception { TableMetadata base = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION, 0, System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5), - 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId, + 3, ImmutableList.of(SORT_ORDER_3), 4, ImmutableList.of(KEY_4), + ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog)); @@ -305,7 +326,8 @@ public void testAddPreviousMetadataRemoveNone() { TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(), TEST_LOCATION, 0, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5), - 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of("property", "value"), currentSnapshotId, + 3, ImmutableList.of(SORT_ORDER_3), 4, ImmutableList.of(KEY_4), + ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog)); @@ -350,7 +372,7 @@ public void testAddPreviousMetadataRemoveOne() { TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(), TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 5, - ImmutableList.of(SPEC_5), 3, ImmutableList.of(SORT_ORDER_3), + ImmutableList.of(SPEC_5), 3, ImmutableList.of(SORT_ORDER_3), 4, ImmutableList.of(KEY_4), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog)); @@ -402,7 +424,7 @@ public void testAddPreviousMetadataRemoveMultiple() { TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(), TEST_LOCATION, 0, currentTimestamp - 50, 3, TEST_SCHEMA, 2, ImmutableList.of(SPEC_5), TableMetadata.INITIAL_SORT_ORDER_ID, ImmutableList.of(SortOrder.unsorted()), - ImmutableMap.of("property", "value"), currentSnapshotId, + 4, ImmutableList.of(KEY_4), ImmutableMap.of("property", "value"), currentSnapshotId, Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog)); @@ -428,8 +450,8 @@ public void testV2UUIDValidation() { IllegalArgumentException.class, "UUID is required in format v2", () -> new TableMetadata(null, 2, null, TEST_LOCATION, SEQ_NO, System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), - 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L, - ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) + 3, ImmutableList.of(SORT_ORDER_3), 4, ImmutableList.of(KEY_4), + ImmutableMap.of(), -1L, ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) ); } @@ -440,8 +462,8 @@ public void testVersionValidation() { IllegalArgumentException.class, "Unsupported format version: v" + unsupportedVersion, () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, SEQ_NO, System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), - 3, ImmutableList.of(SORT_ORDER_3), ImmutableMap.of(), -1L, - 
ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) + 3, ImmutableList.of(SORT_ORDER_3), 4, ImmutableList.of(KEY_4), + ImmutableMap.of(), -1L, ImmutableList.of(), ImmutableList.of(), ImmutableList.of()) ); } @@ -553,7 +575,7 @@ public void testUpdateSortOrder() { SortOrder order = SortOrder.builderFor(schema).asc("x").build(); TableMetadata sortedByX = TableMetadata.newTableMetadata( - schema, PartitionSpec.unpartitioned(), order, null, ImmutableMap.of()); + schema, PartitionSpec.unpartitioned(), order, PrimaryKey.nonPrimaryKey(), null, ImmutableMap.of()); Assert.assertEquals("Should have 1 sort order", 1, sortedByX.sortOrders().size()); Assert.assertEquals("Should use orderId 1", 1, sortedByX.sortOrder().orderId()); Assert.assertEquals("Should be sorted by one field", 1, sortedByX.sortOrder().fields().size()); @@ -584,4 +606,18 @@ public void testUpdateSortOrder() { Assert.assertEquals("Should be nulls first", NullOrder.NULLS_FIRST, sortedByX.sortOrder().fields().get(0).nullOrder()); } + + @Test + public void testPrimaryKey() { + // Minimal check mirroring testUpdateSortOrder; assumes updatePrimaryKey keeps the builder-assigned initial key ID, as exercised in TestPrimaryKeyParser. + TableMetadata meta = TableMetadata.newTableMetadata( + TEST_SCHEMA, PartitionSpec.unpartitioned(), SortOrder.unsorted(), PrimaryKey.nonPrimaryKey(), null, ImmutableMap.of()); + Assert.assertTrue("New metadata should default to the non-primary key", meta.primaryKey().isNonPrimaryKey()); + + PrimaryKey key = PrimaryKey.builderFor(TEST_SCHEMA).addField(1).withEnforceUniqueness(true).build(); + TableMetadata keyed = meta.updatePrimaryKey(key); + Assert.assertEquals("Should use the initial key ID", TableMetadata.INITIAL_PRIMARY_KEY_ID, keyed.primaryKey().keyId()); + Assert.assertEquals("Should track the non-primary key and the new key", 2, keyed.primaryKeys().size()); + Assert.assertTrue("Should keep the uniqueness flag", keyed.primaryKey().enforceUniqueness()); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index 2e0ecd60359d..6302f4d5efee 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -48,52 +48,55 @@ private static TestTable upgrade(File temp, String name, int newFormatVersion) { } public static TestTable create(File temp, String name, Schema schema, PartitionSpec spec, int formatVersion) { - return create(temp, name, schema, spec, SortOrder.unsorted(), formatVersion); + return create(temp, name, schema, spec, SortOrder.unsorted(), PrimaryKey.nonPrimaryKey(), formatVersion); } public static TestTable create(File temp, String name, Schema schema, PartitionSpec spec, - SortOrder sortOrder, int formatVersion) { + SortOrder sortOrder, PrimaryKey primaryKey, int formatVersion) { TestTableOperations ops = new TestTableOperations(name, temp); if (ops.current() != null) { throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); } - ops.commit(null, newTableMetadata(schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), formatVersion)); + ops.commit(null, newTableMetadata(schema, spec, sortOrder, primaryKey, + temp.toString(), ImmutableMap.of(), formatVersion)); return new TestTable(ops, name); } public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) { - return beginCreate(temp, name, schema, spec, SortOrder.unsorted()); + return beginCreate(temp, name, schema, spec, SortOrder.unsorted(), PrimaryKey.nonPrimaryKey()); } public static Transaction beginCreate(File temp, String name, Schema schema, - PartitionSpec spec, SortOrder sortOrder) { + PartitionSpec spec, SortOrder sortOrder, + PrimaryKey primaryKey) { TableOperations ops = new TestTableOperations(name, temp); if (ops.current() != null) { throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); } - TableMetadata metadata = newTableMetadata(schema, spec, sortOrder, temp.toString(), ImmutableMap.of(), 1); + TableMetadata metadata = newTableMetadata(schema, spec, sortOrder, primaryKey, temp.toString(), + ImmutableMap.of(), 1); return Transactions.createTableTransaction(name, ops, metadata); } public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec) { - return 
beginReplace(temp, name, schema, spec, SortOrder.unsorted(), PrimaryKey.nonPrimaryKey(), ImmutableMap.of()); } public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec, - SortOrder sortOrder, Map<String, String> properties) { + SortOrder sortOrder, PrimaryKey primaryKey, Map<String, String> properties) { TestTableOperations ops = new TestTableOperations(name, temp); TableMetadata current = ops.current(); TableMetadata metadata; if (current != null) { - metadata = current.buildReplacement(schema, spec, sortOrder, current.location(), properties); + metadata = current.buildReplacement(schema, spec, sortOrder, primaryKey, current.location(), properties); return Transactions.replaceTableTransaction(name, ops, metadata); } else { - metadata = newTableMetadata(schema, spec, sortOrder, temp.toString(), properties); + metadata = newTableMetadata(schema, spec, sortOrder, primaryKey, temp.toString(), properties); return Transactions.createTableTransaction(name, ops, metadata); } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java index 64ae92e77caa..6209b9b11ce7 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java @@ -30,6 +30,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PrimaryKey; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -179,6 +180,45 @@ public void testCreateTableCustomSortOrder() { Assert.assertEquals("Transform must match", transform, sortOrder.fields().get(0).transform()); } + @Test + public void testCreateTableDefaultPrimaryKey() throws Exception { + Configuration conf = new Configuration(); + String warehousePath = temp.newFolder().getAbsolutePath(); + HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath); + TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl"); + Table table = catalog.createTable(tableIdent, SCHEMA, SPEC); + + PrimaryKey key = table.primaryKey(); + Assert.assertEquals("Primary key ID must match", 0, key.keyId()); + Assert.assertTrue("Primary key must be a non-primary key", key.isNonPrimaryKey()); + } + + @Test + public void testCreateTableCustomPrimaryKey() throws Exception { + Configuration conf = new Configuration(); + String warehousePath = temp.newFolder().getAbsolutePath(); + HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath); + TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl"); + + PrimaryKey key = PrimaryKey.builderFor(SCHEMA) + .addField("id") + .addField("data") + .withEnforceUniqueness(true) + .build(); + Table table = catalog.buildTable(tableIdent, SCHEMA) + .withPartitionSpec(SPEC) + .withPrimaryKey(key) + .create(); + + PrimaryKey actualKey = table.primaryKey(); + Assert.assertEquals("Primary key ID must match", 1, actualKey.keyId()); + Assert.assertEquals("Primary key must have 2 fields", 2, actualKey.sourceIds().size()); + Assert.assertEquals("Primary key must have the expected field", Integer.valueOf(1), actualKey.sourceIds().get(0)); + Assert.assertEquals("Primary key must have the expected field", Integer.valueOf(2), actualKey.sourceIds().get(1)); + Assert.assertTrue("Primary key must have uniqueness enforced", actualKey.enforceUniqueness()); + Assert.assertEquals("Primary key must have expected schema", 
table.schema(), actualKey.schema()); + } + @Test public void testBasicCatalog() throws Exception { Configuration conf = new Configuration(); diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTables.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTables.java index c3d0918a3e4a..a808b765baa4 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTables.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTables.java @@ -30,6 +30,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PrimaryKey; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; @@ -128,7 +129,8 @@ public void testCustomSortOrder() { SortOrder order = SortOrder.builderFor(SCHEMA) .asc("id", NULLS_FIRST) .build(); - Table table = TABLES.create(SCHEMA, spec, order, Maps.newHashMap(), tableDir.toURI().toString()); + Table table = TABLES.create(SCHEMA, spec, order, PrimaryKey.nonPrimaryKey(), + Maps.newHashMap(), tableDir.toURI().toString()); SortOrder sortOrder = table.sortOrder(); Assert.assertEquals("Order ID must match", 1, sortOrder.orderId()); diff --git a/core/src/test/resources/TableMetadataV2Valid.json b/core/src/test/resources/TableMetadataV2Valid.json index 3754354ede7b..cf6431194f06 100644 --- a/core/src/test/resources/TableMetadataV2Valid.json +++ b/core/src/test/resources/TableMetadataV2Valid.json @@ -43,6 +43,21 @@ ] } ], + "default-primary-key-id": 3, + "primary-keys": [ + { + "key-id": 3, + "enforce-uniqueness": true, + "fields": [ + { + "source-id": 1 + }, + { + "source-id": 3 + } + ] + } + ], "default-sort-order-id": 3, "sort-orders": [ { diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index efd523c96aa5..d3fa4d9ce1e0 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -26,6 +26,7 @@ import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.CachingCatalog; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PrimaryKey; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; @@ -235,6 +236,61 @@ public void testCreateTableCustomSortOrder() { } } + @Test + public void testCreateTableDefaultPrimaryKey() { + Schema schema = new Schema( + required(1, "id", Types.IntegerType.get(), "unique ID"), + required(2, "data", Types.StringType.get()) + ); + PartitionSpec spec = PartitionSpec.builderFor(schema) + .bucket("data", 16) + .build(); + TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl"); + + try { + Table table = catalog.createTable(tableIdent, schema, spec); + Assert.assertEquals("Primary key ID must match", 0, table.primaryKey().keyId()); + Assert.assertTrue("Primary key must be non-primary key", table.primaryKey().isNonPrimaryKey()); + } finally { + catalog.dropTable(tableIdent); + } + } + + @Test + public void testCreateTableCustomPrimaryKey() { + Schema schema = new Schema( + required(1, "id", Types.IntegerType.get(), "unique ID"), + required(2, "data", Types.StringType.get()) + ); + PartitionSpec spec = PartitionSpec.builderFor(schema) + .bucket("data", 16) + .build(); + PrimaryKey primaryKey = PrimaryKey.builderFor(schema) + .withKeyId(1) + .addField("id") + .addField("data") + 
.withEnforceUniqueness(true) + .build(); + TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl"); + + try { + Table table = catalog.buildTable(tableIdent, schema) + .withPartitionSpec(spec) + .withPrimaryKey(primaryKey) + .create(); + + PrimaryKey actualKey = table.primaryKey(); + Assert.assertEquals("Primary key ID must match", 1, actualKey.keyId()); + Assert.assertEquals("Primary key must have 2 fields", 2, actualKey.sourceIds().size()); + Assert.assertEquals("Primary key must have the expected field", Integer.valueOf(1), actualKey.sourceIds().get(0)); + Assert.assertEquals("Primary key must have the expected field", Integer.valueOf(2), actualKey.sourceIds().get(1)); + Assert.assertTrue("Primary key must have uniqueness enforced", actualKey.enforceUniqueness()); + Assert.assertEquals("Primary key must have expected schema", table.schema(), actualKey.schema()); + } finally { + catalog.dropTable(tableIdent); + } + } + @Test public void testCreateNamespace() throws TException { Namespace namespace1 = Namespace.of("noLocation"); diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java index 94efe42fb702..dfafbb45112a 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PrimaryKey; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; @@ -208,12 +209,13 @@ private CatalogToTables(Catalog catalog) { } @Override - public Table create(Schema schema, PartitionSpec spec, SortOrder sortOrder, + public Table create(Schema schema, PartitionSpec spec, SortOrder sortOrder, PrimaryKey primaryKey, Map<String, String> properties, String tableIdentifier) { TableIdentifier tableIdent = TableIdentifier.parse(tableIdentifier); return catalog.buildTable(tableIdent, schema) .withPartitionSpec(spec) .withSortOrder(sortOrder) + .withPrimaryKey(primaryKey) .withProperties(properties) .create(); }
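Taken together, the changes in this patch let a catalog declare a primary key at table-creation time. A minimal end-to-end sketch of the new surface (illustrative only: the warehouse path, identifiers, and class name below are placeholders, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PrimaryKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class CreateTableWithPrimaryKeySketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get(), "unique ID"),
        Types.NestedField.required(2, "data", Types.StringType.get()));

    // Declare (id) as the primary key and ask writers to enforce uniqueness.
    PrimaryKey key = PrimaryKey.builderFor(schema)
        .addField("id")
        .withEnforceUniqueness(true)
        .build();

    // "/tmp/warehouse" and the identifier are placeholders for this sketch.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/warehouse");
    Table table = catalog.buildTable(TableIdentifier.of("db", "tbl"), schema)
        .withPartitionSpec(PartitionSpec.unpartitioned())
        .withPrimaryKey(key)  // builder hook added by this change
        .create();

    // The created table exposes the key with a freshly assigned key ID.
    System.out.println(table.primaryKey());
  }
}

As the tests above assert, omitting withPrimaryKey() leaves the table with PrimaryKey.nonPrimaryKey() and the reserved key ID 0.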