Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ jobs:
if: matrix.modules == 'plugin/trino-bigquery' && env.BIGQUERY_CASE_INSENSITIVE_CREDENTIALS_KEY != ''
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-case-insensitive-mapping -Dbigquery.credentials-key="${BIGQUERY_CASE_INSENSITIVE_CREDENTIALS_KEY}"
- name: Iceberg Glue Catalog Tests
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESSKEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRETKEY }}
AWS_REGION: us-east-2
S3_BUCKET: presto-ci-test
if: contains(matrix.modules, 'plugin/trino-iceberg') && (env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-iceberg -P test-glue-catalog -Ds3.bucket=${S3_BUCKET}
- name: Sanitize artifact name
if: always()
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,10 @@ public List<QualifiedObjectName> listTables(Session session, QualifiedTablePrefi

Optional<QualifiedObjectName> objectName = prefix.asQualifiedObjectName();
if (objectName.isPresent()) {
if (isExistingRelation(session, objectName.get())) {
return ImmutableList.of(objectName.get());
Optional<Boolean> exists = isExistingRelationForListing(session, objectName.get());
if (exists.isPresent()) {
return exists.get() ? ImmutableList.of(objectName.get()) : ImmutableList.of();
}
return ImmutableList.of();
}

Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, prefix.getCatalogName());
Expand All @@ -572,20 +572,27 @@ public List<QualifiedObjectName> listTables(Session session, QualifiedTablePrefi
return ImmutableList.copyOf(tables);
}

private boolean isExistingRelation(Session session, QualifiedObjectName name)
private Optional<Boolean> isExistingRelationForListing(Session session, QualifiedObjectName name)
{
if (isMaterializedView(session, name)) {
return true;
return Optional.of(true);
}
if (isView(session, name)) {
return true;
return Optional.of(true);
}

// If the table is not redirected, table handle existence is checked.
// If the table is redirected, the target table handle is retrieved. If it does not exist, an
// exception is thrown. This behavior is currently inconsistent with the unfiltered case of table listing.
// TODO: the behavior may change with a different way to resolve relation names. https://github.com/trinodb/trino/issues/9400
return getRedirectionAwareTableHandle(session, name).getTableHandle().isPresent();
// TODO: consider a better way to resolve relation names: https://github.com/trinodb/trino/issues/9400
try {
return Optional.of(getRedirectionAwareTableHandle(session, name).getTableHandle().isPresent());
}
catch (TrinoException e) {
// ignore redirection errors for consistency with listing
if (e.getErrorCode().equals(TABLE_REDIRECTION_ERROR.toErrorCode())) {
return Optional.of(true);
}
// we don't know if it exists or not
return Optional.empty();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -126,12 +127,22 @@ private static CoralTableRedirectionResolver coralTableRedirectionResolver(

public static boolean isPrestoView(Table table)
{
return "true".equals(table.getParameters().get(PRESTO_VIEW_FLAG));
return isPrestoView(table.getParameters());
}

/**
 * Returns {@code true} when the given table parameters carry the Presto/Trino
 * view marker ({@code PRESTO_VIEW_FLAG} set to the literal string {@code "true"}).
 * A missing flag, or any other value (including {@code "TRUE"}), yields {@code false}.
 */
public static boolean isPrestoView(Map<String, String> tableParameters)
{
    String viewFlag = tableParameters.get(PRESTO_VIEW_FLAG);
    return "true".equals(viewFlag);
}

public static boolean isHiveOrPrestoView(Table table)
{
return table.getTableType().equals(TableType.VIRTUAL_VIEW.name());
return isHiveOrPrestoView(table.getTableType());
}

/**
 * Returns {@code true} when the metastore table type string denotes a view
 * (Hive {@code VIRTUAL_VIEW}), which covers both Hive views and Presto/Trino views.
 *
 * NOTE(review): throws NullPointerException when {@code tableType} is null —
 * same as the original; callers are expected to pass a non-null type.
 */
public static boolean isHiveOrPrestoView(String tableType)
{
    String virtualViewType = TableType.VIRTUAL_VIEW.name();
    return tableType.equals(virtualViewType);
}

public static boolean canDecodeView(Table table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ public void invalidateTable(String databaseName, String tableName)
.filter(userTableKey -> userTableKey.matches(databaseName, tableName))
.forEach(tablePrivilegesCache::invalidate);
invalidateTableStatisticsCache(databaseName, tableName);
invalidateTablesWithParameterCache(databaseName, tableName);
invalidatePartitionCache(databaseName, tableName);
}

Expand All @@ -604,6 +605,17 @@ private void invalidateTableStatisticsCache(String databaseName, String tableNam
.forEach(tableCache::invalidate);
}

/**
 * Evicts every cached "tables with parameter" listing for {@code databaseName}
 * whose cached result mentions {@code tableName}, so a subsequent lookup reloads
 * fresh data after the table changed.
 */
private void invalidateTablesWithParameterCache(String databaseName, String tableName)
{
    tablesWithParameterCache.asMap().keySet().forEach(cacheKey -> {
        if (!cacheKey.getDatabaseName().equals(databaseName)) {
            return;
        }
        // Re-read through the cache; the entry may have been evicted concurrently,
        // in which case there is nothing to invalidate.
        List<String> cachedTables = tablesWithParameterCache.getIfPresent(cacheKey);
        if (cachedTables != null && cachedTables.contains(tableName)) {
            tablesWithParameterCache.invalidate(cacheKey);
        }
    });
}

private Partition getExistingPartition(Table table, List<String> partitionValues)
{
return getPartition(table, partitionValues)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public GlueHiveMetastore(
this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats);
}

private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
{
ClientConfiguration clientConfig = new ClientConfiguration()
.withMaxConnections(config.getMaxGlueConnections())
Expand Down
37 changes: 37 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down Expand Up @@ -320,6 +335,9 @@
<configuration>
<excludes>
<exclude>**/TestIceberg*FailureRecoveryTest.java</exclude>
<exclude>**/TestIcebergGlueCatalogConnectorSmokeTest.java</exclude>
<exclude>**/TestTrinoGlueCatalogTest.java</exclude>
<exclude>**/TestSharedGlueMetastore.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -371,5 +389,24 @@
</plugins>
</build>
</profile>

<profile>
<id>test-glue-catalog</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestIcebergGlueCatalogConnectorSmokeTest.java</include>
<include>**/TestTrinoGlueCatalogTest.java</include>
<include>**/TestSharedGlueMetastore.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public enum IcebergErrorCode
ICEBERG_CURSOR_ERROR(9, EXTERNAL),
ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
ICEBERG_COMMIT_ERROR(12, EXTERNAL),
ICEBERG_CATALOG_ERROR(13, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.plugin.hive.HiveApplyProjectionUtil;
import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
import io.trino.plugin.hive.HiveWrittenPartitions;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
Expand Down Expand Up @@ -141,9 +142,9 @@
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.DEPENDS_ON_TABLES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);

binder.bind(TrinoCatalogFactory.class).in(Scopes.SINGLETON);
binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON);

jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -105,7 +105,11 @@
import static org.apache.iceberg.LocationProviders.locationsFor;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;

Expand All @@ -120,10 +124,10 @@ public static boolean isIcebergTable(io.trino.plugin.hive.metastore.Table table)
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
}

public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
public static Table loadIcebergTable(TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
{
TableOperations operations = tableOperationsProvider.createTableOperations(
metastore,
catalog,
session,
table.getSchemaName(),
table.getTableName(),
Expand All @@ -133,14 +137,14 @@ public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperat
}

public static Table getIcebergTableWithMetadata(
HiveMetastore metastore,
TrinoCatalog catalog,
IcebergTableOperationsProvider tableOperationsProvider,
ConnectorSession session,
SchemaTableName table,
TableMetadata tableMetadata)
{
IcebergTableOperations operations = tableOperationsProvider.createTableOperations(
metastore,
catalog,
session,
table.getSchemaName(),
table.getTableName(),
Expand Down Expand Up @@ -232,7 +236,7 @@ public static Optional<String> getTableComment(Table table)
return Optional.ofNullable(table.properties().get(TABLE_COMMENT));
}

private static String quotedTableName(SchemaTableName name)
public static String quotedTableName(SchemaTableName name)
{
return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName());
}
Expand Down Expand Up @@ -403,4 +407,15 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec

return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, targetPath, propertiesBuilder.buildOrThrow());
}

/**
 * Rejects dropping an Iceberg table whose properties override the default data or
 * metadata locations, since Trino cannot yet safely clean up such custom paths.
 *
 * @throws TrinoException with {@code NOT_SUPPORTED} when any path-override property is present
 */
public static void validateTableCanBeDropped(Table table)
{
    // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
    String[] pathOverrideProperties = {
            OBJECT_STORE_PATH,
            WRITE_NEW_DATA_LOCATION,
            WRITE_METADATA_LOCATION,
            WRITE_DATA_LOCATION,
    };
    for (String property : pathOverrideProperties) {
        if (table.properties().containsKey(property)) {
            throw new TrinoException(NOT_SUPPORTED, "Table contains Iceberg path override properties and cannot be dropped from Trino: " + table.name());
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.plugin.hive.gcs.HiveGcsModule;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.s3.HiveS3Module;
import io.trino.plugin.iceberg.catalog.IcebergCatalogModule;
import io.trino.spi.NodeManager;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.classloader.ThreadContextClassLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.procedure.Procedure;
Expand Down
Loading