feat(iceberg): Add $snapshot_id as hidden column in iceberg table#26189
agrawalreetika wants to merge 1 commit into prestodb:master
Reviewer's Guide
This PR introduces $snapshot_id as a hidden metadata column by extending split representations and split sources to carry snapshot IDs, integrating the new column into column handles and table metadata, exposing it in page sources, and validating the feature via an integration test.
ER diagram for new $snapshot_id metadata column in table schema
erDiagram
ICEBERG_TABLE {
VARCHAR $path
BIGINT $data_sequence_number
BOOLEAN $deleted
VARCHAR $delete_file_path
BIGINT $snapshot_id
}
ICEBERG_TABLE ||--o{ ICEBERG_SPLIT : contains
ICEBERG_SPLIT {
BIGINT snapshotId
}
Class diagram for updated IcebergSplit and related classes
classDiagram
class IcebergSplit {
- long dataSequenceNumber
- long affinitySchedulingFileSectionSize
- long affinitySchedulingFileSectionIndex
+ long snapshotId
+ getSnapshotId(): long
}
class ChangelogSplitSource {
- long snapshotId
+ ChangelogSplitSource(..., long snapshotId)
}
class EqualityDeletesSplitSource {
- long snapshotId
+ EqualityDeletesSplitSource(..., long snapshotId)
}
class IcebergSplitSource {
- long snapshotId
+ IcebergSplitSource(...)
}
IcebergSplitSource --> IcebergSplit
ChangelogSplitSource --> IcebergSplit
EqualityDeletesSplitSource --> IcebergSplit
Class diagram for IcebergColumnHandle and IcebergMetadataColumn changes
classDiagram
class IcebergColumnHandle {
+ static SNAPSHOT_ID_COLUMN_HANDLE: IcebergColumnHandle
+ static SNAPSHOT_ID_COLUMN_METADATA: ColumnMetadata
+ isSnapshotId(): boolean
}
class IcebergMetadataColumn {
+ SNAPSHOT_ID
}
IcebergColumnHandle --> IcebergMetadataColumn
Hey there - I've reviewed your changes - here's some feedback:
- Consider refactoring snapshotId propagation into a common base or builder to avoid repeating it across all split source constructors and reduce boilerplate.
- Include snapshotId in the split info returned by IcebergSplit#getInfo() so that split logs or traces will clearly show which snapshot each split belongs to for easier debugging.
- Add an integration test for point-in-time scans (using fromSnapshot/toSnapshot) to verify that $snapshot_id in query results matches the intended historical snapshot, not just the current one.
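The second suggestion above could look roughly like the following. This is a minimal sketch, not the PR's code: `SplitInfoSketch` is a hypothetical stand-in for the split class, and the map keys are assumptions chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a split; this is NOT the actual IcebergSplit class.
final class SplitInfoSketch
{
    private final String path;
    private final long start;
    private final long length;
    private final long snapshotId;

    SplitInfoSketch(String path, long start, long length, long snapshotId)
    {
        this.path = path;
        this.start = start;
        this.length = length;
        this.snapshotId = snapshotId;
    }

    // Builds the info map that would end up in split logs or traces.
    // Key names are assumptions; the point is that snapshotId is included.
    Map<String, Object> getInfo()
    {
        Map<String, Object> info = new LinkedHashMap<>();
        info.put("path", path);
        info.put("start", start);
        info.put("length", length);
        info.put("snapshotId", snapshotId);
        return info;
    }
}
```

With this shape, any log line that prints the split info also shows which snapshot the split was planned against.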
In cases where it's unambiguous to do so, this should also push down into Iceberg via |
| affinitySchedulingFileSectionSize); | ||
| affinitySchedulingFileSectionSize, | ||
| snapshotId); |
I'm a bit concerned about the snapshotId selection here. It seems like we are using the table-level snapshotId taken when the entire table was scanned, but my understanding is that it should be the snapshotId calculated based on the corresponding data file and delete files, right?
@hantangwangd this is an important observation. Selecting this column would be more useful if it returned the snapshot ID of the data file, i.e. which snapshot ID created the file. However, this column is primarily intended for filtering, as a way of altering the table handle to force a time travel on the table without introducing a new SPI or connector optimizer. Given this column will be hidden and not intended for direct use, I am comfortable with this being the snapshot ID of the scan, as that fulfills the intended purpose.
@tdcmeehan thanks for the detailed explanation. Based on my understanding of PR #26164 and the comments here, the primary purpose of this $snapshot_id column is to enable predicate pushdown for the filter WHERE $snapshot_id > xxx, which is used to query incremental data since a specified snapshot. Therefore the column $snapshot_id should not be allowed to be specified directly in a query, and shouldn't appear in any filter node that can't be completely pushed down to the Iceberg connector. Is this correct?
One additional thing we should probably do is fail in case any predicate is provided which compares the snapshot ID to any non-constant value, or any less than predicate is supplied, as they're just too dangerous. Only greater than should be supported, since this will use the latest schema.
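The rule proposed in this comment can be sketched as a small validation step. This is an illustration under stated assumptions, not Presto code: the `Operator` enum and `validate` method are hypothetical, and treating both `>` and `>=` as acceptable is an assumption (the comment itself only says "greater than").

```java
// Hypothetical, simplified model of a comparison against $snapshot_id.
// None of these names come from Presto or from the PR.
final class SnapshotIdPredicateSketch
{
    enum Operator { EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL }

    // Fails fast unless the predicate is a lower bound against a constant.
    static void validate(Operator operator, boolean comparesToConstant)
    {
        if (!comparesToConstant) {
            throw new UnsupportedOperationException(
                    "$snapshot_id can only be compared to a constant");
        }
        switch (operator) {
            case GREATER_THAN:
            case GREATER_THAN_OR_EQUAL:
                return; // lower bounds are accepted
            default:
                throw new UnsupportedOperationException(
                        "Unsupported predicate for $snapshot_id: " + operator);
        }
    }
}
```

Rejecting everything else up front keeps unsupported predicates from silently falling through to a regular filter.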
| // Only support >= X | ||
| Optional<Long> lower = Optional.of(((Number) range.getLowBoundedValue()).longValue()); | ||
| handle = handle.withUpdatedIcebergTableName( | ||
| new IcebergTableName(name.getTableName(), name.getTableType(), lower, name.getChangelogEndSnapshot())); |
@tdcmeehan I wanted to confirm a point here -
Currently, I am updating IcebergTableHandle with the lower bound (X) here (for >= X), but shouldn't we use the latest available snapshot whose ID >= X when the query predicate is $snapshot_id >= X instead? So it ensures we always read using the most recent snapshot schema, which avoids issues that can occur if older snapshots have outdated or incompatible schemas.
| session.getRuntimeStats()); | ||
| return new EqualityDeletesSplitSource(session, icebergTable, deleteFiles); | ||
| return new EqualityDeletesSplitSource(session, icebergTable, deleteFiles, table.getIcebergTableName().getSnapshotId().get()); |
getSnapshotId() returns Optional.
Need to check isPresent first before calling get.
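One way to address this, sketched with plain `java.util.Optional`: replace the bare `get()` with `orElseThrow` so that a missing snapshot produces a descriptive error. The class name, method name, and exception type below are assumptions for illustration; the connector would presumably throw its own exception type instead.

```java
import java.util.Optional;

// Hypothetical helper; not part of the PR.
final class SnapshotIdLookupSketch
{
    // Unwraps the snapshot ID, failing with a clear message when it is absent
    // (for example, a table that has no snapshot yet).
    static long requireSnapshotId(Optional<Long> snapshotId, String tableName)
    {
        return snapshotId.orElseThrow(() ->
                new IllegalStateException("No snapshot ID resolved for table " + tableName));
    }
}
```

The call site would then pass `requireSnapshotId(table.getIcebergTableName().getSnapshotId(), ...)` rather than calling `get()` directly.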
| new IcebergTableName(name.getTableName(), name.getTableType(), lower, name.getChangelogEndSnapshot())); | ||
| } | ||
| else { | ||
| throw new PrestoException(NOT_SUPPORTED, "Unsupported predicate for $snapshot_id; only >= constant is allowed"); |
Should we change the message to mention both >= and =, since both of them are supported?
| }); | ||
| } | ||
| @Test |
Could you also add a case where there are multiple snapshots?
| if (domain.isSingleValue()) { | ||
| Optional<Long> snapshotId = Optional.of(((Number) domain.getSingleValue()).longValue()); | ||
| handle = handle.withUpdatedIcebergTableName( |
What if we are querying time travel tables? Here we will always overwrite the snapshotId.
Probably we should add some check for the snapshot in predicate and the snapshot specified in time travel.
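A check along these lines might look like the sketch below. It is an assumption about one reasonable policy (reject the query when the two snapshots disagree), not what the PR implements; `SnapshotConflictSketch` and `resolve` are hypothetical names.

```java
import java.util.Optional;

// Hypothetical helper; not part of the PR.
final class SnapshotConflictSketch
{
    // Picks the snapshot to read, refusing to silently overwrite a snapshot
    // chosen by time travel with a different one taken from the predicate.
    static Optional<Long> resolve(Optional<Long> timeTravelSnapshot, Optional<Long> predicateSnapshot)
    {
        if (timeTravelSnapshot.isPresent() && predicateSnapshot.isPresent()
                && !timeTravelSnapshot.get().equals(predicateSnapshot.get())) {
            throw new IllegalArgumentException(
                    "$snapshot_id predicate (" + predicateSnapshot.get()
                            + ") conflicts with time travel snapshot (" + timeTravelSnapshot.get() + ")");
        }
        // At this point the two agree, or at most one of them is present.
        return predicateSnapshot.isPresent() ? predicateSnapshot : timeTravelSnapshot;
    }
}
```

Other policies are possible, such as letting the time travel clause win; the sketch only shows where the comparison would sit before the table handle is updated.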
|
Should |
|
@PingLiuPing Thanks for the review, but using $snapshot_id in a filter has an issue. As $snapshot_id is not incremental so while calculating delta between 2 snapshots (With query like Could you please review #26408 which is based on |
Description
Add $snapshot_id as a hidden column in the iceberg table
Motivation and Context
Add $snapshot_id as a hidden column in the iceberg table
Addresses #26164
Impact
Test Plan
Integration test added