16 changes: 13 additions & 3 deletions api/src/main/java/org/apache/iceberg/PartitionField.java
@@ -28,11 +28,13 @@
*/
public class PartitionField implements Serializable {
private final int sourceId;
private final int fieldId;
private final String name;
private final Transform<?, ?> transform;

PartitionField(int sourceId, String name, Transform<?, ?> transform) {
PartitionField(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
this.sourceId = sourceId;
this.fieldId = fieldId;
Collaborator: Do we need to check that fieldId is larger than 1000?

Contributor: I don't think so. That is a convention that we use, but not strictly required by the spec.

this.name = name;
this.transform = transform;
}
@@ -44,6 +46,13 @@ public int sourceId() {
return sourceId;
}

/**
* @return the id of this partition field, unique across all partition specs in the table metadata
*/
public int fieldId() {
return fieldId;
}

/**
* @return the name of this partition field
*/
@@ -60,7 +69,7 @@ public String name() {

@Override
public String toString() {
return name + ": " + transform + "(" + sourceId + ")";
return fieldId + ": " + name + ": " + transform + "(" + sourceId + ")";
}

@Override
@@ -73,12 +82,13 @@ public boolean equals(Object other) {

PartitionField that = (PartitionField) other;
return sourceId == that.sourceId &&
fieldId == that.fieldId &&
name.equals(that.name) &&
transform.equals(that.transform);
}

@Override
public int hashCode() {
return Objects.hashCode(sourceId, name, transform);
return Objects.hashCode(sourceId, fieldId, name, transform);
}
}
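A minimal sketch of the convention discussed in the review comments above: partition field IDs are assigned from 1000 so they stay clear of the data schema's field IDs. The schema and column names here are hypothetical, chosen only for illustration; builderFor, day, build, fields, and fieldId are the API shown in this diff.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionFieldIdSketch {
  public static void main(String[] args) {
    // hypothetical data schema; 1 and 2 are ordinary data field ids
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "ts", Types.TimestampType.withZone()));

    // by convention (not required by the spec, per the discussion above),
    // partition field ids start at PARTITION_DATA_ID_START = 1000 so they
    // do not collide with the data columns' field ids
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .day("ts")
        .build();

    System.out.println(spec.fields().get(0).fieldId()); // prints 1000
  }
}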
58 changes: 42 additions & 16 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.transforms.UnknownTransform;
@@ -47,7 +48,7 @@
* represented by a named {@link PartitionField}.
*/
public class PartitionSpec implements Serializable {
// start assigning IDs for partition fields at 1000
// IDs for partition fields start at 1000
private static final int PARTITION_DATA_ID_START = 1000;

private final Schema schema;
@@ -58,14 +59,16 @@ public class PartitionSpec implements Serializable {
private transient volatile ListMultimap<Integer, PartitionField> fieldsBySourceId = null;
private transient volatile Class<?>[] lazyJavaClasses = null;
private transient volatile List<PartitionField> fieldList = null;
private final int lastAssignedFieldId;
Collaborator: Do we want to use TypeUtil::NextID?

Contributor: This is tracking something different. Here, this is the highest ID assigned to any partition field, so that the next ID assigned will be unique.


private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
private PartitionSpec(Schema schema, int specId, List<PartitionField> fields, int lastAssignedFieldId) {
this.schema = schema;
this.specId = specId;
this.fields = new PartitionField[fields.size()];
for (int i = 0; i < this.fields.length; i += 1) {
this.fields[i] = fields.get(i);
}
this.lastAssignedFieldId = lastAssignedFieldId;
}

/**
@@ -89,6 +92,10 @@ public List<PartitionField> fields() {
return lazyFieldList();
}

int lastAssignedFieldId() {
return lastAssignedFieldId;
}

/**
* @param fieldId a field id from the source schema
* @return the {@link PartitionField field} that partitions the given source field
@@ -107,9 +114,8 @@ public StructType partitionType() {
PartitionField field = fields[i];
Type sourceType = schema.findType(field.sourceId());
Type resultType = field.transform().getResultType(sourceType);
// assign ids for partition fields starting at PARTITION_DATA_ID_START to leave room for data file's other fields
structFields.add(
Types.NestedField.optional(PARTITION_DATA_ID_START + i, field.name(), resultType));
Types.NestedField.optional(field.fieldId(), field.name(), resultType));
}

return Types.StructType.of(structFields);
@@ -168,8 +174,8 @@ public String partitionToPath(StructLike data) {
}

/**
* Returns true if this spec is equivalent to the other, with field names ignored. That is, if
* both specs have the same number of fields, field order, source columns, and transforms.
* Returns true if this spec is equivalent to the other, with field names and partition field ids ignored.
* That is, if both specs have the same number of fields, field order, source columns, and transforms.
*
* @param other another PartitionSpec
* @return true if the specs have the same fields, source columns, and transforms.
@@ -275,7 +281,7 @@ public String toString() {
}

private static final PartitionSpec UNPARTITIONED_SPEC =
new PartitionSpec(new Schema(), 0, ImmutableList.of());
new PartitionSpec(new Schema(), 0, ImmutableList.of(), PARTITION_DATA_ID_START - 1);

/**
* Returns a spec for unpartitioned tables.
@@ -307,11 +313,16 @@ public static class Builder {
private final Set<String> partitionNames = Sets.newHashSet();
private Map<Integer, PartitionField> timeFields = Maps.newHashMap();
private int specId = 0;
private final AtomicInteger lastAssignedFieldId = new AtomicInteger(PARTITION_DATA_ID_START - 1);

private Builder(Schema schema) {
this.schema = schema;
}

private int nextFieldId() {
return lastAssignedFieldId.incrementAndGet();
}

private void checkAndAddPartitionName(String name) {
checkAndAddPartitionName(name, null);
}
@@ -357,7 +368,7 @@ Builder identity(String sourceName, String targetName) {
Types.NestedField sourceColumn = findSourceColumn(sourceName);
checkAndAddPartitionName(targetName, sourceColumn.fieldId());
fields.add(new PartitionField(
sourceColumn.fieldId(), targetName, Transforms.identity(sourceColumn.type())));
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.identity(sourceColumn.type())));
return this;
}

@@ -369,7 +380,7 @@ public Builder year(String sourceName, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field = new PartitionField(
sourceColumn.fieldId(), targetName, Transforms.year(sourceColumn.type()));
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.year(sourceColumn.type()));
checkForRedundantPartitions(field);
fields.add(field);
return this;
@@ -383,7 +394,7 @@ public Builder month(String sourceName, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field = new PartitionField(
sourceColumn.fieldId(), targetName, Transforms.month(sourceColumn.type()));
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.month(sourceColumn.type()));
checkForRedundantPartitions(field);
fields.add(field);
return this;
@@ -397,7 +408,7 @@ public Builder day(String sourceName, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field = new PartitionField(
sourceColumn.fieldId(), targetName, Transforms.day(sourceColumn.type()));
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.day(sourceColumn.type()));
checkForRedundantPartitions(field);
fields.add(field);
return this;
@@ -411,7 +422,7 @@ public Builder hour(String sourceName, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field = new PartitionField(
sourceColumn.fieldId(), targetName, Transforms.hour(sourceColumn.type()));
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.hour(sourceColumn.type()));
checkForRedundantPartitions(field);
fields.add(field);
return this;
@@ -425,7 +436,7 @@ public Builder bucket(String sourceName, int numBuckets, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
fields.add(new PartitionField(
sourceColumn.fieldId(), targetName, Transforms.bucket(sourceColumn.type(), numBuckets)));
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.bucket(sourceColumn.type(), numBuckets)));
return this;
}

@@ -437,24 +448,30 @@ public Builder truncate(String sourceName, int width, String targetName) {
checkAndAddPartitionName(targetName);
Types.NestedField sourceColumn = findSourceColumn(sourceName);
fields.add(new PartitionField(
sourceColumn.fieldId(), targetName, Transforms.truncate(sourceColumn.type(), width)));
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.truncate(sourceColumn.type(), width)));
return this;
}

public Builder truncate(String sourceName, int width) {
return truncate(sourceName, width, sourceName + "_trunc");
}

// add a partition field with an auto-incremented partition field id, starting from PARTITION_DATA_ID_START
Builder add(int sourceId, String name, String transform) {
return add(sourceId, nextFieldId(), name, transform);
}

Builder add(int sourceId, int fieldId, String name, String transform) {
Types.NestedField column = schema.findField(sourceId);
Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId);
checkAndAddPartitionName(name, column.fieldId());
fields.add(new PartitionField(sourceId, name, Transforms.fromString(column.type(), transform)));
fields.add(new PartitionField(sourceId, fieldId, name, Transforms.fromString(column.type(), transform)));
lastAssignedFieldId.getAndAccumulate(fieldId, Math::max);
return this;
}

public PartitionSpec build() {
PartitionSpec spec = new PartitionSpec(schema, specId, fields);
PartitionSpec spec = new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get());
checkCompatibility(spec, schema);
return spec;
}
@@ -473,4 +490,13 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) {
sourceType, field.transform());
}
}

static boolean hasSequentialIds(PartitionSpec spec) {
for (int i = 0; i < spec.fields.length; i += 1) {
if (spec.fields[i].fieldId() != PARTITION_DATA_ID_START + i) {
return false;
}
}
return true;
}
}
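To make the lastAssignedFieldId bookkeeping concrete, here is a short sketch mirroring the tests below. It assumes the test fixture's SCHEMA (a column with field id 1 and a string column "s", as in the tests); note that add(...) and lastAssignedFieldId() are package-private, so this compiles only inside org.apache.iceberg, as the tests do.

// auto-assigned ids continue from the highest id seen so far, whether that
// id was assigned automatically or supplied explicitly
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
    .add(1, "id_partition2", "bucket[5]")        // auto-assigned: 1000
    .add(1, 1005, "id_partition1", "bucket[4]")  // explicit: 1005
    .truncate("s", 1, "custom_truncate")         // auto-assigned: 1005 + 1 = 1006
    .build();

// spec.lastAssignedFieldId() == 1006; hasSequentialIds(spec) is false,
// since the ids are 1000, 1005, 1006 rather than 1000, 1001, 1002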
@@ -123,7 +123,6 @@ public void testMultipleDatePartitionsWithDifferentSourceColumns() {
PartitionSpec.builderFor(SCHEMA).hour("d").hour("another_d").build();
}


@Test
public void testSettingPartitionTransformsWithCustomTargetNames() {
Assert.assertEquals(PartitionSpec.builderFor(SCHEMA).year("ts", "custom_year")
@@ -205,4 +204,48 @@ public void testMissingSourceColumn() {
IllegalArgumentException.class, "Cannot find source column",
() -> PartitionSpec.builderFor(SCHEMA).identity("missing").build());
}

@Test
public void testAutoSettingPartitionFieldIds() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
.year("ts", "custom_year")
.bucket("ts", 4, "custom_bucket")
.add(1, "id_partition2", "bucket[4]")
.truncate("s", 1, "custom_truncate")
.build();

Assert.assertEquals(1000, spec.fields().get(0).fieldId());
Assert.assertEquals(1001, spec.fields().get(1).fieldId());
Assert.assertEquals(1002, spec.fields().get(2).fieldId());
Assert.assertEquals(1003, spec.fields().get(3).fieldId());
Assert.assertEquals(1003, spec.lastAssignedFieldId());
}

@Test
public void testAddPartitionFieldsWithFieldIds() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
.add(1, 1005, "id_partition1", "bucket[4]")
.add(1, 1006, "id_partition2", "bucket[5]")
.add(1, 1002, "id_partition3", "bucket[6]")
.build();

Assert.assertEquals(1005, spec.fields().get(0).fieldId());
Assert.assertEquals(1006, spec.fields().get(1).fieldId());
Assert.assertEquals(1002, spec.fields().get(2).fieldId());
Assert.assertEquals(1006, spec.lastAssignedFieldId());
}

@Test
public void testAddPartitionFieldsWithAndWithoutFieldIds() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
.add(1, "id_partition2", "bucket[5]")
.add(1, 1005, "id_partition1", "bucket[4]")
.truncate("s", 1, "custom_truncate")
.build();

Assert.assertEquals(1000, spec.fields().get(0).fieldId());
Assert.assertEquals(1005, spec.fields().get(1).fieldId());
Assert.assertEquals(1006, spec.fields().get(2).fieldId());
Assert.assertEquals(1006, spec.lastAssignedFieldId());
}
}
@@ -73,6 +73,7 @@ public void testTransforms() throws Exception {
PartitionSpec.builderFor(schema).truncate("dec", 10).build(),
PartitionSpec.builderFor(schema).truncate("s", 10).build(),
PartitionSpec.builderFor(schema).add(6, "dec_unsupported", "unsupported").build(),
PartitionSpec.builderFor(schema).add(6, 1111, "dec_unsupported", "unsupported").build(),
};

for (PartitionSpec spec : specs) {
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
@@ -39,6 +39,7 @@ private PartitionSpecParser() {
private static final String SPEC_ID = "spec-id";
private static final String FIELDS = "fields";
private static final String SOURCE_ID = "source-id";
private static final String FIELD_ID = "field-id";
private static final String TRANSFORM = "transform";
private static final String NAME = "name";

@@ -101,6 +102,7 @@ static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
generator.writeStringField(NAME, field.name());
generator.writeStringField(TRANSFORM, field.transform().toString());
generator.writeNumberField(SOURCE_ID, field.sourceId());
generator.writeNumberField(FIELD_ID, field.fieldId());
generator.writeEndObject();
}
generator.writeEndArray();
@@ -138,6 +140,7 @@ private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
"Cannot parse partition spec fields, not an array: %s", json);

Iterator<JsonNode> elements = json.elements();
int fieldIdCount = 0;
while (elements.hasNext()) {
JsonNode element = elements.next();
Preconditions.checkArgument(element.isObject(),
@@ -147,7 +150,17 @@
String transform = JsonUtil.getString(TRANSFORM, element);
int sourceId = JsonUtil.getInt(SOURCE_ID, element);

builder.add(sourceId, name, transform);
// partition field ids are missing in old spec JSON; in that case they auto-increment from PARTITION_DATA_ID_START
if (element.has(FIELD_ID)) {
builder.add(sourceId, JsonUtil.getInt(FIELD_ID, element), name, transform);
fieldIdCount++;
} else {
builder.add(sourceId, name, transform);
}
}

Preconditions.checkArgument(fieldIdCount == 0 || fieldIdCount == json.size(),
"Cannot parse spec with missing field IDs: %s missing of %s fields.",
json.size() - fieldIdCount, json.size());
}
}
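For reference, a hand-written sketch of the JSON this parser reads and writes, using the key names defined above; the field names, transform strings, and ids are illustrative, not captured output. Older metadata omits "field-id" on every field, in which case the builder auto-assigns ids from PARTITION_DATA_ID_START; the precondition at the end of buildFromJsonFields rejects specs where only some fields carry ids.

{
  "spec-id": 0,
  "fields": [
    {"name": "ts_day", "transform": "day", "source-id": 2, "field-id": 1000},
    {"name": "id_bucket", "transform": "bucket[4]", "source-id": 1, "field-id": 1001}
  ]
}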