diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java index 0f85532e8c2..8e823a18a9e 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java @@ -19,6 +19,7 @@ package org.apache.gravitino.catalog.lakehouse; import static org.apache.gravitino.Entity.EntityType.TABLE; +import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -26,6 +27,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.Entity; @@ -67,15 +70,14 @@ public class GenericLakehouseCatalogOperations private static final String SLASH = "/"; private final ManagedSchemaOperations managedSchemaOps; - - @SuppressWarnings("unused") // todo: remove this after implementing table operations - private Optional catalogLakehouseDir; - private static final Map SUPPORTED_FORMATS = Maps.newHashMap(); + private Optional catalogLakehouseDir; + private Map catalogConfig; private CatalogInfo catalogInfo; private HasPropertyMetadata propertiesMetadata; + /** * Initializes the generic lakehouse catalog operations with the provided configuration. * @@ -97,6 +99,9 @@ public void initialize( StringUtils.isNotBlank(catalogDir) ? Optional.of(catalogDir).map(this::ensureTrailingSlash).map(Path::new) : Optional.empty(); + this.catalogConfig = conf; + this.catalogInfo = info; + this.propertiesMetadata = propertiesMetadata; } public GenericLakehouseCatalogOperations() { @@ -193,11 +198,15 @@ public Table createTable( SortOrder[] sortOrders, Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { - String format = properties.getOrDefault("format", "lance"); - String tableLocation = calculateTableLocation(ident, properties); + Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels())); + String tableLocation = calculateTableLocation(schema, ident, properties); + Map tableStorageProps = calculateTableStorageProps(schema, properties); + Map newProperties = Maps.newHashMap(properties); - newProperties.put("location", tableLocation); + newProperties.put(LOCATION, tableLocation); + newProperties.putAll(tableStorageProps); + String format = properties.getOrDefault("format", "lance"); LakehouseCatalogOperations lakehouseCatalogOperations = SUPPORTED_FORMATS.compute( format, @@ -212,22 +221,13 @@ public Table createTable( } private String calculateTableLocation( - NameIdentifier tableIdent, Map tableProperties) { - String tableLocation = tableProperties.get("location"); + Schema schema, NameIdentifier tableIdent, Map tableProperties) { + String tableLocation = tableProperties.get(LOCATION); if (StringUtils.isNotBlank(tableLocation)) { return ensureTrailingSlash(tableLocation); } - String schemaLocation; - try { - Schema schema = loadSchema(NameIdentifier.of(tableIdent.namespace().levels())); - schemaLocation = schema.properties().get("location"); - } catch (NoSuchSchemaException e) { - throw new RuntimeException( - String.format( - "Failed to load schema for table %s to determine default location.", tableIdent), - e); - } + String schemaLocation = schema.properties() == null ? null : schema.properties().get(LOCATION); // If we do not set location in table properties, and schema location is set, use schema // location @@ -323,4 +323,33 @@ private LakehouseCatalogOperations createLakehouseCatalogOperations( operations.initialize(properties, catalogInfo, propertiesMetadata); return operations; } + + /** + * Calculate the table storage properties by merging catalog config, schema properties and table + * properties. The precedence is: table properties > schema properties > catalog config. + * + * @param schema The schema of the table. + * @param tableProps The table properties. + * @return The merged table storage properties. + */ + private Map calculateTableStorageProps( + Schema schema, Map tableProps) { + Map storageProps = getLanceTableStorageOptions(catalogConfig); + storageProps.putAll(getLanceTableStorageOptions(schema.properties())); + storageProps.putAll(getLanceTableStorageOptions(tableProps)); + return storageProps; + } + + private Map getLanceTableStorageOptions(Map properties) { + if (MapUtils.isEmpty(properties)) { + return Maps.newHashMap(); + } + return properties.entrySet().stream() + .filter( + e -> + e.getKey() + .startsWith( + GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } } diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java index 01dfc1da171..e381558c321 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java @@ -19,6 +19,7 @@ package org.apache.gravitino.catalog.lakehouse; +import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX; import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry; import com.google.common.collect.ImmutableList; @@ -42,7 +43,14 @@ public class GenericLakehouseCatalogPropertiesMetadata extends BaseCatalogProper "The root directory of the lakehouse catalog.", false /* immutable */, null, /* defaultValue */ - false /* hidden */)); + false /* hidden */), + PropertyEntry.stringOptionalPropertyPrefixEntry( + LANCE_TABLE_STORAGE_OPTION_PREFIX, + "The storage options passed to Lance table.", + false /* immutable */, + null /* default value*/, + false /* hidden */, + false /* reserved */)); PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); } diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java index 52a65e7698d..a6da0ac2ded 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java @@ -18,6 +18,7 @@ */ package org.apache.gravitino.catalog.lakehouse; +import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX; import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry; import com.google.common.collect.ImmutableList; @@ -41,7 +42,14 @@ public class GenericLakehouseSchemaPropertiesMetadata extends BasePropertiesMeta "The root directory of the lakehouse schema.", false /* immutable */, null, /* defaultValue */ - false /* hidden */)); + false /* hidden */), + PropertyEntry.stringOptionalPropertyPrefixEntry( + LANCE_TABLE_STORAGE_OPTION_PREFIX, + "The storage options passed to Lance table.", + false /* immutable */, + null /* default value*/, + false /* hidden */, + false /* reserved */)); PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); } diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java index 362b10dbe4a..e9a61a6b0fc 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java @@ -18,21 +18,43 @@ */ package org.apache.gravitino.catalog.lakehouse; -import com.google.common.collect.ImmutableMap; +import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.util.List; import java.util.Map; import org.apache.gravitino.connector.BasePropertiesMetadata; import org.apache.gravitino.connector.PropertyEntry; public class GenericLakehouseTablePropertiesMetadata extends BasePropertiesMetadata { + public static final String LOCATION = "location"; + public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX = "lance.storage."; - private static final Map> propertiesMetadata; + private static final Map> PROPERTIES_METADATA; static { - propertiesMetadata = ImmutableMap.of(); + List> propertyEntries = + ImmutableList.of( + stringOptionalPropertyEntry( + LOCATION, + "The root directory of the lakehouse table.", + true /* immutable */, + null, /* defaultValue */ + false /* hidden */), + PropertyEntry.stringOptionalPropertyPrefixEntry( + LANCE_TABLE_STORAGE_OPTION_PREFIX, + "The storage options passed to Lance table.", + false /* immutable */, + null /* default value*/, + false /* hidden */, + false /* reserved */)); + + PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); } @Override protected Map> specificPropertyEntries() { - return propertiesMetadata; + return PROPERTIES_METADATA; } } diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java index 342826a882d..dcfe6bd4896 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java @@ -19,6 +19,9 @@ package org.apache.gravitino.catalog.lakehouse.lance; +import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX; +import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.lancedb.lance.Dataset; @@ -114,13 +117,20 @@ public Table createTable( Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { // Ignore partitions, distributions, sortOrders, and indexes for Lance tables; - String location = properties.get("location"); + String location = properties.get(LOCATION); + Map storageProps = + properties.entrySet().stream() + .filter(e -> e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX)) + .collect( + Collectors.toMap( + e -> e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()), + Map.Entry::getValue)); try (Dataset dataset = Dataset.create( new RootAllocator(), location, convertColumnsToSchema(columns), - new WriteParams.Builder().build())) { + new WriteParams.Builder().withStorageOptions(storageProps).build())) { GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder(); return builder .withName(ident.name())