10 changes: 4 additions & 6 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -35,7 +35,6 @@
  * deserialization.
  */
 abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
-  protected static final String PARTITION_FIELD_PREFIX = "partition.";
   private final PartitionSpec spec = PartitionSpec.unpartitioned();
   private final SortOrder sortOrder = SortOrder.unsorted();
   private final TableOperations ops;
@@ -52,18 +51,17 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
    * expression against the given metadata table.
    * <p>
-   * The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform.
+   * The resulting partition spec maps partition.X fields to partition X using an identity partition transform.
    * When this spec is used to project an expression for the given metadata table, the projection will remove
-   * predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields.
+   * predicates for non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
    *
    * @param metadataTableSchema schema of the metadata table
    * @param spec spec on which the metadata table schema is based
-   * @param partitionPrefix prefix to remove from each field in the partition spec
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
-  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) {
+  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
     PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
-    spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
+    spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
     return identitySpecBuilder.build();
   }

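For reference, the rewritten `transformSpec` maps each partition field of the given spec to an identity partition on the metadata table's nested `partition.X` column, keyed by the original partition field ID. Below is a minimal sketch of that projection flow at the Iceberg version of this PR; the schema, field IDs, and class name are illustrative assumptions, and the package-private `checkConflicts(false)` call is omitted since this toy schema has no conflicting names.

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.types.Types;

public class TransformSpecSketch {
  public static void main(String[] args) {
    // Stand-in for a files metadata table schema: partition fields live under a
    // "partition" struct and keep their original partition field IDs (1000+).
    Schema fileSchema = new Schema(
        Types.NestedField.required(134, "record_count", Types.LongType.get()),
        Types.NestedField.required(102, "partition", Types.StructType.of(
            Types.NestedField.optional(1000, "data_bucket", Types.IntegerType.get()))));

    // What transformSpec builds: an identity partition over source id 1000
    // ("partition.data_bucket"), named after the original partition field.
    PartitionSpec transformed = PartitionSpec.builderFor(fileSchema)
        .add(1000, "data_bucket", "identity")
        .build();

    Expression rowFilter = Expressions.and(
        Expressions.equal("partition.data_bucket", 0),
        Expressions.greaterThan("record_count", 0));

    // The inclusive projection keeps only partition predicates and strips the
    // "partition." prefix, leaving effectively: data_bucket == 0
    Expression partitionFilter = Projections.inclusive(transformed, true).project(rowFilter);
    System.out.println(partitionFilter);
  }
}
```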
22 changes: 12 additions & 10 deletions core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,10 +19,12 @@

 package org.apache.iceberg;

+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.Map;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -108,16 +110,16 @@ protected CloseableIterable<FileScanTask> planFiles(
     Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
     ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

-    // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
-    Expression partitionFilter = Projections
-        .inclusive(
-            transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
-            caseSensitive)
-        .project(rowFilter);
-
-    ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
-        partitionFilter, table().spec(), caseSensitive);
-    CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
+    Map<Integer, PartitionSpec> specsById = table().specs();
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
+      PartitionSpec spec = specsById.get(specId);
+      PartitionSpec transformedSpec = transformSpec(fileSchema, spec);
+      return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
+    });
+
+    CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests,
+        manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));

     // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
     // This data task needs to use the table schema, which may not include a partition schema to avoid having an
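The heart of this change is replacing a single evaluator built from the table's current spec with a per-spec-ID cache, so each manifest is pruned against the spec it was actually written with. Below is a self-contained sketch of the same Caffeine `LoadingCache` pattern; the value type is a simplified stand-in (a `String` instead of a `ManifestEvaluator`), so this illustrates the caching behavior, not the PR's code.

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public class PerSpecCacheSketch {
  public static void main(String[] args) {
    // The loader runs once per distinct key; later get() calls hit the cache.
    // In the PR the loaded value is ManifestEvaluator.forRowFilter(rowFilter,
    // transformSpec(fileSchema, specsById.get(specId)), caseSensitive).
    LoadingCache<Integer, String> evalCache = Caffeine.newBuilder()
        .build(specId -> {
          System.out.println("building evaluator for spec " + specId);
          return "evaluator-for-spec-" + specId;
        });

    // Manifests across snapshots typically share only a handful of spec IDs,
    // so only a few evaluators are ever constructed.
    int[] manifestSpecIds = {0, 0, 1, 0, 1};
    for (int specId : manifestSpecIds) {
      System.out.println(evalCache.get(specId));
    }
  }
}
```

Note also the switch from `ManifestEvaluator.forPartitionFilter` to `forRowFilter`: the row filter is projected lazily per spec, because a partition filter projected through one spec would be wrong for manifests written under another.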
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -111,7 +111,7 @@ static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {

     // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
     Expression partitionFilter = Projections
-        .inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive)
+        .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
         .project(scan.filter());

     ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
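The tests below exercise this behavior end to end. For orientation, here is a minimal sketch of the scan pattern they rely on; the class and method names are hypothetical, the class must live in the `org.apache.iceberg` package because `DataFilesTable` and its constructor are package-private, and the table is assumed to have an identity partition on `id`.

```java
package org.apache.iceberg;  // DataFilesTable is package-private

import java.io.IOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class FilesTableScanSketch {
  // Hypothetical helper; assumes the table spec includes identity(id).
  static void printMatchingDataFiles(TableOperations ops, Table table) throws IOException {
    Table filesTable = new DataFilesTable(ops, table);  // same construction as in the tests
    Expression filter = Expressions.and(
        Expressions.equal("partition.id", 10),          // partition predicate: prunes manifests and files
        Expressions.greaterThan("record_count", 0));    // non-partition predicate: dropped from pruning
    try (CloseableIterable<FileScanTask> tasks = filesTable.newScan().filter(filter).planFiles()) {
      tasks.forEach(task -> System.out.println(task.file().path()));
    }
  }
}
```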
204 changes: 204 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -713,6 +714,209 @@ public void testPartitionColumnNamedPartition() throws Exception {
validateIncludesPartitionScan(tasksAndEq, 0);
}

@Test
public void testFilesTablePartitionFieldRemovalV1() {
Assume.assumeTrue(formatVersion == 1);
preparePartitionedTable();

// Change spec and add two data files
table.updateSpec()
.removeField(Expressions.bucket("data", 16))
.addField("id")
.commit();
PartitionSpec newSpec = table.spec();

// Add two data files with new spec
PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
data10Key.set(1, 10);
DataFile data10 = DataFiles.builder(newSpec)
.withPath("/path/to/data-10.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data10Key)
.build();
PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
data11Key.set(1, 11);
DataFile data11 = DataFiles.builder(newSpec)
.withPath("/path/to/data-11.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data11Key)
.build();

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();

Table metadataTable = new DataFilesTable(table.ops(), table);
Expression filter = Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
CloseableIterable<FileScanTask> tasks = scan.planFiles();

// All 4 original data files written by old spec, plus one data file written by new spec
Assert.assertEquals(5, Iterables.size(tasks));

filter = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
tasks = scan.planFiles();

// 1 original data file written by the old spec (V1 filters out files of the new spec, which no longer has a data_bucket value)
Assert.assertEquals(1, Iterables.size(tasks));
}

@Test
public void testFilesTablePartitionFieldRemovalV2() {
Assume.assumeTrue(formatVersion == 2);
preparePartitionedTable();

// Change spec and add two data files
table.updateSpec()
.removeField(Expressions.bucket("data", 16))
.addField("id").commit();
PartitionSpec newSpec = table.spec();

// Add two data files with new spec
DataFile data10 = DataFiles.builder(newSpec)
.withPath("/path/to/data-10.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartitionPath("id=10")
.build();
DataFile data11 = DataFiles.builder(newSpec)
.withPath("/path/to/data-11.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartitionPath("id=11")
.build();

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();

Table metadataTable = new DataFilesTable(table.ops(), table);
Expression filter = Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
CloseableIterable<FileScanTask> tasks = scan.planFiles();

// All 4 original data files written by old spec, plus one new data file written by new spec
Assert.assertEquals(5, Iterables.size(tasks));

filter = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
tasks = scan.planFiles();

// 1 original data file written by the old spec, plus both new data files written by the new spec
Assert.assertEquals(3, Iterables.size(tasks));
}

@Test
public void testFilesTablePartitionFieldAddV1() {
Assume.assumeTrue(formatVersion == 1);
preparePartitionedTable();

// Change spec and add two data files
table.updateSpec()
.addField("id")
.commit();
PartitionSpec newSpec = table.spec();

// Add two data files with new spec
PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
data10Key.set(0, 0); // data=0
data10Key.set(1, 10); // id=10
DataFile data10 = DataFiles.builder(newSpec)
.withPath("/path/to/data-10.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data10Key)
.build();
PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
data11Key.set(0, 1); // data=1
data11Key.set(1, 11); // id=11
DataFile data11 = DataFiles.builder(newSpec)
.withPath("/path/to/data-11.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data11Key)
.build();

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();

Table metadataTable = new DataFilesTable(table.ops(), table);
Expression filter = Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
CloseableIterable<FileScanTask> tasks = scan.planFiles();

// All 4 original data files written by old spec, plus one new data file written by new spec
Assert.assertEquals(5, Iterables.size(tasks));

filter = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
tasks = scan.planFiles();

// 1 original data file written by old spec, plus 1 new data file written by new spec
Assert.assertEquals(2, Iterables.size(tasks));
}

@Test
public void testPartitionSpecEvolutionAdditiveV2() {
Assume.assumeTrue(formatVersion == 2);
preparePartitionedTable();

// Change spec and add two data files
table.updateSpec()
.addField("id")
.commit();
PartitionSpec newSpec = table.spec();

// Add two data files with new spec
DataFile data10 = DataFiles.builder(newSpec)
.withPath("/path/to/data-10.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0/id=10")
.build();
DataFile data11 = DataFiles.builder(newSpec)
.withPath("/path/to/data-11.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1/id=11")
.build();

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();

Table metadataTable = new DataFilesTable(table.ops(), table);
Expression filter = Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
CloseableIterable<FileScanTask> tasks = scan.planFiles();

// All 4 original data files written by old spec, plus one new data file written by new spec
Assert.assertEquals(5, Iterables.size(tasks));

filter = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
tasks = scan.planFiles();

// 1 original data file written by the old spec, plus 1 new data file written by the new spec
Assert.assertEquals(2, Iterables.size(tasks));
}

private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException {
try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);