65 changes: 49 additions & 16 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -26,7 +26,6 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;

/**
* A {@link Table} implementation that exposes a table's partitions as rows.
@@ -72,7 +71,7 @@ MetadataTableType metadataTableType() {

private DataTask task(StaticTableScan scan) {
TableOperations ops = operations();
Iterable<Partition> partitions = partitions(scan);
Iterable<Partition> partitions = partitions(table(), scan);
if (table().spec().fields().size() < 1) {
// the table is unpartitioned, partitions contains only the root partition
return StaticDataTask.of(
@@ -93,16 +92,57 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount, partition.specId);
}

private static Iterable<Partition> partitions(StaticTableScan scan) {
private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
CloseableIterable<FileScanTask> tasks = planFiles(scan);
Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
PartitionMap partitions = new PartitionMap();

// cache a position map for each partition spec, used to normalize partitions to the final schema
Map<Integer, int[]> normalizedPositionsBySpec = Maps.newHashMapWithExpectedSize(table.specs().size());

PartitionMap partitions = new PartitionMap(scan.table().spec().partitionType());
for (FileScanTask task : tasks) {
partitions.get(task.file().partition()).update(task.file());
PartitionData original = (PartitionData) task.file().partition();
int[] normalizedPositions = normalizedPositionsBySpec.computeIfAbsent(
task.spec().specId(), specId -> normalizedPositions(table, specId, normalizedPartitionType));
PartitionData normalized = normalizePartition(original, normalizedPartitionType, normalizedPositions);
partitions.get(normalized).update(task.file());
}
return partitions.all();
}

/**
* Builds an array of field positions in the normalized partition type, indexed by field position
* in the original partition type.
*/
private static int[] normalizedPositions(Table table, int specId, Types.StructType normalizedType) {
Types.StructType originalType = table.specs().get(specId).partitionType();
int[] normalizedPositions = new int[originalType.fields().size()];
for (int originalIndex = 0; originalIndex < originalType.fields().size(); originalIndex++) {
Types.NestedField normalizedField = normalizedType.field(originalType.fields().get(originalIndex).fieldId());
normalizedPositions[originalIndex] = normalizedType.fields().indexOf(normalizedField);
}
return normalizedPositions;
}

/**
* Converts partition data written by an old spec to the table's normalized partition type, which is a common
* partition type across all specs of the table.
* @param originalPartition un-normalized partition data
* @param normalizedPartitionType the table's normalized partition type, see {@link Partitioning#partitionType(Table)}
* @param normalizedPositions field positions in the normalized partition type indexed by field position in
* the original partition type
* @return the normalized partition data
*/
private static PartitionData normalizePartition(PartitionData originalPartition,
Types.StructType normalizedPartitionType,
int[] normalizedPositions) {
PartitionData normalizedPartition = new PartitionData(normalizedPartitionType);
for (int originalIndex = 0; originalIndex < originalPartition.size(); originalIndex++) {
normalizedPartition.put(normalizedPositions[originalIndex], originalPartition.get(originalIndex));
}
return normalizedPartition;
}

@VisibleForTesting
static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
Table table = scan.table();
@@ -140,20 +180,13 @@ private class PartitionsScan extends StaticTableScan {
}

static class PartitionMap {
private final Map<StructLikeWrapper, Partition> partitions = Maps.newHashMap();
private final Types.StructType type;
private final StructLikeWrapper reused;

PartitionMap(Types.StructType type) {
this.type = type;
this.reused = StructLikeWrapper.forType(type);
}
private final Map<PartitionData, Partition> partitions = Maps.newHashMap();

Partition get(StructLike key) {
Partition partition = partitions.get(reused.set(key));
Partition get(PartitionData key) {
Partition partition = partitions.get(key);
if (partition == null) {
partition = new Partition(key);
partitions.put(StructLikeWrapper.forType(type).set(key), partition);
@RussellSpitzer (Member) commented on Apr 22, 2022:

I'm a bit confused by this change. I believe the issue here is that StructLike does not have a well-defined hash function (since implementations can do whatever they like), which is why we use the wrapper to make sure we have a valid hash (and equals).

The author (Member) replied:

Changed to a map of PartitionData (I feel it should have been that way from the beginning).
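To make the point concrete, here is a minimal sketch (not part of this change; the class name and counting logic are hypothetical, and it assumes the org.apache.iceberg package, like PartitionsTable, so PartitionData is accessible): an arbitrary StructLike makes no equals/hashCode guarantees, so it must be wrapped with StructLikeWrapper before keying a map, whereas PartitionData is a concrete struct with value-based equals/hashCode and can key the map directly.

// Illustrative sketch only, not taken from this PR.
package org.apache.iceberg;

import java.util.Map;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;

class PartitionKeySketch {
  // An arbitrary StructLike implementation may not define equals/hashCode by value,
  // so it is wrapped with a type-aware StructLikeWrapper before use as a map key.
  static void countWithWrapper(Map<StructLikeWrapper, Long> counts,
                               Types.StructType partitionType, StructLike partition) {
    counts.merge(StructLikeWrapper.forType(partitionType).set(partition), 1L, Long::sum);
  }

  // PartitionData defines value-based equals/hashCode, so it can key the map directly,
  // which is the approach this change switches PartitionMap to.
  static void countWithPartitionData(Map<PartitionData, Long> counts, PartitionData partition) {
    counts.merge(partition, 1L, Long::sum);
  }
}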

partitions.put(key, partition);
}
return partition;
}
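The normalization in partitions() boils down to a per-spec position map: each file's spec-local partition tuple is copied into a tuple of the table-wide partition type, and fields the old spec never wrote stay null. A self-contained sketch of that remapping with plain arrays and hypothetical values (not taken from the PR):

// Illustrative sketch of the position remapping performed by normalizePartition().
public class NormalizeSketch {
  static Object[] normalize(Object[] original, int normalizedSize, int[] normalizedPositions) {
    Object[] normalized = new Object[normalizedSize];  // fields not written by the old spec stay null
    for (int i = 0; i < original.length; i++) {
      normalized[normalizedPositions[i]] = original[i];
    }
    return normalized;
  }

  public static void main(String[] args) {
    // Suppose the table-wide partition type is struct<data, category> and an older spec
    // partitioned only by category; then position 0 of the old tuple maps to position 1.
    Object[] oldTuple = {"c1"};
    int[] positions = {1};
    System.out.println(java.util.Arrays.toString(normalize(oldTuple, 2, positions)));  // [null, c1]
  }
}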
@@ -45,6 +45,7 @@
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -57,6 +58,7 @@
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;
import static org.apache.iceberg.MetadataTableType.FILES;
import static org.apache.iceberg.MetadataTableType.PARTITIONS;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;

@@ -378,6 +380,171 @@ public void testEntriesMetadataTable() throws ParseException {
tableType);
}
}
@Test
public void testPartitionsTableAddRemoveFields() throws ParseException {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
"TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
initTable();
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

// verify the metadata tables while the current spec is still unpartitioned
Dataset<Row> df = loadMetadataTable(PARTITIONS);
Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty());

Table table = validationCatalog.loadTable(tableIdent);

table.updateSpec()
A reviewer (Member) commented:

I think you could just do this in SparkSQL if you like and skip the refresh, but this is fine too.

The author (Member) replied:

Will just keep it for now then; it matches the existing tests.
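For reference, the SparkSQL route the reviewer mentions would look roughly like the following, assuming Iceberg's Spark SQL extensions are enabled in the test session and using the test's own sql() helper and tableName field (a sketch, not part of the PR); it would avoid the separate Table handle and the explicit REFRESH:

// Hypothetical DDL equivalent of the updateSpec().addField("data").commit() call below.
sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);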

.addField("data")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

// verify the metadata tables after adding the first partition column
assertPartitions(
ImmutableList.of(row(new Object[]{null}), row("d1"), row("d2")),
"STRUCT<data:STRING>",
PARTITIONS);

table.updateSpec()
.addField("category")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

// verify the metadata tables after adding the second partition column
assertPartitions(ImmutableList.of(
row(null, null),
row("d1", null),
row("d1", "c1"),
row("d2", null),
row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS);

// verify the metadata tables after removing the first partition column
table.updateSpec()
.removeField("data")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(
ImmutableList.of(
row(null, null),
row(null, "c1"),
row(null, "c2"),
row("d1", null),
row("d1", "c1"),
row("d2", null),
row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS);
}

@Test
public void testPartitionsTableRenameFields() throws ParseException {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
"TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
initTable();

Table table = validationCatalog.loadTable(tableIdent);

table.updateSpec()
.addField("data")
.addField("category")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(ImmutableList.of(
row("d1", "c1"),
row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS);

table.updateSpec()
.renameField("category", "category_another_name")
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(
ImmutableList.of(
row("d1", "c1"),
row("d2", "c2")),
"STRUCT<data:STRING,category_another_name:STRING>",
PARTITIONS);
}

@Test
public void testPartitionsTableSwitchFields() throws Exception {
// Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
// In V1, dropped partition fields show separately when field is re-added
// In V2, re-added field currently conflicts with its deleted form
Assume.assumeTrue(formatVersion == 1);

sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();
Table table = validationCatalog.loadTable(tableIdent);

// verify the metadata tables after re-adding the first dropped column in the second location
table.updateSpec()
.addField("data")
.addField("category")
.commit();
sql("REFRESH TABLE %s", tableName);

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(ImmutableList.of(
row("d1", "c1"),
row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS);

table.updateSpec()
.removeField("data")
.commit();
sql("REFRESH TABLE %s", tableName);

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(
ImmutableList.of(
row(null, "c1"),
row(null, "c2"),
row("d1", "c1"),
row("d2", "c2")),
"STRUCT<data:STRING,category:STRING>",
PARTITIONS);

table.updateSpec()
.addField("data")
.commit();
sql("REFRESH TABLE %s", tableName);

sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);

assertPartitions(
ImmutableList.of(
row(null, "c1", null),
row(null, "c1", "d1"),
row(null, "c2", null),
row(null, "c2", "d2"),
row("d1", "c1", null),
row("d2", "c2", null)),
"STRUCT<data_1000:STRING,category:STRING,data:STRING>",
PARTITIONS);
}

@Test
public void testMetadataTablesWithUnknownTransforms() {
@@ -429,6 +596,7 @@ private void assertPartitions(List<Object[]> expectedPartitions, String expected

DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString);
switch (tableType) {
case PARTITIONS:
case FILES:
case ALL_DATA_FILES:
DataType actualFilesType = df.schema().apply("partition").dataType();
@@ -447,6 +615,7 @@ private void assertPartitions(List<Object[]> expectedPartitions, String expected
}

switch (tableType) {
case PARTITIONS:
case FILES:
case ALL_DATA_FILES:
List<Row> actualFilesPartitions = df.orderBy("partition")