Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.Optional;

import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
Expand All @@ -37,6 +39,7 @@ public class IcebergConfig
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean projectionPushdownEnabled = true;
private Optional<String> hiveCatalogName = Optional.empty();

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -166,4 +169,17 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

/**
 * Returns the Hive catalog name held by this config object.
 *
 * @return the value last passed to {@code setHiveCatalogName}, wrapped in an {@link Optional};
 *         empty when the setter was never called or was called with {@code null}
 */
public Optional<String> getHiveCatalogName()
{
    return this.hiveCatalogName;
}

@Config("iceberg.hive-catalog-name")
Comment thread
findinpath marked this conversation as resolved.
Outdated
@ConfigDescription("Catalog to redirect to when a Hive table is referenced")
public IcebergConfig setHiveCatalogName(String hiveCatalogName)
{
    // A null argument is stored as an empty Optional, so the getter never exposes null
    Optional<String> catalogName = Optional.ofNullable(hiveCatalogName);
    this.hiveCatalogName = catalogName;
    // Return this config instance so setter calls can be chained
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Variable;
Expand Down Expand Up @@ -115,6 +116,7 @@
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -151,7 +153,6 @@
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -206,7 +207,10 @@ public Optional<TrinoPrincipal> getSchemaOwner(ConnectorSession session, Catalog
public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
verify(name.getTableType() == DATA, "Wrong table type: " + name.getTableNameWithType());
if (name.getTableType() != DATA) {
// Pretend the table does not exist to produce better error message in case of table redirects to Hive
return null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When table redirection to the Hive connector is not enabled
and the user queries, through the Iceberg connector,
a metadata table of a Hive connector table,
the user will receive a "table not found" exception.

From io.trino.tests.product.iceberg.TestIcebergHiveTablesCompatibility#testIcebergSelectFromHiveTable

        assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM iceberg.default.\"" + tableName + "$data\""))
                .hasMessageMatching("Query failed \\(#\\w+\\):\\Q Not an Iceberg table: default." + tableName);

        assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM iceberg.default.\"" + tableName + "$files\""))
                .hasMessageMatching("Query failed \\(#\\w+\\):\\Q line 1:15: Table 'iceberg.default." + tableName + "$files' does not exist");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keeping verify in such case seems "OK" too, wdyt?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping verify would produce, for the statement:

onTrino().executeQuery("SELECT * FROM iceberg.default.\"" + hiveTableName + "$files\"")

error messages like the following:

Wrong table type: test_iceberg_select_from_hive_63u5u11q3c70$files

I tend to say that the error message

Table 'iceberg.default." + hiveTableName + "$files' does not exist"

fits better (not ideal, but better).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires code comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make it clear from the code perspective:

io.trino.plugin.iceberg.IcebergMetadata#getTableHandle is reached by a select from hive_table_name$files because there is no system table in Iceberg found for the hive_table_name (IcebergMetadata#getSystemTable returns Optional.empty() in such cases).

In StatementAnalyzer#visitTable the logic of the method assumes that if the identifier doesn't correspond to a MV or to a view, it is certainly a table. For this reason IcebergMetadata#getTableHandle is called with the rather unexpected argument hive_table_name$files.

A refactoring of the StatementAnalyzer#visitTable method, so that it checks at the beginning whether we're dealing with a redirected table and acts accordingly, would probably be the right way to go, but such a change (if it makes sense) would fit better in a different PR.

}

Table table;
try {
Expand Down Expand Up @@ -236,6 +240,7 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTabl
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

@SuppressWarnings("TryWithIdenticalCatches")
private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
{
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
Expand All @@ -251,6 +256,10 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
catch (TableNotFoundException e) {
return Optional.empty();
}
catch (UnknownTableTypeException e) {
// avoid dealing with non Iceberg tables
return Optional.empty();
}

SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
switch (name.getTableType()) {
Expand Down Expand Up @@ -395,27 +404,43 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
List<SchemaTableName> tables = prefix.getTable()
.map(ignored -> singletonList(prefix.toSchemaTableName()))
.orElseGet(() -> listTables(session, prefix.getSchema()));
throw new UnsupportedOperationException("The deprecated listTableColumns is not supported because streamTableColumns is implemented instead");
}

ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
for (SchemaTableName table : tables) {
try {
columns.put(table, getTableMetadata(session, table).getColumns());
}
catch (TableNotFoundException e) {
// table disappeared during listing operation
}
catch (UnknownTableTypeException e) {
// ignore table of unknown type
}
catch (RuntimeException e) {
// Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during column listing for %s", table, prefix);
}
@Override
@SuppressWarnings("TryWithIdenticalCatches")
public Stream<TableColumnsMetadata> streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
requireNonNull(prefix, "prefix is null");
List<SchemaTableName> schemaTableNames;
if (prefix.getTable().isEmpty()) {
schemaTableNames = catalog.listTables(session, prefix.getSchema());
}
else {
schemaTableNames = ImmutableList.of(prefix.toSchemaTableName());
}
Comment thread
findinpath marked this conversation as resolved.
Outdated
return columns.buildOrThrow();
return schemaTableNames.stream()
.flatMap(tableName -> {
try {
if (redirectTable(session, tableName).isPresent()) {
return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName));
}
return Stream.of(TableColumnsMetadata.forTable(tableName, getTableMetadata(session, tableName).getColumns()));
}
catch (TableNotFoundException e) {
// Table disappeared during listing operation
return Stream.empty();
}
catch (UnknownTableTypeException e) {
// Skip unsupported table type in case that the table redirects are not enabled
return Stream.empty();
}
catch (RuntimeException e) {
// Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
return Stream.empty();
}
});
}

@Override
Expand Down Expand Up @@ -1394,6 +1419,12 @@ private Map<String, Optional<TableToken>> getMaterializedViewToken(ConnectorSess
return viewToken;
}

/**
 * Resolves a possible redirection for the given table by asking the underlying catalog.
 *
 * @param session the current connector session
 * @param tableName the table being referenced
 * @return whatever {@code catalog.redirectTable} returns for these arguments
 */
@Override
public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName)
{
    // The redirection decision is owned entirely by the catalog implementation
    Optional<CatalogSchemaTableName> redirectionTarget = catalog.redirectTable(session, tableName);
    return redirectionTarget;
}

private static class TableToken
{
// Current Snapshot ID of the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.inject.Inject;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -41,6 +42,7 @@
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static java.lang.String.format;

public final class IcebergSessionProperties
Expand Down Expand Up @@ -71,6 +73,7 @@ public final class IcebergSessionProperties
private static final String STATISTICS_ENABLED = "statistics_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -219,6 +222,13 @@ public IcebergSessionProperties(
"Target maximum size of written files; the actual size may be larger",
hiveConfig.getTargetMaxFileSize(),
false))
.add(stringProperty(
HIVE_CATALOG_NAME,
"Catalog to redirect to when a Hive table is referenced",
icebergConfig.getHiveCatalogName().orElse(null),
// Session-level redirections configuration does not work well with views, as view body is analyzed in context
// of a session with properties stripped off. Thus, this property is more of a test-only, or at most POC usefulness.
true))
Comment thread
findinpath marked this conversation as resolved.
Outdated
.build();
}

Expand Down Expand Up @@ -359,4 +369,9 @@ public static long getTargetMaxFileSize(ConnectorSession session)
{
return session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class).toBytes();
}

/**
 * Reads the {@code hive_catalog_name} session property.
 *
 * @param session the session to read the property from
 * @return the property value, or empty when the property resolves to {@code null}
 */
public static Optional<String> getHiveCatalogName(ConnectorSession session)
{
    String catalogName = session.getProperty(HIVE_CATALOG_NAME, String.class);
    return Optional.ofNullable(catalogName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.plugin.iceberg.UnknownTableTypeException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
Expand Down Expand Up @@ -119,4 +120,6 @@ void createMaterializedView(
void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target);

void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional<String> comment);

Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
Expand Down Expand Up @@ -74,16 +75,22 @@
import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData;
import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getViewTableInput;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.CatalogUtil.dropTableData;

public class TrinoGlueCatalog
Expand Down Expand Up @@ -575,4 +582,40 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou
{
throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg Glue catalogs");
}

@Override
public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName)
{
requireNonNull(session, "session is null");
requireNonNull(tableName, "tableName is null");
Optional<String> targetCatalogName = getHiveCatalogName(session);
if (targetCatalogName.isEmpty()) {
return Optional.empty();
}
if (isHiveSystemSchema(tableName.getSchemaName())) {
return Optional.empty();
}

// we need to chop off any "$partitions" and similar suffixes from table name while querying the metastore for the Table object
int metadataMarkerIndex = tableName.getTableName().lastIndexOf('$');
SchemaTableName tableNameBase = (metadataMarkerIndex == -1) ? tableName : schemaTableName(
tableName.getSchemaName(),
tableName.getTableName().substring(0, metadataMarkerIndex));

Optional<com.amazonaws.services.glue.model.Table> table = getTable(new SchemaTableName(tableNameBase.getSchemaName(), tableNameBase.getTableName()));

if (table.isEmpty() || VIRTUAL_VIEW.name().equals(table.get().getTableType())) {
return Optional.empty();
}
if (!isIcebergTable(table.get())) {
// After redirecting, use the original table name, with "$partitions" and similar suffixes
return targetCatalogName.map(catalog -> new CatalogSchemaTableName(catalog, tableName));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, I'd use get instead of map here, makes it clear that an empty catalog name shouldn't ever show up at this point.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have a few lines above:

        Optional<String> targetCatalogName = getHiveCatalogName(session);
        if (targetCatalogName.isEmpty()) {
            return Optional.empty();
        }

Also note that the method returns an Optional, so I'd have to do .get() and then wrap the result back to Optional.

}
return Optional.empty();
}

private static boolean isIcebergTable(com.amazonaws.services.glue.model.Table table)
Comment thread
findepi marked this conversation as resolved.
Outdated
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,19 @@
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition;
import static io.trino.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
Expand Down Expand Up @@ -622,6 +625,37 @@ private List<String> listNamespaces(ConnectorSession session, Optional<String> n
return listNamespaces(session);
}

/**
 * Decides whether a table reference should be redirected to the Hive catalog configured for the session.
 *
 * <p>A redirection target is returned only when all of the following hold: the session supplies a Hive
 * catalog name, the schema is not rejected by {@code isHiveSystemSchema}, the metastore has a table under
 * the base name, {@code isHiveOrPrestoView} does not match its table type, and {@code isIcebergTable}
 * does not match the table.
 *
 * @param session the session supplying the Hive catalog name property; must not be null
 * @param tableName the referenced table, possibly carrying a {@code $}-suffix such as {@code $partitions}; must not be null
 * @return the target in the Hive catalog, keeping the originally requested table name, or empty when no redirection applies
 */
@Override
public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName)
{
    requireNonNull(session, "session is null");
    requireNonNull(tableName, "tableName is null");

    Optional<String> hiveCatalog = getHiveCatalogName(session);
    if (hiveCatalog.isEmpty() || isHiveSystemSchema(tableName.getSchemaName())) {
        return Optional.empty();
    }

    // The metastore is queried by base table name, so drop everything from the last '$' onwards before the lookup
    SchemaTableName baseTable = tableName;
    int suffixStart = tableName.getTableName().lastIndexOf('$');
    if (suffixStart != -1) {
        baseTable = schemaTableName(
                tableName.getSchemaName(),
                tableName.getTableName().substring(0, suffixStart));
    }

    Optional<io.trino.plugin.hive.metastore.Table> metastoreTable = metastore.getTable(baseTable.getSchemaName(), baseTable.getTableName());
    if (metastoreTable.isEmpty()) {
        return Optional.empty();
    }

    io.trino.plugin.hive.metastore.Table found = metastoreTable.get();
    if (isHiveOrPrestoView(found.getTableType()) || isIcebergTable(found)) {
        return Optional.empty();
    }

    // Hand the unmodified name (any '$' suffix included) to the target catalog
    return Optional.of(new CatalogSchemaTableName(hiveCatalog.get(), tableName));
}

private static class MaterializedViewMayBeBeingRemovedException
extends RuntimeException
{
Expand Down
Loading