diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst
index 14d7373163eb..6c906c93b17a 100644
--- a/docs/src/main/sphinx/connector/iceberg.rst
+++ b/docs/src/main/sphinx/connector/iceberg.rst
@@ -610,11 +610,13 @@ path metadata as a hidden column in each table:
 
 * ``$path``: Full file system path name of the file for this row
 
-You can use this column in your SQL statements like any other column. This
+* ``$file_modified_time``: Timestamp of the last modification of the file for this row
+
+You can use these columns in your SQL statements like any other column. This
 can be selected directly, or used in conditional statements. For example, you
 can inspect the file path for each record::
 
-    SELECT *, "$path"
+    SELECT *, "$path", "$file_modified_time"
     FROM iceberg.web.page_views;
 
 Retrieve all records that belong to a specific file using ``"$path"`` filter::
@@ -623,6 +625,12 @@ Retrieve all records that belong to a specific file using ``"$path"`` filter::
     FROM iceberg.web.page_views
     WHERE "$path" = '/usr/iceberg/table/web.page_views/data/file_01.parquet'
 
+Retrieve all records that belong to a specific file using ``"$file_modified_time"`` filter::
+
+    SELECT *
+    FROM iceberg.web.page_views
+    WHERE "$file_modified_time" = CAST('2022-07-01 01:02:03.456 UTC' AS timestamp with time zone)
+
 .. _iceberg-metadata-tables:
 
 Metadata tables
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
index 33ef5be49c07..55801fa8aa49 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
@@ -26,6 +26,7 @@
 import java.util.Objects;
 import java.util.Optional;
 
+import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iceberg.MetadataColumns.IS_DELETED;
@@ -168,6 +169,12 @@ public boolean isIsDeletedColumn()
         return id == IS_DELETED.fieldId();
     }
 
+    @JsonIgnore
+    public boolean isFileModifiedTimeColumn()
+    {
+        return id == FILE_MODIFIED_TIME.getId();
+    }
+
     @Override
     public int hashCode()
     {
@@ -216,6 +223,25 @@ public static ColumnMetadata pathColumnMetadata()
                 .build();
     }
 
+    public static IcebergColumnHandle fileModifiedTimeColumnHandle()
+    {
+        return new IcebergColumnHandle(
+                columnIdentity(FILE_MODIFIED_TIME),
+                FILE_MODIFIED_TIME.getType(),
+                ImmutableList.of(),
+                FILE_MODIFIED_TIME.getType(),
+                Optional.empty());
+    }
+
+    public static ColumnMetadata fileModifiedTimeColumnMetadata()
+    {
+        return ColumnMetadata.builder()
+                .setName(FILE_MODIFIED_TIME.getColumnName())
+                .setType(FILE_MODIFIED_TIME.getType())
+                .setHidden(true)
+                .build();
+    }
+
     private static ColumnIdentity columnIdentity(IcebergMetadataColumn metadata)
     {
         return new ColumnIdentity(metadata.getId(), metadata.getColumnName(), metadata.getTypeCategory(), ImmutableList.of());
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index e1a07b80c6b2..4e189aaf7e9d 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -163,11 +163,14 @@
 import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_ID;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_NAME;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnMetadata;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
+import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
@@ -525,6 +528,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
             columnHandles.put(columnHandle.getName(), columnHandle);
         }
         columnHandles.put(FILE_PATH.getColumnName(), pathColumnHandle());
+        columnHandles.put(FILE_MODIFIED_TIME.getColumnName(), fileModifiedTimeColumnHandle());
         return columnHandles.buildOrThrow();
     }
 
@@ -1415,6 +1419,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table)
         ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
         columns.addAll(getColumnMetadatas(icebergTable));
         columns.add(pathColumnMetadata());
+        columns.add(fileModifiedTimeColumnMetadata());
         return new ConnectorTableMetadata(table, columns.build(), getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
     }
 
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataColumn.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataColumn.java
index 20679d2de69c..57543c76ed42 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataColumn.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataColumn.java
@@ -22,11 +22,13 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory;
 import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE;
+import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 
 public enum IcebergMetadataColumn
 {
     FILE_PATH(MetadataColumns.FILE_PATH.fieldId(), "$path", VARCHAR, PRIMITIVE),
+    FILE_MODIFIED_TIME(Integer.MAX_VALUE - 1001, "$file_modified_time", TIMESTAMP_TZ_MILLIS, PRIMITIVE), // https://github.com/apache/iceberg/issues/5240
     /**/;
 
     private static final Set<Integer> COLUMNS_ID = Stream.of(values())
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index dbd47267c7bd..e6f7ebf36764 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -85,6 +85,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockMissingException;
@@ -120,6 +121,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -142,7 +144,9 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA;
+import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcLazyReadSmallRanges;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcMaxBufferSize;
@@ -169,6 +173,8 @@
 import static io.trino.spi.predicate.Utils.nativeValueToBlock;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
 import static io.trino.spi.type.UuidType.UUID;
 import static io.trino.spi.type.VarbinaryType.VARBINARY;
 import static io.trino.spi.type.VarcharType.VARCHAR;
@@ -297,6 +303,18 @@ public ConnectorPageSource createPageSource(
             fileSize = fileIoProvider.createFileIo(hdfsContext, session.getQueryId())
                     .newInputFile(split.getPath()).getLength();
         }
+        OptionalLong fileModifiedTime = OptionalLong.empty();
+        if (requiredColumns.stream().anyMatch(IcebergColumnHandle::isFileModifiedTimeColumn)) {
+            try {
+                FileStatus fileStatus = hdfsEnvironment.doAs(
+                        session.getIdentity(),
+                        () -> hdfsEnvironment.getFileSystem(hdfsContext, new Path(split.getPath())).getFileStatus(new Path(split.getPath())));
+                fileModifiedTime = OptionalLong.of(fileStatus.getModificationTime());
+            }
+            catch (IOException e) {
+                throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, e);
+            }
+        }
 
         ReaderPageSource dataPageSource = createDataPageSource(
                 session,
@@ -305,6 +323,7 @@
                 split.getStart(),
                 split.getLength(),
                 fileSize,
+                fileModifiedTime,
                 split.getFileFormat(),
                 split.getSchemaAsJson().map(SchemaParser::fromJson),
                 requiredColumns,
@@ -433,6 +452,7 @@ private ConnectorPageSource openDeletes(
                 0,
                 delete.fileSizeInBytes(),
                 delete.fileSizeInBytes(),
+                OptionalLong.empty(),
                 IcebergFileFormat.fromIceberg(delete.format()),
                 Optional.of(schemaFromHandles(columns)),
                 columns,
@@ -449,6 +469,7 @@ private ReaderPageSource createDataPageSource(
             long start,
             long length,
            long fileSize,
+            OptionalLong fileModifiedTime,
             IcebergFileFormat fileFormat,
             Optional<Schema> fileSchema,
             List<IcebergColumnHandle> dataColumns,
@@ -466,6 +487,7 @@
                         start,
                         length,
                         fileSize,
+                        fileModifiedTime,
                         dataColumns,
                         predicate,
                         orcReaderOptions
@@ -490,6 +512,7 @@
                         start,
                         length,
                         fileSize,
+                        fileModifiedTime,
                         dataColumns,
                         parquetReaderOptions
                                 .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
@@ -503,6 +526,7 @@
                         path,
                         start,
                         length,
+                        fileModifiedTime,
                         fileSchema.orElseThrow(),
                         nameMapping,
                         dataColumns);
@@ -519,6 +543,7 @@ private static ReaderPageSource createOrcPageSource(
             long start,
             long length,
             long fileSize,
+            OptionalLong fileModifiedTime,
             List<IcebergColumnHandle> columns,
             TupleDomain<IcebergColumnHandle> effectivePredicate,
             OrcReaderOptions options,
@@ -585,6 +610,9 @@ else if (partitionKeys.containsKey(column.getId())) {
                 else if (column.isPathColumn()) {
                     columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString()))));
                 }
+                else if (column.isFileModifiedTimeColumn()) {
+                    columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY))));
+                }
                 else if (column.isUpdateRowIdColumn()) {
                     // $row_id is a composite of multiple physical columns. It is assembled by the IcebergPageSource
                     columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
@@ -862,6 +890,7 @@ private static ReaderPageSource createParquetPageSource(
             long start,
             long length,
             long fileSize,
+            OptionalLong fileModifiedTime,
             List<IcebergColumnHandle> regularColumns,
             ParquetReaderOptions options,
             TupleDomain<IcebergColumnHandle> effectivePredicate,
@@ -948,6 +977,9 @@ else if (partitionKeys.containsKey(column.getId())) {
                 else if (column.isPathColumn()) {
                     constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString())));
                 }
+                else if (column.isFileModifiedTimeColumn()) {
+                    constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
+                }
                 else if (column.isUpdateRowIdColumn()) {
                     // $row_id is a composite of multiple physical columns, it is assembled by the IcebergPageSource
                     trinoTypes.add(column.getType());
@@ -1019,6 +1051,7 @@ private ReaderPageSource createAvroPageSource(
             Path path,
             long start,
             long length,
+            OptionalLong fileModifiedTime,
             Schema fileSchema,
             Optional<NameMapping> nameMapping,
             List<IcebergColumnHandle> columns)
@@ -1055,6 +1088,9 @@ private ReaderPageSource createAvroPageSource(
             if (column.isPathColumn()) {
                 constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString())));
             }
+            else if (column.isFileModifiedTimeColumn()) {
+                constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
+            }
             // For delete
             else if (column.isRowPositionColumn()) {
                 rowIndexChannels.add(true);
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index de6992614d65..a4f0978647bd 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -88,6 +88,7 @@
 import static com.google.common.collect.Iterables.getLast;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static com.google.common.collect.MoreCollectors.onlyElement;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS;
 import static io.trino.SystemSessionProperties.SCALE_WRITERS;
 import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
@@ -116,8 +117,10 @@
 import static java.lang.String.format;
 import static java.lang.String.join;
 import static java.time.ZoneOffset.UTC;
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 import static java.util.Collections.nCopies;
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
@@ -4505,6 +4508,38 @@ public void testPathHiddenColumn()
         assertUpdate("DROP TABLE " + tableName);
     }
 
+    @Test
+    public void testFileModifiedTimeHiddenColumn()
+    {
+        ZonedDateTime beforeTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)");
+        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_file_modified_time_", "(col) AS VALUES (1)")) {
+            MaterializedResult expectedColumns = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
+                    .row("col", "integer", "", "")
+                    .build();
+            MaterializedResult actualColumns = computeActual("DESCRIBE " + table.getName());
+            // Describe output should not have the $file_modified_time hidden column
+            assertEquals(actualColumns, expectedColumns);
+
+            ZonedDateTime fileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + table.getName());
+            ZonedDateTime afterTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)");
+            assertThat(fileModifiedTime).isBetween(beforeTime, afterTime);
+        }
+    }
+
+    @Test
+    public void testDeleteWithFileModifiedTimeColumn()
+    {
+        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_with_file_modified_time_", "(key int)")) {
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES (1)", 1);
+            sleepUninterruptibly(1, MILLISECONDS);
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES (2)", 1);
+
+            ZonedDateTime oldModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + table.getName() + " WHERE key = 1");
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + oldModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')", 1);
+            assertQuery("SELECT * FROM " + table.getName(), "VALUES 2");
+        }
+    }
+
     @Test
     public void testExpireSnapshots()
             throws Exception
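
For reference, a minimal standalone sketch of the value encoding the page sources above rely on: `$file_modified_time` is declared as `TIMESTAMP_TZ_MILLIS`, so each reader emits a single constant `long` per split, produced by `packDateTimeWithZone(fileModifiedTime, UTC_KEY)`. The class name, `main` wrapper, and sample millisecond value below are illustrative only; `packDateTimeWithZone` and `unpackMillisUtc` are existing helpers in `io.trino.spi.type.DateTimeEncoding`, of which only the former appears in the patch.

import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;

// Illustrative sketch only; not part of the patch.
public class FileModifiedTimeEncodingSketch
{
    public static void main(String[] args)
    {
        // Stand-in for FileStatus.getModificationTime(): epoch milliseconds of the data file.
        long modificationTimeMillis = 1_656_636_123_456L;

        // TIMESTAMP(3) WITH TIME ZONE fits in a single long: the UTC millis and the time zone
        // key are packed together. This is the value stored in the constant column that
        // createOrcPageSource, createParquetPageSource, and createAvroPageSource add per split.
        long packed = packDateTimeWithZone(modificationTimeMillis, UTC_KEY);

        // Round trip: unpacking recovers the original epoch millis.
        if (unpackMillisUtc(packed) != modificationTimeMillis) {
            throw new AssertionError("unexpected encoding");
        }
        System.out.println("packed $file_modified_time value: " + packed);
    }
}

UTC_KEY is used because the file system reports a plain instant (epoch milliseconds), so the connector renders the timestamp in UTC, matching the documentation example added above.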