feat(plugin-iceberg): Add $snapshot_sequence_number as hidden column in iceberg table#26408
feat(plugin-iceberg): Add $snapshot_sequence_number as hidden column in iceberg table#26408agrawalreetika wants to merge 1 commit intoprestodb:masterfrom
Conversation
Reviewer's GuideThis PR adds $snapshot_sequence_number as a hidden metadata column in Iceberg tables and snapshots, implements utility methods and metadata handling to support sequence-based snapshot filtering, extends split management for incremental scans with sequence numbers, updates the snapshots system table, and includes tests and documentation for predicate pushdown and hidden column enforcement. Sequence diagram for incremental scan with $snapshot_sequence_number filteringsequenceDiagram
participant Q as "Query Engine"
participant M as "IcebergAbstractMetadata"
participant U as "IcebergUtil"
participant S as "IcebergSplitManager"
participant T as "Iceberg Table"
Q->>M: Request table layout with $snapshot_sequence_number filter
M->>U: Validate and fetch snapshots by sequence number
U->>T: Get snapshot by sequence number
U-->>M: Return start/end snapshots
M->>S: Create IcebergTableHandle with fromInclusive flag
S->>T: Plan incremental scan from start to end snapshot
S-->>Q: Return splits with snapshotSequenceNumber
Class diagram for new and updated Iceberg metadata and split classesclassDiagram
class IcebergMetadataColumn {
<<enum>>
DATA_SEQUENCE_NUMBER
IS_DELETED
DELETE_FILE_PATH
SNAPSHOT_SEQUENCE_NUMBER
}
class IcebergColumnHandle {
+isSnapshotSequenceNumberColumn()
+SNAPSHOT_SEQUENCE_NUMBER_COLUMN_HANDLE
+SNAPSHOT_SEQUENCE_NUMBER_COLUMN_METADATA
}
class IcebergTableHandle {
+boolean fromInclusive
+withUpdatedIcebergTableName(IcebergTableName, boolean)
}
class IcebergSplit {
+long snapshotSequenceNumber
+getSnapshotSequenceNumber()
}
class IcebergSplitSource {
+long snapshotSequenceNumber
}
class ChangelogSplitSource {
+long snapshotSequenceNumber
}
class EqualityDeletesSplitSource {
+long snapshotSequenceNumber
}
IcebergMetadataColumn <|-- IcebergColumnHandle
IcebergTableHandle --> IcebergSplitSource
IcebergSplitSource --> IcebergSplit
ChangelogSplitSource --> IcebergSplit
EqualityDeletesSplitSource --> IcebergSplit
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey there - I've reviewed your changes - here's some feedback:
- getSnapshotBySequenceNumber returns null on missing snapshots, which may lead to NPEs in callers; consider throwing a clear PrestoException for invalid sequence numbers instead of returning null.
- There is repeated validation and append‐only checks across prepareIncrementalScan and prepareBetweenScan—extract and consolidate shared logic into a single helper to reduce duplication and ensure consistent error handling.
- Streaming through icebergTable.snapshots() for each lookup can become costly on tables with many snapshots; consider caching the snapshot list or using a map for direct sequence‐number lookup to improve performance.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- getSnapshotBySequenceNumber returns null on missing snapshots, which may lead to NPEs in callers; consider throwing a clear PrestoException for invalid sequence numbers instead of returning null.
- There is repeated validation and append‐only checks across prepareIncrementalScan and prepareBetweenScan—extract and consolidate shared logic into a single helper to reduce duplication and ensure consistent error handling.
- Streaming through icebergTable.snapshots() for each lookup can become costly on tables with many snapshots; consider caching the snapshot list or using a map for direct sequence‐number lookup to improve performance.
## Individual Comments
### Comment 1
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java:460` </location>
<code_context>
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
}
+ public static Snapshot getSnapshotBySequenceNumber(Table icebergTable, long sequenceNumber)
+ {
+ if (icebergTable == null) {
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Returning null instead of Optional may lead to NPEs.
Consider returning Optional<Snapshot> instead of null to clearly indicate when no snapshot is found and prevent potential NullPointerExceptions.
Suggested implementation:
```java
public static Optional<Snapshot> getSnapshotBySequenceNumber(Table icebergTable, long sequenceNumber)
{
if (icebergTable == null) {
throw new IllegalArgumentException("icebergTable is null");
}
return StreamSupport.stream(icebergTable.snapshots().spliterator(), false)
.filter(s -> s.sequenceNumber() == sequenceNumber)
.findFirst();
}
```
If there are any callers of `getSnapshotBySequenceNumber`, you will need to update them to handle the `Optional<Snapshot>` return type instead of a potentially null `Snapshot`.
</issue_to_address>
### Comment 2
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java:328` </location>
<code_context>
+ long seqNum = ((Number) range.getLowBoundedValue()).longValue();
+ handle = handle.withUpdatedIcebergTableName(prepareIncrementalScan(icebergTable, startSnapshot, name, seqNum));
+ }
+ else if (range.isLowInclusive() && range.isHighInclusive()) {
+ // WHERE $snapshot_id BETWEEN X AND Y
+ handle = handle.withUpdatedIcebergTableName(prepareBetweenScan(icebergTable, name, range));
</code_context>
<issue_to_address>
**question:** Check for BETWEEN logic may not handle exclusive bounds.
Exclusive bounds currently result in an unsupported predicate exception. Please clarify this in the error message or consider supporting exclusive bounds.
</issue_to_address>
### Comment 3
<location> `presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java:2286-2287` </location>
<code_context>
"VALUES (true, 1, 'a'), (false, 1, 'b'), (false, 2, 'a'), (false, 3, 'a')");
}
+ @Test
+ public void testSnapshotSequenceNumberHiddenColumnSimple()
+ {
+ String tableName = "test_snapshot_seq_num_hidden_" + randomTableSuffix();
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for tables with no snapshots.
Please add a test for a newly created table with no snapshots to verify the behavior of $snapshot_sequence_number in this scenario.
</issue_to_address>
### Comment 4
<location> `presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java:2299-2307` </location>
<code_context>
+ long sequenceNumber = iceberTable.currentSnapshot().sequenceNumber();
+ iceberTable.refresh();
+
+ assertQuery("SELECT COUNT(\"$snapshot_sequence_number\") FROM " + tableName, "VALUES 2");
+ assertQuery("SELECT \"$snapshot_sequence_number\", * FROM " + tableName, "VALUES " +
+ "(" + sequenceNumber + ", 100, 'a')," +
+ "(" + sequenceNumber + ", 200, 'b')");
</code_context>
<issue_to_address>
**suggestion (testing):** Test for $snapshot_sequence_number after table modifications (e.g., DELETE, UPDATE, OVERWRITE).
Please add tests for DELETE, UPDATE, and OVERWRITE operations to ensure $snapshot_sequence_number behaves as expected, given these are not supported for delta queries.
```suggestion
assertQuery("SELECT COUNT(\"$snapshot_sequence_number\") FROM " + tableName, "VALUES 2");
assertQuery("SELECT \"$snapshot_sequence_number\", * FROM " + tableName, "VALUES " +
"(" + sequenceNumber + ", 100, 'a')," +
"(" + sequenceNumber + ", 200, 'b')");
// Test DELETE
assertUpdate("DELETE FROM " + tableName + " WHERE id = 100", 1);
iceberTable.refresh();
long deleteSequenceNumber = iceberTable.currentSnapshot().sequenceNumber();
assertQuery("SELECT COUNT(\"$snapshot_sequence_number\") FROM " + tableName, "VALUES 1");
assertQuery("SELECT \"$snapshot_sequence_number\", * FROM " + tableName, "VALUES " +
"(" + deleteSequenceNumber + ", 200, 'b')");
// Test UPDATE
assertUpdate("UPDATE " + tableName + " SET data = 'bb' WHERE id = 200", 1);
iceberTable.refresh();
long updateSequenceNumber = iceberTable.currentSnapshot().sequenceNumber();
assertQuery("SELECT COUNT(\"$snapshot_sequence_number\") FROM " + tableName, "VALUES 1");
assertQuery("SELECT \"$snapshot_sequence_number\", * FROM " + tableName, "VALUES " +
"(" + updateSequenceNumber + ", 200, 'bb')");
// Test OVERWRITE
assertUpdate("INSERT OVERWRITE " + tableName + " VALUES (300, 'c')", 1);
iceberTable.refresh();
long overwriteSequenceNumber = iceberTable.currentSnapshot().sequenceNumber();
assertQuery("SELECT COUNT(\"$snapshot_sequence_number\") FROM " + tableName, "VALUES 1");
assertQuery("SELECT \"$snapshot_sequence_number\", * FROM " + tableName, "VALUES " +
"(" + overwriteSequenceNumber + ", 300, 'c')");
}
finally {
assertQuerySucceeds("DROP TABLE " + tableName);
}
}
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| .collect(toImmutableMap(Entry::getKey, Entry::getValue)); | ||
| } | ||
|
|
||
| public static Snapshot getSnapshotBySequenceNumber(Table icebergTable, long sequenceNumber) |
There was a problem hiding this comment.
suggestion (bug_risk): Returning null instead of Optional may lead to NPEs.
Consider returning Optional instead of null to clearly indicate when no snapshot is found and prevent potential NullPointerExceptions.
Suggested implementation:
public static Optional<Snapshot> getSnapshotBySequenceNumber(Table icebergTable, long sequenceNumber)
{
if (icebergTable == null) {
throw new IllegalArgumentException("icebergTable is null");
}
return StreamSupport.stream(icebergTable.snapshots().spliterator(), false)
.filter(s -> s.sequenceNumber() == sequenceNumber)
.findFirst();
}If there are any callers of getSnapshotBySequenceNumber, you will need to update them to handle the Optional<Snapshot> return type instead of a potentially null Snapshot.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Outdated
Show resolved
Hide resolved
|
@agrawalreetika thanks for this change. A quick question: can you help me understand the decision behind using an inclusive lower bound for the incremental query rather than an exclusive one? For our use cases like refreshing materialized views, wouldn't the latter be more appropriate? |
@hantangwangd So my understanding here is when we have -
Lmk if I missed something here? |
Thanks for the explanation, I'm trying to make sure I understand the filter semantics correctly. As I understand it, If that's the case, what do you think about supporting and using |
|
@agrawalreetika What's the strategy for the $snapshot_id hidden column PR #26189? will it be discarded? |
b7ab30d to
07ba3fd
Compare
I have updated description with the covered scenario, please take a look and lmk if anything is missing. |
I have moved it back to Draft, as the current implementation has limitations with regard to comparison. And |
steveburnett
left a comment
There was a problem hiding this comment.
Thanks for the doc! What's here looks good in a local build.
Should $snapshot_sequence_number also be added in https://github.com/prestodb/presto/blob/master/presto-docs/src/main/sphinx/connector/iceberg.rst#extra-hidden-metadata-columns?
|
Thanks for the release note! Formatting nit: |
|
@agrawalreetika Can you point me where is the snapshot_sequence_number in the Iceberg spec? Is it https://iceberg.apache.org/spec/?h=sequence+number#sequence-numbers ? |
07ba3fd to
6df6146
Compare
|
Hi @agrawalreetika, here are a few high-level thoughts that came to my mind. They're pretty preliminary and are intended for discussion, so please let me know what you think:
|
|
I agree with @hantangwangd comment's. Especially on 3. I wonder the semantic correctness of |
|
@hantangwangd Thank you for your detailed feedback!
Here’s how the supported scenarios would look: Please let me know if this interpretation aligns with your expectation.
|
|
@agrawalreetika thanks for sharing your perspective. Your summary of the supported scenarios and expected behavior semantically aligns with my expectation.
As I see, currently we can query these hidden metadata columns as part of a SQL statement by including them in the SELECT part. This is fine for the other hidden metadata columns, as they contain useful information and are even documented in our Iceberg docs. My point is, for the new metadata column
The first thing that comes to my mind is that we can directly specify which snapshots to expire by calling the This is semantically equivalent to what you described above. The difference is that this implementation relies solely on the interfaces provided by Iceberg's |
6df6146 to
6231cd4
Compare
|
Got occupied, sorry for the delay. Please take a look at the changes when you get time. Thank you! |
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java:89-98` </location>
<code_context>
+ if (table.getIcebergTableName().getTableType() == INCREMENTAL) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential risk if snapshotId is missing for INCREMENTAL table type.
If both getSnapshotId() and getChangelogEndSnapshot() are empty, and oldestAncestor or currentSnapshot are also missing, a NullPointerException may occur. Please add explicit checks or error handling for missing snapshots.
</issue_to_address>
### Comment 2
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java:95` </location>
<code_context>
+ this.snapshotSequenceNumber = snapshotSequenceNumber;
}
@JsonProperty
</code_context>
<issue_to_address>
**suggestion:** Returning primitive long for getSnapshotSequenceNumber may be problematic if unset.
Consider using OptionalLong for getSnapshotSequenceNumber, or clearly document how unset or invalid values are represented.
Suggested implementation:
```java
@JsonProperty("snapshotSequenceNumber") OptionalLong snapshotSequenceNumber)
{
requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.path = requireNonNull(path, "path is null");
this.dataSequenceNumber = dataSequenceNumber;
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
this.affinitySchedulingFileSectionIndex = start / affinitySchedulingFileSectionSize;
this.snapshotSequenceNumber = requireNonNull(snapshotSequenceNumber, "snapshotSequenceNumber is null");
}
```
```java
@JsonProperty
public OptionalLong getSnapshotSequenceNumber()
{
return snapshotSequenceNumber;
}
```
You will need to:
1. Update the field declaration for `snapshotSequenceNumber` in the class to be `private final OptionalLong snapshotSequenceNumber;`.
2. Update any other usages of `snapshotSequenceNumber` in the class to handle `OptionalLong` (e.g., use `snapshotSequenceNumber.isPresent()` and `snapshotSequenceNumber.getAsLong()`).
3. Adjust JSON serialization/deserialization if you have custom logic elsewhere in the class or related classes.
4. Update any code that constructs this class to pass an `OptionalLong` instead of a primitive `long`.
</issue_to_address>
### Comment 3
<location> `presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java:748-750` </location>
<code_context>
.collect(toImmutableList());
+ // Reject if $snapshot_sequence_number is actually projected (not just used in WHERE)
+ if (icebergColumns.stream().anyMatch(colum -> colum.isSnapshotSequenceNumberColumn()
+ && !icebergLayout.getPredicateColumns().containsKey(colum.getName()))) {
+ throw new PrestoException(
+ NOT_SUPPORTED,
+ "The column $snapshot_sequence_number is internal and cannot be selected directly");
</code_context>
<issue_to_address>
**issue:** The check for $snapshot_sequence_number projection may not handle case sensitivity.
If column names differ in case, this check may not detect all instances of $snapshot_sequence_number. Normalize column names or enforce case sensitivity to ensure the restriction is applied consistently.
</issue_to_address>
### Comment 4
<location> `presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java:2286-2287` </location>
<code_context>
"VALUES (true, 1, 'a'), (false, 1, 'b'), (false, 2, 'a'), (false, 3, 'a')");
}
+ @Test
+ public void testSnapshotSequenceNumberHiddenColumnSimple()
+ {
+ String tableName = "test_snapshot_seq_num_hidden_" + randomTableSuffix();
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for error conditions when querying $snapshot_sequence_number with invalid sequence numbers.
Please add tests for queries using non-existent sequence numbers in the WHERE clause to verify correct error handling or empty results.
</issue_to_address>
### Comment 5
<location> `presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java:2292-2293` </location>
<code_context>
+ String tableName = "test_snapshot_seq_num_hidden_" + randomTableSuffix();
+
+ try {
+ assertUpdate("CREATE TABLE " + tableName + "(id int, data varchar)");
+ assertUpdate("INSERT INTO " + tableName + " VALUES (100, 'a')", 1);
+ assertUpdate("INSERT INTO " + tableName + " VALUES (200, 'b')", 1);
+ Table iceberTable = loadTable(tableName);
</code_context>
<issue_to_address>
**suggestion (testing):** Suggest adding a test for $snapshot_sequence_number on tables with overwrite or delete operations.
Please add a test that covers delete or overwrite operations, then queries $snapshot_sequence_number, to confirm the documented limitations are enforced.
</issue_to_address>
### Comment 6
<location> `presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java:163` </location>
<code_context>
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$snapshots\"",
"VALUES ('committed_at', 'timestamp with time zone', '', '', null, null, null)," +
"('snapshot_id', 'bigint', '', '', 19L, null, null)," +
+ "('snapshot_sequence_number', 'bigint', '', '', 19L, null, null)," +
"('parent_id', 'bigint', '', '', 19L, null, null)," +
"('operation', 'varchar', '', '', null, null, 2147483647L)," +
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test to verify the actual values returned for $snapshot_sequence_number in the $snapshots system table.
Adding a test that queries $snapshots and asserts the correctness of $snapshot_sequence_number for various snapshot operations would improve coverage.
Suggested implementation:
```java
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$snapshots\"",
"VALUES ('committed_at', 'timestamp with time zone', '', '', null, null, null)," +
"('snapshot_id', 'bigint', '', '', 19L, null, null)," +
"('snapshot_sequence_number', 'bigint', '', '', 19L, null, null)," +
"('parent_id', 'bigint', '', '', 19L, null, null)," +
"('operation', 'varchar', '', '', null, null, 2147483647L)," +
"('manifest_list', 'varchar', '', '', null, null, 2147483647L)," +
// Test: Verify snapshot_sequence_number values in $snapshots system table
assertUpdate("CREATE TABLE test_schema.test_table (id INTEGER)");
assertUpdate("INSERT INTO test_schema.test_table VALUES (1)");
assertUpdate("INSERT INTO test_schema.test_table VALUES (2)");
assertUpdate("DELETE FROM test_schema.test_table WHERE id = 1");
// Query the $snapshots system table and check snapshot_sequence_number values
assertQuery(
"SELECT snapshot_sequence_number, operation FROM test_schema.\"test_table$snapshots\" ORDER BY snapshot_sequence_number",
"VALUES (1, 'table creation')," +
"(2, 'insert')," +
"(3, 'insert')," +
"(4, 'delete')"
);
```
- You may need to adjust the expected values in the `VALUES` clause depending on the actual operation names and sequence numbers produced by your Iceberg connector.
- If your test setup uses different table names or schema, update accordingly.
- If the assertion helper `assertQuery` expects a different format, adapt the query and expected results.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| if (table.getIcebergTableName().getTableType() == INCREMENTAL) { | ||
| long fromSnapshot = table.getIcebergTableName().getSnapshotId().orElseGet(() -> SnapshotUtil.oldestAncestor(icebergTable).snapshotId()); | ||
| long toSnapshot = table.getIcebergTableName().getChangelogEndSnapshot() | ||
| .orElseGet(icebergTable.currentSnapshot()::snapshotId); | ||
|
|
||
| IncrementalAppendScan scan = icebergTable.newIncrementalAppendScan() | ||
| .metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats())) | ||
| .filter(toIcebergExpression(predicate)) | ||
| .planWith(executor); | ||
|
|
There was a problem hiding this comment.
issue (bug_risk): Potential risk if snapshotId is missing for INCREMENTAL table type.
If both getSnapshotId() and getChangelogEndSnapshot() are empty, and oldestAncestor or currentSnapshot are also missing, a NullPointerException may occur. Please add explicit checks or error handling for missing snapshots.
| this.snapshotSequenceNumber = snapshotSequenceNumber; | ||
| } | ||
|
|
||
| @JsonProperty |
There was a problem hiding this comment.
suggestion: Returning primitive long for getSnapshotSequenceNumber may be problematic if unset.
Consider using OptionalLong for getSnapshotSequenceNumber, or clearly document how unset or invalid values are represented.
Suggested implementation:
@JsonProperty("snapshotSequenceNumber") OptionalLong snapshotSequenceNumber)
{
requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.path = requireNonNull(path, "path is null");
this.dataSequenceNumber = dataSequenceNumber;
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
this.affinitySchedulingFileSectionIndex = start / affinitySchedulingFileSectionSize;
this.snapshotSequenceNumber = requireNonNull(snapshotSequenceNumber, "snapshotSequenceNumber is null");
} @JsonProperty
public OptionalLong getSnapshotSequenceNumber()
{
return snapshotSequenceNumber;
}You will need to:
- Update the field declaration for
snapshotSequenceNumberin the class to beprivate final OptionalLong snapshotSequenceNumber;. - Update any other usages of
snapshotSequenceNumberin the class to handleOptionalLong(e.g., usesnapshotSequenceNumber.isPresent()andsnapshotSequenceNumber.getAsLong()). - Adjust JSON serialization/deserialization if you have custom logic elsewhere in the class or related classes.
- Update any code that constructs this class to pass an
OptionalLonginstead of a primitivelong.
| if (icebergColumns.stream().anyMatch(colum -> colum.isSnapshotSequenceNumberColumn() | ||
| && !icebergLayout.getPredicateColumns().containsKey(colum.getName()))) { | ||
| throw new PrestoException( |
There was a problem hiding this comment.
issue: The check for $snapshot_sequence_number projection may not handle case sensitivity.
If column names differ in case, this check may not detect all instances of $snapshot_sequence_number. Normalize column names or enforce case sensitivity to ensure the restriction is applied consistently.
hantangwangd
left a comment
There was a problem hiding this comment.
Hi @agrawalreetika, thanks for the change. Based on your work in this PR, I tried a small experiment where I completely removed the predicate on snapshot_sequence_number from the table's constraint. This approach seems to avoid exposing the column in the optimized plan's TableScanNode and FilterNode altogether. See here.
Please take a look at this change when you have a moment. Looking forward to your feedback.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Outdated
Show resolved
Hide resolved
Thanks for the review, @hantangwangd. I went through the suggested change, and it does look more appropriate — especially since it removes the need to carry |
6231cd4 to
ddcc304
Compare
hantangwangd
left a comment
There was a problem hiding this comment.
@agrawalreetika thanks for supporting this feature. The approach and implementation overall looks good to me. I've left some comments, mostly minor things.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Outdated
Show resolved
Hide resolved
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Outdated
Show resolved
Hide resolved
ddcc304 to
d3bb40e
Compare
Thanks for the review @hantangwangd . I have made the changes based on the review comments, please take a look when you get a chance. |
d3bb40e to
6a0d42b
Compare
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Outdated
Show resolved
Hide resolved
6a0d42b to
4ca83f1
Compare
hantangwangd
left a comment
There was a problem hiding this comment.
@agrawalreetika thanks for the fix, looks good to me!
| TupleDomain<ColumnHandle> newDomainPredicate; | ||
| Optional<Map<ColumnHandle, Domain>> optionalDomains = constraint.getSummary().getDomains(); | ||
| if (optionalDomains.isPresent()) { | ||
| Map<ColumnHandle, Domain> domains = optionalDomains.get(); | ||
| ImmutableMap.Builder<ColumnHandle, Domain> newDomains = ImmutableMap.builder(); | ||
| for (Map.Entry<ColumnHandle, Domain> entry : domains.entrySet()) { | ||
| IcebergColumnHandle column = (IcebergColumnHandle) entry.getKey(); | ||
|
|
||
| if (!column.getName().equalsIgnoreCase("$snapshot_sequence_number")) { | ||
| newDomains.put(entry.getKey(), entry.getValue()); | ||
| continue; | ||
| } | ||
|
|
||
| Domain domain = entry.getValue(); | ||
| if (domain.isSingleValue()) { | ||
| long sequenceNumber = ((Number) domain.getSingleValue()).longValue(); | ||
| Snapshot endSnapshot = validateAndFetchSnapshot(icebergTable, sequenceNumber); | ||
| handle = handle.withUpdatedIcebergTableName(prepareIncrementalScan(icebergTable, endSnapshot, name, endSnapshot, true), true); | ||
| continue; | ||
| } | ||
|
|
||
| List<Range> ranges = domain.getValues().getRanges().getOrderedRanges(); | ||
| if (ranges.size() != 1) { | ||
| throw unsupportedPredicate(); | ||
| } | ||
|
|
||
| Range range = ranges.get(0); | ||
| if (range.isSingleValue()) { | ||
| long seqNum = ((Number) range.getSingleValue()).longValue(); | ||
| Snapshot endSnapshot = validateAndFetchSnapshot(icebergTable, seqNum); | ||
| handle = handle.withUpdatedIcebergTableName(prepareIncrementalScan(icebergTable, endSnapshot, name, endSnapshot, true), true); | ||
| } | ||
| else if (!range.isLowUnbounded() && range.isLowInclusive() && range.isHighUnbounded()) { | ||
| // WHERE $snapshot_sequence_number >= X (delta from x to latest) | ||
| long lowerSequenceNumber = ((Number) range.getLowBoundedValue()).longValue(); | ||
| Snapshot start = validateAndFetchSnapshot(icebergTable, lowerSequenceNumber); | ||
| handle = handle.withUpdatedIcebergTableName(prepareIncrementalScan(icebergTable, start, name, latestSnapshot, true), true); | ||
| } | ||
| else if (!range.isLowUnbounded() && !range.isLowInclusive() && range.isHighUnbounded()) { | ||
| // WHERE $snapshot_sequence_number > X, excludes lower bound | ||
| long lowerSequenceNumber = ((Number) range.getLowBoundedValue()).longValue(); | ||
| Snapshot start = validateAndFetchSnapshot(icebergTable, lowerSequenceNumber); | ||
| handle = handle.withUpdatedIcebergTableName(prepareIncrementalScan(icebergTable, start, name, latestSnapshot, false), false); | ||
| } | ||
| else if (range.isLowInclusive() && range.isHighInclusive()) { | ||
| // WHERE $snapshot_sequence_number BETWEEN X AND Y | ||
| handle = handle.withUpdatedIcebergTableName(prepareBetweenScan(icebergTable, name, range, false), false); | ||
| } | ||
| else { | ||
| throw unsupportedPredicate(); | ||
| } | ||
| } | ||
| newDomainPredicate = TupleDomain.withColumnDomains(newDomains.build()); | ||
| } | ||
| else { | ||
| newDomainPredicate = TupleDomain.none(); | ||
| } |
There was a problem hiding this comment.
Can this be extracted into a new method?
4ca83f1 to
d661828
Compare
d661828 to
112a237
Compare
| final IcebergTableHandle newHandle; | ||
| final TupleDomain<ColumnHandle> newDomainPredicate; |
There was a problem hiding this comment.
Add visibility modifiers
| } | ||
| else if (range.isLowInclusive() && range.isHighInclusive()) { | ||
| // WHERE $snapshot_sequence_number BETWEEN X AND Y | ||
| handle = handle.withUpdatedIcebergTableName(prepareBetweenScan(icebergTable, name, range, false), false); |
There was a problem hiding this comment.
SQL between is inclusive, so shouldn't the fromInclusive flag be set to true?
There was a problem hiding this comment.
Yeah, I agree — for BETWEEN, the lower bound (X) should be inclusive as per SQL semantics.
I’m not sure if we previously discussed making it exclusive for a specific reason, or if that was just an oversight in the earlier summary. Checking if @hantangwangd recalls any context here
There was a problem hiding this comment.
There's no particular reason—the behavior for BETWEEN was directly carried over from @agrawalreetika's original design. I agree with keeping consistent with SQL semantics as well.
| if (domain.isSingleValue()) { | ||
| long sequenceNumber = ((Number) domain.getSingleValue()).longValue(); | ||
| Snapshot endSnapshot = validateAndFetchSnapshot(icebergTable, sequenceNumber); | ||
| handle = handle.withUpdatedIcebergTableName(prepareIncrementalScan(icebergTable, endSnapshot, name, endSnapshot, true), true); | ||
| continue; | ||
| } | ||
|
|
||
| List<Range> ranges = domain.getValues().getRanges().getOrderedRanges(); | ||
| if (ranges.size() != 1) { | ||
| throw unsupportedPredicate(); | ||
| } | ||
|
|
||
| Range range = ranges.get(0); | ||
| if (range.isSingleValue()) { | ||
| long seqNum = ((Number) range.getSingleValue()).longValue(); | ||
| Snapshot endSnapshot = validateAndFetchSnapshot(icebergTable, seqNum); | ||
| handle = handle.withUpdatedIcebergTableName(prepareIncrementalScan(icebergTable, endSnapshot, name, endSnapshot, true), true); |
There was a problem hiding this comment.
Will we ever actually hit range.isSingleValue()--would that be hit by domain.isSingleValue()?
There was a problem hiding this comment.
Yeah, this range.isSingleValue() block seems unreachable.
Tried a couple of scenarios -
WHERE "$snapshot_sequence_number" BETWEEN 1 AND 1;
WHERE "$snapshot_sequence_number" = 1;
WHERE "$snapshot_sequence_number" in (1);
| return new IcebergTableName(name.getTableName(), INCREMENTAL, Optional.of(lower.snapshotId()), Optional.of(upper.snapshotId())); | ||
| } | ||
|
|
||
| public static PrestoException unsupportedPredicate() |
There was a problem hiding this comment.
Can you make private whatever is not being used outside of IcebergUtil?
| private final List<SortField> sortOrder; | ||
| private final List<IcebergColumnHandle> updatedColumns; | ||
| private final Optional<SchemaTableName> materializedViewName; | ||
| private final boolean fromInclusive; |
There was a problem hiding this comment.
Could this field be easily moved into IcebergTableName?
| .collect(toImmutableMap(Entry::getKey, Entry::getValue)); | ||
| } | ||
|
|
||
| public static class IcebergPredicateRewriteResult |
There was a problem hiding this comment.
Instead of creating this holder class, can we directly return the handle, and have a new public utility method which just filters out the snapshot sequence ID from the domains of the TupleDomain?
112a237 to
96fad4f
Compare
|
Thanks for your review, @tdcmeehan. I have updated the PR based on the review comment. Please take a look. |
Description
Add
$snapshot_sequence_numberas a hidden column in the iceberg tableMotivation and Context
$snapshot_sequence_numberextra column in$snapshotsmetadata table$snapshot_sequence_numberas a hidden column in the iceberg tableAs
$snapshot_idis not incremental so while calculating Delta between 2 snapshots (With query likeWHERE $snapshot_id BETWEEN snap1 AND snap2) would return wrong results since comparison is not valid.Impact
Get the delta between 2 snapshots using
$snapshot_sequence_numberas column.Here these are the covered options -
WHERE "$snapshot_sequence_number" >= X(Get delta from start snapshot to X)WHERE "$snapshot_sequence_number" BETWEEN X AND Y(Get delta from start snapshot to X)WHERE "$snapshot_sequence_number" = XWHERE "$snapshot_sequence_number" > XLimitation -
Above are using
newIncrementalAppendScan()Iceberg API which is append-only snapshot operation mode. AlsoIncrementalChangelogScanhas limitation and doesn't support delta for snapshots with operation modesdelete&overwriteand fails here.Test Plan
Tests Added
$snapshot_sequence_numberfrom$snapshotsmetadata table$snapshot_sequence_numbercolumn for filter to find delta on a tableContributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.