Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ is used.
materialized view definition. When the ``storage_schema`` materialized
view property is specified, it takes precedence over this catalog property.
- Empty
* - ``iceberg.register-table-procedure.enabled``
- Enable to allow users to call the ``register_table`` procedure
- ``false``

ORC format configuration
^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -748,6 +751,25 @@ and rename operations, including in nested structures.
Table partitioning can also be changed and the connector can still
query data created before the partitioning change.

Register table
--------------
The connector can register existing Iceberg tables with the catalog.

An SQL procedure ``system.register_table`` allows the caller to register an existing Iceberg
table in the metastore, using its existing metadata and data files::

CALL iceberg.system.register_table(schema_name => 'testdb', table_name => 'customer_orders', table_location => 'hdfs://hadoop-master:9000/user/hive/warehouse/customer_orders-581fad8517934af6be1857a903559d44')

In addition, you can provide a metadata file name to register a table
with specific metadata.
some specific table state, or may be necessary if the connector cannot
automatically figure out the metadata version to use::

CALL iceberg.system.register_table(schema_name => 'testdb', table_name => 'customer_orders', table_location => 'hdfs://hadoop-master:9000/user/hive/warehouse/customer_orders-581fad8517934af6be1857a903559d44', metadata_file_name => '00003-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json')

To prevent unauthorized users from accessing data, this procedure is disabled by default.
The procedure is enabled only when ``iceberg.register-table-procedure.enabled`` is set to ``true``.

Migrating existing tables
-------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class IcebergConfig
private boolean tableStatisticsEnabled = true;
private boolean extendedStatisticsEnabled;
private boolean projectionPushdownEnabled = true;
private boolean registerTableProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
Expand Down Expand Up @@ -210,6 +211,19 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab
return this;
}

/**
 * Returns whether the {@code register_table} procedure may be called.
 * Defaults to {@code false}; see the corresponding setter for the catalog property name.
 */
public boolean isRegisterTableProcedureEnabled()
{
return registerTableProcedureEnabled;
}

/**
 * Binds the {@code iceberg.register-table-procedure.enabled} catalog property.
 * When disabled (the default), calls to the register_table procedure are rejected,
 * preventing users from registering tables that point at arbitrary storage locations.
 */
@Config("iceberg.register-table-procedure.enabled")
@ConfigDescription("Allow users to call the register_table procedure")
public IcebergConfig setRegisterTableProcedureEnabled(boolean registerTableProcedureEnabled)
{
this.registerTableProcedureEnabled = registerTableProcedureEnabled;
return this;
}

public Optional<String> getHiveCatalogName()
{
return hiveCatalogName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
import io.trino.plugin.iceberg.procedure.RegisterTableProcedure;
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
Expand Down Expand Up @@ -80,6 +81,7 @@ public void configure(Binder binder)

Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);

Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
Expand Down Expand Up @@ -73,6 +74,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -87,6 +89,7 @@
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY;
Expand Down Expand Up @@ -128,6 +131,7 @@
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.parseFloat;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
Expand All @@ -147,6 +151,9 @@

public final class IcebergUtil
{
private static final Logger log = Logger.get(IcebergUtil.class);

public static final String METADATA_FOLDER_NAME = "metadata";
public static final String METADATA_FILE_EXTENSION = ".metadata.json";
private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");

Expand Down Expand Up @@ -626,6 +633,23 @@ private static void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMe
}
}

/**
 * Extracts the numeric version prefix from an Iceberg metadata file location.
 * Metadata file names follow the pattern {@code <version>-<uuid>.metadata.json},
 * e.g. {@code .../metadata/00003-409702ba-....metadata.json} yields version 3.
 *
 * @param metadataLocation full path to a metadata file; must contain a {@code '/'}
 *        separator and a {@code '-'} after the final path segment's version prefix
 * @return the parsed version, or {@link OptionalInt#empty()} when the segment before
 *         the first {@code '-'} is not a valid integer (logged as a warning)
 * @throws TrinoException with {@code ICEBERG_BAD_DATA} when the location has no
 *         {@code '/'} or no {@code '-'} in its file name, i.e. it cannot possibly
 *         carry a version prefix
 */
// Note: TrinoException is unchecked, so it is intentionally not declared in a
// `throws` clause; declaring it would falsely suggest a checked exception.
public static OptionalInt parseVersion(String metadataLocation)
{
    int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
    int versionEnd = metadataLocation.indexOf('-', versionStart);
    if (versionStart == 0 || versionEnd == -1) {
        throw new TrinoException(ICEBERG_BAD_DATA, "Invalid metadata location: " + metadataLocation);
    }
    try {
        return OptionalInt.of(parseInt(metadataLocation.substring(versionStart, versionEnd)));
    }
    catch (NumberFormatException e) {
        log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation);
        return OptionalInt.empty();
    }
}

public static String fixBrokenMetadataLocation(String location)
{
// Version 393-394 stored metadata location with double slash https://github.com/trinodb/trino/commit/e95fdcc7d1ec110b10977d17458e06fc4e6f217d#diff-9bbb7c0b6168f0e6b4732136f9a97f820aa082b04efb5609b6138afc118831d7R46
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.iceberg.catalog;

import io.airlift.log.Logger;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -42,9 +41,10 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveType.toHiveType;
import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME;
import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation;
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
import static java.lang.Integer.parseInt;
import static io.trino.plugin.iceberg.IcebergUtil.parseVersion;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand All @@ -58,10 +58,6 @@
public abstract class AbstractIcebergTableOperations
implements IcebergTableOperations
{
private static final Logger log = Logger.get(AbstractIcebergTableOperations.class);

protected static final String METADATA_FOLDER_NAME = "metadata";

public static final StorageFormat ICEBERG_METASTORE_STORAGE_FORMAT = StorageFormat.create(
LazySimpleSerDe.class.getName(),
FileInputFormat.class.getName(),
Expand Down Expand Up @@ -102,7 +98,7 @@ public void initializeFromMetadata(TableMetadata tableMetadata)
currentMetadata = tableMetadata;
currentMetadataLocation = tableMetadata.metadataFileLocation();
shouldRefresh = false;
version = parseVersion(currentMetadataLocation);
version = parseVersion(currentMetadataLocation).orElse(-1);
}

@Override
Expand Down Expand Up @@ -232,7 +228,7 @@ protected void refreshFromMetadataLocation(String newLocation)

currentMetadata = newMetadata.get();
currentMetadataLocation = newLocation;
version = parseVersion(newLocation);
version = parseVersion(newLocation).orElse(-1);
shouldRefresh = false;
}

Expand All @@ -251,19 +247,6 @@ protected static String metadataFileLocation(TableMetadata metadata, String file
return format("%s/%s/%s", stripTrailingSlash(metadata.location()), METADATA_FOLDER_NAME, filename);
}

/**
 * Parses the numeric version prefix out of a metadata file location
 * (the digits before the first {@code '-'} in the file name).
 *
 * @param metadataLocation full path to a metadata file
 * @return the parsed version, or {@code -1} when the location does not carry a
 *         parseable version prefix (the failure is logged as a warning)
 */
protected static int parseVersion(String metadataLocation)
{
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
try {
// IndexOutOfBoundsException covers the case where versionEnd is -1 (no '-' found)
return parseInt(metadataLocation.substring(versionStart, versionEnd));
}
catch (NumberFormatException | IndexOutOfBoundsException e) {
log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation);
return -1;
}
}

protected static List<Column> toHiveColumns(List<NestedField> columns)
{
return columns.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ Transaction newCreateTableTransaction(
String location,
Map<String, String> properties);

void registerTable(ConnectorSession session, SchemaTableName tableName, String tableLocation, String metadataLocation);

void dropTable(ConnectorSession session, SchemaTableName schemaTableName);

void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.CatalogUtil.dropTableData;

public class TrinoGlueCatalog
Expand Down Expand Up @@ -370,6 +372,17 @@ public Transaction newCreateTableTransaction(
Optional.of(session.getUser()));
}

/**
 * Registers an existing Iceberg table with the Glue catalog by creating a Glue
 * table entry that points at the supplied metadata file. No data or metadata
 * files are written; only the catalog entry is created.
 *
 * @param tableLocation root location of the table (unused here; Glue resolves the
 *        table through the metadata location property — TODO confirm intent)
 * @param metadataLocation location of the metadata file to register the table with
 */
@Override
public void registerTable(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, String metadataLocation)
throws TrinoException
{
// Mark the entry as an Iceberg table and record which metadata file is current,
// mirroring what org.apache.iceberg.BaseMetastoreTableOperations writes.
TableInput tableInput = getTableInput(schemaTableName.getTableName(), Optional.of(session.getUser()), ImmutableMap.<String, String>builder()
.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH))
.put(METADATA_LOCATION_PROP, metadataLocation)
.buildOrThrow());
createTable(schemaTableName.getSchemaName(), tableInput);
}

@Override
public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HivePrincipal;
import io.trino.plugin.hive.metastore.MetastoreUtil;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.plugin.hive.util.HiveUtil;
Expand All @@ -42,6 +43,7 @@
import io.trino.spi.connector.ViewNotFoundException;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -83,6 +85,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand All @@ -94,6 +97,9 @@
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.CatalogUtil.dropTableData;

public class TrinoHiveCatalog
Expand Down Expand Up @@ -261,6 +267,29 @@ public Transaction newCreateTableTransaction(
isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser()));
}

/**
 * Registers an existing Iceberg table with the Hive metastore by creating an
 * EXTERNAL table entry that points at the supplied location and metadata file.
 * No data or metadata files are written; only the metastore entry is created.
 *
 * @param tableLocation root location of the table's data and metadata
 * @param metadataLocation location of the metadata file to register the table with
 */
@Override
public void registerTable(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, String metadataLocation)
throws TrinoException
{
// When system security is in use the table is ownerless; otherwise the session user owns it.
Optional<String> owner = isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser());

io.trino.plugin.hive.metastore.Table.Builder builder = io.trino.plugin.hive.metastore.Table.builder()
.setDatabaseName(schemaTableName.getSchemaName())
.setTableName(schemaTableName.getTableName())
.setOwner(owner)
// Table needs to be EXTERNAL, otherwise table rename in HMS would rename table directory and break table contents.
.setTableType(TableType.EXTERNAL_TABLE.name())
.withStorage(storage -> storage.setLocation(tableLocation))
.withStorage(storage -> storage.setStorageFormat(ICEBERG_METASTORE_STORAGE_FORMAT))
// This is a must-have property for the EXTERNAL_TABLE table type
.setParameter("EXTERNAL", "TRUE")
// Mark the entry as an Iceberg table and record which metadata file is current,
// mirroring what org.apache.iceberg.BaseMetastoreTableOperations writes.
.setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH))
.setParameter(METADATA_LOCATION_PROP, metadataLocation);

// Ownerless tables get no initial privileges; owned tables get the standard initial set.
PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES);
metastore.createTable(builder.build(), privileges);
}

@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> namespace)
{
Expand Down
Loading