Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
public class IcebergInputInfo
{
private final Optional<Long> snapshotId;
private final boolean partitioned;
private final Optional<Boolean> partitioned;
private final String tableDefaultFileFormat;

@JsonCreator
public IcebergInputInfo(
        @JsonProperty("snapshotId") Optional<Long> snapshotId,
        @JsonProperty("partitioned") Optional<Boolean> partitioned,
        @JsonProperty("fileFormat") String tableDefaultFileFormat)
{
    // All three components are mandatory; "absence" is modeled inside the Optionals,
    // never as a null Optional reference.
    this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
    // Empty when the partitioning of the table could not be determined from the
    // table handle (no partition spec was recorded) — see getPartitioned().
    this.partitioned = requireNonNull(partitioned, "partitioned is null");
    this.tableDefaultFileFormat = requireNonNull(tableDefaultFileFormat, "tableDefaultFileFormat is null");
}

Expand All @@ -45,7 +45,7 @@ public Optional<Long> getSnapshotId()
}

@JsonProperty
public boolean isPartitioned()
public Optional<Boolean> getPartitioned()
{
return partitioned;
}
Expand All @@ -66,7 +66,7 @@ public boolean equals(Object o)
return false;
}
IcebergInputInfo that = (IcebergInputInfo) o;
return partitioned == that.partitioned
return partitioned.equals(that.partitioned)
&& snapshotId.equals(that.snapshotId)
&& tableDefaultFileFormat.equals(that.tableDefaultFileFormat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,18 +316,31 @@ public IcebergTableHandle getTableHandle(
throw new TrinoException(GENERIC_USER_ERROR, "Cannot specify end version both in table name and FOR clause");
}

Optional<Long> snapshotId = endVersion.map(version -> getSnapshotIdFromVersion(table, version))
.or(() -> getSnapshotId(table, name.getSnapshotId(), isAllowLegacySnapshotSyntax(session)));
Optional<Long> tableSnapshotId;
Schema tableSchema;
Optional<PartitionSpec> partitionSpec;
if (endVersion.isPresent() || name.getSnapshotId().isPresent()) {
long snapshotId = endVersion.map(connectorTableVersion -> getSnapshotIdFromVersion(table, connectorTableVersion))
.orElseGet(() -> resolveSnapshotId(table, name.getSnapshotId().get(), isAllowLegacySnapshotSyntax(session)));
tableSnapshotId = Optional.of(snapshotId);
tableSchema = table.schemas().get(table.snapshot(snapshotId).schemaId());
partitionSpec = Optional.empty();
}
else {
tableSnapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId);
tableSchema = table.schema();
partitionSpec = Optional.of(table.spec());
}

Map<String, String> tableProperties = table.properties();
String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING);
return new IcebergTableHandle(
tableName.getSchemaName(),
name.getTableName(),
name.getTableType(),
snapshotId,
SchemaParser.toJson(table.schema()),
PartitionSpecParser.toJson(table.spec()),
tableSnapshotId,
SchemaParser.toJson(tableSchema),
partitionSpec.map(PartitionSpecParser::toJson),
table.operations().current().formatVersion(),
TupleDomain.all(),
TupleDomain.all(),
Expand Down Expand Up @@ -508,7 +521,10 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
    IcebergTableHandle tableHandle = (IcebergTableHandle) table;
    // Load the live table only for properties/comment; the column list is derived from
    // the schema snapshot captured in the handle, so it stays consistent with the handle's
    // (possibly time-travel) schema rather than the table's current one.
    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
    List<ColumnMetadata> columns = getColumnMetadatas(SchemaParser.fromJson(tableHandle.getTableSchemaJson()));
    return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columns, getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
}

@Override
Expand Down Expand Up @@ -566,7 +582,10 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
if (redirectTable(session, tableName).isPresent()) {
return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName));
}
return Stream.of(TableColumnsMetadata.forTable(tableName, getTableMetadata(session, tableName).getColumns()));

Table icebergTable = catalog.loadTable(session, tableName);
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema());
return Stream.of(TableColumnsMetadata.forTable(tableName, columns));
}
catch (TableNotFoundException e) {
// Table disappeared during listing operation
Expand Down Expand Up @@ -1282,13 +1301,12 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc
public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
{
    IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
    // The handle may carry no partition spec (its partitionSpecJson is Optional);
    // in that case whether the table is partitioned is unknown and we propagate empty.
    Optional<Boolean> partitioned = icebergTableHandle.getPartitionSpecJson()
            .map(partitionSpecJson -> PartitionSpecParser.fromJson(
                    SchemaParser.fromJson(icebergTableHandle.getTableSchemaJson()),
                    partitionSpecJson)
                    .isPartitioned());

    return Optional.of(new IcebergInputInfo(
            icebergTableHandle.getSnapshotId(),
            partitioned,
            getFileFormat(icebergTableHandle.getStorageProperties()).name()));
}

Expand Down Expand Up @@ -1409,24 +1427,11 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit();
}

/**
* @throws TableNotFoundException when table cannot be found
*/
private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table)
private List<ColumnMetadata> getColumnMetadatas(Schema schema)
{
Table icebergTable = catalog.loadTable(session, table);

ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
columns.addAll(getColumnMetadatas(icebergTable));
columns.add(pathColumnMetadata());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: personally I think it makes sense to add the path column to the list here. The other method is responsible for columns that come from the Iceberg schema.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this refactor to avoid duplicating code across the methods that build the column metadata.
Should I pick a better name for the method?

columns.add(fileModifiedTimeColumnMetadata());

return new ConnectorTableMetadata(table, columns.build(), getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
}

private List<ColumnMetadata> getColumnMetadatas(Table table)
{
return table.schema().columns().stream()
List<ColumnMetadata> schemaColumns = schema.columns().stream()
.map(column ->
ColumnMetadata.builder()
.setName(column.name())
Expand All @@ -1435,6 +1440,10 @@ private List<ColumnMetadata> getColumnMetadatas(Table table)
.setComment(Optional.ofNullable(column.doc()))
.build())
.collect(toImmutableList());
columns.addAll(schemaColumns);
columns.add(pathColumnMetadata());
columns.add(fileModifiedTimeColumnMetadata());
return columns.build();
}

@Override
Expand Down Expand Up @@ -1958,13 +1967,17 @@ private Optional<Long> getSnapshotId(Table table, Optional<Long> snapshotId, boo
{
// table.name() is an encoded version of SchemaTableName
return snapshotId
.map(id ->
snapshotIds.computeIfAbsent(
table.name() + "@" + id,
ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax)))
.map(id -> resolveSnapshotId(table, id, allowLegacySnapshotSyntax))
.or(() -> Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId));
}

private long resolveSnapshotId(Table table, long id, boolean allowLegacySnapshotSyntax)
{
    // table.name() is an encoded version of SchemaTableName.
    // allowLegacySnapshotSyntax must be part of the cache key: IcebergUtil.resolveSnapshotId
    // can throw when the legacy syntax is disallowed, so a value cached while the flag was
    // true must not let a later call with the flag false bypass that check.
    return snapshotIds.computeIfAbsent(
            table.name() + "@" + id + "@" + allowLegacySnapshotSyntax,
            ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allowLegacySnapshotSyntax should be part of the cache key

(pre-existing, but still sth to fix)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public static long resolveSnapshotId(Table table, long snapshotId, boolean allowLegacySnapshotSyntax)
{
if (!allowLegacySnapshotSyntax) {
throw new TrinoException(
NOT_SUPPORTED,
format(
"Failed to access snapshot %s for table %s. This syntax for accessing Iceberg tables is not "
+ "supported. Use the AS OF syntax OR set the catalog session property "
+ "allow_legacy_snapshot_syntax=true for temporarily restoring previous behavior.",
snapshotId,
table.name()));
}

If allowLegacySnapshotSyntax is false, we'd get a Trino exception.

I don't see the purpose behind this request.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case that allowLegacySnapshotSyntax is false we'd get a Trino exception.

only if snapshotIds entry doesn't exist yet.
cc @phd3

}

Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
{
return catalog.loadTable(session, schemaTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,10 @@ public ConnectorPageSource createPageSource(

Supplier<IcebergPageSink> updatedRowPageSinkSupplier = () -> new IcebergPageSink(
tableSchema,
PartitionSpecParser.fromJson(tableSchema, table.getPartitionSpecJson()),
PartitionSpecParser.fromJson(
tableSchema,
table.getPartitionSpecJson()
.orElseThrow(() -> new VerifyException("Partition spec missing in the table handle"))),
locationProvider,
fileWriterFactory,
pageIndexerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class IcebergTableHandle
private final TableType tableType;
private final Optional<Long> snapshotId;
private final String tableSchemaJson;
private final String partitionSpecJson;
// Empty means the partitioning spec is not known (can be the case for certain time travel queries).
private final Optional<String> partitionSpecJson;
private final int formatVersion;
private final String tableLocation;
private final Map<String, String> storageProperties;
Expand Down Expand Up @@ -71,7 +72,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
@JsonProperty("tableType") TableType tableType,
@JsonProperty("snapshotId") Optional<Long> snapshotId,
@JsonProperty("tableSchemaJson") String tableSchemaJson,
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionSpecJson") Optional<String> partitionSpecJson,
@JsonProperty("formatVersion") int formatVersion,
@JsonProperty("unenforcedPredicate") TupleDomain<IcebergColumnHandle> unenforcedPredicate,
@JsonProperty("enforcedPredicate") TupleDomain<IcebergColumnHandle> enforcedPredicate,
Expand Down Expand Up @@ -108,7 +109,7 @@ public IcebergTableHandle(
TableType tableType,
Optional<Long> snapshotId,
String tableSchemaJson,
String partitionSpecJson,
Optional<String> partitionSpecJson,
int formatVersion,
TupleDomain<IcebergColumnHandle> unenforcedPredicate,
TupleDomain<IcebergColumnHandle> enforcedPredicate,
Expand Down Expand Up @@ -171,7 +172,7 @@ public String getTableSchemaJson()
}

@JsonProperty
public String getPartitionSpecJson()
public Optional<String> getPartitionSpecJson()
{
return partitionSpecJson;
}
Expand Down
Loading