Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
Expand Down Expand Up @@ -304,6 +305,8 @@ public Table createTable(
getLakehouseCatalogOperations(newProperties);
return lanceCatalogOperations.createTable(
ident, columns, comment, newProperties, partitions, distribution, sortOrders, indexes);
} catch (EntityAlreadyExistsException e) {
throw new TableAlreadyExistsException(e, "Table %s already exists", ident);
} catch (IOException e) {
throw new RuntimeException("Failed to create table " + ident, e);
}
Expand Down Expand Up @@ -366,30 +369,30 @@ public Table alterTable(NameIdentifier ident, TableChange... changes)
}

/**
 * Removes a table by delegating to the lakehouse operations selected from the table's
 * stored properties.
 *
 * <p>This hunk shows two variants of several lines side by side (dropTable / purgeTable);
 * presumably the first of each pair is the line being replaced.
 *
 * @param ident the identifier of the table
 * @return the result reported by the delegate, or {@code false} when a
 *     {@code NoSuchTableException} is caught
 * @throws RuntimeException wrapping the {@code IOException} caught while handling the table
 */
@Override
public boolean dropTable(NameIdentifier ident) {
public boolean purgeTable(NameIdentifier ident) {
try {
// The stored entity supplies the properties used to pick the concrete operations.
TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class);
LakehouseCatalogOperations lakehouseCatalogOperations =
getLakehouseCatalogOperations(tableEntity.getProperties());
return lakehouseCatalogOperations.dropTable(ident);
return lakehouseCatalogOperations.purgeTable(ident);
} catch (NoSuchTableException e) {
// NOTE(review): this branch only runs if a missing table surfaces as
// NoSuchTableException; confirm which exception store.get raises for a missing entity.
LOG.warn("Table {} does not exist, skip dropping it.", ident);
LOG.warn("Table {} does not exist, skip purging it.", ident);
return false;
} catch (IOException e) {
throw new RuntimeException("Failed to drop table: " + ident, e);
throw new RuntimeException("Failed to purge table: " + ident, e);
}
}

/**
 * Deletes only the table's metadata entry from the entity store.
 *
 * <p>This hunk shows two variants of several lines side by side (purgeTable / dropTable);
 * presumably the first of each pair is the line being replaced.
 *
 * @param ident the identifier of the table
 * @return {@code false} when {@code tableExists(ident)} is false, otherwise the result of
 *     {@code store.delete}
 * @throws RuntimeException wrapping the {@code IOException} caught while deleting the entry
 */
// NOTE(review): the signature declares UnsupportedOperationException, but no statement in
// this body throws it directly; confirm whether the clause is still needed.
@Override
public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationException {
public boolean dropTable(NameIdentifier ident) throws UnsupportedOperationException {
try {
// Only delete the metadata entry here. The physical data will not be deleted.
if (!tableExists(ident)) {
return false;
}
return store.delete(ident, Entity.EntityType.TABLE);
} catch (IOException e) {
throw new RuntimeException("Failed to purge table " + ident, e);
throw new RuntimeException("Failed to drop table " + ident, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.apache.gravitino.catalog.lakehouse.lance;

import static org.apache.gravitino.Entity.EntityType.TABLE;
import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.WriteParams;
Expand Down Expand Up @@ -58,6 +56,7 @@
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
Expand Down Expand Up @@ -120,13 +119,7 @@ public Table createTable(
throws NoSuchSchemaException, TableAlreadyExistsException {
// Ignore partitions, distributions, sortOrders, and indexes for Lance tables.
String location = properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
Map<String, String> storageProps =
properties.entrySet().stream()
.filter(e -> e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
.collect(
Collectors.toMap(
e -> e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
Map.Entry::getValue));
Map<String, String> storageProps = LancePropertiesUtils.getLanceStorageOptions(properties);

try (Dataset ignored =
Dataset.create(
Expand Down Expand Up @@ -280,22 +273,32 @@ private IndexParams getIndexParamsByIndexType(IndexType indexType) {
}

/**
 * Removes a Lance table: deletes its metadata entry from the store, then drops the Lance
 * dataset found at the table's location property.
 *
 * <p>This hunk shows two variants of several lines side by side (dropTable / purgeTable);
 * presumably the first of each pair is the line being replaced.
 *
 * @param ident the identifier of the table
 * @return {@code true} once the metadata entry is deleted and the dataset drop has returned
 * @throws RuntimeException if {@code store.delete} returns {@code false}, or wrapping the
 *     {@code IOException} caught during the operation
 */
@Override
public boolean dropTable(NameIdentifier ident) {
public boolean purgeTable(NameIdentifier ident) {
try {
TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class);
Map<String, String> lancePropertiesMap = tableEntity.getProperties();
// NOTE(review): location is null if the property is absent; confirm it is always set.
String location =
lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);

// NOTE(review): the metadata entry is deleted before the dataset is dropped, so a failure
// in Dataset.drop leaves the data in place with no entry pointing at it; confirm intended.
if (!store.delete(ident, Entity.EntityType.TABLE)) {
throw new RuntimeException("Failed to drop Lance table: " + ident.name());
throw new RuntimeException("Failed to purge Lance table: " + ident.name());
}

// Drop the Lance dataset from cloud storage.
Dataset.drop(location, ImmutableMap.of());
Dataset.drop(location, LancePropertiesUtils.getLanceStorageOptions(lancePropertiesMap));
return true;
} catch (IOException e) {
throw new RuntimeException("Failed to drop Lance table: " + ident.name(), e);
throw new RuntimeException("Failed to purge Lance table: " + ident.name(), e);
}
}

/**
 * Not supported by this class; always throws.
 *
 * @param ident the identifier of the table (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public boolean dropTable(NameIdentifier ident) {
// TODO: This will be handled in GenericLakehouseCatalogOperations. However, we first need
// to figure out whether it is an external table or not; we will introduce a property
// to indicate that. Currently, all Lance tables are external tables, so `drop` will
// just remove the metadata in the metastore and will not delete data in storage.
throw new UnsupportedOperationException(
"LanceCatalogOperations does not support dropTable operation.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,80 @@
*/
package org.apache.gravitino.lance.common.ops;

import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.RegisterTableRequest;
import com.lancedb.lance.namespace.model.RegisterTableResponse;
import java.util.Map;
import java.util.Optional;

/**
 * Table-level operations for Lance: describing, creating (with or without a schema),
 * registering and deregistering tables.
 */
public interface LanceTableOperations {

DescribeTableResponse describeTable(String tableId, String delimiter);
/**
 * Describe the details of a table.
 *
 * @param tableId the table id, in the format "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter used in the namespace
 * @param version the version of the table to describe; if empty, describe the latest version
 * @return the table description
 */
DescribeTableResponse describeTable(String tableId, String delimiter, Optional<Long> version);

/**
 * Create a new table.
 *
 * @param tableId the table id, in the format "{namespace}{delimiter}{table_name}"
 * @param mode the creation mode; it can be CREATE, OVERWRITE, or EXIST_OK
 * @param delimiter the delimiter used in the namespace
 * @param tableLocation the location where the table data will be stored
 * @param tableProperties the properties of the table
 * @param arrowStreamBody the arrow stream bytes containing the schema and data
 * @return the response of the create table operation
 */
CreateTableResponse createTable(
String tableId,
String mode,
CreateTableRequest.ModeEnum mode,
String delimiter,
String tableLocation,
Map<String, String> tableProperties,
byte[] arrowStreamBody);

/**
 * Create a new table without a schema.
 *
 * @param tableId the table id, in the format "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter used in the namespace
 * @param tableLocation the location where the table data will be stored
 * @param tableProperties the properties of the table
 * @return the response of the create empty table operation
 */
CreateEmptyTableResponse createEmptyTable(
String tableId, String delimiter, String tableLocation, Map<String, String> tableProperties);

/**
 * Register an existing table.
 *
 * @param tableId the table id, in the format "{namespace}{delimiter}{table_name}"
 * @param mode the registration mode; it can be REGISTER or OVERWRITE
 * @param delimiter the delimiter used in the namespace
 * @param tableProperties the properties of the table; they should contain the table location
 * @return the response of the register table operation
 */
RegisterTableResponse registerTable(
String tableId, String mode, String delimiter, Map<String, String> tableProperties);
String tableId,
RegisterTableRequest.ModeEnum mode,
String delimiter,
Map<String, String> tableProperties);

/**
 * Deregister a table. It will not delete the underlying Lance data.
 *
 * @param tableId the table id, in the format "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter used in the namespace
 * @return the response of the deregister table operation
 */
DeregisterTableResponse deregisterTable(String tableId, String delimiter);
}
Loading