Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.spi.function.Description;
import org.apache.iceberg.FileFormat;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.Optional;

import static io.trino.plugin.hive.HiveCompressionCodec.GZIP;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
Expand All @@ -35,6 +38,7 @@ public class IcebergConfig
private int maxPartitionsPerWriter = 100;
private boolean uniqueTableLocation;
private CatalogType catalogType = HIVE_METASTORE;
private Optional<String> catalogWarehouse = Optional.empty();
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean projectionPushdownEnabled = true;
Expand All @@ -51,6 +55,20 @@ public IcebergConfig setCatalogType(CatalogType catalogType)
return this;
}

@NotNull
public Optional<String> getCatalogWarehouse()
{
return catalogWarehouse;
}

@Config("iceberg.catalog.warehouse")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about the name. something like "default storage location"?

cc @losipiuk @alexjo2144 @phd3

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to match https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/CatalogProperties.java#L31. Please let me know if other name is preferred, I can update to that.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For schemas and tables we use "location". So probably we should use it also here. iceberg.default-schema-location?

@Description("Iceberg default warehouse location, used to generate default table location")
public IcebergConfig setCatalogWarehouse(String warehouse)
{
this.catalogWarehouse = Optional.ofNullable(warehouse);
return this;
}

@NotNull
public FileFormat getFileFormat()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import javax.inject.Inject;

import java.util.Optional;

import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.SYSTEM;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand All @@ -37,6 +39,7 @@ public class TrinoCatalogFactory
private final IcebergTableOperationsProvider tableOperationsProvider;
private final String trinoVersion;
private final CatalogType catalogType;
private final Optional<String> catalogWarehouse;
private final boolean isUniqueTableLocation;
private final boolean isUsingSystemSecurity;

Expand All @@ -59,6 +62,7 @@ public TrinoCatalogFactory(
this.trinoVersion = requireNonNull(nodeVersion, "trinoVersion is null").toString();
requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.catalogWarehouse = config.getCatalogWarehouse();
this.isUniqueTableLocation = config.isUniqueTableLocation();
this.isUsingSystemSecurity = securityConfig.getSecuritySystem() == SYSTEM;
}
Expand All @@ -75,6 +79,7 @@ public TrinoCatalog create()
typeManager,
tableOperationsProvider,
trinoVersion,
catalogWarehouse,
isUniqueTableLocation,
isUsingSystemSecurity);
case GLUE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
Expand Down Expand Up @@ -95,6 +96,7 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
Expand Down Expand Up @@ -130,6 +132,7 @@ class TrinoHiveCatalog
private final String trinoVersion;
private final boolean useUniqueTableLocation;
private final boolean isUsingSystemSecurity;
private final Optional<String> catalogWarehouse;

private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
private final ViewReaderUtil.PrestoViewReader viewReader = new ViewReaderUtil.PrestoViewReader();
Expand All @@ -141,6 +144,7 @@ public TrinoHiveCatalog(
TypeManager typeManager,
IcebergTableOperationsProvider tableOperationsProvider,
String trinoVersion,
Optional<String> catalogWarehouse,
boolean useUniqueTableLocation,
boolean isUsingSystemSecurity)
{
Expand All @@ -150,6 +154,7 @@ public TrinoHiveCatalog(
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.catalogWarehouse = requireNonNull(catalogWarehouse, "catalogWarehouse is null");
this.useUniqueTableLocation = useUniqueTableLocation;
this.isUsingSystemSecurity = isUsingSystemSecurity;
}
Expand Down Expand Up @@ -319,6 +324,16 @@ public String defaultTableLocation(ConnectorSession session, SchemaTableName sch
if (useUniqueTableLocation) {
tableNameForLocation += "-" + randomUUID().toString().replace("-", "");
}
if (database.getLocation().isEmpty() || database.getLocation().get().isEmpty()) {
if (catalogWarehouse.isEmpty()) {
throw new TrinoException(HIVE_DATABASE_LOCATION_ERROR, format("Database '%s' location cannot be determined, " +
"please either set 'location' when creating the database, or set 'iceberg.catalog.warehouse' " +
"to allow a default database location at 'warehouse/database.db'", database.getDatabaseName()));
}
database = Database.builder(database)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not build a fake Database object.
Instead, let's modify getTableDefaultLocation (which already has related logic).

.setLocation(Optional.of(format("%s/%s.db", catalogWarehouse.get(), schemaTableName.getSchemaName())))
.build();
}
return getTableDefaultLocation(database, new HdfsEnvironment.HdfsContext(session), hdfsEnvironment,
schemaTableName.getSchemaName(), tableNameForLocation).toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void testDefaults()
.setMaxPartitionsPerWriter(100)
.setUniqueTableLocation(false)
.setCatalogType(HIVE_METASTORE)
.setCatalogWarehouse(null)
.setDynamicFilteringWaitTimeout(new Duration(0, MINUTES))
.setTableStatisticsEnabled(true)
.setProjectionPushdownEnabled(true));
Expand All @@ -60,6 +61,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.dynamic-filtering.wait-timeout", "1h")
.put("iceberg.table-statistics-enabled", "false")
.put("iceberg.projection-pushdown-enabled", "false")
.put("iceberg.catalog.warehouse", "/tmp")
.build();

IcebergConfig expected = new IcebergConfig()
Expand All @@ -71,7 +73,8 @@ public void testExplicitPropertyMappings()
.setCatalogType(GLUE)
.setDynamicFilteringWaitTimeout(Duration.valueOf("1h"))
.setTableStatisticsEnabled(false)
.setProjectionPushdownEnabled(false);
.setProjectionPushdownEnabled(false)
.setCatalogWarehouse("/tmp");

assertFullMapping(properties, expected);
}
Expand Down