Merged
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -54,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
@@ -223,6 +224,21 @@ public static LoadTableResponse createTable(
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
}

public static LoadTableResponse registerTable(
Catalog catalog, Namespace namespace, RegisterTableRequest request) {
request.validate();

TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
Table table = catalog.registerTable(identifier, request.metadataLocation());
if (table instanceof BaseTable) {
return LoadTableResponse.builder()
.withTableMetadata(((BaseTable) table).operations().current())
.build();
}

throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
}

public static void dropTable(Catalog catalog, TableIdentifier ident) {
boolean dropped = catalog.dropTable(ident, false);
if (!dropped) {
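For context, a minimal server-side sketch (not part of this diff) of how a REST adapter might delegate to the new handler. The RegisterTableRoute wrapper and its names are assumptions for illustration; only CatalogHandlers.registerTable comes from this change.

import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;

// Hypothetical route handler: the framework is assumed to have resolved the backing
// catalog, the namespace from the URL, and the deserialized request body.
public class RegisterTableRoute {
  public static LoadTableResponse handle(
      Catalog backend, Namespace namespace, RegisterTableRequest request) {
    // CatalogHandlers.registerTable validates the request, calls Catalog.registerTable
    // with the requested name and metadata location, and wraps the resulting metadata.
    return CatalogHandlers.registerTable(backend, namespace, request);
  }
}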
28 changes: 27 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTSerializers.java
@@ -44,7 +44,10 @@
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CommitTransactionRequestParser;
import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest;
import org.apache.iceberg.rest.requests.ImmutableReportMetricsRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequestParser;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequestParser;
import org.apache.iceberg.rest.requests.UpdateRequirementParser;
@@ -93,7 +96,12 @@ public static void registerAll(ObjectMapper mapper) {
.addSerializer(CommitTransactionRequest.class, new CommitTransactionRequestSerializer())
.addDeserializer(CommitTransactionRequest.class, new CommitTransactionRequestDeserializer())
.addSerializer(UpdateTableRequest.class, new UpdateTableRequestSerializer())
.addDeserializer(UpdateTableRequest.class, new UpdateTableRequestDeserializer());
.addDeserializer(UpdateTableRequest.class, new UpdateTableRequestDeserializer())
.addSerializer(RegisterTableRequest.class, new RegisterTableRequestSerializer<>())
.addDeserializer(RegisterTableRequest.class, new RegisterTableRequestDeserializer<>())
.addSerializer(ImmutableRegisterTableRequest.class, new RegisterTableRequestSerializer<>())
.addDeserializer(
ImmutableRegisterTableRequest.class, new RegisterTableRequestDeserializer<>());
mapper.registerModule(module);
}

@@ -353,4 +361,22 @@ public UpdateTableRequest deserialize(JsonParser p, DeserializationContext context)
return UpdateTableRequestParser.fromJson(jsonNode);
}
}

public static class RegisterTableRequestSerializer<T extends RegisterTableRequest>
extends JsonSerializer<T> {
@Override
public void serialize(T request, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
RegisterTableRequestParser.toJson(request, gen);
}
}

public static class RegisterTableRequestDeserializer<T extends RegisterTableRequest>
extends JsonDeserializer<T> {
@Override
public T deserialize(JsonParser p, DeserializationContext context) throws IOException {
JsonNode jsonNode = p.getCodec().readTree(p);
return (T) RegisterTableRequestParser.fromJson(jsonNode);
}
}
}
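As an illustrative sketch (not part of the diff), the registration above lets a RegisterTableRequest round-trip through the shared ObjectMapper. The table name and metadata location below are hypothetical.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.rest.RESTSerializers;
import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;

public class RegisterTableSerdeExample {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    RESTSerializers.registerAll(mapper); // wires the (de)serializers added above

    RegisterTableRequest request =
        ImmutableRegisterTableRequest.builder()
            .name("events") // hypothetical table name
            .metadataLocation("s3://bucket/db/events/metadata/v3.metadata.json") // hypothetical
            .build();

    String json = mapper.writeValueAsString(request);
    RegisterTableRequest parsed = mapper.readValue(json, RegisterTableRequest.class);
    System.out.println(json + " -> " + parsed.name());
  }
}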
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -71,6 +71,8 @@
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
@@ -404,7 +406,40 @@ public void invalidateTable(SessionContext context, TableIdentifier ident) {}
@Override
Member:
Just wondering why we can't remove this method and let it use the parent implementation.
Other catalogs do the same.

Member:
Oh, this is SessionCatalog, not Catalog, so there is no parent implementation.
I think this logic can be moved to BaseSessionCatalog; that would help if more implementations are added in the future.

Contributor Author:
Thanks, @ajantha-bhat, for the input. Yes, moving the implementation logic to BaseSessionCatalog makes sense. I will update the code.

Contributor Author:
I see that all the other methods' implementations are in RESTSessionCatalog.java. IMO we should keep the registerTable implementation in RESTSessionCatalog.java as well. WDYT?

public Table registerTable(
SessionContext context, TableIdentifier ident, String metadataFileLocation) {
// replaces the previous stub: throw new UnsupportedOperationException("Register table is not supported");
checkIdentifierIsValid(ident);

Preconditions.checkArgument(
metadataFileLocation != null && !metadataFileLocation.isEmpty(),
"Invalid metadata file location: %s",
metadataFileLocation);

RegisterTableRequest request =
ImmutableRegisterTableRequest.builder()
.name(ident.name())
.metadataLocation(metadataFileLocation)
.build();

LoadTableResponse response =
client.post(
paths.register(ident.namespace()),
request,
LoadTableResponse.class,
headers(context),
ErrorHandlers.tableErrorHandler());

AuthSession session = tableSession(response.config(), session(context));
RESTTableOperations ops =
new RESTTableOperations(
client,
paths.table(ident),
session::headers,
tableFileIO(context, response.config()),
response.tableMetadata());

trackFileIO(ops);

return new BaseTable(
ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers));
}

@Override
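A minimal client-side sketch of what this enables, assuming a REST catalog at a hypothetical URI and a hypothetical metadata file location; only Catalog.registerTable and the RESTCatalog/RESTSessionCatalog plumbing come from Iceberg itself.

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.RESTCatalog;

public class RegisterViaRestExample {
  public static void main(String[] args) {
    RESTCatalog catalog = new RESTCatalog();
    // hypothetical endpoint; RESTCatalog delegates to RESTSessionCatalog internally
    catalog.initialize("rest", ImmutableMap.of(CatalogProperties.URI, "http://localhost:8181"));

    // Register an existing table by pointing the catalog at its current metadata file.
    Table table =
        catalog.registerTable(
            TableIdentifier.of("db", "events"),
            "s3://bucket/db/events/metadata/00003-example.metadata.json"); // hypothetical location
    System.out.println(table.name());
  }
}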
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
@@ -71,6 +71,10 @@ public String table(TableIdentifier ident) {
RESTUtil.encodeString(ident.name()));
}

public String register(Namespace ns) {
return SLASH.join("v1", prefix, "namespaces", RESTUtil.encodeNamespace(ns), "register");
}

public String rename() {
return SLASH.join("v1", prefix, "tables", "rename");
}
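For reference, a small sketch (not from the PR) of the path shape the new register() method produces; the prefix and namespace values are hypothetical, and the real implementation additionally escapes namespaces via RESTUtil.encodeNamespace.

// Mirrors the SLASH.join above for a catalog prefix "ws" and namespace "db" (both hypothetical).
String registerPath = String.join("/", "v1", "ws", "namespaces", "db", "register");
// -> "v1/ws/namespaces/db/register", i.e. POST /v1/{prefix}/namespaces/{namespace}/register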
core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequest.java (new file)
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import org.apache.iceberg.rest.RESTRequest;
import org.immutables.value.Value;

@Value.Immutable
public interface RegisterTableRequest extends RESTRequest {

String name();

String metadataLocation();

@Override
default void validate() {
// nothing to validate as it's not possible to create an invalid instance
}
}
core/src/main/java/org/apache/iceberg/rest/requests/RegisterTableRequestParser.java (new file)
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.requests;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.JsonUtil;

public class RegisterTableRequestParser {

private static final String NAME = "name";
private static final String METADATA_LOCATION = "metadata-location";

private RegisterTableRequestParser() {}

public static String toJson(RegisterTableRequest request) {
return toJson(request, false);
}

public static String toJson(RegisterTableRequest request, boolean pretty) {
return JsonUtil.generate(gen -> toJson(request, gen), pretty);
}

public static void toJson(RegisterTableRequest request, JsonGenerator gen) throws IOException {
Preconditions.checkArgument(null != request, "Invalid register table request: null");

gen.writeStartObject();

gen.writeStringField(NAME, request.name());
gen.writeStringField(METADATA_LOCATION, request.metadataLocation());

gen.writeEndObject();
}

public static RegisterTableRequest fromJson(String json) {
return JsonUtil.parse(json, RegisterTableRequestParser::fromJson);
}

public static RegisterTableRequest fromJson(JsonNode json) {
Preconditions.checkArgument(
null != json, "Cannot parse register table request from null object");

String name = JsonUtil.getString(NAME, json);
String metadataLocation = JsonUtil.getString(METADATA_LOCATION, json);

return ImmutableRegisterTableRequest.builder()
.name(name)
.metadataLocation(metadataLocation)
.build();
}
}
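A short sketch (with assumed values) of the wire format the parser produces and accepts:

import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequestParser;

public class RegisterTableRequestJsonExample {
  public static void main(String[] args) {
    RegisterTableRequest request =
        ImmutableRegisterTableRequest.builder()
            .name("events") // hypothetical
            .metadataLocation("s3://bucket/db/events/metadata/v3.metadata.json") // hypothetical
            .build();

    // Produces: {"name":"events","metadata-location":"s3://bucket/db/events/metadata/v3.metadata.json"}
    String json = RegisterTableRequestParser.toJson(request);

    RegisterTableRequest roundTripped = RegisterTableRequestParser.fromJson(json);
    System.out.println(roundTripped.metadataLocation());
  }
}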
86 changes: 86 additions & 0 deletions core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -42,6 +42,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
@@ -2615,6 +2616,91 @@ public void tableCreationWithoutNamespace() {
.hasMessageContaining("Namespace does not exist: non-existing");
}

@Test
public void testRegisterTable() {
C catalog = catalog();

if (requiresNamespaceCreate()) {
catalog.createNamespace(TABLE.namespace());
}

Map<String, String> properties =
ImmutableMap.of("user", "someone", "created-at", "2023-01-15T00:00:01");
Table originalTable =
catalog
.buildTable(TABLE, SCHEMA)
.withPartitionSpec(SPEC)
.withSortOrder(WRITE_ORDER)
.withProperties(properties)
.create();

originalTable.newFastAppend().appendFile(FILE_A).commit();
originalTable.newFastAppend().appendFile(FILE_B).commit();
originalTable.newDelete().deleteFile(FILE_A).commit();
originalTable.newFastAppend().appendFile(FILE_C).commit();

TableOperations ops = ((BaseTable) originalTable).operations();
String metadataLocation = ops.current().metadataFileLocation();

catalog.dropTable(TABLE, false /* do not purge */);

Table registeredTable = catalog.registerTable(TABLE, metadataLocation);

Assertions.assertThat(registeredTable).isNotNull();
Assertions.assertThat(catalog.tableExists(TABLE)).as("Table must exist").isTrue();
Assertions.assertThat(registeredTable.properties())
.as("Props must match")
.containsAllEntriesOf(properties);
Assertions.assertThat(registeredTable.schema().asStruct())
.as("Schema must match")
.isEqualTo(originalTable.schema().asStruct());
Assertions.assertThat(registeredTable.specs())
.as("Specs must match")
.isEqualTo(originalTable.specs());
Assertions.assertThat(registeredTable.sortOrders())
.as("Sort orders must match")
.isEqualTo(originalTable.sortOrders());
Assertions.assertThat(registeredTable.currentSnapshot())
.as("Current snapshot must match")
.isEqualTo(originalTable.currentSnapshot());
Assertions.assertThat(registeredTable.snapshots())
.as("Snapshots must match")
.isEqualTo(originalTable.snapshots());
Assertions.assertThat(registeredTable.history())
.as("History must match")
.isEqualTo(originalTable.history());

TestHelpers.assertSameSchemaMap(registeredTable.schemas(), originalTable.schemas());
assertFiles(registeredTable, FILE_B, FILE_C);

registeredTable.newFastAppend().appendFile(FILE_A).commit();
assertFiles(registeredTable, FILE_B, FILE_C, FILE_A);

Assertions.assertThat(catalog.loadTable(TABLE)).isNotNull();
Assertions.assertThat(catalog.dropTable(TABLE)).isTrue();
Assertions.assertThat(catalog.tableExists(TABLE)).isFalse();
}

@Test
public void testRegisterExistingTable() {
C catalog = catalog();

TableIdentifier identifier = TableIdentifier.of("a", "t1");

if (requiresNamespaceCreate()) {
catalog.createNamespace(identifier.namespace());
}

catalog.createTable(identifier, SCHEMA);
Table table = catalog.loadTable(identifier);
TableOperations ops = ((BaseTable) table).operations();
String metadataLocation = ops.current().metadataFileLocation();
Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class)
.hasMessage("Table already exists: a.t1");
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}

private static void assertEmpty(String context, Catalog catalog, Namespace ns) {
try {
Assertions.assertThat(catalog.listTables(ns)).as(context).isEmpty();
33 changes: 0 additions & 33 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -44,13 +44,11 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
@@ -751,37 +749,6 @@ public void testConversions() {
assertThat(JdbcUtil.stringToNamespace(nsString)).isEqualTo(ns);
}

@Test
public void testRegisterTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
catalog.dropTable(identifier, false);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
Table registeredTable = catalog.registerTable(identifier, metadataLocation);
Assertions.assertThat(registeredTable).isNotNull();
TestHelpers.assertSerializedAndLoadedMetadata(registeringTable, registeredTable);
String expectedMetadataLocation =
((HasTableOperations) registeredTable).operations().current().metadataFileLocation();
Assertions.assertThat(metadataLocation).isEqualTo(expectedMetadataLocation);
Assertions.assertThat(catalog.loadTable(identifier)).isNotNull();
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}

@Test
public void testRegisterExistingTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class)
.hasMessage("Table already exists: a.t1");
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}

@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
JdbcCatalog catalogWithCustomReporter =