Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 36 additions & 71 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Map;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.StructLikeMap;

/** A {@link Table} implementation that exposes a table's partitions as rows. */
public class PartitionsTable extends BaseMetadataTable {
Expand Down Expand Up @@ -90,28 +90,22 @@ private DataTask task(StaticTableScan scan) {

private static StaticDataTask.Row convertPartition(Partition partition) {
return StaticDataTask.Row.of(
partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
partition.partitionData,
partition.specId,
partition.dataRecordCount,
partition.dataFileCount);
}

private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap();

// cache a position map needed by each partition spec to normalize partitions to final schema
Map<Integer, int[]> normalizedPositionsBySpec =
Maps.newHashMapWithExpectedSize(table.specs().size());
Types.StructType partitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap(partitionType);

CloseableIterable<DataFile> datafiles = planDataFiles(scan);
for (DataFile dataFile : datafiles) {
PartitionData original = (PartitionData) dataFile.partition();
int[] normalizedPositions =
normalizedPositionsBySpec.computeIfAbsent(
dataFile.specId(),
specId -> normalizedPositions(table, specId, normalizedPartitionType));

PartitionData normalized =
normalizePartition(original, normalizedPartitionType, normalizedPositions);
partitions.get(normalized).update(dataFile);
StructLike partition =
PartitionUtil.coercePartition(
partitionType, table.specs().get(dataFile.specId()), dataFile.partition());
partitions.get(partition).update(dataFile);
}

return partitions.all();
Expand Down Expand Up @@ -150,53 +144,6 @@ static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
return new ParallelIterable<>(tasks, scan.planExecutor());
}

/**
 * Computes, for a single partition spec, where each of its partition fields sits in the
 * normalized partition type.
 *
 * <p>The returned array is indexed by a field's position in the spec's own partition type. Each
 * element holds the position, within the normalized partition type, of the field that is looked
 * up there by the same field ID.
 *
 * @param table iceberg table whose specs are consulted
 * @param specId ID of the spec whose partition type is being mapped
 * @param normalizedType normalized partition type
 * @return positions in the normalized type, indexed by position in the spec's partition type
 */
private static int[] normalizedPositions(
    Table table, int specId, Types.StructType normalizedType) {
  Types.StructType specType = table.specs().get(specId).partitionType();
  int fieldCount = specType.fields().size();
  int[] positions = new int[fieldCount];
  for (int pos = 0; pos < fieldCount; pos++) {
    // resolve the field by ID first, then find where that field sits in the normalized type
    int fieldId = specType.fields().get(pos).fieldId();
    Types.NestedField target = normalizedType.field(fieldId);
    positions[pos] = normalizedType.fields().indexOf(target);
  }
  return positions;
}

/**
 * Copies partition data written under an older spec into a new {@link PartitionData} laid out
 * according to the table's normalized partition type, the common partition type shared by all
 * specs of the table.
 *
 * @param originalPartition un-normalized partition data
 * @param normalizedPartitionType table's normalized partition type {@link
 *     Partitioning#partitionType(Table)}
 * @param normalizedPositions for each field position in the original partition type, the
 *     position that value is written to in the normalized partition type
 * @return the normalized partition data
 */
private static PartitionData normalizePartition(
    PartitionData originalPartition,
    Types.StructType normalizedPartitionType,
    int[] normalizedPositions) {
  PartitionData result = new PartitionData(normalizedPartitionType);
  int fieldCount = originalPartition.size();
  for (int source = 0; source < fieldCount; source++) {
    // positions not named in normalizedPositions are never written by this loop
    int target = normalizedPositions[source];
    result.put(target, originalPartition.get(source));
  }
  return result;
}

private class PartitionsScan extends StaticTableScan {
PartitionsScan(Table table) {
super(
Expand All @@ -208,12 +155,18 @@ private class PartitionsScan extends StaticTableScan {
}

static class PartitionMap {
private final Map<PartitionData, Partition> partitions = Maps.newHashMap();
private final StructLikeMap<Partition> partitions;
private final Types.StructType keyType;

PartitionMap(Types.StructType type) {
this.partitions = StructLikeMap.create(type);
this.keyType = type;
}

Partition get(PartitionData key) {
Partition get(StructLike key) {
Partition partition = partitions.get(key);
if (partition == null) {
partition = new Partition(key);
partition = new Partition(key, keyType);
partitions.put(key, partition);
}
return partition;
Expand All @@ -225,13 +178,13 @@ Iterable<Partition> all() {
}

static class Partition {
private final StructLike key;
private final PartitionData partitionData;
private int specId;
private long dataRecordCount;
private int dataFileCount;

Partition(StructLike key) {
this.key = key;
Partition(StructLike key, Types.StructType keyType) {
this.partitionData = toPartitionData(key, keyType);
this.specId = 0;
this.dataRecordCount = 0;
this.dataFileCount = 0;
Expand All @@ -242,5 +195,17 @@ void update(DataFile file) {
this.dataFileCount += 1;
this.specId = file.specId();
}

/**
 * Copies the given key into a {@link PartitionData} of the given type, field by field.
 *
 * <p>Needed because StructProjection is not serializable.
 *
 * @param key struct holding the partition values, read by position
 * @param keyType struct type that determines the field count and the class each value is read as
 * @return a new {@link PartitionData} with every non-null value of {@code key} set
 */
private PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
  PartitionData copy = new PartitionData(keyType);
  int pos = 0;
  for (Types.NestedField field : keyType.fields()) {
    Object value = key.get(pos, field.type().typeId().javaClass());
    // null values are skipped, so set() is never called for those positions
    if (value != null) {
      copy.set(pos, value);
    }
    pos++;
  }
  return copy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private PartitionUtil() {}
}

// adapts the provided partition data to match the table partition type
private static StructLike coercePartition(
public static StructLike coercePartition(
Types.StructType partitionType, PartitionSpec spec, StructLike partition) {
StructProjection projection =
StructProjection.createAllowMissing(spec.partitionType(), partitionType);
Expand Down