Merged
16 changes: 9 additions & 7 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -19,9 +19,10 @@

package org.apache.iceberg;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Map;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -149,14 +150,15 @@ static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
Snapshot snapshot = table.snapshot(scan.snapshot().snapshotId());
boolean caseSensitive = scan.isCaseSensitive();

// use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
Expression partitionFilter = Projections
.inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
.project(scan.filter());
LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
Member
Not sure we need the full LoadingCache here, but I'm ok with it if you like; we could probably just proactively build the full set of evaluators for all specs in the metadata, since we know we will need every single evaluator. I probably should have suggested this before on the other PR as well.

Member Author
@szehon-ho Apr 28, 2022

Discussed, at this point we keep all partition specs so this will save a cycle if we have some spec without any manifests.

PartitionSpec spec = table.specs().get(specId);
PartitionSpec transformedSpec = transformSpec(scan.schema(), spec);
return ManifestEvaluator.forRowFilter(scan.filter(), transformedSpec, caseSensitive);
});

ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
.caseSensitive(caseSensitive)
.filterPartitions(partitionFilter)
.filterManifests(m -> evalCache.get(m.partitionSpecId()).eval(m))
.select(scan.colStats() ? DataTableScan.SCAN_WITH_STATS_COLUMNS : DataTableScan.SCAN_COLUMNS)
.specsById(scan.table().specs())
.ignoreDeleted();
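
As an aside, here is a minimal sketch of the eager alternative floated in the review thread above: building one evaluator per known spec up front instead of lazily through the Caffeine LoadingCache. It reuses the pieces the diff already has in scope (table.specs(), transformSpec, scan.filter(), caseSensitive) and is only an illustration, not what the PR merged.

// Sketch only: eagerly build a ManifestEvaluator for every known partition spec.
Map<Integer, ManifestEvaluator> evalBySpecId = Maps.newHashMap();
for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
  PartitionSpec transformedSpec = transformSpec(scan.schema(), entry.getValue());
  evalBySpecId.put(entry.getKey(),
      ManifestEvaluator.forRowFilter(scan.filter(), transformedSpec, caseSensitive));
}
// It would plug into the ManifestGroup the same way:
//   .filterManifests(m -> evalBySpecId.get(m.partitionSpecId()).eval(m))

Either way, each manifest is evaluated against a filter bound to its own spec, which is the actual fix over the previous single inclusive projection.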
120 changes: 120 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -550,6 +550,126 @@ public void testDeleteFilesTableSelection() throws IOException {
Assert.assertEquals(expected, scan.schema().asStruct());
}

@Test
public void testPartitionSpecEvolutionAdditive() {
preparePartitionedTable();

// Change spec and add two data files
table.updateSpec()
.addField("id")
.commit();
PartitionSpec newSpec = table.spec();

// Add two data files with new spec
PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
data10Key.set(0, 0); // data=0
data10Key.set(1, 10); // id=10
DataFile data10 = DataFiles.builder(newSpec)
.withPath("/path/to/data-10.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data10Key)
.build();
PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
data11Key.set(0, 1); // data=1
data11Key.set(1, 11); // id=11
DataFile data11 = DataFiles.builder(newSpec)
.withPath("/path/to/data-11.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data11Key)
.build();

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();
Member

Do you need two commits here?

Member Author

Yea, it messes up the test a little bit, as it combines into one manifest (the test is a bit low level and depends on how many manifestReadTasks get spawned)


Table metadataTable = new PartitionsTable(table.ops(), table);
Expression filter = Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);

// Four data files of old spec, one new data file of new spec
Assert.assertEquals(5, Iterables.size(tasks));

filter = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
tasks = PartitionsTable.planFiles((StaticTableScan) scan);

// 1 original data file written by old spec, plus 1 new data file written by new spec
Assert.assertEquals(2, Iterables.size(tasks));
}
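
Regarding the two-commits question in the thread above, a quick illustration (same fixture as this test, purely explanatory): a single fast append writes one combined manifest, which would change how many manifest read tasks the scan plans and therefore the counts asserted above.

// What the test does: two commits, producing two new manifests
table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();
// Alternative: one commit, one combined manifest (would alter the task counts)
// table.newFastAppend().appendFile(data10).appendFile(data11).commit();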

@Test
public void testPartitionSpecEvolutionRemoval() {
preparePartitionedTable();

// Remove partition field
table.updateSpec()
.removeField(Expressions.bucket("data", 16))
.addField("id")
.commit();
PartitionSpec newSpec = table.spec();

// Add two data files with new spec
// Partition Fields are replaced in V1 with void and actually removed in V2
int partIndex = (formatVersion == 1) ? 1 : 0;
PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
data10Key.set(partIndex, 10);
DataFile data10 = DataFiles.builder(newSpec)
.withPath("/path/to/data-10.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data10Key)
.build();
PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
data11Key.set(partIndex, 11);
DataFile data11 = DataFiles.builder(newSpec)
.withPath("/path/to/data-11.parquet")
.withRecordCount(10)
.withFileSizeInBytes(10)
.withPartition(data11Key)
.build();

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();

Table metadataTable = new PartitionsTable(table.ops(), table);
Expression filter = Expressions.and(
Expressions.equal("partition.id", 10),
Expressions.greaterThan("record_count", 0));
TableScan scan = metadataTable.newScan().filter(filter);
CloseableIterable<FileScanTask> tasks = PartitionsTable.planFiles((StaticTableScan) scan);

// Four original files of original spec, one data file written by new spec
Assert.assertEquals(5, Iterables.size(tasks));

// Filter for a dropped partition spec field. Correct behavior is that only old partitions are returned.
filter = Expressions.and(
Expressions.equal("partition.data_bucket", 0),
Expressions.greaterThan("record_count", 0));
scan = metadataTable.newScan().filter(filter);
tasks = PartitionsTable.planFiles((StaticTableScan) scan);

if (formatVersion == 1) {
// 1 original data file written by old spec
Assert.assertEquals(1, Iterables.size(tasks));
} else {
// 1 original data/delete file written by the old spec, plus both new data files written by the new spec
//
// Unlike in V1, V2 does not write (data=null) on newer files' partition data, so these cannot be filtered out
// early in scan planning here.
//
// However, these partition rows are filtered out later in Spark data filtering, as the newer partitions
// will have 'data=null' field added as part of normalization to the Partitions table final schema.
// The Partitions table final schema is a union of fields of all specs, including dropped fields.
Assert.assertEquals(3, Iterables.size(tasks));
Member

I'm not sure I understand this; I would have thought the filter would be used with the correct column and give us the same result as the v1 table?

Member Author
@szehon-ho May 4, 2022

I tried to clarify the comment, can you see if it makes sense?

It's a bit confusing, but the background is that this occurs when querying with a filter on a dropped partition field (data). The correct behavior, because this is a partitions table, is that only partitions of the old spec are returned; partitions of the new spec, which have no value for data, should not be returned.

In V1, new files are written with a void transform for the dropped field (data=null), so the predicate pushdown can filter them out early.

In V2, new files do not write any value for data, so predicate pushdown cannot filter them out early.

However, they are filtered out later by Spark data filtering, because the partition values are normalized to Partitioning.partitionType (the union of all specs), and the old field "data" is filled in as 'null' when returning to Spark. (That was done in #4560.)

This is shown in the new test added in TestMetadataTablesWithPartitionEvolution

Member

I was just wondering if we need some kind of special filter: if you have a predicate on a column not present in the spec, just return cannot match.

Member Author
@szehon-ho May 4, 2022

Yea, I was thinking about it, but as we rely on the existing ManifestEvaluator, it seems a bit heavy to implement a ManifestEvaluator only for this case (improving the perf of querying dropped partition fields in a metadata table). It's also a bit risky (if we cannot definitively say whether a partition value matches, I feel safer not filtering), as there have been bugs in the past: #4520

}
}
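
To make the V1/V2 behavior discussed above easier to see, here is an illustrative sketch (assuming the same table fixture as this test; not code from the PR) that inspects the evolved spec after the removeField/addField commit. Under V1 the dropped field is still present with a void transform, so new files carry a null slot for it and the pushdown filter can exclude them; under V2 the field is gone from the spec, so the manifest evaluator cannot decide, and the new files are only dropped later by Spark-side filtering against the normalized partitions-table schema.

// Illustrative only: inspect the evolved spec after dropping the bucket field.
PartitionSpec evolved = table.spec();
for (PartitionField field : evolved.fields()) {
  // V1: the dropped field still appears here, with a void transform.
  // V2: the dropped field is absent from the spec entirely.
  System.out.println(field.name() + " -> " + field.transform());
}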

@Test
public void testPartitionColumnNamedPartition() throws Exception {
TestTables.clearTables();
@@ -382,8 +382,7 @@ public void testEntriesMetadataTable() throws ParseException {
}
@Test
public void testPartitionsTableAddRemoveFields() throws ParseException {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
"TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg ", tableName);
initTable();
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
@@ -447,8 +446,7 @@ public void testPartitionsTableAddRemoveFields() throws ParseException {

@Test
public void testPartitionsTableRenameFields() throws ParseException {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
"TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();

Table table = validationCatalog.loadTable(tableIdent);
@@ -546,6 +544,154 @@ public void testPartitionsTableSwitchFields() throws Exception {
PARTITIONS);
}

@Test
public void testPartitionTableFilterAddRemoveFields() throws ParseException {
// Create un-partitioned table
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

// Partition Table with one partition column
Table table = validationCatalog.loadTable(tableIdent);
table.updateSpec()
.addField("data")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(
ImmutableList.of(row("d2")),
"STRUCT<data:STRING>",
PARTITIONS,
"partition.data = 'd2'");

// Partition Table with two partition columns
table.updateSpec()
.addField("category")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(ImmutableList.of(row("d2", null), row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS,
"partition.data = 'd2'");
assertPartitions(
ImmutableList.of(row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS,
"partition.category = 'c2'");

// Partition Table with first partition column removed
table.updateSpec()
.removeField("data")
.commit();
sql("REFRESH TABLE %s", tableName);

sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd2')", tableName);
sql("INSERT INTO TABLE %s VALUES (4, 'c4', 'd2')", tableName);
sql("INSERT INTO TABLE %s VALUES (5, 'c2', 'd5')", tableName);
assertPartitions(
ImmutableList.of(row("d2", null), row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS,
"partition.data = 'd2'");
assertPartitions(
ImmutableList.of(row(null, "c2"), row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS,
"partition.category = 'c2'");
}

@Test
public void testPartitionTableFilterSwitchFields() throws Exception {
// Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
// In V1, dropped partition fields show separately when field is re-added
// In V2, re-added field currently conflicts with its deleted form
Assume.assumeTrue(formatVersion == 1);

sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
Table table = validationCatalog.loadTable(tableIdent);

// Two partition columns
table.updateSpec()
.addField("data")
.addField("category")
.commit();
sql("REFRESH TABLE %s", tableName);

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

// Drop first partition column
table.updateSpec()
.removeField("data")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

// Re-add first partition column at the end
table.updateSpec()
.addField("data")
.commit();
sql("REFRESH TABLE %s", tableName);

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(
ImmutableList.of(
row(null, "c2", null),
row(null, "c2", "d2"),
row("d2", "c2", null)),
"STRUCT<data_1000:STRING,category:STRING,data:STRING>",
PARTITIONS,
"partition.category = 'c2'");

assertPartitions(
ImmutableList.of(row(null, "c1", "d1")),
"STRUCT<data_1000:STRING,category:STRING,data:STRING>",
PARTITIONS,
"partition.data = 'd1'");
}

@Test
public void testPartitionsTableFilterRenameFields() throws ParseException {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();

Table table = validationCatalog.loadTable(tableIdent);

table.updateSpec()
.addField("data")
.addField("category")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

table.updateSpec()
.renameField("category", "category_another_name")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(
ImmutableList.of(row("d1", "c1")),
"STRUCT<data:STRING,category_another_name:STRING>",
PARTITIONS,
"partition.category_another_name = 'c1'");
}

@Test
public void testMetadataTablesWithUnknownTransforms() {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);