Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,14 @@ DeleteFileIndex build() {
});

// build a map from (specId, partition) to delete file entries
Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>> deleteFilesByPartition =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
for (ManifestEntry<DeleteFile> entry : deleteEntries) {
int specId = entry.file().specId();
StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType())
.set(entry.file().partition());
StructLikeWrapper wrapper = wrappersBySpecId
.computeIfAbsent(specId, id -> StructLikeWrapper.forType(specsById.get(id).partitionType()))
.copyFor(entry.file().partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
CloseableIterator<FileScanTask> tasksIter) {
ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
Maps.newHashMap(), Lists::newArrayList);
StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(spec.partitionType());
try (CloseableIterator<FileScanTask> iterator = tasksIter) {
iterator.forEachRemaining(task -> {
StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
StructLikeWrapper structLike = partitionWrapper.copyFor(task.file().partition());
tasksGroupedByPartition.put(structLike, task);
});
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public T get(Object key) {

@Override
public T put(StructLike key, T value) {
return wrapperMap.put(StructLikeWrapper.forType(type).set(key), value);
return wrapperMap.put(wrappers.get().copyFor(key), value);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/util/StructLikeSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public <T> T[] toArray(T[] destArray) {

@Override
public boolean add(StructLike struct) {
return wrapperSet.add(StructLikeWrapper.forType(type).set(struct));
return wrapperSet.add(wrappers.get().copyFor(struct));
}

@Override
Expand All @@ -126,7 +126,7 @@ public boolean containsAll(Collection<?> objects) {
public boolean addAll(Collection<? extends StructLike> structs) {
if (structs != null) {
return Iterables.addAll(wrapperSet,
Iterables.transform(structs, struct -> StructLikeWrapper.forType(type).set(struct)));
Iterables.transform(structs, struct -> wrappers.get().copyFor(struct)));
}
return false;
}
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,28 @@ public static StructLikeWrapper forType(Types.StructType struct) {
private StructLike struct;

private StructLikeWrapper(Types.StructType type) {
this.comparator = Comparators.forType(type);
this.structHash = JavaHash.forType(type);
this(Comparators.forType(type), JavaHash.forType(type));
}

private StructLikeWrapper(Comparator<StructLike> comparator, JavaHash<StructLike> structHash) {
this.comparator = comparator;
this.structHash = structHash;
this.hashCode = null;
}

/**
* Creates a copy of this wrapper that wraps a struct.
* <p>
* This is equivalent to {@code new StructLikeWrapper(type).set(newStruct)} but is cheaper because no analysis of the
* type is necessary.
*
* @param newStruct a {@link StructLike} row
* @return a copy of this wrapper wrapping the give struct
*/
public StructLikeWrapper copyFor(StructLike newStruct) {
return new StructLikeWrapper(comparator, structHash).set(newStruct);
}

public StructLikeWrapper set(StructLike newStruct) {
this.struct = newStruct;
this.hashCode = null;
Expand Down
5 changes: 2 additions & 3 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ private List<Predicate<T>> applyEqDeletes() {
Iterable<DeleteFile> deletes = entry.getValue();

Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
InternalRecordWrapper wrapper = new InternalRecordWrapper(deleteSchema.asStruct());

// a projection to select and reorder fields of the file schema to match the delete rows
StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
Expand All @@ -159,9 +160,7 @@ private List<Predicate<T>> applyEqDeletes() {
CloseableIterable.concat(deleteRecords), Record::copy);

StructLikeSet deleteSet = Deletes.toEqualitySet(
CloseableIterable.transform(
records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
deleteSchema.asStruct());
CloseableIterable.transform(records, wrapper::copyFor), deleteSchema.asStruct());

Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
isInDeleteSets.add(isInDeleteSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ public class InternalRecordWrapper implements StructLike {

@SuppressWarnings("unchecked")
public InternalRecordWrapper(Types.StructType struct) {
this.transforms = struct.fields().stream()
this(struct.fields().stream()
.map(field -> converter(field.type()))
.toArray(length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length));
.toArray(length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length)));
}

private InternalRecordWrapper(Function<Object, Object>[] transforms) {
this.transforms = transforms;
}

private static Function<Object, Object> converter(Type type) {
Expand Down Expand Up @@ -68,6 +72,10 @@ public StructLike get() {
return wrapped;
}

public InternalRecordWrapper copyFor(StructLike record) {
return new InternalRecordWrapper(transforms).wrap(record);
}

public InternalRecordWrapper wrap(StructLike record) {
this.wrapped = record;
return this;
Expand Down