diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java index 89a0ef58eff..60c0958c143 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java @@ -210,6 +210,8 @@ public Table loadTable(NameIdentifier ident) throws NoSuchTableException { .withName(tableEntity.name()) .withComment(tableEntity.getComment()) .build(); + } catch (NoSuchEntityException e) { + throw new NoSuchTableException(e, "Table %s does not exist", ident); } catch (IOException e) { throw new RuntimeException("Failed to list tables under schema " + ident.namespace(), e); } @@ -270,6 +272,34 @@ public Table createTable( .withAuditInfo(auditInfo) .build(); store.put(entityToStore); + + // Get the value of register in table properties + boolean register = + Boolean.parseBoolean( + properties.getOrDefault( + GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_REGISTER, "false")); + if (register) { + // Do not need to create the physical table if this is a registration operation. + // Whether we need to check the existence of the physical table? + GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder(); + return builder + .withName(ident.name()) + .withColumns(columns) + .withComment(comment) + .withProperties(properties) + .withDistribution(distribution) + .withIndexes(indexes) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentUserName()) + .withCreateTime(Instant.now()) + .build()) + .withPartitioning(partitions) + .withSortOrders(sortOrders) + .withFormat(LakehouseTableFormat.LANCE.lowerName()) + .build(); + } + LakehouseCatalogOperations lanceCatalogOperations = getLakehouseCatalogOperations(newProperties); return lanceCatalogOperations.createTable( @@ -324,7 +354,6 @@ private String calculateTableLocation( @Override public Table alterTable(NameIdentifier ident, TableChange... changes) throws NoSuchTableException, IllegalArgumentException { - Namespace namespace = ident.namespace(); try { TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class); Map tableProperties = tableEntity.getProperties(); @@ -332,13 +361,12 @@ public Table alterTable(NameIdentifier ident, TableChange... changes) getLakehouseCatalogOperations(tableProperties); return lakehouseCatalogOperations.alterTable(ident, changes); } catch (IOException e) { - throw new RuntimeException("Failed to list tables under schema " + namespace, e); + throw new RuntimeException("Failed to alter table " + ident, e); } } @Override public boolean dropTable(NameIdentifier ident) { - Namespace namespace = ident.namespace(); try { TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class); LakehouseCatalogOperations lakehouseCatalogOperations = @@ -348,7 +376,20 @@ public boolean dropTable(NameIdentifier ident) { LOG.warn("Table {} does not exist, skip dropping it.", ident); return false; } catch (IOException e) { - throw new RuntimeException("Failed to list tables under schema " + namespace, e); + throw new RuntimeException("Failed to drop table: " + ident, e); + } + } + + @Override + public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationException { + try { + // Only delete the metadata entry here. The physical data will not be deleted. + if (!tableExists(ident)) { + return false; + } + return store.delete(ident, Entity.EntityType.TABLE); + } catch (IOException e) { + throw new RuntimeException("Failed to purge table " + ident, e); } } diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java index f8ca11b0a01..72c1e5bc57c 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java @@ -18,6 +18,7 @@ */ package org.apache.gravitino.catalog.lakehouse; +import static org.apache.gravitino.connector.PropertyEntry.booleanPropertyEntry; import static org.apache.gravitino.connector.PropertyEntry.enumPropertyEntry; import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry; @@ -32,6 +33,7 @@ public class GenericLakehouseTablePropertiesMetadata extends BasePropertiesMetad public static final String LAKEHOUSE_LOCATION = "location"; public static final String LAKEHOUSE_FORMAT = "format"; public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX = "lance.storage."; + public static final String LAKEHOUSE_REGISTER = "register"; private static final Map> PROPERTIES_METADATA; @@ -59,7 +61,15 @@ public class GenericLakehouseTablePropertiesMetadata extends BasePropertiesMetad false /* immutable */, null /* default value*/, false /* hidden */, - false /* reserved */)); + false /* reserved */), + booleanPropertyEntry( + LAKEHOUSE_REGISTER, + "Whether this is a table registration operation.", + false, + true /* immutable */, + false /* defaultValue */, + false /* hidden */, + false)); PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); } diff --git a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java index e549b806d82..6289bd34049 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java @@ -408,6 +408,10 @@ public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationExcep RuntimeException.class, UnsupportedOperationException.class); + if (isManagedTable(catalogIdent)) { + return droppedFromCatalog; + } + // For unmanaged table, it could happen that the table: // 1. Is not found in the catalog (dropped directly from underlying sources) // 2. Is found in the catalog but not in the store (not managed by Gravitino) diff --git a/lance/lance-common/build.gradle.kts b/lance/lance-common/build.gradle.kts index 6e18f3981fb..5be0eae4bf9 100644 --- a/lance/lance-common/build.gradle.kts +++ b/lance/lance-common/build.gradle.kts @@ -25,7 +25,8 @@ plugins { } dependencies { - implementation(project(":clients:client-java-runtime", configuration = "shadow")) + implementation(project(":clients:client-java")) + implementation(project(":api")) implementation(project(":common")) { exclude("*") } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java index b8a967cd30b..8a356fb1359 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java @@ -19,7 +19,9 @@ package org.apache.gravitino.lance.common.ops; import com.lancedb.lance.namespace.model.CreateTableResponse; +import com.lancedb.lance.namespace.model.DeregisterTableResponse; import com.lancedb.lance.namespace.model.DescribeTableResponse; +import com.lancedb.lance.namespace.model.RegisterTableResponse; import java.util.Map; public interface LanceTableOperations { @@ -32,6 +34,10 @@ CreateTableResponse createTable( String delimiter, String tableLocation, Map tableProperties, - String rootCatalog, byte[] arrowStreamBody); + + RegisterTableResponse registerTable( + String tableId, String mode, String delimiter, Map tableProperties); + + DeregisterTableResponse deregisterTable(String tableId, String delimiter); } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java index d3ddbb0edee..865313b31c1 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java @@ -35,6 +35,7 @@ import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum; import com.lancedb.lance.namespace.model.CreateNamespaceResponse; import com.lancedb.lance.namespace.model.CreateTableResponse; +import com.lancedb.lance.namespace.model.DeregisterTableResponse; import com.lancedb.lance.namespace.model.DescribeNamespaceResponse; import com.lancedb.lance.namespace.model.DescribeTableResponse; import com.lancedb.lance.namespace.model.DropNamespaceRequest; @@ -42,6 +43,8 @@ import com.lancedb.lance.namespace.model.JsonArrowSchema; import com.lancedb.lance.namespace.model.ListNamespacesResponse; import com.lancedb.lance.namespace.model.ListTablesResponse; +import com.lancedb.lance.namespace.model.RegisterTableRequest; +import com.lancedb.lance.namespace.model.RegisterTableResponse; import com.lancedb.lance.namespace.util.CommonUtil; import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter; import com.lancedb.lance.namespace.util.PageUtil; @@ -492,7 +495,7 @@ public ListTablesResponse listTables( String namespaceId, String delimiter, String pageToken, Integer limit) { ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, Pattern.quote(delimiter)); Preconditions.checkArgument( - nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s", nsId.levels()); + nsId.levels() == 2, "Expected 2-level namespace but got: %s", nsId.levels()); String catalogName = nsId.levelAtListPos(0); Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); String schemaName = nsId.levelAtListPos(1); @@ -516,7 +519,7 @@ public ListTablesResponse listTables( public DescribeTableResponse describeTable(String tableId, String delimiter) { ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); Preconditions.checkArgument( - nsId.levels() <= 3, "Expected at most 3-level namespace but got: %s", nsId.levels()); + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); String catalogName = nsId.levelAtListPos(0); Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); @@ -538,17 +541,10 @@ public CreateTableResponse createTable( String delimiter, String tableLocation, Map tableProperties, - String rootCatalog, byte[] arrowStreamBody) { ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); Preconditions.checkArgument( - nsId.levels() <= 3, "Expected at most 3-level namespace but got: %s", nsId.levels()); - if (rootCatalog != null) { - List levels = nsId.listStyleId(); - List newLevels = Lists.newArrayList(rootCatalog); - newLevels.addAll(levels); - nsId = ObjectIdentifier.of(newLevels); - } + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); // Parser column information. List columns = Lists.newArrayList(); @@ -614,6 +610,68 @@ public CreateTableResponse createTable( return response; } + @Override + public RegisterTableResponse registerTable( + String tableId, String mode, String delimiter, Map tableProperties) { + ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); + Preconditions.checkArgument( + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); + + String catalogName = nsId.levelAtListPos(0); + Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); + NameIdentifier tableIdentifier = + NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); + + // TODO Support real register API + RegisterTableRequest.ModeEnum createMode = + RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase()); + if (createMode == RegisterTableRequest.ModeEnum.CREATE + && catalog.asTableCatalog().tableExists(tableIdentifier)) { + throw LanceNamespaceException.conflict( + "Table already exists: " + tableId, + SchemaAlreadyExistsException.class.getSimpleName(), + tableId, + CommonUtil.formatCurrentStackTrace()); + } + + if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE + && catalog.asTableCatalog().tableExists(tableIdentifier)) { + LOG.info("Overwriting existing table: {}", tableId); + catalog.asTableCatalog().dropTable(tableIdentifier); + } + + Table t = + catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {}, "", tableProperties); + + RegisterTableResponse response = new RegisterTableResponse(); + response.setProperties(t.properties()); + response.setLocation(t.properties().get("location")); + return response; + } + + @Override + public DeregisterTableResponse deregisterTable(String tableId, String delimiter) { + + ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); + Preconditions.checkArgument( + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); + + String catalogName = nsId.levelAtListPos(0); + Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); + + NameIdentifier tableIdentifier = + NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); + Table t = catalog.asTableCatalog().loadTable(tableIdentifier); + Map properties = t.properties(); + // TODO Support real deregister API. + catalog.asTableCatalog().purgeTable(tableIdentifier); + + DeregisterTableResponse response = new DeregisterTableResponse(); + response.setProperties(properties); + response.setLocation(properties.get("location")); + return response; + } + private JsonArrowSchema toJsonArrowSchema(Column[] columns) { List fields = Arrays.stream(columns) @@ -627,7 +685,6 @@ private JsonArrowSchema toJsonArrowSchema(Column[] columns) { @VisibleForTesting org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) { org.apache.arrow.vector.types.pojo.Schema schema; - try (BufferAllocator allocator = new RootAllocator(); ByteArrayInputStream bais = new ByteArrayInputStream(stream); ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) { diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java index 8c800e49d64..afab3ab2a7e 100644 --- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java @@ -127,6 +127,7 @@ private NamespaceWrapper loadNamespaceImpl(LanceConfig lanceConfig) { try { Constructor constructor = lanceNamespaceBackend.getWrapperClass().getConstructor(LanceConfig.class); + return constructor.newInstance(lanceConfig); } catch (Exception e) { LOG.error("Error loading namespace implementation for backend type: {}", backendType, e); diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java new file mode 100644 index 00000000000..f39ea2e684f --- /dev/null +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lance.service; + +public class ServiceConstants { + public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-"; + + public static final String LANCE_TABLE_LOCATION_HEADER = + LANCE_HTTP_HEADER_PREFIX + "table-location"; + public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER = + LANCE_HTTP_HEADER_PREFIX + "table-properties"; +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java index 690cb8759e2..5590eef9bdc 100644 --- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java @@ -19,24 +19,33 @@ package org.apache.gravitino.lance.service.rest; import static org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT; +import static org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_LOCATION_HEADER; +import static org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER; import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Maps; import com.lancedb.lance.namespace.model.CreateTableResponse; +import com.lancedb.lance.namespace.model.DeregisterTableRequest; +import com.lancedb.lance.namespace.model.DeregisterTableResponse; import com.lancedb.lance.namespace.model.DescribeTableResponse; +import com.lancedb.lance.namespace.model.RegisterTableRequest; +import com.lancedb.lance.namespace.model.RegisterTableResponse; import com.lancedb.lance.namespace.util.JsonUtil; import java.util.Map; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; -import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.lance.common.ops.NamespaceWrapper; @@ -81,18 +90,20 @@ public Response createTable( @PathParam("id") String tableId, @QueryParam("mode") @DefaultValue("create") String mode, // create, exist_ok, overwrite @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) String delimiter, - @HeaderParam("x-lance-table-location") String tableLocation, - @HeaderParam("x-lance-table-properties") String tableProperties, - @HeaderParam("x-lance-root-catalog") String rootCatalog, + @Context HttpHeaders headers, byte[] arrowStreamBody) { try { + // Extract table properties from header + MultivaluedMap headersMap = headers.getRequestHeaders(); + String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER); + String tableProperties = headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER); + Map props = JsonUtil.mapper().readValue(tableProperties, new TypeReference<>() {}); CreateTableResponse response = lanceNamespace .asTableOps() - .createTable( - tableId, mode, delimiter, tableLocation, props, rootCatalog, arrowStreamBody); + .createTable(tableId, mode, delimiter, tableLocation, props, arrowStreamBody); return Response.ok(response).build(); } catch (Exception e) { return LanceExceptionMapper.toRESTResponse(tableId, e); @@ -107,10 +118,13 @@ public Response createEmptyTable( @PathParam("id") String tableId, @QueryParam("mode") @DefaultValue("create") String mode, // create, exist_ok, overwrite @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) String delimiter, - @HeaderParam("x-lance-table-location") String tableLocation, - @HeaderParam("x-lance-root-catalog") String rootCatalog, - @HeaderParam("x-lance-table-properties") String tableProperties) { + @Context HttpHeaders headers) { try { + // Extract table properties from header + MultivaluedMap headersMap = headers.getRequestHeaders(); + String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER); + String tableProperties = headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER); + Map props = StringUtils.isBlank(tableProperties) ? Map.of() @@ -118,7 +132,52 @@ public Response createEmptyTable( CreateTableResponse response = lanceNamespace .asTableOps() - .createTable(tableId, mode, delimiter, tableLocation, props, rootCatalog, null); + .createTable(tableId, mode, delimiter, tableLocation, props, null); + return Response.ok(response).build(); + } catch (Exception e) { + return LanceExceptionMapper.toRESTResponse(tableId, e); + } + } + + @POST + @Path("/register") + @Timed(name = "register-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) + @ResponseMetered(name = "register-table", absolute = true) + public Response registerTable( + @PathParam("id") String tableId, + @QueryParam("mode") @DefaultValue("create") String mode, // overwrite or + @QueryParam("delimiter") @DefaultValue("$") String delimiter, + @Context HttpHeaders headers, + RegisterTableRequest registerTableRequest) { + try { + Map props = + registerTableRequest.getProperties() == null + ? Maps.newHashMap() + : Maps.newHashMap(registerTableRequest.getProperties()); + props.put("register", "true"); + props.put("location", registerTableRequest.getLocation()); + props.put("format", "lance"); + + RegisterTableResponse response = + lanceNamespace.asTableOps().registerTable(tableId, mode, delimiter, props); + return Response.ok(response).build(); + } catch (Exception e) { + return LanceExceptionMapper.toRESTResponse(tableId, e); + } + } + + @POST + @Path("/deregister") + @Timed(name = "deregister-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) + @ResponseMetered(name = "deregister-table", absolute = true) + public Response deregisterTable( + @PathParam("id") String tableId, + @QueryParam("delimiter") @DefaultValue("$") String delimiter, + @Context HttpHeaders headers, + DeregisterTableRequest deregisterTableRequest) { + try { + DeregisterTableResponse response = + lanceNamespace.asTableOps().deregisterTable(tableId, delimiter); return Response.ok(response).build(); } catch (Exception e) { return LanceExceptionMapper.toRESTResponse(tableId, e);