Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public BaseOverwriteFiles toBranch(String branch) {
}

@Override
protected void validate(TableMetadata base, Snapshot snapshot) {
protected void validate(TableMetadata base, Snapshot parent) {
if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();
Expand Down Expand Up @@ -139,19 +139,19 @@ protected void validate(TableMetadata base, Snapshot snapshot) {
}

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), snapshot);
validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), parent);
}

if (validateNewDeletes) {
if (rowFilter() != Expressions.alwaysFalse()) {
Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
validateNoNewDeleteFiles(base, startingSnapshotId, filter, snapshot);
validateDeletedDataFiles(base, startingSnapshotId, filter, snapshot);
validateNoNewDeleteFiles(base, startingSnapshotId, filter, parent);
validateDeletedDataFiles(base, startingSnapshotId, filter, parent);
}

if (deletedDataFiles.size() > 0) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, conflictDetectionFilter, deletedDataFiles, snapshot);
base, startingSnapshotId, conflictDetectionFilter, deletedDataFiles, parent);
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,25 @@ public BaseReplacePartitions toBranch(String branch) {
}

@Override
public void validate(TableMetadata currentMetadata, Snapshot snapshot) {
public void validate(TableMetadata currentMetadata, Snapshot parent) {
if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot);
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot);
validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}

if (validateConflictingDeletes) {
if (dataSpec().isUnpartitioned()) {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot);
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot);
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot);
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot);
validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public BaseRewriteFiles toBranch(String branch) {
}

@Override
protected void validate(TableMetadata base, Snapshot snapshot) {
protected void validate(TableMetadata base, Snapshot parent) {
if (replacedDataFiles.size() > 0) {
// if there are replaced data files, there cannot be any new row-level deletes for those data
// files
validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, snapshot);
validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent);
}
}
}
40 changes: 40 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -394,4 +396,42 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli

return table.schema();
}

/**
* Fetch the snapshot at the head of the given branch in the given table.
*
* <p>This method calls {@link Table#currentSnapshot()} instead of using branch API {@link
* Table#snapshot(String)} for the main branch so that existing code still goes through the old
* code path to ensure backwards compatibility.
*
* @param table a {@link Table}
* @param branch branch name of the table
* @return the latest snapshot for the given branch
*/
public static Snapshot latestSnapshot(Table table, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return table.currentSnapshot();
}

return table.snapshot(branch);
}

/**
* Fetch the snapshot at the head of the given branch in the given table.
*
* <p>This method calls {@link TableMetadata#currentSnapshot()} instead of using branch API {@link
* TableMetadata#ref(String)}} for the main branch so that existing code still goes through the
* old code path to ensure backwards compatibility.
*
* @param metadata a {@link TableMetadata}
* @param branch branch name of the table metadata
* @return the latest snapshot for the given branch
*/
public static Snapshot latestSnapshot(TableMetadata metadata, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return metadata.currentSnapshot();
}

return metadata.snapshot(metadata.ref(branch).snapshotId());
}
}
17 changes: 0 additions & 17 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,23 +384,6 @@ Snapshot apply(SnapshotUpdate snapshotUpdate, String branch) {
}
}

@SuppressWarnings("checkstyle:HiddenField")
Snapshot latestSnapshot(Table table, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return table.currentSnapshot();
}

return table.snapshot(branch);
}

Snapshot latestSnapshot(TableMetadata metadata, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return metadata.currentSnapshot();
}

return metadata.snapshot(metadata.ref(branch).snapshotId());
}

void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile... newFiles) {
Assert.assertEquals(
"Should not change delete manifests",
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.ManifestEntry.Status;
Expand Down
1 change: 1 addition & 0 deletions core/src/test/java/org/apache/iceberg/TestMergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import static org.apache.iceberg.relocated.com.google.common.collect.Iterators.concat;
import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot;

import java.io.File;
import java.io.IOException;
Expand Down
9 changes: 5 additions & 4 deletions core/src/test/java/org/apache/iceberg/TestOverwrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -182,16 +183,16 @@ public void testOverwriteFailsDelete() {
@Test
public void testOverwriteWithAppendOutsideOfDelete() {
TableMetadata base = TestTables.readMetadata(TABLE_NAME);
long baseId =
latestSnapshot(base, branch) == null ? -1 : latestSnapshot(base, branch).snapshotId();
Snapshot latestSnapshot = latestSnapshot(base, branch);
Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to handle in this PR but given #6651 we will end up using the SnapshotUtil api which is introduced. We can either update this PR with the proposed utility API since there's consensus on that or just later refactor to use the utility API when it gets merged as part of branch writes. Since this is a test class, I'm okay either way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can just add the SnapshotUtil change in this PR? I don't think we can resolve the comments in that PR in time based on the current situation.

long baseId = latestSnapshot == null ? -1 : latestSnapshot.snapshotId();

commit(
table,
table
.newOverwrite()
.overwriteByRowFilter(equal("date", "2018-06-08"))
.addFile(FILE_10_TO_14),
branch); // in 2018-06-09, NOT in 2018-06-08
.addFile(FILE_10_TO_14), // in 2018-06-09, NOT in 2018-06-08
branch);

long overwriteId = latestSnapshot(table, branch).snapshotId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot;

import java.io.File;
import java.io.IOException;
Expand Down
Loading