From daacf7b9a709ab58cb608389ff0c4e3780fb0934 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 7 Aug 2020 13:43:35 -0700 Subject: [PATCH] Fix partition sets. --- .../apache/iceberg/BaseReplacePartitions.java | 2 +- .../org/apache/iceberg/DeleteFileIndex.java | 60 +++++++++++------- .../apache/iceberg/ManifestFilterManager.java | 50 +++++++-------- .../org/apache/iceberg/ManifestGroup.java | 4 +- .../iceberg/MergingSnapshotProducer.java | 16 +++-- .../org/apache/iceberg/SnapshotManager.java | 17 +++--- .../apache/iceberg/util/ManifestFileUtil.java | 39 +++++++++--- .../iceberg/util/StructLikeWrapper.java | 43 ++----------- .../apache/iceberg/TestDeleteFileIndex.java | 61 +++++++++++-------- 9 files changed, 152 insertions(+), 140 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java index f422bc601ab5..7f57195edc25 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java @@ -42,7 +42,7 @@ protected String operation() { @Override public ReplacePartitions addFile(DataFile file) { - dropPartition(file.partition()); + dropPartition(file.specId(), file.partition()); add(file); return this; } diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index b68ba901a8b6..5ec4e1354941 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -40,12 +40,14 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import 
org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeWrapper; import org.apache.iceberg.util.Tasks; @@ -53,31 +55,45 @@ /** * An index of {@link DeleteFile delete files} by sequence number. *

- * Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(int, long, DataFile)} or - * {@link #forEntry(int, ManifestEntry)} to get the the delete files to apply to a given data file. + * Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, DataFile)} or + * {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data file. */ class DeleteFileIndex { private static final DeleteFile[] NO_DELETE_FILES = new DeleteFile[0]; + private final Map partitionTypeById; + private final Map> wrapperById; private final long[] globalSeqs; private final DeleteFile[] globalDeletes; private final Map, Pair> sortedDeletesByPartition; - private final ThreadLocal lookupWrapper = ThreadLocal.withInitial( - () -> StructLikeWrapper.wrap(null)); - DeleteFileIndex(long[] globalSeqs, DeleteFile[] globalDeletes, + DeleteFileIndex(Map specsById, long[] globalSeqs, DeleteFile[] globalDeletes, Map, Pair> sortedDeletesByPartition) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + specsById.forEach((specId, spec) -> builder.put(specId, spec.partitionType())); + this.partitionTypeById = builder.build(); + this.wrapperById = Maps.newConcurrentMap(); this.globalSeqs = globalSeqs; this.globalDeletes = globalDeletes; this.sortedDeletesByPartition = sortedDeletesByPartition; } - DeleteFile[] forEntry(int specId, ManifestEntry entry) { - return forDataFile(specId, entry.sequenceNumber(), entry.file()); + private StructLikeWrapper newWrapper(int specId) { + return StructLikeWrapper.forType(partitionTypeById.get(specId)); } - DeleteFile[] forDataFile(int specId, long sequenceNumber, DataFile file) { - Pair partition = Pair.of(specId, lookupWrapper.get().set(file.partition())); + private Pair partition(int specId, StructLike struct) { + ThreadLocal wrapper = wrapperById.computeIfAbsent(specId, + id -> ThreadLocal.withInitial(() -> newWrapper(id))); + return Pair.of(specId, wrapper.get().set(struct)); + 
} + + DeleteFile[] forEntry(ManifestEntry entry) { + return forDataFile(entry.sequenceNumber(), entry.file()); + } + + DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { + Pair partition = partition(file.specId(), file.partition()); Pair partitionDeletes = sortedDeletesByPartition.get(partition); if (partitionDeletes == null) { @@ -164,15 +180,15 @@ Builder planWith(ExecutorService newExecutorService) { DeleteFileIndex build() { // read all of the matching delete manifests in parallel and accumulate the matching files in a queue - Queue>> deleteEntries = new ConcurrentLinkedQueue<>(); + Queue> deleteEntries = new ConcurrentLinkedQueue<>(); Tasks.foreach(deleteManifestReaders()) .stopOnFailure().throwFailureWhenFinished() .executeWith(executorService) - .run(specIdAndReader -> { - try (CloseableIterable> reader = specIdAndReader.second()) { + .run(deleteFile -> { + try (CloseableIterable> reader = deleteFile) { for (ManifestEntry entry : reader) { // copy with stats for better filtering against data file stats - deleteEntries.add(Pair.of(specIdAndReader.first(), entry.copy())); + deleteEntries.add(entry.copy()); } } catch (IOException e) { throw new RuntimeIOException("Failed to close", e); @@ -182,10 +198,11 @@ DeleteFileIndex build() { // build a map from (specId, partition) to delete file entries ListMultimap, ManifestEntry> deleteFilesByPartition = Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList); - for (Pair> specIdAndEntry : deleteEntries) { - int specId = specIdAndEntry.first(); - ManifestEntry entry = specIdAndEntry.second(); - deleteFilesByPartition.put(Pair.of(specId, StructLikeWrapper.wrap(entry.file().partition())), entry); + for (ManifestEntry entry : deleteEntries) { + int specId = entry.file().specId(); + StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType()) + .set(entry.file().partition()); + deleteFilesByPartition.put(Pair.of(specId, wrapper), entry); } // sort the entries in each 
map value by sequence number and split into sequence numbers and delete files lists @@ -237,10 +254,10 @@ DeleteFileIndex build() { } } - return new DeleteFileIndex(globalApplySeqs, globalDeletes, sortedDeletesByPartition); + return new DeleteFileIndex(specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition); } - private Iterable>>> deleteManifestReaders() { + private Iterable>> deleteManifestReaders() { LoadingCache evalCache = specsById == null ? null : Caffeine.newBuilder().build(specId -> { PartitionSpec spec = specsById.get(specId); @@ -257,13 +274,12 @@ private Iterable>>> de return Iterables.transform( matchingManifests, - manifest -> Pair.of( - manifest.partitionSpecId(), + manifest -> ManifestFiles.readDeleteManifest(manifest, io, specsById) .filterRows(dataFilter) .filterPartitions(partitionFilter) .caseSensitive(caseSensitive) - .liveEntries()) + .liveEntries() ); } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index ccb0fc2286fa..15c01e9eb6bd 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -42,7 +42,7 @@ import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; import org.apache.iceberg.util.ManifestFileUtil; -import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; @@ -65,9 +65,10 @@ public String partition() { } } + private final Map specsById; + private final PartitionSet deleteFilePartitions; + private final PartitionSet dropPartitions; private final Set deletePaths = CharSequenceSet.empty(); - private final Set deleteFilePartitions = Sets.newHashSet(); - private final Set dropPartitions = Sets.newHashSet(); private Expression deleteExpression = 
Expressions.alwaysFalse(); private long minSequenceNumber = 0; private boolean hasPathOnlyDeletes = false; @@ -82,7 +83,12 @@ public String partition() { private final Map> filteredManifestToDeletedFiles = Maps.newConcurrentMap(); - protected abstract PartitionSpec spec(int specId); + protected ManifestFilterManager(Map specsById) { + this.specsById = specsById; + this.deleteFilePartitions = PartitionSet.create(specsById); + this.dropPartitions = PartitionSet.create(specsById); + } + protected abstract void deleteFile(String location); protected abstract ManifestWriter newManifestWriter(PartitionSpec spec); protected abstract ManifestReader newManifestReader(ManifestFile manifest); @@ -110,10 +116,10 @@ protected void deleteByRowFilter(Expression expr) { /** * Add a partition tuple to drop from the table during the delete phase. */ - protected void dropPartition(StructLike partition) { + protected void dropPartition(int specId, StructLike partition) { Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null"); invalidateFilteredCache(); - dropPartitions.add(StructLikeWrapper.wrap(partition)); + dropPartitions.add(specId, partition); } /** @@ -138,7 +144,7 @@ void delete(F file) { Preconditions.checkNotNull(file, "Cannot delete file: null"); invalidateFilteredCache(); deletePaths.add(file.path()); - deleteFilePartitions.add(StructLikeWrapper.wrap(file.partition())); + deleteFilePartitions.add(file.specId(), file.partition()); } /** @@ -191,7 +197,7 @@ SnapshotSummary.Builder buildSummary(Iterable manifests) { SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); for (ManifestFile manifest : manifests) { - PartitionSpec manifestSpec = spec(manifest.partitionSpecId()); + PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId()); Iterable manifestDeletes = filteredManifestToDeletedFiles.get(manifest); if (manifestDeletes != null) { for (F file : manifestDeletes) { @@ -282,19 +288,16 @@ private ManifestFile 
filterManifest(StrictMetricsEvaluator metricsEvaluator, Man } try (ManifestReader reader = newManifestReader(manifest)) { - // reused to compare file partitions with the drop set - StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null); - // this assumes that the manifest doesn't have files to remove and streams through the // manifest without copying data. if a manifest does have a file to remove, this will break // out of the loop and move on to filtering the manifest. - boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader, partitionWrapper); + boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader); if (!hasDeletedFiles) { filteredManifests.put(manifest, manifest); return manifest; } - return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader, partitionWrapper); + return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader); } catch (IOException e) { throw new RuntimeIOException("Failed to close manifest: " + manifest, e); @@ -305,7 +308,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) { boolean canContainExpressionDeletes; if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) { ManifestEvaluator manifestEvaluator = - ManifestEvaluator.forRowFilter(deleteExpression, spec(manifest.partitionSpecId()), true); + ManifestEvaluator.forRowFilter(deleteExpression, specsById.get(manifest.partitionSpecId()), true); canContainExpressionDeletes = manifestEvaluator.eval(manifest); } else { canContainExpressionDeletes = false; @@ -313,10 +316,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) { boolean canContainDroppedPartitions; if (dropPartitions.size() > 0) { - canContainDroppedPartitions = ManifestFileUtil.canContainAny( - manifest, - Iterables.transform(dropPartitions, StructLikeWrapper::get), - this::spec); + canContainDroppedPartitions = ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById); } else { 
canContainDroppedPartitions = false; } @@ -326,10 +326,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) { canContainDroppedFiles = true; } else if (deletePaths.size() > 0) { // because there were no path-only deletes, the set of deleted file partitions is valid - canContainDroppedFiles = ManifestFileUtil.canContainAny( - manifest, - Iterables.transform(deleteFilePartitions, StructLikeWrapper::get), - this::spec); + canContainDroppedFiles = ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById); } else { canContainDroppedFiles = false; } @@ -341,7 +338,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) { } private boolean manifestHasDeletedFiles( - StrictMetricsEvaluator metricsEvaluator, ManifestReader reader, StructLikeWrapper partitionWrapper) { + StrictMetricsEvaluator metricsEvaluator, ManifestReader reader) { boolean isDelete = reader.isDeleteManifestReader(); Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec()); Evaluator strict = strictDeleteEvaluator(reader.spec()); @@ -349,7 +346,7 @@ private boolean manifestHasDeletedFiles( for (ManifestEntry entry : reader.entries()) { F file = entry.file(); boolean fileDelete = deletePaths.contains(file.path()) || - dropPartitions.contains(partitionWrapper.set(file.partition())) || + dropPartitions.contains(file.specId(), file.partition()) || (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber); if (fileDelete || inclusive.eval(file.partition())) { ValidationException.check( @@ -368,8 +365,7 @@ private boolean manifestHasDeletedFiles( } private ManifestFile filterManifestWithDeletedFiles( - StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader reader, - StructLikeWrapper partitionWrapper) { + StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader reader) { boolean isDelete = reader.isDeleteManifestReader(); Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec()); 
Evaluator strict = strictDeleteEvaluator(reader.spec()); @@ -384,7 +380,7 @@ private ManifestFile filterManifestWithDeletedFiles( reader.entries().forEach(entry -> { F file = entry.file(); boolean fileDelete = deletePaths.contains(file.path()) || - dropPartitions.contains(partitionWrapper.set(file.partition())) || + dropPartitions.contains(file.specId(), file.partition()) || (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber); if (entry.status() != ManifestEntry.Status.DELETED) { if (fileDelete || inclusive.eval(file.partition())) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index a491080ee980..d323660ca3ac 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -172,10 +172,10 @@ public CloseableIterable planFiles() { ResidualEvaluator residuals = residualCache.get(specId); if (dropStats) { return CloseableIterable.transform(entries, e -> new BaseFileScanTask( - e.file().copyWithoutStats(), deleteFiles.forEntry(specId, e), schemaString, specString, residuals)); + e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals)); } else { return CloseableIterable.transform(entries, e -> new BaseFileScanTask( - e.file().copy(), deleteFiles.forEntry(specId, e), schemaString, specString, residuals)); + e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals)); } }); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e231e35cd284..43567d1b963b 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -132,10 +132,10 @@ protected void deleteByRowFilter(Expression expr) { /** * Add a partition tuple to drop from the table during 
the delete phase. */ - protected void dropPartition(StructLike partition) { + protected void dropPartition(int specId, StructLike partition) { // dropping the data in a partition also drops all deletes in the partition - filterManager.dropPartition(partition); - deleteFilterManager.dropPartition(partition); + filterManager.dropPartition(specId, partition); + deleteFilterManager.dropPartition(specId, partition); } /** @@ -368,9 +368,8 @@ private ManifestFile newDeleteFilesAsManifest() { } private class DataFileFilterManager extends ManifestFilterManager { - @Override - protected PartitionSpec spec(int specId) { - return ops.current().spec(specId); + private DataFileFilterManager() { + super(ops.current().specsById()); } @Override @@ -421,9 +420,8 @@ protected ManifestReader newManifestReader(ManifestFile manifest) { } private class DeleteFileFilterManager extends ManifestFilterManager { - @Override - protected PartitionSpec spec(int specId) { - return ops.current().spec(specId); + private DeleteFileFilterManager() { + super(ops.current().specsById()); } @Override diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java index 31cfc4457b52..92564e683695 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java @@ -21,14 +21,12 @@ import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.iceberg.exceptions.CherrypickAncestorCommitException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.iceberg.util.StructLikeWrapper; import org.apache.iceberg.util.WapUtil; public class SnapshotManager 
extends MergingSnapshotProducer implements ManageSnapshots { @@ -38,15 +36,17 @@ private enum SnapshotManagerOperation { ROLLBACK } + private final Map specsById; private SnapshotManagerOperation managerOperation = null; private Long targetSnapshotId = null; private String snapshotOperation = null; private Long requiredCurrentSnapshotId = null; private Long overwriteParentId = null; - private Set replacedPartitions = null; + private PartitionSet replacedPartitions = null; SnapshotManager(String tableName, TableOperations ops) { super(tableName, ops); + this.specsById = ops.current().specsById(); } @Override @@ -100,7 +100,7 @@ public ManageSnapshots cherrypick(long snapshotId) { this.managerOperation = SnapshotManagerOperation.CHERRYPICK; this.targetSnapshotId = snapshotId; this.snapshotOperation = cherryPickSnapshot.operation(); - this.replacedPartitions = Sets.newHashSet(); + this.replacedPartitions = PartitionSet.create(specsById); // check that all deleted files are still in the table failMissingDeletePaths(); @@ -108,7 +108,7 @@ public ManageSnapshots cherrypick(long snapshotId) { // copy adds from the picked snapshot for (DataFile addedFile : cherryPickSnapshot.addedFiles()) { add(addedFile); - replacedPartitions.add(StructLikeWrapper.wrap(addedFile.partition())); + replacedPartitions.add(addedFile.specId(), addedFile.partition()); } // copy deletes from the picked snapshot @@ -261,14 +261,13 @@ private static void validateNonAncestor(TableMetadata meta, long snapshotId) { } private static void validateReplacedPartitions(TableMetadata meta, Long parentId, - Set replacedPartitions) { + PartitionSet replacedPartitions) { if (replacedPartitions != null) { ValidationException.check(parentId == null || isCurrentAncestor(meta, parentId), "Cannot cherry-pick overwrite, based on non-ancestor of the current state: %s", parentId); List newFiles = SnapshotUtil.newFiles(parentId, meta.currentSnapshot().snapshotId(), meta::snapshot); - StructLikeWrapper partitionWrapper = 
StructLikeWrapper.wrap(null); for (DataFile newFile : newFiles) { - ValidationException.check(!replacedPartitions.contains(partitionWrapper.set(newFile.partition())), + ValidationException.check(!replacedPartitions.contains(newFile.specId(), newFile.partition()), "Cannot cherry-pick replace partitions with changed partition: %s", newFile.partition()); } diff --git a/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java index a26d86eb4f11..018e017fb9d4 100644 --- a/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/ManifestFileUtil.java @@ -21,6 +21,7 @@ import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.function.Function; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; @@ -103,6 +104,36 @@ public static boolean canContainAny(ManifestFile manifest, return true; } + List> summaries = summaries(manifest, specLookup); + + for (StructLike partition : partitions) { + if (canContain(summaries, partition)) { + return true; + } + } + + return false; + } + + public static boolean canContainAny(ManifestFile manifest, + Iterable> partitions, + Map specsById) { + if (manifest.partitions() == null) { + return true; + } + + List> summaries = summaries(manifest, specsById::get); + + for (Pair partition : partitions) { + if (partition.first() == manifest.partitionSpecId() && canContain(summaries, partition.second())) { + return true; + } + } + + return false; + } + + private static List> summaries(ManifestFile manifest, Function specLookup) { Types.StructType partitionType = specLookup.apply(manifest.partitionSpecId()).partitionType(); List fieldSummaries = manifest.partitions(); List fields = partitionType.fields(); @@ -113,12 +144,6 @@ public static boolean canContainAny(ManifestFile manifest, summaries.add(new FieldSummary<>(primitive, fieldSummaries.get(pos))); } - 
for (StructLike partition : partitions) { - if (canContain(summaries, partition)) { - return true; - } - } - - return false; + return summaries; } } diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java index 809fc7bb0d69..36a774e932a7 100644 --- a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java +++ b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java @@ -20,7 +20,6 @@ package org.apache.iceberg.util; import java.util.Comparator; -import java.util.Objects; import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.JavaHash; @@ -31,10 +30,6 @@ */ public class StructLikeWrapper { - public static StructLikeWrapper wrap(StructLike struct) { - return new StructLikeWrapper(struct); - } - public static StructLikeWrapper forType(Types.StructType struct) { return new StructLikeWrapper(struct); } @@ -44,19 +39,9 @@ public static StructLikeWrapper forType(Types.StructType struct) { private Integer hashCode; private StructLike struct; - private StructLikeWrapper(StructLike struct) { - this((Types.StructType) null); - set(struct); - } - private StructLikeWrapper(Types.StructType type) { - if (type != null) { - this.comparator = Comparators.forType(type); - this.structHash = JavaHash.forType(type); - } else { - this.comparator = null; - this.structHash = null; - } + this.comparator = Comparators.forType(type); + this.structHash = JavaHash.forType(type); this.hashCode = null; } @@ -93,33 +78,13 @@ public boolean equals(Object other) { return false; } - if (comparator != null) { - return comparator.compare(this.struct, that.struct) == 0; - } - - for (int i = 0; i < len; i += 1) { - if (!Objects.equals(struct.get(i, Object.class), that.struct.get(i, Object.class))) { - return false; - } - } - - return true; + return comparator.compare(this.struct, that.struct) == 0; } @Override public int hashCode() { 
if (hashCode == null) { - if (structHash != null) { - this.hashCode = structHash.hash(struct); - } else { - int result = 97; - int len = struct.size(); - result = 41 * result + len; - for (int i = 0; i < len; i += 1) { - result = 41 * result + Objects.hashCode(struct.get(i, Object.class)); - } - this.hashCode = result; - } + this.hashCode = structHash.hash(struct); } return hashCode; diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java index 40d8363cc320..dd2252629b0f 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java @@ -78,58 +78,71 @@ public TestDeleteFileIndex() { @Test public void testUnpartitionedDeletes() { - DeleteFileIndex index = new DeleteFileIndex(new long[] { 3, 5, 5, 6 }, DELETE_FILES, ImmutableMap.of()); + DeleteFileIndex index = new DeleteFileIndex( + ImmutableMap.of( + PartitionSpec.unpartitioned().specId(), PartitionSpec.unpartitioned(), + 1, SPEC), + new long[] { 3, 5, 5, 6 }, DELETE_FILES, ImmutableMap.of()); Assert.assertArrayEquals("All deletes should apply to seq 0", - DELETE_FILES, index.forDataFile(1, 0, UNPARTITIONED_FILE)); + DELETE_FILES, index.forDataFile(0, UNPARTITIONED_FILE)); Assert.assertArrayEquals("All deletes should apply to seq 3", - DELETE_FILES, index.forDataFile(1, 3, UNPARTITIONED_FILE)); + DELETE_FILES, index.forDataFile(3, UNPARTITIONED_FILE)); Assert.assertArrayEquals("Last 3 deletes should apply to seq 4", - Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 4, UNPARTITIONED_FILE)); + Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(4, UNPARTITIONED_FILE)); Assert.assertArrayEquals("Last 3 deletes should apply to seq 5", - Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 5, UNPARTITIONED_FILE)); + Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(5, UNPARTITIONED_FILE)); Assert.assertArrayEquals("Last 
delete should apply to seq 6", - Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(1, 6, UNPARTITIONED_FILE)); + Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(6, UNPARTITIONED_FILE)); Assert.assertArrayEquals("No deletes should apply to seq 7", - new DataFile[0], index.forDataFile(1, 7, UNPARTITIONED_FILE)); + new DataFile[0], index.forDataFile(7, UNPARTITIONED_FILE)); Assert.assertArrayEquals("No deletes should apply to seq 10", - new DataFile[0], index.forDataFile(1, 10, UNPARTITIONED_FILE)); + new DataFile[0], index.forDataFile(10, UNPARTITIONED_FILE)); + // copy file A with a different spec ID + DataFile partitionedFileA = FILE_A.copy(); + ((BaseFile) partitionedFileA).setSpecId(1); Assert.assertArrayEquals("All global deletes should apply to a partitioned file", - DELETE_FILES, index.forDataFile(2, 0, FILE_B)); + DELETE_FILES, index.forDataFile(0, partitionedFileA)); } @Test public void testPartitionedDeleteIndex() { - DeleteFileIndex index = new DeleteFileIndex(null, null, ImmutableMap.of( - Pair.of(1, StructLikeWrapper.wrap(FILE_A.partition())), - Pair.of(new long[] { 3, 5, 5, 6 }, DELETE_FILES), - Pair.of(1, StructLikeWrapper.wrap(FILE_C.partition())), - Pair.of(new long[0], new DeleteFile[0]))); + DeleteFileIndex index = new DeleteFileIndex( + ImmutableMap.of( + SPEC.specId(), SPEC, + 1, PartitionSpec.unpartitioned()), + null, null, ImmutableMap.of( + Pair.of(SPEC.specId(), StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_A.partition())), + Pair.of(new long[] { 3, 5, 5, 6 }, DELETE_FILES), + Pair.of(SPEC.specId(), StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_C.partition())), + Pair.of(new long[0], new DeleteFile[0]))); Assert.assertArrayEquals("All deletes should apply to seq 0", - DELETE_FILES, index.forDataFile(1, 0, FILE_A)); + DELETE_FILES, index.forDataFile(0, FILE_A)); Assert.assertArrayEquals("All deletes should apply to seq 3", - DELETE_FILES, index.forDataFile(1, 3, FILE_A)); + DELETE_FILES, 
index.forDataFile(3, FILE_A)); Assert.assertArrayEquals("Last 3 deletes should apply to seq 4", - Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 4, FILE_A)); + Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(4, FILE_A)); Assert.assertArrayEquals("Last 3 deletes should apply to seq 5", - Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(1, 5, FILE_A)); + Arrays.copyOfRange(DELETE_FILES, 1, 4), index.forDataFile(5, FILE_A)); Assert.assertArrayEquals("Last delete should apply to seq 6", - Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(1, 6, FILE_A)); + Arrays.copyOfRange(DELETE_FILES, 3, 4), index.forDataFile(6, FILE_A)); Assert.assertArrayEquals("No deletes should apply to seq 7", - new DataFile[0], index.forDataFile(1, 7, FILE_A)); + new DataFile[0], index.forDataFile(7, FILE_A)); Assert.assertArrayEquals("No deletes should apply to seq 10", - new DataFile[0], index.forDataFile(1, 10, FILE_A)); + new DataFile[0], index.forDataFile(10, FILE_A)); Assert.assertEquals("No deletes should apply to FILE_B, partition not in index", - 0, index.forDataFile(1, 0, FILE_B).length); + 0, index.forDataFile(0, FILE_B).length); Assert.assertEquals("No deletes should apply to FILE_C, no indexed delete files", - 0, index.forDataFile(1, 0, FILE_C).length); + 0, index.forDataFile(0, FILE_C).length); + DataFile unpartitionedFileA = FILE_A.copy(); + ((BaseFile) unpartitionedFileA).setSpecId(1); Assert.assertEquals("No deletes should apply to FILE_A with a different specId", - 0, index.forDataFile(2, 0, FILE_A).length); + 0, index.forDataFile(0, unpartitionedFileA).length); } @Test