Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
Expand Down Expand Up @@ -304,6 +305,8 @@ public Table createTable(
getLakehouseCatalogOperations(newProperties);
return lanceCatalogOperations.createTable(
ident, columns, comment, newProperties, partitions, distribution, sortOrders, indexes);
} catch (EntityAlreadyExistsException e) {
throw new TableAlreadyExistsException(e, "Table %s already exists", ident);
} catch (IOException e) {
throw new RuntimeException("Failed to create table " + ident, e);
}
Expand Down Expand Up @@ -366,30 +369,30 @@ public Table alterTable(NameIdentifier ident, TableChange... changes)
}

@Override
public boolean dropTable(NameIdentifier ident) {
public boolean purgeTable(NameIdentifier ident) {
try {
TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class);
LakehouseCatalogOperations lakehouseCatalogOperations =
getLakehouseCatalogOperations(tableEntity.getProperties());
return lakehouseCatalogOperations.dropTable(ident);
return lakehouseCatalogOperations.purgeTable(ident);
} catch (NoSuchTableException e) {
LOG.warn("Table {} does not exist, skip dropping it.", ident);
LOG.warn("Table {} does not exist, skip purging it.", ident);
return false;
} catch (IOException e) {
throw new RuntimeException("Failed to drop table: " + ident, e);
throw new RuntimeException("Failed to purge table: " + ident, e);
}
}

@Override
public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationException {
public boolean dropTable(NameIdentifier ident) throws UnsupportedOperationException {
try {
// Only delete the metadata entry here. The physical data will not be deleted.
if (!tableExists(ident)) {
return false;
}
return store.delete(ident, Entity.EntityType.TABLE);
} catch (IOException e) {
throw new RuntimeException("Failed to purge table " + ident, e);
throw new RuntimeException("Failed to drop table " + ident, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.apache.gravitino.catalog.lakehouse.lance;

import static org.apache.gravitino.Entity.EntityType.TABLE;
import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.WriteParams;
Expand Down Expand Up @@ -58,6 +56,7 @@
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
Expand Down Expand Up @@ -120,13 +119,7 @@ public Table createTable(
throws NoSuchSchemaException, TableAlreadyExistsException {
// Ignore partitions, distributions, sortOrders, and indexes for Lance tables;
String location = properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
Map<String, String> storageProps =
properties.entrySet().stream()
.filter(e -> e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
.collect(
Collectors.toMap(
e -> e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
Map.Entry::getValue));
Map<String, String> storageProps = LancePropertiesUtils.getLanceStorageOptions(properties);

try (Dataset ignored =
Dataset.create(
Expand Down Expand Up @@ -280,22 +273,32 @@ private IndexParams getIndexParamsByIndexType(IndexType indexType) {
}

@Override
public boolean dropTable(NameIdentifier ident) {
public boolean purgeTable(NameIdentifier ident) {
try {
TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, TableEntity.class);
Map<String, String> lancePropertiesMap = tableEntity.getProperties();
String location =
lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);

if (!store.delete(ident, Entity.EntityType.TABLE)) {
throw new RuntimeException("Failed to drop Lance table: " + ident.name());
throw new RuntimeException("Failed to purge Lance table: " + ident.name());
}
Comment on lines 283 to 285

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The false means "failed to store" or "the table not existed"? If the latter, it should return false, not throwing an exception.

The semantics are misleading; you need to confirm with @mchades .

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Generic lakehouse will return false if the table not existed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, the current code organization scatters the logic for table operations between GenericLakehouseCatalogOperation and LanceCatalogOperation, which reduces code readability (you need to pay attention to both code logics when operating Lance tables) and introduces some repetitive operations (such as repeatedly gettingTable from store), I think we should consider this issue in our future refactoring.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, why do you call store.get twice?

The code here is confused; both the caller and callee will access the DB, and the callee handles some exceptions, but some rely on the caller.

The lance should only focus on lance-specific logic, whereas the generic lakehouse operation should only focus on the metadata-related CRUD operations.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, the current code organization scatters the logic for table operations between GenericLakehouseCatalogOperation and LanceCatalogOperation, which reduces code readability (you need to pay attention to both code logics when operating Lance tables) and introduces some repetitive operations (such as repeatedly gettingTable from store), I think we should consider this issue in our future refactoring.

I agree, the implementation is not good, it messes with different logic, and doesn't clearly separate the responsibilities.


// Drop the Lance dataset from cloud storage.
Dataset.drop(location, ImmutableMap.of());
Dataset.drop(location, LancePropertiesUtils.getLanceStorageOptions(lancePropertiesMap));
return true;
} catch (IOException e) {
throw new RuntimeException("Failed to drop Lance table: " + ident.name(), e);
throw new RuntimeException("Failed to purge Lance table: " + ident.name(), e);
}
}

@Override
public boolean dropTable(NameIdentifier ident) {
// TODO We will handle it in GenericLakehouseCatalogOperations. However, we need
// to figure out it's a external table or not first. we will introduce a property
// to indicate that. Currently, all Lance tables are external tables. `drop` will
// just remove the metadata in metastore and will not delete data in storage.
throw new UnsupportedOperationException(
"LanceCatalogOperations does not support dropTable operation.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,67 @@
*/
package org.apache.gravitino.lance.common.ops;

import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.RegisterTableRequest;
import com.lancedb.lance.namespace.model.RegisterTableResponse;
import java.util.Map;
import java.util.Optional;

public interface LanceTableOperations {

DescribeTableResponse describeTable(String tableId, String delimiter);
/**
* Describe the details of a table.
*
* @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}"
* @param delimiter the delimiter used in the namespace
* @param version the version of the table to describe, if null, describe the latest version
* @return the table description
*/
DescribeTableResponse describeTable(String tableId, String delimiter, Optional<Long> version);

/**
* Create a new table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
* @param mode it can be CREATE, OVERWRITE, or EXIST_OK
* @param delimiter the delimiter used in the namespace
* @param tableLocation the location where the table data will be stored
* @param tableProperties the properties of the table
* @param arrowStreamBody the arrow stream bytes containing the schema and data
* @return the response of the create table operation
*/
CreateTableResponse createTable(
String tableId,
String mode,
CreateTableRequest.ModeEnum mode,
String delimiter,
String tableLocation,
Map<String, String> tableProperties,
byte[] arrowStreamBody);

/**
* Register an existing table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
* @param mode it can be REGISTER or OVERWRITE.
* @param delimiter the delimiter used in the namespace
* @param tableProperties the properties of the table, it should contain the table location
* @return the response of the register table operation
*/
RegisterTableResponse registerTable(
String tableId, String mode, String delimiter, Map<String, String> tableProperties);
String tableId,
RegisterTableRequest.ModeEnum mode,
String delimiter,
Map<String, String> tableProperties);

/**
* Deregister a table. It will not delete the underlying lance data.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
* @param delimiter the delimiter used in the namespace
* @return the response of the deregister table operation
*/
DeregisterTableResponse deregisterTable(String tableId, String delimiter);
}
Loading