diff --git a/api/src/main/java/org/apache/iceberg/RowKey.java b/api/src/main/java/org/apache/iceberg/RowKey.java
new file mode 100644
index 000000000000..1b6c8237ea90
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/RowKey.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Row key of a table.
+ *
+ * A row key defines table row uniqueness, similar to the concept of a primary key
+ * in a relational database system. A row should be unique in a table based on the
+ * values of an unordered set of {@link RowKeyIdentifierField}. Iceberg itself does
+ * not enforce uniqueness based on this key; it is leveraged by operations such as
+ * streaming upsert.
+ */
+public class RowKey implements Serializable {
+
+ private static final RowKey NOT_IDENTIFIED = new RowKey(new Schema(), Sets.newHashSet());
+
+ private final Schema schema;
+ private final RowKeyIdentifierField[] identifierFields;
+
+ private transient volatile Set<RowKeyIdentifierField> identifierFieldSet;
+
+ private RowKey(Schema schema, Set<RowKeyIdentifierField> identifierFields) {
+ this.schema = schema;
+ this.identifierFields = identifierFields.toArray(new RowKeyIdentifierField[0]);
+ }
+
+ /**
+ * Returns the {@link Schema} referenced by the row key.
+ */
+ public Schema schema() {
+ return schema;
+ }
+
+ /**
+ * Returns the set of {@link RowKeyIdentifierField} in the row key.
+ *
+ * @return the set of fields in the row key
+ */
+ public Set<RowKeyIdentifierField> identifierFields() {
+ return lazyIdentifierFieldSet();
+ }
+
+ private Set<RowKeyIdentifierField> lazyIdentifierFieldSet() {
+ if (identifierFieldSet == null) {
+ synchronized (this) {
+ if (identifierFieldSet == null) {
+ identifierFieldSet = ImmutableSet.copyOf(identifierFields);
+ }
+ }
+ }
+
+ return identifierFieldSet;
+ }
+
+ /**
+ * Returns the default row key that has no fields.
+ */
+ public static RowKey notIdentified() {
+ return NOT_IDENTIFIED;
+ }
+
+ /**
+ * Returns true if the row key is the default one with no fields.
+ */
+ public boolean isNotIdentified() {
+ return identifierFields.length < 1;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ RowKey that = (RowKey) other;
+ return identifierFields().equals(that.identifierFields());
+ }
+
+ @Override
+ public int hashCode() {
+ return identifierFields().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (RowKeyIdentifierField field : identifierFields) {
+ sb.append("\n");
+ sb.append(" ").append(field);
+ }
+ if (identifierFields.length > 0) {
+ sb.append("\n");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ /**
+ * Creates a new {@link Builder row key builder} for the given {@link Schema}.
+ *
+ * @param schema a schema
+ * @return a row key builder for the given schema.
+ */
+ public static Builder builderFor(Schema schema) {
+ return new Builder(schema);
+ }
+
+ /**
+ * A builder to create a valid {@link RowKey row key}.
+ *
+ * Call {@link #builderFor(Schema)} to create a new builder.
+ */
+ public static class Builder {
+ private final Schema schema;
+ private final Set<RowKeyIdentifierField> fields = Sets.newHashSet();
+
+ private Builder(Schema schema) {
+ this.schema = schema;
+ }
+
+ public Builder addField(String name) {
+ Types.NestedField column = schema.findField(name);
+ ValidationException.check(column != null, "Cannot find column with name %s in schema %s", name, schema);
+ return addField(column);
+ }
+
+ public Builder addField(int id) {
+ Types.NestedField column = schema.findField(id);
+ ValidationException.check(column != null, "Cannot find column with ID %s in schema %s", id, schema);
+ return addField(column);
+ }
+
+ private Builder addField(Types.NestedField column) {
+ ValidationException.check(column.isRequired(),
+ "Cannot add column %s to row key because it is not a required column in schema %s", column, schema);
+ ValidationException.check(column.type().isPrimitiveType(),
+ "Cannot add column %s to row key because it is not a primitive data type in schema %s", column, schema);
+ fields.add(new RowKeyIdentifierField(column.fieldId()));
+ return this;
+ }
+
+ public RowKey build() {
+ if (fields.size() == 0) {
+ return NOT_IDENTIFIED;
+ }
+
+ return new RowKey(schema, fields);
+ }
+ }
+}
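For reference, a minimal usage sketch of the builder API above; the schema, column names, and IDs are illustrative assumptions, not part of this patch:

```java
import org.apache.iceberg.RowKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class RowKeyExample {
  public static void main(String[] args) {
    // hypothetical schema; any required primitive column can be an identifier field
    Schema schema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "data", Types.StringType.get()));

    RowKey key = RowKey.builderFor(schema)
        .addField("id")   // resolve by column name
        .addField(2)      // or by column ID ("data")
        .build();

    System.out.println(key.isNotIdentified());          // false
    System.out.println(key.identifierFields().size());  // 2
  }
}
```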
diff --git a/api/src/main/java/org/apache/iceberg/RowKeyIdentifierField.java b/api/src/main/java/org/apache/iceberg/RowKeyIdentifierField.java
new file mode 100644
index 000000000000..477b2dab8ed7
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/RowKeyIdentifierField.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * An identifier field in a {@link RowKey}.
+ *
+ * The field must be:
+ * 1. a required column in the table schema
+ * 2. a primitive type column
+ */
+public class RowKeyIdentifierField implements Serializable {
+
+ private final int sourceId;
+
+ RowKeyIdentifierField(int sourceId) {
+ this.sourceId = sourceId;
+ }
+
+ public int sourceId() {
+ return sourceId;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + sourceId + ")";
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ RowKeyIdentifierField that = (RowKeyIdentifierField) other;
+ return sourceId == that.sourceId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sourceId);
+ }
+}
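A sketch of how the two constraints above surface through the builder; the schema is a made-up example, and the messages come from the ValidationException checks in RowKey.Builder.addField:

```java
import org.apache.iceberg.RowKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class RowKeyValidationExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        required(1, "id", Types.LongType.get()),
        optional(2, "note", Types.StringType.get()),
        required(3, "point", Types.StructType.of(
            required(4, "x", Types.DoubleType.get()))));

    try {
      RowKey.builderFor(schema).addField("note");   // rejected: optional column
    } catch (ValidationException e) {
      System.out.println(e.getMessage());
    }

    try {
      RowKey.builderFor(schema).addField("point");  // rejected: struct, not a primitive
    } catch (ValidationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```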
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index a4557c8304de..704b52b94a83 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -88,6 +88,13 @@ default String name() {
*/
Map<Integer, SortOrder> sortOrders();
+ /**
+ * Return the {@link RowKey row key} for this table.
+ *
+ * @return this table's row key.
+ */
+ RowKey rowKey();
+
/**
* Return a map of string properties for this table.
*
diff --git a/api/src/main/java/org/apache/iceberg/Tables.java b/api/src/main/java/org/apache/iceberg/Tables.java
index 1c1daafed506..7b77f2be7607 100644
--- a/api/src/main/java/org/apache/iceberg/Tables.java
+++ b/api/src/main/java/org/apache/iceberg/Tables.java
@@ -46,7 +46,17 @@ default Table create(Schema schema,
SortOrder order,
Map<String, String> properties,
String tableIdentifier) {
- throw new UnsupportedOperationException(this.getClass().getName() + " does not implement create with a sort order");
+ return create(schema, spec, order, RowKey.notIdentified(), properties, tableIdentifier);
+ }
+
+ default Table create(Schema schema,
+ PartitionSpec spec,
+ SortOrder order,
+ RowKey rowKey,
+ Map<String, String> properties,
+ String tableIdentifier) {
+ throw new UnsupportedOperationException(this.getClass().getName() +
+ " does not implement create with a sort order and row key");
}
Table load(String tableIdentifier);
diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index 670276282436..b1bc0f4966bd 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
@@ -360,6 +361,14 @@ interface TableBuilder {
*/
TableBuilder withSortOrder(SortOrder sortOrder);
+ /**
+ * Sets a row key for the table.
+ *
+ * @param rowKey a row key
+ * @return this for method chaining
+ */
+ TableBuilder withRowKey(RowKey rowKey);
+
/**
* Sets a location for the table.
*
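A hedged sketch of the intended fluent usage; the catalog instance, namespace, and table name are caller-supplied placeholders, and it assumes the catalog supports `buildTable`:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class CreateKeyedTableExample {
  // `catalog` may be any Catalog implementation (Hive, Hadoop, ...)
  static Table createKeyedTable(Catalog catalog) {
    Schema schema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "data", Types.StringType.get()));

    RowKey key = RowKey.builderFor(schema).addField("id").build();

    return catalog.buildTable(TableIdentifier.of("db", "events"), schema)
        .withPartitionSpec(PartitionSpec.unpartitioned())
        .withRowKey(key)
        .create();
  }
}
```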
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 18006369a3c5..5da3e7fb23b4 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -37,6 +37,7 @@
abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
+ private final RowKey rowKey = RowKey.notIdentified();
private final TableOperations ops;
private final Table table;
private final String name;
@@ -108,6 +109,11 @@ public Map<Integer, SortOrder> sortOrders() {
return ImmutableMap.of(sortOrder.orderId(), sortOrder);
}
+ @Override
+ public RowKey rowKey() {
+ return rowKey;
+ }
+
@Override
public Map<String, String> properties() {
return ImmutableMap.of();
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 0730667a68fc..006a93b65131 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -160,6 +160,7 @@ protected class BaseMetastoreCatalogTableBuilder implements TableBuilder {
private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
private PartitionSpec spec = PartitionSpec.unpartitioned();
private SortOrder sortOrder = SortOrder.unsorted();
+ private RowKey rowKey = RowKey.notIdentified();
private String location = null;
public BaseMetastoreCatalogTableBuilder(TableIdentifier identifier, Schema schema) {
@@ -181,6 +182,12 @@ public TableBuilder withSortOrder(SortOrder newSortOrder) {
return this;
}
+ @Override
+ public TableBuilder withRowKey(RowKey newRowKey) {
+ this.rowKey = newRowKey != null ? newRowKey : RowKey.notIdentified();
+ return this;
+ }
+
@Override
public TableBuilder withLocation(String newLocation) {
this.location = newLocation;
@@ -210,7 +217,8 @@ public Table create() {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
- TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
+ TableMetadata metadata = TableMetadata.newTableMetadata(
+ schema, spec, sortOrder, rowKey, baseLocation, properties);
try {
ops.commit(null, metadata);
@@ -230,7 +238,8 @@ public Transaction createTransaction() {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
- TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
+ TableMetadata metadata = TableMetadata.newTableMetadata(
+ schema, spec, sortOrder, rowKey, baseLocation, properties);
return Transactions.createTableTransaction(identifier.toString(), ops, metadata);
}
@@ -253,10 +262,12 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {
TableMetadata metadata;
if (ops.current() != null) {
String baseLocation = location != null ? location : ops.current().location();
- metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
+ metadata = ops.current().buildReplacement(
+ schema, spec, sortOrder, rowKey, baseLocation, propertiesBuilder.build());
} else {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
- metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
+ metadata = TableMetadata.newTableMetadata(
+ schema, spec, sortOrder, rowKey, baseLocation, propertiesBuilder.build());
}
if (orCreate) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 51c056f5e1dd..1b2080af9fe7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -89,6 +89,11 @@ public Map<Integer, SortOrder> sortOrders() {
return ops.current().sortOrdersById();
}
+ @Override
+ public RowKey rowKey() {
+ return ops.current().rowKey();
+ }
+
@Override
public Map<String, String> properties() {
return ops.current().properties();
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 925f37f89955..bfc2c2c671d4 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -545,6 +545,11 @@ public Map<Integer, SortOrder> sortOrders() {
return current.sortOrdersById();
}
+ @Override
+ public RowKey rowKey() {
+ return current.rowKey();
+ }
+
@Override
public Map<String, String> properties() {
return current.properties();
diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
index 95c01a5397f1..9943e79aba4b 100644
--- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java
@@ -186,6 +186,12 @@ public TableBuilder withSortOrder(SortOrder sortOrder) {
return this;
}
+ @Override
+ public TableBuilder withRowKey(RowKey rowKey) {
+ innerBuilder.withRowKey(rowKey);
+ return this;
+ }
+
@Override
public TableBuilder withLocation(String location) {
innerBuilder.withLocation(location);
diff --git a/core/src/main/java/org/apache/iceberg/RowKeyParser.java b/core/src/main/java/org/apache/iceberg/RowKeyParser.java
new file mode 100644
index 000000000000..984ea3c0c415
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/RowKeyParser.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class RowKeyParser {
+ private static final String FIELDS = "identifier-fields";
+ private static final String SOURCE_ID = "source-id";
+
+ private RowKeyParser() {
+ }
+
+ public static void toJson(RowKey rowKey, JsonGenerator generator) throws IOException {
+ generator.writeStartObject();
+ generator.writeFieldName(FIELDS);
+ toJsonFields(rowKey, generator);
+ generator.writeEndObject();
+ }
+
+ public static String toJson(RowKey rowKey) {
+ return toJson(rowKey, false);
+ }
+
+ public static String toJson(RowKey rowKey, boolean pretty) {
+ try {
+ StringWriter writer = new StringWriter();
+ JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+ if (pretty) {
+ generator.useDefaultPrettyPrinter();
+ }
+ toJson(rowKey, generator);
+ generator.flush();
+ return writer.toString();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private static void toJsonFields(RowKey rowKey, JsonGenerator generator) throws IOException {
+ generator.writeStartArray();
+ for (RowKeyIdentifierField field : rowKey.identifierFields()) {
+ generator.writeStartObject();
+ generator.writeNumberField(SOURCE_ID, field.sourceId());
+ generator.writeEndObject();
+ }
+ generator.writeEndArray();
+ }
+
+ public static RowKey fromJson(Schema schema, String json) {
+ try {
+ return fromJson(schema, JsonUtil.mapper().readValue(json, JsonNode.class));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public static RowKey fromJson(Schema schema, JsonNode json) {
+ Preconditions.checkArgument(json.isObject(), "Cannot parse row key from non-object: %s", json);
+ RowKey.Builder builder = RowKey.builderFor(schema);
+ buildFromJsonFields(builder, json.get(FIELDS));
+ return builder.build();
+ }
+
+ private static void buildFromJsonFields(RowKey.Builder builder, JsonNode json) {
+ Preconditions.checkArgument(json != null, "Cannot parse null row key fields");
+ Preconditions.checkArgument(json.isArray(), "Cannot parse row key fields, not an array: %s", json);
+
+ Iterator<JsonNode> elements = json.elements();
+ while (elements.hasNext()) {
+ JsonNode element = elements.next();
+ Preconditions.checkArgument(element.isObject(),
+ "Cannot parse row key field, not an object: %s", element);
+ builder.addField(JsonUtil.getInt(SOURCE_ID, element));
+ }
+ }
+}
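A round-trip sketch for the parser above; the schema is an assumed example, and the JSON shape follows toJsonFields:

```java
import org.apache.iceberg.RowKey;
import org.apache.iceberg.RowKeyParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class RowKeyParserExample {
  public static void main(String[] args) {
    Schema schema = new Schema(required(1, "id", Types.LongType.get()));
    RowKey key = RowKey.builderFor(schema).addField("id").build();

    String json = RowKeyParser.toJson(key);
    System.out.println(json);  // {"identifier-fields":[{"source-id":1}]}

    RowKey roundTripped = RowKeyParser.fromJson(schema, json);
    System.out.println(roundTripped.equals(key));  // true
  }
}
```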
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 1e33c54b8ec6..94c3c54a61ba 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -65,7 +65,8 @@ public static TableMetadata newTableMetadata(TableOperations ops,
PartitionSpec spec,
String location,
Map<String, String> properties) {
- return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
+ return newTableMetadata(schema, spec, SortOrder.unsorted(), RowKey.notIdentified(), location,
+ properties, DEFAULT_TABLE_FORMAT_VERSION);
}
public static TableMetadata newTableMetadata(Schema schema,
@@ -73,19 +74,31 @@ public static TableMetadata newTableMetadata(Schema schema,
SortOrder sortOrder,
String location,
Map<String, String> properties) {
- return newTableMetadata(schema, spec, sortOrder, location, properties, DEFAULT_TABLE_FORMAT_VERSION);
+ return newTableMetadata(schema, spec, sortOrder, RowKey.notIdentified(), location,
+ properties, DEFAULT_TABLE_FORMAT_VERSION);
+ }
+
+ public static TableMetadata newTableMetadata(Schema schema,
+ PartitionSpec spec,
+ SortOrder sortOrder,
+ RowKey rowKey,
+ String location,
+ Map<String, String> properties) {
+ return newTableMetadata(schema, spec, sortOrder, rowKey, location, properties, DEFAULT_TABLE_FORMAT_VERSION);
}
public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties) {
- return newTableMetadata(schema, spec, SortOrder.unsorted(), location, properties, DEFAULT_TABLE_FORMAT_VERSION);
+ return newTableMetadata(schema, spec, SortOrder.unsorted(), RowKey.notIdentified(), location,
+ properties, DEFAULT_TABLE_FORMAT_VERSION);
}
static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
SortOrder sortOrder,
+ RowKey rowKey,
String location,
Map<String, String> properties,
int formatVersion) {
@@ -111,6 +124,9 @@ static TableMetadata newTableMetadata(Schema schema,
int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID;
SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder);
+ // rebuild the row key using the new column ids
+ RowKey freshRowKey = freshRowKey(freshSchema, rowKey);
+
// Validate the metrics configuration. Note: we only do this on new tables so we don't
// break existing tables.
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);
@@ -119,7 +135,7 @@ static TableMetadata newTableMetadata(Schema schema,
INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
lastColumnId.get(), freshSchema.schemaId(), ImmutableList.of(freshSchema),
freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(),
- freshSortOrderId, ImmutableList.of(freshSortOrder),
+ freshSortOrderId, ImmutableList.of(freshSortOrder), freshRowKey,
ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
ImmutableList.of(), ImmutableList.of());
}
@@ -228,6 +244,7 @@ public String toString() {
private final int lastAssignedPartitionId;
private final int defaultSortOrderId;
private final List<SortOrder> sortOrders;
+ private final RowKey rowKey;
private final Map<String, String> properties;
private final long currentSnapshotId;
private final List<Snapshot> snapshots;
@@ -253,6 +270,7 @@ public String toString() {
int lastAssignedPartitionId,
int defaultSortOrderId,
List<SortOrder> sortOrders,
+ RowKey rowKey,
Map<String, String> properties,
long currentSnapshotId,
List<Snapshot> snapshots,
@@ -260,6 +278,7 @@ public String toString() {
List<MetadataLogEntry> previousFiles) {
Preconditions.checkArgument(specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
Preconditions.checkArgument(sortOrders != null && !sortOrders.isEmpty(), "Sort orders cannot be null or empty");
+ Preconditions.checkArgument(rowKey != null, "Row key cannot be null");
Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
"Unsupported format version: v%s", formatVersion);
Preconditions.checkArgument(formatVersion == 1 || uuid != null,
@@ -281,6 +300,7 @@ public String toString() {
this.defaultSpecId = defaultSpecId;
this.lastAssignedPartitionId = lastAssignedPartitionId;
this.defaultSortOrderId = defaultSortOrderId;
+ this.rowKey = rowKey;
this.sortOrders = sortOrders;
this.properties = properties;
this.currentSnapshotId = currentSnapshotId;
@@ -421,6 +441,10 @@ public Map<Integer, SortOrder> sortOrdersById() {
return sortOrdersById;
}
+ public RowKey rowKey() {
+ return rowKey;
+ }
+
public String location() {
return location;
}
@@ -471,8 +495,8 @@ public TableMetadata withUUID() {
} else {
return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
lastSequenceNumber, lastUpdatedMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties,
- currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
}
@@ -482,6 +506,7 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
// rebuild all of the partition specs and sort orders for the new current schema
List<PartitionSpec> updatedSpecs = Lists.transform(specs, spec -> updateSpecSchema(newSchema, spec));
List<SortOrder> updatedSortOrders = Lists.transform(sortOrders, order -> updateSortOrderSchema(newSchema, order));
+ RowKey updatedRowKey = updateRowKeySchema(newSchema, rowKey);
int newSchemaId = reuseOrCreateNewSchemaId(newSchema);
if (currentSchemaId == newSchemaId && newLastColumnId == lastColumnId) {
@@ -497,8 +522,8 @@ public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), newLastColumnId,
newSchemaId, builder.build(), defaultSpecId, updatedSpecs, lastAssignedPartitionId,
- defaultSortOrderId, updatedSortOrders, properties, currentSnapshotId, snapshots, snapshotLog,
- addPreviousFile(file, lastUpdatedMillis));
+ defaultSortOrderId, updatedSortOrders, updatedRowKey,
+ properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
// The caller is responsible for passing a newPartitionSpec with correct partition field IDs
@@ -535,7 +560,7 @@ public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, newDefaultSpecId,
builder.build(), Math.max(lastAssignedPartitionId, newPartitionSpec.lastAssignedFieldId()),
- defaultSortOrderId, sortOrders, properties,
+ defaultSortOrderId, sortOrders, rowKey, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -573,8 +598,19 @@ public TableMetadata replaceSortOrder(SortOrder newOrder) {
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, newOrderId, builder.build(), properties, currentSnapshotId, snapshots, snapshotLog,
- addPreviousFile(file, lastUpdatedMillis));
+ lastAssignedPartitionId, newOrderId, builder.build(), rowKey,
+ properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ }
+
+ public TableMetadata updateRowKey(RowKey newKey) {
+ if (rowKey.equals(newKey)) {
+ return this;
+ }
+
+ return new TableMetadata(null, formatVersion, uuid, location,
+ lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, freshRowKey(schema(), newKey),
+ properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata addStagedSnapshot(Snapshot snapshot) {
@@ -590,8 +626,8 @@ public TableMetadata addStagedSnapshot(Snapshot snapshot) {
return new TableMetadata(null, formatVersion, uuid, location,
snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
- defaultSortOrderId, sortOrders, properties, currentSnapshotId, newSnapshots, snapshotLog,
- addPreviousFile(file, lastUpdatedMillis));
+ defaultSortOrderId, sortOrders, rowKey,
+ properties, currentSnapshotId, newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
@@ -616,8 +652,8 @@ public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
return new TableMetadata(null, formatVersion, uuid, location,
snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
- defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog,
- addPreviousFile(file, lastUpdatedMillis));
+ defaultSortOrderId, sortOrders, rowKey,
+ properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
@@ -648,8 +684,9 @@ public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId, filtered,
- ImmutableList.copyOf(newSnapshotLog), addPreviousFile(file, lastUpdatedMillis));
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ properties, currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog),
+ addPreviousFile(file, lastUpdatedMillis));
}
private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
@@ -672,16 +709,17 @@ private TableMetadata setCurrentSnapshotTo(Snapshot snapshot) {
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, nowMillis, lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, snapshot.snapshotId(), snapshots,
- newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ properties, snapshot.snapshotId(), snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata replaceProperties(Map<String, String> newProperties) {
ValidationException.check(newProperties != null, "Cannot set properties to null");
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, newProperties, currentSnapshotId, snapshots,
- snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ newProperties, currentSnapshotId, snapshots, snapshotLog,
+ addPreviousFile(file, lastUpdatedMillis, newProperties));
}
public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
@@ -699,14 +737,14 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ properties, currentSnapshotId, snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
// The caller is responsible for passing an updatedPartitionSpec with correct partition field IDs
public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
- SortOrder updatedSortOrder, String newLocation,
- Map<String, String> updatedProperties) {
+ SortOrder updatedSortOrder, RowKey updatedRowKey,
+ String newLocation, Map<String, String> updatedProperties) {
ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec),
"Spec does not use sequential IDs that are required in v1: %s", updatedPartitionSpec);
@@ -751,6 +789,8 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
sortOrdersBuilder.add(freshSortOrder);
}
+ RowKey freshRowKey = freshRowKey(freshSchema, updatedRowKey);
+
Map<String, String> newProperties = Maps.newHashMap();
newProperties.putAll(this.properties);
newProperties.putAll(updatedProperties);
@@ -766,15 +806,16 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
return new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(), newLastColumnId.get(), freshSchemaId, schemasBuilder.build(),
specId, specListBuilder.build(), Math.max(lastAssignedPartitionId, freshSpec.lastAssignedFieldId()),
- orderId, sortOrdersBuilder.build(), ImmutableMap.copyOf(newProperties),
- -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
+ orderId, sortOrdersBuilder.build(), freshRowKey,
+ ImmutableMap.copyOf(newProperties), -1, snapshots, ImmutableList.of(),
+ addPreviousFile(file, lastUpdatedMillis, newProperties));
}
public TableMetadata updateLocation(String newLocation) {
return new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
@@ -790,8 +831,8 @@ public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
return new TableMetadata(null, newFormatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
- snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ properties, currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
private List<MetadataLogEntry> addPreviousFile(InputFile previousFile, long timestampMillis) {
@@ -842,6 +883,17 @@ private static SortOrder updateSortOrderSchema(Schema schema, SortOrder sortOrde
return builder.build();
}
+ private static RowKey updateRowKeySchema(Schema schema, RowKey rowKey) {
+ RowKey.Builder builder = RowKey.builderFor(schema);
+
+ // add all the fields to the builder, source column IDs should not change.
+ for (RowKeyIdentifierField field : rowKey.identifierFields()) {
+ builder.addField(field.sourceId());
+ }
+
+ return builder.build();
+ }
+
private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
.withSpecId(specId);
@@ -877,6 +929,23 @@ private static SortOrder freshSortOrder(int orderId, Schema schema, SortOrder so
return builder.build();
}
+ private static RowKey freshRowKey(Schema schema, RowKey rowKey) {
+ RowKey.Builder builder = RowKey.builderFor(schema);
+
+ for (RowKeyIdentifierField field : rowKey.identifierFields()) {
+ // look up the name of the source field in the old schema to get the new schema's id
+ String columnName = rowKey.schema().findColumnName(field.sourceId());
+ Preconditions.checkNotNull(columnName,
+ "Cannot find column in the row key's schema. id: %s, schema: %s",
+ field.sourceId(), rowKey.schema());
+
+ // re-add the field by name so it binds to the fresh schema's column ID.
+ builder.addField(columnName);
+ }
+
+ return builder.build();
+ }
+
private static Map<Long, Snapshot> indexAndValidateSnapshots(List<Snapshot> snapshots, long lastSequenceNumber) {
ImmutableMap.Builder<Long, Snapshot> builder = ImmutableMap.builder();
for (Snapshot snap : snapshots) {
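The fresh-ID rebinding in freshRowKey is easiest to see end to end. A sketch, assuming fresh column IDs are assigned sequentially from 1 (as TypeUtil.assignFreshIds does for new tables); the location is a placeholder:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowKey;
import org.apache.iceberg.RowKeyIdentifierField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class FreshRowKeyExample {
  public static void main(String[] args) {
    // source schema deliberately uses a non-sequential column ID
    Schema schema = new Schema(required(10, "id", Types.LongType.get()));
    RowKey key = RowKey.builderFor(schema).addField("id").build();  // source-id 10

    TableMetadata metadata = TableMetadata.newTableMetadata(
        schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), key,
        "file:/tmp/t", ImmutableMap.of());  // placeholder location

    for (RowKeyIdentifierField field : metadata.rowKey().identifierFields()) {
      System.out.println(field.sourceId());  // 1: rebound by name to the fresh schema
    }
  }
}
```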
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 29ad8998b472..a3e8fd682117 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -96,6 +96,7 @@ private TableMetadataParser() {
static final String LAST_PARTITION_ID = "last-partition-id";
static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
static final String SORT_ORDERS = "sort-orders";
+ static final String ROW_KEY = "row-key";
static final String PROPERTIES = "properties";
static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
static final String SNAPSHOTS = "snapshots";
@@ -152,6 +153,7 @@ public static String toJson(TableMetadata metadata) {
}
}
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
private static void toJson(TableMetadata metadata, JsonGenerator generator) throws IOException {
generator.writeStartObject();
@@ -203,6 +205,10 @@ private static void toJson(TableMetadata metadata, JsonGenerator generator) thro
}
generator.writeEndArray();
+ // write row key
+ generator.writeFieldName(ROW_KEY);
+ RowKeyParser.toJson(metadata.rowKey(), generator);
+
// write properties map
generator.writeObjectFieldStart(PROPERTIES);
for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
@@ -261,7 +267,7 @@ public static TableMetadata read(FileIO io, InputFile file) {
}
}
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ @SuppressWarnings({"checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"})
static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
Preconditions.checkArgument(node.isObject(),
"Cannot parse metadata from a non-object: %s", node);
@@ -368,6 +374,17 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
defaultSortOrderId = defaultSortOrder.orderId();
}
+ // parse the row key
+ RowKey rowKey;
+ JsonNode rowKeyObject = node.get(ROW_KEY);
+ if (rowKeyObject != null) {
+ rowKey = RowKeyParser.fromJson(schema, rowKeyObject);
+ } else {
+ Preconditions.checkArgument(formatVersion == 1,
+ "%s must exist in format v%s", ROW_KEY, formatVersion);
+ rowKey = RowKey.notIdentified();
+ }
+
// parse properties map
Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
long currentVersionId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
@@ -405,7 +422,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
return new TableMetadata(file, formatVersion, uuid, location,
lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, currentSchemaId, schemas, defaultSpecId, specs,
- lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentVersionId,
- snapshots, entries.build(), metadataEntries.build());
+ lastAssignedPartitionId, defaultSortOrderId, sortOrders, rowKey,
+ properties, currentVersionId, snapshots, entries.build(), metadataEntries.build());
}
}
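For orientation, the shape the serialized metadata gains and the compatibility rule the fallback above encodes (illustrative only; surrounding fields elided):

```java
// "sort-orders" : [ ... ],
// "row-key" : {
//   "identifier-fields" : [ { "source-id" : 1 } ]
// },
// "properties" : { ... }
//
// On read, RowKeyParser.fromJson rebuilds the key against the parsed schema. A metadata
// file with no "row-key" entry is accepted only for format v1, where it falls back to
// RowKey.notIdentified(); for v2+ the precondition above fails the read.
```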
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index dc1a2d256cee..fc4e860409bd 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -30,6 +30,7 @@
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StaticTableOperations;
@@ -132,15 +133,17 @@ private Table loadMetadataTable(String location, String metadataTableName, Metad
*
* @param schema iceberg schema used to create the table
* @param spec partitioning spec, if null the table will be unpartitioned
+ * @param rowKey row key, if null the table will have no row key
* @param properties a string map of table properties, initialized to empty if null
* @param location a path URI (e.g. hdfs:///warehouse/my_table)
* @return newly created table implementation
*/
@Override
- public Table create(Schema schema, PartitionSpec spec, SortOrder order,
+ public Table create(Schema schema, PartitionSpec spec, SortOrder order, RowKey rowKey,
Map<String, String> properties, String location) {
return buildTable(location, schema).withPartitionSpec(spec)
.withSortOrder(order)
+ .withRowKey(rowKey)
.withProperties(properties)
.create();
}
@@ -199,13 +202,14 @@ TableOperations newTableOps(String location) {
}
private TableMetadata tableMetadata(Schema schema, PartitionSpec spec, SortOrder order,
- Map<String, String> properties, String location) {
+ RowKey key, Map<String, String> properties, String location) {
Preconditions.checkNotNull(schema, "A table schema is required");
Map<String, String> tableProps = properties == null ? ImmutableMap.of() : properties;
PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() : spec;
SortOrder sortOrder = order == null ? SortOrder.unsorted() : order;
- return TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, location, tableProps);
+ RowKey rowKey = key == null ? RowKey.notIdentified() : key;
+ return TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, rowKey, location, tableProps);
}
/**
@@ -259,6 +263,7 @@ private class HadoopTableBuilder implements Catalog.TableBuilder {
private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
private PartitionSpec spec = PartitionSpec.unpartitioned();
private SortOrder sortOrder = SortOrder.unsorted();
+ private RowKey rowKey = RowKey.notIdentified();
HadoopTableBuilder(String location, Schema schema) {
@@ -278,6 +283,12 @@ public Catalog.TableBuilder withSortOrder(SortOrder newSortOrder) {
return this;
}
+ @Override
+ public Catalog.TableBuilder withRowKey(RowKey newRowKey) {
+ this.rowKey = newRowKey != null ? newRowKey : RowKey.notIdentified();
+ return this;
+ }
+
@Override
public Catalog.TableBuilder withLocation(String newLocation) {
Preconditions.checkArgument(newLocation == null || location.equals(newLocation),
@@ -308,7 +319,7 @@ public Table create() {
}
Map<String, String> properties = propertiesBuilder.build();
- TableMetadata metadata = tableMetadata(schema, spec, sortOrder, properties, location);
+ TableMetadata metadata = tableMetadata(schema, spec, sortOrder, rowKey, properties, location);
ops.commit(null, metadata);
return new BaseTable(ops, location);
}
@@ -321,7 +332,7 @@ public Transaction createTransaction() {
}
Map<String, String> properties = propertiesBuilder.build();
- TableMetadata metadata = tableMetadata(schema, spec, null, properties, location);
+ TableMetadata metadata = tableMetadata(schema, spec, null, rowKey, properties, location);
return Transactions.createTableTransaction(location, ops, metadata);
}
@@ -344,9 +355,9 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {
Map<String, String> properties = propertiesBuilder.build();
TableMetadata metadata;
if (ops.current() != null) {
- metadata = ops.current().buildReplacement(schema, spec, sortOrder, location, properties);
+ metadata = ops.current().buildReplacement(schema, spec, sortOrder, rowKey, location, properties);
} else {
- metadata = tableMetadata(schema, spec, sortOrder, properties, location);
+ metadata = tableMetadata(schema, spec, sortOrder, rowKey, properties, location);
}
if (orCreate) {
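A hedged end-to-end sketch for the HadoopTables path added here; the warehouse location is a placeholder:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class HadoopTablesRowKeyExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "data", Types.StringType.get()));
    RowKey key = RowKey.builderFor(schema).addField("id").build();

    HadoopTables tables = new HadoopTables(new Configuration());
    Table table = tables.create(
        schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), key,
        ImmutableMap.of(), "file:/tmp/warehouse/keyed_table");  // placeholder path

    // the loaded table exposes the key through the new Table#rowKey() accessor
    System.out.println(table.rowKey().identifierFields().size());  // 1
  }
}
```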
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
index b822716fa949..461f35302db6 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
@@ -24,7 +24,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.transforms.Transform;
@@ -70,7 +72,8 @@ public void testReplaceTransactionWithCustomSortOrder() {
.build();
Map<String, String> props = Maps.newHashMap();
- Transaction replace = TestTables.beginReplace(tableDir, "test", schema, unpartitioned(), newSortOrder, props);
+ Transaction replace = TestTables.beginReplace(tableDir, "test", schema, unpartitioned(), newSortOrder,
+ RowKey.notIdentified(), props);
replace.commitTransaction();
table.refresh();
@@ -364,6 +367,47 @@ public void testReplaceToCreateAndAppend() throws IOException {
validateSnapshot(null, meta.currentSnapshot(), FILE_A, FILE_B);
}
+ @Test
+ public void testReplaceTransactionWithNewRowKey() {
+ Snapshot start = table.currentSnapshot();
+ Schema schema = table.schema();
+
+ table.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ Assert.assertEquals("Version should be 1", 1L, (long) version());
+
+ validateSnapshot(start, table.currentSnapshot(), FILE_A);
+
+ RowKey newRowKey = RowKey.builderFor(schema)
+ .addField("id")
+ .addField("data")
+ .build();
+
+ Map<String, String> props = Maps.newHashMap();
+ Transaction replace = TestTables.beginReplace(tableDir, "test", schema, unpartitioned(),
+ SortOrder.unsorted(), newRowKey, props);
+ replace.commitTransaction();
+
+ table.refresh();
+
+ Assert.assertEquals("Version should be 2", 2L, (long) version());
+ Assert.assertNull("Table should not have a current snapshot", table.currentSnapshot());
+ Assert.assertEquals("Schema should match previous schema",
+ schema.asStruct(), table.schema().asStruct());
+ Assert.assertEquals("Partition spec should have no fields",
+ 0, table.spec().fields().size());
+ Assert.assertEquals("Table should have 1 orders", 1, table.sortOrders().size());
+ Assert.assertEquals("Sort order should have no fields", 0, table.sortOrder().fields().size());
+
+ RowKey rowKey = table.rowKey();
+ Assert.assertEquals("Row key must have 2 fields", 2, rowKey.identifierFields().size());
+ Assert.assertEquals("Field source column IDs must match",
+ ImmutableList.of(table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()),
+ rowKey.identifierFields().stream().map(RowKeyIdentifierField::sourceId).sorted().collect(Collectors.toList()));
+ }
+
private static Schema assignFreshIds(Schema schema) {
AtomicInteger lastColumnId = new AtomicInteger(0);
return TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
diff --git a/core/src/test/java/org/apache/iceberg/TestRowKey.java b/core/src/test/java/org/apache/iceberg/TestRowKey.java
new file mode 100644
index 000000000000..d3985b06a402
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestRowKey.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestRowKey {
+
+ private final int formatVersion;
+
+ private static final Schema SCHEMA = new Schema(
+ required(10, "id", Types.IntegerType.get()),
+ required(11, "data", Types.StringType.get()),
+ optional(12, "s", Types.StructType.of(
+ required(13, "id", Types.IntegerType.get())
+ )),
+ optional(14, "map", Types.MapType.ofOptional(
+ 15, 16, Types.IntegerType.get(), Types.IntegerType.get()
+ )),
+ required(17, "required_list", Types.ListType.ofOptional(18, Types.StringType.get()))
+ );
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private File tableDir = null;
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]>