Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions api/src/main/java/org/apache/iceberg/RowKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.Serializable;
import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;

/**
* Row key of a table.
* <p>
* Row key is a definition of table row uniqueness,
* similar to the concept of primary key in a relational database system.
* A row should be unique in a table based on the values of an unordered set of {@link RowKeyIdentifierField}.
* Iceberg itself does not enforce row uniqueness based on this key.
* It is leveraged by operations such as streaming upsert.
*/
public class RowKey implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion but I'd go for RowIdentifier or RowId. I think Key means uniqueness but I'll be fine this way too as long as we agree Iceberg does not ensure uniqueness.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A key does not always mean uniqueness in my mind. From the MySQL documentation, an index can be created on top of key columns, and the index can be either unique or non-unique.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have gone back and forth on this naming, and for now I would prefer the Key case because Id is heavily used in table metadata to mean concepts such as spec-id, schema-id, order-id, etc. which are the increasing ID of different specs. Using a different keyword Key would provide more clarity in the table metadata.

Copy link
Contributor

@rdblue rdblue Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Jack's logic that "id" is typically used in Iceberg to refer to a numeric identifier. It would be odd to use RowId, especially given the overlap with the JDBC one. But, we have had a significant number of people that find "key" confusing when it is a non-unique "key".

What about shifting the focus from the "key" or "identifier" to the fields? We could use identifier-field-ids to hold the collection and add identifierFieldIds to APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed class RowKeyField to RowKeyIdentifierField, and fields to identifier-fields in metadata. Please let me know if that feels better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I don't have strong opinion about the RowKeyField or RowKeyIdentifierField. I'm okay if you think it's good for one of them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be more clear, I don't think we should ignore the near consensus from our sync discussion that "key" is misleading. I think we should instead call this class IdentityFields (or something similar) and store identifier-field-ids in table metadata.


// Shared sentinel instance for tables that have no identifier fields.
private static final RowKey NOT_IDENTIFIED = new RowKey(new Schema(), Sets.newHashSet());

// Schema the identifier field IDs are resolved against.
private final Schema schema;
// Stored as an array to keep the class cheaply serializable.
private final RowKeyIdentifierField[] identifierFields;

// Lazily built, cached set view of identifierFields; volatile for safe
// double-checked initialization, transient so it is rebuilt after deserialization.
private transient volatile Set<RowKeyIdentifierField> identifierFieldSet;

/**
 * Creates a row key. Callers use {@link #builderFor(Schema)} or {@link #notIdentified()} instead.
 *
 * @param schema the schema this row key refers to
 * @param identifierFields the identifier fields that make up the key
 */
private RowKey(Schema schema, Set<RowKeyIdentifierField> identifierFields) {
this.schema = schema;
this.identifierFields = identifierFields.toArray(new RowKeyIdentifierField[0]);
}

/**
 * Returns the {@link Schema} this row key was built against.
 */
public Schema schema() {
    return this.schema;
}

/**
 * Returns the identifier fields that make up this row key.
 *
 * @return an unordered, immutable set of {@link RowKeyIdentifierField}
 */
public Set<RowKeyIdentifierField> identifierFields() {
    return lazyIdentifierFieldSet();
}

/**
 * Lazily builds and caches an immutable set view of the identifier field array.
 * <p>
 * Uses the double-checked locking idiom with a local variable so the common
 * (already-initialized) path performs a single volatile read instead of re-reading
 * the volatile field on each access.
 */
private Set<RowKeyIdentifierField> lazyIdentifierFieldSet() {
    Set<RowKeyIdentifierField> result = identifierFieldSet;
    if (result == null) {
        synchronized (this) {
            // re-read under the lock: another thread may have initialized it
            result = identifierFieldSet;
            if (result == null) {
                result = ImmutableSet.copyOf(identifierFields);
                identifierFieldSet = result;
            }
        }
    }

    return result;
}

/**
 * Returns the shared default row key that contains no identifier fields.
 */
public static RowKey notIdentified() {
    return NOT_IDENTIFIED;
}

/**
 * Returns true if this row key has no identifier fields.
 *
 * @return true when this is the default, empty row key
 */
public boolean isNotIdentified() {
    // idiomatic emptiness check; an array length is never negative
    return identifierFields.length == 0;
}

@Override
public boolean equals(Object other) {
    // identity check first, then reject null and differing classes
    if (this == other) {
        return true;
    }
    if (other == null || getClass() != other.getClass()) {
        return false;
    }

    return identifierFields().equals(((RowKey) other).identifierFields());
}

@Override
public int hashCode() {
    // consistent with equals(): both are based on the cached identifier field set
    return lazyIdentifierFieldSet().hashCode();
}

@Override
public String toString() {
    // renders as "[]" when empty, otherwise one indented field per line
    StringBuilder builder = new StringBuilder("[");
    for (RowKeyIdentifierField field : identifierFields) {
        builder.append("\n  ").append(field);
    }
    if (identifierFields.length > 0) {
        builder.append("\n");
    }
    return builder.append("]").toString();
}

/**
 * Creates a new {@link Builder row key builder} that resolves field names and IDs
 * against the given {@link Schema}.
 *
 * @param schema a schema
 * @return a fresh row key builder for the given schema.
 */
public static Builder builderFor(Schema schema) {
    return new Builder(schema);
}

/**
 * A builder to create valid {@link RowKey row keys}.
 * <p>
 * Call {@link #builderFor(Schema)} to create a new builder.
 */
public static class Builder {
    private final Schema schema;
    // set semantics: adding the same column twice yields a single identifier field
    private final Set<RowKeyIdentifierField> fields = Sets.newHashSet();

    private Builder(Schema schema) {
        this.schema = schema;
    }

    /**
     * Adds the column with the given name to the row key.
     *
     * @param name a column name in the schema
     * @return this for method chaining
     * @throws ValidationException if no column with the name exists, or the column
     *         is optional or not a primitive type
     */
    public Builder addField(String name) {
        Types.NestedField column = schema.findField(name);
        ValidationException.check(column != null, "Cannot find column with name %s in schema %s", name, schema);
        return addField(column);
    }

    /**
     * Adds the column with the given field ID to the row key.
     *
     * @param id a column ID in the schema
     * @return this for method chaining
     * @throws ValidationException if no column with the ID exists, or the column
     *         is optional or not a primitive type
     */
    public Builder addField(int id) {
        Types.NestedField column = schema.findField(id);
        ValidationException.check(column != null, "Cannot find column with ID %s in schema %s", id, schema);
        return addField(column);
    }

    // Validates the shared constraints before recording the column as an identifier field.
    private Builder addField(Types.NestedField column) {
        ValidationException.check(column.isRequired(),
            "Cannot add column %s to row key because it is not a required column in schema %s", column, schema);
        ValidationException.check(column.type().isPrimitiveType(),
            "Cannot add column %s to row key because it is not a primitive data type in schema %s", column, schema);
        fields.add(new RowKeyIdentifierField(column.fieldId()));
        return this;
    }

    /**
     * Builds the row key, returning the shared not-identified instance when no
     * fields were added.
     */
    public RowKey build() {
        if (fields.isEmpty()) {
            return NOT_IDENTIFIED;
        }

        return new RowKey(schema, fields);
    }
}
}
65 changes: 65 additions & 0 deletions api/src/main/java/org/apache/iceberg/RowKeyIdentifierField.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.Serializable;
import java.util.Objects;

/**
* An identifier field in {@link RowKey}
* <p>
* The field must be:
* 1. a required column in the table schema
* 2. a primitive type column
*/
public class RowKeyIdentifierField implements Serializable {
Copy link
Contributor

@rdblue rdblue Apr 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a class that wraps a single ID. Could we get rid of it? Instead, IdentifierFields could expose idSet or ids that returns Collection<Integer>.


// ID of the source column in the table schema that this identifier field refers to.
private final int sourceId;

// Package-private: instances are created by RowKey.Builder after column validation.
RowKeyIdentifierField(int sourceId) {
this.sourceId = sourceId;
}

/**
 * Returns the field ID of the source column in the table schema.
 */
public int sourceId() {
return sourceId;
}

@Override
public String toString() {
    // renders as the source ID in parentheses, e.g. "(3)"
    return String.format("(%d)", sourceId);
}

@Override
public boolean equals(Object other) {
    // identity check first, then reject null and differing classes
    if (this == other) {
        return true;
    }
    if (other == null || getClass() != other.getClass()) {
        return false;
    }

    return sourceId == ((RowKeyIdentifierField) other).sourceId;
}

@Override
public int hashCode() {
    // Single primitive field: Integer.hashCode avoids the varargs array allocation
    // and boxing that Objects.hash(sourceId) incurs. The exact hash value differs,
    // but the equals/hashCode contract only requires consistency with equals.
    return Integer.hashCode(sourceId);
}
}
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ default String name() {
*/
Map<Integer, SortOrder> sortOrders();

/**
 * Return the {@link RowKey row key} for this table.
 * <p>
 * Note that Iceberg itself does not enforce row uniqueness based on this key.
 *
 * @return this table's row key.
 */
RowKey rowKey();

/**
* Return a map of string properties for this table.
*
Expand Down
12 changes: 11 additions & 1 deletion api/src/main/java/org/apache/iceberg/Tables.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,17 @@ default Table create(Schema schema,
SortOrder order,
Map<String, String> properties,
String tableIdentifier) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement create with a sort order");
return create(schema, spec, order, RowKey.notIdentified(), properties, tableIdentifier);
}

/**
 * Creates a table with a row key. Implementations that predate row keys do not
 * support this overload and fall through to this default, which rejects the call.
 */
default Table create(Schema schema,
                     PartitionSpec spec,
                     SortOrder order,
                     RowKey rowKey,
                     Map<String, String> properties,
                     String tableIdentifier) {
    throw new UnsupportedOperationException(
        getClass().getName() + " does not implement create with a sort order and row key");
}

Table load(String tableIdentifier);
Expand Down
9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowKey;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to introduce a createTable method that accepts a RowKey in this interface? I raise this question because when supporting the Flink SQL primary key, I found it to be a necessary method. Of course, we could publish a separate PR for this if needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opening a separate issue for this is good enough for me now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I thought about that, and there were two reasons that led me to decide not to add it:

  1. currently, neither the Spark nor the Hive SQL specification allows a primary key clause. I would expect the user to run something like ALTER TABLE ... ADD ROW KEY (actual syntax TBD) to add this row key in Spark.

  2. it's now becoming hard to iterate through all the combinations of parameters in createTable, we do not have the methods with SortOrder in parameter yet, although it can be mapped to SORTED BY clause. I would prefer us to switch to use the table builder if possible instead of adding all those overloading methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I agree it's good to use TableBuilder to create tables going forward, as we are introducing more and more arguments when creating a table.

import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -360,6 +361,14 @@ interface TableBuilder {
*/
TableBuilder withSortOrder(SortOrder sortOrder);

/**
 * Sets a row key for the table.
 * <p>
 * NOTE(review): null handling is implementation-specific — confirm whether passing
 * null resets the builder to an unidentified key.
 *
 * @param rowKey a row key
 * @return this for method chaining
 */
TableBuilder withRowKey(RowKey rowKey);

/**
* Sets a location for the table.
*
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
private final RowKey rowKey = RowKey.notIdentified();
private final TableOperations ops;
private final Table table;
private final String name;
Expand Down Expand Up @@ -108,6 +109,11 @@ public Map<Integer, SortOrder> sortOrders() {
return ImmutableMap.of(sortOrder.orderId(), sortOrder);
}

@Override
public RowKey rowKey() {
// metadata tables have no row identity; this is always the not-identified key
return rowKey;
}

@Override
public Map<String, String> properties() {
return ImmutableMap.of();
Expand Down
19 changes: 15 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ protected class BaseMetastoreCatalogTableBuilder implements TableBuilder {
private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
private PartitionSpec spec = PartitionSpec.unpartitioned();
private SortOrder sortOrder = SortOrder.unsorted();
private RowKey rowKey = RowKey.notIdentified();
private String location = null;

public BaseMetastoreCatalogTableBuilder(TableIdentifier identifier, Schema schema) {
Expand All @@ -181,6 +182,12 @@ public TableBuilder withSortOrder(SortOrder newSortOrder) {
return this;
}

@Override
public TableBuilder withRowKey(RowKey newRowKey) {
    // coerce null to the not-identified key so downstream metadata never sees null
    if (newRowKey == null) {
        this.rowKey = RowKey.notIdentified();
    } else {
        this.rowKey = newRowKey;
    }
    return this;
}

@Override
public TableBuilder withLocation(String newLocation) {
this.location = newLocation;
Expand Down Expand Up @@ -210,7 +217,8 @@ public Table create() {

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
TableMetadata metadata = TableMetadata.newTableMetadata(
schema, spec, sortOrder, rowKey, baseLocation, properties);

try {
ops.commit(null, metadata);
Expand All @@ -230,7 +238,8 @@ public Transaction createTransaction() {

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
TableMetadata metadata = TableMetadata.newTableMetadata(
schema, spec, sortOrder, rowKey, baseLocation, properties);
return Transactions.createTableTransaction(identifier.toString(), ops, metadata);
}

Expand All @@ -253,10 +262,12 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {
TableMetadata metadata;
if (ops.current() != null) {
String baseLocation = location != null ? location : ops.current().location();
metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
metadata = ops.current().buildReplacement(
schema, spec, sortOrder, rowKey, baseLocation, propertiesBuilder.build());
} else {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
metadata = TableMetadata.newTableMetadata(
schema, spec, sortOrder, rowKey, baseLocation, propertiesBuilder.build());
}

if (orCreate) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public Map<Integer, SortOrder> sortOrders() {
return ops.current().sortOrdersById();
}

@Override
public RowKey rowKey() {
// delegate to the current table metadata, like the sibling accessors
return ops.current().rowKey();
}

@Override
public Map<String, String> properties() {
return ops.current().properties();
Expand Down
Loading