diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index f422bc601ab5..7f57195edc25 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -42,7 +42,7 @@ protected String operation() {
@Override
public ReplacePartitions addFile(DataFile file) {
- dropPartition(file.partition());
+ dropPartition(file.specId(), file.partition());
add(file);
return this;
}
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index b68ba901a8b6..5ec4e1354941 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -40,12 +40,14 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.Tasks;
@@ -53,31 +55,45 @@
/**
* An index of {@link DeleteFile delete files} by sequence number.
*
- * Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(int, long, DataFile)} or
- * {@link #forEntry(int, ManifestEntry)} to get the the delete files to apply to a given data file.
+ * Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, DataFile)} or
+ * {@link #forEntry(ManifestEntry)} to get the the delete files to apply to a given data file.
*/
class DeleteFileIndex {
private static final DeleteFile[] NO_DELETE_FILES = new DeleteFile[0];
+ private final Map partitionTypeById;
+ private final Map> wrapperById;
private final long[] globalSeqs;
private final DeleteFile[] globalDeletes;
private final Map, Pair> sortedDeletesByPartition;
- private final ThreadLocal lookupWrapper = ThreadLocal.withInitial(
- () -> StructLikeWrapper.wrap(null));
- DeleteFileIndex(long[] globalSeqs, DeleteFile[] globalDeletes,
+ DeleteFileIndex(Map specsById, long[] globalSeqs, DeleteFile[] globalDeletes,
Map, Pair> sortedDeletesByPartition) {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ specsById.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
+ this.partitionTypeById = builder.build();
+ this.wrapperById = Maps.newHashMap();
this.globalSeqs = globalSeqs;
this.globalDeletes = globalDeletes;
this.sortedDeletesByPartition = sortedDeletesByPartition;
}
- DeleteFile[] forEntry(int specId, ManifestEntry entry) {
- return forDataFile(specId, entry.sequenceNumber(), entry.file());
+ private StructLikeWrapper newWrapper(int specId) {
+ return StructLikeWrapper.forType(partitionTypeById.get(specId));
}
- DeleteFile[] forDataFile(int specId, long sequenceNumber, DataFile file) {
- Pair partition = Pair.of(specId, lookupWrapper.get().set(file.partition()));
+ private Pair partition(int specId, StructLike struct) {
+ ThreadLocal wrapper = wrapperById.computeIfAbsent(specId,
+ id -> ThreadLocal.withInitial(() -> newWrapper(id)));
+ return Pair.of(specId, wrapper.get().set(struct));
+ }
+
+ DeleteFile[] forEntry(ManifestEntry entry) {
+ return forDataFile(entry.sequenceNumber(), entry.file());
+ }
+
+ DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+ Pair partition = partition(file.specId(), file.partition());
Pair partitionDeletes = sortedDeletesByPartition.get(partition);
if (partitionDeletes == null) {
@@ -164,15 +180,15 @@ Builder planWith(ExecutorService newExecutorService) {
DeleteFileIndex build() {
// read all of the matching delete manifests in parallel and accumulate the matching files in a queue
- Queue>> deleteEntries = new ConcurrentLinkedQueue<>();
+ Queue> deleteEntries = new ConcurrentLinkedQueue<>();
Tasks.foreach(deleteManifestReaders())
.stopOnFailure().throwFailureWhenFinished()
.executeWith(executorService)
- .run(specIdAndReader -> {
- try (CloseableIterable> reader = specIdAndReader.second()) {
+ .run(deleteFile -> {
+ try (CloseableIterable> reader = deleteFile) {
for (ManifestEntry entry : reader) {
// copy with stats for better filtering against data file stats
- deleteEntries.add(Pair.of(specIdAndReader.first(), entry.copy()));
+ deleteEntries.add(entry.copy());
}
} catch (IOException e) {
throw new RuntimeIOException("Failed to close", e);
@@ -182,10 +198,11 @@ DeleteFileIndex build() {
// build a map from (specId, partition) to delete file entries
ListMultimap, ManifestEntry> deleteFilesByPartition =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
- for (Pair> specIdAndEntry : deleteEntries) {
- int specId = specIdAndEntry.first();
- ManifestEntry entry = specIdAndEntry.second();
- deleteFilesByPartition.put(Pair.of(specId, StructLikeWrapper.wrap(entry.file().partition())), entry);
+ for (ManifestEntry entry : deleteEntries) {
+ int specId = entry.file().specId();
+ StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType())
+ .set(entry.file().partition());
+ deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
}
// sort the entries in each map value by sequence number and split into sequence numbers and delete files lists
@@ -237,10 +254,10 @@ DeleteFileIndex build() {
}
}
- return new DeleteFileIndex(globalApplySeqs, globalDeletes, sortedDeletesByPartition);
+ return new DeleteFileIndex(specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
}
- private Iterable>>> deleteManifestReaders() {
+ private Iterable>> deleteManifestReaders() {
LoadingCache evalCache = specsById == null ? null :
Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
@@ -257,13 +274,12 @@ private Iterable>>> de
return Iterables.transform(
matchingManifests,
- manifest -> Pair.of(
- manifest.partitionSpecId(),
+ manifest ->
ManifestFiles.readDeleteManifest(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
- .liveEntries())
+ .liveEntries()
);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
index ccb0fc2286fa..15c01e9eb6bd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -42,7 +42,7 @@
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.ManifestFileUtil;
-import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
@@ -65,9 +65,10 @@ public String partition() {
}
}
+ private final Map specsById;
+ private final PartitionSet deleteFilePartitions;
+ private final PartitionSet dropPartitions;
private final Set deletePaths = CharSequenceSet.empty();
- private final Set deleteFilePartitions = Sets.newHashSet();
- private final Set dropPartitions = Sets.newHashSet();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
@@ -82,7 +83,12 @@ public String partition() {
private final Map> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
- protected abstract PartitionSpec spec(int specId);
+ protected ManifestFilterManager(Map specsById) {
+ this.specsById = specsById;
+ this.deleteFilePartitions = PartitionSet.create(specsById);
+ this.dropPartitions = PartitionSet.create(specsById);
+ }
+
protected abstract void deleteFile(String location);
protected abstract ManifestWriter newManifestWriter(PartitionSpec spec);
protected abstract ManifestReader newManifestReader(ManifestFile manifest);
@@ -110,10 +116,10 @@ protected void deleteByRowFilter(Expression expr) {
/**
* Add a partition tuple to drop from the table during the delete phase.
*/
- protected void dropPartition(StructLike partition) {
+ protected void dropPartition(int specId, StructLike partition) {
Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
invalidateFilteredCache();
- dropPartitions.add(StructLikeWrapper.wrap(partition));
+ dropPartitions.add(specId, partition);
}
/**
@@ -138,7 +144,7 @@ void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
- deleteFilePartitions.add(StructLikeWrapper.wrap(file.partition()));
+ deleteFilePartitions.add(file.specId(), file.partition());
}
/**
@@ -191,7 +197,7 @@ SnapshotSummary.Builder buildSummary(Iterable manifests) {
SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
for (ManifestFile manifest : manifests) {
- PartitionSpec manifestSpec = spec(manifest.partitionSpecId());
+ PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
Iterable manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
@@ -282,19 +288,16 @@ private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, Man
}
try (ManifestReader reader = newManifestReader(manifest)) {
- // reused to compare file partitions with the drop set
- StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null);
-
// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
- boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader, partitionWrapper);
+ boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
}
- return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader, partitionWrapper);
+ return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader);
} catch (IOException e) {
throw new RuntimeIOException("Failed to close manifest: " + manifest, e);
@@ -305,7 +308,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainExpressionDeletes;
if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
ManifestEvaluator manifestEvaluator =
- ManifestEvaluator.forRowFilter(deleteExpression, spec(manifest.partitionSpecId()), true);
+ ManifestEvaluator.forRowFilter(deleteExpression, specsById.get(manifest.partitionSpecId()), true);
canContainExpressionDeletes = manifestEvaluator.eval(manifest);
} else {
canContainExpressionDeletes = false;
@@ -313,10 +316,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainDroppedPartitions;
if (dropPartitions.size() > 0) {
- canContainDroppedPartitions = ManifestFileUtil.canContainAny(
- manifest,
- Iterables.transform(dropPartitions, StructLikeWrapper::get),
- this::spec);
+ canContainDroppedPartitions = ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
} else {
canContainDroppedPartitions = false;
}
@@ -326,10 +326,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
canContainDroppedFiles = true;
} else if (deletePaths.size() > 0) {
// because there were no path-only deletes, the set of deleted file partitions is valid
- canContainDroppedFiles = ManifestFileUtil.canContainAny(
- manifest,
- Iterables.transform(deleteFilePartitions, StructLikeWrapper::get),
- this::spec);
+ canContainDroppedFiles = ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
} else {
canContainDroppedFiles = false;
}
@@ -341,7 +338,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
}
private boolean manifestHasDeletedFiles(
- StrictMetricsEvaluator metricsEvaluator, ManifestReader reader, StructLikeWrapper partitionWrapper) {
+ StrictMetricsEvaluator metricsEvaluator, ManifestReader reader) {
boolean isDelete = reader.isDeleteManifestReader();
Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
Evaluator strict = strictDeleteEvaluator(reader.spec());
@@ -349,7 +346,7 @@ private boolean manifestHasDeletedFiles(
for (ManifestEntry entry : reader.entries()) {
F file = entry.file();
boolean fileDelete = deletePaths.contains(file.path()) ||
- dropPartitions.contains(partitionWrapper.set(file.partition())) ||
+ dropPartitions.contains(file.specId(), file.partition()) ||
(isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
if (fileDelete || inclusive.eval(file.partition())) {
ValidationException.check(
@@ -368,8 +365,7 @@ private boolean manifestHasDeletedFiles(
}
private ManifestFile filterManifestWithDeletedFiles(
- StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader reader,
- StructLikeWrapper partitionWrapper) {
+ StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader reader) {
boolean isDelete = reader.isDeleteManifestReader();
Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
Evaluator strict = strictDeleteEvaluator(reader.spec());
@@ -384,7 +380,7 @@ private ManifestFile filterManifestWithDeletedFiles(
reader.entries().forEach(entry -> {
F file = entry.file();
boolean fileDelete = deletePaths.contains(file.path()) ||
- dropPartitions.contains(partitionWrapper.set(file.partition())) ||
+ dropPartitions.contains(file.specId(), file.partition()) ||
(isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
if (entry.status() != ManifestEntry.Status.DELETED) {
if (fileDelete || inclusive.eval(file.partition())) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index a491080ee980..d323660ca3ac 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -172,10 +172,10 @@ public CloseableIterable planFiles() {
ResidualEvaluator residuals = residualCache.get(specId);
if (dropStats) {
return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
- e.file().copyWithoutStats(), deleteFiles.forEntry(specId, e), schemaString, specString, residuals));
+ e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals));
} else {
return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
- e.file().copy(), deleteFiles.forEntry(specId, e), schemaString, specString, residuals));
+ e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals));
}
});
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e231e35cd284..43567d1b963b 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -132,10 +132,10 @@ protected void deleteByRowFilter(Expression expr) {
/**
* Add a partition tuple to drop from the table during the delete phase.
*/
- protected void dropPartition(StructLike partition) {
+ protected void dropPartition(int specId, StructLike partition) {
// dropping the data in a partition also drops all deletes in the partition
- filterManager.dropPartition(partition);
- deleteFilterManager.dropPartition(partition);
+ filterManager.dropPartition(specId, partition);
+ deleteFilterManager.dropPartition(specId, partition);
}
/**
@@ -368,9 +368,8 @@ private ManifestFile newDeleteFilesAsManifest() {
}
private class DataFileFilterManager extends ManifestFilterManager {
- @Override
- protected PartitionSpec spec(int specId) {
- return ops.current().spec(specId);
+ private DataFileFilterManager() {
+ super(ops.current().specsById());
}
@Override
@@ -421,9 +420,8 @@ protected ManifestReader newManifestReader(ManifestFile manifest) {
}
private class DeleteFileFilterManager extends ManifestFilterManager {
- @Override
- protected PartitionSpec spec(int specId) {
- return ops.current().spec(specId);
+ private DeleteFileFilterManager() {
+ super(ops.current().specsById());
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
index 31cfc4457b52..92564e683695 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
@@ -21,14 +21,12 @@
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
-import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.WapUtil;
public class SnapshotManager extends MergingSnapshotProducer implements ManageSnapshots {
@@ -38,15 +36,17 @@ private enum SnapshotManagerOperation {
ROLLBACK
}
+ private final Map specsById;
private SnapshotManagerOperation managerOperation = null;
private Long targetSnapshotId = null;
private String snapshotOperation = null;
private Long requiredCurrentSnapshotId = null;
private Long overwriteParentId = null;
- private Set replacedPartitions = null;
+ private PartitionSet replacedPartitions = null;
SnapshotManager(String tableName, TableOperations ops) {
super(tableName, ops);
+ this.specsById = ops.current().specsById();
}
@Override
@@ -100,7 +100,7 @@ public ManageSnapshots cherrypick(long snapshotId) {
this.managerOperation = SnapshotManagerOperation.CHERRYPICK;
this.targetSnapshotId = snapshotId;
this.snapshotOperation = cherryPickSnapshot.operation();
- this.replacedPartitions = Sets.newHashSet();
+ this.replacedPartitions = PartitionSet.create(specsById);
// check that all deleted files are still in the table
failMissingDeletePaths();
@@ -108,7 +108,7 @@ public ManageSnapshots cherrypick(long snapshotId) {
// copy adds from the picked snapshot
for (DataFile addedFile : cherryPickSnapshot.addedFiles()) {
add(addedFile);
- replacedPartitions.add(StructLikeWrapper.wrap(addedFile.partition()));
+ replacedPartitions.add(addedFile.specId(), addedFile.partition());
}
// copy deletes from the picked snapshot
@@ -261,14 +261,13 @@ private static void validateNonAncestor(TableMetadata meta, long snapshotId) {
}
private static void validateReplacedPartitions(TableMetadata meta, Long parentId,
- Set replacedPartitions) {
+ PartitionSet replacedPartitions) {
if (replacedPartitions != null) {
ValidationException.check(parentId == null || isCurrentAncestor(meta, parentId),
"Cannot cherry-pick overwrite, based on non-ancestor of the current state: %s", parentId);
List newFiles = SnapshotUtil.newFiles(parentId, meta.currentSnapshot().snapshotId(), meta::snapshot);
- StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null);
for (DataFile newFile : newFiles) {
- ValidationException.check(!replacedPartitions.contains(partitionWrapper.set(newFile.partition())),
+ ValidationException.check(!replacedPartitions.contains(newFile.specId(), newFile.partition()),
"Cannot cherry-pick replace partitions with changed partition: %s",
newFile.partition());
}
diff --git a/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java
index a26d86eb4f11..018e017fb9d4 100644
--- a/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java
@@ -21,6 +21,7 @@
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -103,6 +104,36 @@ public static boolean canContainAny(ManifestFile manifest,
return true;
}
+ List> summaries = summaries(manifest, specLookup);
+
+ for (StructLike partition : partitions) {
+ if (canContain(summaries, partition)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public static boolean canContainAny(ManifestFile manifest,
+ Iterable> partitions,
+ Map specsById) {
+ if (manifest.partitions() == null) {
+ return true;
+ }
+
+ List> summaries = summaries(manifest, specsById::get);
+
+ for (Pair partition : partitions) {
+ if (partition.first() == manifest.partitionSpecId() && canContain(summaries, partition.second())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private static List> summaries(ManifestFile manifest, Function specLookup) {
Types.StructType partitionType = specLookup.apply(manifest.partitionSpecId()).partitionType();
List fieldSummaries = manifest.partitions();
List fields = partitionType.fields();
@@ -113,12 +144,6 @@ public static boolean canContainAny(ManifestFile manifest,
summaries.add(new FieldSummary<>(primitive, fieldSummaries.get(pos)));
}
- for (StructLike partition : partitions) {
- if (canContain(summaries, partition)) {
- return true;
- }
- }
-
- return false;
+ return summaries;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
index 809fc7bb0d69..36a774e932a7 100644
--- a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
+++ b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.util;
import java.util.Comparator;
-import java.util.Objects;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.JavaHash;
@@ -31,10 +30,6 @@
*/
public class StructLikeWrapper {
- public static StructLikeWrapper wrap(StructLike struct) {
- return new StructLikeWrapper(struct);
- }
-
public static StructLikeWrapper forType(Types.StructType struct) {
return new StructLikeWrapper(struct);
}
@@ -44,19 +39,9 @@ public static StructLikeWrapper forType(Types.StructType struct) {
private Integer hashCode;
private StructLike struct;
- private StructLikeWrapper(StructLike struct) {
- this((Types.StructType) null);
- set(struct);
- }
-
private StructLikeWrapper(Types.StructType type) {
- if (type != null) {
- this.comparator = Comparators.forType(type);
- this.structHash = JavaHash.forType(type);
- } else {
- this.comparator = null;
- this.structHash = null;
- }
+ this.comparator = Comparators.forType(type);
+ this.structHash = JavaHash.forType(type);
this.hashCode = null;
}
@@ -93,33 +78,13 @@ public boolean equals(Object other) {
return false;
}
- if (comparator != null) {
- return comparator.compare(this.struct, that.struct) == 0;
- }
-
- for (int i = 0; i < len; i += 1) {
- if (!Objects.equals(struct.get(i, Object.class), that.struct.get(i, Object.class))) {
- return false;
- }
- }
-
- return true;
+ return comparator.compare(this.struct, that.struct) == 0;
}
@Override
public int hashCode() {
if (hashCode == null) {
- if (structHash != null) {
- this.hashCode = structHash.hash(struct);
- } else {
- int result = 97;
- int len = struct.size();
- result = 41 * result + len;
- for (int i = 0; i < len; i += 1) {
- result = 41 * result + Objects.hashCode(struct.get(i, Object.class));
- }
- this.hashCode = result;
- }
+ this.hashCode = structHash.hash(struct);
}
return hashCode;
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
index 40d8363cc320..dd2252629b0f 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
@@ -78,58 +78,71 @@ public TestDeleteFileIndex() {
@Test
public void testUnpartitionedDeletes() {
- DeleteFileIndex index = new DeleteFileIndex(new long[] { 3, 5, 5, 6 }, DELETE_FILES, ImmutableMap.of());
+ DeleteFileIndex index = new DeleteFileIndex(
+ ImmutableMap.of(
+ PartitionSpec.unpartitioned().specId(), PartitionSpec.unpartitioned(),
+ 1, SPEC),
+ new long[] { 3, 5, 5, 6 }, DELETE_FILES, ImmutableMap.of());
Assert.assertArrayEquals("All deletes should apply to seq 0",
- DELETE_FILES, index.forDataFile(1, 0, UNPARTITIONED_FILE));
+ DELETE_FILES, index.forDataFile(0, UNPARTITIONED_FILE));
Assert.assertArrayEquals("All deletes should apply to seq 3",
- DELETE_FILES, index.forDataFile(1, 3, UNPARTITIONED_FILE));
+ DELETE_FILES, index.forDataFile(3, UNPARTITIONED_FILE));
Assert.assertArrayEquals("Last 3 deletes should apply to seq 4",
- Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 4, UNPARTITIONED_FILE));
+ Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(4, UNPARTITIONED_FILE));
Assert.assertArrayEquals("Last 3 deletes should apply to seq 5",
- Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 5, UNPARTITIONED_FILE));
+ Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(5, UNPARTITIONED_FILE));
Assert.assertArrayEquals("Last delete should apply to seq 6",
- Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(1, 6, UNPARTITIONED_FILE));
+ Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(6, UNPARTITIONED_FILE));
Assert.assertArrayEquals("No deletes should apply to seq 7",
- new DataFile[0], index.forDataFile(1, 7, UNPARTITIONED_FILE));
+ new DataFile[0], index.forDataFile(7, UNPARTITIONED_FILE));
Assert.assertArrayEquals("No deletes should apply to seq 10",
- new DataFile[0], index.forDataFile(1, 10, UNPARTITIONED_FILE));
+ new DataFile[0], index.forDataFile(10, UNPARTITIONED_FILE));
+ // copy file A with a different spec ID
+ DataFile partitionedFileA = FILE_A.copy();
+ ((BaseFile>) partitionedFileA).setSpecId(1);
Assert.assertArrayEquals("All global deletes should apply to a partitioned file",
- DELETE_FILES, index.forDataFile(2, 0, FILE_B));
+ DELETE_FILES, index.forDataFile(0, partitionedFileA));
}
@Test
public void testPartitionedDeleteIndex() {
- DeleteFileIndex index = new DeleteFileIndex(null, null, ImmutableMap.of(
- Pair.of(1, StructLikeWrapper.wrap(FILE_A.partition())),
- Pair.of(new long[] { 3, 5, 5, 6 }, DELETE_FILES),
- Pair.of(1, StructLikeWrapper.wrap(FILE_C.partition())),
- Pair.of(new long[0], new DeleteFile[0])));
+ DeleteFileIndex index = new DeleteFileIndex(
+ ImmutableMap.of(
+ SPEC.specId(), SPEC,
+ 1, PartitionSpec.unpartitioned()),
+ null, null, ImmutableMap.of(
+ Pair.of(SPEC.specId(), StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_A.partition())),
+ Pair.of(new long[] { 3, 5, 5, 6 }, DELETE_FILES),
+ Pair.of(SPEC.specId(), StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_C.partition())),
+ Pair.of(new long[0], new DeleteFile[0])));
Assert.assertArrayEquals("All deletes should apply to seq 0",
- DELETE_FILES, index.forDataFile(1, 0, FILE_A));
+ DELETE_FILES, index.forDataFile(0, FILE_A));
Assert.assertArrayEquals("All deletes should apply to seq 3",
- DELETE_FILES, index.forDataFile(1, 3, FILE_A));
+ DELETE_FILES, index.forDataFile(3, FILE_A));
Assert.assertArrayEquals("Last 3 deletes should apply to seq 4",
- Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 4, FILE_A));
+ Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(4, FILE_A));
Assert.assertArrayEquals("Last 3 deletes should apply to seq 5",
- Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 5, FILE_A));
+ Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(5, FILE_A));
Assert.assertArrayEquals("Last delete should apply to seq 6",
- Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(1, 6, FILE_A));
+ Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(6, FILE_A));
Assert.assertArrayEquals("No deletes should apply to seq 7",
- new DataFile[0], index.forDataFile(1, 7, FILE_A));
+ new DataFile[0], index.forDataFile(7, FILE_A));
Assert.assertArrayEquals("No deletes should apply to seq 10",
- new DataFile[0], index.forDataFile(1, 10, FILE_A));
+ new DataFile[0], index.forDataFile(10, FILE_A));
Assert.assertEquals("No deletes should apply to FILE_B, partition not in index",
- 0, index.forDataFile(1, 0, FILE_B).length);
+ 0, index.forDataFile(0, FILE_B).length);
Assert.assertEquals("No deletes should apply to FILE_C, no indexed delete files",
- 0, index.forDataFile(1, 0, FILE_C).length);
+ 0, index.forDataFile(0, FILE_C).length);
+ DataFile unpartitionedFileA = FILE_A.copy();
+ ((BaseFile>) unpartitionedFileA).setSpecId(1);
Assert.assertEquals("No deletes should apply to FILE_A with a different specId",
- 0, index.forDataFile(2, 0, FILE_A).length);
+ 0, index.forDataFile(0, unpartitionedFileA).length);
}
@Test