38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/iceberg/PartitionStats.java
@@ -95,7 +95,9 @@ public Long lastUpdatedSnapshotId() {
*
* @param file the {@link ContentFile} from the manifest entry.
* @param snapshot the snapshot corresponding to the live entry.
* @deprecated since 1.10.0, visibility will be reduced in 1.11.0
*/
@Deprecated // will become package-private
public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
Preconditions.checkArgument(specId == file.specId(), "Spec IDs must match");

@@ -128,18 +130,54 @@ public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
* Updates the modified time and snapshot ID for the deleted manifest entry.
*
* @param snapshot the snapshot corresponding to the deleted manifest entry.
* @deprecated since 1.10.0, visibility will be reduced in 1.11.0
*/
@Deprecated // will become package-private
public void deletedEntry(Snapshot snapshot) {
if (snapshot != null) {
updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
}
}

/**
* Decrements the counters, since this entry was counted in the previous stats, and updates the
* modified time and snapshot ID for the deleted manifest entry.
*
* @param file the {@link ContentFile} from the manifest entry.
* @param snapshot the snapshot corresponding to the deleted manifest entry.
*/
void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) {
Preconditions.checkArgument(specId == file.specId(), "Spec IDs must match");

switch (file.content()) {
case DATA:
this.dataRecordCount -= file.recordCount();
this.dataFileCount -= 1;
this.totalDataFileSizeInBytes -= file.fileSizeInBytes();
break;
case POSITION_DELETES:
this.positionDeleteRecordCount -= file.recordCount();
this.positionDeleteFileCount -= 1;
break;
case EQUALITY_DELETES:
this.equalityDeleteRecordCount -= file.recordCount();
this.equalityDeleteFileCount -= 1;
break;
default:
throw new UnsupportedOperationException("Unsupported file content type: " + file.content());
}

if (snapshot != null) {
updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
}
}
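A toy walk-through of this accounting, not part of the diff and not the real API: a data file counted live in one stats computation and deleted before the next must net out once the incremental pass subtracts it.

// Toy model of incremental accounting with plain counters (illustrative only):
// liveEntry adds a file's contribution, deletedEntryForIncrementalCompute removes it.
class IncrementalAccountingSketch {
  public static void main(String[] args) {
    long dataRecordCount = 0;
    long dataFileCount = 0;
    long totalDataFileSizeInBytes = 0;

    // Previous stats: a data file with 100 records / 1024 bytes was added (liveEntry).
    dataRecordCount += 100;
    dataFileCount += 1;
    totalDataFileSizeInBytes += 1024;

    // Incremental pass: the same file shows up as a DELETED entry, so its
    // contribution is subtracted instead of recomputing the partition from scratch.
    dataRecordCount -= 100;
    dataFileCount -= 1;
    totalDataFileSizeInBytes -= 1024;

    System.out.printf("records=%d files=%d bytes=%d%n",
        dataRecordCount, dataFileCount, totalDataFileSizeInBytes); // all zero
  }
}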

/**
* Appends statistics from given entry to current entry.
*
* @param entry the entry from which statistics will be sourced.
* @deprecated since 1.10.0, visibility will be reduced in 1.11.0
*/
@Deprecated // will become package-private
public void appendStats(PartitionStats entry) {
Preconditions.checkArgument(specId == entry.specId(), "Spec IDs must match");

202 changes: 159 additions & 43 deletions core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -29,24 +29,33 @@
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionMap;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
@@ -56,6 +65,8 @@ public class PartitionStatsHandler {

private PartitionStatsHandler() {}

private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class);

public static final int PARTITION_FIELD_ID = 0;
public static final String PARTITION_FIELD_NAME = "partition";
public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get());
@@ -106,13 +117,19 @@ public static Schema schema(StructType unifiedPartitionType) {
}

/**
* Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
* Computes stats incrementally, starting after the latest snapshot that has a partition stats
* file and ending at the table's current snapshot, then merges them with the previous stats and
* writes the combined result into a {@link PartitionStatisticsFile}.
*
* <p>Falls back to a full compute if no previous statistics file exists.
*
* @param table The {@link Table} for which the partition statistics is computed.
* @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are
* present.
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
Preconditions.checkArgument(table != null, "Invalid table: null");

if (table.currentSnapshot() == null) {
return null;
}
@@ -121,7 +138,11 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
}
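A minimal caller sketch, not part of this diff: computeAndWriteStatsFile and Table.updatePartitionStatistics are existing APIs, while the wrapper method itself is illustrative.

import java.io.IOException;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStatsHandler;
import org.apache.iceberg.Table;

class ComputeStatsExample {
  // Computes partition stats (incrementally when a previous stats file exists)
  // and registers the result in table metadata.
  static void refreshPartitionStats(Table table) throws IOException {
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
    if (statsFile != null) { // null for an empty table or an empty branch
      table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
    }
  }
}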

/**
* Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot.
* Computes stats incrementally, starting after the latest snapshot that has a partition stats
* file and ending at the given snapshot, then merges them with the previous stats and writes
* the combined result into a {@link PartitionStatisticsFile}.
*
* <p>Falls back to a full compute if no previous statistics file exists.
*
* @param table The {@link Table} for which the partition statistics is computed.
* @param snapshotId snapshot for which partition statistics are computed.
Expand All @@ -130,15 +151,28 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
throws IOException {
Preconditions.checkArgument(table != null, "Invalid table: null");
Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);

Collection<PartitionStats> stats = computeStats(table, snapshot);
StructType partitionType = Partitioning.partitionType(table);

Collection<PartitionStats> stats;
PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());
if (statisticsFile == null) {
LOG.info(
"Previous statistics file not found for incremental compute; falling back to full compute.");
stats = computeStats(table, snapshot, file -> true, false /* incremental */).values();
} else {
stats = computeAndMergeStatsIncremental(table, snapshot, partitionType, statisticsFile);
}

if (stats.isEmpty()) {
// empty branch case
return null;
}

StructType partitionType = Partitioning.partitionType(table);
List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
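Likewise for a historical snapshot, as a sketch (per the preconditions above, the snapshot must exist and the table must be partitioned):

import java.io.IOException;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStatsHandler;
import org.apache.iceberg.Table;

class HistoricalStatsExample {
  // Computes stats as of a specific snapshot; incremental when an ancestor of
  // that snapshot carries a stats file, otherwise a full compute.
  static PartitionStatisticsFile statsAsOf(Table table, long snapshotId) throws IOException {
    return PartitionStatsHandler.computeAndWriteStatsFile(table, snapshotId);
  }
}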
@@ -174,6 +208,9 @@ static PartitionStatisticsFile writePartitionStatsFile(
*/
public static CloseableIterable<PartitionStats> readPartitionStatsFile(
Schema schema, InputFile inputFile) {
Preconditions.checkArgument(schema != null, "Invalid schema: null");
Preconditions.checkArgument(inputFile != null, "Invalid input file: null");

FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
Preconditions.checkArgument(
fileFormat != null, "Unable to determine format of file: %s", inputFile.location());
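A reader-side sketch under the same assumptions; the schema has to be built from the table's unified partition type so that all partition spec evolutions are covered:

import java.io.IOException;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStatsHandler;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class ReadStatsExample {
  static void printStats(Table table, PartitionStatisticsFile statsFile) throws IOException {
    Schema schema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
    try (CloseableIterable<PartitionStats> stats =
        PartitionStatsHandler.readPartitionStatsFile(
            schema, table.io().newInputFile(statsFile.path()))) {
      // Each row is one partition's aggregated stats.
      stats.forEach(s -> System.out.println(s.partition() + " -> " + s.dataRecordCount()));
    }
  }
}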
@@ -230,34 +267,108 @@ private static PartitionStats recordToPartitionStats(StructLike record) {
return stats;
}

private static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
Preconditions.checkArgument(table != null, "table cannot be null");
Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
private static Collection<PartitionStats> computeAndMergeStatsIncremental(
Table table,
Snapshot snapshot,
StructType partitionType,
PartitionStatisticsFile previousStatsFile)
throws IOException {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
// read previous stats, note that partition field will be read as GenericRecord
try (CloseableIterable<PartitionStats> oldStats =
readPartitionStatsFile(schema(partitionType), Files.localInput(previousStatsFile.path()))) {
oldStats.forEach(
partitionStats ->
statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
}

// incrementally compute the new stats, partition field will be written as PartitionData
PartitionMap<PartitionStats> incrementalStatsMap =
computeStatsDiff(table, table.snapshot(previousStatsFile.snapshotId()), snapshot);

// convert PartitionData into GenericRecord and merge stats
incrementalStatsMap.forEach(
(key, value) ->
statsMap.merge(
Pair.of(key.first(), partitionDataToRecord((PartitionData) key.second())),
value,
(existingEntry, newEntry) -> {
existingEntry.appendStats(newEntry);
return existingEntry;
}));

return statsMap.values();
}
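The partitionDataToRecord conversion below matters because merging is keyed on partition values: stats read back from the file carry GenericRecord partitions while the freshly computed diff keys on PartitionData, and mixed key types would never collide in the map. A toy restatement of the merge shape with a plain HashMap, where Long::sum stands in for appendStats:

import java.util.HashMap;
import java.util.Map;

class MergeSketch {
  public static void main(String[] args) {
    // Old stats from the previous file, keyed by (normalized) partition value.
    Map<String, Long> recordCounts = new HashMap<>();
    recordCounts.put("day=2024-01-01", 100L);

    // Incremental diff computed from the new snapshots.
    Map<String, Long> delta = Map.of("day=2024-01-01", 25L, "day=2024-01-02", 10L);

    // Same shape as statsMap.merge(...): combine on key collision, insert otherwise.
    delta.forEach((partition, count) -> recordCounts.merge(partition, count, Long::sum));

    System.out.println(recordCounts); // {day=2024-01-01=125, day=2024-01-02=10}
  }
}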

private static GenericRecord partitionDataToRecord(PartitionData data) {
GenericRecord record = GenericRecord.create(data.getPartitionType());
for (int index = 0; index < record.size(); index++) {
record.set(index, data.get(index));
}

return record;
}

@VisibleForTesting
static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) {
List<PartitionStatisticsFile> partitionStatisticsFiles = table.partitionStatisticsFiles();
if (partitionStatisticsFiles.isEmpty()) {
return null;
}

Map<Long, PartitionStatisticsFile> stats =
partitionStatisticsFiles.stream()
.collect(Collectors.toMap(PartitionStatisticsFile::snapshotId, file -> file));
for (Snapshot snapshot : SnapshotUtil.ancestorsOf(snapshotId, table::snapshot)) {
if (stats.containsKey(snapshot.snapshotId())) {
return stats.get(snapshot.snapshotId());
}
}

// This should not happen in practice: stats files are written for ancestor snapshots.
throw new RuntimeException(
"Unable to find previous stats from a valid snapshot. Invalidate the partition stats for all snapshots to force a full compute.");
}
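If that exception is ever hit, or a full recompute is wanted for any other reason, one way to force the full-compute path, sketched against the existing UpdatePartitionStatistics API (the helper itself is illustrative):

import org.apache.iceberg.Table;
import org.apache.iceberg.UpdatePartitionStatistics;

class InvalidateStatsExample {
  // Drops every partition stats file so the next computeAndWriteStatsFile call
  // takes the full-compute branch instead of the incremental one.
  static void invalidateAllPartitionStats(Table table) {
    UpdatePartitionStatistics update = table.updatePartitionStatistics();
    table.partitionStatisticsFiles()
        .forEach(file -> update.removePartitionStatistics(file.snapshotId()));
    update.commit();
  }
}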

private static PartitionMap<PartitionStats> computeStatsDiff(
Table table, Snapshot fromSnapshot, Snapshot toSnapshot) {
Set<Long> snapshotIdsRange =
Sets.newHashSet(
SnapshotUtil.ancestorIdsBetween(
toSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot));
Predicate<ManifestFile> manifestFilePredicate =
manifestFile -> snapshotIdsRange.contains(manifestFile.snapshotId());
return computeStats(table, toSnapshot, manifestFilePredicate, true /* incremental */);
}
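As I read ancestorIdsBetween, the range is half-open: it contains toSnapshot and every ancestor back to, but excluding, fromSnapshot, whose effects are already captured by the previous stats file. A toy walk over parent pointers:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class AncestorRangeSketch {
  public static void main(String[] args) {
    // Linear history 1 <- 2 <- 3 <- 4 (child -> parent); previous stats at snapshot 2.
    Map<Long, Long> parentOf = new HashMap<>(Map.of(2L, 1L, 3L, 2L, 4L, 3L));
    long from = 2L; // covered by the previous stats file (excluded)
    long to = 4L; // snapshot being computed (included)

    Set<Long> range = new HashSet<>();
    for (Long id = to; id != null && id != from; id = parentOf.get(id)) {
      range.add(id);
    }

    // Only manifests written by snapshots 3 and 4 are scanned.
    System.out.println(range); // [3, 4]
  }
}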

private static PartitionMap<PartitionStats> computeStats(
Table table, Snapshot snapshot, Predicate<ManifestFile> predicate, boolean incremental) {
StructType partitionType = Partitioning.partitionType(table);
List<ManifestFile> manifests = snapshot.allManifests(table.io());
List<ManifestFile> manifests =
snapshot.allManifests(table.io()).stream().filter(predicate).collect(Collectors.toList());

Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
.run(
manifest ->
statsByManifest.add(
collectStatsForManifest(table, manifest, partitionType, incremental)));

return mergeStats(statsByManifest, table.specs());
}
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
for (PartitionMap<PartitionStats> stats : statsByManifest) {
mergePartitionMap(stats, statsMap);
}

private static List<PartitionStats> sortStatsByPartition(
Collection<PartitionStats> stats, StructType partitionType) {
List<PartitionStats> entries = Lists.newArrayList(stats);
entries.sort(
Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType)));
return entries;
return statsMap;
}

private static PartitionMap<PartitionStats> collectStats(
Table table, ManifestFile manifest, StructType partitionType) {
try (ManifestReader<?> reader = openManifest(table, manifest)) {
private static PartitionMap<PartitionStats> collectStatsForManifest(
Table table, ManifestFile manifest, StructType partitionType, boolean incremental) {
List<String> projection = BaseScan.scanColumns(manifest.content());
try (ManifestReader<?> reader = ManifestFiles.open(manifest, table.io()).select(projection)) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
int specId = manifest.partitionSpecId();
PartitionSpec spec = table.specs().get(specId);
Expand All @@ -275,9 +386,17 @@ private static PartitionMap<PartitionStats> collectStats(
((PartitionData) file.partition()).copy(),
() -> new PartitionStats(key, specId));
if (entry.isLive()) {
stats.liveEntry(file, snapshot);
// Live entries can be either ADDED or EXISTING. Consider only ADDED entries for
// incremental compute, since EXISTING entries were already counted in the previous compute.
if (!incremental || entry.status() == ManifestEntry.Status.ADDED) {
stats.liveEntry(file, snapshot);
}
} else {
stats.deletedEntry(snapshot);
if (incremental) {
stats.deletedEntryForIncrementalCompute(file, snapshot);
} else {
stats.deletedEntry(snapshot);
}
}
}
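The branching above reduces to one rule per manifest-entry status; a compact restatement as a toy switch (the enum mirrors ManifestEntry.Status, the rest is illustrative):

class EntryRuleSketch {
  enum Status { ADDED, EXISTING, DELETED }

  // What the incremental pass does with a manifest entry of each status.
  static String incrementalRule(Status status) {
    switch (status) {
      case ADDED:
        return "count it: new since the previous stats file";
      case EXISTING:
        return "skip it: already counted by the previous compute";
      case DELETED:
        return "subtract it: deletedEntryForIncrementalCompute";
      default:
        throw new IllegalArgumentException("Unknown status: " + status);
    }
  }

  public static void main(String[] args) {
    for (Status s : Status.values()) {
      System.out.println(s + " -> " + incrementalRule(s));
    }
  }
}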

@@ -287,27 +406,24 @@ private static PartitionMap<PartitionStats> collectStats(
}
}

private static ManifestReader<?> openManifest(Table table, ManifestFile manifest) {
List<String> projection = BaseScan.scanColumns(manifest.content());
return ManifestFiles.open(manifest, table.io()).select(projection);
private static void mergePartitionMap(
PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats> toMap) {
fromMap.forEach(
(key, value) ->
toMap.merge(
key,
value,
(existingEntry, newEntry) -> {
existingEntry.appendStats(newEntry);
return existingEntry;
}));
}

private static Collection<PartitionStats> mergeStats(
Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);

for (PartitionMap<PartitionStats> stats : statsByManifest) {
stats.forEach(
(key, value) ->
statsMap.merge(
key,
value,
(existingEntry, newEntry) -> {
existingEntry.appendStats(newEntry);
return existingEntry;
}));
}

return statsMap.values();
private static List<PartitionStats> sortStatsByPartition(
Collection<PartitionStats> stats, StructType partitionType) {
List<PartitionStats> entries = Lists.newArrayList(stats);
entries.sort(
Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType)));
return entries;
}
}