Skip to content

Commit d673332

Browse files
oneonestar authored and ebyhr committed
Add metadata_log_entries table to Iceberg
1 parent ad40089 commit d673332

File tree

8 files changed

+241
-2
lines changed

8 files changed

+241
-2
lines changed

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,52 @@ The output of the query has the following columns:
825825
- Whether or not this snapshot is an ancestor of the current snapshot.
826826
:::
827827

828+
##### `$metadata_log_entries` table
829+
830+
The `$metadata_log_entries` table provides a view of metadata log entries
831+
of the Iceberg table.
832+
833+
You can retrieve the information about the metadata log entries of the Iceberg
834+
table `test_table` by using the following query:
835+
836+
```
837+
SELECT * FROM "test_table$metadata_log_entries"
838+
```
839+
840+
```text
841+
timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number
842+
---------------------------------------+----------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+------------------------
843+
2024-01-16 15:55:31.172 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00000-39174715-be2a-48fa-9949-35413b8b736e.metadata.json | 1221802298419195590 | 0 | 1
844+
2024-01-16 17:19:56.118 Europe/Vienna | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/00001-e40178c9-271f-4a96-ad29-eed5e7aef9b0.metadata.json | 7124386610209126943 | 0 | 2
845+
```
846+
847+
The output of the query has the following columns:
848+
849+
:::{list-table} Metadata log entries columns
850+
:widths: 30, 30, 40
851+
:header-rows: 1
852+
853+
* - Name
854+
- Type
855+
- Description
856+
* - `timestamp`
857+
- `TIMESTAMP(3) WITH TIME ZONE`
858+
- The time when the metadata was created.
859+
* - `file`
860+
- `VARCHAR`
861+
- The location of the metadata file.
862+
* - `latest_snapshot_id`
863+
- `BIGINT`
864+
- The identifier of the latest snapshot when the metadata was updated.
865+
* - `latest_schema_id`
866+
- `INTEGER`
867+
- The identifier of the latest schema when the metadata was updated.
868+
* - `latest_sequence_number`
869+
- `BIGINT`
870+
- The data sequence number of the metadata file.
871+
:::
872+
873+
828874
##### `$snapshots` table
829875

830876
The `$snapshots` table provides a detailed view of snapshots of the Iceberg

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
575575
return switch (tableType) {
576576
case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above.
577577
case HISTORY -> Optional.of(new HistoryTable(tableName, table));
578+
case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table));
578579
case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table));
579580
case PARTITIONS -> Optional.of(new PartitionTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
580581
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import io.trino.plugin.iceberg.util.PageListBuilder;
18+
import io.trino.spi.connector.ColumnMetadata;
19+
import io.trino.spi.connector.ConnectorTableMetadata;
20+
import io.trino.spi.connector.SchemaTableName;
21+
import io.trino.spi.type.TimeZoneKey;
22+
import org.apache.iceberg.StructLike;
23+
import org.apache.iceberg.Table;
24+
25+
import java.util.Map;
26+
27+
import static io.trino.spi.type.BigintType.BIGINT;
28+
import static io.trino.spi.type.IntegerType.INTEGER;
29+
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
30+
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
31+
import static io.trino.spi.type.VarcharType.VARCHAR;
32+
import static java.util.Objects.requireNonNull;
33+
import static org.apache.iceberg.MetadataTableType.METADATA_LOG_ENTRIES;
34+
35+
public class MetadataLogEntriesTable
36+
extends BaseSystemTable
37+
{
38+
private static final String TIMESTAMP_COLUMN_NAME = "timestamp";
39+
private static final String FILE_COLUMN_NAME = "file";
40+
private static final String LATEST_SNAPSHOT_ID_COLUMN_NAME = "latest_snapshot_id";
41+
private static final String LATEST_SCHEMA_ID_COLUMN_NAME = "latest_schema_id";
42+
private static final String LATEST_SEQUENCE_NUMBER_COLUMN_NAME = "latest_sequence_number";
43+
44+
public MetadataLogEntriesTable(SchemaTableName tableName, Table icebergTable)
45+
{
46+
super(
47+
requireNonNull(icebergTable, "icebergTable is null"),
48+
createConnectorTableMetadata(requireNonNull(tableName, "tableName is null")),
49+
METADATA_LOG_ENTRIES);
50+
}
51+
52+
private static ConnectorTableMetadata createConnectorTableMetadata(SchemaTableName tableName)
53+
{
54+
return new ConnectorTableMetadata(
55+
tableName,
56+
ImmutableList.<ColumnMetadata>builder()
57+
.add(new ColumnMetadata(TIMESTAMP_COLUMN_NAME, TIMESTAMP_TZ_MILLIS))
58+
.add(new ColumnMetadata(FILE_COLUMN_NAME, VARCHAR))
59+
.add(new ColumnMetadata(LATEST_SNAPSHOT_ID_COLUMN_NAME, BIGINT))
60+
.add(new ColumnMetadata(LATEST_SCHEMA_ID_COLUMN_NAME, INTEGER))
61+
.add(new ColumnMetadata(LATEST_SEQUENCE_NUMBER_COLUMN_NAME, BIGINT))
62+
.build());
63+
}
64+
65+
@Override
66+
protected void addRow(PageListBuilder pagesBuilder, StructLike structLike, TimeZoneKey timeZoneKey, Map<String, Integer> columnNameToPositionInSchema)
67+
{
68+
pagesBuilder.beginRow();
69+
70+
pagesBuilder.appendTimestampTzMillis(
71+
structLike.get(columnNameToPositionInSchema.get(TIMESTAMP_COLUMN_NAME), Long.class) / MICROSECONDS_PER_MILLISECOND,
72+
timeZoneKey);
73+
pagesBuilder.appendVarchar(structLike.get(columnNameToPositionInSchema.get(FILE_COLUMN_NAME), String.class));
74+
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SNAPSHOT_ID_COLUMN_NAME), Long.class));
75+
pagesBuilder.appendInteger(structLike.get(columnNameToPositionInSchema.get(LATEST_SCHEMA_ID_COLUMN_NAME), Integer.class));
76+
pagesBuilder.appendBigint(structLike.get(columnNameToPositionInSchema.get(LATEST_SEQUENCE_NUMBER_COLUMN_NAME), Long.class));
77+
pagesBuilder.endRow();
78+
}
79+
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public enum TableType
1717
{
1818
DATA,
1919
HISTORY,
20+
METADATA_LOG_ENTRIES,
2021
SNAPSHOTS,
2122
MANIFESTS,
2223
PARTITIONS,

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6354,6 +6354,22 @@ public void testDeleteRetainsTableHistory()
63546354
List<Long> snapshotsAfterDelete = getTableHistory(tableName);
63556355
assertThat(snapshotsAfterDelete.size()).isGreaterThan(snapshots.size());
63566356
assertThat(snapshotsAfterDelete).containsAll(snapshots);
6357+
}
6358+
6359+
@Test
6360+
public void testDeleteRetainsMetadataFile()
6361+
{
6362+
String tableName = "test_delete_retains_metadata_file_" + randomNameSuffix();
6363+
assertUpdate("CREATE TABLE " + tableName + "(c1 INT, c2 INT)");
6364+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 1), (2, 2), (3, 3)", 3);
6365+
assertUpdate("INSERT INTO " + tableName + " VALUES (3, 3), (4, 4), (5, 5)", 3);
6366+
List<Long> metadataLogEntries = getLatestSequenceNumbersInMetadataLogEntries(tableName);
6367+
6368+
assertUpdate("DELETE FROM " + tableName + " WHERE c1 < 4", 4);
6369+
List<Long> metadataLogEntriesAfterDelete = getLatestSequenceNumbersInMetadataLogEntries(tableName);
6370+
assertThat(metadataLogEntriesAfterDelete)
6371+
.hasSizeGreaterThan(metadataLogEntries.size())
6372+
.containsAll(metadataLogEntries);
63576373
assertUpdate("DROP TABLE " + tableName);
63586374
}
63596375

@@ -7893,6 +7909,14 @@ private List<Long> getTableHistory(String tableName)
78937909
.collect(toImmutableList());
78947910
}
78957911

7912+
private List<Long> getLatestSequenceNumbersInMetadataLogEntries(String tableName)
7913+
{
7914+
return getQueryRunner().execute(format("SELECT latest_sequence_number FROM \"%s$metadata_log_entries\"", tableName))
7915+
.getOnlyColumn()
7916+
.map(Long.class::cast)
7917+
.collect(toImmutableList());
7918+
}
7919+
78967920
private long getCurrentSnapshotId(String tableName)
78977921
{
78987922
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.junit.jupiter.api.TestInstance;
2525

2626
import java.time.LocalDate;
27+
import java.util.ArrayList;
28+
import java.util.List;
2729
import java.util.Map;
2830
import java.util.function.Function;
2931

@@ -100,6 +102,7 @@ public void tearDown()
100102
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_drop_column");
101103
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_nan");
102104
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_dml");
105+
assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries");
103106
assertUpdate("DROP SCHEMA IF EXISTS test_schema");
104107
}
105108

@@ -197,6 +200,77 @@ public void testHistoryTable()
197200
assertQuery("SELECT count(*) FROM test_schema.\"test_table$history\"", "VALUES 3");
198201
}
199202

203+
@Test
204+
public void testMetadataLogEntriesTable()
205+
{
206+
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$metadata_log_entries\"",
207+
"VALUES ('timestamp', 'timestamp(3) with time zone', '', '')," +
208+
"('file', 'varchar', '', '')," +
209+
"('latest_snapshot_id', 'bigint', '', '')," +
210+
"('latest_schema_id', 'integer', '', '')," +
211+
"('latest_sequence_number', 'bigint', '', '')");
212+
213+
List<Integer> latestSchemaIds = new ArrayList<>();
214+
List<Long> latestSequenceNumbers = new ArrayList<>();
215+
216+
assertUpdate("CREATE TABLE test_schema.test_metadata_log_entries (c1 BIGINT)");
217+
latestSchemaIds.add(0);
218+
latestSequenceNumbers.add(1L);
219+
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);
220+
221+
assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1);
222+
// INSERT create two commits (https://github.com/trinodb/trino/issues/15439) and share a same snapshotId
223+
latestSchemaIds.add(0);
224+
latestSchemaIds.add(0);
225+
latestSequenceNumbers.add(2L);
226+
latestSequenceNumbers.add(2L);
227+
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);
228+
229+
assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries ADD COLUMN c2 VARCHAR");
230+
latestSchemaIds.add(0);
231+
latestSequenceNumbers.add(2L);
232+
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);
233+
234+
assertUpdate("DELETE FROM test_schema.test_metadata_log_entries WHERE c1 = 1", 1);
235+
latestSchemaIds.add(1);
236+
latestSequenceNumbers.add(3L);
237+
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);
238+
239+
// OPTIMIZE create two commits: update snapshot and rewrite statistics
240+
assertUpdate("ALTER TABLE test_schema.test_metadata_log_entries execute optimize");
241+
latestSchemaIds.add(1);
242+
latestSchemaIds.add(1);
243+
latestSequenceNumbers.add(4L);
244+
latestSequenceNumbers.add(4L);
245+
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);
246+
247+
assertUpdate("CREATE OR REPLACE TABLE test_schema.test_metadata_log_entries (c3 INTEGER)");
248+
latestSchemaIds.add(2);
249+
latestSequenceNumbers.add(5L);
250+
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);
251+
252+
assertUpdate("INSERT INTO test_schema.test_metadata_log_entries VALUES (1)", 1);
253+
latestSchemaIds.add(2);
254+
latestSequenceNumbers.add(6L);
255+
latestSchemaIds.add(2);
256+
latestSequenceNumbers.add(6L);
257+
assertMetadataLogEntries(latestSchemaIds, latestSequenceNumbers);
258+
259+
assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries");
260+
}
261+
262+
private void assertMetadataLogEntries(List<Integer> latestSchemaIds, List<Long> latestSequenceNumbers)
263+
{
264+
MaterializedResult result = computeActual("SELECT latest_schema_id, latest_sequence_number FROM test_schema.\"test_metadata_log_entries$metadata_log_entries\" ORDER BY timestamp");
265+
List<MaterializedRow> materializedRows = result.getMaterializedRows();
266+
267+
assertThat(result.getRowCount()).isEqualTo(latestSchemaIds.size());
268+
for (int i = 0; i < result.getRowCount(); i++) {
269+
assertThat(materializedRows.get(i).getField(0)).isEqualTo(latestSchemaIds.get(i));
270+
assertThat(materializedRows.get(i).getField(1)).isEqualTo(latestSequenceNumbers.get(i));
271+
}
272+
}
273+
200274
@Test
201275
public void testSnapshotsTable()
202276
{

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static io.trino.plugin.iceberg.TableType.HISTORY;
4242
import static io.trino.plugin.iceberg.TableType.MANIFESTS;
4343
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
44+
import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES;
4445
import static io.trino.plugin.iceberg.TableType.PARTITIONS;
4546
import static io.trino.plugin.iceberg.TableType.PROPERTIES;
4647
import static io.trino.plugin.iceberg.TableType.REFS;
@@ -300,6 +301,12 @@ public void testSelectSystemTable()
300301
.addCopies(GET_TABLE, 1)
301302
.build());
302303

304+
// select from $metadata_log_entries
305+
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"",
306+
ImmutableMultiset.<MetastoreMethod>builder()
307+
.add(GET_TABLE)
308+
.build());
309+
303310
// select from $snapshots
304311
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"",
305312
ImmutableMultiset.<MetastoreMethod>builder()
@@ -335,7 +342,7 @@ public void testSelectSystemTable()
335342

336343
// This test should get updated if a new system table is added.
337344
assertThat(TableType.values())
338-
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
345+
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
339346
}
340347

341348
@Test

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import static io.trino.plugin.iceberg.TableType.HISTORY;
5151
import static io.trino.plugin.iceberg.TableType.MANIFESTS;
5252
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
53+
import static io.trino.plugin.iceberg.TableType.METADATA_LOG_ENTRIES;
5354
import static io.trino.plugin.iceberg.TableType.PARTITIONS;
5455
import static io.trino.plugin.iceberg.TableType.PROPERTIES;
5556
import static io.trino.plugin.iceberg.TableType.REFS;
@@ -447,6 +448,12 @@ public void testSelectSystemTable()
447448
.addCopies(GET_TABLE, 1)
448449
.build());
449450

451+
// select from $metadata_log_entries
452+
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$metadata_log_entries\"",
453+
ImmutableMultiset.builder()
454+
.add(GET_TABLE)
455+
.build());
456+
450457
// select from $snapshots
451458
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$snapshots\"",
452459
ImmutableMultiset.builder()
@@ -488,7 +495,7 @@ public void testSelectSystemTable()
488495

489496
// This test should get updated if a new system table is added.
490497
assertThat(TableType.values())
491-
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
498+
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
492499
}
493500
finally {
494501
getQueryRunner().execute("DROP TABLE IF EXISTS test_select_snapshots");

0 commit comments

Comments
 (0)