diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java index 60c0958c143..3860b52d57d 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; import org.apache.gravitino.EntityStore; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; @@ -304,6 +305,8 @@ public Table createTable( getLakehouseCatalogOperations(newProperties); return lanceCatalogOperations.createTable( ident, columns, comment, newProperties, partitions, distribution, sortOrders, indexes); + } catch (EntityAlreadyExistsException e) { + throw new TableAlreadyExistsException(e, "Table %s already exists", ident); } catch (IOException e) { throw new RuntimeException("Failed to create table " + ident, e); } @@ -366,22 +369,22 @@ public Table alterTable(NameIdentifier ident, TableChange... 
changes) } @Override - public boolean dropTable(NameIdentifier ident) { + public boolean purgeTable(NameIdentifier ident) { try { TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class); LakehouseCatalogOperations lakehouseCatalogOperations = getLakehouseCatalogOperations(tableEntity.getProperties()); - return lakehouseCatalogOperations.dropTable(ident); + return lakehouseCatalogOperations.purgeTable(ident); } catch (NoSuchTableException e) { - LOG.warn("Table {} does not exist, skip dropping it.", ident); + LOG.warn("Table {} does not exist, skip purging it.", ident); return false; } catch (IOException e) { - throw new RuntimeException("Failed to drop table: " + ident, e); + throw new RuntimeException("Failed to purge table: " + ident, e); } } @Override - public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationException { + public boolean dropTable(NameIdentifier ident) throws UnsupportedOperationException { try { // Only delete the metadata entry here. The physical data will not be deleted. 
if (!tableExists(ident)) { @@ -389,7 +392,7 @@ public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationExcep } return store.delete(ident, Entity.EntityType.TABLE); } catch (IOException e) { - throw new RuntimeException("Failed to purge table " + ident, e); + throw new RuntimeException("Failed to drop table " + ident, e); } } diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java index 0ed83457a48..16bef5565fc 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java @@ -20,9 +20,7 @@ package org.apache.gravitino.catalog.lakehouse.lance; import static org.apache.gravitino.Entity.EntityType.TABLE; -import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.lancedb.lance.Dataset; import com.lancedb.lance.WriteParams; @@ -58,6 +56,7 @@ import org.apache.gravitino.exceptions.NoSuchTableException; import org.apache.gravitino.exceptions.TableAlreadyExistsException; import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter; +import org.apache.gravitino.lance.common.utils.LancePropertiesUtils; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.meta.TableEntity; import org.apache.gravitino.rel.Column; @@ -120,13 +119,7 @@ public Table createTable( throws NoSuchSchemaException, TableAlreadyExistsException { // Ignore partitions, distributions, sortOrders, and indexes for Lance tables; String location = 
properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION); - Map storageProps = - properties.entrySet().stream() - .filter(e -> e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX)) - .collect( - Collectors.toMap( - e -> e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()), - Map.Entry::getValue)); + Map storageProps = LancePropertiesUtils.getLanceStorageOptions(properties); try (Dataset ignored = Dataset.create( @@ -280,7 +273,7 @@ private IndexParams getIndexParamsByIndexType(IndexType indexType) { } @Override - public boolean dropTable(NameIdentifier ident) { + public boolean purgeTable(NameIdentifier ident) { try { TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class); Map lancePropertiesMap = tableEntity.getProperties(); @@ -288,14 +281,24 @@ public boolean dropTable(NameIdentifier ident) { lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION); if (!store.delete(ident, Entity.EntityType.TABLE)) { - throw new RuntimeException("Failed to drop Lance table: " + ident.name()); + throw new RuntimeException("Failed to purge Lance table: " + ident.name()); } // Drop the Lance dataset from cloud storage. - Dataset.drop(location, ImmutableMap.of()); + Dataset.drop(location, LancePropertiesUtils.getLanceStorageOptions(lancePropertiesMap)); return true; } catch (IOException e) { - throw new RuntimeException("Failed to drop Lance table: " + ident.name(), e); + throw new RuntimeException("Failed to purge Lance table: " + ident.name(), e); } } + + @Override + public boolean dropTable(NameIdentifier ident) { + // TODO We will handle it in GenericLakehouseCatalogOperations. However, we need + // to figure out whether it's an external table or not first. We will introduce a property + // to indicate that. Currently, all Lance tables are external tables. `drop` will + // just remove the metadata in the metastore and will not delete data in storage. 
+ throw new UnsupportedOperationException( + "LanceCatalogOperations does not support dropTable operation."); + } } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java index 8a356fb1359..7f2f1df52ff 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java @@ -18,26 +18,80 @@ */ package org.apache.gravitino.lance.common.ops; +import com.lancedb.lance.namespace.model.CreateEmptyTableResponse; +import com.lancedb.lance.namespace.model.CreateTableRequest; import com.lancedb.lance.namespace.model.CreateTableResponse; import com.lancedb.lance.namespace.model.DeregisterTableResponse; import com.lancedb.lance.namespace.model.DescribeTableResponse; +import com.lancedb.lance.namespace.model.RegisterTableRequest; import com.lancedb.lance.namespace.model.RegisterTableResponse; import java.util.Map; +import java.util.Optional; public interface LanceTableOperations { - DescribeTableResponse describeTable(String tableId, String delimiter); + /** + * Describe the details of a table. + * + * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" + * @param delimiter the delimiter used in the namespace + * @param version the version of the table to describe, if null, describe the latest version + * @return the table description + */ + DescribeTableResponse describeTable(String tableId, String delimiter, Optional version); + /** + * Create a new table. 
+ * + * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" + * @param mode it can be CREATE, OVERWRITE, or EXIST_OK + * @param delimiter the delimiter used in the namespace + * @param tableLocation the location where the table data will be stored + * @param tableProperties the properties of the table + * @param arrowStreamBody the arrow stream bytes containing the schema and data + * @return the response of the create table operation + */ CreateTableResponse createTable( String tableId, - String mode, + CreateTableRequest.ModeEnum mode, String delimiter, String tableLocation, Map tableProperties, byte[] arrowStreamBody); + /** + * Create a new table without a schema. + * + * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" + * @param delimiter the delimiter used in the namespace + * @param tableLocation the location where the table data will be stored + * @param tableProperties the properties of the table + * @return the response of the create table operation + */ + CreateEmptyTableResponse createEmptyTable( + String tableId, String delimiter, String tableLocation, Map tableProperties); + + /** + * Register an existing table. + * + * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" + * @param mode it can be REGISTER or OVERWRITE. + * @param delimiter the delimiter used in the namespace + * @param tableProperties the properties of the table; it should contain the table location + * @return the response of the register table operation + */ RegisterTableResponse registerTable( - String tableId, String mode, String delimiter, Map tableProperties); + String tableId, + RegisterTableRequest.ModeEnum mode, + String delimiter, + Map tableProperties); + /** + * Deregister a table. It will not delete the underlying lance data. 
+ * + * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" + * @param delimiter the delimiter used in the namespace + * @return the response of the deregister table operation + */ DeregisterTableResponse deregisterTable(String tableId, String delimiter); } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java new file mode 100644 index 00000000000..961134ec667 --- /dev/null +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.lance.common.ops.gravitino; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.lancedb.lance.namespace.LanceNamespaceException; +import com.lancedb.lance.namespace.ObjectIdentifier; +import com.lancedb.lance.namespace.model.CreateNamespaceRequest; +import com.lancedb.lance.namespace.model.CreateNamespaceResponse; +import com.lancedb.lance.namespace.model.DescribeNamespaceResponse; +import com.lancedb.lance.namespace.model.DropNamespaceRequest; +import com.lancedb.lance.namespace.model.DropNamespaceResponse; +import com.lancedb.lance.namespace.model.ListNamespacesResponse; +import com.lancedb.lance.namespace.model.ListTablesResponse; +import com.lancedb.lance.namespace.util.CommonUtil; +import com.lancedb.lance.namespace.util.PageUtil; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.CatalogChange; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.SchemaChange; +import org.apache.gravitino.client.GravitinoClient; +import org.apache.gravitino.exceptions.CatalogAlreadyExistsException; +import org.apache.gravitino.exceptions.CatalogInUseException; +import org.apache.gravitino.exceptions.NoSuchCatalogException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NonEmptyCatalogException; +import org.apache.gravitino.exceptions.NonEmptySchemaException; +import 
org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations; + +public class GravitinoLanceNameSpaceOperations implements LanceNamespaceOperations { + + private final GravitinoLanceNamespaceWrapper namespaceWrapper; + private final GravitinoClient client; + + public GravitinoLanceNameSpaceOperations(GravitinoLanceNamespaceWrapper namespaceWrapper) { + this.namespaceWrapper = namespaceWrapper; + this.client = namespaceWrapper.getClient(); + } + + @Override + public ListNamespacesResponse listNamespaces( + String namespaceId, String delimiter, String pageToken, Integer limit) { + ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); + Preconditions.checkArgument( + nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s", namespaceId); + + List namespaces; + switch (nsId.levels()) { + case 0: + namespaces = + Arrays.stream(client.listCatalogsInfo()) + .filter(namespaceWrapper::isLakehouseCatalog) + .map(Catalog::name) + .collect(Collectors.toList()); + break; + + case 1: + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0)); + namespaces = Lists.newArrayList(catalog.asSchemas().listSchemas()); + break; + + case 2: + namespaces = Lists.newArrayList(); + break; + + default: + throw new IllegalArgumentException( + "Expected at most 2-level namespace but got: " + namespaceId); + } + + Collections.sort(namespaces); + PageUtil.Page page = + PageUtil.splitPage(namespaces, pageToken, PageUtil.normalizePageSize(limit)); + ListNamespacesResponse response = new ListNamespacesResponse(); + response.setNamespaces(Sets.newHashSet(page.items())); + response.setPageToken(page.nextPageToken()); + return response; + } + + @Override + public DescribeNamespaceResponse describeNamespace(String namespaceId, String delimiter) { + ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); + Preconditions.checkArgument( + nsId.levels() <= 2 && 
nsId.levels() > 0, + "Expected at most 2-level and at least 1-level namespace but got: %s", + namespaceId); + + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0)); + Map properties = Maps.newHashMap(); + + switch (nsId.levels()) { + case 1: + Optional.ofNullable(catalog.properties()).ifPresent(properties::putAll); + break; + case 2: + String schemaName = nsId.levelAtListPos(1); + Schema schema = catalog.asSchemas().loadSchema(schemaName); + Optional.ofNullable(schema.properties()).ifPresent(properties::putAll); + break; + default: + throw new IllegalArgumentException( + "Expected at most 2-level and at least 1-level namespace but got: " + namespaceId); + } + + DescribeNamespaceResponse response = new DescribeNamespaceResponse(); + response.setProperties(properties); + return response; + } + + @Override + public CreateNamespaceResponse createNamespace( + String namespaceId, + String delimiter, + CreateNamespaceRequest.ModeEnum mode, + Map properties) { + ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); + Preconditions.checkArgument( + nsId.levels() <= 2 && nsId.levels() > 0, + "Expected at most 2-level and at least 1-level namespace but got: %s", + namespaceId); + + switch (nsId.levels()) { + case 1: + return createOrUpdateCatalog(nsId.levelAtListPos(0), mode, properties); + case 2: + return createOrUpdateSchema( + nsId.levelAtListPos(0), nsId.levelAtListPos(1), mode, properties); + default: + throw new IllegalArgumentException( + "Expected at most 2-level and at least 1-level namespace but got: " + namespaceId); + } + } + + @Override + public DropNamespaceResponse dropNamespace( + String namespaceId, + String delimiter, + DropNamespaceRequest.ModeEnum mode, + DropNamespaceRequest.BehaviorEnum behavior) { + ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); + Preconditions.checkArgument( + nsId.levels() <= 2 && nsId.levels() > 0, + "Expected at most 2-level and at least 1-level namespace 
but got: %s", + namespaceId); + + switch (nsId.levels()) { + case 1: + return dropCatalog(nsId.levelAtListPos(0), mode, behavior); + case 2: + return dropSchema(nsId.levelAtListPos(0), nsId.levelAtListPos(1), mode, behavior); + default: + throw new IllegalArgumentException( + "Expected at most 2-level and at least 1-level namespace but got: " + namespaceId); + } + } + + @Override + public void namespaceExists(String namespaceId, String delimiter) throws LanceNamespaceException { + ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); + Preconditions.checkArgument( + nsId.levels() <= 2 && nsId.levels() > 0, + "Expected at most 2-level and at least 1-level namespace but got: %s", + namespaceId); + + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0)); + if (nsId.levels() == 2) { + String schemaName = nsId.levelAtListPos(1); + if (!catalog.asSchemas().schemaExists(schemaName)) { + throw LanceNamespaceException.notFound( + "Schema not found: " + schemaName, + NoSuchSchemaException.class.getSimpleName(), + schemaName, + CommonUtil.formatCurrentStackTrace()); + } + } + } + + private CreateNamespaceResponse createOrUpdateCatalog( + String catalogName, CreateNamespaceRequest.ModeEnum mode, Map properties) { + CreateNamespaceResponse response = new CreateNamespaceResponse(); + + Catalog catalog; + try { + catalog = client.loadCatalog(catalogName); + } catch (NoSuchCatalogException e) { + // Catalog does not exist, create it + Catalog createdCatalog = + client.createCatalog( + catalogName, + Catalog.Type.RELATIONAL, + "generic-lakehouse", + "created by Lance REST server", + properties); + response.setProperties( + createdCatalog.properties() == null ? 
Maps.newHashMap() : createdCatalog.properties()); + return response; + } + + // Catalog exists, validate type + if (!namespaceWrapper.isLakehouseCatalog(catalog)) { + throw LanceNamespaceException.conflict( + "Catalog already exists but is not a lakehouse catalog: " + catalogName, + CatalogAlreadyExistsException.class.getSimpleName(), + catalogName, + CommonUtil.formatCurrentStackTrace()); + } + + // Catalog exists, handle based on mode + switch (mode) { + case EXIST_OK: + response.setProperties( + Optional.ofNullable(catalog.properties()).orElse(Collections.emptyMap())); + return response; + case CREATE: + throw LanceNamespaceException.conflict( + "Catalog already exists: " + catalogName, + CatalogAlreadyExistsException.class.getSimpleName(), + catalogName, + CommonUtil.formatCurrentStackTrace()); + case OVERWRITE: + CatalogChange[] changes = + buildChanges( + properties, + removeInUseProperty(catalog.properties()), + CatalogChange::setProperty, + CatalogChange::removeProperty, + CatalogChange[]::new); + Catalog alteredCatalog = client.alterCatalog(catalogName, changes); + Optional.ofNullable(alteredCatalog.properties()).ifPresent(response::setProperties); + return response; + default: + throw new IllegalArgumentException("Unknown mode: " + mode); + } + } + + private Map removeInUseProperty(Map properties) { + return properties.entrySet().stream() + .filter(e -> !e.getKey().equalsIgnoreCase(Catalog.PROPERTY_IN_USE)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private CreateNamespaceResponse createOrUpdateSchema( + String catalogName, + String schemaName, + CreateNamespaceRequest.ModeEnum mode, + Map properties) { + CreateNamespaceResponse response = new CreateNamespaceResponse(); + Catalog loadedCatalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName); + + Schema schema; + try { + schema = loadedCatalog.asSchemas().loadSchema(schemaName); + } catch (NoSuchSchemaException e) { + // Schema does not exist, create it + 
Schema createdSchema = loadedCatalog.asSchemas().createSchema(schemaName, null, properties); + response.setProperties( + createdSchema.properties() == null ? Maps.newHashMap() : createdSchema.properties()); + return response; + } + + // Schema exists, handle based on mode + switch (mode) { + case EXIST_OK: + response.setProperties( + Optional.ofNullable(schema.properties()).orElse(Collections.emptyMap())); + return response; + case CREATE: + throw LanceNamespaceException.conflict( + "Schema already exists: " + schemaName, + SchemaAlreadyExistsException.class.getSimpleName(), + schemaName, + CommonUtil.formatCurrentStackTrace()); + case OVERWRITE: + SchemaChange[] changes = + buildChanges( + properties, + schema.properties(), + SchemaChange::setProperty, + SchemaChange::removeProperty, + SchemaChange[]::new); + Schema alteredSchema = loadedCatalog.asSchemas().alterSchema(schemaName, changes); + Optional.ofNullable(alteredSchema.properties()).ifPresent(response::setProperties); + return response; + default: + throw new IllegalArgumentException("Unknown mode: " + mode); + } + } + + private DropNamespaceResponse dropCatalog( + String catalogName, + DropNamespaceRequest.ModeEnum mode, + DropNamespaceRequest.BehaviorEnum behavior) { + try { + boolean dropped = + client.dropCatalog(catalogName, behavior == DropNamespaceRequest.BehaviorEnum.CASCADE); + if (dropped) { + return new DropNamespaceResponse(); + } else { + // Catalog did not exist + if (mode == DropNamespaceRequest.ModeEnum.FAIL) { + throw LanceNamespaceException.notFound( + "Catalog not found: " + catalogName, + NoSuchCatalogException.class.getSimpleName(), + catalogName, + CommonUtil.formatCurrentStackTrace()); + } + return new DropNamespaceResponse(); // SKIP mode + } + } catch (NonEmptyCatalogException | CatalogInUseException e) { + throw LanceNamespaceException.badRequest( + String.format("Catalog %s is not empty or in used", catalogName), + NonEmptyCatalogException.class.getSimpleName(), + catalogName, + 
CommonUtil.formatCurrentStackTrace()); + } + } + + private DropNamespaceResponse dropSchema( + String catalogName, + String schemaName, + DropNamespaceRequest.ModeEnum mode, + DropNamespaceRequest.BehaviorEnum behavior) { + try { + boolean dropped = + client + .loadCatalog(catalogName) + .asSchemas() + .dropSchema(schemaName, behavior == DropNamespaceRequest.BehaviorEnum.CASCADE); + if (dropped) { + return new DropNamespaceResponse(); + } else { + // Schema did not exist + if (mode == DropNamespaceRequest.ModeEnum.FAIL) { + throw LanceNamespaceException.notFound( + "Schema not found: " + schemaName, + NoSuchSchemaException.class.getSimpleName(), + schemaName, + CommonUtil.formatCurrentStackTrace()); + } + return new DropNamespaceResponse(); // SKIP mode + } + } catch (NoSuchCatalogException e) { + throw LanceNamespaceException.notFound( + "Catalog not found: " + catalogName, + NoSuchCatalogException.class.getSimpleName(), + catalogName, + CommonUtil.formatCurrentStackTrace()); + } catch (NonEmptySchemaException e) { + throw LanceNamespaceException.badRequest( + String.format("Schema %s is not empty.", schemaName), + NonEmptySchemaException.class.getSimpleName(), + schemaName, + CommonUtil.formatCurrentStackTrace()); + } + } + + private T[] buildChanges( + Map newProps, + Map oldProps, + BiFunction setPropertyFunc, + Function removePropertyFunc, + IntFunction arrayCreator) { + Stream setPropertiesStream = + newProps.entrySet().stream() + .map(entry -> setPropertyFunc.apply(entry.getKey(), entry.getValue())); + + Stream removePropertiesStream = + oldProps == null + ? 
Stream.empty() + : oldProps.keySet().stream() + .filter(key -> !newProps.containsKey(key)) + .map(removePropertyFunc); + + return Stream.concat(setPropertiesStream, removePropertiesStream).toArray(arrayCreator); + } + + @Override + public ListTablesResponse listTables( + String namespaceId, String delimiter, String pageToken, Integer limit) { + ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, Pattern.quote(delimiter)); + Preconditions.checkArgument( + nsId.levels() == 2, "Expected 2-level namespace but got: %s", nsId.levels()); + String catalogName = nsId.levelAtListPos(0); + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName); + String schemaName = nsId.levelAtListPos(1); + List tables = + Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName))) + .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName, ident.name())) + .sorted() + .collect(Collectors.toList()); + + PageUtil.Page page = PageUtil.splitPage(tables, pageToken, PageUtil.normalizePageSize(limit)); + ListNamespacesResponse response = new ListNamespacesResponse(); + response.setNamespaces(Sets.newHashSet(page.items())); + response.setPageToken(page.nextPageToken()); + + return new ListTablesResponse() + .tables(response.getNamespaces()) + .pageToken(response.getPageToken()); + } +} diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java index dd2c4629b4f..a1fee0a73e4 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java @@ -20,81 +20,30 @@ import static org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME; import static 
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_BACKEND_URI; -import static org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER; -import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.lancedb.lance.namespace.LanceNamespaceException; -import com.lancedb.lance.namespace.ObjectIdentifier; -import com.lancedb.lance.namespace.model.CreateNamespaceRequest; -import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum; -import com.lancedb.lance.namespace.model.CreateNamespaceResponse; -import com.lancedb.lance.namespace.model.CreateTableResponse; -import com.lancedb.lance.namespace.model.DeregisterTableResponse; -import com.lancedb.lance.namespace.model.DescribeNamespaceResponse; -import com.lancedb.lance.namespace.model.DescribeTableResponse; -import com.lancedb.lance.namespace.model.DropNamespaceRequest; -import com.lancedb.lance.namespace.model.DropNamespaceResponse; -import com.lancedb.lance.namespace.model.JsonArrowSchema; -import com.lancedb.lance.namespace.model.ListNamespacesResponse; -import com.lancedb.lance.namespace.model.ListTablesResponse; -import com.lancedb.lance.namespace.model.RegisterTableRequest; -import com.lancedb.lance.namespace.model.RegisterTableResponse; import com.lancedb.lance.namespace.util.CommonUtil; -import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter; -import com.lancedb.lance.namespace.util.PageUtil; -import java.io.ByteArrayInputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.BiFunction; -import java.util.function.Function; -import 
java.util.function.IntFunction; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.arrow.vector.types.pojo.Field; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; -import org.apache.gravitino.CatalogChange; -import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.Namespace; -import org.apache.gravitino.Schema; -import org.apache.gravitino.SchemaChange; import org.apache.gravitino.client.GravitinoClient; -import org.apache.gravitino.exceptions.CatalogAlreadyExistsException; -import org.apache.gravitino.exceptions.CatalogInUseException; import org.apache.gravitino.exceptions.NoSuchCatalogException; -import org.apache.gravitino.exceptions.NoSuchSchemaException; -import org.apache.gravitino.exceptions.NonEmptyCatalogException; -import org.apache.gravitino.exceptions.NonEmptySchemaException; -import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; import org.apache.gravitino.lance.common.config.LanceConfig; import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations; import org.apache.gravitino.lance.common.ops.LanceTableOperations; import org.apache.gravitino.lance.common.ops.NamespaceWrapper; -import org.apache.gravitino.rel.Column; -import org.apache.gravitino.rel.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper - implements LanceNamespaceOperations, LanceTableOperations { +public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper { private static final Logger LOG = LoggerFactory.getLogger(GravitinoLanceNamespaceWrapper.class); private GravitinoClient client; + private LanceNamespaceOperations namespaceOperations; + private LanceTableOperations tableOperations; + @VisibleForTesting 
GravitinoLanceNamespaceWrapper() { super(null); @@ -104,6 +53,10 @@ public GravitinoLanceNamespaceWrapper(LanceConfig config) { super(config); } + public GravitinoClient getClient() { + return client; + } + @Override protected void initialize() { String uri = config().get(NAMESPACE_BACKEND_URI); @@ -113,16 +66,18 @@ protected void initialize() { "Metalake name must be provided for Lance Gravitino namespace backend"); this.client = GravitinoClient.builder(uri).withMetalake(metalakeName).build(); + this.namespaceOperations = new GravitinoLanceNameSpaceOperations(this); + this.tableOperations = new GravitinoLanceTableOperations(this); } @Override public LanceNamespaceOperations newNamespaceOps() { - return this; + return namespaceOperations; } @Override protected LanceTableOperations newTableOps() { - return this; + return tableOperations; } @Override @@ -136,150 +91,12 @@ public void close() { } } - @Override - public ListNamespacesResponse listNamespaces( - String namespaceId, String delimiter, String pageToken, Integer limit) { - ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); - Preconditions.checkArgument( - nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s", namespaceId); - - List namespaces; - switch (nsId.levels()) { - case 0: - namespaces = - Arrays.stream(client.listCatalogsInfo()) - .filter(this::isLakehouseCatalog) - .map(Catalog::name) - .collect(Collectors.toList()); - break; - - case 1: - Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0)); - namespaces = Lists.newArrayList(catalog.asSchemas().listSchemas()); - break; - - case 2: - namespaces = Lists.newArrayList(); - break; - - default: - throw new IllegalArgumentException( - "Expected at most 2-level namespace but got: " + namespaceId); - } - - Collections.sort(namespaces); - PageUtil.Page page = - PageUtil.splitPage(namespaces, pageToken, PageUtil.normalizePageSize(limit)); - ListNamespacesResponse response = new ListNamespacesResponse(); - 
response.setNamespaces(Sets.newHashSet(page.items())); - response.setPageToken(page.nextPageToken()); - return response; - } - - @Override - public DescribeNamespaceResponse describeNamespace(String namespaceId, String delimiter) { - ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); - Preconditions.checkArgument( - nsId.levels() <= 2 && nsId.levels() > 0, - "Expected at most 2-level and at least 1-level namespace but got: %s", - namespaceId); - - Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0)); - Map properties = Maps.newHashMap(); - - switch (nsId.levels()) { - case 1: - Optional.ofNullable(catalog.properties()).ifPresent(properties::putAll); - break; - case 2: - String schemaName = nsId.levelAtListPos(1); - Schema schema = catalog.asSchemas().loadSchema(schemaName); - Optional.ofNullable(schema.properties()).ifPresent(properties::putAll); - break; - default: - throw new IllegalArgumentException( - "Expected at most 2-level and at least 1-level namespace but got: " + namespaceId); - } - - DescribeNamespaceResponse response = new DescribeNamespaceResponse(); - response.setProperties(properties); - return response; - } - - @Override - public CreateNamespaceResponse createNamespace( - String namespaceId, - String delimiter, - CreateNamespaceRequest.ModeEnum mode, - Map properties) { - ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); - Preconditions.checkArgument( - nsId.levels() <= 2 && nsId.levels() > 0, - "Expected at most 2-level and at least 1-level namespace but got: %s", - namespaceId); - - switch (nsId.levels()) { - case 1: - return createOrUpdateCatalog(nsId.levelAtListPos(0), mode, properties); - case 2: - return createOrUpdateSchema( - nsId.levelAtListPos(0), nsId.levelAtListPos(1), mode, properties); - default: - throw new IllegalArgumentException( - "Expected at most 2-level and at least 1-level namespace but got: " + namespaceId); - } - } - - @Override - public DropNamespaceResponse 
dropNamespace( - String namespaceId, - String delimiter, - DropNamespaceRequest.ModeEnum mode, - DropNamespaceRequest.BehaviorEnum behavior) { - ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); - Preconditions.checkArgument( - nsId.levels() <= 2 && nsId.levels() > 0, - "Expected at most 2-level and at least 1-level namespace but got: %s", - namespaceId); - - switch (nsId.levels()) { - case 1: - return dropCatalog(nsId.levelAtListPos(0), mode, behavior); - case 2: - return dropSchema(nsId.levelAtListPos(0), nsId.levelAtListPos(1), mode, behavior); - default: - throw new IllegalArgumentException( - "Expected at most 2-level and at least 1-level namespace but got: " + namespaceId); - } - } - - @Override - public void namespaceExists(String namespaceId, String delimiter) throws LanceNamespaceException { - ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter); - Preconditions.checkArgument( - nsId.levels() <= 2 && nsId.levels() > 0, - "Expected at most 2-level and at least 1-level namespace but got: %s", - namespaceId); - - Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0)); - if (nsId.levels() == 2) { - String schemaName = nsId.levelAtListPos(1); - if (!catalog.asSchemas().schemaExists(schemaName)) { - throw LanceNamespaceException.notFound( - "Schema not found: " + schemaName, - NoSuchSchemaException.class.getSimpleName(), - schemaName, - CommonUtil.formatCurrentStackTrace()); - } - } - } - - private boolean isLakehouseCatalog(Catalog catalog) { + public boolean isLakehouseCatalog(Catalog catalog) { return catalog.type().equals(Catalog.Type.RELATIONAL) && "generic-lakehouse".equals(catalog.provider()); } - private Catalog loadAndValidateLakehouseCatalog(String catalogName) { + public Catalog loadAndValidateLakehouseCatalog(String catalogName) { Catalog catalog; try { catalog = client.loadCatalog(catalogName); @@ -299,426 +116,4 @@ private Catalog loadAndValidateLakehouseCatalog(String catalogName) { } return 
catalog; } - - private CreateNamespaceResponse createOrUpdateCatalog( - String catalogName, CreateNamespaceRequest.ModeEnum mode, Map properties) { - CreateNamespaceResponse response = new CreateNamespaceResponse(); - - Catalog catalog; - try { - catalog = client.loadCatalog(catalogName); - } catch (NoSuchCatalogException e) { - // Catalog does not exist, create it - Catalog createdCatalog = - client.createCatalog( - catalogName, - Catalog.Type.RELATIONAL, - "generic-lakehouse", - "created by Lance REST server", - properties); - response.setProperties( - createdCatalog.properties() == null ? Maps.newHashMap() : createdCatalog.properties()); - return response; - } - - // Catalog exists, validate type - if (!isLakehouseCatalog(catalog)) { - throw LanceNamespaceException.conflict( - "Catalog already exists but is not a lakehouse catalog: " + catalogName, - CatalogAlreadyExistsException.class.getSimpleName(), - catalogName, - CommonUtil.formatCurrentStackTrace()); - } - - // Catalog exists, handle based on mode - switch (mode) { - case EXIST_OK: - response.setProperties( - Optional.ofNullable(catalog.properties()).orElse(Collections.emptyMap())); - return response; - case CREATE: - throw LanceNamespaceException.conflict( - "Catalog already exists: " + catalogName, - CatalogAlreadyExistsException.class.getSimpleName(), - catalogName, - CommonUtil.formatCurrentStackTrace()); - case OVERWRITE: - CatalogChange[] changes = - buildChanges( - properties, - removeInUseProperty(catalog.properties()), - CatalogChange::setProperty, - CatalogChange::removeProperty, - CatalogChange[]::new); - Catalog alteredCatalog = client.alterCatalog(catalogName, changes); - Optional.ofNullable(alteredCatalog.properties()).ifPresent(response::setProperties); - return response; - default: - throw new IllegalArgumentException("Unknown mode: " + mode); - } - } - - private Map removeInUseProperty(Map properties) { - return properties.entrySet().stream() - .filter(e -> 
!e.getKey().equalsIgnoreCase(Catalog.PROPERTY_IN_USE)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private CreateNamespaceResponse createOrUpdateSchema( - String catalogName, - String schemaName, - CreateNamespaceRequest.ModeEnum mode, - Map properties) { - CreateNamespaceResponse response = new CreateNamespaceResponse(); - Catalog loadedCatalog = loadAndValidateLakehouseCatalog(catalogName); - - Schema schema; - try { - schema = loadedCatalog.asSchemas().loadSchema(schemaName); - } catch (NoSuchSchemaException e) { - // Schema does not exist, create it - Schema createdSchema = loadedCatalog.asSchemas().createSchema(schemaName, null, properties); - response.setProperties( - createdSchema.properties() == null ? Maps.newHashMap() : createdSchema.properties()); - return response; - } - - // Schema exists, handle based on mode - switch (mode) { - case EXIST_OK: - response.setProperties( - Optional.ofNullable(schema.properties()).orElse(Collections.emptyMap())); - return response; - case CREATE: - throw LanceNamespaceException.conflict( - "Schema already exists: " + schemaName, - SchemaAlreadyExistsException.class.getSimpleName(), - schemaName, - CommonUtil.formatCurrentStackTrace()); - case OVERWRITE: - SchemaChange[] changes = - buildChanges( - properties, - schema.properties(), - SchemaChange::setProperty, - SchemaChange::removeProperty, - SchemaChange[]::new); - Schema alteredSchema = loadedCatalog.asSchemas().alterSchema(schemaName, changes); - Optional.ofNullable(alteredSchema.properties()).ifPresent(response::setProperties); - return response; - default: - throw new IllegalArgumentException("Unknown mode: " + mode); - } - } - - private DropNamespaceResponse dropCatalog( - String catalogName, - DropNamespaceRequest.ModeEnum mode, - DropNamespaceRequest.BehaviorEnum behavior) { - try { - boolean dropped = - client.dropCatalog(catalogName, behavior == DropNamespaceRequest.BehaviorEnum.CASCADE); - if (dropped) { - return new 
DropNamespaceResponse(); - } else { - // Catalog did not exist - if (mode == DropNamespaceRequest.ModeEnum.FAIL) { - throw LanceNamespaceException.notFound( - "Catalog not found: " + catalogName, - NoSuchCatalogException.class.getSimpleName(), - catalogName, - CommonUtil.formatCurrentStackTrace()); - } - return new DropNamespaceResponse(); // SKIP mode - } - } catch (NonEmptyCatalogException | CatalogInUseException e) { - throw LanceNamespaceException.badRequest( - String.format("Catalog %s is not empty or in used", catalogName), - NonEmptyCatalogException.class.getSimpleName(), - catalogName, - CommonUtil.formatCurrentStackTrace()); - } - } - - private DropNamespaceResponse dropSchema( - String catalogName, - String schemaName, - DropNamespaceRequest.ModeEnum mode, - DropNamespaceRequest.BehaviorEnum behavior) { - try { - boolean dropped = - client - .loadCatalog(catalogName) - .asSchemas() - .dropSchema(schemaName, behavior == DropNamespaceRequest.BehaviorEnum.CASCADE); - if (dropped) { - return new DropNamespaceResponse(); - } else { - // Schema did not exist - if (mode == DropNamespaceRequest.ModeEnum.FAIL) { - throw LanceNamespaceException.notFound( - "Schema not found: " + schemaName, - NoSuchSchemaException.class.getSimpleName(), - schemaName, - CommonUtil.formatCurrentStackTrace()); - } - return new DropNamespaceResponse(); // SKIP mode - } - } catch (NoSuchCatalogException e) { - throw LanceNamespaceException.notFound( - "Catalog not found: " + catalogName, - NoSuchCatalogException.class.getSimpleName(), - catalogName, - CommonUtil.formatCurrentStackTrace()); - } catch (NonEmptySchemaException e) { - throw LanceNamespaceException.badRequest( - String.format("Schema %s is not empty.", schemaName), - NonEmptySchemaException.class.getSimpleName(), - schemaName, - CommonUtil.formatCurrentStackTrace()); - } - } - - private T[] buildChanges( - Map newProps, - Map oldProps, - BiFunction setPropertyFunc, - Function removePropertyFunc, - IntFunction arrayCreator) { 
- Stream setPropertiesStream = - newProps.entrySet().stream() - .map(entry -> setPropertyFunc.apply(entry.getKey(), entry.getValue())); - - Stream removePropertiesStream = - oldProps == null - ? Stream.empty() - : oldProps.keySet().stream() - .filter(key -> !newProps.containsKey(key)) - .map(removePropertyFunc); - - return Stream.concat(setPropertiesStream, removePropertiesStream).toArray(arrayCreator); - } - - @Override - public ListTablesResponse listTables( - String namespaceId, String delimiter, String pageToken, Integer limit) { - ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, Pattern.quote(delimiter)); - Preconditions.checkArgument( - nsId.levels() == 2, "Expected 2-level namespace but got: %s", nsId.levels()); - String catalogName = nsId.levelAtListPos(0); - Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); - String schemaName = nsId.levelAtListPos(1); - List tables = - Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName))) - .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName, ident.name())) - .sorted() - .collect(Collectors.toList()); - - PageUtil.Page page = PageUtil.splitPage(tables, pageToken, PageUtil.normalizePageSize(limit)); - ListNamespacesResponse response = new ListNamespacesResponse(); - response.setNamespaces(Sets.newHashSet(page.items())); - response.setPageToken(page.nextPageToken()); - - return new ListTablesResponse() - .tables(response.getNamespaces()) - .pageToken(response.getPageToken()); - } - - @Override - public DescribeTableResponse describeTable(String tableId, String delimiter) { - ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); - Preconditions.checkArgument( - nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); - - String catalogName = nsId.levelAtListPos(0); - Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); - NameIdentifier tableIdentifier = - NameIdentifier.of(nsId.levelAtListPos(1), 
nsId.levelAtListPos(2)); - - Table table = catalog.asTableCatalog().loadTable(tableIdentifier); - DescribeTableResponse response = new DescribeTableResponse(); - response.setProperties(table.properties()); - response.setLocation(table.properties().get("location")); - response.setSchema(toJsonArrowSchema(table.columns())); - return response; - } - - @Override - public CreateTableResponse createTable( - String tableId, - String mode, - String delimiter, - String tableLocation, - Map tableProperties, - byte[] arrowStreamBody) { - ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); - Preconditions.checkArgument( - nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); - - // Parser column information. - List columns = Lists.newArrayList(); - if (arrowStreamBody != null) { - org.apache.arrow.vector.types.pojo.Schema schema = parseArrowIpcStream(arrowStreamBody); - columns = extractColumns(schema); - } - - String catalogName = nsId.levelAtListPos(0); - Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); - - NameIdentifier tableIdentifier = - NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); - - Map createTableProperties = Maps.newHashMap(tableProperties); - createTableProperties.put("location", tableLocation); - createTableProperties.put("mode", mode); - // TODO considering the mode (create, exist_ok, overwrite) - - ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase()); - switch (createMode) { - case EXIST_OK: - if (catalog.asTableCatalog().tableExists(tableIdentifier)) { - CreateTableResponse response = new CreateTableResponse(); - Table existingTable = catalog.asTableCatalog().loadTable(tableIdentifier); - response.setProperties(existingTable.properties()); - response.setLocation(existingTable.properties().get("location")); - response.setVersion(0L); - return response; - } - break; - case CREATE: - if (catalog.asTableCatalog().tableExists(tableIdentifier)) { - throw 
LanceNamespaceException.conflict( - "Table already exists: " + tableId, - SchemaAlreadyExistsException.class.getSimpleName(), - tableId, - CommonUtil.formatCurrentStackTrace()); - } - break; - case OVERWRITE: - if (catalog.asTableCatalog().tableExists(tableIdentifier)) { - catalog.asTableCatalog().dropTable(tableIdentifier); - } - break; - default: - throw new IllegalArgumentException("Unknown mode: " + mode); - } - - Table t = - catalog - .asTableCatalog() - .createTable( - tableIdentifier, - columns.toArray(new Column[0]), - tableLocation, - createTableProperties); - - CreateTableResponse response = new CreateTableResponse(); - response.setProperties(t.properties()); - response.setLocation(tableLocation); - response.setVersion(0L); - return response; - } - - @Override - public RegisterTableResponse registerTable( - String tableId, String mode, String delimiter, Map tableProperties) { - ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); - Preconditions.checkArgument( - nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); - - String catalogName = nsId.levelAtListPos(0); - Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); - NameIdentifier tableIdentifier = - NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); - - // TODO Support real register API - RegisterTableRequest.ModeEnum createMode = - RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase()); - if (createMode == RegisterTableRequest.ModeEnum.CREATE - && catalog.asTableCatalog().tableExists(tableIdentifier)) { - throw LanceNamespaceException.conflict( - "Table already exists: " + tableId, - SchemaAlreadyExistsException.class.getSimpleName(), - tableId, - CommonUtil.formatCurrentStackTrace()); - } - - if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE - && catalog.asTableCatalog().tableExists(tableIdentifier)) { - LOG.info("Overwriting existing table: {}", tableId); - 
catalog.asTableCatalog().dropTable(tableIdentifier); - } - - Table t = - catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {}, "", tableProperties); - - RegisterTableResponse response = new RegisterTableResponse(); - response.setProperties(t.properties()); - response.setLocation(t.properties().get("location")); - return response; - } - - @Override - public DeregisterTableResponse deregisterTable(String tableId, String delimiter) { - - ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); - Preconditions.checkArgument( - nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); - - String catalogName = nsId.levelAtListPos(0); - Catalog catalog = loadAndValidateLakehouseCatalog(catalogName); - - NameIdentifier tableIdentifier = - NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); - Table t = catalog.asTableCatalog().loadTable(tableIdentifier); - Map properties = t.properties(); - // TODO Support real deregister API. 
- catalog.asTableCatalog().purgeTable(tableIdentifier); - - DeregisterTableResponse response = new DeregisterTableResponse(); - response.setProperties(properties); - response.setLocation(properties.get("location")); - return response; - } - - private JsonArrowSchema toJsonArrowSchema(Column[] columns) { - List fields = - Arrays.stream(columns) - .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(), col.nullable())) - .collect(Collectors.toList()); - - return JsonArrowSchemaConverter.convertToJsonArrowSchema( - new org.apache.arrow.vector.types.pojo.Schema(fields)); - } - - @VisibleForTesting - org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) { - org.apache.arrow.vector.types.pojo.Schema schema; - try (BufferAllocator allocator = new RootAllocator(); - ByteArrayInputStream bais = new ByteArrayInputStream(stream); - ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) { - schema = reader.getVectorSchemaRoot().getSchema(); - } catch (Exception e) { - throw new RuntimeException("Failed to parse Arrow IPC stream", e); - } - - Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC stream"); - return schema; - } - - private List extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) { - List columns = new ArrayList<>(); - - for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) { - columns.add( - Column.of( - field.getName(), - CONVERTER.toGravitino(field), - null, - field.isNullable(), - false, - DEFAULT_VALUE_NOT_SET)); - } - return columns; - } } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java new file mode 100644 index 00000000000..d298dbe5e54 --- /dev/null +++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.lance.common.ops.gravitino; + +import static org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION; +import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.lancedb.lance.namespace.LanceNamespaceException; +import com.lancedb.lance.namespace.ObjectIdentifier; +import com.lancedb.lance.namespace.model.CreateEmptyTableResponse; +import com.lancedb.lance.namespace.model.CreateTableRequest; +import com.lancedb.lance.namespace.model.CreateTableRequest.ModeEnum; +import com.lancedb.lance.namespace.model.CreateTableResponse; +import com.lancedb.lance.namespace.model.DeregisterTableResponse; +import com.lancedb.lance.namespace.model.DescribeTableResponse; +import com.lancedb.lance.namespace.model.JsonArrowSchema; +import 
com.lancedb.lance.namespace.model.RegisterTableRequest; +import com.lancedb.lance.namespace.model.RegisterTableResponse; +import com.lancedb.lance.namespace.util.CommonUtil; +import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.lance.common.ops.LanceTableOperations; +import org.apache.gravitino.lance.common.utils.ArrowUtils; +import org.apache.gravitino.lance.common.utils.LancePropertiesUtils; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GravitinoLanceTableOperations implements LanceTableOperations { + private static final Logger LOG = LoggerFactory.getLogger(GravitinoLanceTableOperations.class); + + private final GravitinoLanceNamespaceWrapper namespaceWrapper; + + public GravitinoLanceTableOperations(GravitinoLanceNamespaceWrapper namespaceWrapper) { + this.namespaceWrapper = namespaceWrapper; + } + + @Override + public DescribeTableResponse describeTable( + String tableId, String delimiter, Optional version) { + if (!version.isEmpty()) { + throw new UnsupportedOperationException( + "Describing specific table version is not supported. 
It should be null to indicate the" + + " latest version."); + } + + ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); + Preconditions.checkArgument( + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); + + String catalogName = nsId.levelAtListPos(0); + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName); + NameIdentifier tableIdentifier = + NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); + + Table table = catalog.asTableCatalog().loadTable(tableIdentifier); + DescribeTableResponse response = new DescribeTableResponse(); + response.setProperties(table.properties()); + response.setLocation(table.properties().get(LANCE_LOCATION)); + response.setSchema(toJsonArrowSchema(table.columns())); + response.setVersion(null); + response.setStorageOptions(LancePropertiesUtils.getLanceStorageOptions(table.properties())); + return response; + } + + @Override + public CreateTableResponse createTable( + String tableId, + CreateTableRequest.ModeEnum mode, + String delimiter, + String tableLocation, + Map tableProperties, + byte[] arrowStreamBody) { + ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); + Preconditions.checkArgument( + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); + + // Parser column information. 
+ List columns = Lists.newArrayList(); + if (arrowStreamBody != null) { + org.apache.arrow.vector.types.pojo.Schema schema = + ArrowUtils.parseArrowIpcStream(arrowStreamBody); + columns = extractColumns(schema); + } + + String catalogName = nsId.levelAtListPos(0); + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName); + + NameIdentifier tableIdentifier = + NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); + + Map createTableProperties = Maps.newHashMap(tableProperties); + if (tableLocation != null) { + createTableProperties.put(LANCE_LOCATION, tableLocation); + } + // The format is defined in GenericLakehouseCatalog + createTableProperties.put("format", "lance"); + + Table t; + try { + t = + catalog + .asTableCatalog() + .createTable( + tableIdentifier, columns.toArray(new Column[0]), null, createTableProperties); + } catch (TableAlreadyExistsException exception) { + if (mode == CreateTableRequest.ModeEnum.CREATE) { + throw LanceNamespaceException.conflict( + "Table already exists: " + tableId, + TableAlreadyExistsException.class.getSimpleName(), + tableId, + CommonUtil.formatCurrentStackTrace()); + } else if (mode == CreateTableRequest.ModeEnum.OVERWRITE) { + LOG.info("Overwriting existing table: {}", tableId); + catalog.asTableCatalog().purgeTable(tableIdentifier); + + t = + catalog + .asTableCatalog() + .createTable( + tableIdentifier, columns.toArray(new Column[0]), null, createTableProperties); + } else { // EXIST_OK + CreateTableResponse response = new CreateTableResponse(); + Table existingTable = catalog.asTableCatalog().loadTable(tableIdentifier); + response.setProperties(existingTable.properties()); + response.setLocation(existingTable.properties().get(LANCE_LOCATION)); + response.setVersion(null); + response.setStorageOptions( + LancePropertiesUtils.getLanceStorageOptions(existingTable.properties())); + return response; + } + } + + CreateTableResponse response = new CreateTableResponse(); + 
response.setProperties(t.properties()); + response.setLocation(tableLocation); + // Extract storage options from table properties. All storage options stores in table + // properties. + response.setStorageOptions(LancePropertiesUtils.getLanceStorageOptions(t.properties())); + response.setVersion(null); + response.setLocation(t.properties().get(LANCE_LOCATION)); + response.setProperties(t.properties()); + return response; + } + + @Override + public CreateEmptyTableResponse createEmptyTable( + String tableId, String delimiter, String tableLocation, Map tableProperties) { + CreateTableResponse response = + createTable(tableId, ModeEnum.CREATE, delimiter, tableLocation, tableProperties, null); + CreateEmptyTableResponse emptyTableResponse = new CreateEmptyTableResponse(); + emptyTableResponse.setProperties(response.getProperties()); + emptyTableResponse.setLocation(response.getLocation()); + emptyTableResponse.setStorageOptions(response.getStorageOptions()); + return emptyTableResponse; + } + + @Override + public RegisterTableResponse registerTable( + String tableId, + RegisterTableRequest.ModeEnum mode, + String delimiter, + Map tableProperties) { + ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); + Preconditions.checkArgument( + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); + + String catalogName = nsId.levelAtListPos(0); + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName); + NameIdentifier tableIdentifier = + NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); + + Map copiedTableProperties = Maps.newHashMap(tableProperties); + copiedTableProperties.put("format", "lance"); + Table t = null; + try { + t = + catalog + .asTableCatalog() + .createTable(tableIdentifier, new Column[] {}, null, copiedTableProperties); + } catch (TableAlreadyExistsException exception) { + if (mode == RegisterTableRequest.ModeEnum.CREATE) { + throw LanceNamespaceException.conflict( + 
"Table already exists: " + tableId, + TableAlreadyExistsException.class.getSimpleName(), + tableId, + CommonUtil.formatCurrentStackTrace()); + } else if (mode == RegisterTableRequest.ModeEnum.OVERWRITE) { + LOG.info("Overwriting existing table: {}", tableId); + catalog.asTableCatalog().dropTable(tableIdentifier); + + t = + catalog + .asTableCatalog() + .createTable(tableIdentifier, new Column[] {}, null, copiedTableProperties); + } + } + + RegisterTableResponse response = new RegisterTableResponse(); + response.setProperties(t.properties()); + response.setLocation(t.properties().get(LANCE_LOCATION)); + return response; + } + + @Override + public DeregisterTableResponse deregisterTable(String tableId, String delimiter) { + ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter)); + Preconditions.checkArgument( + nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels()); + + String catalogName = nsId.levelAtListPos(0); + Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName); + + NameIdentifier tableIdentifier = + NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2)); + Table t = catalog.asTableCatalog().loadTable(tableIdentifier); + Map properties = t.properties(); + // TODO Support real deregister API. 
+ boolean result = catalog.asTableCatalog().dropTable(tableIdentifier); + if (!result) { + throw LanceNamespaceException.notFound( + "Table not found: " + tableId, + NoSuchTableException.class.getSimpleName(), + tableId, + CommonUtil.formatCurrentStackTrace()); + } + + DeregisterTableResponse response = new DeregisterTableResponse(); + response.setProperties(properties); + response.setLocation(properties.get(LANCE_LOCATION)); + response.setId(nsId.listStyleId()); + return response; + } + + private List extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) { + List columns = new ArrayList<>(); + + for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) { + columns.add( + Column.of( + field.getName(), + CONVERTER.toGravitino(field), + null, + field.isNullable(), + false, + DEFAULT_VALUE_NOT_SET)); + } + return columns; + } + + private JsonArrowSchema toJsonArrowSchema(Column[] columns) { + List fields = + Arrays.stream(columns) + .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(), col.nullable())) + .collect(Collectors.toList()); + + return JsonArrowSchemaConverter.convertToJsonArrowSchema( + new org.apache.arrow.vector.types.pojo.Schema(fields)); + } +} diff --git a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java similarity index 68% rename from lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java rename to lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java index b0ddb980ab0..5d8508ee459 100644 --- a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java @@ -16,40 +16,22 @@ * specific language 
governing permissions and limitations * under the License. */ -package org.apache.gravitino.lance.common.ops.gravitino; +package org.apache.gravitino.lance.common.utils; +import com.google.common.base.Preconditions; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.channels.Channels; -import java.util.Arrays; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.ipc.ArrowStreamWriter; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -public class TestGravitinoLanceNamespaceWrapper { - - @Test - public void testParseArrowIpcStream() throws Exception { - Schema schema = - new Schema( - Arrays.asList( - Field.nullable("id", new ArrowType.Int(32, true)), - Field.nullable("value", new ArrowType.Utf8()))); - - GravitinoLanceNamespaceWrapper wrapper = new GravitinoLanceNamespaceWrapper(); - byte[] ipcStream = generateIpcStream(schema); - Schema parsedSchema = wrapper.parseArrowIpcStream(ipcStream); - - Assertions.assertEquals(schema, parsedSchema); - } - - private byte[] generateIpcStream(Schema arrowSchema) throws IOException { +public class ArrowUtils { + public static byte[] generateIpcStream(Schema arrowSchema) throws IOException { try (BufferAllocator allocator = new RootAllocator()) { // Create an empty VectorSchemaRoot with the schema @@ -73,4 +55,18 @@ private byte[] generateIpcStream(Schema arrowSchema) throws IOException { throw new IOException("Failed to create empty Arrow IPC stream: " + e.getMessage(), e); } } + + public static Schema parseArrowIpcStream(byte[] stream) { + Schema schema; + try (BufferAllocator allocator = new 
RootAllocator(); + ByteArrayInputStream bais = new ByteArrayInputStream(stream); + ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) { + schema = reader.getVectorSchemaRoot().getSchema(); + } catch (Exception e) { + throw new IllegalArgumentException("Failed to parse Arrow IPC stream", e); + } + + Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC stream"); + return schema; + } } diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java similarity index 77% rename from lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java rename to lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java index f39ea2e684f..c34a7be58a2 100644 --- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java @@ -17,13 +17,18 @@ * under the License. 
*/ -package org.apache.gravitino.lance.service; +package org.apache.gravitino.lance.common.utils; -public class ServiceConstants { +public class LanceConstants { public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-"; public static final String LANCE_TABLE_LOCATION_HEADER = LANCE_HTTP_HEADER_PREFIX + "table-location"; public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER = LANCE_HTTP_HEADER_PREFIX + "table-properties"; + // Key for table location in table properties map + public static final String LANCE_LOCATION = "location"; + + // Prefix for storage options in LanceConfig + public static final String LANCE_STORAGE_OPTIONS_PREFIX = "lance.storage."; } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java new file mode 100644 index 00000000000..e674a7266ab --- /dev/null +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.gravitino.lance.common.utils; + +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_STORAGE_OPTIONS_PREFIX; + +import java.util.Map; +import java.util.stream.Collectors; + +public class LancePropertiesUtils { + + private LancePropertiesUtils() { + // Private constructor to prevent instantiation + } + + public static Map getLanceStorageOptions(Map tableProperties) { + if (tableProperties == null) { + return Map.of(); + } + + return tableProperties.entrySet().stream() + .filter(e -> e.getKey().startsWith(LANCE_STORAGE_OPTIONS_PREFIX)) + .collect( + Collectors.toMap( + e -> e.getKey().substring(LANCE_STORAGE_OPTIONS_PREFIX.length()), + Map.Entry::getValue)); + } +} diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java new file mode 100644 index 00000000000..8e5fb2494be --- /dev/null +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.gravitino.lance.common.utils; + +import com.google.common.collect.ImmutableMap; +import com.lancedb.lance.namespace.util.JsonUtil; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; + +public class SerializationUtils { + + private SerializationUtils() { + // Utility class + } + + // Lance REST uses a unique way to serialize and deserialize table properties, please see: + // https://github.com/lancedb/lance-namespace/blob/2033b2fca126e87e56ba0d5ec19c5ec010c7a98f/ + // java/lance-namespace-core/src/main/java/com/lancedb/lance/namespace/rest/RestNamespace.java#L207-L208 + public static Map deserializeProperties(String serializedProperties) { + return StringUtils.isBlank(serializedProperties) + ? ImmutableMap.of() + : JsonUtil.parse( + serializedProperties, + jsonNode -> { + Map map = new HashMap<>(); + jsonNode + .fields() + .forEachRemaining( + entry -> { + map.put(entry.getKey(), entry.getValue().asText()); + }); + return map; + }); + } +} diff --git a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java new file mode 100644 index 00000000000..43f0bf6ec6f --- /dev/null +++ b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.common.utils; + +import java.util.Arrays; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestArrowUtils { + + @Test + public void testParseArrowIpcStream() throws Exception { + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("id", new ArrowType.Int(32, true)), + Field.nullable("value", new ArrowType.Utf8()))); + byte[] ipcStream = ArrowUtils.generateIpcStream(schema); + Schema parsedSchema = ArrowUtils.parseArrowIpcStream(ipcStream); + + Assertions.assertEquals(schema, parsedSchema); + } +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java index 5590eef9bdc..290730f39a7 100644 --- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java @@ -19,21 +19,26 @@ package org.apache.gravitino.lance.service.rest; import static org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT; -import static org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_LOCATION_HEADER; -import static 
org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_LOCATION_HEADER; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER; import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Maps; +import com.lancedb.lance.namespace.model.CreateEmptyTableRequest; +import com.lancedb.lance.namespace.model.CreateEmptyTableResponse; +import com.lancedb.lance.namespace.model.CreateTableRequest; import com.lancedb.lance.namespace.model.CreateTableResponse; import com.lancedb.lance.namespace.model.DeregisterTableRequest; import com.lancedb.lance.namespace.model.DeregisterTableResponse; +import com.lancedb.lance.namespace.model.DescribeTableRequest; import com.lancedb.lance.namespace.model.DescribeTableResponse; import com.lancedb.lance.namespace.model.RegisterTableRequest; +import com.lancedb.lance.namespace.model.RegisterTableRequest.ModeEnum; import com.lancedb.lance.namespace.model.RegisterTableResponse; -import com.lancedb.lance.namespace.util.JsonUtil; import java.util.Map; +import java.util.Optional; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -47,8 +52,8 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; -import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.lance.common.ops.NamespaceWrapper; +import org.apache.gravitino.lance.common.utils.SerializationUtils; import org.apache.gravitino.lance.service.LanceExceptionMapper; import org.apache.gravitino.metrics.MetricNames; @@ -70,10 +75,14 @@ public LanceTableOperations(NamespaceWrapper lanceNamespace) { 
@ResponseMetered(name = "describe-table", absolute = true) public Response describeTable( @PathParam("id") String tableId, - @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") String delimiter) { + @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") String delimiter, + DescribeTableRequest request) { try { + validateDescribeTableRequest(request); DescribeTableResponse response = - lanceNamespace.asTableOps().describeTable(tableId, delimiter); + lanceNamespace + .asTableOps() + .describeTable(tableId, delimiter, Optional.ofNullable(request.getVersion())); return Response.ok(response).build(); } catch (Exception e) { return LanceExceptionMapper.toRESTResponse(tableId, e); @@ -97,13 +106,12 @@ public Response createTable( MultivaluedMap headersMap = headers.getRequestHeaders(); String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER); String tableProperties = headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER); - - Map props = - JsonUtil.mapper().readValue(tableProperties, new TypeReference<>() {}); + CreateTableRequest.ModeEnum modeEnum = CreateTableRequest.ModeEnum.fromValue(mode); + Map props = SerializationUtils.deserializeProperties(tableProperties); CreateTableResponse response = lanceNamespace .asTableOps() - .createTable(tableId, mode, delimiter, tableLocation, props, arrowStreamBody); + .createTable(tableId, modeEnum, delimiter, tableLocation, props, arrowStreamBody); return Response.ok(response).build(); } catch (Exception e) { return LanceExceptionMapper.toRESTResponse(tableId, e); @@ -112,27 +120,25 @@ public Response createTable( @POST @Path("/create-empty") + @Produces("application/json") @Timed(name = "create-empty-table." 
+ MetricNames.HTTP_PROCESS_DURATION, absolute = true) @ResponseMetered(name = "create-empty-table", absolute = true) public Response createEmptyTable( @PathParam("id") String tableId, - @QueryParam("mode") @DefaultValue("create") String mode, // create, exist_ok, overwrite @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) String delimiter, + CreateEmptyTableRequest request, @Context HttpHeaders headers) { try { - // Extract table properties from header - MultivaluedMap headersMap = headers.getRequestHeaders(); - String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER); - String tableProperties = headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER); + validateCreateEmptyTableRequest(request); + String tableLocation = request.getLocation(); Map props = - StringUtils.isBlank(tableProperties) - ? Map.of() - : JsonUtil.mapper().readValue(tableProperties, new TypeReference<>() {}); - CreateTableResponse response = - lanceNamespace - .asTableOps() - .createTable(tableId, mode, delimiter, tableLocation, props, null); + request.getProperties() == null + ? Maps.newHashMap() + : Maps.newHashMap(request.getProperties()); + + CreateEmptyTableResponse response = + lanceNamespace.asTableOps().createEmptyTable(tableId, delimiter, tableLocation, props); return Response.ok(response).build(); } catch (Exception e) { return LanceExceptionMapper.toRESTResponse(tableId, e); @@ -145,18 +151,18 @@ public Response createEmptyTable( @ResponseMetered(name = "register-table", absolute = true) public Response registerTable( @PathParam("id") String tableId, - @QueryParam("mode") @DefaultValue("create") String mode, // overwrite or @QueryParam("delimiter") @DefaultValue("$") String delimiter, @Context HttpHeaders headers, RegisterTableRequest registerTableRequest) { try { + validateRegisterTableRequest(registerTableRequest); + Map props = registerTableRequest.getProperties() == null ? 
Maps.newHashMap() : Maps.newHashMap(registerTableRequest.getProperties()); - props.put("register", "true"); - props.put("location", registerTableRequest.getLocation()); - props.put("format", "lance"); + props.put(LANCE_LOCATION, registerTableRequest.getLocation()); + ModeEnum mode = registerTableRequest.getMode(); RegisterTableResponse response = lanceNamespace.asTableOps().registerTable(tableId, mode, delimiter, props); @@ -176,6 +182,7 @@ public Response deregisterTable( @Context HttpHeaders headers, DeregisterTableRequest deregisterTableRequest) { try { + validateDeregisterTableRequest(deregisterTableRequest); DeregisterTableResponse response = lanceNamespace.asTableOps().deregisterTable(tableId, delimiter); return Response.ok(response).build(); @@ -183,4 +190,26 @@ public Response deregisterTable( return LanceExceptionMapper.toRESTResponse(tableId, e); } } + + private void validateCreateEmptyTableRequest( + @SuppressWarnings("unused") CreateEmptyTableRequest request) { + // No specific fields to validate for now + } + + private void validateRegisterTableRequest( + @SuppressWarnings("unused") RegisterTableRequest request) { + // No specific fields to validate for now + } + + private void validateDeregisterTableRequest( + @SuppressWarnings("unused") DeregisterTableRequest request) { + // We will ignore the id in the request body since it's already provided in the path param. 
+ // No specific fields to validate for now + } + + private void validateDescribeTableRequest( + @SuppressWarnings("unused") DescribeTableRequest request) { + // We will ignore the id in the request body since it's already provided in the path param + // No specific fields to validate for now + } } diff --git a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java index f1fe0087820..c7454442b62 100644 --- a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java +++ b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java @@ -24,38 +24,68 @@ import com.lancedb.lance.namespace.LanceNamespace; import com.lancedb.lance.namespace.LanceNamespaceException; import com.lancedb.lance.namespace.LanceNamespaces; +import com.lancedb.lance.namespace.client.apache.ApiException; +import com.lancedb.lance.namespace.model.CreateEmptyTableRequest; +import com.lancedb.lance.namespace.model.CreateEmptyTableResponse; import com.lancedb.lance.namespace.model.CreateNamespaceRequest; import com.lancedb.lance.namespace.model.CreateNamespaceResponse; +import com.lancedb.lance.namespace.model.CreateTableRequest; +import com.lancedb.lance.namespace.model.CreateTableResponse; +import com.lancedb.lance.namespace.model.DeregisterTableRequest; +import com.lancedb.lance.namespace.model.DeregisterTableResponse; import com.lancedb.lance.namespace.model.DescribeNamespaceRequest; import com.lancedb.lance.namespace.model.DescribeNamespaceResponse; +import com.lancedb.lance.namespace.model.DescribeTableRequest; +import com.lancedb.lance.namespace.model.DescribeTableResponse; import com.lancedb.lance.namespace.model.DropNamespaceRequest; import com.lancedb.lance.namespace.model.DropNamespaceResponse; +import 
com.lancedb.lance.namespace.model.ErrorResponse; +import com.lancedb.lance.namespace.model.JsonArrowField; import com.lancedb.lance.namespace.model.ListNamespacesRequest; import com.lancedb.lance.namespace.model.ListNamespacesResponse; +import com.lancedb.lance.namespace.model.ListTablesRequest; import com.lancedb.lance.namespace.model.NamespaceExistsRequest; +import com.lancedb.lance.namespace.model.RegisterTableRequest; +import com.lancedb.lance.namespace.model.RegisterTableRequest.ModeEnum; +import com.lancedb.lance.namespace.model.RegisterTableResponse; import com.lancedb.lance.namespace.rest.RestNamespaceConfig; +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Schema; import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.exceptions.NoSuchTableException; import org.apache.gravitino.integration.test.util.BaseIT; import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.lance.common.utils.ArrowUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.google.common.base.Joiner; public class LanceRESTServiceIT extends BaseIT { + private static final String CATALOG_NAME = GravitinoITUtils.genRandomName("lance_rest_catalog"); + private static final String 
SCHEMA_NAME = GravitinoITUtils.genRandomName("lance_rest_schema"); private GravitinoMetalake metalake; + private Catalog catalog; private Map properties = new HashMap<>() { { @@ -372,6 +402,272 @@ public void testNamespaceExists() { Assertions.assertEquals(404, exception.getCode()); } + @Test + void testCreateEmptyTable() throws ApiException { + catalog = createCatalog(CATALOG_NAME); + createSchema(); + + CreateEmptyTableRequest request = new CreateEmptyTableRequest(); + String location = tempDir + "/" + "empty_table/"; + request.setLocation(location); + request.setProperties( + ImmutableMap.of( + "key1", "v1", + "lance.storage.a", "value_a", + "lance.storage.b", "value_b")); + request.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "empty_table")); + + CreateEmptyTableResponse response = ns.createEmptyTable(request); + Assertions.assertNotNull(response); + Assertions.assertEquals(location, response.getLocation()); + Assertions.assertEquals("v1", response.getProperties().get("key1")); + Assertions.assertEquals("value_a", response.getStorageOptions().get("a")); + Assertions.assertEquals("value_b", response.getStorageOptions().get("b")); + + DescribeTableRequest describeTableRequest = new DescribeTableRequest(); + describeTableRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "empty_table")); + + DescribeTableResponse loadTable = ns.describeTable(describeTableRequest); + Assertions.assertNotNull(loadTable); + Assertions.assertEquals(location, loadTable.getLocation()); + + // Try to create the same table again should fail + LanceNamespaceException exception = + Assertions.assertThrows( + LanceNamespaceException.class, + () -> { + ns.createEmptyTable(request); + }); + Assertions.assertTrue(exception.getMessage().contains("Table already exists")); + Assertions.assertEquals(409, exception.getCode()); + } + + @Test + void testCreateTable() throws IOException, ApiException { + catalog = createCatalog(CATALOG_NAME); + createSchema(); + + String location = tempDir + "/" + "table/"; 
+ List ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table"); + org.apache.arrow.vector.types.pojo.Schema schema = + new org.apache.arrow.vector.types.pojo.Schema( + Arrays.asList( + Field.nullable("id", new ArrowType.Int(32, true)), + Field.nullable("value", new ArrowType.Utf8()))); + byte[] body = ArrowUtils.generateIpcStream(schema); + + CreateTableRequest request = new CreateTableRequest(); + request.setId(ids); + request.setLocation(location); + request.setProperties( + ImmutableMap.of( + "key1", "v1", + "lance.storage.a", "value_a", + "lance.storage.b", "value_b")); + + CreateTableResponse response = ns.createTable(request, body); + Assertions.assertNotNull(response); + Assertions.assertEquals(location, response.getLocation()); + Assertions.assertEquals("v1", response.getProperties().get("key1")); + Assertions.assertEquals("value_a", response.getStorageOptions().get("a")); + Assertions.assertEquals("value_b", response.getStorageOptions().get("b")); + + DescribeTableRequest describeTableRequest = new DescribeTableRequest(); + describeTableRequest.setId(ids); + DescribeTableResponse loadTable = ns.describeTable(describeTableRequest); + Assertions.assertNotNull(loadTable); + Assertions.assertEquals(location, loadTable.getLocation()); + + List jsonArrowFields = loadTable.getSchema().getFields(); + for (int i = 0; i < jsonArrowFields.size(); i++) { + JsonArrowField jsonArrowField = jsonArrowFields.get(i); + Field originalField = schema.getFields().get(i); + Assertions.assertEquals(originalField.getName(), jsonArrowField.getName()); + + if (i == 0) { + Assertions.assertEquals("int32", jsonArrowField.getType().getType()); + } else if (i == 1) { + Assertions.assertEquals("utf8", jsonArrowField.getType().getType()); + } + } + // Check the location exists + Assertions.assertTrue(new File(location).exists()); + Assertions.assertEquals("v1", loadTable.getProperties().get("key1")); + Assertions.assertEquals("value_a", loadTable.getStorageOptions().get("a")); + 
Assertions.assertEquals("value_b", loadTable.getStorageOptions().get("b")); + + // Check overwrite mode + String newLocation = tempDir + "/" + "table_new/"; + request.setLocation(newLocation); + request.setMode(CreateTableRequest.ModeEnum.OVERWRITE); + request.setProperties( + ImmutableMap.of( + "key1", "v2", + "lance.storage.a", "value_va", + "lance.storage.b", "value_vb")); + + response = Assertions.assertDoesNotThrow(() -> ns.createTable(request, body)); + + Assertions.assertNotNull(response); + Assertions.assertEquals(newLocation, response.getLocation()); + Assertions.assertTrue(response.getProperties().get("key1").equals("v2")); + Assertions.assertEquals("value_va", response.getStorageOptions().get("a")); + Assertions.assertEquals("value_vb", response.getStorageOptions().get("b")); + Assertions.assertTrue(new File(newLocation).exists()); + Assertions.assertFalse(new File(location).exists()); + + // Check exist_ok mode + request.setMode(CreateTableRequest.ModeEnum.EXIST_OK); + response = Assertions.assertDoesNotThrow(() -> ns.createTable(request, body)); + + Assertions.assertNotNull(response); + Assertions.assertEquals("v2", response.getProperties().get("key1")); + Assertions.assertEquals("value_va", response.getStorageOptions().get("a")); + Assertions.assertEquals("value_vb", response.getStorageOptions().get("b")); + Assertions.assertEquals(newLocation, response.getLocation()); + Assertions.assertTrue(new File(newLocation).exists()); + + // Create table again without overwrite or exist_ok should fail + request.setMode(CreateTableRequest.ModeEnum.CREATE); + LanceNamespaceException exception = + Assertions.assertThrows(LanceNamespaceException.class, () -> ns.createTable(request, body)); + Assertions.assertTrue(exception.getMessage().contains("already exists")); + Assertions.assertEquals(409, exception.getCode()); + + // Create a table without location should fail + CreateTableRequest noLocationRequest = new CreateTableRequest(); + 
noLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "no_location_table")); + LanceNamespaceException noLocationException = + Assertions.assertThrows( + LanceNamespaceException.class, () -> ns.createTable(noLocationRequest, body)); + Assertions.assertTrue( + noLocationException.getMessage().contains("No location specified for table")); + + // Create table with invalid schema should fail + byte[] invalidBody = "".getBytes(Charset.defaultCharset()); + CreateTableRequest invalidRequest = new CreateTableRequest(); + invalidRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "invalid_table")); + invalidRequest.setLocation(tempDir + "/" + "invalid_table/"); + LanceNamespaceException apiException = + Assertions.assertThrows( + LanceNamespaceException.class, () -> ns.createTable(invalidRequest, invalidBody)); + Assertions.assertTrue(apiException.getMessage().contains("Failed to parse Arrow IPC stream")); + Assertions.assertEquals(400, apiException.getCode()); + + // Create table with wrong ids should fail + CreateTableRequest wrongIdRequest = new CreateTableRequest(); + wrongIdRequest.setId(List.of(CATALOG_NAME, "wrong_schema")); // This is a schema NOT a table. 
+ wrongIdRequest.setLocation(tempDir + "/" + "wrong_id_table/"); + LanceNamespaceException wrongIdException = + Assertions.assertThrows( + LanceNamespaceException.class, () -> ns.createTable(wrongIdRequest, body)); + Assertions.assertTrue(wrongIdException.getMessage().contains("Expected at 3-level namespace")); + Assertions.assertEquals(400, wrongIdException.getCode()); + + // Now test list tables + ListTablesRequest listRequest = new ListTablesRequest(); + listRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME)); + var listResponse = ns.listTables(listRequest); + Set stringSet = listResponse.getTables(); + Assertions.assertEquals(1, stringSet.size()); + Assertions.assertTrue(stringSet.contains(Joiner.on(".").join(ids))); + } + + @Test + void testRegisterTable() { + catalog = createCatalog(CATALOG_NAME); + createSchema(); + + String location = tempDir + "/" + "register/"; + List ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table_register"); + RegisterTableRequest registerTableRequest = new RegisterTableRequest(); + registerTableRequest.setLocation(location); + registerTableRequest.setMode(ModeEnum.CREATE); + registerTableRequest.setId(ids); + registerTableRequest.setProperties(ImmutableMap.of("key1", "value1")); + + RegisterTableResponse response = ns.registerTable(registerTableRequest); + Assertions.assertNotNull(response); + + DescribeTableRequest describeTableRequest = new DescribeTableRequest(); + describeTableRequest.setId(ids); + DescribeTableResponse loadTable = ns.describeTable(describeTableRequest); + Assertions.assertNotNull(loadTable); + Assertions.assertEquals(location, loadTable.getLocation()); + Assertions.assertTrue(loadTable.getProperties().containsKey("key1")); + + // Test register again with OVERWRITE mode + String newLocation = tempDir + "/" + "register_new/"; + registerTableRequest.setMode(ModeEnum.OVERWRITE); + registerTableRequest.setLocation(newLocation); + response = Assertions.assertDoesNotThrow(() -> ns.registerTable(registerTableRequest)); + 
Assertions.assertNotNull(response); + Assertions.assertEquals(newLocation, response.getLocation()); + + // Test deregister table + DeregisterTableRequest deregisterTableRequest = new DeregisterTableRequest(); + deregisterTableRequest.setId(ids); + DeregisterTableResponse deregisterTableResponse = ns.deregisterTable(deregisterTableRequest); + Assertions.assertNotNull(deregisterTableResponse); + Assertions.assertEquals(newLocation, deregisterTableResponse.getLocation()); + } + + @Test + void testDeregisterNonExistingTable() { + catalog = createCatalog(CATALOG_NAME); + createSchema(); + + List ids = List.of(CATALOG_NAME, SCHEMA_NAME, "non_existing_table"); + DeregisterTableRequest deregisterTableRequest = new DeregisterTableRequest(); + deregisterTableRequest.setId(ids); + + LanceNamespaceException exception = + Assertions.assertThrows( + LanceNamespaceException.class, () -> ns.deregisterTable(deregisterTableRequest)); + Assertions.assertEquals(404, exception.getCode()); + Assertions.assertTrue(exception.getMessage().contains("does not exist")); + Optional responseOptional = exception.getErrorResponse(); + Assertions.assertTrue(responseOptional.isPresent()); + Assertions.assertEquals( + NoSuchTableException.class.getSimpleName(), responseOptional.get().getType()); + + // Try to create a table and then deregister table + CreateEmptyTableRequest createEmptyTableRequest = new CreateEmptyTableRequest(); + String location = tempDir + "/" + "to_be_deregistered_table/"; + ids = List.of(CATALOG_NAME, SCHEMA_NAME, "to_be_deregistered_table"); + createEmptyTableRequest.setLocation(location); + createEmptyTableRequest.setProperties(ImmutableMap.of()); + createEmptyTableRequest.setId(ids); + CreateEmptyTableResponse response = + Assertions.assertDoesNotThrow(() -> ns.createEmptyTable(createEmptyTableRequest)); + Assertions.assertNotNull(response); + Assertions.assertEquals(location, response.getLocation()); + + // Now try to deregister + deregisterTableRequest.setId(ids); + 
DeregisterTableResponse deregisterTableResponse = + Assertions.assertDoesNotThrow(() -> ns.deregisterTable(deregisterTableRequest)); + Assertions.assertNotNull(deregisterTableResponse); + Assertions.assertEquals(location, deregisterTableResponse.getLocation()); + Assertions.assertTrue(Objects.equals(ids, deregisterTableResponse.getId())); + Assertions.assertTrue( + new File(location).exists(), "Data should still exist after deregistering the table."); + + // Now try to describe the table, should fail + DescribeTableRequest describeTableRequest = new DescribeTableRequest(); + describeTableRequest.setId(ids); + LanceNamespaceException lanceNamespaceException = + Assertions.assertThrows( + LanceNamespaceException.class, () -> ns.describeTable(describeTableRequest)); + Assertions.assertEquals(404, lanceNamespaceException.getCode()); + + describeTableRequest.setVersion(1L); + lanceNamespaceException = + Assertions.assertThrows( + LanceNamespaceException.class, () -> ns.describeTable(describeTableRequest)); + Assertions.assertEquals(406, lanceNamespaceException.getCode()); + } + private GravitinoMetalake createMetalake(String metalakeName) { return client.createMetalake(metalakeName, "metalake for lance rest service tests", null); } @@ -385,6 +681,13 @@ private Catalog createCatalog(String catalogName) { properties); } + private void createSchema() { + Map schemaProperties = Maps.newHashMap(); + String comment = "comment"; + catalog.asSchemas().createSchema(SCHEMA_NAME, comment, schemaProperties); + catalog.asSchemas().loadSchema(SCHEMA_NAME); + } + private String getLanceRestServiceUrl() { return String.format("http://%s:%d/lance", "localhost", getLanceRESTServerPort()); } diff --git a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java index 0ba8bf79b70..efe1f904360 100644 --- 
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java +++ b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java @@ -27,13 +27,23 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; +import com.lancedb.lance.namespace.LanceNamespaceException; +import com.lancedb.lance.namespace.model.CreateEmptyTableRequest; +import com.lancedb.lance.namespace.model.CreateEmptyTableResponse; import com.lancedb.lance.namespace.model.CreateNamespaceRequest; import com.lancedb.lance.namespace.model.CreateNamespaceResponse; +import com.lancedb.lance.namespace.model.CreateTableResponse; +import com.lancedb.lance.namespace.model.DeregisterTableRequest; +import com.lancedb.lance.namespace.model.DeregisterTableResponse; import com.lancedb.lance.namespace.model.DescribeNamespaceResponse; +import com.lancedb.lance.namespace.model.DescribeTableRequest; +import com.lancedb.lance.namespace.model.DescribeTableResponse; import com.lancedb.lance.namespace.model.DropNamespaceRequest; import com.lancedb.lance.namespace.model.DropNamespaceResponse; import com.lancedb.lance.namespace.model.ErrorResponse; import com.lancedb.lance.namespace.model.ListNamespacesResponse; +import com.lancedb.lance.namespace.model.RegisterTableRequest; +import com.lancedb.lance.namespace.model.RegisterTableResponse; import java.io.IOException; import java.util.regex.Pattern; import javax.servlet.http.HttpServletRequest; @@ -42,6 +52,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.gravitino.exceptions.NoSuchCatalogException; +import org.apache.gravitino.lance.common.ops.LanceTableOperations; import org.apache.gravitino.lance.common.ops.NamespaceWrapper; import org.apache.gravitino.rest.RESTUtils; import org.glassfish.jersey.internal.inject.AbstractBinder; @@ -66,6 +77,7 @@ public HttpServletRequest get() { private static 
NamespaceWrapper namespaceWrapper = mock(NamespaceWrapper.class);
   private static org.apache.gravitino.lance.common.ops.LanceNamespaceOperations namespaceOps =
       mock(org.apache.gravitino.lance.common.ops.LanceNamespaceOperations.class);
+  private static LanceTableOperations tableOps = mock(LanceTableOperations.class);
 
   @Override
   protected Application configure() {
@@ -78,6 +90,7 @@ protected Application configure() {
 
     ResourceConfig resourceConfig = new ResourceConfig();
     resourceConfig.register(LanceNamespaceOperations.class);
+    resourceConfig.register(org.apache.gravitino.lance.service.rest.LanceTableOperations.class);
     resourceConfig.register(
         new AbstractBinder() {
           @Override
@@ -93,6 +106,7 @@ protected void configure() {
   @BeforeAll
   public static void setup() {
     when(namespaceWrapper.asNamespaceOps()).thenReturn(namespaceOps);
+    when(namespaceWrapper.asTableOps()).thenReturn(tableOps);
   }
 
   @Test
@@ -323,4 +337,312 @@ public void testDropNamespace() {
     Assertions.assertEquals("Test exception", errorResp.getError());
     Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
   }
+
+  /**
+   * Verifies POST /v1/table/{id}/create: 200 on success, 400 when the table ops throw
+   * IllegalArgumentException, and 500 with an ErrorResponse when they throw RuntimeException.
+   */
+  @Test
+  void testCreateTable() {
+    String tableIds = "catalog.scheme.create_table";
+    String delimiter = ".";
+
+    // Test normal
+    CreateTableResponse createTableResponse = new CreateTableResponse();
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenReturn(createTableResponse);
+
+    byte[] bytes = new byte[] {0x01, 0x02, 0x03};
+    Response resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    // Test illegal argument
+    Mockito.reset(tableOps);
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+
+    resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
+  }
+
+  /**
+   * Verifies POST /v1/table/{id}/create-empty: 200 echoing location and properties, 400 on
+   * IllegalArgumentException, and 500 with an ErrorResponse on RuntimeException.
+   */
+  @Test
+  void testCreateEmptyTable() {
+    String tableIds = "catalog.scheme.create_empty_table";
+    String delimiter = ".";
+
+    // Test normal
+    CreateEmptyTableResponse createTableResponse = new CreateEmptyTableResponse();
+    createTableResponse.setLocation("/path/to/table");
+    createTableResponse.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.createEmptyTable(any(), any(), any(), any())).thenReturn(createTableResponse);
+
+    CreateEmptyTableRequest tableRequest = new CreateEmptyTableRequest();
+    tableRequest.setLocation("/path/to/table");
+
+    Response resp =
+        target(String.format("/v1/table/%s/create-empty", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    CreateEmptyTableResponse response = resp.readEntity(CreateEmptyTableResponse.class);
+    Assertions.assertEquals(createTableResponse.getLocation(), response.getLocation());
+    Assertions.assertEquals(createTableResponse.getProperties(), response.getProperties());
+
+    Mockito.reset(tableOps);
+    // Test illegal argument
+    when(tableOps.createEmptyTable(any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+
+    resp =
+        target(String.format("/v1/table/%s/create-empty", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.createEmptyTable(any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/create-empty", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
+  }
+
+  /**
+   * Verifies POST /v1/table/{id}/register: 200 echoing location and properties, 400 on
+   * IllegalArgumentException, and 500 with an ErrorResponse on RuntimeException.
+   */
+  @Test
+  void testRegisterTable() {
+    String tableIds = "catalog.scheme.register_table";
+    String delimiter = ".";
+
+    // Test normal
+    RegisterTableResponse registerTableResponse = new RegisterTableResponse();
+    registerTableResponse.setLocation("/path/to/registered_table");
+    registerTableResponse.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.registerTable(any(), any(), any(), any())).thenReturn(registerTableResponse);
+
+    RegisterTableRequest tableRequest = new RegisterTableRequest();
+    tableRequest.setLocation("/path/to/registered_table");
+    tableRequest.setMode(RegisterTableRequest.ModeEnum.CREATE);
+
+    Response resp =
+        target(String.format("/v1/table/%s/register", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    RegisterTableResponse response = resp.readEntity(RegisterTableResponse.class);
+    Assertions.assertEquals(registerTableResponse.getLocation(), response.getLocation());
+    Assertions.assertEquals(registerTableResponse.getProperties(), response.getProperties());
+
+    // Test illegal argument
+    Mockito.reset(tableOps);
+    when(tableOps.registerTable(any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+    resp =
+        target(String.format("/v1/table/%s/register", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.registerTable(any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/register", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
+  }
+
+  /**
+   * Verifies POST /v1/table/{id}/deregister: 200 echoing location and properties, 400 on
+   * IllegalArgumentException, 404 on LanceNamespaceException.notFound, 500 on RuntimeException.
+   */
+  @Test
+  void testDeregisterTable() {
+    String tableIds = "catalog.scheme.deregister_table";
+    String delimiter = ".";
+
+    DeregisterTableRequest tableRequest = new DeregisterTableRequest();
+
+    DeregisterTableResponse deregisterTableResponse = new DeregisterTableResponse();
+    deregisterTableResponse.setLocation("/path/to/deregistered_table");
+    deregisterTableResponse.setProperties(ImmutableMap.of("key", "value"));
+    // Test normal
+    when(tableOps.deregisterTable(any(), any())).thenReturn(deregisterTableResponse);
+
+    Response resp =
+        target(String.format("/v1/table/%s/deregister", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    DeregisterTableResponse response = resp.readEntity(DeregisterTableResponse.class);
+    Assertions.assertEquals(deregisterTableResponse.getLocation(), response.getLocation());
+    Assertions.assertEquals(deregisterTableResponse.getProperties(), response.getProperties());
+
+    // Test illegal argument
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+    resp =
+        target(String.format("/v1/table/%s/deregister", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    // Test not found exception
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(
+            LanceNamespaceException.notFound(
+                "Table not found", "NoSuchTableException", tableIds, ""));
+    resp =
+        target(String.format("/v1/table/%s/deregister", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), resp.getStatus());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/deregister", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
+  }
+
+  /**
+   * Verifies POST /v1/table/{id}/describe: 200 echoing location and properties, 404 on
+   * LanceNamespaceException.notFound, and 500 with an ErrorResponse on RuntimeException.
+   */
+  @Test
+  void testDescribeTable() {
+    String tableIds = "catalog.scheme.describe_table";
+    String delimiter = ".";
+
+    // Test normal
+    DescribeTableResponse describeTableResponse = new DescribeTableResponse();
+    describeTableResponse.setLocation("/path/to/describe_table");
+    describeTableResponse.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.describeTable(any(), any(), any())).thenReturn(describeTableResponse);
+
+    DescribeTableRequest tableRequest = new DescribeTableRequest();
+    Response resp =
+        target(String.format("/v1/table/%s/describe", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    DescribeTableResponse response = resp.readEntity(DescribeTableResponse.class);
+    Assertions.assertEquals(describeTableResponse.getLocation(), response.getLocation());
+    Assertions.assertEquals(describeTableResponse.getProperties(), response.getProperties());
+
+    // Test not found exception
+    Mockito.reset(tableOps);
+    when(tableOps.describeTable(any(), any(), any()))
+        .thenThrow(
+            LanceNamespaceException.notFound(
+                "Table not found", "NoSuchTableException", tableIds, ""));
+    resp =
+        target(String.format("/v1/table/%s/describe", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), resp.getStatus());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.describeTable(any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/describe", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
+  }
 }
diff --git a/lance/lance-rest-server/src/test/resources/log4j2.properties b/lance/lance-rest-server/src/test/resources/log4j2.properties
new file mode 100644
index 00000000000..b5db8a1ffc2
--- /dev/null
+++ b/lance/lance-rest-server/src/test/resources/log4j2.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Log files location +property.logPath = ${sys:gravitino.log.path:-build/lance-rest-integration-test.log} + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = ${logPath} +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger + +# File appender configuration for testcontainers +appender.testcontainersFile.type = File +appender.testcontainersFile.name = testcontainersLogger +appender.testcontainersFile.fileName = build/testcontainers.log +appender.testcontainersFile.layout.type = PatternLayout +appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n + +# Logger for testcontainers +logger.testcontainers.name = org.testcontainers +logger.testcontainers.level = debug +logger.testcontainers.additivity = false +logger.testcontainers.appenderRef.file.ref = testcontainersLogger + +logger.tc.name = tc +logger.tc.level = debug +logger.tc.additivity = false +logger.tc.appenderRef.file.ref = testcontainersLogger + +logger.docker.name = com.github.dockerjava +logger.docker.level = warn +logger.docker.additivity = false +logger.docker.appenderRef.file.ref = testcontainersLogger + +logger.http.name = com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire +logger.http.level = off