diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NotAuthorizedException.java b/api/src/main/java/org/apache/iceberg/exceptions/NotAuthorizedException.java new file mode 100644 index 000000000000..90519bf5b5df --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/NotAuthorizedException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.exceptions; + +/** + * Exception raised when an attempt to perform an operation fails due to invalid permissions. + */ +public class NotAuthorizedException extends RuntimeException { + public NotAuthorizedException(String message) { + super(message); + } + + public NotAuthorizedException(String message, Object... 
args) { + super(String.format(message, args)); + } + + public NotAuthorizedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/api/src/main/java/org/apache/iceberg/rest/CreateNamespaceRequest.java b/api/src/main/java/org/apache/iceberg/rest/CreateNamespaceRequest.java new file mode 100644 index 000000000000..5c102d7b2a48 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/rest/CreateNamespaceRequest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest; + +import java.util.Map; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +/** + * Represents a REST request to create a namespace / database. 
+ * + * Possible example POST body (outer format object possibly only used on Response): + * { "data": { "namespace": ["prod", "accounting"], properties: {} }} + */ +public class CreateNamespaceRequest { + + private Namespace namespace; + private Map properties; + + private CreateNamespaceRequest() { + } + + private CreateNamespaceRequest(String[] namespaceLevels, Map properties) { + this.namespace = Namespace.of(namespaceLevels); + this.properties = properties; + } + + public Namespace getNamespace() { + return namespace; + } + + public void setNamespace(Namespace namespace) { + this.namespace = namespace; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private ImmutableList.Builder namespaceBuilder; + private final ImmutableMap.Builder propertiesBuilder; + + public Builder() { + this.namespaceBuilder = ImmutableList.builder(); + this.propertiesBuilder = ImmutableMap.builder(); + } + + public Builder withNamespace(Namespace namespace) { + this.namespaceBuilder.add(namespace.levels()); + return this; + } + + public Builder withProperties(Map properties) { + if (properties != null) { + propertiesBuilder.putAll(properties); + } + return this; + } + + public Builder withProperty(String key, String value) { + propertiesBuilder.put(key, value); + return this; + } + + public CreateNamespaceRequest build() { + String[] namespaceLevels = namespaceBuilder.build().toArray(new String[0]); + Preconditions.checkState(namespaceLevels.length > 0, + "Cannot create a CreateNamespaceRequest with an empty namespace."); + return new CreateNamespaceRequest(namespaceLevels, propertiesBuilder.build()); + + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/rest/CreateNamespaceResponse.java b/api/src/main/java/org/apache/iceberg/rest/CreateNamespaceResponse.java new file mode 100644 
index 000000000000..70bfe0fe1347 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/rest/CreateNamespaceResponse.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest; + +import java.util.Map; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Represents a REST response to a create a namespace / database request. + * + * The properties returned will include all the user provided properties from the + * request, as well as any server-side added properties. + * + * Example server-side added properties could include things such as "created-at" + * or "owner". + * + * Presently, the JSON looks as follows: + * { "namespace": "ns1.ns2.ns3", "properties": { "owner": "hank", "created-at": "1425744000000" } } + * + * Eventually, the idea is to wrap responses in IcebergHttpResponse so that it has a more standardized + * structure, including the possibility of richer metadata on errors, such as tracing telemetry or follw + * up user instruction. 
+ * + * { + * "error": { }, + * "data": { + * "namespace": "ns1.ns2.ns3", + * "properties": { + * "owner": "hank", + * "created-at": "1425744000000" + * } + * } + * } + * + * For an error response, we'll see something like the following: + * + * { + * "data": { }, + * "error": { + * "message": "Namespace already exists", + * "type": "AlreadyExistsException", + * "code": 40901 + * } + * } + */ +public class CreateNamespaceResponse { + private Namespace namespace; + private Map properties; + + private CreateNamespaceResponse() { + } + + private CreateNamespaceResponse(Namespace namespace, Map properties) { + this.namespace = namespace; + this.properties = properties; + } + + public Namespace getNamespace() { + return namespace; + } + + public void setNamespace(Namespace namespace) { + this.namespace = namespace; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Namespace namespace; + private Map properties; + + private Builder() { + + } + + public Builder withNamespace(Namespace ns) { + this.namespace = ns; + return this; + } + + public Builder withProperties(Map props) { + this.properties = props; + return this; + } + + public CreateNamespaceResponse build() { + Preconditions.checkArgument(namespace != null && !namespace.isEmpty(), + "Cannot create a CreateNamespaceResponse with a null or empty namespace"); + return new CreateNamespaceResponse(namespace, properties); + } + } +} diff --git a/build.gradle b/build.gradle index 38388747d25b..0827dcae7287 100644 --- a/build.gradle +++ b/build.gradle @@ -120,6 +120,9 @@ subprojects { testImplementation 'org.slf4j:slf4j-simple' testImplementation 'org.mockito:mockito-core' testImplementation 'org.assertj:assertj-core' + testImplementation 'org.mock-server:mockserver-junit-jupiter' + testImplementation 
'org.mock-server:mockserver-netty' + testImplementation 'org.mock-server:mockserver-client-java' } test { @@ -221,6 +224,8 @@ project(':iceberg-core') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' } + implementation 'org.apache.httpcomponents.client5:httpclient5' + testImplementation "org.xerial:sqlite-jdbc" testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') } diff --git a/core/src/main/java/org/apache/iceberg/rest/RestCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RestCatalog.java new file mode 100644 index 000000000000..ef69288dc70c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RestCatalog.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.http.ErrorHandlers; +import org.apache.iceberg.rest.http.HttpClient; +import org.apache.iceberg.rest.http.RequestResponseSerializers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RestCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable { + + private static final Logger LOG = LoggerFactory.getLogger(RestCatalog.class); + + private String catalogName; + private Map properties; + private ObjectMapper mapper; + private String baseUrl; + private Configuration hadoopConf; + private FileIO fileIO; + + // TODO - Refactor to use interface, which doesn't 
currently have POST etc embedded into it. + private HttpClient restClient; + + @Override + public void initialize(String name, Map props) { + this.catalogName = name; + this.properties = props; + + // TODO - Possibly authenticate with the server initially and then have the server return some of this information + Preconditions.checkNotNull( + properties.getOrDefault("baseUrl", null), + "Cannot initialize the RestCatalog as the baseUrl is a required parameter."); + + this.baseUrl = properties.get("baseUrl"); + this.fileIO = initializeFileIO(properties); + + this.mapper = new ObjectMapper(); + RequestResponseSerializers.registerAll(mapper); + + // TODO - We can possibly handle multiple warehouses via one RestCatalog to reuse the connection pool + // and for cross database calls if users need to authenticate with each. + this.restClient = HttpClient.builder() + .baseUrl(String.format("%s/warehouse/%s", baseUrl, catalogName)) + .mapper(mapper) + .defaultErrorHandler(ErrorHandlers.tableErrorHandler()) + .build(); + } + + // TODO - Pass in ObjectMapper too when testing. + @VisibleForTesting + public void initialize( + String name, + Map props, + HttpClient httpClient) { + this.catalogName = name; + this.properties = props; + + // TODO - Possibly authenticate with the server initially and then have the server return some of this information + Preconditions.checkNotNull( + properties.getOrDefault("baseUrl", null), + "Cannot initialize the RestCatalog as the baseUrl is a required parameter."); + + this.baseUrl = properties.get("baseUrl"); + this.fileIO = initializeFileIO(properties); + + this.mapper = new ObjectMapper(); + RequestResponseSerializers.registerAll(mapper); + + // Pass in the mock client instead. 
+ this.restClient = httpClient; + } + + @Override + public String name() { + return catalogName; + } + + @Override + public Table createTable( + TableIdentifier identifier, Schema schema, PartitionSpec spec, String location, + Map props) { + throw new UnsupportedOperationException("Not implemented: createTable"); + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + // Users might have not authenticated, possibly need to reauthenticate, or get authentication info per request. + // Though usually, auth tokens are persistent for at least some period of time. + return newTableOps(tableIdentifier, null); + } + + protected RestTableOperations newTableOps(TableIdentifier tableIdentifier, String authToken) { + throw new UnsupportedOperationException("Not implemented: newTableOps"); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Not implemented: defaultWarehouseLocation"); + } + + @Override + public List listTables(Namespace namespace) { + throw new UnsupportedOperationException("Not implemented: listTables"); + } + + @Override + public boolean tableExists(TableIdentifier identifier) { + throw new UnsupportedOperationException("Not implemented: tableExists"); + } + + @Override + public boolean dropTable(TableIdentifier identifier) { + return dropTable(identifier, false); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + throw new UnsupportedOperationException("Not implemented: dropTable"); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + throw new UnsupportedOperationException("Not implemented: renameTable"); + } + + @Override + public void close() throws IOException { + restClient.close(); + } + + @Override + public void createNamespace(Namespace namespace, Map metadata) { + CreateNamespaceRequest req = CreateNamespaceRequest.builder() + 
.withNamespace(namespace) + .withProperties(metadata) + .build(); + + // TODO - This should come from the server side. + String path = properties.getOrDefault("create-namespace-path", "namespace"); + restClient.post(path, req, CreateNamespaceResponse.class, ErrorHandlers.namespaceErrorHandler()); + } + + public CreateNamespaceResponse createDatabase(Namespace namespace, Map props) { + CreateNamespaceRequest req = CreateNamespaceRequest.builder() + .withNamespace(namespace) + .withProperties(props) + .build(); + + // TODO - This should come from the server side. + String path = properties.getOrDefault("create-namespace-path", "namespace"); + return restClient.post(path, req, CreateNamespaceResponse.class, ErrorHandlers.namespaceErrorHandler()); + } + + @Override + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + throw new UnsupportedOperationException("Not implemented: listNamespaces"); + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + throw new UnsupportedOperationException("Not implemented: loadNamespaceMetadata"); + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + throw new UnsupportedOperationException("Not implemented: dropNamespace"); + } + + @Override + public boolean setProperties(Namespace namespace, Map props) throws NoSuchNamespaceException { + throw new UnsupportedOperationException("Not implemented: setProperties"); + } + + @Override + public boolean removeProperties(Namespace namespace, Set props) throws NoSuchNamespaceException { + throw new UnsupportedOperationException("Not implemented: removeProperties"); + } + + @Override + public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + return new RestTableBuilder(identifier, schema); + } + + @Override + public Configuration getConf() { + return hadoopConf; + } + + @Override + public void setConf(Configuration conf) { + 
this.hadoopConf = conf; + } + + private FileIO initializeFileIO(Map props) { + String fileIOImpl = props.get(CatalogProperties.FILE_IO_IMPL); + return CatalogUtil.loadFileIO(fileIOImpl, props, hadoopConf); + } + + private static class RestTableBuilder implements Catalog.TableBuilder { + private final TableIdentifier identifier; + private final Schema schema; + private final ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); + private String location; + private PartitionSpec spec = PartitionSpec.unpartitioned(); + private SortOrder sortOrder = SortOrder.unsorted(); + + protected RestTableBuilder(TableIdentifier identifier, Schema schema) { + this.identifier = identifier; + this.schema = schema; + } + + @Override + public TableBuilder withPartitionSpec(PartitionSpec newSpec) { + this.spec = newSpec != null ? newSpec : PartitionSpec.unpartitioned(); + return this; + } + + @Override + public TableBuilder withSortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder != null ? newSortOrder : SortOrder.unsorted(); + return this; + } + + // TODO - Figure out why this is giving me HiddenField and see about turning that off. 
+ @Override + public TableBuilder withLocation(String tableLocation) { + this.location = tableLocation; + return this; + } + + @Override + public TableBuilder withProperties(Map props) { + if (props != null) { + propertiesBuilder.putAll(props); + } + return this; + } + + @Override + public TableBuilder withProperty(String key, String value) { + propertiesBuilder.put(key, value); + return this; + } + + @Override + public Table create() { + throw new UnsupportedOperationException("Not implemented: create"); + } + + @Override + public Transaction createTransaction() { + throw new UnsupportedOperationException("Not implemented: createTransaction"); + } + + @Override + public Transaction replaceTransaction() { + throw new UnsupportedOperationException("Replace currently not supported"); + } + + @Override + public Transaction createOrReplaceTransaction() { + throw new UnsupportedOperationException("Replace currently not supported"); + } + } + + // TODO - Modified from Dynamo catalog. Should probably share it. + private void validateNamespace(Namespace namespace) { + // We might sometimes allow Namespace.of()? + ValidationException.check(!namespace.isEmpty(), + "A namespace object with no levels is not a valid namespace for a REST request"); + for (String level : namespace.levels()) { + ValidationException.check(level != null && !level.isEmpty(), + "Namespace level must not be empty: %s", namespace); + ValidationException.check(!level.contains("."), + "Namespace level must not contain dot, but found %s in %s", level, namespace); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RestClient.java b/core/src/main/java/org/apache/iceberg/rest/RestClient.java new file mode 100644 index 000000000000..425772e47e35 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RestClient.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest; + +import java.io.Closeable; + +/** + * Eventual Interface for pluggable RestClients. + */ +public interface RestClient extends Closeable { } diff --git a/core/src/main/java/org/apache/iceberg/rest/RestException.java b/core/src/main/java/org/apache/iceberg/rest/RestException.java new file mode 100644 index 000000000000..f18f17b3baa5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RestException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.rest; + +import com.google.errorprone.annotations.FormatMethod; + +public class RestException extends RuntimeException { + + public RestException(String message) { + super(message); + } + + @FormatMethod + public RestException(String message, Object... args) { + super(String.format(message, args)); + } + + public RestException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RestTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RestTableOperations.java new file mode 100644 index 000000000000..09b27a245249 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RestTableOperations.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest; + +import java.util.Map; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.rest.http.HttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO - Extract out to an interface - Implement with HTTP version. 
+// TODO - Should we implement Configurable here? Since this will be an interface, I think not in the interface. +// TODO - As this will be more of an interface, possibly extend TableOperations directly (like HadoopTableOperations) +class RestTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(RestTableOperations.class); + + private final TableIdentifier tableIdentifier; + private final String fullTableName; + // TODO - Use the RestClient interface here instead. + private final HttpClient httpClient; + private final Map properties; + private final String catalogName; // TODO - Likely not needed. Can embed in the Request classes. + private final FileIO fileIO; + private TableMetadata currentMetadata; + private String metadataFileLocation; + + protected RestTableOperations( + HttpClient httpClient, + Map properties, + FileIO fileIO, + String catalogName, + TableIdentifier tableIdentifier) { + this.httpClient = httpClient; + this.properties = properties; + this.fileIO = fileIO; + this.catalogName = catalogName; + this.tableIdentifier = tableIdentifier; + this.fullTableName = String.format("%s.%s", catalogName, tableIdentifier); + } + + // TODO - Probably remove this builder since `newTableOps` is used instead. + public static Builder builder() { + return new Builder(); + } + + @Override + public TableMetadata refresh() { + throw new UnsupportedOperationException("Not implemented: refresh"); + } + + @Override + protected void doRefresh() { + throw new UnsupportedOperationException("Not implemented: doRefresh"); + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected String tableName() { + return fullTableName; + } + + public void setCurrentMetadata(TableMetadata tableMetadata) { + this.currentMetadata = tableMetadata; + } + + public static class Builder { + private TableIdentifier identifier; + // TODO - Use the RestClient interface here instead. 
+ private HttpClient httpClient; + private Map properties; + private String catalogName; // TODO - Likely not needed. Can embed in the Request classes. + private FileIO io; + + public Builder identifier(TableIdentifier tableIdentifier) { + this.identifier = tableIdentifier; + return this; + } + + // TODO - Change to use interface + public Builder httpClient(HttpClient client) { + this.httpClient = client; + return this; + } + + public Builder properties(Map props) { + this.properties = props; + return this; + } + + public Builder catalogName(String name) { + this.catalogName = name; + return this; + } + + public Builder fileIO(FileIO fileIO) { + this.io = fileIO; + return this; + } + + public RestTableOperations build() { + return new RestTableOperations(httpClient, properties, io, catalogName, identifier); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/UncheckedRestException.java b/core/src/main/java/org/apache/iceberg/rest/UncheckedRestException.java new file mode 100644 index 000000000000..e3492c74b0fd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/UncheckedRestException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.rest; + +import com.google.errorprone.annotations.FormatMethod; +import java.util.Objects; + +/** + * Exception thrown by the REST client and catalog, to wrap a checked exception + * in an unchecked exception. + */ +public class UncheckedRestException extends RuntimeException { + + public UncheckedRestException(Throwable cause) { + super(Objects.requireNonNull(cause)); + } + + @FormatMethod + public UncheckedRestException(String message, Object... args) { + super(String.format(message, args)); + } + + @FormatMethod + public UncheckedRestException(Throwable cause, String message, Object... args) { + super(String.format(message, args), cause); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/http/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/http/ErrorHandlers.java new file mode 100644 index 000000000000..eddaad387509 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/http/ErrorHandlers.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iceberg.rest.http;

import java.util.function.Consumer;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotAuthorizedException;
import org.apache.iceberg.rest.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates HTTP error responses from a REST catalog server into Iceberg's exception
 * hierarchy. Each handler is a {@link Consumer} of the raw response so it can be plugged
 * into the HTTP client either per-request or as the client's default error handler.
 */
public class ErrorHandlers {

  // Fixed: the logger was created with BaseMetastoreCatalog.class, which mislabeled every
  // log line from this class and pulled in an unrelated import.
  private static final Logger LOG = LoggerFactory.getLogger(ErrorHandlers.class);

  // Header used by the server to disambiguate which Iceberg exception a status code maps to.
  private static final String ICEBERG_EXCEPTION_HEADER = "X-Iceberg-Exception";

  private ErrorHandlers() {
  }

  /**
   * Table level error handler.
   *
   * @return a handler that throws the Iceberg exception matching the response status
   */
  public static Consumer<CloseableHttpResponse> tableErrorHandler() {
    return errorResponse -> {
      String responseBody = getResponseBody(errorResponse);
      String responseException = getIcebergExceptionHeader(errorResponse);

      switch (errorResponse.getCode()) {
        // TODO - Probably this should be 400 and not 404. 404 makes monitoring more difficult,
        // as it's hard to tell a 404 from a route that does not exist apart from a 404 for a
        // missing resource.
        case HttpStatus.SC_NOT_FOUND:
          // A 404 can mean a missing namespace or a missing table; the X-Iceberg-Exception
          // header is the only way to tell them apart. Absent or unknown header values
          // default to "table not found".
          if (NoSuchNamespaceException.class.getSimpleName().equals(responseException)) {
            throw new NoSuchNamespaceException("Resource not found: %s", responseBody);
          } else {
            throw new NoSuchTableException("Resource not found: %s", responseBody);
          }
        case HttpStatus.SC_CONFLICT:
          throw new AlreadyExistsException("Already exists: %s", responseBody);
        case HttpStatus.SC_FORBIDDEN:
        case HttpStatus.SC_UNAUTHORIZED:
          throw new NotAuthorizedException("Not Authorized: %s", responseBody);
        default:
          throw new RestException("Unknown error: %s", errorResponse);
      }
    };
  }

  /**
   * Request error handler specifically for CRUD ops on databases / namespaces.
   *
   * @return a handler that throws the Iceberg exception matching the response status
   */
  public static Consumer<CloseableHttpResponse> namespaceErrorHandler() {
    return errorResponse -> {
      String responseBody = getResponseBody(errorResponse);

      switch (errorResponse.getCode()) {
        case HttpStatus.SC_NOT_FOUND:
          throw new NoSuchNamespaceException("Namespace not found: %s", responseBody);
        case HttpStatus.SC_CONFLICT:
          throw new AlreadyExistsException("Already exists: %s", responseBody);
        case HttpStatus.SC_FORBIDDEN:
        case HttpStatus.SC_UNAUTHORIZED:
          throw new NotAuthorizedException("Not Authorized: %s", responseBody);
        default:
          throw new RestException("Unknown error: %s", errorResponse);
      }
    };
  }

  /**
   * Reads the response entity as a string, returning a placeholder (and logging) on failure
   * so error handling itself never throws while building an exception message.
   */
  static String getResponseBody(CloseableHttpResponse response) {
    String responseBody = "Non-parseable Response Body";
    try {
      responseBody = EntityUtils.toString(response.getEntity());
    } catch (Exception e) {
      LOG.error("Encountered an exception getting response body", e);
    }

    return responseBody;
  }

  /**
   * Returns the value of the X-Iceberg-Exception header, or null when it is absent.
   * TODO - Some servers/proxies strip non-whitelisted headers, so the specific exception
   * may eventually need to come from the response body instead.
   */
  static String getIcebergExceptionHeader(CloseableHttpResponse response) {
    // Fixed: the previous implementation called response.getHeader(name).getValue() inside a
    // catch (Exception), relying on an NPE when the header was absent; getHeader also throws
    // ProtocolException when the header appears more than once. getFirstHeader never throws
    // and returns null when the header is missing.
    Header header = response.getFirstHeader(ICEBERG_EXCEPTION_HEADER);
    return header != null ? header.getValue() : null;
  }
}
package org.apache.iceberg.rest.http;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.io.CloseMode;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RestClient;
import org.apache.iceberg.rest.UncheckedRestException;

/**
 * An HttpClient that can be used for the RestClient (eventually).
 * TODO - Extract an interface for all of this so that other implementations are easy to
 * implement (e.g. gRPC).
 */
public class HttpClient implements RestClient {
  private final String baseUrl;
  private final CloseableHttpClient httpClient;
  private final Consumer<CloseableHttpResponse> defaultErrorHandler;
  private final ObjectMapper mapper;
  private final Map<String, String> additionalHeaders;
  private final Consumer<HttpUriRequest> requestInterceptor;

  private HttpClient(
      String baseUrl, CloseableHttpClient httpClient, ObjectMapper mapper,
      Map<String, String> additionalHeaders, Consumer<HttpUriRequest> requestInterceptor,
      Consumer<CloseableHttpResponse> defaultErrorHandler) {
    this.baseUrl = baseUrl;
    this.httpClient = httpClient != null ? httpClient : HttpClients.createDefault();
    this.mapper = mapper != null ? mapper : new ObjectMapper();
    this.additionalHeaders = additionalHeaders != null ? additionalHeaders : Maps.newHashMap();
    // Fixed: the previous version overwrote the field's inline no-op default with a possibly
    // null argument. Null-safe fallbacks here replace the TODO'd Preconditions.
    this.requestInterceptor = requestInterceptor != null ? requestInterceptor : r -> { };
    this.defaultErrorHandler = defaultErrorHandler != null ?
        defaultErrorHandler : ErrorHandlers.namespaceErrorHandler();
  }

  public static Builder builder() {
    return new Builder();
  }

  /**
   * Executes an HTTP request and processes the corresponding response.
   *
   * @param method HTTP method, such as GET, POST, HEAD, etc.
   * @param path URL path to send the request to
   * @param body contents of the request body, serialized as JSON; may be null
   * @param responseType class of the response type; needs a serializer registered with the
   *     ObjectMapper, and may be null when no response body is expected
   * @param errorHandler error handler delegated for HTTP responses which handles server
   *     error responses; expected to throw
   * @param <T> class type of the response for deserialization
   * @return the response entity, parsed and converted to its type T, or null when
   *     responseType is null or the response carries no entity
   * @throws UncheckedIOException if reading or parsing the response fails
   * @throws UncheckedRestException wraps other failures to avoid checked exceptions
   */
  @Nullable
  public <T> T execute(
      Method method, String path, Object body, Class<T> responseType,
      Consumer<CloseableHttpResponse> errorHandler) {
    HttpUriRequestBase request =
        new HttpUriRequestBase(method.name(), URI.create(String.format("%s/%s", baseUrl, path)));
    addRequestHeaders(request);

    if (body != null) {
      try {
        // Fixed: StringEntity without a ContentType used the default (ISO-8859-1) charset,
        // mangling non-ASCII characters in the JSON body. APPLICATION_JSON is UTF-8.
        request.setEntity(new StringEntity(mapper.writeValueAsString(body), ContentType.APPLICATION_JSON));
      } catch (JsonProcessingException e) {
        throw new UncheckedRestException(e, "Failed to write request body");
      }
    }

    requestInterceptor.accept(request);

    try (CloseableHttpResponse response = httpClient.execute(request)) {
      int code = response.getCode();
      // Fixed: the previous check (code != SC_OK) routed every non-200 success, such as
      // 201 Created or 204 No Content, through the error handler. Any 2xx is a success.
      if (code < 200 || code >= 300) {
        errorHandler.accept(response);
        // Defensive: error handlers are expected to throw. If one returns normally, fail
        // loudly instead of silently parsing an error body as the response type.
        throw new UncheckedRestException("Received error response (HTTP %s) but the error handler did not throw", code);
      }

      if (responseType == null || response.getEntity() == null) {
        return null;
      }

      HttpEntity entity = response.getEntity();
      return mapper.readValue(entity.getContent(), responseType);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (UncheckedRestException e) {
      // Re-throw handler exceptions and our own failures without double-wrapping.
      throw e;
    } catch (RuntimeException e) {
      throw new UncheckedRestException(e, "An unhandled error occurred while executing an Http request");
    }
  }

  public <T> T head(String path) {
    return execute(Method.HEAD, path, null, null, defaultErrorHandler);
  }

  public <T> T head(String path, Class<T> responseType, Consumer<CloseableHttpResponse> errorHandler) {
    return execute(Method.HEAD, path, null, responseType, errorHandler);
  }

  public <T> T get(String path, Class<T> responseType) {
    return execute(Method.GET, path, null, responseType, defaultErrorHandler);
  }

  public <T> T get(String path, Class<T> responseType, Consumer<CloseableHttpResponse> errorHandler) {
    return execute(Method.GET, path, null, responseType, errorHandler);
  }

  public <T> T post(String path, Object body, Class<T> responseType) {
    return execute(Method.POST, path, body, responseType, defaultErrorHandler);
  }

  public <T> T post(String path, Object body, Class<T> responseType,
                    Consumer<CloseableHttpResponse> errorHandler) {
    return execute(Method.POST, path, body, responseType, errorHandler);
  }

  public <T> T put(String path, Object body, Class<T> responseType) {
    return execute(Method.PUT, path, body, responseType, defaultErrorHandler);
  }

  public <T> T put(String path, Object body, Class<T> responseType,
                   Consumer<CloseableHttpResponse> errorHandler) {
    return execute(Method.PUT, path, body, responseType, errorHandler);
  }

  public <T> T delete(String path, Class<T> responseType) {
    return execute(Method.DELETE, path, null, responseType, defaultErrorHandler);
  }

  public <T> T delete(String path, Class<T> responseType, Consumer<CloseableHttpResponse> errorHandler) {
    return execute(Method.DELETE, path, null, responseType, errorHandler);
  }

  // Applies the JSON content-type/accept headers plus any configured additional headers.
  private void addRequestHeaders(HttpUriRequest request) {
    request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
    request.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.getMimeType());
    additionalHeaders.forEach(request::setHeader);
  }

  @Override
  public void close() throws IOException {
    httpClient.close(CloseMode.GRACEFUL);
  }

  /** Builder for {@link HttpClient}; all parts other than baseUrl have usable defaults. */
  public static class Builder {
    private String baseUrl;
    private CloseableHttpClient httpClient;
    private ObjectMapper mapper;
    private Map<String, String> additionalHeaders = Maps.newHashMap();
    private Consumer<HttpUriRequest> requestInterceptor = r -> { };
    private Consumer<CloseableHttpResponse> defaultErrorHandler = ErrorHandlers.namespaceErrorHandler();

    private Builder() {
    }

    public Builder baseUrl(String url) {
      this.baseUrl = url;
      return this;
    }

    public Builder httpClient(CloseableHttpClient client) {
      this.httpClient = client;
      return this;
    }

    public Builder mapper(ObjectMapper objectMapper) {
      this.mapper = objectMapper;
      return this;
    }

    public Builder additionalHeaders(Map<String, String> headers) {
      this.additionalHeaders = headers;
      return this;
    }

    public Builder withHeader(String key, String value) {
      // Guard against additionalHeaders(null) having cleared the map.
      if (additionalHeaders == null) {
        additionalHeaders = Maps.newHashMap();
      }
      additionalHeaders.put(key, value);
      return this;
    }

    // TODO - Perhaps place auth checking on headers in here?
    public Builder requestInterceptor(Consumer<HttpUriRequest> reqInterceptor) {
      this.requestInterceptor = reqInterceptor;
      return this;
    }

    public Builder defaultErrorHandler(Consumer<CloseableHttpResponse> errorHandler) {
      this.defaultErrorHandler = errorHandler;
      return this;
    }

    public HttpClient build() {
      // Null interceptor/handler arguments are normalized in the constructor.
      return new HttpClient(baseUrl, httpClient, mapper, additionalHeaders, requestInterceptor, defaultErrorHandler);
    }
  }
}
+ */ + +package org.apache.iceberg.rest.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonInclude(JsonInclude.Include.NON_NULL) +public class IcebergHttpResponse { + + private final T data; + private final Error error; + + @JsonCreator + public IcebergHttpResponse( + @JsonProperty("data") T data, + @JsonProperty("error") Error error) { + this.data = data; + this.error = error; + } + + public Error error() { + return error; + } + + public T data() { + return data; + } + + /** + * An error object embedded in every HTTP response. + * + * On error, this contains: + * - message: A short, human-readable description of the error. + * - type: Type of exception - more specifically a class name, e.g. NamespaceNotFoundException) + * - code: An (optional) application specific error code, to distinguish between causes + * of the same HTTP response code (eg possibly different types of Unauthorized exceptions). + * + * #################### Optional fields to consider ###################################### + * - status: HTTP response code (optional). + * - traceId: Unique specific identifier for this error and request, for monitoring purposes. + * Presumably this would be an OpenTracing Span (optional). + * Will almost certainly add tracing headers as an optional follow-up. + * - metadata: Further map of optional metadata (such as further directions to users etc) (optional - unsure?). 
+ * ####################################################################################### + * + * Example: + * "error": { + * "message": "Authorization denied: Missing Bearer header", + * "type": "OAuthException", + * "code": 40102 + * } + */ + public static class Error { + + private final String message; + private final String type; + private final int code; + + @JsonCreator + public Error( + @JsonProperty("message") String message, + @JsonProperty("type") String type, + @JsonProperty("code") int code) { + this.message = message; + this.type = type; + this.code = code; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/http/RequestResponseSerializers.java b/core/src/main/java/org/apache/iceberg/rest/http/RequestResponseSerializers.java new file mode 100644 index 000000000000..0eb1514d1bb5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/http/RequestResponseSerializers.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iceberg.rest.http;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

/**
 * Jackson serializers and deserializers for the Iceberg types exchanged by the REST
 * catalog, plus {@link #registerAll(ObjectMapper)} to install them on a mapper.
 *
 * Note: PartitionSpec and SortOrder deserialization depend on a Schema having been
 * deserialized first in the same DeserializationContext (see SchemaDeserializer, which
 * stashes the schema as a context attribute).
 */
public class RequestResponseSerializers {

  private RequestResponseSerializers() {
  }

  /**
   * Registers every serializer/deserializer in this class on the given mapper.
   * Fixed: dropped the unused org.apache.avro.data.Json and CreateNamespaceRequest imports.
   */
  public static void registerAll(ObjectMapper mapper) {
    SimpleModule module = new SimpleModule();
    module
        .addSerializer(TableIdentifier.class, new TableIdentifierSerializer())
        .addDeserializer(TableIdentifier.class, new TableIdentifierDeserializer())
        .addSerializer(Namespace.class, new NamespaceSerializer())
        .addDeserializer(Namespace.class, new NamespaceDeserializer())
        .addSerializer(Schema.class, new SchemaSerializer())
        .addDeserializer(Schema.class, new SchemaDeserializer())
        .addSerializer(PartitionSpec.class, new PartitionSpecSerializer())
        .addDeserializer(PartitionSpec.class, new PartitionSpecDeserializer())
        .addSerializer(SortOrder.class, new SortOrderSerializer())
        .addDeserializer(SortOrder.class, new SortOrderDeserializer())
        // TableMetadata is serialize-only for now; no deserializer is registered.
        .addSerializer(TableMetadata.class, new TableMetadataSerializer());
    mapper.registerModule(module);
  }

  public static class NamespaceDeserializer extends JsonDeserializer<Namespace> {
    @Override
    public Namespace deserialize(JsonParser p, DeserializationContext context) throws IOException {
      JsonNode jsonNode = p.getCodec().readTree(p);
      // NOTE(review): splitting on '.' is ambiguous for namespace levels that themselves
      // contain dots - confirm whether the wire format needs an escaped separator.
      return Namespace.of(jsonNode.asText().split("\\."));
    }
  }

  public static class NamespaceSerializer extends JsonSerializer<Namespace> {
    @Override
    public void serialize(Namespace namespace, JsonGenerator gen, SerializerProvider serializers) throws IOException {
      gen.writeString(namespace.toString());
    }
  }

  public static class TableIdentifierDeserializer extends JsonDeserializer<TableIdentifier> {
    @Override
    public TableIdentifier deserialize(JsonParser p, DeserializationContext context) throws IOException {
      JsonNode jsonNode = p.getCodec().readTree(p);
      return TableIdentifier.parse(jsonNode.asText());
    }
  }

  public static class TableIdentifierSerializer extends JsonSerializer<TableIdentifier> {
    @Override
    public void serialize(TableIdentifier identifier, JsonGenerator gen, SerializerProvider serializers)
        throws IOException {
      gen.writeString(identifier.toString());
    }
  }

  public static class SchemaDeserializer extends JsonDeserializer<Schema> {
    @Override
    public Schema deserialize(JsonParser p, DeserializationContext context)
        throws IOException {
      JsonNode jsonNode = p.getCodec().readTree(p);
      Schema schema = SchemaParser.fromJson(jsonNode);
      // Stash the schema so PartitionSpec/SortOrder deserializers running later in this
      // same context can resolve field references.
      context.setAttribute("schema", schema);
      return schema;
    }
  }

  public static class SchemaSerializer extends JsonSerializer<Schema> {
    @Override
    public void serialize(Schema schema, JsonGenerator gen, SerializerProvider serializers)
        throws IOException {
      SchemaParser.toJson(schema, gen);
    }
  }

  public static class PartitionSpecSerializer extends JsonSerializer<PartitionSpec> {
    @Override
    public void serialize(PartitionSpec partitionSpec, JsonGenerator gen, SerializerProvider serializers)
        throws IOException {
      PartitionSpecParser.toJson(partitionSpec, gen);
    }
  }

  public static class PartitionSpecDeserializer extends JsonDeserializer<PartitionSpec> {
    @Override
    public PartitionSpec deserialize(JsonParser p, DeserializationContext context)
        throws IOException {
      JsonNode jsonNode = p.getCodec().readTree(p);
      // Requires SchemaDeserializer to have populated the "schema" context attribute first.
      Schema schema = (Schema) context.getAttribute("schema");
      return PartitionSpecParser.fromJson(schema, jsonNode);
    }
  }

  public static class SortOrderSerializer extends JsonSerializer<SortOrder> {
    @Override
    public void serialize(SortOrder sortOrder, JsonGenerator gen, SerializerProvider serializers)
        throws IOException {
      SortOrderParser.toJson(sortOrder, gen);
    }
  }

  public static class SortOrderDeserializer extends JsonDeserializer<SortOrder> {
    @Override
    public SortOrder deserialize(JsonParser p, DeserializationContext context)
        throws IOException {
      JsonNode jsonNode = p.getCodec().readTree(p);
      // Requires SchemaDeserializer to have populated the "schema" context attribute first.
      Schema schema = (Schema) context.getAttribute("schema");
      return SortOrderParser.fromJson(schema, jsonNode);
    }
  }

  public static class TableMetadataSerializer extends JsonSerializer<TableMetadata> {
    @Override
    public void serialize(TableMetadata metadata, JsonGenerator gen, SerializerProvider serializers) {
      try {
        // Fixed: writeRaw copies text verbatim without emitting value separators, which
        // corrupts the enclosing document whenever the metadata is not the root value.
        // writeRawValue embeds the pre-serialized JSON as a proper JSON value.
        gen.writeRawValue(TableMetadataParser.toJson(metadata));
      } catch (IOException e) {
        // TODO: Wrap serializers with a delegate that rethrows checked IOException as
        // UncheckedIOException to avoid code repetition.
        throw new UncheckedIOException("Failed to serialize TableMetadata", e);
      }
    }
  }
}
package org.apache.iceberg.rest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.http.HttpClient;
import org.apache.iceberg.rest.http.RequestResponseSerializers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.mockito.Mockito;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.Header;

import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
import static org.mockserver.model.StringBody.exact;

/**
 * Work-in-progress tests for the REST catalog. Currently exercises request/response
 * expectations against a mock HTTP server and a Mockito stand-in for the catalog.
 */
public class TestRestCatalog {

  private static final String WAREHOUSE_PATH = "s3://bucket";
  private static final String CATALOG_NAME = "rest";
  private static final Map<String, String> properties = ImmutableMap.of(
      "baseUrl", "http://localhost:1080",
      "type", "hadoop",
      "io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"
  );

  private HttpClient httpClient;
  private ObjectMapper mapper;
  private RestCatalog restCatalog;
  private ClientAndServer mockServer;

  @Before
  public void before() {
    mockServer = startClientAndServer(1080);
    // Fixed: a local `ObjectMapper mapper` previously shadowed the field, leaving
    // this.mapper null for any test that reads it.
    this.mapper = new ObjectMapper();
    RequestResponseSerializers.registerAll(mapper);
    // TODO: wire the real client/catalog once RestCatalog.initialize is ready:
    // this.httpClient = HttpClient
    //     .builder()
    //     .baseUrl("http://localhost:1080")
    //     .mapper(mapper)
    //     .build();
    // this.restCatalog = new RestCatalog();
    // restCatalog.initialize(CATALOG_NAME, properties, httpClient);
    this.restCatalog = Mockito.mock(RestCatalog.class);
  }

  @After
  public void after() {
    if (mockServer != null) {
      if (mockServer.isRunning()) {
        mockServer.stop();
      }
      mockServer.close();
    }
  }

  @Test
  public void noOpTest() {
    Assert.assertTrue(true);
  }

  // TODO - Figure out how to get this to actually use req and resp.
  // This only registers an expectation; nothing in the test issues the request, so the
  // expectation is never exercised. It documents the intended wire format for the
  // OpenAPI spec but does not verify any production code yet.
  @Test
  public void testCreateNamespaceSpec() {
    mockServer
        .when(
            request()
                .withMethod("POST")
                .withSocketAddress("http://localhost", 1080)
                .withPath("/namespace")
                .withHeader(HttpHeaders.CONTENT_TYPE, "application/json")
                .withBody(exact("{ namespace: 'prod.accounting', properties: { owner: 'hank' }"))
        ).respond(
            response()
                .withStatusCode(200)
                .withHeaders(
                    new Header("Content-Type", "application/json; charset=utf-8")
                )
                .withBody("{ namespace: 'prod.accounting', properties: { owner: 'hank' } }")
        );
  }

  // NOTE(review): this is currently tautological - it stubs createDatabase on a mock and
  // then asserts the stubbed value, so it verifies Mockito rather than RestCatalog.
  // Keep it as a shape/round-trip check until the real catalog is wired up in before().
  @Test
  public void createNamespaceWithMocks() throws JsonProcessingException {
    CreateNamespaceRequest req = CreateNamespaceRequest
        .builder()
        .withNamespace(Namespace.of("prod", "accounting"))
        .withProperties(ImmutableMap.of("owner", "hank"))
        .build();

    CreateNamespaceResponse expected = CreateNamespaceResponse
        .builder()
        .withNamespace(Namespace.of("prod", "accounting"))
        .withProperties(ImmutableMap.of("owner", "hank"))
        .build();

    Mockito.doReturn(expected)
        .when(restCatalog).createDatabase(req.getNamespace(), req.getProperties());
    CreateNamespaceResponse actual = restCatalog.createDatabase(
        Namespace.of("prod", "accounting"), ImmutableMap.of("owner", "hank"));

    Assert.assertEquals("The response body of a create database request should be expected JSON",
        expected, actual);
  }
}
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java @@ -186,6 +186,10 @@ public long newSnapshotId() { static class LocalFileIO implements FileIO { + LocalFileIO() { + + } + @Override public InputFile newInputFile(String path) { return Files.localInput(path); diff --git a/versions.props b/versions.props index abe0f80c4abb..87b247e082af 100644 --- a/versions.props +++ b/versions.props @@ -4,6 +4,7 @@ org.apache.calcite:* = 1.10.0 org.apache.flink:* = 1.12.5 org.apache.hadoop:* = 2.7.3 org.apache.hive:* = 2.3.8 +org.apache.httpcomponents.client5:* = 5.1 org.apache.orc:* = 1.7.1 org.apache.parquet:* = 1.12.2 org.apache.spark:spark-hive_2.11 = 2.4.8 @@ -37,3 +38,6 @@ org.xerial:sqlite-jdbc = 3.34.0 com.fasterxml.jackson.dataformat:jackson-dataformat-xml = 2.9.9 org.springframework:* = 5.3.9 org.springframework.boot:* = 2.5.4 +org.mock-server:mockserver-netty = 5.11.1 +org.mock-server:mockserver-client-java = 5.11.1 +org.mock-server:mockserver-junit-jupiter = 5.11.1