Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 116 additions & 31 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -47,7 +50,27 @@ public class PartitionsTable extends BaseMetadataTable {
Types.NestedField.required(
2, "record_count", Types.LongType.get(), "Count of records in data files"),
Types.NestedField.required(
3, "file_count", Types.IntegerType.get(), "Count of data files"));
3, "file_count", Types.IntegerType.get(), "Count of data files"),
Types.NestedField.required(
5,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note:
spec_id (4) is present at the top.

"position_delete_record_count",
Types.LongType.get(),
"Count of records in position delete files"),
Types.NestedField.required(
6,
"position_delete_file_count",
Types.IntegerType.get(),
"Count of position delete files"),
Types.NestedField.required(
7,
"equality_delete_record_count",
Types.LongType.get(),
"Count of records in equality delete files"),
Types.NestedField.required(
8,
"equality_delete_file_count",
Types.IntegerType.get(),
"Count of equality delete files"));
}

@Override
Expand All @@ -58,7 +81,13 @@ public TableScan newScan() {
@Override
public Schema schema() {
if (table().spec().fields().size() < 1) {
return schema.select("record_count", "file_count");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's actually a bug here, and it only checks latest spec for Unpartitioned. if we have partition fields before but removed them, we will not show them. See other metadata tables like BaseFilesTable.schema.

Anyway its unrelated, but made #7533 to track it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I will explore this in the follow-up.

return schema.select(
"record_count",
"file_count",
"position_delete_record_count",
"position_delete_file_count",
"equality_delete_record_count",
"equality_delete_file_count");
}
return schema;
}
Expand All @@ -77,7 +106,14 @@ private DataTask task(StaticTableScan scan) {
schema(),
scan.schema(),
partitions,
root -> StaticDataTask.Row.of(root.dataRecordCount, root.dataFileCount));
root ->
StaticDataTask.Row.of(
root.dataRecordCount,
root.dataFileCount,
root.posDeleteRecordCount,
root.posDeleteFileCount,
root.eqDeleteRecordCount,
root.eqDeleteFileCount));
} else {
return StaticDataTask.of(
io().newInputFile(table().operations().current().metadataFileLocation()),
Expand All @@ -93,31 +129,66 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
partition.partitionData,
partition.specId,
partition.dataRecordCount,
partition.dataFileCount);
partition.dataFileCount,
partition.posDeleteRecordCount,
partition.posDeleteFileCount,
partition.eqDeleteRecordCount,
partition.eqDeleteFileCount);
}

private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
Types.StructType partitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap(partitionType);

CloseableIterable<DataFile> datafiles = planDataFiles(scan);
for (DataFile dataFile : datafiles) {
StructLike partition =
PartitionUtil.coercePartition(
partitionType, table.specs().get(dataFile.specId()), dataFile.partition());
partitions.get(partition).update(dataFile);
try (CloseableIterable<ContentFile<?>> files = planFiles(scan)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for closing this

for (ContentFile<?> file : files) {
StructLike partition =
PartitionUtil.coercePartition(
partitionType, table.specs().get(file.specId()), file.partition());
partitions.get(partition).update(file);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return partitions.all();
}

@VisibleForTesting
static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
static CloseableIterable<ContentFile<?>> planFiles(StaticTableScan scan) {
Table table = scan.table();
Snapshot snapshot = scan.snapshot();

CloseableIterable<ManifestFile> dataManifests =
CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
CloseableIterable<ManifestFile> filteredManifests =
filteredManifests(scan, table, scan.snapshot().allManifests(table.io()));

Iterable<CloseableIterable<ContentFile<?>>> tasks =
CloseableIterable.transform(
filteredManifests,
manifest ->
CloseableIterable.transform(
ManifestFiles.open(manifest, table.io(), table.specs())
.caseSensitive(scan.isCaseSensitive())
.select(scanColumns(manifest.content())), // don't select stats columns
t -> (ContentFile<?>) t));

return new ParallelIterable<>(tasks, scan.planExecutor());
}

private static List<String> scanColumns(ManifestContent content) {
switch (content) {
case DATA:
return BaseScan.SCAN_COLUMNS;
case DELETES:
return BaseScan.DELETE_SCAN_COLUMNS;
default:
throw new UnsupportedOperationException("Cannot read unknown manifest type: " + content);
}
}

private static CloseableIterable<ManifestFile> filteredManifests(
StaticTableScan scan, Table table, List<ManifestFile> manifestFilesList) {
CloseableIterable<ManifestFile> manifestFiles =
CloseableIterable.withNoopClose(manifestFilesList);

LoadingCache<Integer, ManifestEvaluator> evalCache =
Caffeine.newBuilder()
Expand All @@ -129,19 +200,8 @@ static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
scan.filter(), transformedSpec, scan.isCaseSensitive());
});

CloseableIterable<ManifestFile> filteredManifests =
CloseableIterable.filter(
dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));

Iterable<CloseableIterable<DataFile>> tasks =
CloseableIterable.transform(
filteredManifests,
manifest ->
ManifestFiles.read(manifest, table.io(), table.specs())
.caseSensitive(scan.isCaseSensitive())
.select(BaseScan.SCAN_COLUMNS)); // don't select stats columns

return new ParallelIterable<>(tasks, scan.planExecutor());
return CloseableIterable.filter(
manifestFiles, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
}

private class PartitionsScan extends StaticTableScan {
Expand Down Expand Up @@ -182,18 +242,43 @@ static class Partition {
private int specId;
private long dataRecordCount;
private int dataFileCount;
private long posDeleteRecordCount;
private int posDeleteFileCount;
private long eqDeleteRecordCount;
private int eqDeleteFileCount;

Partition(StructLike key, Types.StructType keyType) {
this.partitionData = toPartitionData(key, keyType);
this.specId = 0;
this.dataRecordCount = 0;
this.dataFileCount = 0;
this.posDeleteRecordCount = 0;
this.posDeleteFileCount = 0;
this.eqDeleteRecordCount = 0;
this.eqDeleteFileCount = 0;
}

void update(DataFile file) {
this.dataRecordCount += file.recordCount();
this.dataFileCount += 1;
this.specId = file.specId();
void update(ContentFile<?> file) {
switch (file.content()) {
case DATA:
this.dataRecordCount += file.recordCount();
this.dataFileCount += 1;
this.specId = file.specId();
break;
case POSITION_DELETES:
this.posDeleteRecordCount = file.recordCount();
this.posDeleteFileCount += 1;
this.specId = file.specId();
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about specId here and below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this partition value, while updating the data file count, the Spec id would have been updated.
I don't think there will be delete files without data file entries. So, I assumed that again updating here would be redundant. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated it now thinking if the delete happens after the partition evolution, it should reflect the latest spec id.

case EQUALITY_DELETES:
this.eqDeleteRecordCount = file.recordCount();
this.eqDeleteFileCount += 1;
this.specId = file.specId();
break;
default:
throw new UnsupportedOperationException(
"Unsupported file content type: " + file.content());
}
}

/** Needed because StructProjection is not serializable */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
}

protected void validateSingleFieldPartition(
CloseableIterable<DataFile> dataFiles, int partitionValue) {
validatePartition(dataFiles, 0, partitionValue);
CloseableIterable<ContentFile<?>> files, int partitionValue) {
validatePartition(files, 0, partitionValue);
}

protected void validatePartition(
CloseableIterable<DataFile> dataFiles, int position, int partitionValue) {
CloseableIterable<ContentFile<?>> files, int position, int partitionValue) {
Assert.assertTrue(
"File scan tasks do not include correct file",
StreamSupport.stream(dataFiles.spliterator(), false)
StreamSupport.stream(files.spliterator(), false)
.anyMatch(
dataFile -> {
StructLike partition = dataFile.partition();
file -> {
StructLike partition = file.partition();
if (position >= partition.size()) {
return false;
}
Expand Down
Loading