diff --git a/api/src/main/java/org/apache/iceberg/exceptions/UnprocessableEntityException.java b/api/src/main/java/org/apache/iceberg/exceptions/UnprocessableEntityException.java
new file mode 100644
index 000000000000..9c5e0f8852bd
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/UnprocessableEntityException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+
+/**
+ * REST exception thrown when a request is well-formed but cannot be applied.
+ *
+ * <p>For example, this is used when a property update requests that properties are simultaneously set and removed.
+ */
+public class UnprocessableEntityException extends RESTException {
+ @FormatMethod
+ public UnprocessableEntityException(String message, Object... args) { // message is a format string applied to args (enforced by @FormatMethod)
+ super(message, args);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index ad4639c39e93..e63f7bf7e741 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -49,7 +49,7 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
-class BaseTransaction implements Transaction {
+public class BaseTransaction implements Transaction {
private static final Logger LOG = LoggerFactory.getLogger(BaseTransaction.class);
enum TransactionType {
@@ -90,6 +90,14 @@ public Table table() {
return transactionTable;
}
+ public TableMetadata startMetadata() { // presumably the metadata snapshot this transaction started from ('current' field) — used by rest.CatalogHandlers
+ return current;
+ }
+
+ public TableOperations underlyingOps() { // NOTE(review): exposes the wrapped TableOperations publicly; intended for rest.CatalogHandlers — confirm no wider use
+ return ops;
+ }
+
private void checkLastOperationCommitted(String operation) {
Preconditions.checkState(hasLastOpCommitted,
"Cannot create new %s: last operation has not committed", operation);
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
new file mode 100644
index 000000000000..f1267ba6d45d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
+import org.apache.iceberg.rest.responses.DropNamespaceResponse;
+import org.apache.iceberg.rest.responses.DropTableResponse;
+import org.apache.iceberg.rest.responses.GetNamespaceResponse;
+import org.apache.iceberg.rest.responses.ListNamespacesResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.iceberg.util.Tasks;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+public class CatalogHandlers {
+  private static final Schema EMPTY_SCHEMA = new Schema();
+
+  private CatalogHandlers() {
+  }
+
+  /**
+   * Exception used to avoid retrying commits when assertions fail.
+   *
+   * <p>When a REST assertion fails, it will throw CommitFailedException to send back to the client. But the assertion
+   * checks happen in the block that is retried if {@link TableOperations#commit(TableMetadata, TableMetadata)} throws
+   * CommitFailedException. This is used to avoid retries for assertion failures, which are unwrapped and rethrown
+   * outside of the commit loop.
+   */
+  private static class ValidationFailureException extends RuntimeException {
+    private final CommitFailedException wrapped;
+
+    private ValidationFailureException(CommitFailedException cause) {
+      super(cause);
+      this.wrapped = cause;
+    }
+
+    public CommitFailedException wrapped() {
+      return wrapped;
+    }
+  }
+
+  public static ListNamespacesResponse listNamespaces(SupportsNamespaces catalog, Namespace parent) {
+    List<Namespace> results;
+    if (parent.isEmpty()) {
+      results = catalog.listNamespaces();
+    } else {
+      results = catalog.listNamespaces(parent);
+    }
+
+    return ListNamespacesResponse.builder().addAll(results).build();
+  }
+
+  public static CreateNamespaceResponse createNamespace(SupportsNamespaces catalog, CreateNamespaceRequest request) {
+    Namespace namespace = request.namespace();
+    catalog.createNamespace(namespace, request.properties());
+    return CreateNamespaceResponse.builder()
+        .withNamespace(namespace)
+        .setProperties(catalog.loadNamespaceMetadata(namespace)) // reload so defaults added by the catalog are returned
+        .build();
+  }
+
+  public static GetNamespaceResponse loadNamespace(SupportsNamespaces catalog, Namespace namespace) {
+    Map<String, String> properties = catalog.loadNamespaceMetadata(namespace);
+    return GetNamespaceResponse.builder()
+        .withNamespace(namespace)
+        .setProperties(properties)
+        .build();
+  }
+
+  public static DropNamespaceResponse dropNamespace(SupportsNamespaces catalog, Namespace namespace) {
+    boolean dropped = catalog.dropNamespace(namespace);
+    return DropNamespaceResponse.builder()
+        .dropped(dropped)
+        .build();
+  }
+
+  public static UpdateNamespacePropertiesResponse updateNamespaceProperties(
+      SupportsNamespaces catalog, Namespace namespace, UpdateNamespacePropertiesRequest request) {
+    request.validate(); // rejects keys that are both set and removed (UnprocessableEntityException)
+
+    Set<String> removals = Sets.newHashSet(request.removals());
+    Map<String, String> updates = request.updates();
+
+    Map<String, String> startProperties = catalog.loadNamespaceMetadata(namespace);
+    Set<String> missing = Sets.difference(removals, startProperties.keySet());
+
+    if (!updates.isEmpty()) {
+      catalog.setProperties(namespace, updates);
+    }
+
+    if (!removals.isEmpty()) {
+      // remove the full requested set, not just the keys seen at load time, in case of a concurrent update
+      catalog.removeProperties(namespace, removals);
+    }
+
+    return UpdateNamespacePropertiesResponse.builder()
+        .addMissing(missing)
+        .addUpdated(updates.keySet())
+        .addRemoved(Sets.difference(removals, missing))
+        .build();
+  }
+
+  public static ListTablesResponse listTables(Catalog catalog, Namespace namespace) {
+    List<TableIdentifier> idents = catalog.listTables(namespace);
+    return ListTablesResponse.builder().addAll(idents).build();
+  }
+
+  public static LoadTableResponse stageTableCreate(Catalog catalog, Namespace namespace, CreateTableRequest request) { // builds metadata only; nothing is committed
+    request.validate();
+
+    TableIdentifier ident = TableIdentifier.of(namespace, request.name());
+    if (catalog.tableExists(ident)) {
+      throw new AlreadyExistsException("Table already exists: %s", ident);
+    }
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("created-at", OffsetDateTime.now().toString()); // default; request properties may override it below
+    properties.putAll(request.properties());
+
+    TableMetadata metadata = TableMetadata.newTableMetadata(
+        request.schema(),
+        request.spec() != null ? request.spec() : PartitionSpec.unpartitioned(),
+        request.writeOrder() != null ? request.writeOrder() : SortOrder.unsorted(),
+        request.location(),
+        properties);
+
+    return LoadTableResponse.builder()
+        .withTableMetadata(metadata)
+        .build();
+  }
+
+  public static LoadTableResponse createTable(Catalog catalog, Namespace namespace, CreateTableRequest request) {
+    request.validate();
+
+    TableIdentifier ident = TableIdentifier.of(namespace, request.name());
+    Table table = catalog.buildTable(ident, request.schema())
+        .withLocation(request.location())
+        .withPartitionSpec(request.spec())
+        .withSortOrder(request.writeOrder())
+        .withProperties(request.properties())
+        .create();
+
+    if (table instanceof BaseTable) {
+      return LoadTableResponse.builder()
+          .withTableMetadata(((BaseTable) table).operations().current())
+          .build();
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+  }
+
+  public static DropTableResponse dropTable(Catalog catalog, TableIdentifier ident) {
+    boolean dropped = catalog.dropTable(ident);
+    return DropTableResponse.builder().dropped(dropped).build();
+  }
+
+  public static LoadTableResponse loadTable(Catalog catalog, TableIdentifier ident) {
+    Table table = catalog.loadTable(ident);
+
+    if (table instanceof BaseTable) {
+      return LoadTableResponse.builder()
+          .withTableMetadata(((BaseTable) table).operations().current())
+          .build();
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+  }
+
+  public static LoadTableResponse updateTable(Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { // commit endpoint: handles both create and update commits
+    TableMetadata finalMetadata;
+    if (isCreate(request)) {
+      // this is a hacky way to get TableOperations for an uncommitted table
+      Transaction transaction = catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+      if (transaction instanceof BaseTransaction) {
+        BaseTransaction baseTransaction = (BaseTransaction) transaction;
+        finalMetadata = create(baseTransaction.underlyingOps(), baseTransaction.startMetadata(), request);
+      } else {
+        throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTransaction");
+      }
+
+    } else {
+      Table table = catalog.loadTable(ident);
+      if (table instanceof BaseTable) {
+        TableOperations ops = ((BaseTable) table).operations();
+        finalMetadata = commit(ops, request);
+      } else {
+        throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+      }
+    }
+
+    return LoadTableResponse.builder()
+        .withTableMetadata(finalMetadata)
+        .build();
+  }
+
+  private static boolean isCreate(UpdateTableRequest request) { // true when the request asserts the table does not exist yet
+    boolean isCreate = request.requirements().stream()
+        .anyMatch(UpdateTableRequest.UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
+
+    if (isCreate) {
+      List<UpdateTableRequest.UpdateRequirement> invalidRequirements = request.requirements().stream()
+          .filter(req -> !(req instanceof UpdateTableRequest.UpdateRequirement.AssertTableDoesNotExist))
+          .collect(Collectors.toList());
+      Preconditions.checkArgument(invalidRequirements.isEmpty(),
+          "Invalid create requirements: %s", invalidRequirements);
+    }
+
+    return isCreate;
+  }
+
+  private static TableMetadata create(TableOperations ops, TableMetadata start, UpdateTableRequest request) { // one-shot create commit
+    TableMetadata.Builder builder = TableMetadata.buildFrom(start);
+
+    // the only valid requirement is that the table will be created
+    request.updates().forEach(update -> update.applyTo(builder));
+
+    // create transactions do not retry. if the table exists, retrying is not a solution
+    ops.commit(null, builder.build()); // base == null signals a create commit
+
+    return ops.current();
+  }
+
+  private static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { // update commit with the standard retry loop
+    AtomicBoolean isRetry = new AtomicBoolean(false);
+    try {
+      Tasks.foreach(ops)
+          .retry(COMMIT_NUM_RETRIES_DEFAULT)
+          .exponentialBackoff(
+              COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+              COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */)
+          .onlyRetryOn(CommitFailedException.class)
+          .run(taskOps -> {
+            TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); // only refresh on retries
+            isRetry.set(true);
+
+            // validate requirements
+            try {
+              request.requirements().forEach(requirement -> requirement.validate(base));
+            } catch (CommitFailedException e) {
+              // wrap and rethrow outside of tasks to avoid unnecessary retry
+              throw new ValidationFailureException(e);
+            }
+
+            // apply changes
+            TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
+            request.updates().forEach(update -> update.applyTo(metadataBuilder));
+
+            TableMetadata updated = metadataBuilder.build();
+            if (updated.changes().isEmpty()) {
+              // do not commit if the metadata has not changed
+              return;
+            }
+
+            // commit
+            taskOps.commit(base, updated);
+          });
+
+    } catch (ValidationFailureException e) {
+      throw e.wrapped(); // surface the original CommitFailedException to the client
+    }
+
+    return ops.current();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
new file mode 100644
index 000000000000..66162c4d590e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.Transactions;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
+import org.apache.iceberg.rest.responses.DropNamespaceResponse;
+import org.apache.iceberg.rest.responses.DropTableResponse;
+import org.apache.iceberg.rest.responses.GetNamespaceResponse;
+import org.apache.iceberg.rest.responses.ListNamespacesResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.iceberg.util.Pair;
+
+public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable {
+ private final Function