diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 22fdfb603f09..111c2a68b942 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -48,6 +48,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HistoryEntry; +import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -58,6 +59,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Type.PrimitiveType; @@ -89,6 +91,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Maps.immutableEntry; +import static com.google.common.collect.Streams.mapWithIndex; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity; @@ -147,6 +151,7 @@ import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.LocationProviders.locationsFor; +import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; @@ -744,4 +749,16 @@ public static void commit(SnapshotUpdate update, ConnectorSession session) update.set(TRINO_QUERY_ID_NAME, session.getQueryId()); update.commit(); } + + public static TableScan buildTableScan(Table icebergTable, MetadataTableType metadataTableType) + { + return createMetadataTableInstance(icebergTable, metadataTableType).newScan(); + } + + public static Map columnNameToPositionInSchema(Schema schema) + { + return mapWithIndex(schema.columns().stream(), + (column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue())) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java index bafcc19cfc64..81f9b60699dc 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java @@ -28,20 +28,38 @@ import io.trino.spi.type.TimeZoneKey; import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; +import org.apache.iceberg.DataTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; +import static io.trino.plugin.iceberg.IcebergUtil.buildTableScan; +import static io.trino.plugin.iceberg.IcebergUtil.columnNameToPositionInSchema; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.MetadataTableType.SNAPSHOTS; public class SnapshotsTable implements SystemTable { private final ConnectorTableMetadata tableMetadata; private final Table icebergTable; + private static final String COMMITTED_AT_COLUMN_NAME = "committed_at"; + private static final String SNAPSHOT_ID_COLUMN_NAME = "snapshot_id"; + private static final String PARENT_ID_COLUMN_NAME = "parent_id"; + private static final String OPERATION_COLUMN_NAME = "operation"; + private static final String MANIFEST_LIST_COLUMN_NAME = "manifest_list"; + private static final String SUMMARY_COLUMN_NAME = "summary"; public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable) { @@ -50,12 +68,12 @@ public SnapshotsTable(SchemaTableName tableName, TypeManager typeManager, Table this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), ImmutableList.builder() - .add(new ColumnMetadata("committed_at", TIMESTAMP_TZ_MILLIS)) - .add(new ColumnMetadata("snapshot_id", BIGINT)) - .add(new ColumnMetadata("parent_id", BIGINT)) - .add(new ColumnMetadata("operation", VARCHAR)) - .add(new ColumnMetadata("manifest_list", VARCHAR)) - .add(new ColumnMetadata("summary", typeManager.getType(TypeSignature.mapType(VARCHAR.getTypeSignature(), VARCHAR.getTypeSignature())))) + .add(new ColumnMetadata(COMMITTED_AT_COLUMN_NAME, TIMESTAMP_TZ_MILLIS)) + .add(new ColumnMetadata(SNAPSHOT_ID_COLUMN_NAME, BIGINT)) + .add(new ColumnMetadata(PARENT_ID_COLUMN_NAME, BIGINT)) + .add(new ColumnMetadata(OPERATION_COLUMN_NAME, VARCHAR)) + .add(new ColumnMetadata(MANIFEST_LIST_COLUMN_NAME, VARCHAR)) + .add(new ColumnMetadata(SUMMARY_COLUMN_NAME, typeManager.getType(TypeSignature.mapType(VARCHAR.getTypeSignature(), VARCHAR.getTypeSignature())))) .build()); } @@ -81,18 +99,46 @@ private static List buildPages(ConnectorTableMetadata tableMetadata, Conne { PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); + TableScan tableScan = buildTableScan(icebergTable, SNAPSHOTS); TimeZoneKey timeZoneKey = session.getTimeZoneKey(); - icebergTable.snapshots().forEach(snapshot -> { - pagesBuilder.beginRow(); - pagesBuilder.appendTimestampTzMillis(snapshot.timestampMillis(), timeZoneKey); - pagesBuilder.appendBigint(snapshot.snapshotId()); - pagesBuilder.appendBigint(snapshot.parentId()); - pagesBuilder.appendVarchar(snapshot.operation()); - pagesBuilder.appendVarchar(snapshot.manifestListLocation()); - pagesBuilder.appendVarcharVarcharMap(snapshot.summary()); - pagesBuilder.endRow(); - }); + + Map columnNameToPosition = columnNameToPositionInSchema(tableScan.schema()); + + try (CloseableIterable fileScanTasks = tableScan.planFiles()) { + fileScanTasks.forEach(fileScanTask -> addRows((DataTask) fileScanTask, pagesBuilder, timeZoneKey, columnNameToPosition)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } return pagesBuilder.build(); } + + private static void addRows(DataTask dataTask, PageListBuilder pagesBuilder, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) + { + try (CloseableIterable dataRows = dataTask.rows()) { + dataRows.forEach(dataTaskRow -> addRow(pagesBuilder, dataTaskRow, timeZoneKey, columnNameToPositionInSchema)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map columnNameToPositionInSchema) + { + pagesBuilder.beginRow(); + + pagesBuilder.appendTimestampTzMillis( + structLike.get(columnNameToPositionInSchema.get(COMMITTED_AT_COLUMN_NAME), Long.class) / MICROSECONDS_PER_MILLISECOND, + timeZoneKey); + pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(SNAPSHOT_ID_COLUMN_NAME), Long.class)); + + Long parentId = structLike.get(columnNameToPositionInSchema.get(PARENT_ID_COLUMN_NAME), Long.class); + pagesBuilder.appendBigint(parentId != null ? parentId.longValue() : null); + + pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(OPERATION_COLUMN_NAME), String.class)); + pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(MANIFEST_LIST_COLUMN_NAME), String.class)); + pagesBuilder.appendVarcharVarcharMap(structLike.get(columnNameToPositionInSchema.get(SUMMARY_COLUMN_NAME), Map.class)); + pagesBuilder.endRow(); + } }