Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionField.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Serializable;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;

/**
* Represents a single field in a {@link PartitionSpec}.
Expand Down Expand Up @@ -67,6 +68,20 @@ public String name() {
return transform;
}

/**
 * Checks whether this partition field is compatible with the given partition field.
 * <p>
 * Two partition fields are compatible when their source IDs match, their field IDs match, and
 * their transforms are either equal or at least one of them is the always-null transform.
 * Field names are not taken into account.
 *
 * @param other the partition field to compare against
 * @return true if the two fields are compatible, false otherwise
 */
boolean compatibleWith(PartitionField other) {
  // differing IDs can never be reconciled, so there is no need to look at the transforms
  if (sourceId != other.sourceId || fieldId != other.fieldId) {
    return false;
  }

  return compatibleTransforms(transform, other.transform);
}

// Transforms are compatible when they are equal or when either one is the always-null transform.
private boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
  if (t1.equals(t2)) {
    return true;
  }

  Transform<?, ?> alwaysNull = Transforms.alwaysNull();
  return t1.equals(alwaysNull) || t2.equals(alwaysNull);
}

@Override
public String toString() {
return fieldId + ": " + name + ": " + transform + "(" + sourceId + ")";
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;

Expand Down Expand Up @@ -56,8 +57,9 @@ public TableScan newScan() {

@Override
public Schema schema() {
Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
if (table().spec().fields().size() < 1) {
StructType partitionType = Partitioning.partitionType(table());
Schema schema = new Schema(DataFile.getType(partitionType).fields());
if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/AllEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;

Expand All @@ -55,8 +56,9 @@ public TableScan newScan() {

@Override
public Schema schema() {
Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
if (table().spec().fields().size() < 1) {
StructType partitionType = Partitioning.partitionType(table());
Schema schema = ManifestEntry.getSchema(partitionType);
if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/DataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;

/**
* A {@link Table} implementation that exposes a table's data files as rows.
Expand All @@ -48,8 +49,9 @@ public TableScan newScan() {

@Override
public Schema schema() {
Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
if (table().spec().fields().size() < 1) {
StructType partitionType = Partitioning.partitionType(table());
Schema schema = new Schema(DataFile.getType(partitionType).fields());
if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field
return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.StructProjection;

/**
Expand All @@ -54,8 +55,9 @@ public TableScan newScan() {

@Override
public Schema schema() {
Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
if (table().spec().fields().size() < 1) {
StructType partitionType = Partitioning.partitionType(table());
Schema schema = ManifestEntry.getSchema(partitionType);
if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
Expand Down
57 changes: 57 additions & 0 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@

package org.apache.iceberg;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.transforms.PartitionSpecVisitor;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

public class Partitioning {
private Partitioning() {
Expand Down Expand Up @@ -177,4 +186,52 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) {
return null;
}
}

/**
* Builds a common partition type for all specs in a table.
* <p>
* Whenever a table has multiple specs, the partition type is a struct containing
* all columns that have ever been a part of any spec in the table.
*
* @param table a table with one or many specs
* @return the constructed common partition type
*/
public static StructType partitionType(Table table) {
if (table.specs().size() == 1) {
return table.spec().partitionType();
}

Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
List<NestedField> structFields = Lists.newArrayList();

// sort the spec IDs in descending order to pick up the most recent field names
List<Integer> specIds = table.specs().keySet().stream()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a sort by spec ID to make sure we pick up the most recent field name (see a dedicated test too).

.sorted(Collections.reverseOrder())
.collect(Collectors.toList());

for (Integer specId : specIds) {
PartitionSpec spec = table.specs().get(specId);

for (PartitionField field : spec.fields()) {
int fieldId = field.fieldId();
PartitionField existingField = fieldMap.get(fieldId);

if (existingField == null) {
fieldMap.put(fieldId, field);
NestedField structField = spec.partitionType().field(fieldId);
structFields.add(structField);
} else {
// verify the fields are compatible as they may conflict in v1 tables
ValidationException.check(field.compatibleWith(existingField),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably just make equivalentIgnoringName a private method in this class for this.

"Conflicting partition fields: ['%s', '%s']",
field, existingField);
}
}
}

List<NestedField> sortedStructFields = structFields.stream()
.sorted(Comparator.comparingInt(NestedField::fieldId))
.collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for sorting by fieldId.

return StructType.of(sortedStructFields);
}
}
196 changes: 196 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestPartitioning.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.apache.iceberg.types.Types.NestedField.required;

public class TestPartitioning {

private static final int V1_FORMAT_VERSION = 1;
private static final int V2_FORMAT_VERSION = 2;
private static final Schema SCHEMA = new Schema(
required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get()),
required(3, "category", Types.StringType.get())
);

@Rule
public TemporaryFolder temp = new TemporaryFolder();
private File tableDir = null;

/**
 * Creates a new folder under the {@link TemporaryFolder} rule before each test and stores it
 * as the table directory.
 *
 * @throws IOException if the folder cannot be created
 */
@Before
public void setupTableDir() throws IOException {
  File freshDir = temp.newFolder();
  this.tableDir = freshDir;
}

/**
 * Calls {@code TestTables.clearTables()} after each test.
 */
@After
public void cleanupTables() {
TestTables.clearTables();
}

@Test
public void testPartitionTypeWithSpecEvolutionInV1Tables() {
  // create a v1 table partitioned by identity(data)
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
  TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, V1_FORMAT_VERSION);

  // evolve the spec by adding a bucket partition on category
  table.updateSpec().addField(Expressions.bucket("category", 8)).commit();

  Assert.assertEquals("Should have 2 specs", 2, table.specs().size());

  // the common type is expected to contain the fields of both specs
  NestedField dataField = NestedField.optional(1000, "data", Types.StringType.get());
  NestedField bucketField = NestedField.optional(1001, "category_bucket_8", Types.IntegerType.get());
  StructType expectedType = StructType.of(dataField, bucketField);

  Assert.assertEquals("Types must match", expectedType, Partitioning.partitionType(table));
}

@Test
public void testPartitionTypeWithSpecEvolutionInV2Tables() {
  // create a v2 table partitioned by identity(data)
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
  TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, V2_FORMAT_VERSION);

  // replace the data partition with a category partition in a single spec update
  table.updateSpec().removeField("data").addField("category").commit();

  Assert.assertEquals("Should have 2 specs", 2, table.specs().size());

  // the removed field is still expected to be part of the common type
  NestedField dataField = NestedField.optional(1000, "data", Types.StringType.get());
  NestedField categoryField = NestedField.optional(1001, "category", Types.StringType.get());
  StructType expectedType = StructType.of(dataField, categoryField);

  Assert.assertEquals("Types must match", expectedType, Partitioning.partitionType(table));
}

@Test
public void testPartitionTypeWithRenamesInV1Table() {
  // the identity partition on "data" is initially named "p1"
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data", "p1").build();
  TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, V1_FORMAT_VERSION);

  // two separate spec updates: add a partition field, then rename the original one
  table.updateSpec().addField("category").commit();
  table.updateSpec().renameField("p1", "p2").commit();

  // field 1000 is expected to be reported under its latest name
  NestedField renamedField = NestedField.optional(1000, "p2", Types.StringType.get());
  NestedField categoryField = NestedField.optional(1001, "category", Types.StringType.get());
  StructType expectedType = StructType.of(renamedField, categoryField);

  Assert.assertEquals("Types must match", expectedType, Partitioning.partitionType(table));
}

@Test
public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test validates we ignore field names when building the common type. The original spec will have 1000:data and the last spec will have 1000:data_1000 as the old field was renamed to avoid naming conflicts.

PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
.identity("data")
.build();
TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);

table.updateSpec()
.removeField("data")
.commit();

table.updateSpec()
.addField("data")
.commit();

// in v1, we use void transforms instead of dropping partition fields
StructType expectedType = StructType.of(
NestedField.optional(1000, "data_1000", Types.StringType.get()),
NestedField.optional(1001, "data", Types.StringType.get())
);
StructType actualType = Partitioning.partitionType(table);
Assert.assertEquals("Types must match", expectedType, actualType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could argue that adding data back should re-use the old ID. Not something to fix here, but we should probably fix it at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is indeed weird. However, I would not worry too much here as we have already stated not to rename and drop fields in v1 tables.

}

@Test
public void testPartitionTypeWithAddingBackSamePartitionFieldInV2Table() {
  // create a v2 table partitioned by identity(data)
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
  TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, V2_FORMAT_VERSION);

  // drop the partition field and then add it back in a separate spec update
  table.updateSpec().removeField("data").commit();
  table.updateSpec().addField("data").commit();

  // in v2, we should be able to reuse the original partition spec
  NestedField dataField = NestedField.optional(1000, "data", Types.StringType.get());
  StructType expectedType = StructType.of(dataField);

  Assert.assertEquals("Types must match", expectedType, Partitioning.partitionType(table));
}

@Test
public void testPartitionTypeWithIncompatibleSpecEvolution() {
PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
.identity("data")
.build();
TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);

PartitionSpec newSpec = PartitionSpec.builderFor(table.schema())
.identity("category")
.build();

TableOperations ops = ((HasTableOperations) table).operations();
TableMetadata current = ops.current();
ops.commit(current, current.updatePartitionSpec(newSpec));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change the method updatePartitionSpec to avoid conflicts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is hidden from users. The user-facing API is UpdatePartitionSpec accessible via Table. That one actually ensures we don't hit this case. The spec evolution in v1 tables is actually limited as described here.

There could be some tables where people evolved partitioning before the public API appeared. It is an edge case but this test ensures we get a reasonable exception for such tables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Assert.assertEquals("Should have 2 specs", 2, table.specs().size());

AssertHelpers.assertThrows("Should complain about incompatible specs",
ValidationException.class, "Conflicting partition fields",
() -> Partitioning.partitionType(table));
}
}
Loading