From f61dee04789f666b8cd3abe762c40291ff5a901e Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Wed, 27 Jul 2022 16:54:28 +0530 Subject: [PATCH 1/3] add metadatalogs table --- docs/spark-queries.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/spark-queries.md b/docs/spark-queries.md index 1e0db8d443a2..ee5c088be922 100644 --- a/docs/spark-queries.md +++ b/docs/spark-queries.md @@ -227,6 +227,20 @@ SELECT * FROM prod.db.table.history **This shows a commit that was rolled back.** The example has two snapshots with the same parent, and one is *not* an ancestor of the current table state. {{< /hint >}} +### Metadata Logs + +To show table metadata logs : + +```sql +SELECT * from prod.db.table.metadata_logs +``` + +| timestamp_millis | file | latest_snapshot_id | latest_schema_id | latest_sequence_number | +| -- | -- | -- | -- | -- | +| 1658211678838 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null | +| 1658211679192 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 | +| 1658211679192 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 | + ### Snapshots To show the valid snapshots for a table: From 1b4564181462a41b8de5a6176745368432eb08ee Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Wed, 27 Jul 2022 22:07:12 +0530 Subject: [PATCH 2/3] address review feedback --- docs/spark-queries.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/spark-queries.md b/docs/spark-queries.md index ee5c088be922..1eb125d1ecf2 100644 --- a/docs/spark-queries.md +++ b/docs/spark-queries.md @@ -229,7 +229,7 @@ SELECT * FROM prod.db.table.history ### Metadata Logs -To show table metadata logs : +To show table metadata logs: ```sql SELECT * from prod.db.table.metadata_logs @@ -239,7 +239,7 @@ SELECT * from prod.db.table.metadata_logs | -- | -- | -- | -- | -- | | 1658211678838 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null | | 1658211679192 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 | -| 1658211679192 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 | +| 1658211679682 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 | ### Snapshots From 8dc40a07f37184051b4d3930f3ae7acbca8c6303 Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Thu, 28 Jul 2022 10:56:33 +0530 Subject: [PATCH 3/3] address review feedback - round 2 --- ...able.java => MetadataLogEntriesTable.java} | 37 ++++++++++--------- .../org/apache/iceberg/MetadataTableType.java | 2 +- .../apache/iceberg/MetadataTableUtils.java | 4 +- docs/spark-queries.md | 14 +++---- .../spark/extensions/TestMetadataTables.java | 26 +++++++------ .../spark/extensions/TestMetadataTables.java | 26 +++++++------ 6 files changed, 59 insertions(+), 50 deletions(-) rename core/src/main/java/org/apache/iceberg/{MetadataLogsTable.java => MetadataLogEntriesTable.java} (76%) diff --git a/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java b/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java similarity index 76% rename from core/src/main/java/org/apache/iceberg/MetadataLogsTable.java rename to core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java index a8d02fb3eb27..1cbe9be8c438 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java +++ b/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java @@ -24,27 +24,27 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -public class MetadataLogsTable extends BaseMetadataTable { +public class MetadataLogEntriesTable extends BaseMetadataTable { - private static final Schema METADATA_LOGS_SCHEMA = + private static final Schema METADATA_LOG_ENTRIES_SCHEMA = new Schema( - Types.NestedField.required(1, "timestamp_millis", Types.LongType.get()), + Types.NestedField.required(1, "timestamp", Types.TimestampType.withZone()), Types.NestedField.required(2, "file", Types.StringType.get()), Types.NestedField.optional(3, "latest_snapshot_id", Types.LongType.get()), Types.NestedField.optional(4, "latest_schema_id", Types.IntegerType.get()), Types.NestedField.optional(5, "latest_sequence_number", Types.LongType.get())); - MetadataLogsTable(TableOperations ops, Table table) { - this(ops, table, table.name() + ".metadata_logs"); + MetadataLogEntriesTable(TableOperations ops, Table table) { + this(ops, table, table.name() + ".metadata_log_entries"); } - MetadataLogsTable(TableOperations ops, Table table, String name) { + MetadataLogEntriesTable(TableOperations ops, Table table, String name) { super(ops, table, name); } @Override MetadataTableType metadataTableType() { - return MetadataTableType.METADATA_LOGS; + return MetadataTableType.METADATA_LOG_ENTRIES; } @Override @@ -54,7 +54,7 @@ public TableScan newScan() { @Override public Schema schema() { - return METADATA_LOGS_SCHEMA; + return METADATA_LOG_ENTRIES_SCHEMA; } private DataTask task(TableScan scan) { @@ -69,7 +69,8 @@ private DataTask task(TableScan scan) { schema(), scan.schema(), metadataLogEntries, - metadataLogEntry -> MetadataLogsTable.metadataLogToRow(metadataLogEntry, table())); + metadataLogEntry -> + MetadataLogEntriesTable.metadataLogEntryToRow(metadataLogEntry, table())); } private class MetadataLogScan extends StaticTableScan { @@ -77,18 +78,18 @@ private class MetadataLogScan extends StaticTableScan { super( ops, table, - METADATA_LOGS_SCHEMA, - MetadataTableType.METADATA_LOGS, - MetadataLogsTable.this::task); + METADATA_LOG_ENTRIES_SCHEMA, + MetadataTableType.METADATA_LOG_ENTRIES, + MetadataLogEntriesTable.this::task); } MetadataLogScan(TableOperations ops, Table table, TableScanContext context) { super( ops, table, - METADATA_LOGS_SCHEMA, - MetadataTableType.METADATA_LOGS, - MetadataLogsTable.this::task, + METADATA_LOG_ENTRIES_SCHEMA, + MetadataTableType.METADATA_LOG_ENTRIES, + MetadataLogEntriesTable.this::task, context); } @@ -100,11 +101,11 @@ protected TableScan newRefinedScan( @Override public CloseableIterable planFiles() { - return CloseableIterable.withNoopClose(MetadataLogsTable.this.task(this)); + return CloseableIterable.withNoopClose(MetadataLogEntriesTable.this.task(this)); } } - private static StaticDataTask.Row metadataLogToRow( + private static StaticDataTask.Row metadataLogEntryToRow( TableMetadata.MetadataLogEntry metadataLogEntry, Table table) { Long latestSnapshotId = null; Snapshot latestSnapshot = null; @@ -116,7 +117,7 @@ private static StaticDataTask.Row metadataLogToRow( } return StaticDataTask.Row.of( - metadataLogEntry.timestampMillis(), + metadataLogEntry.timestampMillis() * 1000, metadataLogEntry.file(), // latest snapshot in this file corresponding to the log entry latestSnapshotId, diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableType.java b/core/src/main/java/org/apache/iceberg/MetadataTableType.java index d1ea1c7d1ba4..edcbc7e79bd5 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableType.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableType.java @@ -26,7 +26,7 @@ public enum MetadataTableType { DATA_FILES, DELETE_FILES, HISTORY, - METADATA_LOGS, + METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java index 2554aee96b49..069101f90357 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java @@ -60,8 +60,8 @@ private static Table createMetadataTableInstance( return new HistoryTable(ops, baseTable, metadataTableName); case SNAPSHOTS: return new SnapshotsTable(ops, baseTable, metadataTableName); - case METADATA_LOGS: - return new MetadataLogsTable(ops, baseTable, metadataTableName); + case METADATA_LOG_ENTRIES: + return new MetadataLogEntriesTable(ops, baseTable, metadataTableName); case MANIFESTS: return new ManifestsTable(ops, baseTable, metadataTableName); case PARTITIONS: diff --git a/docs/spark-queries.md b/docs/spark-queries.md index 1eb125d1ecf2..207b9565563f 100644 --- a/docs/spark-queries.md +++ b/docs/spark-queries.md @@ -227,19 +227,19 @@ SELECT * FROM prod.db.table.history **This shows a commit that was rolled back.** The example has two snapshots with the same parent, and one is *not* an ancestor of the current table state. {{< /hint >}} -### Metadata Logs +### Metadata Log Entries -To show table metadata logs: +To show table metadata log entries: ```sql -SELECT * from prod.db.table.metadata_logs +SELECT * from prod.db.table.metadata_log_entries ``` -| timestamp_millis | file | latest_snapshot_id | latest_schema_id | latest_sequence_number | +| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number | | -- | -- | -- | -- | -- | -| 1658211678838 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null | -| 1658211679192 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 | -| 1658211679682 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 | +| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null | +| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 | +| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 | ### Snapshots diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java index 973cf4844a18..cb9bcb072bc0 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java @@ -43,6 +43,7 @@ import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -381,7 +382,7 @@ public void testAllFilesPartitioned() throws Exception { } @Test - public void testMetadataLogs() throws Exception { + public void testMetadataLogEntries() throws Exception { // Create table and insert data sql( "CREATE TABLE %s (id bigint, data string) " @@ -408,24 +409,24 @@ public void testMetadataLogs() throws Exception { Lists.newArrayList(tableMetadata.previousFiles()); // Check metadataLog table - List metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName); + List metadataLogs = sql("SELECT * FROM %s.metadata_log_entries", tableName); assertEquals( - "MetadataLogsTable result should match the metadataLog entries", + "MetadataLogEntriesTable result should match the metadataLog entries", ImmutableList.of( row( - metadataLogEntries.get(0).timestampMillis(), + DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(0).timestampMillis() * 1000), metadataLogEntries.get(0).file(), null, null, null), row( - metadataLogEntries.get(1).timestampMillis(), + DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(1).timestampMillis() * 1000), metadataLogEntries.get(1).file(), parentSnapshot.snapshotId(), parentSnapshot.schemaId(), parentSnapshot.sequenceNumber()), row( - currentSnapshot.timestampMillis(), + DateTimeUtils.toJavaTimestamp(currentSnapshot.timestampMillis() * 1000), tableMetadata.metadataFileLocation(), currentSnapshot.snapshotId(), currentSnapshot.schemaId(), @@ -435,14 +436,16 @@ public void testMetadataLogs() throws Exception { // test filtering List metadataLogWithFilters = sql( - "SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s", + "SELECT * FROM %s.metadata_log_entries WHERE latest_snapshot_id = %s", tableName, currentSnapshotId); - Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size()); + Assert.assertEquals( + "metadataLogEntries table should return 1 row", 1, metadataLogWithFilters.size()); assertEquals( "Result should match the latest snapshot entry", ImmutableList.of( row( - tableMetadata.currentSnapshot().timestampMillis(), + DateTimeUtils.toJavaTimestamp( + tableMetadata.currentSnapshot().timestampMillis() * 1000), tableMetadata.metadataFileLocation(), tableMetadata.currentSnapshot().snapshotId(), tableMetadata.currentSnapshot().schemaId(), @@ -455,9 +458,10 @@ public void testMetadataLogs() throws Exception { .map(TableMetadata.MetadataLogEntry::file) .collect(Collectors.toList()); metadataFiles.add(tableMetadata.metadataFileLocation()); - List metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName); + List metadataLogWithProjection = + sql("SELECT file FROM %s.metadata_log_entries", tableName); Assert.assertEquals( - "metadataLog table should return 3 rows", 3, metadataLogWithProjection.size()); + "metadataLogEntries table should return 3 rows", 3, metadataLogWithProjection.size()); assertEquals( "metadataLog entry should be of same file", metadataFiles.stream().map(this::row).collect(Collectors.toList()), diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java index 973cf4844a18..cb9bcb072bc0 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java @@ -43,6 +43,7 @@ import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -381,7 +382,7 @@ public void testAllFilesPartitioned() throws Exception { } @Test - public void testMetadataLogs() throws Exception { + public void testMetadataLogEntries() throws Exception { // Create table and insert data sql( "CREATE TABLE %s (id bigint, data string) " @@ -408,24 +409,24 @@ public void testMetadataLogs() throws Exception { Lists.newArrayList(tableMetadata.previousFiles()); // Check metadataLog table - List metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName); + List metadataLogs = sql("SELECT * FROM %s.metadata_log_entries", tableName); assertEquals( - "MetadataLogsTable result should match the metadataLog entries", + "MetadataLogEntriesTable result should match the metadataLog entries", ImmutableList.of( row( - metadataLogEntries.get(0).timestampMillis(), + DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(0).timestampMillis() * 1000), metadataLogEntries.get(0).file(), null, null, null), row( - metadataLogEntries.get(1).timestampMillis(), + DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(1).timestampMillis() * 1000), metadataLogEntries.get(1).file(), parentSnapshot.snapshotId(), parentSnapshot.schemaId(), parentSnapshot.sequenceNumber()), row( - currentSnapshot.timestampMillis(), + DateTimeUtils.toJavaTimestamp(currentSnapshot.timestampMillis() * 1000), tableMetadata.metadataFileLocation(), currentSnapshot.snapshotId(), currentSnapshot.schemaId(), @@ -435,14 +436,16 @@ public void testMetadataLogs() throws Exception { // test filtering List metadataLogWithFilters = sql( - "SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s", + "SELECT * FROM %s.metadata_log_entries WHERE latest_snapshot_id = %s", tableName, currentSnapshotId); - Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size()); + Assert.assertEquals( + "metadataLogEntries table should return 1 row", 1, metadataLogWithFilters.size()); assertEquals( "Result should match the latest snapshot entry", ImmutableList.of( row( - tableMetadata.currentSnapshot().timestampMillis(), + DateTimeUtils.toJavaTimestamp( + tableMetadata.currentSnapshot().timestampMillis() * 1000), tableMetadata.metadataFileLocation(), tableMetadata.currentSnapshot().snapshotId(), tableMetadata.currentSnapshot().schemaId(), @@ -455,9 +458,10 @@ public void testMetadataLogs() throws Exception { .map(TableMetadata.MetadataLogEntry::file) .collect(Collectors.toList()); metadataFiles.add(tableMetadata.metadataFileLocation()); - List metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName); + List metadataLogWithProjection = + sql("SELECT file FROM %s.metadata_log_entries", tableName); Assert.assertEquals( - "metadataLog table should return 3 rows", 3, metadataLogWithProjection.size()); + "metadataLogEntries table should return 3 rows", 3, metadataLogWithProjection.size()); assertEquals( "metadataLog entry should be of same file", metadataFiles.stream().map(this::row).collect(Collectors.toList()),