28 changes: 25 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -18,10 +18,12 @@
*/
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.SnapshotUtil;

class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta {
private Long startingSnapshotId = null; // check all versions by default
@@ -95,24 +97,44 @@ public RowDelta validateNoConflictingDeleteFiles() {
return this;
}

@Override
public RowDelta toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
protected void validate(TableMetadata base, Snapshot snapshot) {
if (base.currentSnapshot() != null) {
if (snapshot != null) {
boolean startingIsNullOrAncestor =
startingSnapshotId == null
|| SnapshotUtil.isAncestorOf(
snapshot.snapshotId(), startingSnapshotId, base::snapshot);

ValidationException.check(
startingIsNullOrAncestor,
"Cannot perform validation starting snapshot %d is not an ancestor of %d",
startingSnapshotId,
snapshot.snapshotId());

if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(
base,
startingSnapshotId,
snapshot.snapshotId(),
referencedDataFiles,
!validateDeletes,
conflictDetectionFilter);
}

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter);
validateAddedDataFiles(
base, startingSnapshotId, snapshot.snapshotId(), conflictDetectionFilter);
}

if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter);
validateNoNewDeleteFiles(
base, startingSnapshotId, snapshot.snapshotId(), conflictDetectionFilter);
}
}
}
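For orientation, here is a minimal usage sketch of the branch-aware path this change enables, assuming the standard RowDelta API; the branch name, filter column, and the newData/newDeletes files are hypothetical placeholders. With toBranch set, validation is expected to run against the target branch's head (the snapshot passed into validate above) rather than the table's current snapshot.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class RowDeltaToBranchSketch {
  // Hypothetical helper: commit new rows plus delete files to a branch.
  static void commitToBranch(Table table, DataFile newData, DeleteFile newDeletes) {
    // snapshot current when the writer started reading; null checks omitted for brevity
    long startingSnapshotId = table.currentSnapshot().snapshotId();

    table
        .newRowDelta()
        .toBranch("audit-branch") // target branch (added by this change)
        .addRows(newData)
        .addDeletes(newDeletes)
        .validateFromSnapshot(startingSnapshotId)
        .conflictDetectionFilter(Expressions.equal("region", "us-east-1"))
        .validateNoConflictingDataFiles() // exercised via the new ending-snapshot overloads
        .validateNoConflictingDeleteFiles()
        .commit();
  }
}
```

The only difference from the existing flow is the toBranch call; the rest of the chain is unchanged.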
191 changes: 188 additions & 3 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -296,7 +296,9 @@ protected void validateAddedDataFiles(
}

/**
* Validates that no files matching a filter have been added to the table since a starting
* ToDo: Remove after branch writing implementations complete.
*
* <p>Validates that no files matching a filter have been added to the table since a starting
* snapshot.
*
* @param base table metadata to validate
@@ -323,10 +325,43 @@ protected void validateAddedDataFiles(
}
}

/**
* Validates that no files matching a filter have been added to the table from a starting snapshot
* to an ending snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param endingSnapshotId id of the snapshot at the end of the operation
* @param conflictDetectionFilter an expression used to find new conflicting data files
*/
protected void validateAddedDataFiles(
TableMetadata base,
Long startingSnapshotId,
Long endingSnapshotId,
Expression conflictDetectionFilter) {
CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
addedDataFiles(base, startingSnapshotId, endingSnapshotId, conflictDetectionFilter, null);

try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
if (conflicts.hasNext()) {
throw new ValidationException(
"Found conflicting files that can contain records matching %s: %s",
conflictDetectionFilter,
Iterators.toString(
Iterators.transform(conflicts, entry -> entry.file().path().toString())));
}

} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to validate no appends matching %s", conflictDetectionFilter), e);
}
}

/**
* Returns an iterable of files matching a filter that have been added to the table since a
* starting snapshot.
*
* <p>ToDo: Remove when branch writing operations are complete.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter an expression used to find new data files
@@ -369,6 +404,57 @@ private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles(
return manifestGroup.entries();
}

/**
* Returns an iterable of files matching a filter that have been added to the table from a
* starting snapshot to an ending snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param endingSnapshotId id of the snapshot at the end of the operation
* @param dataFilter an expression used to find new data files
* @param partitionSet a set of partitions to find new data files
*/
private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles(
TableMetadata base,
Long startingSnapshotId,
Long endingSnapshotId,
Expression dataFilter,
PartitionSet partitionSet) {
// if there is no current table state, no files have been added
if (base.currentSnapshot() == null) {
return CloseableIterable.empty();
}

Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(
base,
startingSnapshotId,
endingSnapshotId,
VALIDATE_ADDED_FILES_OPERATIONS,
ManifestContent.DATA);
List<ManifestFile> manifests = history.first();
Set<Long> newSnapshots = history.second();

ManifestGroup manifestGroup =
new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
.specsById(base.specsById())
.ignoreDeleted()
.ignoreExisting();

if (dataFilter != null) {
manifestGroup = manifestGroup.filterData(dataFilter);
}

if (partitionSet != null) {
manifestGroup =
manifestGroup.filterManifestEntries(
entry -> partitionSet.contains(entry.file().specId(), entry.file().partition()));
}

return manifestGroup.entries();
}

/**
* Validates that no new delete files that must be applied to the given data files have been added
* to the table since a starting snapshot.
@@ -451,8 +537,10 @@ private void validateNoNewDeletesForDataFiles(
}

/**
* Validates that no delete files matching a filter have been added to the table since a starting
* snapshot.
* ToDo: Remove after branch writing implementation complete
@amogh-jahagirdar (Contributor, Author) commented on Aug 23, 2022:
We can remove the existing validate after we implement the different write operations because we will always have an ending snapshot. This is fine because validations are all protected/private.

*
* <p>Validates that no delete files matching a filter have been added to the table since a
* starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
@@ -468,6 +556,25 @@ protected void validateNoNewDeleteFiles(
Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
}
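To make the review note above concrete, here is a rough sketch, not this PR's actual plumbing, of why branch-aware operations always have an ending snapshot to hand to the new overloads: the producer can resolve the target branch's ref from table metadata and validate up to that head. The ref lookup and the fallback to the current snapshot are illustrative assumptions.

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;

class EndingSnapshotSketch {
  // Hypothetical: resolve the snapshot that branch-aware validation should end at.
  static Snapshot endingSnapshotFor(TableMetadata base, String targetBranch) {
    SnapshotRef ref = base.ref(targetBranch);
    if (ref == null) {
      // the branch does not exist yet, so validate against the current table state
      return base.currentSnapshot();
    }
    return base.snapshot(ref.snapshotId());
  }
}
```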

/**
* Validates that no delete files matching a filter have been added to the table from a starting
* snapshot to an ending snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param endingSnapshotId id of the snapshot at the end of the operation
* @param dataFilter an expression used to find new conflicting delete files
*/
protected void validateNoNewDeleteFiles(
TableMetadata base, Long startingSnapshotId, Long endingSnapshotId, Expression dataFilter) {
DeleteFileIndex deletes =
addedDeleteFiles(base, startingSnapshotId, endingSnapshotId, dataFilter, null);
ValidationException.check(
deletes.isEmpty(),
"Found new conflicting delete files that can apply to records matching %s: %s",
dataFilter,
Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
}

/**
* Validates that no delete files matching a partition set have been added to the table since a
* starting snapshot.
@@ -518,6 +625,40 @@ protected DeleteFileIndex addedDeleteFiles(
return buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, partitionSet);
}

/**
* Returns matching delete files that have been added to the table from a starting snapshot to an
* ending snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param endingSnapshotId id of the snapshot at the end of the operation
* @param dataFilter an expression used to find delete files
* @param partitionSet a partition set used to find delete files
*/
protected DeleteFileIndex addedDeleteFiles(
TableMetadata base,
Long startingSnapshotId,
Long endingSnapshotId,
Expression dataFilter,
PartitionSet partitionSet) {
// if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
.specsById(base.specsById())
.build();
}

Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(
base,
startingSnapshotId,
endingSnapshotId,
VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();

long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
return buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, partitionSet);
}

/**
* Validates that no files matching a filter have been deleted from the table since a starting
* snapshot.
@@ -659,6 +800,7 @@ private DeleteFileIndex buildDeleteFileIndex(
protected void validateDataFilesExist(
TableMetadata base,
Long startingSnapshotId,
Long endingSnapshotId,
CharSequenceSet requiredDataFiles,
boolean skipDeletes,
Expression conflictDetectionFilter) {
@@ -705,6 +847,7 @@ protected void validateDataFilesExist(
}
}

// ToDo: Remove this after branch write implementations complete.
private Pair<List<ManifestFile>, Set<Long>> validationHistory(
TableMetadata base,
Long startingSnapshotId,
@@ -747,6 +890,48 @@ private Pair<List<ManifestFile>, Set<Long>> validationHistory(
return Pair.of(manifests, newSnapshots);
}

private Pair<List<ManifestFile>, Set<Long>> validationHistory(
TableMetadata base,
Long startingSnapshotId,
Long endingSnapshotId,
Set<String> matchingOperations,
ManifestContent content) {
List<ManifestFile> manifests = Lists.newArrayList();
Set<Long> newSnapshots = Sets.newHashSet();

Snapshot lastSnapshot = null;
Iterable<Snapshot> snapshots =
SnapshotUtil.ancestorsBetween(endingSnapshotId, startingSnapshotId, base::snapshot);
for (Snapshot currentSnapshot : snapshots) {
lastSnapshot = currentSnapshot;

if (matchingOperations.contains(currentSnapshot.operation())) {
newSnapshots.add(currentSnapshot.snapshotId());
if (content == ManifestContent.DATA) {
for (ManifestFile manifest : currentSnapshot.dataManifests(ops.io())) {
if (manifest.snapshotId() == currentSnapshot.snapshotId()) {
manifests.add(manifest);
}
}
} else {
for (ManifestFile manifest : currentSnapshot.deleteManifests(ops.io())) {
if (manifest.snapshotId() == currentSnapshot.snapshotId()) {
manifests.add(manifest);
}
}
}
}
}

ValidationException.check(
lastSnapshot == null || Objects.equals(lastSnapshot.parentId(), startingSnapshotId),
"Cannot determine history between starting snapshot %s and the last known ancestor %s",
startingSnapshotId,
lastSnapshot != null ? lastSnapshot.snapshotId() : null);

return Pair.of(manifests, newSnapshots);
}
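A hedged illustration of the traversal this overload relies on: as used here, SnapshotUtil.ancestorsBetween appears to walk parent pointers from the ending snapshot back toward the starting snapshot, excluding the starting snapshot itself, which is why the final check asserts that the last visited snapshot's parent is the starting snapshot. For a linear history S1 <- S2 <- S3, walking from S3 back to S1 should visit S3 then S2.

```java
import java.util.List;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SnapshotUtil;

class AncestryWalkSketch {
  // Illustration only: collect the snapshot ids visited between two snapshots on one branch.
  static List<Long> visitedSnapshotIds(
      TableMetadata base, long endingSnapshotId, Long startingSnapshotId) {
    List<Long> visited = Lists.newArrayList();
    for (Snapshot ancestor :
        SnapshotUtil.ancestorsBetween(endingSnapshotId, startingSnapshotId, base::snapshot)) {
      visited.add(ancestor.snapshotId());
    }
    return visited; // e.g. [S3, S2] for the S1 <- S2 <- S3 example
  }
}
```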

@Override
protected Map<String, String> summary() {
summaryBuilder.setPartitionSummaryLimit(