Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand All @@ -35,6 +36,7 @@
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;

@DefunctConfig("iceberg.allow-legacy-snapshot-syntax")
public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
Expand Down Expand Up @@ -64,7 +66,6 @@ public class IcebergConfig
// to avoid deleting those files if Trino is unable to check.
private boolean deleteSchemaLocationsFallback;
private double minimumAssignedSplitWeight = 0.05;
private boolean allowLegacySnapshotSyntax;
private Optional<String> materializedViewsStorageSchema = Optional.empty();

public CatalogType getCatalogType()
Expand Down Expand Up @@ -308,20 +309,6 @@ public double getMinimumAssignedSplitWeight()
return minimumAssignedSplitWeight;
}

@Config("iceberg.allow-legacy-snapshot-syntax")
Comment thread
findinpath marked this conversation as resolved.
Outdated
@Deprecated
public IcebergConfig setAllowLegacySnapshotSyntax(boolean allowLegacySnapshotSyntax)
{
this.allowLegacySnapshotSyntax = allowLegacySnapshotSyntax;
return this;
}

@Deprecated
public boolean isAllowLegacySnapshotSyntax()
{
return allowLegacySnapshotSyntax;
}

@NotNull
public Optional<String> getMaterializedViewsStorageSchema()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -188,7 +187,6 @@
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isAllowLegacySnapshotSyntax;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
Expand Down Expand Up @@ -220,7 +218,6 @@
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -265,8 +262,6 @@ public class IcebergMetadata
private final TrinoCatalog catalog;
private final TrinoFileSystemFactory fileSystemFactory;

private final Map<String, Long> snapshotIds = new ConcurrentHashMap<>();

private Transaction transaction;

public IcebergMetadata(
Expand Down Expand Up @@ -332,16 +327,11 @@ public IcebergTableHandle getTableHandle(
return null;
}

if (name.getSnapshotId().isPresent() && endVersion.isPresent()) {
throw new TrinoException(GENERIC_USER_ERROR, "Cannot specify end version both in table name and FOR clause");
}

Optional<Long> tableSnapshotId;
Schema tableSchema;
Optional<PartitionSpec> partitionSpec;
if (endVersion.isPresent() || name.getSnapshotId().isPresent()) {
long snapshotId = endVersion.map(connectorTableVersion -> getSnapshotIdFromVersion(table, connectorTableVersion))
.orElseGet(() -> resolveSnapshotId(table, name.getSnapshotId().get(), isAllowLegacySnapshotSyntax(session)));
if (endVersion.isPresent()) {
long snapshotId = getSnapshotIdFromVersion(table, endVersion.get());
tableSnapshotId = Optional.of(snapshotId);
tableSchema = schemaFor(table, snapshotId);
partitionSpec = Optional.empty();
Expand Down Expand Up @@ -436,21 +426,15 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
// Handled above.
break;
case HISTORY:
if (name.getSnapshotId().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Snapshot ID not supported for history table: " + systemTableName);
}
return Optional.of(new HistoryTable(systemTableName, table));
case SNAPSHOTS:
if (name.getSnapshotId().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Snapshot ID not supported for snapshots table: " + systemTableName);
}
return Optional.of(new SnapshotsTable(systemTableName, typeManager, table));
case PARTITIONS:
return Optional.of(new PartitionTable(systemTableName, typeManager, table, getSnapshotId(table, name.getSnapshotId(), isAllowLegacySnapshotSyntax(session))));
return Optional.of(new PartitionTable(systemTableName, typeManager, table, getCurrentSnapshotId(table)));
case MANIFESTS:
return Optional.of(new ManifestsTable(systemTableName, table, getSnapshotId(table, name.getSnapshotId(), isAllowLegacySnapshotSyntax(session))));
return Optional.of(new ManifestsTable(systemTableName, table, getCurrentSnapshotId(table)));
case FILES:
return Optional.of(new FilesTable(systemTableName, typeManager, table, getSnapshotId(table, name.getSnapshotId(), isAllowLegacySnapshotSyntax(session))));
return Optional.of(new FilesTable(systemTableName, typeManager, table, getCurrentSnapshotId(table)));
case PROPERTIES:
return Optional.of(new PropertiesTable(systemTableName, table));
}
Expand Down Expand Up @@ -2090,19 +2074,9 @@ public void setTableAuthorization(ConnectorSession session, SchemaTableName tabl
catalog.setTablePrincipal(session, tableName, principal);
}

private Optional<Long> getSnapshotId(Table table, Optional<Long> snapshotId, boolean allowLegacySnapshotSyntax)
{
// table.name() is an encoded version of SchemaTableName
return snapshotId
.map(id -> resolveSnapshotId(table, id, allowLegacySnapshotSyntax))
.or(() -> Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId));
}

private long resolveSnapshotId(Table table, long id, boolean allowLegacySnapshotSyntax)
private Optional<Long> getCurrentSnapshotId(Table table)
{
return snapshotIds.computeIfAbsent(
table.name() + "@" + id,
ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax));
return Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId);
}

Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public final class IcebergSessionProperties
private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "expire_snapshots_min_retention";
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention";
private static final String ALLOW_LEGACY_SNAPSHOT_SYNTAX = "allow_legacy_snapshot_syntax";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -253,11 +252,6 @@ public IcebergSessionProperties(
"Minimal retention period for remove_orphan_files procedure",
icebergConfig.getRemoveOrphanFilesMinRetention(),
false))
.add(booleanProperty(
ALLOW_LEGACY_SNAPSHOT_SYNTAX,
"Allow snapshot access based on timestamp and snapshotid",
icebergConfig.isAllowLegacySnapshotSyntax(),
false))
.build();
}

Expand Down Expand Up @@ -423,9 +417,4 @@ public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
}

public static boolean isAllowLegacySnapshotSyntax(ConnectorSession session)
{
return session.getProperty(ALLOW_LEGACY_SNAPSHOT_SYNTAX, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,26 @@
import io.trino.spi.TrinoException;

import java.util.Locale;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class IcebergTableName
{
private static final Pattern TABLE_PATTERN = Pattern.compile("" +
"(?<table>[^$@]+)" +
"(?:@(?<ver1>[0-9]+))?" +
"(?:\\$(?<type>[^@]+)(?:@(?<ver2>[0-9]+))?)?");
"(?:\\$(?<type>[^@]+))?");

private final String tableName;
private final TableType tableType;
private final Optional<Long> snapshotId;

public IcebergTableName(String tableName, TableType tableType, Optional<Long> snapshotId)
public IcebergTableName(String tableName, TableType tableType)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableType = requireNonNull(tableType, "tableType is null");
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
}

public String getTableName()
Expand All @@ -53,11 +48,6 @@ public TableType getTableType()
return tableType;
}

public Optional<Long> getSnapshotId()
{
return snapshotId;
}

public String getTableNameWithType()
{
return tableName + "$" + tableType.name().toLowerCase(Locale.ROOT);
Expand All @@ -66,7 +56,7 @@ public String getTableNameWithType()
@Override
public String toString()
{
return getTableNameWithType() + "@" + snapshotId;
return getTableNameWithType();
}

public static IcebergTableName from(String name)
Expand All @@ -78,8 +68,6 @@ public static IcebergTableName from(String name)

String table = match.group("table");
String typeString = match.group("type");
String ver1 = match.group("ver1");
String ver2 = match.group("ver2");

TableType type = TableType.DATA;
if (typeString != null) {
Expand All @@ -91,22 +79,6 @@ public static IcebergTableName from(String name)
}
}

Optional<Long> version = Optional.empty();
if (type == TableType.DATA || type == TableType.PARTITIONS || type == TableType.MANIFESTS || type == TableType.FILES) {
if (ver1 != null && ver2 != null) {
throw new TrinoException(NOT_SUPPORTED, "Invalid Iceberg table name (cannot specify two @ versions): " + name);
}
if (ver1 != null) {
version = Optional.of(parseLong(ver1));
}
else if (ver2 != null) {
version = Optional.of(parseLong(ver2));
}
}
else if (ver1 != null || ver2 != null) {
throw new TrinoException(NOT_SUPPORTED, format("Invalid Iceberg table name (cannot use @ version with table type '%s'): %s", type, name));
}

return new IcebergTableName(table, type, version);
return new IcebergTableName(table, type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,10 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Lists.reverse;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
Expand Down Expand Up @@ -217,31 +215,6 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
return properties.buildOrThrow();
}

@Deprecated
public static long resolveSnapshotId(Table table, long snapshotId, boolean allowLegacySnapshotSyntax)
{
if (!allowLegacySnapshotSyntax) {
throw new TrinoException(
NOT_SUPPORTED,
format(
"Failed to access snapshot %s for table %s. This syntax for accessing Iceberg tables is not "
+ "supported. Use the AS OF syntax OR set the catalog session property "
+ "allow_legacy_snapshot_syntax=true for temporarily restoring previous behavior.",
snapshotId,
table.name()));
}

if (table.snapshot(snapshotId) != null) {
return snapshotId;
}

return reverse(table.history()).stream()
.filter(entry -> entry.timestampMillis() <= snapshotId)
.map(HistoryEntry::snapshotId)
.findFirst()
.orElseThrow(() -> new TrinoException(ICEBERG_INVALID_SNAPSHOT_ID, format("Invalid snapshot [%s] for table: %s", snapshotId, table)));
}

public static List<IcebergColumnHandle> getColumns(Schema schema, TypeManager typeManager)
{
return schema.columns().stream()
Expand Down
Loading