feat(plugin-iceberg): Add Iceberg metadata table $metadata_log_entries #24302
Conversation
hantangwangd
left a comment
Thanks for adding this feature. Overall it looks good to me, except for one small problem with timestamp with time zone, and some nits.
.add(new ColumnMetadata("timestamp", TIMESTAMP_WITH_TIME_ZONE))
.add(new ColumnMetadata("file", VARCHAR))
.add(new ColumnMetadata("latest_snapshot_id", BIGINT))
.add(new ColumnMetadata("latest_schema_id", BIGINT))
nit: Should this type be INTEGER?
{
    InMemoryRecordSet.Builder table = InMemoryRecordSet.builder(COLUMNS);

    TableMetadata metadata = ((org.apache.iceberg.BaseTable) icebergTable).operations().current();
    Long snapshotId = null;
    Snapshot snapshot = null;
    try {
        snapshotId = SnapshotUtil.snapshotIdAsOfTime(icebergTable, entry.timestampMillis());
Suggested change:

- snapshotId = SnapshotUtil.snapshotIdAsOfTime(icebergTable, entry.timestampMillis());
+ snapshotId = snapshotIdAsOfTime(icebergTable, entry.timestampMillis());
nit: I know this code is from the Iceberg lib, but we can still use static imports as much as possible.
private void addRow(InMemoryRecordSet.Builder table, ConnectorSession session, long timestampMillis, String fileLocation, Long snapshotId, Snapshot snapshot)
{
    table.addRow(packDateTimeWithZone(timestampMillis, session.getSqlFunctionProperties().getTimeZoneKey()),
Should we consider the situation where session.getSqlFunctionProperties().isLegacyTimestamp() is false? As I understand it, in that case we should use UTC as the time zone key. If I have misunderstood anything, please let me know.
Thanks for your review @hantangwangd
Currently, with and without isLegacyTimestamp, the output for timestamp in $metadata_log_entries looks the same:
presto:iceberg_schema> set session legacy_timestamp=true;
SET SESSION
presto:iceberg_schema> select * from "region_legacy$metadata_log_entries";
timestamp | file | latest_snapshot_id | latest_schema_id | latest_s>
--------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+--------->
2025-01-02 22:39:00.666 Asia/Kolkata | hdfs://localhost:9000/user/hive/warehouse/iceberg_schema.db/region_legacy/metadata/00000-26e6389a-ab54-455b-b5ce-6648e241ce29.metadata.json | 7341611993609958569 | 0 | >
2025-01-02 22:39:12.478 Asia/Kolkata | hdfs://localhost:9000/user/hive/warehouse/iceberg_schema.db/region_legacy/metadata/00001-15c85566-fcf1-413d-8115-5fc4376426cf.metadata.json | 8958386941531340808 | 0 | >
(2 rows)
presto:iceberg_schema> set session legacy_timestamp=false;
SET SESSION
presto:iceberg_schema> select * from "region_nolegacy$metadata_log_entries";
timestamp | file | latest_snapshot_id | latest_schema_id | latest>
--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+------->
2025-01-02 22:40:50.877 Asia/Kolkata | hdfs://localhost:9000/user/hive/warehouse/iceberg_schema.db/region_nolegacy/metadata/00000-1de7672a-8a9e-4249-afe8-d526b094ca57.metadata.json | 1517277585224920583 | 0 | >
2025-01-02 22:41:03.948 Asia/Kolkata | hdfs://localhost:9000/user/hive/warehouse/iceberg_schema.db/region_nolegacy/metadata/00001-8bf52dfe-2601-4ce8-bb3c-30ac435573ea.metadata.json | 2705037583472111886 | 0 | >
(2 rows)
It looks like both cases pick up my local time and time zone, since the session object carries the local TZ. Could you please help me understand whether this is expected?
My mistake, I confused the result column type timestamp with time zone with timestamp. The property isLegacyTimestamp only applies to the timestamp type, so there is no need to consider it here.
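To make the instant-versus-zone distinction concrete, here is a minimal plain-JDK sketch (the epoch value is derived from the first example row above; it is illustrative only): a timestamp with time zone stores one fixed instant, and the zone only changes how that instant is rendered, which is why the legacy-timestamp flag doesn't matter here.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class ZoneRenderingDemo
{
    public static void main(String[] args)
    {
        // Epoch millis for 2025-01-02 17:09:00.666 UTC, i.e. the first row above
        // rendered as 2025-01-02 22:39:00.666 Asia/Kolkata.
        Instant instant = Instant.ofEpochMilli(1735837740666L);

        // The same instant rendered in two zones: the wall-clock text differs,
        // but the underlying point in time is identical.
        ZonedDateTime kolkata = ZonedDateTime.ofInstant(instant, ZoneId.of("Asia/Kolkata"));
        ZonedDateTime utc = ZonedDateTime.ofInstant(instant, ZoneId.of("UTC"));

        System.out.println(kolkata);
        System.out.println(utc);
        System.out.println(kolkata.toInstant().equals(utc.toInstant()));
    }
}
```

Running this prints the Kolkata and UTC renderings followed by `true`, confirming both views denote the same instant.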
@Test
public void testMetadataLogTable()
{
    try {
        assertUpdate("CREATE TABLE test_table_metadatalog (id1 BIGINT, id2 BIGINT)");
        assertQuery("SELECT count(*) FROM \"test_table_metadatalog$metadata_log_entries\"", "VALUES 1");
        // metadata file created at table creation
        assertQuery("SELECT latest_snapshot_id FROM \"test_table_metadatalog$metadata_log_entries\"", "VALUES NULL");

        assertUpdate("INSERT INTO test_table_metadatalog VALUES (0, 00), (1, 10), (2, 20)", 3);
        Table icebergTable = loadTable("test_table_metadatalog");
        Snapshot latestSnapshot = icebergTable.currentSnapshot();
        assertQuery("SELECT count(*) FROM \"test_table_metadatalog$metadata_log_entries\"", "VALUES 2");
        assertQuery("SELECT latest_snapshot_id FROM \"test_table_metadatalog$metadata_log_entries\" ORDER BY timestamp DESC LIMIT 1", "VALUES " + latestSnapshot.snapshotId());
    }
    finally {
        assertUpdate("DROP TABLE IF EXISTS test_table_metadatalog");
    }
}
Would it be convenient to add some test cases covering different time zones and legacyTimestamp settings, and to verify the output timestamp column?
@hantangwangd Could you please give me an example of the kind of test case that would fit here for different time zones?
I looked at the other metadata tables with a timestamp column, but couldn't find any example of this.
Referring to Iceberg's test cases, I think we can add some tests similar to the following code:
Session session = sessionWithTimezone(zoneId);
assertUpdate(session, "CREATE TABLE test_table_metadatalog (id1 BIGINT, id2 BIGINT)");
assertQuery(session, "SELECT count(*) FROM \"test_table_metadatalog$metadata_log_entries\"", "VALUES 1");
Table icebergTable = loadTable("test_table_metadatalog");
TableMetadata tableMetadata = ((HasTableOperations) icebergTable).operations().current();
ZonedDateTime zonedDateTime1 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(tableMetadata.lastUpdatedMillis()), ZoneId.of(zoneId));
String metadataFileLocation1 = "file:" + tableMetadata.metadataFileLocation();
assertUpdate(session, "INSERT INTO test_table_metadatalog VALUES (0, 00), (1, 10), (2, 20)", 3);
tableMetadata = ((HasTableOperations) icebergTable).operations().refresh();
ZonedDateTime zonedDateTime2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(tableMetadata.lastUpdatedMillis()), ZoneId.of(zoneId));
String metadataFileLocation2 = "file:" + tableMetadata.metadataFileLocation();
Snapshot latestSnapshot = tableMetadata.currentSnapshot();
MaterializedResult result = getQueryRunner().execute(session, "SELECT * FROM \"test_table_metadatalog$metadata_log_entries\"");
assertThat(result).hasSize(2);
assertThat(result)
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, zonedDateTime1, metadataFileLocation1, null, null, null)))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, zonedDateTime2, metadataFileLocation2, latestSnapshot.snapshotId(), latestSnapshot.schemaId(), latestSnapshot.sequenceNumber())));
And test it under different zoneIds.
ZacBlanco
left a comment
One minor thing. I also agree with @hantangwangd about making sure this works with proper TZ configuration. Otherwise LGTM.
@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
    InMemoryRecordSet.Builder table = InMemoryRecordSet.builder(COLUMNS);
Rather than use the builder, I would recommend using the public constructor and passing an iterator. It will help reduce memory pressure on the coordinator by streaming records rather than requiring us to aggregate all at once in-memory. The overall footprint of this table shouldn't be too large but I think using an iterator approach to generate the records is not difficult to implement.
When generating records you can just use Java's Stream and map operations and call .iterator() at the end.
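As a plain-JDK sketch of that suggestion (the file names are made up, standing in for the values a MetadataLogEntry would supply): rows are built lazily from a Stream and handed over as an Iterator, so nothing is buffered up front.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class StreamingRowsDemo
{
    public static void main(String[] args)
    {
        // Stand-ins for metadata log entries (file locations); in the real code
        // these would come from TableMetadata.previousFiles().
        List<String> entries = Arrays.asList("00000-a.metadata.json", "00001-b.metadata.json");

        // Map each entry to a row lazily; a row is only built when the cursor pulls it.
        Iterator<List<Object>> rows = entries.stream()
                .map(file -> Arrays.<Object>asList(1735837740666L, file))
                .iterator();

        int count = 0;
        while (rows.hasNext()) {
            List<Object> row = rows.next();
            count++;
            System.out.println(row.get(1));
        }
        System.out.println(count);
    }
}
```

The cursor then drives the iterator one row at a time instead of holding the full row list in memory.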
List<MetadataLogEntry> metadataLogEntries = metadata.previousFiles();

processMetadataLogEntries(table, session, metadataLogEntries);
addLatestMetadataEntry(table, session, metadata);
To add the latest entry, I think you can just use Stream.concat + Stream.of().
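A minimal sketch of that idea (JDK streams only; the file names are invented for illustration): the current metadata file is appended as the final element of the stream of historical entries.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConcatLatestDemo
{
    public static void main(String[] args)
    {
        // Historical metadata files, oldest first (stand-ins for metadata.previousFiles()).
        Stream<String> previous = Stream.of("00000-a.metadata.json", "00001-b.metadata.json");

        // Append the current metadata file as the last element via Stream.concat + Stream.of.
        List<String> all = Stream.concat(previous, Stream.of("00002-c.metadata.json"))
                .collect(Collectors.toList());

        System.out.println(all);
    }
}
```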
steveburnett
left a comment
LGTM! (docs)
Pull branch, local doc build, looks good. Thank you for the documentation!
Reviewer's Guide

Implements the Iceberg $metadata_log_entries system table and wires it into the connector, exposing metadata log history (with timestamps, file paths, and snapshot details) and adding tests, including time-zone-sensitive behavior, and documentation hooks.

Sequence diagram for querying the Iceberg $metadata_log_entries system table:

sequenceDiagram
actor User
participant PrestoEngine
participant IcebergMetadata as IcebergAbstractMetadata
participant MetadataLogTable
participant BaseTable
participant TableOperations
participant TableMetadata
User->>PrestoEngine: SELECT * FROM table$metadata_log_entries
PrestoEngine->>IcebergMetadata: getIcebergSystemTable(tableName, icebergTable)
IcebergMetadata->>IcebergMetadata: resolve IcebergTableType METADATA_LOG_ENTRIES
IcebergMetadata-->>PrestoEngine: new MetadataLogTable(systemTableName, icebergTable)
PrestoEngine->>MetadataLogTable: cursor(transactionHandle, session, constraint)
MetadataLogTable->>BaseTable: operations()
BaseTable-->>MetadataLogTable: TableOperations
MetadataLogTable->>TableOperations: current()
TableOperations-->>MetadataLogTable: TableMetadata
loop for each previousFiles entry
MetadataLogTable->>TableMetadata: previousFiles()
TableMetadata-->>MetadataLogTable: iterator of MetadataLogEntry
MetadataLogTable->>MetadataLogTable: processMetadataLogEntries(session, metadataLogEntry)
end
MetadataLogTable->>TableMetadata: lastUpdatedMillis(), metadataFileLocation()
TableMetadata-->>MetadataLogTable: timestampMillis, fileLocation
MetadataLogTable->>BaseTable: operations().current()
BaseTable-->>MetadataLogTable: TableMetadata (current)
MetadataLogTable->>MetadataLogTable: buildLatestMetadataRow(session, currentMetadata)
MetadataLogTable-->>PrestoEngine: RecordCursor over metadata log rows
PrestoEngine-->>User: result set (timestamp, file, latest_snapshot_id, latest_schema_id, latest_sequence_number)
Hey there - I've reviewed your changes - here's some feedback:
- In MetadataLogTable.cursor's iterator, next() returns emptyList() when there are no more entries instead of throwing NoSuchElementException; it would be clearer and safer to align with the Iterator contract and make that branch unreachable (or throw) when hasNext() is false.
- In MetadataLogTable.processMetadataLogEntries, the broad catch (IllegalArgumentException ignored) silently swallows all such errors; consider tightening the condition or at least documenting the exact scenarios you expect here (and possibly logging unexpected cases) so debugging issues around snapshotIdAsOfTime is easier.
- In testMetadataLogTableWithTimeZoneId, resultBuilder uses getSession() instead of sessionForTimeZone, which may hide timezone-specific behavior; consider using the same session as the query execution to keep expectations consistent with the selected timezone.
## Individual Comments
### Comment 1
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/MetadataLogTable.java:108` </location>
<code_context>
+ TableMetadata currentMetadata = ((BaseTable) icebergTable).operations().current();
+ return buildLatestMetadataRow(session, currentMetadata);
+ }
+ return emptyList();
+ }
+ };
</code_context>
<issue_to_address>
**issue (bug_risk):** Avoid returning an empty row when the iterator is exhausted; use NoSuchElementException instead.
`Iterator.next()` should throw `NoSuchElementException` when there are no more elements. The current `return emptyList();` would produce a row with an incorrect column count and violate `InMemoryRecordSet` expectations, leading to confusing failures. Replace the final return with `throw new NoSuchElementException();` to honor the iterator contract and fail explicitly.
</issue_to_address>
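A self-contained sketch of that contract (plain JDK; the two-phase shape mirrors the historical-entries-then-latest-row pattern described above, with strings standing in for rows): once both phases are exhausted, next() throws instead of returning a sentinel.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class StrictIteratorDemo
{
    // A two-phase iterator: yields all historical entries first, then one
    // "latest" row, then fails fast on any further call.
    static Iterator<String> entries(List<String> previous, String latest)
    {
        return new Iterator<String>()
        {
            int index;
            boolean latestEmitted;

            @Override
            public boolean hasNext()
            {
                return index < previous.size() || !latestEmitted;
            }

            @Override
            public String next()
            {
                if (index < previous.size()) {
                    return previous.get(index++);
                }
                if (!latestEmitted) {
                    latestEmitted = true;
                    return latest;
                }
                // Honor the Iterator contract instead of returning a sentinel like emptyList().
                throw new NoSuchElementException();
            }
        };
    }

    public static void main(String[] args)
    {
        Iterator<String> it = entries(Arrays.asList("old-1", "old-2"), "latest");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        try {
            it.next();
        }
        catch (NoSuchElementException e) {
            System.out.println("NoSuchElementException");
        }
    }
}
```

Calling next() past the end now surfaces a clear NoSuchElementException rather than a row with the wrong column count.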
### Comment 2
<location> `presto-docs/src/main/sphinx/connector/iceberg.rst:551` </location>
<code_context>
assign a split to. Splits which read data from the same file within
the same chunk will hash to the same node. A smaller chunk size will
result in a higher probability splits being distributed evenly across
- the cluster, but reduce locality.
+ the cluster, but reduce locality.
</code_context>
<issue_to_address>
**suggestion (typo):** Consider adjusting the phrase to "higher probability of splits" for correct grammar.
This sentence is missing the preposition "of" after "probability," which is why it currently reads awkwardly.
```suggestion
result in a higher probability of splits being distributed evenly across
```
</issue_to_address>
### Comment 3
<location> `presto-docs/src/main/sphinx/connector/iceberg.rst:945` </location>
<code_context>
All above metadata tables, except `$changelog`, are supported in Presto C++.
+``$metadata_log_entries`` Table
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+* ``$metadata_log_entries`` : Provide metadata log entries for the table
+
+.. code-block:: sql
</code_context>
<issue_to_address>
**suggestion (typo):** Use "Provides" instead of "Provide" for subject–verb agreement.
Because `$metadata_log_entries` is singular, the description should be: "Provides metadata log entries for the table."
```suggestion
* ``$metadata_log_entries`` : Provides metadata log entries for the table
```
</issue_to_address>
is hashed to a particular node when determining the which worker to
assign a split to. Splits which read data from the same file within
the same chunk will hash to the same node. A smaller chunk size will
result in a higher probability splits being distributed evenly across
suggestion (typo): Consider adjusting the phrase to "higher probability of splits" for correct grammar.
This sentence is missing the preposition "of" after "probability," which is why it currently reads awkwardly.
Suggested change:

- result in a higher probability splits being distributed evenly across
+ result in a higher probability of splits being distributed evenly across
All above metadata tables, except `$changelog`, are supported in Presto C++.
``$metadata_log_entries`` Table
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``$metadata_log_entries`` : Provide metadata log entries for the table
suggestion (typo): Use "Provides" instead of "Provide" for subject–verb agreement.
Because $metadata_log_entries is singular, the description should be: "Provides metadata log entries for the table."
Suggested change:

- * ``$metadata_log_entries`` : Provide metadata log entries for the table
+ * ``$metadata_log_entries`` : Provides metadata log entries for the table
steveburnett
left a comment
Thanks for the documentation! One small nit, and a question.
steveburnett
left a comment
LGTM! (docs)
Pull updated branch, new local doc build, looks good. Thanks!
hantangwangd
left a comment
@agrawalreetika thanks for this feature. The change looks good to me, just one little thing left.
hantangwangd
left a comment
LGTM! Please handle the maven checks failure.
## Description

Fix protocol generation after changes introduced by #24302 by running the `make presto_protocol` command.

## Impact

No impact

## Test Plan

CI

## Contributor checklist

- [x] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [x] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [x] Documented new properties (with its default value), SQL syntax, functions, or other functionality.
- [x] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [x] Adequate tests were added if applicable.
- [x] CI passed.
- [x] If adding new dependencies, verified they have an [OpenSSF Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

## Release Notes

Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below.

```
== NO RELEASE NOTE ==
```
Description
Add Iceberg metadata table $metadata_log_entries
Motivation and Context
Add Iceberg metadata table $metadata_log_entries
This will help surface metadata changes on an Iceberg table: https://iceberg.apache.org/docs/latest/spark-queries/#metadata-log-entries
Impact
Iceberg Connector
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.