Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package org.apache.gravitino.catalog.lakehouse;

import static org.apache.gravitino.Entity.EntityType.TABLE;
import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
Expand Down Expand Up @@ -79,8 +82,10 @@ public class GenericLakehouseCatalogOperations
private static final Map<String, LakehouseCatalogOperations> SUPPORTED_FORMATS =
Maps.newHashMap();

private Map<String, String> catalogConfig;
private CatalogInfo catalogInfo;
private HasPropertyMetadata propertiesMetadata;

/**
* Initializes the generic lakehouse catalog operations with the provided configuration.
*
Expand All @@ -102,6 +107,9 @@ public void initialize(
StringUtils.isNotBlank(catalogLocation)
? Optional.of(catalogLocation).map(this::ensureTrailingSlash).map(Path::new)
: Optional.empty();
this.catalogConfig = conf;
this.catalogInfo = info;
this.propertiesMetadata = propertiesMetadata;
}

public GenericLakehouseCatalogOperations() {
Expand Down Expand Up @@ -207,11 +215,15 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
String format = properties.getOrDefault("format", "lance");
String tableLocation = calculateTableLocation(ident, properties);
Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
String tableLocation = calculateTableLocation(schema, ident, properties);
Map<String, String> tableStorageProps = calculateTableStorageProps(schema, properties);

Map<String, String> newProperties = Maps.newHashMap(properties);
newProperties.put("location", tableLocation);
newProperties.put(LOCATION, tableLocation);
newProperties.putAll(tableStorageProps);

String format = properties.getOrDefault("format", "lance");
LakehouseCatalogOperations lakehouseCatalogOperations =
SUPPORTED_FORMATS.compute(
format,
Expand All @@ -226,22 +238,13 @@ public Table createTable(
}

private String calculateTableLocation(
NameIdentifier tableIdent, Map<String, String> tableProperties) {
String tableLocation = tableProperties.get("location");
Schema schema, NameIdentifier tableIdent, Map<String, String> tableProperties) {
String tableLocation = tableProperties.get(LOCATION);
if (StringUtils.isNotBlank(tableLocation)) {
return ensureTrailingSlash(tableLocation);
}

String schemaLocation;
try {
Schema schema = loadSchema(NameIdentifier.of(tableIdent.namespace().levels()));
schemaLocation = schema.properties().get("location");
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
String.format(
"Failed to load schema for table %s to determine default location.", tableIdent),
e);
}
String schemaLocation = schema.properties() == null ? null : schema.properties().get(LOCATION);

// If we do not set location in table properties, and schema location is set, use schema
// location
Expand Down Expand Up @@ -322,4 +325,33 @@ private LakehouseCatalogOperations createLakehouseCatalogOperations(
operations.initialize(properties, catalogInfo, propertiesMetadata);
return operations;
}

/**
 * Builds the effective Lance storage options for a table by layering the three property sources.
 * Later layers win, so the precedence is: table properties > schema properties > catalog config.
 *
 * @param schema the schema that owns the table; its properties form the middle layer
 * @param tableProps the table-level properties (highest precedence)
 * @return a mutable map of the merged storage options
 */
private Map<String, String> calculateTableStorageProps(
    Schema schema, Map<String, String> tableProps) {
  // Start from the catalog-wide defaults, then overlay schema- and table-level overrides
  // in increasing precedence.
  Map<String, String> merged = getLanceTableStorageOptions(catalogConfig);
  Map<String, String> schemaOptions = getLanceTableStorageOptions(schema.properties());
  Map<String, String> tableOptions = getLanceTableStorageOptions(tableProps);
  merged.putAll(schemaOptions);
  merged.putAll(tableOptions);
  return merged;
}

/**
 * Extracts the Lance storage options from the given properties: the entries whose key starts
 * with {@link GenericLakehouseTablePropertiesMetadata#LANCE_TABLE_STORAGE_OPTION_PREFIX}.
 * Keys are kept as-is (prefix included).
 *
 * @param properties the properties to scan; may be {@code null} or empty
 * @return a mutable map containing only the storage-option entries, possibly empty
 */
private Map<String, String> getLanceTableStorageOptions(Map<String, String> properties) {
  Map<String, String> storageOptions = Maps.newHashMap();
  if (MapUtils.isEmpty(properties)) {
    return storageOptions;
  }
  for (Map.Entry<String, String> entry : properties.entrySet()) {
    if (entry
        .getKey()
        .startsWith(GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX)) {
      storageOptions.put(entry.getKey(), entry.getValue());
    }
  }
  return storageOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.gravitino.catalog.lakehouse;

import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;

import com.google.common.collect.ImmutableList;
Expand All @@ -42,7 +43,14 @@ public class GenericLakehouseCatalogPropertiesMetadata extends BaseCatalogProper
"The root directory of the lakehouse catalog.",
false /* immutable */,
null, /* defaultValue */
false /* hidden */));
false /* hidden */),
PropertyEntry.stringOptionalPropertyPrefixEntry(
LANCE_TABLE_STORAGE_OPTION_PREFIX,
"The storage options passed to Lance table.",
false /* immutable */,
null /* default value*/,
false /* hidden */,
false /* reserved */));

PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.gravitino.catalog.lakehouse;

import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;

import com.google.common.collect.ImmutableList;
Expand All @@ -41,7 +42,14 @@ public class GenericLakehouseSchemaPropertiesMetadata extends BasePropertiesMeta
"The root directory of the lakehouse schema.",
false /* immutable */,
null, /* defaultValue */
false /* hidden */));
false /* hidden */),
PropertyEntry.stringOptionalPropertyPrefixEntry(
LANCE_TABLE_STORAGE_OPTION_PREFIX,
"The storage options passed to Lance table.",
false /* immutable */,
null /* default value*/,
false /* hidden */,
false /* reserved */));

PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,43 @@
*/
package org.apache.gravitino.catalog.lakehouse;

import com.google.common.collect.ImmutableMap;
import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;

/**
 * Property metadata for generic lakehouse tables. Declares the table {@code location} entry and
 * the {@code lance.storage.} prefix under which Lance storage options are accepted.
 */
public class GenericLakehouseTablePropertiesMetadata extends BasePropertiesMetadata {
  /** Property key holding the root directory of the table. */
  public static final String LOCATION = "location";

  /** Prefix for properties that are passed through to Lance as storage options. */
  public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX = "lance.storage.";

  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

  static {
    List<PropertyEntry<?>> propertyEntries =
        ImmutableList.of(
            stringOptionalPropertyEntry(
                LOCATION,
                "The root directory of the lakehouse table.",
                true /* immutable */,
                null /* defaultValue */,
                false /* hidden */),
            PropertyEntry.stringOptionalPropertyPrefixEntry(
                LANCE_TABLE_STORAGE_OPTION_PREFIX,
                "The storage options passed to Lance table.",
                false /* immutable */,
                null /* default value */,
                false /* hidden */,
                false /* reserved */));

    PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
  }

  @Override
  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
    return PROPERTIES_METADATA;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.apache.gravitino.catalog.lakehouse.lance;

import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;

import com.google.common.collect.ImmutableMap;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.WriteParams;
Expand Down Expand Up @@ -99,13 +102,20 @@ public Table createTable(
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
// Ignore partitions, distributions, sortOrders, and indexes for Lance tables;
String location = properties.get("location");
String location = properties.get(LOCATION);
Map<String, String> storageProps =
properties.entrySet().stream()
.filter(e -> e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
.collect(
Collectors.toMap(
e -> e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
Map.Entry::getValue));
try (Dataset dataset =
Dataset.create(
new RootAllocator(),
location,
convertColumnsToSchema(columns),
new WriteParams.Builder().build())) {
new WriteParams.Builder().withStorageOptions(storageProps).build())) {
GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
return builder
.withName(ident.name())
Expand Down