diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java index 80c8284bce39..ed98143a7e7a 100644 --- a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java +++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java @@ -20,13 +20,17 @@ package org.apache.iceberg.util; import java.io.Serializable; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; +import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; public class CharSequenceSet implements Set, Serializable { private static final ThreadLocal wrappers = ThreadLocal.withInitial( @@ -152,4 +156,28 @@ public boolean removeAll(Collection objects) { public void clear() { wrapperSet.clear(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + CharSequenceSet that = (CharSequenceSet) o; + return wrapperSet.equals(that.wrapperSet); + } + + @Override + public int hashCode() { + return Objects.hash(wrapperSet); + } + + @Override + public String toString() { + return "CharSequenceSet({" + Streams.stream(iterator()).collect(Collectors.joining(", ")) + "})"; + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 9cea03d2f1ae..8fad3839f82d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -196,7 +196,7 @@ public Transaction createOrReplaceTransaction() { private Transaction newReplaceTableTransaction(boolean orCreate) { TableOperations ops = newTableOps(identifier); if (!orCreate && ops.current() == null) { - throw new NoSuchTableException("No such table: %s", identifier); + throw new NoSuchTableException("Table does not exist: %s", identifier); } TableMetadata metadata; diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 978c58807905..9af1988ea16c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -25,6 +25,7 @@ import java.util.function.Function; import java.util.function.Predicate; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; @@ -115,7 +116,11 @@ protected void doRefresh() { public void commit(TableMetadata base, TableMetadata metadata) { // if the metadata is already out of date, reject it if (base != current()) { - throw new CommitFailedException("Cannot commit: stale table metadata"); + if (base != null) { + throw new CommitFailedException("Cannot commit: stale table metadata"); + } else { + throw new AlreadyExistsException("Table already exists: %s", tableName()); + } } // 
if the metadata is not changed, return early if (base == metadata) { diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index ad4639c39e93..0297247bb3ac 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -49,7 +49,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; -class BaseTransaction implements Transaction { +public class BaseTransaction implements Transaction { private static final Logger LOG = LoggerFactory.getLogger(BaseTransaction.class); enum TransactionType { @@ -90,6 +90,14 @@ public Table table() { return transactionTable; } + public TableMetadata startMetadata() { + return current; + } + + public TableOperations underlyingOps() { + return ops; + } + private void checkLastOperationCommitted(String operation) { Preconditions.checkState(hasLastOpCommitted, "Cannot create new %s: last operation has not committed", operation); diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 7650190448c8..f5c8e4a5545e 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -22,11 +22,14 @@ import java.io.Serializable; import java.util.Map; import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; /** * Represents a change to table metadata. */ public interface MetadataUpdate extends Serializable { + void applyTo(TableMetadata.Builder metadataBuilder); + class AssignUUID implements MetadataUpdate { private final String uuid; @@ -37,6 +40,11 @@ public AssignUUID(String uuid) { public String uuid() { return uuid; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + throw new UnsupportedOperationException("Not implemented"); + } } class UpgradeFormatVersion implements MetadataUpdate { @@ -49,6 +57,11 @@ public UpgradeFormatVersion(int formatVersion) { public int formatVersion() { return formatVersion; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.upgradeFormatVersion(formatVersion); + } } class AddSchema implements MetadataUpdate { @@ -67,6 +80,11 @@ public Schema schema() { return schema; } public int lastColumnId() { return lastColumnId; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.addSchema(schema, lastColumnId); + } } class SetCurrentSchema implements MetadataUpdate { @@ -79,6 +97,11 @@ public SetCurrentSchema(int schemaId) { public int schemaId() { return schemaId; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setCurrentSchema(schemaId); + } } class AddPartitionSpec implements MetadataUpdate { @@ -91,6 +114,11 @@ public AddPartitionSpec(PartitionSpec spec) { public PartitionSpec spec() { return spec; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.addPartitionSpec(spec); + } } class SetDefaultPartitionSpec implements MetadataUpdate { @@ -103,6 +131,11 @@ public SetDefaultPartitionSpec(int schemaId) { public int specId() { return specId; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setDefaultPartitionSpec(specId); + } } class AddSortOrder implements
MetadataUpdate { @@ -112,9 +145,14 @@ public AddSortOrder(SortOrder sortOrder) { this.sortOrder = sortOrder; } - public SortOrder spec() { + public SortOrder sortOrder() { return sortOrder; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.addSortOrder(sortOrder); + } } class SetDefaultSortOrder implements MetadataUpdate { @@ -127,6 +165,11 @@ public SetDefaultSortOrder(int sortOrderId) { public int sortOrderId() { return sortOrderId; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setDefaultSortOrder(sortOrderId); + } } class AddSnapshot implements MetadataUpdate { @@ -139,6 +182,11 @@ public AddSnapshot(Snapshot snapshot) { public Snapshot snapshot() { return snapshot; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.addSnapshot(snapshot); + } } class RemoveSnapshot implements MetadataUpdate { @@ -151,6 +199,11 @@ public RemoveSnapshot(long snapshotId) { public long snapshotId() { return snapshotId; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removeSnapshots(ImmutableSet.of(snapshotId)); + } } class RemoveSnapshotRef implements MetadataUpdate { @@ -163,6 +216,12 @@ public RemoveSnapshotRef(String name) { public String name() { return name; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + // TODO: this should be generalized when tagging is supported + metadataBuilder.removeBranch(name); + } } class SetSnapshotRef implements MetadataUpdate { @@ -181,6 +240,11 @@ public String name() { public long snapshotId() { return snapshotId; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setBranchSnapshot(snapshotId, name); + } } class SetProperties implements MetadataUpdate { @@ -193,6 +257,11 @@ public SetProperties(Map updated) { public Map updated() { return updated; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setProperties(updated); + } } class RemoveProperties implements MetadataUpdate { @@ -205,6 +274,11 @@ public RemoveProperties(Set removed) { public Set removed() { return removed; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removeProperties(removed); + } } class SetLocation implements MetadataUpdate { @@ -217,5 +291,10 @@ public SetLocation(String location) { public String location() { return location; } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setLocation(location); + } } } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 2625daa7e55c..ad0ecc96e266 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -20,6 +20,7 @@ package org.apache.iceberg; import java.io.Serializable; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; @@ -407,7 +409,7 @@ public Map specsById() { return specsById; } 
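Taken together, the applyTo implementations above give each MetadataUpdate a double-dispatch hook into TableMetadata.Builder, so a serialized change log can be replayed onto a base metadata; CatalogHandlers.commit below relies on exactly this. A minimal sketch of the replay loop (base and updates are assumed inputs):

    TableMetadata.Builder builder = TableMetadata.buildFrom(base);
    for (MetadataUpdate update : updates) {
      update.applyTo(builder); // dispatches to addSchema, setBranchSnapshot, removeSnapshots, ...
    }
    TableMetadata updated = builder.build();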
- int lastAssignedPartitionId() { + public int lastAssignedPartitionId() { return lastAssignedPartitionId; } @@ -755,6 +757,7 @@ public static Builder buildFrom(TableMetadata base) { public static class Builder { private final TableMetadata base; + private String metadataLocation; private int formatVersion; private String uuid; private Long lastUpdatedMillis; @@ -771,12 +774,15 @@ public static class Builder { private final Map<String, String> properties; private long currentSnapshotId; private List<Snapshot> snapshots; - private Map<String, SnapshotRef> refs; + private final Map<String, SnapshotRef> refs; // change tracking private final List<MetadataUpdate> changes; private final int startingChangeCount; private boolean discardChanges = false; + private Integer lastAddedSchemaId = null; + private Integer lastAddedSpecId = null; + private Integer lastAddedOrderId = null; // handled in build private final List<HistoryEntry> snapshotLog; @@ -821,6 +827,12 @@ private Builder(TableMetadata base) { this.sortOrdersById = Maps.newHashMap(base.sortOrdersById); } + public Builder withMetadataLocation(String location) { + this.metadataLocation = location; + this.discardChanges = true; + return this; + } + public Builder assignUUID() { if (uuid == null) { this.uuid = UUID.randomUUID().toString(); @@ -853,6 +865,12 @@ public Builder setCurrentSchema(Schema newSchema, int newLastColumnId) { } public Builder setCurrentSchema(int schemaId) { + if (schemaId == -1) { + ValidationException.check(lastAddedSchemaId != null, + "Cannot set last added schema: no schema has been added"); + return setCurrentSchema(lastAddedSchemaId); + } + if (currentSchemaId == schemaId) { return this; } @@ -873,12 +891,18 @@ public Builder setCurrentSchema(int schemaId) { this.currentSchemaId = schemaId; - changes.add(new MetadataUpdate.SetCurrentSchema(schemaId)); + if (lastAddedSchemaId != null && lastAddedSchemaId == schemaId) { + // use -1 to signal that current was set to the last added schema + changes.add(new MetadataUpdate.SetCurrentSchema(-1)); + } else { + changes.add(new MetadataUpdate.SetCurrentSchema(schemaId)); + } return this; } public Builder addSchema(Schema schema, int newLastColumnId) { + // TODO: remove requirement for newLastColumnId addSchemaInternal(schema, newLastColumnId); return this; } @@ -889,13 +913,23 @@ public Builder setDefaultPartitionSpec(PartitionSpec spec) { } public Builder setDefaultPartitionSpec(int specId) { + if (specId == -1) { + ValidationException.check(lastAddedSpecId != null, "Cannot set last added spec: no spec has been added"); + return setDefaultPartitionSpec(lastAddedSpecId); + } + if (defaultSpecId == specId) { // the new spec is already current and no change is needed return this; } this.defaultSpecId = specId; - changes.add(new MetadataUpdate.SetDefaultPartitionSpec(specId)); + if (lastAddedSpecId != null && lastAddedSpecId == specId) { + // use -1 to signal that the default was set to the last added spec + changes.add(new MetadataUpdate.SetDefaultPartitionSpec(-1)); + } else { + changes.add(new MetadataUpdate.SetDefaultPartitionSpec(specId)); + } return this; } @@ -911,12 +945,23 @@ public Builder setDefaultSortOrder(SortOrder order) { } public Builder setDefaultSortOrder(int sortOrderId) { + if (sortOrderId == -1) { + ValidationException.check(lastAddedOrderId != null, + "Cannot set last added sort order: no sort order has been added"); + return setDefaultSortOrder(lastAddedOrderId); + } + if (sortOrderId == defaultSortOrderId) { return this; } this.defaultSortOrderId = sortOrderId; - changes.add(new MetadataUpdate.SetDefaultSortOrder(sortOrderId)); + if (lastAddedOrderId != null && lastAddedOrderId == sortOrderId) { + // use -1 to signal that the default was set to the last added sort order + changes.add(new MetadataUpdate.SetDefaultSortOrder(-1)); + } else { + changes.add(new MetadataUpdate.SetDefaultSortOrder(sortOrderId)); + } return this; } @@ -947,7 +992,7 @@ public Builder addSnapshot(Snapshot snapshot) { public Builder setBranchSnapshot(Snapshot snapshot, String branch) { addSnapshot(snapshot); - setBranchSnapshot(snapshot, branch, null); + setBranchSnapshotInternal(snapshot, branch); return this; } @@ -961,7 +1006,7 @@ public Builder setBranchSnapshot(long snapshotId, String branch) { Snapshot snapshot = snapshotsById.get(snapshotId); ValidationException.check(snapshot != null, "Cannot set %s to unknown snapshot: %s", branch, snapshotId); - setBranchSnapshot(snapshot, branch, System.currentTimeMillis()); + setBranchSnapshotInternal(snapshot, branch); return this; } @@ -983,7 +1028,10 @@ public Builder removeBranch(String branch) { public Builder removeSnapshots(List<Snapshot> snapshotsToRemove) { Set<Long> idsToRemove = snapshotsToRemove.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return removeSnapshots(idsToRemove); + } + public Builder removeSnapshots(Collection<Long> idsToRemove) { List<Snapshot> retainedSnapshots = Lists.newArrayListWithExpectedSize(snapshots.size() - idsToRemove.size()); for (Snapshot snapshot : snapshots) { long snapshotId = snapshot.snapshotId(); @@ -1066,7 +1114,7 @@ public TableMetadata build() { List<HistoryEntry> newSnapshotLog = updateSnapshotLog(snapshotLog, snapshotsById, currentSnapshotId, changes); return new TableMetadata( - null, + metadataLocation, formatVersion, uuid, location, @@ -1098,6 +1146,7 @@ private int addSchemaInternal(Schema schema, int newLastColumnId) { boolean schemaFound = schemasById.containsKey(newSchemaId); if (schemaFound && newLastColumnId == lastColumnId) { // the new schema and last column id are already current and no change is needed + this.lastAddedSchemaId = newSchemaId; return newSchemaId; } @@ -1117,6 +1166,8 @@ private int addSchemaInternal(Schema schema, int newLastColumnId) { changes.add(new MetadataUpdate.AddSchema(newSchema, lastColumnId)); + this.lastAddedSchemaId = newSchemaId; + return newSchemaId; } @@ -1136,6 +1187,7 @@ private int reuseOrCreateNewSchemaId(Schema newSchema) { private int addPartitionSpecInternal(PartitionSpec spec) { int newSpecId = reuseOrCreateNewSpecId(spec); if (specsById.containsKey(newSpecId)) { + this.lastAddedSpecId = newSpecId; return newSpecId; } @@ -1151,6 +1203,8 @@ private int addPartitionSpecInternal(PartitionSpec spec) { changes.add(new MetadataUpdate.AddPartitionSpec(newSpec)); + this.lastAddedSpecId = newSpecId; + return newSpecId; } @@ -1171,6 +1225,7 @@ private int reuseOrCreateNewSpecId(PartitionSpec newSpec) { private int addSortOrderInternal(SortOrder order) { int newOrderId = reuseOrCreateNewSortOrderId(order); if (sortOrdersById.containsKey(newOrderId)) { + this.lastAddedOrderId = newOrderId; return newOrderId; } @@ -1190,6 +1245,8 @@ private int addSortOrderInternal(SortOrder order) { changes.add(new MetadataUpdate.AddSortOrder(newOrder)); + this.lastAddedOrderId = newOrderId; + return newOrderId; } @@ -1211,7 +1268,7 @@ private int reuseOrCreateNewSortOrderId(SortOrder newOrder) { return newOrderId; } - private void setBranchSnapshot(Snapshot snapshot, String branch, Long currentTimestampMillis) { + private void setBranchSnapshotInternal(Snapshot snapshot, String branch) { long replacementSnapshotId = snapshot.snapshotId(); SnapshotRef ref = refs.get(branch); if (ref
!= null) { @@ -1225,7 +1282,9 @@ private void setBranchSnapshot(Snapshot snapshot, String branch, Long currentTimestampMillis) { "Last sequence number %s is less than existing snapshot sequence number %s", lastSequenceNumber, snapshot.sequenceNumber()); - this.lastUpdatedMillis = currentTimestampMillis != null ? currentTimestampMillis : snapshot.timestampMillis(); + // if the snapshot was added in this change set, use its timestamp + this.lastUpdatedMillis = isAddedSnapshot(snapshot.snapshotId()) ? + snapshot.timestampMillis() : System.currentTimeMillis(); if (SnapshotRef.MAIN_BRANCH.equals(branch)) { this.currentSnapshotId = replacementSnapshotId; @@ -1328,5 +1387,16 @@ private static List<HistoryEntry> updateSnapshotLog( return newSnapshotLog; } + + private boolean isAddedSnapshot(long snapshotId) { + return changes(MetadataUpdate.AddSnapshot.class) + .anyMatch(add -> add.snapshot().snapshotId() == snapshotId); + } + + private <U extends MetadataUpdate> Stream<U> changes(Class<U> updateClass) { + return changes.stream() + .filter(updateClass::isInstance) + .map(updateClass::cast); + } } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java index af939f36f879..6292363602fe 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java @@ -94,7 +94,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { try { Map<String, String> table = getTable(); - if (!table.isEmpty()) { + if (base != null) { validateMetadataLocation(table, base); String oldMetadataLocation = base.metadataFileLocation(); // Start atomic update @@ -123,6 +123,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { } catch (SQLWarning e) { throw new UncheckedSQLException(e, "Database warning"); } catch (SQLException e) { + if (e.getMessage() != null && e.getMessage().contains("constraint failed")) { + throw new CommitFailedException("Table already exists: %s", tableIdentifier); + } throw new UncheckedSQLException(e, "Unknown failure"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java new file mode 100644 index 000000000000..b6500fde911f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.rest; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.CreateNamespaceResponse; +import org.apache.iceberg.rest.responses.DropNamespaceResponse; +import org.apache.iceberg.rest.responses.DropTableResponse; +import org.apache.iceberg.rest.responses.GetNamespaceResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.util.Tasks; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +public class CatalogHandlers { + private static final Schema EMPTY_SCHEMA = new Schema(); + + private CatalogHandlers() { + } + + static class UnprocessableEntityException extends RuntimeException { + public UnprocessableEntityException(Set<String> commonKeys) { + super(String.format("Invalid namespace update, cannot set and remove keys: %s", commonKeys)); + } + } + + private static class ValidationFailureException extends RuntimeException { + private final CommitFailedException wrapped; + + public ValidationFailureException(CommitFailedException cause) { + super(cause); + this.wrapped = cause; + } + } + + public static ListNamespacesResponse listNamespaces(SupportsNamespaces catalog, Namespace parent) { + List<Namespace> results; + if (parent.isEmpty()) { + results = catalog.listNamespaces(); + } else { + results = catalog.listNamespaces(parent); + } + + return ListNamespacesResponse.builder().addAll(results).build(); + } + + public static CreateNamespaceResponse createNamespace(SupportsNamespaces catalog, CreateNamespaceRequest request) { + Namespace namespace = request.namespace(); + catalog.createNamespace(namespace, request.properties()); + return CreateNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(catalog.loadNamespaceMetadata(namespace)) + .build(); + }
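The handlers in CatalogHandlers are deliberately transport-agnostic: each one takes a Catalog or SupportsNamespaces plus a parsed request object and returns a response object. A hypothetical server binding is then a one-line delegation (the resource method and its framework wiring below are assumptions, not part of this change):

    // hypothetical route binding for POST v1/namespaces
    public CreateNamespaceResponse postNamespace(CreateNamespaceRequest request) {
      return CatalogHandlers.createNamespace(namespaceCatalog, request);
    }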
+ + public static GetNamespaceResponse loadNamespace(SupportsNamespaces catalog, Namespace namespace) { + Map properties = catalog.loadNamespaceMetadata(namespace); + return GetNamespaceResponse.builder() + .withNamespace(namespace) + .setProperties(properties) + .build(); + } + + public static DropNamespaceResponse dropNamespace(SupportsNamespaces catalog, Namespace namespace) { + boolean dropped = catalog.dropNamespace(namespace); + return DropNamespaceResponse.builder() + .dropped(dropped) + .build(); + } + + public static UpdateNamespacePropertiesResponse updateNamespaceProperties( + SupportsNamespaces catalog, Namespace namespace, UpdateNamespacePropertiesRequest request) { + Set removals = Sets.newHashSet(request.removals()); + Map updates = request.updates(); + + Set commonKeys = Sets.intersection(updates.keySet(), removals); + if (!commonKeys.isEmpty()) { + throw new UnprocessableEntityException(commonKeys); + } + + Map startProperties = catalog.loadNamespaceMetadata(namespace); + Set missing = Sets.difference(removals, startProperties.keySet()); + + if (!updates.isEmpty()) { + catalog.setProperties(namespace, updates); + } + + if (!removals.isEmpty()) { + // remove the original set just in case there was an update just after loading properties + catalog.removeProperties(namespace, removals); + } + + return UpdateNamespacePropertiesResponse.builder() + .addMissing(missing) + .addUpdated(updates.keySet()) + .addRemoved(Sets.difference(removals, missing)) + .build(); + } + + public static ListTablesResponse listTables(Catalog catalog, Namespace namespace) { + List idents = catalog.listTables(namespace); + return ListTablesResponse.builder().addAll(idents).build(); + } + + public static LoadTableResponse stageTableCreate(Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); + + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + if (catalog.tableExists(ident)) { + throw new AlreadyExistsException("Table already exists: %s", ident); + } + + Map properties = Maps.newHashMap(); + properties.put("created-at", OffsetDateTime.now().toString()); + properties.putAll(request.properties()); + + TableMetadata metadata = TableMetadata.newTableMetadata( + request.schema(), + request.spec() != null ? request.spec() : PartitionSpec.unpartitioned(), + request.writeOrder() != null ? 
request.writeOrder() : SortOrder.unsorted(), + request.location(), + properties); + + return LoadTableResponse.builder() + .withTableMetadata(metadata) + .build(); + } + + public static LoadTableResponse createTable(Catalog catalog, Namespace namespace, CreateTableRequest request) { + request.validate(); + + TableIdentifier ident = TableIdentifier.of(namespace, request.name()); + Table table = catalog.buildTable(ident, request.schema()) + .withLocation(request.location()) + .withPartitionSpec(request.spec()) + .withSortOrder(request.writeOrder()) + .withProperties(request.properties()) + .create(); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static DropTableResponse dropTable(Catalog catalog, TableIdentifier ident) { + boolean dropped = catalog.dropTable(ident); + return DropTableResponse.builder().dropped(dropped).build(); + } + + public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) { + Table table = catalog.loadTable(ident); + + if (table instanceof BaseTable) { + return LoadTableResponse.builder() + .withTableMetadata(((BaseTable) table).operations().current()) + .build(); + } + + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + public static LoadTableResponse updateTable(Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { + TableMetadata finalMetadata; + if (isCreate(request)) { + // this is a hacky way to get TableOperations for an uncommitted table + Transaction transaction = catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction(); + if (transaction instanceof BaseTransaction) { + BaseTransaction baseTransaction = (BaseTransaction) transaction; + finalMetadata = create(baseTransaction.underlyingOps(), baseTransaction.startMetadata(), request); + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTransaction"); + } + + } else { + Table table = catalog.loadTable(ident); + if (table instanceof BaseTable) { + TableOperations ops = ((BaseTable) table).operations(); + finalMetadata = commit(ops, request); + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + } + + return LoadTableResponse.builder() + .withTableMetadata(finalMetadata) + .build(); + } + + private static boolean isCreate(UpdateTableRequest request) { + boolean isCreate = request.requirements().stream() + .anyMatch(UpdateTableRequest.UpdateRequirement.AssertTableDoesNotExist.class::isInstance); + + if (isCreate) { + List<UpdateTableRequest.UpdateRequirement> invalidRequirements = request.requirements().stream() + .filter(req -> !(req instanceof UpdateTableRequest.UpdateRequirement.AssertTableDoesNotExist)) + .collect(Collectors.toList()); + Preconditions.checkArgument(invalidRequirements.isEmpty(), + "Invalid create requirements: %s", invalidRequirements); + } + + return isCreate; + } + + private static TableMetadata create(TableOperations ops, TableMetadata start, UpdateTableRequest request) { + TableMetadata.Builder builder = TableMetadata.buildFrom(start); + + // the only valid requirement is that the table will be created + request.updates().forEach(update -> update.applyTo(builder)); + + // create transactions do not retry.
if the table exists, retrying is not a solution + ops.commit(null, builder.build()); + + return ops.current(); + } + + private static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + try { + Tasks.foreach(ops) + .retry(COMMIT_NUM_RETRIES_DEFAULT) + .exponentialBackoff( + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run(taskOps -> { + TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); + isRetry.set(true); + + // validate requirements + try { + request.requirements().forEach(requirement -> requirement.validate(base)); + } catch (CommitFailedException e) { + // wrap and rethrow outside of tasks to avoid unnecessary retry + throw new ValidationFailureException(e); + } + + // apply changes + TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + + TableMetadata updated = metadataBuilder.build(); + if (updated.changes().isEmpty()) { + // do not commit if the metadata has not changed + return; + } + + // commit + taskOps.commit(base, updated); + }); + + } catch (ValidationFailureException e) { + throw e.wrapped; + } + + return ops.current(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java index 340a4409a60a..d72ee460aae5 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java @@ -22,6 +22,7 @@ import java.util.function.Consumer; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -47,6 +48,21 @@ public static Consumer<ErrorResponse> tableErrorHandler() { return baseTableErrorHandler().andThen(defaultErrorHandler()); } + public static Consumer<ErrorResponse> tableCommitHandler() { + return baseCommitErrorHandler().andThen(defaultErrorHandler()); + } + + private static Consumer<ErrorResponse> baseCommitErrorHandler() { + return error -> { + switch (error.code()) { + case 404: + throw new NoSuchTableException("%s", error.message()); + case 409: + throw new CommitFailedException("Commit failed: %s", error.message()); + } + }; + } + /** * Table level error handlers. * Should be chained with the {@link #defaultErrorHandler}, which takes care of common cases. */ @@ -102,7 +118,7 @@ public static Consumer<ErrorResponse> defaultErrorHandler() { case 501: throw new UnsupportedOperationException(error.message()); case 500: - throw new ServiceFailureException("Server error: %s", error.message()); + throw new ServiceFailureException("Server error: %s: %s", error.type(), error.message()); } throw new RESTException("Unable to process: %s", error.message()); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java new file mode 100644 index 000000000000..0f38aedd2885 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.Transactions; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.ResolvingFileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.responses.CreateNamespaceResponse; +import org.apache.iceberg.rest.responses.DropNamespaceResponse; +import org.apache.iceberg.rest.responses.DropTableResponse; +import org.apache.iceberg.rest.responses.GetNamespaceResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; + +public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable { + private final RESTClient client; + private String catalogName = null; + private Map properties = null; + private Object conf = null; + private FileIO io = null; + + RESTCatalog(RESTClient client) { + this.client = client; + } + + @Override + public void initialize(String name, Map properties) { + this.catalogName = name; + this.properties = properties; + String ioImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + this.io = CatalogUtil.loadFileIO(ioImpl != null ? 
ioImpl : ResolvingFileIO.class.getName(), properties, conf); + } + + @Override + public void setConf(Configuration newConf) { + this.conf = newConf; + } + + @Override + public String name() { + return catalogName; + } + + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + String ns = RESTUtil.urlEncode(namespace); + ListTablesResponse response = client + .get("v1/namespaces/" + ns + "/tables", ListTablesResponse.class, ErrorHandlers.namespaceErrorHandler()); + return response.identifiers(); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + String tablePath = tablePath(identifier); + // TODO: support purge flag + DropTableResponse response = client.delete( + tablePath, DropTableResponse.class, ErrorHandlers.tableErrorHandler()); + return response.isDropped(); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + // TODO: implement rename once the protocol defines a route for it + } + + private LoadTableResponse loadInternal(TableIdentifier identifier) { + String tablePath = tablePath(identifier); + return client.get(tablePath, LoadTableResponse.class, ErrorHandlers.tableErrorHandler()); + } + + @Override + public Table loadTable(TableIdentifier identifier) { + LoadTableResponse response = loadInternal(identifier); + + // TODO: pass a customized client + FileIO tableIO = tableIO(response.config()); + return new BaseTable( + new RESTTableOperations(client, tablePath(identifier), tableIO, response.tableMetadata()), + fullTableName(identifier)); + } + + @Override + public void createNamespace(Namespace namespace, Map<String, String> metadata) { + CreateNamespaceRequest request = CreateNamespaceRequest.builder() + .withNamespace(namespace) + .setProperties(metadata) + .build(); + + // for now, ignore the response because there is no way to return it + client.post("v1/namespaces", request, CreateNamespaceResponse.class, ErrorHandlers.namespaceErrorHandler()); + } + + @Override + public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + Preconditions.checkArgument(namespace.isEmpty(), "Cannot list namespaces under parent: %s", namespace); + ListNamespacesResponse response = client + .get("v1/namespaces", ListNamespacesResponse.class, ErrorHandlers.namespaceErrorHandler()); + return response.namespaces(); + } + + @Override + public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + String ns = RESTUtil.urlEncode(namespace); + // TODO: rename to LoadNamespaceResponse?
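+ // GetNamespaceResponse carries both the namespace and its properties map; only the + // properties are surfaced here, since that is all the SupportsNamespaces API returns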
+ GetNamespaceResponse response = client + .get("v1/namespaces/" + ns, GetNamespaceResponse.class, ErrorHandlers.namespaceErrorHandler()); + return response.properties(); + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + String ns = RESTUtil.urlEncode(namespace); + DropNamespaceResponse response = client + .delete("v1/namespaces/" + ns, DropNamespaceResponse.class, ErrorHandlers.namespaceErrorHandler()); + return response.isDropped(); + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) throws NoSuchNamespaceException { + String ns = RESTUtil.urlEncode(namespace); + UpdateNamespacePropertiesRequest request = UpdateNamespacePropertiesRequest.builder() + .updateAll(properties) + .build(); + + UpdateNamespacePropertiesResponse response = client.post( + "v1/namespaces/" + ns + "/properties", request, UpdateNamespacePropertiesResponse.class, + ErrorHandlers.namespaceErrorHandler()); + + return !response.updated().isEmpty(); + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) throws NoSuchNamespaceException { + String ns = RESTUtil.urlEncode(namespace); + UpdateNamespacePropertiesRequest request = UpdateNamespacePropertiesRequest.builder() + .removeAll(properties) + .build(); + + UpdateNamespacePropertiesResponse response = client.post( + "v1/namespaces/" + ns + "/properties", request, UpdateNamespacePropertiesResponse.class, + ErrorHandlers.namespaceErrorHandler()); + + return !response.removed().isEmpty(); + } + + @Override + public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + return new Builder(identifier, schema); + } + + private class Builder implements TableBuilder { + private final TableIdentifier ident; + private final Schema schema; + private final ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); + private PartitionSpec spec = null; + private SortOrder writeOrder = null; + private String location = null; + + private Builder(TableIdentifier ident, Schema schema) { + this.ident = ident; + this.schema = schema; + } + + @Override + public TableBuilder withPartitionSpec(PartitionSpec tableSpec) { + this.spec = tableSpec; + return this; + } + + @Override + public TableBuilder withSortOrder(SortOrder tableWriteOrder) { + this.writeOrder = tableWriteOrder; + return this; + } + + @Override + public TableBuilder withLocation(String location) { + this.location = location; + return this; + } + + @Override + public TableBuilder withProperties(Map properties) { + this.propertiesBuilder.putAll(properties); + return this; + } + + @Override + public TableBuilder withProperty(String key, String value) { + this.propertiesBuilder.put(key, value); + return this; + } + + @Override + public Table create() { + String ns = RESTUtil.urlEncode(ident.namespace()); + CreateTableRequest request = CreateTableRequest.builder() + .withName(ident.name()) + .withSchema(schema) + .withPartitionSpec(spec) + .withWriteOrder(writeOrder) + .withLocation(location) + .setProperties(propertiesBuilder.build()) + .build(); + + LoadTableResponse response = client.post( + "v1/namespaces/" + ns + "/tables", request, LoadTableResponse.class, ErrorHandlers.tableErrorHandler()); + + String tablePath = tablePath(ident); + FileIO tableIO = tableIO(response.config()); + + return new BaseTable( + new RESTTableOperations(client, tablePath, tableIO, response.tableMetadata()), + fullTableName(ident)); + } + + @Override + public Transaction createTransaction() { + 
LoadTableResponse response = stageCreate(); + String fullName = fullTableName(ident); + + String tablePath = tablePath(ident); + FileIO tableIO = tableIO(response.config()); + TableMetadata meta = response.tableMetadata(); + + return Transactions.createTableTransaction( + fullName, + new RESTTableOperations( + client, tablePath, tableIO, RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta), + meta); + } + + @Override + public Transaction replaceTransaction() { + LoadTableResponse response = loadInternal(ident); + String fullName = fullTableName(ident); + + String tablePath = tablePath(ident); + FileIO tableIO = tableIO(response.config()); + TableMetadata base = response.tableMetadata(); + + Map properties = propertiesBuilder.build(); + TableMetadata replacement = base.buildReplacement( + schema, + spec != null ? spec : PartitionSpec.unpartitioned(), + writeOrder != null ? writeOrder : SortOrder.unsorted(), + location != null ? location : base.location(), + properties); + + ImmutableList.Builder changes = ImmutableList.builder(); + + if (replacement.changes().stream().noneMatch(MetadataUpdate.SetCurrentSchema.class::isInstance)) { + // ensure there is a change to set the current schema + changes.add(new MetadataUpdate.SetCurrentSchema(replacement.currentSchemaId())); + } + + if (replacement.changes().stream().noneMatch(MetadataUpdate.SetDefaultPartitionSpec.class::isInstance)) { + // ensure there is a change to set the default spec + changes.add(new MetadataUpdate.SetDefaultPartitionSpec(replacement.defaultSpecId())); + } + + if (replacement.changes().stream().noneMatch(MetadataUpdate.SetDefaultSortOrder.class::isInstance)) { + // ensure there is a change to set the default sort order + changes.add(new MetadataUpdate.SetDefaultSortOrder(replacement.defaultSortOrderId())); + } + + return Transactions.replaceTableTransaction( + fullName, + new RESTTableOperations( + client, tablePath, tableIO, RESTTableOperations.UpdateType.REPLACE, changes.build(), base), + replacement); + } + + @Override + public Transaction createOrReplaceTransaction() { + // return a create or a replace transaction, depending on whether the table exists + // deciding whether to create or replace can't be determined on the service because schema field IDs are assigned + // at this point and then used in data and metadata files. because create and replace will assign different + // field IDs, they must be determined before any writes occur + try { + return replaceTransaction(); + } catch (NoSuchTableException e) { + return createTransaction(); + } + } + + private LoadTableResponse stageCreate() { + String ns = RESTUtil.urlEncode(ident.namespace()); + Map properties = propertiesBuilder.build(); + + CreateTableRequest request = CreateTableRequest.builder() + .withName(ident.name()) + .withSchema(schema) + .withPartitionSpec(spec) + .withWriteOrder(writeOrder) + .withLocation(location) + .setProperties(properties) + .build(); + + // TODO: will this be a specific route or a modified create? 
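+ // in this change, the in-memory adapter answers that with a dedicated STAGE_CREATE_TABLE + // route (POST v1/namespaces/{namespace}/stageCreate) backed by CatalogHandlers.stageTableCreate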
+ return client.post( + "v1/namespaces/" + ns + "/stageCreate", request, LoadTableResponse.class, ErrorHandlers.tableErrorHandler()); + } + } + + private static List<MetadataUpdate> createChanges(TableMetadata meta) { + ImmutableList.Builder<MetadataUpdate> changes = ImmutableList.builder(); + + Schema schema = meta.schema(); + changes.add(new MetadataUpdate.AddSchema(schema, schema.highestFieldId())); + changes.add(new MetadataUpdate.SetCurrentSchema(-1)); + + PartitionSpec spec = meta.spec(); + if (spec != null && spec.isPartitioned()) { + changes.add(new MetadataUpdate.AddPartitionSpec(spec)); + changes.add(new MetadataUpdate.SetDefaultPartitionSpec(-1)); + } + + SortOrder order = meta.sortOrder(); + if (order != null && order.isSorted()) { + changes.add(new MetadataUpdate.AddSortOrder(order)); + changes.add(new MetadataUpdate.SetDefaultSortOrder(-1)); + } + + String location = meta.location(); + if (location != null) { + changes.add(new MetadataUpdate.SetLocation(location)); + } + + Map<String, String> properties = meta.properties(); + if (properties != null && !properties.isEmpty()) { + changes.add(new MetadataUpdate.SetProperties(properties)); + } + + return changes.build(); + } + + private String fullTableName(TableIdentifier ident) { + return String.format("%s.%s", catalogName, ident); + } + + private static String tablePath(TableIdentifier ident) { + return "v1/namespaces/" + RESTUtil.urlEncode(ident.namespace()) + "/tables/" + ident.name(); + } + + private FileIO tableIO(Map<String, String> config) { + if (config.isEmpty()) { + return io; // reuse the FileIO since config is the same + } else { + Map<String, String> fullConf = Maps.newHashMap(properties); + fullConf.putAll(config); // overlay table-specific config without mutating the catalog properties + String ioImpl = fullConf.get(CatalogProperties.FILE_IO_IMPL); + return CatalogUtil.loadFileIO(ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), fullConf, this.conf); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogAdapter.java new file mode 100644 index 000000000000..6307e30a3d55 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -0,0 +1,358 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.iceberg.rest; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotAuthorizedException; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.util.Pair; + +/** + * Adapter class to translate REST requests into {@link Catalog} API calls. + */ +public class RESTCatalogAdapter implements RESTClient { + private static final Splitter SLASH = Splitter.on('/'); + + private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES = ImmutableMap + .<Class<? extends Exception>, Integer>builder() + .put(IllegalArgumentException.class, 400) + .put(ValidationException.class, 400) + .put(NamespaceNotEmptyException.class, 400) // TODO: should this be more specific? + .put(NotAuthorizedException.class, 401) + .put(ForbiddenException.class, 403) + .put(NoSuchNamespaceException.class, 404) + .put(NoSuchTableException.class, 404) + .put(NoSuchIcebergTableException.class, 404) + .put(UnsupportedOperationException.class, 406) + .put(AlreadyExistsException.class, 409) + .put(CommitFailedException.class, 409) + .put(CatalogHandlers.UnprocessableEntityException.class, 422) + .put(CommitStateUnknownException.class, 500) + .build(); + + private final Catalog catalog; + private final SupportsNamespaces asNamespaceCatalog; + private final Consumer<ErrorResponse> defaultErrorHandler; + + public RESTCatalogAdapter(Catalog catalog, Consumer<ErrorResponse> defaultErrorHandler) { + this.catalog = catalog; + this.asNamespaceCatalog = catalog instanceof SupportsNamespaces ?
(SupportsNamespaces) catalog : null; + this.defaultErrorHandler = defaultErrorHandler; + } + + private enum HTTPMethod { + GET, + HEAD, + POST, + DELETE + } + + private enum Route { + CONFIG(HTTPMethod.GET, "v1/config"), + LIST_NAMESPACES(HTTPMethod.GET, "v1/namespaces"), + CREATE_NAMESPACE(HTTPMethod.POST, "v1/namespaces"), + LOAD_NAMESPACE(HTTPMethod.GET, "v1/namespaces/{namespace}"), + DROP_NAMESPACE(HTTPMethod.DELETE, "v1/namespaces/{namespace}"), + UPDATE_NAMESPACE(HTTPMethod.POST, "v1/namespaces/{namespace}/properties"), + LIST_TABLES(HTTPMethod.GET, "v1/namespaces/{namespace}/tables"), + CREATE_TABLE(HTTPMethod.POST, "v1/namespaces/{namespace}/tables"), + STAGE_CREATE_TABLE(HTTPMethod.POST, "v1/namespaces/{namespace}/stageCreate"), + LOAD_TABLE(HTTPMethod.GET, "v1/namespaces/{namespace}/tables/{table}"), + UPDATE_TABLE(HTTPMethod.POST, "v1/namespaces/{namespace}/tables/{table}"), + DROP_TABLE(HTTPMethod.DELETE, "v1/namespaces/{namespace}/tables/{table}"); + + private final HTTPMethod method; + private final int requiredLength; + private final Map<Integer, String> requirements; + private final Map<Integer, String> variables; + + Route(HTTPMethod method, String pattern) { + this.method = method; + + // parse the pattern into requirements and variables + List<String> parts = SLASH.splitToList(pattern); + ImmutableMap.Builder<Integer, String> requirementsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder<Integer, String> variablesBuilder = ImmutableMap.builder(); + for (int pos = 0; pos < parts.size(); pos += 1) { + String part = parts.get(pos); + if (part.startsWith("{") && part.endsWith("}")) { + variablesBuilder.put(pos, part.substring(1, part.length() - 1)); + } else { + requirementsBuilder.put(pos, part); + } + } + + this.requiredLength = parts.size(); + this.requirements = requirementsBuilder.build(); + this.variables = variablesBuilder.build(); + } + + private boolean matches(HTTPMethod requestMethod, List<String> requestPath) { + return method == requestMethod && requiredLength == requestPath.size() && + requirements.entrySet().stream().allMatch( + requirement -> requirement.getValue().equalsIgnoreCase(requestPath.get(requirement.getKey()))); + } + + private Map<String, String> variables(List<String> requestPath) { + ImmutableMap.Builder<String, String> vars = ImmutableMap.builder(); + variables.forEach((key, value) -> vars.put(value, requestPath.get(key))); + return vars.build(); + } + + public static Pair<Route, Map<String, String>> from(HTTPMethod method, String path) { + List<String> parts = SLASH.splitToList(path); + for (Route candidate : Route.values()) { + if (candidate.matches(method, parts)) { + return Pair.of(candidate, candidate.variables(parts)); + } + } + + return null; + } + } + + public <T> T handleRequest(Route route, Map<String, String> vars, Object body, Class<T> responseType) { + switch (route) { + case CONFIG: + // TODO: use the correct response object + return castResponse(responseType, ImmutableMap.of()); + + case LIST_NAMESPACES: + if (asNamespaceCatalog != null) { + // TODO: support parent namespace from query params + return castResponse(responseType, CatalogHandlers.listNamespaces(asNamespaceCatalog, Namespace.empty())); + } + break; + + case CREATE_NAMESPACE: + if (asNamespaceCatalog != null) { + CreateNamespaceRequest request = castRequest(CreateNamespaceRequest.class, body); + return castResponse(responseType, CatalogHandlers.createNamespace(asNamespaceCatalog, request)); + } + break; + + case LOAD_NAMESPACE: + if (asNamespaceCatalog != null) { + Namespace namespace = namespaceFromPathVars(vars); + return castResponse(responseType, CatalogHandlers.loadNamespace(asNamespaceCatalog, namespace)); + } + break; + + case
DROP_NAMESPACE: + if (asNamespaceCatalog != null) { + Namespace namespace = namespaceFromPathVars(vars); + return castResponse(responseType, CatalogHandlers.dropNamespace(asNamespaceCatalog, namespace)); + } + break; + + case UPDATE_NAMESPACE: + if (asNamespaceCatalog != null) { + Namespace namespace = namespaceFromPathVars(vars); + UpdateNamespacePropertiesRequest request = castRequest(UpdateNamespacePropertiesRequest.class, body); + return castResponse(responseType, + CatalogHandlers.updateNamespaceProperties(asNamespaceCatalog, namespace, request)); + } + break; + + case LIST_TABLES: { + // TODO: how is the empty namespace passed? + Namespace namespace = namespaceFromPathVars(vars); + return castResponse(responseType, CatalogHandlers.listTables(catalog, namespace)); + } + + case CREATE_TABLE: { + Namespace namespace = namespaceFromPathVars(vars); + CreateTableRequest request = castRequest(CreateTableRequest.class, body); + return castResponse(responseType, CatalogHandlers.createTable(catalog, namespace, request)); + } + + case STAGE_CREATE_TABLE: { + Namespace namespace = namespaceFromPathVars(vars); + CreateTableRequest request = castRequest(CreateTableRequest.class, body); + return castResponse(responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request)); + } + + case DROP_TABLE: { + TableIdentifier ident = identFromPathVars(vars); + return castResponse(responseType, CatalogHandlers.dropTable(catalog, ident)); + } + + case LOAD_TABLE: { + TableIdentifier ident = identFromPathVars(vars); + return castResponse(responseType, CatalogHandlers.loadTable(catalog, ident)); + } + + case UPDATE_TABLE: { + TableIdentifier ident = identFromPathVars(vars); + UpdateTableRequest request = castRequest(UpdateTableRequest.class, body); + return castResponse(responseType, CatalogHandlers.updateTable(catalog, ident, request)); + } + + default: + } + + return null; // will be converted to a 400 + } + + public <T> T execute(HTTPMethod method, String path, Object body, Class<T> responseType, + Consumer<ErrorResponse> errorHandler) { + ErrorResponse.Builder errorBuilder = ErrorResponse.builder(); + Pair<Route, Map<String, String>> routeAndVars = Route.from(method, path); + if (routeAndVars != null) { + try { + T response = handleRequest(routeAndVars.first(), routeAndVars.second(), body, responseType); + if (response != null) { + return response; + } + + // TODO: should this be more specific and state that namespaces aren't supported?
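+ // reaching this point means the route matched but the backing catalog could not serve it, + // e.g. LIST_NAMESPACES against a catalog that does not implement SupportsNamespaces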
+ // if a response was not returned, there was no handler for the route + errorBuilder + .responseCode(400) + .withType("BadRequestException") + .withMessage(String.format("No handler for request: %s %s", method, path)); + + } catch (RuntimeException e) { + configureResponseFromException(e, errorBuilder); + } + + } else { + errorBuilder + .responseCode(400) + .withType("BadRequestException") + .withMessage(String.format("No route for request: %s %s", method, path)); + } + + ErrorResponse error = errorBuilder.build(); + errorHandler.accept(error); + + // if the error handler doesn't throw an exception, throw a generic one + throw new RESTException("Unhandled error: %s", error); + } + + @Override + public T delete(String path, Class responseType, Consumer errorHandler) { + return execute(HTTPMethod.DELETE, path, null, responseType, errorHandler); + } + +// @Override +// public T delete(String path, Class responseType) { +// return execute(HTTPMethod.DELETE, path, null, responseType, defaultErrorHandler); +// } + + @Override + public T post(String path, Object body, Class responseType, Consumer errorHandler) { + return execute(HTTPMethod.POST, path, body, responseType, errorHandler); + } + +// @Override +// public T post(String path, Object body, Class responseType) { +// return execute(HTTPMethod.POST, path, body, responseType, defaultErrorHandler); +// } + + @Override + public T get(String path, Class responseType, Consumer errorHandler) { + return execute(HTTPMethod.GET, path, null, responseType, errorHandler); + } + +// @Override +// public T get(String path, Class responseType) { +// return execute(HTTPMethod.GET, path, null, responseType, defaultErrorHandler); +// } + + @Override + public T head(String path, Consumer errorHandler) { + return execute(HTTPMethod.HEAD, path, null, null, errorHandler); + } + +// @Override +// public T head(String path) { +// return execute(HTTPMethod.HEAD, path, null, null, defaultErrorHandler); +// } + + @Override + public void close() throws IOException { + if (catalog instanceof Closeable) { + ((Closeable) catalog).close(); + } + } + + private static class BadResponseType extends RuntimeException { + public BadResponseType(Class responseType, Object response) { + super(String.format("Invalid response object, not a %s: %s", responseType.getName(), response)); + } + } + + private static class BadRequestType extends RuntimeException { + public BadRequestType(Class requestType, Object request) { + super(String.format("Invalid request object, not a %s: %s", requestType.getName(), request)); + } + } + + public static T castRequest(Class requestType, Object request) { + if (requestType.isInstance(request)) { + return requestType.cast(request); + } + + throw new BadRequestType(requestType, request); + } + + public static T castResponse(Class responseType, Object response) { + if (responseType.isInstance(response)) { + return responseType.cast(response); + } + + throw new BadResponseType(responseType, response); + } + + public static void configureResponseFromException(Exception exc, ErrorResponse.Builder errorBuilder) { + errorBuilder + .responseCode(EXCEPTION_ERROR_CODES.getOrDefault(exc.getClass(), 500)) + .withType(exc.getClass().getSimpleName()) + .withMessage(exc.getMessage()); + } + + private static Namespace namespaceFromPathVars(Map pathVars) { + return RESTUtil.urlDecode(pathVars.get("namespace")); + } + + private static TableIdentifier identFromPathVars(Map pathVars) { + return TableIdentifier.of(namespaceFromPathVars(pathVars), pathVars.get("table")); 
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
new file mode 100644
index 000000000000..f4bb9c5a9537
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.LocationProviders;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+class RESTTableOperations implements TableOperations {
+ private static final String METADATA_FOLDER_NAME = "metadata";
+ private static final ObjectMapper MAPPER = new ObjectMapper(new JsonFactory());
+ static {
+ RESTSerializers.registerAll(MAPPER);
+ }
+
+ enum UpdateType {
+ CREATE,
+ REPLACE,
+ SIMPLE
+ }
+
+ private final RESTClient client;
+ private final String path;
+ private final FileIO io;
+ private final List createChanges;
+ private final TableMetadata replaceBase;
+ private UpdateType updateType;
+ private TableMetadata current;
+
+ RESTTableOperations(RESTClient client, String path, FileIO io, TableMetadata current) {
+ this(client, path, io, UpdateType.SIMPLE, Lists.newArrayList(), current);
+ }
+
+ RESTTableOperations(RESTClient client, String path, FileIO io, UpdateType updateType,
+ List createChanges, TableMetadata current) {
+ this.client = client;
+ this.path = path;
+ this.io = io;
+ this.updateType = updateType;
+ this.createChanges = createChanges;
+ this.replaceBase = current;
+ this.current = current;
+ }
+
+ @Override
+ public TableMetadata current() {
+ // TODO: may be able to get rid of this by updating refresh() and passing null for create metadata
+ return updateType == UpdateType.CREATE ? null : current;
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ // TODO: what if config changes?
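+ // re-read the table from the service; the check below only replaces the
+ // current metadata when the reported metadata file location has changed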
+ LoadTableResponse response = client.get(path, LoadTableResponse.class, ErrorHandlers.tableErrorHandler()); + if (!Objects.equals(current.metadataFileLocation(), response.metadataLocation())) { + this.current = response.tableMetadata(); + } + + return current; + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + UpdateTableRequest.Builder requestBuilder; + List baseChanges; + switch (updateType) { + case CREATE: + Preconditions.checkState(base == null, "Invalid base metadata for create transaction, expected null: %s", base); + requestBuilder = UpdateTableRequest.builderForCreate(); + baseChanges = createChanges; + break; + + case REPLACE: + Preconditions.checkState(base != null, "Invalid base metadata: null"); + // use the original replace base metadata because the transaction will refresh + requestBuilder = UpdateTableRequest.builderForReplace(replaceBase); + baseChanges = createChanges; + break; + + case SIMPLE: + Preconditions.checkState(base != null, "Invalid base metadata: null"); + requestBuilder = UpdateTableRequest.builderFor(base); + baseChanges = ImmutableList.of(); + break; + + default: + throw new UnsupportedOperationException(String.format("Update type %s is not supported", updateType)); + } + + baseChanges.forEach(requestBuilder::update); + metadata.changes().forEach(requestBuilder::update); + UpdateTableRequest request = requestBuilder.build(); + + // the error handler will throw necessary exceptions like CommitFailedException and UnknownCommitStateException + // TODO: ensure that the HTTP client lib passes HTTP client errors to the error handler + LoadTableResponse response = client.post( + path, request, LoadTableResponse.class, ErrorHandlers.tableCommitHandler()); + + // all future commits should be simple commits + this.updateType = UpdateType.SIMPLE; + + this.current = response.tableMetadata(); + } + + @Override + public FileIO io() { + return io; + } + + private static String metadataFileLocation(TableMetadata metadata, String filename) { + String metadataLocation = metadata.properties() + .get(TableProperties.WRITE_METADATA_LOCATION); + + if (metadataLocation != null) { + return String.format("%s/%s", metadataLocation, filename); + } else { + return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename); + } + } + + @Override + public String metadataFileLocation(String filename) { + return metadataFileLocation(current(), filename); + } + + @Override + public LocationProvider locationProvider() { + return LocationProviders.locationsFor(current().location(), current().properties()); + } + + @Override + public TableOperations temp(TableMetadata uncommittedMetadata) { + return new TableOperations() { + @Override + public TableMetadata current() { + return uncommittedMetadata; + } + + @Override + public TableMetadata refresh() { + throw new UnsupportedOperationException("Cannot call refresh on temporary table operations"); + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + throw new UnsupportedOperationException("Cannot call commit on temporary table operations"); + } + + @Override + public String metadataFileLocation(String fileName) { + return RESTTableOperations.metadataFileLocation(uncommittedMetadata, fileName); + } + + @Override + public LocationProvider locationProvider() { + return LocationProviders.locationsFor(uncommittedMetadata.location(), uncommittedMetadata.properties()); + } + + @Override + public FileIO io() { + return RESTTableOperations.this.io(); + } + + 
@Override
+ public EncryptionManager encryption() {
+ return RESTTableOperations.this.encryption();
+ }
+
+ @Override
+ public long newSnapshotId() {
+ return RESTTableOperations.this.newSnapshotId();
+ }
+ };
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/CreateTableRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/CreateTableRequest.java
new file mode 100644
index 000000000000..aea9b999b5ad
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/requests/CreateTableRequest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest.requests;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * A REST request to create a table, with an optional set of properties.
+ */
+public class CreateTableRequest {
+
+ private String name;
+ private String location;
+ private Schema schema;
+ private PartitionSpec spec;
+ private SortOrder order;
+ private Map properties;
+
+ public CreateTableRequest() {
+ // Needed for Jackson Deserialization.
+ }
+
+ private CreateTableRequest(String name, String location, Schema schema, PartitionSpec spec, SortOrder order,
+ Map properties) {
+ this.name = name;
+ this.location = location;
+ this.schema = schema;
+ this.spec = spec;
+ this.order = order;
+ this.properties = properties;
+ validate();
+ }
+
+ public CreateTableRequest validate() {
+ Preconditions.checkArgument(name != null, "Invalid table name: null");
+ Preconditions.checkArgument(schema != null, "Invalid schema: null");
+ return this;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String location() {
+ return location;
+ }
+
+ public Schema schema() {
+ return schema;
+ }
+
+ public PartitionSpec spec() {
+ return spec;
+ }
+
+ public SortOrder writeOrder() {
+ return order;
+ }
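+
+ // Illustrative usage, not part of this change (schema and spec defined elsewhere):
+ //   CreateTableRequest request = CreateTableRequest.builder()
+ //       .withName("orders")
+ //       .withSchema(schema)
+ //       .withPartitionSpec(spec)
+ //       .setProperty("owner", "someone")
+ //       .build();
+
+ public Map properties() {
+ return properties != null ?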
properties : ImmutableMap.of(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("location", location) + .add("properties", properties) + .add("schema", schema) + .add("spec", spec) + .add("order", order) + .toString(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String name; + private String location; + private Schema schema; + private PartitionSpec spec; + private SortOrder order; + private final ImmutableMap.Builder properties = ImmutableMap.builder(); + + private Builder() { + } + + public Builder withName(String tableName) { + Preconditions.checkNotNull(tableName, "Invalid name: null"); + this.name = tableName; + return this; + } + + public Builder withLocation(String location) { + this.location = location; + return this; + } + + public Builder setProperty(String name, String value) { + Preconditions.checkArgument(name != null, "Invalid property: null"); + Preconditions.checkArgument(value != null, "Invalid value for property %s: null", name); + properties.put(name, value); + return this; + } + + public Builder setProperties(Map props) { + Preconditions.checkNotNull(props, "Invalid collection of properties: null"); + Preconditions.checkArgument(!props.containsKey(null), "Invalid property: null"); + Preconditions.checkArgument(!props.containsValue(null), + "Invalid value for properties %s: null", Maps.filterValues(props, Objects::isNull).keySet()); + properties.putAll(props); + return this; + } + + public Builder withSchema(Schema tableSchema) { + Preconditions.checkNotNull(tableSchema, "Invalid schema: null"); + this.schema = tableSchema; + return this; + } + + public Builder withPartitionSpec(PartitionSpec tableSpec) { + this.spec = tableSpec; + return this; + } + + public Builder withWriteOrder(SortOrder writeOrder) { + this.order = writeOrder; + return this; + } + + public CreateTableRequest build() { + return new CreateTableRequest(name, location, schema, spec, order, properties.build()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java new file mode 100644 index 000000000000..b7f955870659 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.rest.requests;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public class UpdateTableRequest {
+
+ private List requirements;
+ private List updates;
+
+ public UpdateTableRequest() {
+ // needed for Jackson deserialization
+ }
+
+ public UpdateTableRequest(List requirements, List updates) {
+ this.requirements = requirements;
+ this.updates = updates;
+ }
+
+ public List requirements() {
+ return requirements != null ? requirements : ImmutableList.of();
+ }
+
+ public List updates() {
+ return updates != null ? updates : ImmutableList.of();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("requirements", requirements)
+ .add("updates", updates)
+ .toString();
+ }
+
+ public static Builder builderForCreate() {
+ return new Builder(null, false).requireCreate();
+ }
+
+ public static Builder builderForReplace(TableMetadata base) {
+ Preconditions.checkNotNull(base, "Cannot create a builder from table metadata: null");
+ return new Builder(base, true).requireTableUUID(base.uuid());
+ }
+
+ public static Builder builderFor(TableMetadata base) {
+ Preconditions.checkNotNull(base, "Cannot create a builder from table metadata: null");
+ return new Builder(base, false).requireTableUUID(base.uuid());
+ }
+
+ public static class Builder {
+ private final TableMetadata base;
+ private final ImmutableList.Builder requirements = ImmutableList.builder();
+ private final List updates = Lists.newArrayList();
+ private final Set changedRefs = Sets.newHashSet();
+ private final boolean isReplace;
+ private boolean addedSchema = false;
+ private boolean setSchemaId = false;
+ private boolean addedSpec = false;
+ private boolean setSpecId = false;
+ private boolean setOrderId = false;
+
+ public Builder(TableMetadata base, boolean isReplace) {
+ this.base = base;
+ this.isReplace = isReplace;
+ }
+
+ private Builder require(UpdateRequirement requirement) {
+ Preconditions.checkArgument(requirement != null, "Invalid requirement: null");
+ requirements.add(requirement);
+ return this;
+ }
+
+ private Builder requireCreate() {
+ return require(new UpdateRequirement.AssertTableDoesNotExist());
+ }
+
+ private Builder requireTableUUID(String uuid) {
+ Preconditions.checkArgument(uuid != null, "Invalid required UUID: null");
+ return require(new UpdateRequirement.AssertTableUUID(uuid));
+ }
+
+ private Builder requireRefSnapshotId(String ref, Long snapshotId) {
+ return require(new UpdateRequirement.AssertRefSnapshotID(ref, snapshotId));
+ }
+
+ private Builder requireLastAssignedFieldId(int fieldId) {
+ return require(new UpdateRequirement.AssertLastAssignedFieldId(fieldId));
+ }
+
+ private Builder requireCurrentSchemaId(int schemaId) {
+ return require(new UpdateRequirement.AssertCurrentSchemaID(schemaId));
+ }
+
+ private Builder requireLastAssignedPartitionId(int partitionId) {
+ return require(new UpdateRequirement.AssertLastAssignedPartitionId(partitionId));
+ }
+
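+ // Illustrative, not part of this change: for a simple commit against base metadata,
+ //   UpdateTableRequest.builderFor(base)
+ //       .update(new MetadataUpdate.SetCurrentSchema(schemaId))
+ //       .build()
+ // carries AssertTableUUID(base.uuid()) plus AssertCurrentSchemaID(base.currentSchemaId()),
+ // which the service re-validates against its own current metadata before applying updates.
+
+ private Builder requireDefaultSpecId(int specId) {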
return require(new UpdateRequirement.AssertDefaultSpecID(specId)); + } + + private Builder requireDefaultSortOrderId(int orderId) { + return require(new UpdateRequirement.AssertDefaultSortOrderID(orderId)); + } + + public Builder update(MetadataUpdate update) { + Preconditions.checkArgument(update != null, "Invalid update: null"); + updates.add(update); + + // add requirements based on the change + if (update instanceof MetadataUpdate.SetSnapshotRef) { + // require that the ref is unchanged from the base + MetadataUpdate.SetSnapshotRef setRef = (MetadataUpdate.SetSnapshotRef) update; + String name = setRef.name(); + // add returns true the first time the ref name is added + boolean added = changedRefs.add(name); + if (added && base != null && !isReplace) { + SnapshotRef baseRef = base.ref(name); + // require that the ref does not exist (null) or is the same as the base snapshot + requireRefSnapshotId(name, baseRef != null ? baseRef.snapshotId() : null); + } + + } else if (update instanceof MetadataUpdate.AddSchema) { + if (!addedSchema) { + if (base != null) { + requireLastAssignedFieldId(base.lastColumnId()); + } + this.addedSchema = true; + } + + } else if (update instanceof MetadataUpdate.SetCurrentSchema) { + if (!setSchemaId) { + if (base != null && !isReplace) { + // require that the current schema has not changed + requireCurrentSchemaId(base.currentSchemaId()); + } + this.setSchemaId = true; + } + + } else if (update instanceof MetadataUpdate.AddPartitionSpec) { + if (!addedSpec) { + if (base != null) { + requireLastAssignedPartitionId(base.lastAssignedPartitionId()); + } + this.addedSpec = true; + } + + } else if (update instanceof MetadataUpdate.SetDefaultPartitionSpec) { + if (!setSpecId) { + if (base != null && !isReplace) { + // require that the default spec has not changed + requireDefaultSpecId(base.defaultSpecId()); + } + this.setSpecId = true; + } + + } else if (update instanceof MetadataUpdate.SetDefaultSortOrder) { + if (!setOrderId) { + if (base != null && !isReplace) { + // require that the default write order has not changed + requireDefaultSortOrderId(base.defaultSortOrderId()); + } + this.setOrderId = true; + } + } + + return this; + } + + public UpdateTableRequest build() { + return new UpdateTableRequest(requirements.build(), ImmutableList.copyOf(updates)); + } + } + + public interface UpdateRequirement { + void validate(TableMetadata base); + + class AssertTableDoesNotExist implements UpdateRequirement { + private AssertTableDoesNotExist() { + } + + @Override + public void validate(TableMetadata base) { + if (base != null) { + throw new CommitFailedException("Requirement failed: table already exists"); + } + } + } + + class AssertTableUUID implements UpdateRequirement { + private final String uuid; + + private AssertTableUUID(String uuid) { + this.uuid = uuid; + } + + public String uuid() { + return uuid; + } + + @Override + public void validate(TableMetadata base) { + if (!uuid.equalsIgnoreCase(base.uuid())) { + throw new CommitFailedException( + "Requirement failed: UUID does not match: expected %s != %s", base.uuid(), uuid); + } + } + } + + class AssertRefSnapshotID implements UpdateRequirement { + private final String name; + private final Long snapshotId; + + private AssertRefSnapshotID(String name, Long snapshotId) { + this.name = name; + this.snapshotId = snapshotId; + } + + public String refName() { + return name; + } + + public Long snapshotId() { + return snapshotId; + } + + @Override + public void validate(TableMetadata base) { + SnapshotRef ref = 
base.ref(name); + if (ref != null) { + String type = ref.isBranch() ? "branch" : "tag"; + if (snapshotId == null) { + // a null snapshot ID means the ref should not exist already + throw new CommitFailedException( + "Requirement failed: %s %s was created concurrently", type, name); + } else if (snapshotId != ref.snapshotId()) { + throw new CommitFailedException( + "Requirement failed: %s %s has changed: expected id %s != %s", + type, name, snapshotId, ref.snapshotId()); + } + } else if (snapshotId != null) { + throw new CommitFailedException( + "Requirement failed: branch or tag %s is missing, expected %s", name, snapshotId); + } + } + } + + class AssertLastAssignedFieldId implements UpdateRequirement { + private final int lastAssignedFieldId; + + public AssertLastAssignedFieldId(int lastAssignedFieldId) { + this.lastAssignedFieldId = lastAssignedFieldId; + } + + public int lastAssignedFieldId() { + return lastAssignedFieldId; + } + + @Override + public void validate(TableMetadata base) { + if (base != null && base.lastColumnId() != lastAssignedFieldId) { + throw new CommitFailedException( + "Requirement failed: last assigned field id changed: expected id %s != %s", + lastAssignedFieldId, base.lastColumnId()); + } + } + } + + class AssertCurrentSchemaID implements UpdateRequirement { + private final int schemaId; + + private AssertCurrentSchemaID(int schemaId) { + this.schemaId = schemaId; + } + + public int schemaId() { + return schemaId; + } + + @Override + public void validate(TableMetadata base) { + if (schemaId != base.currentSchemaId()) { + throw new CommitFailedException( + "Requirement failed: current schema changed: expected id %s != %s", + schemaId, base.currentSchemaId()); + } + } + } + + class AssertLastAssignedPartitionId implements UpdateRequirement { + private final int lastAssignedPartitionId; + + public AssertLastAssignedPartitionId(int lastAssignedPartitionId) { + this.lastAssignedPartitionId = lastAssignedPartitionId; + } + + public int lastAssignedPartitionId() { + return lastAssignedPartitionId; + } + + @Override + public void validate(TableMetadata base) { + if (base != null && base.lastAssignedPartitionId() != lastAssignedPartitionId) { + throw new CommitFailedException( + "Requirement failed: last assigned partition id changed: expected id %s != %s", + lastAssignedPartitionId, base.lastAssignedPartitionId()); + } + } + } + + class AssertDefaultSpecID implements UpdateRequirement { + private final int specId; + + private AssertDefaultSpecID(int specId) { + this.specId = specId; + } + + public int specId() { + return specId; + } + + @Override + public void validate(TableMetadata base) { + if (specId != base.defaultSpecId()) { + throw new CommitFailedException( + "Requirement failed: default partition spec changed: expected id %s != %s", + specId, base.defaultSpecId()); + } + } + } + + class AssertDefaultSortOrderID implements UpdateRequirement { + private final int sortOrderId; + + private AssertDefaultSortOrderID(int sortOrderId) { + this.sortOrderId = sortOrderId; + } + + public int sortOrderId() { + return sortOrderId; + } + + @Override + public void validate(TableMetadata base) { + if (sortOrderId != base.defaultSortOrderId()) { + throw new CommitFailedException( + "Requirement failed: default sort order changed: expected id %s != %s", + sortOrderId, base.defaultSortOrderId()); + } + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/DropTableResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/DropTableResponse.java new 
file mode 100644 index 000000000000..42393ea67f44 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/DropTableResponse.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest.responses; + +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Represents a REST response to drop a table. + */ +public class DropTableResponse { + + // For Jackson to properly fail when deserializing, this needs to be boxed. + // Otherwise, the boolean is parsed according to "loose" javascript JSON rules. + private Boolean dropped; + + public DropTableResponse() { + // Required for Jackson deserialization + } + + private DropTableResponse(boolean dropped) { + this.dropped = dropped; + validate(); + } + + DropTableResponse validate() { + Preconditions.checkArgument(dropped != null, "Invalid response, missing field: dropped"); + return this; + } + + public boolean isDropped() { + return dropped; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("dropped", dropped) + .toString(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Boolean dropped; + + private Builder() { + } + + public Builder dropped(boolean isDropped) { + this.dropped = isDropped; + return this; + } + + public DropTableResponse build() { + Preconditions.checkArgument(dropped != null, "Invalid response, missing field: dropped"); + return new DropTableResponse(dropped); + } + } +} + diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/LoadTableResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/LoadTableResponse.java new file mode 100644 index 000000000000..ca53123ce012 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/LoadTableResponse.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.rest.responses;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * A REST response for a request to load or create a table, carrying the metadata file location,
+ * the table metadata, and any table-specific configuration returned by the catalog.
+ */
+public class LoadTableResponse {
+
+ private String metadataLocation;
+ private TableMetadata meta;
+ private Map config;
+
+ public LoadTableResponse() {
+ // Required for Jackson deserialization
+ }
+
+ private LoadTableResponse(String metadataLocation, TableMetadata meta, Map config) {
+ this.metadataLocation = metadataLocation;
+ this.meta = meta;
+ this.config = config;
+ }
+
+ public String metadataLocation() {
+ return metadataLocation;
+ }
+
+ public TableMetadata tableMetadata() {
+ return TableMetadata.buildFrom(meta).withMetadataLocation(metadataLocation).build();
+ }
+
+ public Map config() {
+ return config != null ? config : ImmutableMap.of();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("metadata", meta)
+ .add("config", config)
+ .toString();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String metadataLocation;
+ private TableMetadata meta;
+ private Map config = Maps.newHashMap();
+
+ private Builder() {
+ }
+
+ public Builder withTableMetadata(TableMetadata metadata) {
+ this.metadataLocation = metadata.metadataFileLocation();
+ this.meta = metadata;
+ return this;
+ }
+
+ public Builder addConfig(String property, String value) {
+ config.put(property, value);
+ return this;
+ }
+
+ public Builder addAllConfig(Map properties) {
+ config.putAll(properties);
+ return this;
+ }
+
+ public LoadTableResponse build() {
+ Preconditions.checkNotNull(meta, "Invalid metadata: null");
+ return new LoadTableResponse(metadataLocation, meta, config);
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index fe941cae5247..6cb2b0961676 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -19,22 +19,42 @@ package org.apache.iceberg.catalog;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplaceSortOrder;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.assertj.core.util.Streams;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -43,32 +63,95 @@ public abstract class CatalogTests {
private static final Namespace NS = Namespace.of("newdb");
+ private static final TableIdentifier TABLE = TableIdentifier.of(NS, "table");
+
// Schema passed to create tables
- static final Schema SCHEMA = new Schema(
+ private static final Schema SCHEMA = new Schema(
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "data", Types.StringType.get())
);

// This is the actual schema for the table, with column IDs reassigned
- static final Schema TABLE_SCHEMA = new Schema(
+ private static final Schema TABLE_SCHEMA = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
);
+
+ // This is the schema of the table after replace, with column IDs reassigned
+ private static final Schema REPLACE_SCHEMA = new Schema(
+ required(2, "id", Types.IntegerType.get(), "unique ID"),
+ required(3, "data", Types.StringType.get())
+ );
+
+ // another schema that is not the same
+ private static final Schema OTHER_SCHEMA = new Schema(
+ required(1, "some_id", Types.IntegerType.get())
+ );
+
+ // Partition spec used to create tables
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ .bucket("id", 16)
+ .build();
+
+ private static final PartitionSpec TABLE_SPEC = PartitionSpec.builderFor(TABLE_SCHEMA)
+ .bucket("id", 16)
+ .build();
+
+ private static final PartitionSpec REPLACE_SPEC = PartitionSpec.builderFor(REPLACE_SCHEMA)
+ .bucket("id", 16)
+ .withSpecId(1)
+ .build();
+
- // Partition spec used to create tables
- static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
- .bucket("data", 16)
+ // Sort order used to create tables
+ static final SortOrder WRITE_ORDER = SortOrder.builderFor(SCHEMA)
+ .asc(Expressions.bucket("id", 16))
+ .asc("id")
+ .build();
+
+ static final SortOrder TABLE_WRITE_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
+ .asc(Expressions.bucket("id", 16))
+ .asc("id")
+ .build();
+
+ static final SortOrder REPLACE_WRITE_ORDER = SortOrder.builderFor(REPLACE_SCHEMA)
+ .asc(Expressions.bucket("id", 16))
+ .asc("id")
.build();

static final DataFile FILE_A = DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
- .withFileSizeInBytes(0)
- .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id_bucket=0") // easy way to set partition data for now
+ .withRecordCount(2) // needs at least one record or else metrics will filter it out
+ .build();
+
+ static final DataFile FILE_B = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id_bucket=1") // easy way to set partition data for now
+ .withRecordCount(2) // needs at least one record or else metrics will filter it out
+ .build();
+
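+ // a third file in one more partition (id bucket 2), used by the concurrent append
+ // tests below to tell the snapshots apart
+ static final DataFile FILE_C = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-c.parquet")
+ .withFileSizeInBytes(10)
+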
.withPartitionPath("id_bucket=2") // easy way to set partition data for now .withRecordCount(2) // needs at least one record or else metrics will filter it out .build(); protected abstract C catalog(); - protected abstract boolean supportsNamespaceProperties(); + + protected boolean supportsNamespaceProperties() { + return true; + } + + protected boolean requiresNamespaceCreate() { + return false; + } + + protected boolean supportsServerSideRetry() { + return false; + } @Test public void testCreateNamespace() { @@ -319,6 +402,1345 @@ public void testNamespaceWithDot() { Assert.assertFalse("Namespace should not exist", catalog.namespaceExists(withDot)); } + @Test + public void testBasicCreateTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + Table table = catalog.buildTable(ident, SCHEMA).create(); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + + // validate table settings + Assert.assertEquals("Table name should report its full name", catalog.name() + "." + ident, table.name()); + Assert.assertEquals("Schema should match expected ID assignment", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertNotNull("Should have a location", table.location()); + Assert.assertTrue("Should be unpartitioned", table.spec().isUnpartitioned()); + Assert.assertTrue("Should be unsorted", table.sortOrder().isUnsorted()); + Assert.assertNotNull("Should have table properties", table.properties()); + } + + @Test + public void testBasicCreateTableThatAlreadyExists() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + catalog.buildTable(ident, SCHEMA).create(); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + + AssertHelpers.assertThrows("Should fail to create a table that already exists", + AlreadyExistsException.class, "ns.table", + () -> catalog.buildTable(ident, OTHER_SCHEMA).create()); + + Table table = catalog.loadTable(ident); + Assert.assertEquals("Schema should match original table schema", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + } + + @Test + public void testCompleteCreateTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + Map properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Table table = catalog.buildTable(ident, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .create(); + + // validate table settings + Assert.assertEquals("Table name should report its full name", catalog.name() + "." 
+ ident, table.name()); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + Assert.assertEquals("Schema should match expected ID assignment", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertNotNull("Should have a location", table.location()); + Assert.assertEquals("Should use requested partition spec", TABLE_SPEC, table.spec()); + Assert.assertEquals("Should use requested write order", TABLE_WRITE_ORDER, table.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), table.properties().entrySet())); + } + + @Test + public void testLoadTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + + Map properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + catalog.buildTable(ident, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .create(); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + + Table table = catalog.loadTable(ident); + // validate table settings + Assert.assertEquals("Table name should report its full name", catalog.name() + "." + ident, table.name()); + Assert.assertTrue("Table should exist", catalog.tableExists(ident)); + Assert.assertEquals("Schema should match expected ID assignment", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertNotNull("Should have a location", table.location()); + Assert.assertEquals("Should use requested partition spec", TABLE_SPEC, table.spec()); + Assert.assertEquals("Should use requested write order", TABLE_WRITE_ORDER, table.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), table.properties().entrySet())); + } + + @Test + public void testLoadMissingTable() { + C catalog = catalog(); + + TableIdentifier ident = TableIdentifier.of("ns", "table"); + + Assert.assertFalse("Table should not exist", catalog.tableExists(ident)); + AssertHelpers.assertThrows("Should fail to load a nonexistent table", + NoSuchTableException.class, ident.toString(), + () -> catalog.loadTable(ident)); + } + + @Test + public void testDropTable() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + Assert.assertFalse("Table should not exist before create", catalog.tableExists(TABLE)); + + catalog.buildTable(TABLE, SCHEMA).create(); + Assert.assertTrue("Table should exist after create", catalog.tableExists(TABLE)); + + boolean dropped = catalog.dropTable(TABLE); + Assert.assertTrue("Should drop a table that does exist", dropped); + Assert.assertFalse("Table should not exist after drop", catalog.tableExists(TABLE)); + } + + @Test + public void testDropMissingTable() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + TableIdentifier noSuchTableIdent = TableIdentifier.of(NS, "notable"); + Assert.assertFalse("Table should not exist", catalog.tableExists(noSuchTableIdent)); + Assert.assertFalse("Should not drop a table that does not exist", catalog.dropTable(noSuchTableIdent)); + } + + @Test + public void testListTables() { + C catalog = catalog(); + + Namespace ns1 = Namespace.of("ns_1"); + Namespace ns2 = Namespace.of("ns_2"); + + 
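// ns_1 will hold table_1 and table_2 while ns_2 holds its own table_1, so the
+ // listings asserted below must stay isolated per namespace
+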
TableIdentifier ns1Table1 = TableIdentifier.of(ns1, "table_1"); + TableIdentifier ns1Table2 = TableIdentifier.of(ns1, "table_2"); + TableIdentifier ns2Table1 = TableIdentifier.of(ns2, "table_1"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(ns1); + catalog.createNamespace(ns2); + } + + assertEmpty("Should not have tables in a new namespace, ns_1", catalog, ns1); + assertEmpty("Should not have tables in a new namespace, ns_2", catalog, ns2); + + catalog.buildTable(ns1Table1, SCHEMA).create(); + + Assert.assertEquals("Should contain ns_1.table_1 after create", + ImmutableSet.of(ns1Table1), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.buildTable(ns2Table1, SCHEMA).create(); + + Assert.assertEquals("Should contain ns_2.table_1 after create", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + Assert.assertEquals("Should not show changes to ns_2 in ns_1", + ImmutableSet.of(ns1Table1), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.buildTable(ns1Table2, SCHEMA).create(); + + Assert.assertEquals("Should not show changes to ns_1 in ns_2", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + Assert.assertEquals("Should contain ns_1.table_2 after create", + ImmutableSet.of(ns1Table1, ns1Table2), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.dropTable(ns1Table1); + + Assert.assertEquals("Should not show changes to ns_1 in ns_2", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + Assert.assertEquals("Should not contain ns_1.table_1 after drop", + ImmutableSet.of(ns1Table2), Sets.newHashSet(catalog.listTables(ns1))); + + catalog.dropTable(ns1Table2); + + Assert.assertEquals("Should not show changes to ns_1 in ns_2", + ImmutableSet.of(ns2Table1), Sets.newHashSet(catalog.listTables(ns2))); + assertEmpty("Should not contain ns_1.table_2 after drop", catalog, ns1); + + catalog.dropTable(ns2Table1); + assertEmpty("Should not contain ns_2.table_1 after drop", catalog, ns2); + } + + @Test + public void testUpdateTableSchema() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + Schema expected = update.apply(); + + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUUIDValidation() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + Assert.assertTrue("Should successfully drop table", catalog.dropTable(TABLE)); + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + String expectedMessage = supportsServerSideRetry() ? 
"Requirement failed: UUID does not match" : "Cannot commit"; + AssertHelpers.assertThrows("Should reject changes to tables that have been dropped and recreated", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", + OTHER_SCHEMA.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSchemaServerSideRetry() { + Assume.assumeTrue("Schema update recovery is only supported with server-side retry", supportsServerSideRetry()); + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + Schema expected = update.apply(); + + // update the spec concurrently so that the first update fails, but can succeed on retry + catalog.loadTable(TABLE).updateSpec() + .addField("shard", Expressions.bucket("id", 16)) + .commit(); + + // commit the original update + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSchemaConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + // update the schema concurrently so that the original update fails + UpdateSchema concurrent = catalog.loadTable(TABLE).updateSchema() + .deleteColumn("data"); + Schema expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? + "Requirement failed: current schema changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second schema update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSchemaAssignmentConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdateSchema update = table.updateSchema() + .addColumn("new_col", Types.LongType.get()); + + // update the schema concurrently so that the original update fails + UpdateSchema concurrent = catalog.loadTable(TABLE).updateSchema() + .addColumn("another_col", Types.StringType.get()); + Schema expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? 
+ "Requirement failed: last assigned field id changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second schema update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + Assert.assertEquals("Loaded table should have expected schema", expected.asStruct(), loaded.schema().asStruct()); + } + + @Test + public void testUpdateTableSpec() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("id", 16)); + + PartitionSpec expected = update.apply(); + + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableSpecServerSideRetry() { + Assume.assumeTrue("Spec update recovery is only supported with server-side retry", supportsServerSideRetry()); + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("id", 16)); + PartitionSpec expected = update.apply(); + + // update the schema concurrently so that the first update fails, but can succeed on retry + catalog.loadTable(TABLE).updateSchema() + .addColumn("another_col", Types.StringType.get()) + .commit(); + + // commit the original update + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableSpecConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create(); + + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("data", 16)); + + // update the spec concurrently so that the original update fails + UpdatePartitionSpec concurrent = catalog.loadTable(TABLE).updateSpec() + .removeField(Expressions.bucket("id", 16)); + PartitionSpec expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? + "Requirement failed: default partition spec changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second partition spec update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableAssignmentSpecConflict() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + UpdatePartitionSpec update = table.updateSpec() + .addField("shard", Expressions.bucket("id", 16)); + + // update the spec concurrently so that the original update fails + UpdatePartitionSpec concurrent = catalog.loadTable(TABLE).updateSpec() + .addField("shard", Expressions.truncate("id", 100)); + PartitionSpec expected = concurrent.apply(); + concurrent.commit(); + + // attempt to commit the original update + String expectedMessage = supportsServerSideRetry() ? 
+ "Requirement failed: last assigned partition id changed" : "Cannot commit"; + AssertHelpers.assertThrows("Second partition spec update commit should fail because of a conflict", + CommitFailedException.class, expectedMessage, update::commit); + + Table loaded = catalog.loadTable(TABLE); + + // the spec ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected spec", expected.fields(), loaded.spec().fields()); + } + + @Test + public void testUpdateTableSortOrder() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + ReplaceSortOrder update = table.replaceSortOrder() + .asc(Expressions.bucket("id", 16)) + .asc("id"); + + SortOrder expected = update.apply(); + + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the sort order ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected order", expected.fields(), loaded.sortOrder().fields()); + } + + @Test + public void testUpdateTableSortOrderServerSideRetry() { + Assume.assumeTrue("Sort order update recovery is only supported with server-side retry", supportsServerSideRetry()); + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA).create(); + + ReplaceSortOrder update = table.replaceSortOrder() + .asc(Expressions.bucket("id", 16)) + .asc("id"); + SortOrder expected = update.apply(); + + // update the schema concurrently so that the first update fails, but can succeed on retry + catalog.loadTable(TABLE).updateSchema() + .addColumn("another_col", Types.StringType.get()) + .commit(); + + // commit the original update + update.commit(); + + Table loaded = catalog.loadTable(TABLE); + + // the sort order ID may not match, so check equality of the fields + Assert.assertEquals("Loaded table should have expected order", expected.fields(), loaded.sortOrder().fields()); + } + + @Test + public void testAppend() throws IOException { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .create(); + + try (CloseableIterable tasks = table.newScan().planFiles()) { + Assert.assertFalse("Should contain no files", tasks.iterator().hasNext()); + } + + table.newFastAppend().appendFile(FILE_A).commit(); + + assertFiles(table, FILE_A); + } + + @Test + public void testConcurrentAppendEmptyTable() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .create(); + + assertNoFiles(table); + + // create an uncommitted append + AppendFiles append = table.newFastAppend().appendFile(FILE_A); + append.apply(); // apply changes to eagerly write metadata + + catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit(); + assertFiles(catalog.loadTable(TABLE), FILE_B); + + // the uncommitted append should retry and succeed + append.commit(); + assertFiles(catalog.loadTable(TABLE), FILE_A, FILE_B); + } + + @Test + public void testConcurrentAppendNonEmptyTable() { + C catalog = catalog(); + + Table table = catalog.buildTable(TABLE, SCHEMA) + .withPartitionSpec(SPEC) + .create(); + + assertNoFiles(table); + + // TODO: skip the initial refresh in FastAppend so that commits actually fail + + // create an initial snapshot + catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).commit(); + + // create an uncommitted append + AppendFiles append = table.newFastAppend().appendFile(FILE_A); + append.apply(); // apply changes to eagerly write metadata + + 
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit();
+    assertFiles(catalog.loadTable(TABLE), FILE_B, FILE_C);
+
+    // the uncommitted append should retry and succeed
+    append.commit();
+    assertFiles(catalog.loadTable(TABLE), FILE_A, FILE_B, FILE_C);
+  }
+
+  @Test
+  public void testUpdateTransaction() {
+    C catalog = catalog();
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+
+    Transaction transaction = table.newTransaction();
+
+    UpdateSchema updateSchema = transaction.updateSchema()
+        .addColumn("new_col", Types.LongType.get());
+    Schema expectedSchema = updateSchema.apply();
+    updateSchema.commit();
+
+    UpdatePartitionSpec updateSpec = transaction.updateSpec()
+        .addField("shard", Expressions.bucket("id", 16));
+    PartitionSpec expectedSpec = updateSpec.apply();
+    updateSpec.commit();
+
+    transaction.commitTransaction();
+
+    Table loaded = catalog.loadTable(TABLE);
+
+    Assert.assertEquals("Loaded table should have expected schema",
+        expectedSchema.asStruct(), loaded.schema().asStruct());
+    Assert.assertEquals("Loaded table should have expected spec",
+        expectedSpec.fields(), loaded.spec().fields());
+
+    assertPreviousMetadataFileCount(loaded, 1);
+  }
+
+  @Test
+  public void testCreateTransaction() {
+    C catalog = catalog();
+
+    Transaction create = catalog.buildTable(TABLE, SCHEMA).createTransaction();
+
+    Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE));
+
+    create.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE));
+
+    create.commitTransaction();
+
+    Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE));
+    Table table = catalog.loadTable(TABLE);
+    assertFiles(table, FILE_A);
+    assertPreviousMetadataFileCount(table, 0);
+  }
+
+  @Test
+  public void testCompleteCreateTransaction() {
+    C catalog = catalog();
+
+    Map<String, String> properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19");
+    Transaction create = catalog.buildTable(TABLE, SCHEMA)
+        .withLocation("file:/tmp/ns/table")
+        .withPartitionSpec(SPEC)
+        .withSortOrder(WRITE_ORDER)
+        .withProperties(properties)
+        .createTransaction();
+
+    Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE));
+
+    create.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE));
+
+    create.commitTransaction();
+
+    Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE));
+    Table table = catalog.loadTable(TABLE);
+
+    Assert.assertEquals("Table schema should match the new schema",
+        TABLE_SCHEMA.asStruct(), table.schema().asStruct());
+    Assert.assertEquals("Table should have create partition spec", TABLE_SPEC.fields(), table.spec().fields());
+    Assert.assertEquals("Table should have create sort order", TABLE_WRITE_ORDER, table.sortOrder());
+    Assert.assertEquals("Table properties should be a superset of the requested properties",
+        properties.entrySet(),
+        Sets.intersection(properties.entrySet(), table.properties().entrySet()));
+    Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
+    assertFiles(table, FILE_A);
+    assertPreviousMetadataFileCount(table, 0);
+  }
+
+  @Test
+  public void testConcurrentCreateTransaction() {
+    C catalog = catalog();
+
+    Transaction create = catalog.buildTable(TABLE, SCHEMA).createTransaction();
+
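Reviewer note: all of the create-transaction tests rely on staged metadata staying client-side until the final commit. A hedged usage sketch, assuming this suite's TABLE/SCHEMA/FILE_A fixtures:

// nothing is visible through the catalog until commitTransaction(): not the
// identifier, not the appended files, not the table metadata
Transaction create = catalog.buildTable(TABLE, SCHEMA).createTransaction();
create.newFastAppend().appendFile(FILE_A).commit(); // staged in the transaction
create.commitTransaction();                         // table appears atomically
Table table = catalog.loadTable(TABLE);             // only now resolvable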
Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + create.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + AssertHelpers.assertThrows("Should fail because table was created concurrently", + CommitFailedException.class, "Table already exists", create::commitTransaction); + + // validate the concurrently created table is unmodified + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + assertNoFiles(table); + } + + @Test + public void testCreateOrReplaceTransactionCreate() { + C catalog = catalog(); + + Transaction create = catalog.buildTable(TABLE, SCHEMA).createOrReplaceTransaction(); + + Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + create.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + create.commitTransaction(); + + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + Table table = catalog.loadTable(TABLE); + assertFiles(table, FILE_A); + assertPreviousMetadataFileCount(table, 0); + } + + @Test + public void testCompleteCreateOrReplaceTransactionCreate() { + C catalog = catalog(); + + Map properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .createOrReplaceTransaction(); + + Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + createOrReplace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE)); + + createOrReplace.commitTransaction(); + + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + Table table = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + TABLE_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertEquals("Table should have create partition spec", TABLE_SPEC.fields(), table.spec().fields()); + Assert.assertEquals("Table should have create sort order", TABLE_WRITE_ORDER, table.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), table.properties().entrySet())); + Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location()); + assertFiles(table, FILE_A); + assertPreviousMetadataFileCount(table, 0); + } + + @Test + public void testCreateOrReplaceReplaceTransactionReplace() { + C catalog = catalog(); + + Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE)); + + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA).createOrReplaceTransaction(); + + Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE)); + + createOrReplace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + // validate 
table has not changed + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + assertUUIDsMatch(original, table); + assertNoFiles(table); + + createOrReplace.commitTransaction(); + + // validate the table after replace + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + table.refresh(); // refresh should work with UUID validation + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct()); + assertUUIDsMatch(original, loaded); + assertFiles(loaded, FILE_A); + assertPreviousMetadataFileCount(loaded, 1); + } + + @Test + public void testCompleteCreateOrReplaceTransactionReplace() { + C catalog = catalog(); + + Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE)); + + Map properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19"); + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA) + .withLocation("file:/tmp/ns/table") + .withPartitionSpec(SPEC) + .withSortOrder(WRITE_ORDER) + .withProperties(properties) + .createOrReplaceTransaction(); + + Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE)); + + createOrReplace.newFastAppend() + .appendFile(FILE_A) + .commit(); + + // validate table has not changed + Table table = catalog.loadTable(TABLE); + Assert.assertEquals("Table schema should match concurrent create", + OTHER_SCHEMA.asStruct(), table.schema().asStruct()); + Assert.assertTrue("Table should be unpartitioned", table.spec().isUnpartitioned()); + Assert.assertTrue("Table should be unsorted", table.sortOrder().isUnsorted()); + Assert.assertNotEquals("Created at should not match", + table.properties().get("created-at"), + "2022-02-25T00:38:19"); + assertUUIDsMatch(original, table); + assertNoFiles(table); + + createOrReplace.commitTransaction(); + + // validate the table after replace + Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE)); + table.refresh(); // refresh should work with UUID validation + + Table loaded = catalog.loadTable(TABLE); + + Assert.assertEquals("Table schema should match the new schema", + REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct()); + Assert.assertEquals("Table should have replace partition spec", REPLACE_SPEC, loaded.spec()); + Assert.assertEquals("Table should have replace sort order", REPLACE_WRITE_ORDER, loaded.sortOrder()); + Assert.assertEquals("Table properties should be a superset of the requested properties", + properties.entrySet(), + Sets.intersection(properties.entrySet(), loaded.properties().entrySet())); + Assert.assertEquals("Table location should be replaced", "file:/tmp/ns/table", table.location()); + assertUUIDsMatch(original, loaded); + assertFiles(loaded, FILE_A); + assertPreviousMetadataFileCount(loaded, 1); + } + + @Test + public void testCreateOrReplaceTransactionConcurrentCreate() { + Assume.assumeTrue("Conversion to replace transaction is not supported by REST catalog", supportsServerSideRetry()); + + C catalog = catalog(); + + Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA).createOrReplaceTransaction(); + + Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE)); + + 
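Reviewer note: as these tests exercise it, a createOrReplace transaction resolves to create-vs-replace when the transaction starts, so the race below can only be rejected at commit time. A hedged sketch of the sequence, reusing the fixtures above:

// TABLE does not exist yet, so the transaction behaves as a create
Transaction createOrReplace = catalog.buildTable(TABLE, SCHEMA).createOrReplaceTransaction();
// another writer wins the race and creates TABLE first
catalog.buildTable(TABLE, OTHER_SCHEMA).create();
// the commit must fail rather than silently replace the other writer's table:
// createOrReplace.commitTransaction() -> CommitFailedException("Table already exists: ...")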
+    createOrReplace.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE));
+
+    catalog.buildTable(TABLE, OTHER_SCHEMA).create();
+
+    AssertHelpers.assertThrows("Should fail because table was created concurrently",
+        CommitFailedException.class, "Table already exists", createOrReplace::commitTransaction);
+
+    // validate the concurrently created table is unmodified
+    Table table = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match concurrent create",
+        OTHER_SCHEMA.asStruct(), table.schema().asStruct());
+    assertNoFiles(table);
+  }
+
+  @Test
+  public void testReplaceTransaction() {
+    C catalog = catalog();
+
+    Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create();
+
+    Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE));
+
+    Transaction replace = catalog.buildTable(TABLE, SCHEMA).replaceTransaction();
+
+    Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE));
+
+    replace.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    // validate table has not changed
+    Table table = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match concurrent create",
+        OTHER_SCHEMA.asStruct(), table.schema().asStruct());
+    assertUUIDsMatch(original, table);
+    assertNoFiles(table);
+
+    replace.commitTransaction();
+
+    // validate the table after replace
+    Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE));
+    table.refresh(); // refresh should work with UUID validation
+
+    Table loaded = catalog.loadTable(TABLE);
+
+    Assert.assertEquals("Table schema should match the new schema",
+        REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct());
+    assertUUIDsMatch(original, loaded);
+    assertFiles(loaded, FILE_A);
+    assertPreviousMetadataFileCount(loaded, 1);
+  }
+
+  @Test
+  public void testCompleteReplaceTransaction() {
+    C catalog = catalog();
+
+    Table original = catalog.buildTable(TABLE, OTHER_SCHEMA).create();
+
+    Assert.assertTrue("Table should exist before replaceTransaction", catalog.tableExists(TABLE));
+
+    Map<String, String> properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19");
+    Transaction replace = catalog.buildTable(TABLE, SCHEMA)
+        .withLocation("file:/tmp/ns/table")
+        .withPartitionSpec(SPEC)
+        .withSortOrder(WRITE_ORDER)
+        .withProperties(properties)
+        .replaceTransaction();
+
+    Assert.assertTrue("Table should still exist after replaceTransaction", catalog.tableExists(TABLE));
+
+    replace.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    // validate table has not changed
+    Table table = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match concurrent create",
+        OTHER_SCHEMA.asStruct(), table.schema().asStruct());
+    Assert.assertTrue("Table should be unpartitioned", table.spec().isUnpartitioned());
+    Assert.assertTrue("Table should be unsorted", table.sortOrder().isUnsorted());
+    Assert.assertNotEquals("Created at should not match",
+        table.properties().get("created-at"),
+        "2022-02-25T00:38:19");
+    assertUUIDsMatch(original, table);
+    assertNoFiles(table);
+
+    replace.commitTransaction();
+
+    // validate the table after replace
+    Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE));
+    table.refresh(); // refresh should work with UUID validation
+
+    Table loaded = catalog.loadTable(TABLE);
+
+    Assert.assertEquals("Table schema should match the new schema",
+        REPLACE_SCHEMA.asStruct(), loaded.schema().asStruct());
+    Assert.assertEquals("Table should have replace partition spec", REPLACE_SPEC, loaded.spec());
+    Assert.assertEquals("Table should have replace sort order", REPLACE_WRITE_ORDER, loaded.sortOrder());
+    Assert.assertEquals("Table properties should be a superset of the requested properties",
+        properties.entrySet(),
+        Sets.intersection(properties.entrySet(), loaded.properties().entrySet()));
+    Assert.assertEquals("Table location should be replaced", "file:/tmp/ns/table", table.location());
+    assertUUIDsMatch(original, loaded);
+    assertFiles(loaded, FILE_A);
+    assertPreviousMetadataFileCount(loaded, 1);
+  }
+
+  @Test
+  public void testReplaceTransactionRequiresTableExists() {
+    C catalog = catalog();
+
+    AssertHelpers.assertThrows("Should fail to create replace transaction with a missing table",
+        NoSuchTableException.class, "Table does not exist",
+        () -> catalog.buildTable(TABLE, SCHEMA).replaceTransaction());
+  }
+
+  @Test
+  public void testConcurrentReplaceTransactions() {
+    C catalog = catalog();
+
+    Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction();
+    transaction.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    transaction.commitTransaction();
+
+    Table original = catalog.loadTable(TABLE);
+    assertFiles(original, FILE_A);
+
+    Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    secondReplace.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    firstReplace.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+    firstReplace.commitTransaction();
+
+    Table afterFirstReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match the original schema",
+        original.schema().asStruct(), afterFirstReplace.schema().asStruct());
+    Assert.assertTrue("Table should be unpartitioned",
+        afterFirstReplace.spec().isUnpartitioned());
+    Assert.assertTrue("Table should be unsorted",
+        afterFirstReplace.sortOrder().isUnsorted());
+    assertUUIDsMatch(original, afterFirstReplace);
+    assertFiles(afterFirstReplace, FILE_B);
+
+    secondReplace.commitTransaction();
+
+    Table afterSecondReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match the original schema",
+        original.schema().asStruct(), afterSecondReplace.schema().asStruct());
+    Assert.assertTrue("Table should be unpartitioned",
+        afterSecondReplace.spec().isUnpartitioned());
+    Assert.assertTrue("Table should be unsorted",
+        afterSecondReplace.sortOrder().isUnsorted());
+    assertUUIDsMatch(original, afterSecondReplace);
+    assertFiles(afterSecondReplace, FILE_C);
+  }
+
+  @Test
+  public void testConcurrentReplaceTransactionSchema() {
+    C catalog = catalog();
+
+    Transaction transaction = catalog.buildTable(TABLE, OTHER_SCHEMA).createTransaction();
+    transaction.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    transaction.commitTransaction();
+
+    Table original = catalog.loadTable(TABLE);
+    assertFiles(original, FILE_A);
+
+    Transaction secondReplace = catalog.buildTable(TABLE, OTHER_SCHEMA)
+        .replaceTransaction();
+    secondReplace.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    firstReplace.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+    firstReplace.commitTransaction();
+
+    Table afterFirstReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match the new schema",
+        REPLACE_SCHEMA.asStruct(), afterFirstReplace.schema().asStruct());
+    assertUUIDsMatch(original, afterFirstReplace);
+    assertFiles(afterFirstReplace, FILE_B);
+
+    secondReplace.commitTransaction();
+
+    Table afterSecondReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match the original schema",
+        original.schema().asStruct(), afterSecondReplace.schema().asStruct());
+    assertUUIDsMatch(original, afterSecondReplace);
+    assertFiles(afterSecondReplace, FILE_C);
+  }
+
+  @Test
+  public void testConcurrentReplaceTransactionSchema2() {
+    C catalog = catalog();
+
+    Transaction transaction = catalog.buildTable(TABLE, OTHER_SCHEMA).createTransaction();
+    transaction.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    transaction.commitTransaction();
+
+    Table original = catalog.loadTable(TABLE);
+    assertFiles(original, FILE_A);
+
+    Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    secondReplace.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Transaction firstReplace = catalog.buildTable(TABLE, OTHER_SCHEMA)
+        .replaceTransaction();
+    firstReplace.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+    firstReplace.commitTransaction();
+
+    Table afterFirstReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match the original schema",
+        original.schema().asStruct(), afterFirstReplace.schema().asStruct());
+    assertUUIDsMatch(original, afterFirstReplace);
+    assertFiles(afterFirstReplace, FILE_B);
+
+    secondReplace.commitTransaction();
+
+    Table afterSecondReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match the new schema",
+        REPLACE_SCHEMA.asStruct(), afterSecondReplace.schema().asStruct());
+    assertUUIDsMatch(original, afterSecondReplace);
+    assertFiles(afterSecondReplace, FILE_C);
+  }
+
+  @Test
+  public void testConcurrentReplaceTransactionSchemaConflict() {
+    Assume.assumeTrue("Schema conflicts are detected server-side", supportsServerSideRetry());
+
+    C catalog = catalog();
+
+    Transaction transaction = catalog.buildTable(TABLE, OTHER_SCHEMA).createTransaction();
+    transaction.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    transaction.commitTransaction();
+
+    Table original = catalog.loadTable(TABLE);
+    assertFiles(original, FILE_A);
+
+    Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    secondReplace.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    firstReplace.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+    firstReplace.commitTransaction();
+
+    Table afterFirstReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table schema should match the new schema",
+        REPLACE_SCHEMA.asStruct(), afterFirstReplace.schema().asStruct());
+    assertUUIDsMatch(original, afterFirstReplace);
+    assertFiles(afterFirstReplace, FILE_B);
+
+    // even though the new schema is identical, the assertion that the last assigned id has not changed will fail
+    AssertHelpers.assertThrows("Should reject concurrent schema update",
+        CommitFailedException.class, "last assigned field id changed", secondReplace::commitTransaction);
+  }
+
+  @Test
+  public void testConcurrentReplaceTransactionPartitionSpec() {
+    C catalog = catalog();
+
+    Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction();
+    transaction.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    transaction.commitTransaction();
+
+    Table original = catalog.loadTable(TABLE);
+    assertFiles(original, FILE_A);
+
+    Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    secondReplace.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA)
+        .withPartitionSpec(SPEC)
+        .replaceTransaction();
+    firstReplace.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+    firstReplace.commitTransaction();
+
+    Table afterFirstReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table spec should match the new spec",
+        TABLE_SPEC.fields(), afterFirstReplace.spec().fields());
+    assertUUIDsMatch(original, afterFirstReplace);
+    assertFiles(afterFirstReplace, FILE_B);
+
+    secondReplace.commitTransaction();
+
+    Table afterSecondReplace = catalog.loadTable(TABLE);
+    Assert.assertTrue("Table should be unpartitioned",
+        afterSecondReplace.spec().isUnpartitioned());
+    assertUUIDsMatch(original, afterSecondReplace);
+    assertFiles(afterSecondReplace, FILE_C);
+  }
+
+  @Test
+  public void testConcurrentReplaceTransactionPartitionSpec2() {
+    C catalog = catalog();
+
+    Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction();
+    transaction.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    transaction.commitTransaction();
+
+    Table original = catalog.loadTable(TABLE);
+    assertFiles(original, FILE_A);
+
+    Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA)
+        .withPartitionSpec(SPEC)
+        .replaceTransaction();
+    secondReplace.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA)
+        .replaceTransaction();
+    firstReplace.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+    firstReplace.commitTransaction();
+
+    Table afterFirstReplace = catalog.loadTable(TABLE);
+    Assert.assertTrue("Table should be unpartitioned",
+        afterFirstReplace.spec().isUnpartitioned());
+    assertUUIDsMatch(original, afterFirstReplace);
+    assertFiles(afterFirstReplace, FILE_B);
+
+    secondReplace.commitTransaction();
+
+    Table afterSecondReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table spec should match the new spec",
+        TABLE_SPEC.fields(), afterSecondReplace.spec().fields());
+    assertUUIDsMatch(original, afterSecondReplace);
+    assertFiles(afterSecondReplace, FILE_C);
+  }
+
+  @Test
+  public void testConcurrentReplaceTransactionPartitionSpecConflict() {
+    Assume.assumeTrue("Spec conflicts are detected server-side", supportsServerSideRetry());
+    C catalog = catalog();
+
+    Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction();
+    transaction.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+    transaction.commitTransaction();
+
+    Table original = catalog.loadTable(TABLE);
+    assertFiles(original, FILE_A);
+
+    Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA)
+        .withPartitionSpec(SPEC)
+        .replaceTransaction();
+    secondReplace.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA)
+        .withPartitionSpec(SPEC)
+        .replaceTransaction();
+    firstReplace.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+    firstReplace.commitTransaction();
+
+    Table afterFirstReplace = catalog.loadTable(TABLE);
+    Assert.assertEquals("Table spec should match the new spec",
+        TABLE_SPEC.fields(), afterFirstReplace.spec().fields());
+    assertUUIDsMatch(original, afterFirstReplace);
+    assertFiles(afterFirstReplace, FILE_B);
+
+    // even though the new spec is identical, the assertion that the last assigned id has not changed will fail
AssertHelpers.assertThrows("Should reject concurrent spec update", + CommitFailedException.class, "last assigned partition id changed", secondReplace::commitTransaction); + } + + @Test + public void testConcurrentReplaceTransactionSortOrder() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .withSortOrder(WRITE_ORDER) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table order should match the new order", + TABLE_WRITE_ORDER, afterFirstReplace.sortOrder()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertTrue("Table should be unsorted", + afterSecondReplace.sortOrder().isUnsorted()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + @Test + public void testConcurrentReplaceTransactionSortOrderConflict() { + C catalog = catalog(); + + Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + transaction.newFastAppend() + .appendFile(FILE_A) + .commit(); + transaction.commitTransaction(); + + Table original = catalog.loadTable(TABLE); + assertFiles(original, FILE_A); + + Transaction secondReplace = catalog.buildTable(TABLE, SCHEMA) + .withSortOrder(WRITE_ORDER) + .replaceTransaction(); + secondReplace.newFastAppend() + .appendFile(FILE_C) + .commit(); + + Transaction firstReplace = catalog.buildTable(TABLE, SCHEMA) + .withSortOrder(SortOrder.builderFor(SCHEMA) + .desc(Expressions.bucket("id", 16)) + .desc("id") + .build()) + .replaceTransaction(); + firstReplace.newFastAppend() + .appendFile(FILE_B) + .commit(); + firstReplace.commitTransaction(); + + Table afterFirstReplace = catalog.loadTable(TABLE); + Assert.assertTrue("Table order should be set", + afterFirstReplace.sortOrder().isSorted()); + assertUUIDsMatch(original, afterFirstReplace); + assertFiles(afterFirstReplace, FILE_B); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + Assert.assertEquals("Table order should match the new order", + TABLE_WRITE_ORDER.fields(), afterSecondReplace.sortOrder().fields()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); + } + + private static void assertEmpty(String context, Catalog catalog, Namespace ns) { + try { + Assert.assertEquals(context, 0, catalog.listTables(ns).size()); + } catch (NoSuchNamespaceException e) { + // it is okay if the catalog throws NoSuchNamespaceException when it is empty + } + } + + public void assertUUIDsMatch(Table expected, Table actual) { + Assert.assertEquals("Table UUID should not change", + ((BaseTable) expected).operations().current().uuid(), + ((BaseTable) actual).operations().current().uuid()); + } + + public void assertPreviousMetadataFileCount(Table table, int metadataFileCount) { + TableOperations ops = ((BaseTable) 
+    TableOperations ops = ((BaseTable) table).operations();
+    Assert.assertEquals("Table should have correct number of previous metadata locations",
+        metadataFileCount, ops.current().previousFiles().size());
+  }
+
+  public void assertNoFiles(Table table) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      Assert.assertFalse("Should contain no files", tasks.iterator().hasNext());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public void assertFiles(Table table, DataFile... files) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      List<CharSequence> paths = Streams.stream(tasks)
+          .map(FileScanTask::file)
+          .map(DataFile::path)
+          .collect(Collectors.toList());
+      Assert.assertEquals("Should contain expected number of data files", files.length, paths.size());
+      Assert.assertEquals("Should contain correct file paths",
+          CharSequenceSet.of(Iterables.transform(Arrays.asList(files), DataFile::path)),
+          CharSequenceSet.of(paths));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
   private List<Namespace> concat(List<Namespace> starting, Namespace... additional) {
     List<Namespace> namespaces = Lists.newArrayList();
     namespaces.addAll(starting);
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
new file mode 100644
index 000000000000..d8445f76f34a
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.CatalogTests;
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private RESTCatalog restCatalog;
+
+  @Before
+  public void createCatalog() throws IOException {
+    File warehouse = temp.newFolder();
+    Configuration conf = new Configuration();
+
+    JdbcCatalog backendCatalog = new JdbcCatalog();
+    backendCatalog.setConf(conf);
+    Map<String, String> backendCatalogProperties = ImmutableMap.of(
+        CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath(),
+        CatalogProperties.URI, "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""),
+        JdbcCatalog.PROPERTY_PREFIX + "username", "user",
+        JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+    backendCatalog.initialize("backend", backendCatalogProperties);
+
+    RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog, ErrorHandlers.defaultErrorHandler());
+
+    this.restCatalog = new RESTCatalog(adaptor);
+    restCatalog.setConf(conf);
+    restCatalog.initialize("prod", ImmutableMap.of());
+  }
+
+  @Override
+  protected RESTCatalog catalog() {
+    return restCatalog;
+  }
+
+  @Override
+  protected boolean supportsNamespaceProperties() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsServerSideRetry() {
+    return true;
+  }
+}
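Reviewer note: other catalog implementations should be able to reuse this contract suite the same way, by extending CatalogTests, supplying the catalog under test, and declaring capability flags. A hedged sketch for a hypothetical InMemoryCatalog (the backend class name is illustrative, not part of this patch):

public class TestInMemoryCatalog extends CatalogTests<InMemoryCatalog> {
  private InMemoryCatalog catalog;

  @Before
  public void createCatalog() {
    this.catalog = new InMemoryCatalog(); // hypothetical backend
    catalog.initialize("test", ImmutableMap.of());
  }

  @Override
  protected InMemoryCatalog catalog() {
    return catalog;
  }

  @Override
  protected boolean supportsServerSideRetry() {
    return false; // commits are retried client-side against refreshed metadata
  }
}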