Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

public final class Locations
{
private static final Pattern S3_TABLES = Pattern.compile("s3://(?!.*/).*--table-s3");
private static final Pattern S3_TABLES = Pattern.compile("s3://[^/]*--table-s3(?:/.*)?");

private Locations() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ void testIsS3Tables()
{
assertThat(Locations.isS3Tables("s3://e97725d9-dbfb-4334-784sox7edps35ncq16arh546frqa1use2b--table-s3")).isTrue();
assertThat(Locations.isS3Tables("s3://75fed916-b871-4909-mx9t6iohbseks57q16e5y6nf1c8gguse2b--table-s3")).isTrue();
assertThat(Locations.isS3Tables("s3://e97725d9-dbfb-4334-784sox7edps35ncq16arh546frqa1use2b--table-s3/")).isTrue();
assertThat(Locations.isS3Tables("s3://75fed916-b871-4909-mx9t6iohbseks57q16e5y6nf1c8gguse2b--table-s3/")).isTrue();

assertThat(Locations.isS3Tables("s3://e97725d9-dbfb-4334-784sox7edps35ncq16arh546frqa1use2b--table-s3/")).isFalse();
assertThat(Locations.isS3Tables("s3://75fed916-b871-4909-mx9t6iohbseks57q16e5y6nf1c8gguse2b--table-s3/")).isFalse();
assertThat(Locations.isS3Tables("s3://75fed916-b871-4909/mx9t6iohbseks57q16e5y6nf1c8gguse2b--table-s3")).isFalse();
assertThat(Locations.isS3Tables("s3://test-bucket")).isFalse();
assertThat(Locations.isS3Tables("s3://test-bucket/default")).isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.Locations;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.metastore.Column;
import io.trino.metastore.HiveMetastore;
Expand All @@ -49,6 +50,7 @@
import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.catalog.rest.TrinoRestCatalog;
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle;
import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle;
Expand Down Expand Up @@ -249,7 +251,6 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Maps.transformValues;
import static com.google.common.collect.Sets.difference;
import static io.trino.filesystem.Locations.isS3Tables;
import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
Expand Down Expand Up @@ -1304,13 +1305,13 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

if (tableLocation == null) {
tableLocation = getTableLocation(tableMetadata.getProperties())
.orElseGet(() -> catalog.defaultTableLocation(session, tableMetadata.getTable()));
.orElseGet(() -> catalog.defaultTableLocation(session, tableMetadata.getTable()).orElse(null));
}
transaction = newCreateTableTransaction(catalog, tableMetadata, session, replace, tableLocation, allowedExtraProperties);
Location location = Location.of(transaction.table().location());
try {
// S3 Tables internally assigns a unique location for each table
if (!isS3Tables(location.toString())) {
if (!Locations.isS3Tables(location.toString())) {
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), transaction.table().io().properties());
if (!replace && fileSystem.listFiles(location).hasNext()) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("" +
Expand Down Expand Up @@ -1527,10 +1528,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(
appendFiles.scanManifestsWith(icebergScanExecutor);
commitUpdate(appendFiles, session, "insert");

if (isS3Tables(icebergTable.location())) {
log.debug("S3 Tables do not support statistics: %s", table.name());
}
else if (!computedStatistics.isEmpty()) {
if (!computedStatistics.isEmpty()) {
long newSnapshotId = icebergTable.currentSnapshot().snapshotId();

CollectedStatistics collectedStatistics = processComputedTableStatistics(icebergTable, computedStatistics);
Expand Down Expand Up @@ -2874,6 +2872,11 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector
return TableStatisticsMetadata.empty();
}

if (isS3Tables()) {
// S3 Tables throw "Malformed request: Cannot parse missing field: statistics" error when we try to commit extended statistics
return TableStatisticsMetadata.empty();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Statistics have been part of the Iceberg spec for a while now; it's unfortunate that S3 Tables doesn't support them, because that makes queries on these tables inherently less performant. cc @pettyjamesm

Is there a public tracker link for this important missing functionality?
(I mean an AWS issue, not an issue in Trino.)

}

if (tableReplace) {
return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {});
}
Expand Down Expand Up @@ -2922,6 +2925,10 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
IcebergSessionProperties.EXTENDED_STATISTICS_ENABLED));
}

if (isS3Tables()) {
throw new TrinoException(NOT_SUPPORTED, "S3 Tables do not support analyze");
}

checkArgument(handle.getTableType() == DATA, "Cannot analyze non-DATA table: %s", handle.getTableType());

if (handle.getSnapshotId().isEmpty()) {
Expand Down Expand Up @@ -2987,9 +2994,6 @@ public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session,
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
if (isS3Tables(icebergTable.location())) {
throw new TrinoException(NOT_SUPPORTED, "S3 Tables do not support analyze");
}
beginTransaction(icebergTable);
return handle;
}
Expand Down Expand Up @@ -4043,6 +4047,11 @@ public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession sessi
return WriterScalingOptions.ENABLED;
}

private boolean isS3Tables()
{
return catalog instanceof TrinoRestCatalog restCatalog && restCatalog.isS3Tables();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we have this new check, do we still need the Locations.isS3Tables check?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name "isS3Tables" is not clear to me, the method is using a catalog to check if it "isS3Tables", I don’t have a strong suggestion for an alternative name though

}

public Optional<Long> getIncrementalRefreshFromSnapshot()
{
return fromSnapshotForRefresh;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ protected Location createMaterializedViewStorage(
}
SchemaTableName storageTableName = new SchemaTableName(viewName.getSchemaName(), tableNameWithType(viewName.getTableName(), MATERIALIZED_VIEW_STORAGE));
String tableLocation = getTableLocation(materializedViewProperties)
.orElseGet(() -> defaultTableLocation(session, viewName));
.orElseGet(() -> defaultTableLocation(session, viewName).orElse(null));
List<ColumnMetadata> columns = columnsForMaterializedView(definition, materializedViewProperties);

Schema schema = schemaFromMetadata(columns);
Expand Down Expand Up @@ -349,7 +349,7 @@ protected SchemaTableName createMaterializedViewStorageTable(

ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, materializedViewProperties, Optional.empty());
String tableLocation = getTableLocation(tableMetadata.getProperties())
.orElseGet(() -> defaultTableLocation(session, tableMetadata.getTable()));
.orElseGet(() -> defaultTableLocation(session, tableMetadata.getTable()).orElse(null));
Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session, false, tableLocation, _ -> false);
AppendFiles appendFiles = transaction.newAppend();
commit(appendFiles, session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.security.TrinoPrincipal;
import jakarta.annotation.Nullable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -161,8 +160,7 @@ Transaction newCreateOrReplaceTableTransaction(

void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment);

@Nullable
String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName);
Optional<String> defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ else if (isTrinoView(tableType, parameters) && !viewCache.asMap().containsKey(sc
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<String> defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
String databaseLocation = stats.getGetDatabase().call(() ->
glueClient.getDatabase(x -> x.name(schemaTableName.getSchemaName()))
Expand All @@ -940,7 +940,7 @@ public String defaultTableLocation(ConnectorSession session, SchemaTableName sch
databaseLocation = appendPath(defaultSchemaLocation.get(), schemaDirectoryName);
}

return appendPath(databaseLocation, tableName);
return Optional.of(appendPath(databaseLocation, tableName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,14 +507,14 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName vi
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<String> defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
Database database = metastore.getDatabase(schemaTableName.getSchemaName())
.orElseThrow(() -> new SchemaNotFoundException(schemaTableName.getSchemaName()));
String tableNameForLocation = createNewTableName(schemaTableName.getTableName());
String location = database.getLocation().orElseThrow(() ->
new TrinoException(HIVE_DATABASE_LOCATION_ERROR, format("Database '%s' location is not set", schemaTableName.getSchemaName())));
return appendPath(location, tableNameForLocation);
return Optional.of(appendPath(location, tableNameForLocation));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<String> defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
Namespace namespace = Namespace.of(schemaTableName.getSchemaName());
String tableName = createNewTableName(schemaTableName.getTableName());
Expand All @@ -449,7 +449,7 @@ public String defaultTableLocation(ConnectorSession session, SchemaTableName sch
String schemaLocation = databaseLocation.orElseGet(() ->
appendPath(defaultWarehouseDir, schemaTableName.getSchemaName()));

return appendPath(schemaLocation, tableName);
return Optional.of(appendPath(schemaLocation, tableName));
}

@Override
Expand All @@ -471,7 +471,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,
.withDefaultNamespace(Namespace.of(schemaViewName.getSchemaName()))
.withDefaultCatalog(definition.getCatalog().orElse(null))
.withProperties(properties.buildOrThrow())
.withLocation(defaultTableLocation(session, schemaViewName));
.withLocation(defaultTableLocation(session, schemaViewName).orElseThrow());

if (replace) {
viewBuilder.createOrReplace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<String> defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
Optional<String> databaseLocation = Optional.empty();
if (namespaceExists(session, schemaTableName.getSchemaName())) {
Expand All @@ -336,7 +336,7 @@ public String defaultTableLocation(ConnectorSession session, SchemaTableName sch
String schemaLocation = databaseLocation.orElseGet(() ->
appendPath(warehouseLocation, schemaTableName.getSchemaName()));

return appendPath(schemaLocation, createNewTableName(schemaTableName.getTableName()));
return Optional.of(appendPath(schemaLocation, createNewTableName(schemaTableName.getTableName())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
Expand Down Expand Up @@ -575,7 +576,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<String> defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
String tableName = createLocationForTable(schemaTableName.getTableName());

Expand All @@ -584,10 +585,10 @@ public String defaultTableLocation(ConnectorSession session, SchemaTableName sch
if (databaseLocation == null) {
// Iceberg REST catalog doesn't require location property.
// S3 Tables doesn't return the property.
return null;
return Optional.empty();
}

return appendPath(databaseLocation, tableName);
return Optional.of(appendPath(databaseLocation, tableName));
}

private String createLocationForTable(String baseTableName)
Expand Down Expand Up @@ -618,7 +619,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,
.withDefaultNamespace(toRemoteNamespace(session, toNamespace(schemaViewName.getSchemaName())))
.withDefaultCatalog(definition.getCatalog().orElse(null))
.withProperties(properties.buildOrThrow())
.withLocation(defaultTableLocation(session, schemaViewName));
.withLocation(defaultTableLocation(session, schemaViewName).orElse(null));
try {
if (replace) {
viewBuilder.createOrReplace();
Expand Down Expand Up @@ -827,6 +828,12 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc
replaceViewVersion.commit();
}

public boolean isS3Tables()
{
String warehouse = restSessionCatalog.properties().get(CatalogProperties.WAREHOUSE_LOCATION);
return warehouse != null && warehouse.startsWith("s3tablescatalog/") && "sigv4".equals(restSessionCatalog.properties().get("rest.auth.type"));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth code comment where did s3tablescatalog come from?

}

private SessionCatalog.SessionContext convert(ConnectorSession session)
{
return switch (sessionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
public Optional<String> defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
throw new TrinoException(NOT_SUPPORTED, "Snowflake managed Iceberg tables do not support modifications");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER;
import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog;
import static io.trino.testing.SystemEnvironmentUtils.requireEnv;
import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
Expand Down Expand Up @@ -284,6 +285,34 @@ static void main()
}
}

public static final class IcebergS3TablesQueryRunnerMain
{
private IcebergS3TablesQueryRunnerMain() {}

static void main()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

throws Exception
{
@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder("tpch")
.addCoordinatorProperty("http-server.http.port", "8080")
.addIcebergProperty("iceberg.catalog.type", "rest")
.addIcebergProperty("iceberg.rest-catalog.uri", "https://glue.%s.amazonaws.com/iceberg".formatted(requireEnv("AWS_REGION")))
.addIcebergProperty("iceberg.rest-catalog.warehouse", "s3tablescatalog/" + requireEnv("S3_TABLES_BUCKET"))
.addIcebergProperty("iceberg.rest-catalog.view-endpoints-enabled", "false")
.addIcebergProperty("iceberg.rest-catalog.security", "sigv4")
.addIcebergProperty("iceberg.rest-catalog.signing-name", "glue")
.addIcebergProperty("fs.native-s3.enabled", "true")
.addIcebergProperty("s3.region", requireEnv("AWS_REGION"))
.addIcebergProperty("s3.aws-access-key", requireEnv("AWS_ACCESS_KEY_ID"))
.addIcebergProperty("s3.aws-secret-key", requireEnv("AWS_SECRET_ACCESS_KEY"))
.build();

Logger log = Logger.get(IcebergS3TablesQueryRunnerMain.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
}
}

public static final class IcebergExternalQueryRunnerMain
{
private IcebergExternalQueryRunnerMain() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ void testHighlyNestedFields()
new Schema(column),
PartitionSpec.unpartitioned(),
SortOrder.unsorted(),
Optional.ofNullable(catalog.defaultTableLocation(SESSION, schemaTableName)),
catalog.defaultTableLocation(SESSION, schemaTableName),
ImmutableMap.of())
.commitTransaction();

Expand Down Expand Up @@ -1549,7 +1549,7 @@ void testAnalyzeNoSnapshot()
new Schema(Types.NestedField.optional(1, "x", Types.LongType.get())),
PartitionSpec.unpartitioned(),
SortOrder.unsorted(),
Optional.ofNullable(catalog.defaultTableLocation(SESSION, schemaTableName)),
catalog.defaultTableLocation(SESSION, schemaTableName),
ImmutableMap.of())
.commitTransaction();

Expand Down
Loading