-
Notifications
You must be signed in to change notification settings - Fork 3k
Add persistent IDs to partition fields (WIP) #499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,11 +28,13 @@ | |
| */ | ||
| public class PartitionField implements Serializable { | ||
| private final int sourceId; | ||
| private final int id; | ||
| private final String name; | ||
| private final Transform<?, ?> transform; | ||
|
|
||
| PartitionField(int sourceId, String name, Transform<?, ?> transform) { | ||
| PartitionField(int sourceId, int id, String name, Transform<?, ?> transform) { | ||
| this.sourceId = sourceId; | ||
| this.id = id; | ||
| this.name = name; | ||
| this.transform = transform; | ||
| } | ||
|
|
@@ -44,6 +46,13 @@ public int sourceId() { | |
| return sourceId; | ||
| } | ||
|
|
||
| /** | ||
| * @return the id of this partition field, used as its field id in the {@link PartitionSpec spec's} partition type | ||
| */ | ||
| public int fieldId() { | ||
| return id; | ||
| } | ||
|
|
||
| /** | ||
| * @return the name of this partition field | ||
| */ | ||
|
|
@@ -71,7 +80,7 @@ public boolean equals(Object other) { | |
| if (other == null || getClass() != other.getClass()) { | ||
| return false; | ||
| } | ||
|
|
||
| // not considering field id, as field-id will be reused. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: adding this comment removed a spacing line. Could you add it back? |
||
| PartitionField that = (PartitionField) other; | ||
| return sourceId == that.sourceId && | ||
| name.equals(that.name) && | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,8 +48,7 @@ | |
| * represented by a named {@link PartitionField}. | ||
| */ | ||
| public class PartitionSpec implements Serializable { | ||
| // start assigning IDs for partition fields at 1000 | ||
| private static final int PARTITION_DATA_ID_START = 1000; | ||
| public static final int PARTITION_DATA_ID_START = 1000; | ||
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this need to be public or can it be package-private?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, why remove the comment? |
||
|
|
||
| private final Schema schema; | ||
|
|
||
|
|
@@ -109,9 +108,8 @@ public Types.StructType partitionType() { | |
| PartitionField field = fields[i]; | ||
| Type sourceType = schema.findType(field.sourceId()); | ||
| Type resultType = field.transform().getResultType(sourceType); | ||
| // assign ids for partition fields starting at PARTITION_DATA_ID_START to leave room for data file's other fields | ||
| structFields.add( | ||
| Types.NestedField.optional(PARTITION_DATA_ID_START + i, field.name(), resultType)); | ||
| Types.NestedField.optional(field.fieldId(), field.name(), resultType)); | ||
| } | ||
|
|
||
| return Types.StructType.of(structFields); | ||
|
|
@@ -326,11 +324,17 @@ public static class Builder { | |
| private final Set<String> partitionNames = Sets.newHashSet(); | ||
| private Map<Integer, PartitionField> timeFields = Maps.newHashMap(); | ||
| private int specId = 0; | ||
| private int currentPartitionFieldId = PARTITION_DATA_ID_START - 1; | ||
|
|
||
| private Builder(Schema schema) { | ||
| this.schema = schema; | ||
| } | ||
|
|
||
| private int incrementAndGetPartitionFieldId() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How about [suggested method name lost in extraction]? |
||
| currentPartitionFieldId = currentPartitionFieldId + 1; | ||
| return currentPartitionFieldId; | ||
| } | ||
|
|
||
| private void checkAndAddPartitionName(String name) { | ||
| checkAndAddPartitionName(name, null); | ||
| } | ||
|
|
@@ -376,7 +380,8 @@ Builder identity(String sourceName, String targetName) { | |
| Types.NestedField sourceColumn = findSourceColumn(sourceName); | ||
| checkAndAddPartitionName(targetName, sourceColumn.fieldId()); | ||
| fields.add(new PartitionField( | ||
| sourceColumn.fieldId(), targetName, Transforms.identity(sourceColumn.type()))); | ||
| sourceColumn.fieldId(), incrementAndGetPartitionFieldId(), targetName, | ||
| Transforms.identity(sourceColumn.type()))); | ||
| return this; | ||
| } | ||
|
|
||
|
|
@@ -388,7 +393,7 @@ public Builder year(String sourceName, String targetName) { | |
| checkAndAddPartitionName(targetName); | ||
| Types.NestedField sourceColumn = findSourceColumn(sourceName); | ||
| PartitionField field = new PartitionField( | ||
| sourceColumn.fieldId(), targetName, Transforms.year(sourceColumn.type())); | ||
| sourceColumn.fieldId(), incrementAndGetPartitionFieldId(), targetName, Transforms.year(sourceColumn.type())); | ||
| checkForRedundantPartitions(field); | ||
| fields.add(field); | ||
| return this; | ||
|
|
@@ -402,7 +407,7 @@ public Builder month(String sourceName, String targetName) { | |
| checkAndAddPartitionName(targetName); | ||
| Types.NestedField sourceColumn = findSourceColumn(sourceName); | ||
| PartitionField field = new PartitionField( | ||
| sourceColumn.fieldId(), targetName, Transforms.month(sourceColumn.type())); | ||
| sourceColumn.fieldId(), incrementAndGetPartitionFieldId(), targetName, Transforms.month(sourceColumn.type())); | ||
| checkForRedundantPartitions(field); | ||
| fields.add(field); | ||
| return this; | ||
|
|
@@ -416,7 +421,7 @@ public Builder day(String sourceName, String targetName) { | |
| checkAndAddPartitionName(targetName); | ||
| Types.NestedField sourceColumn = findSourceColumn(sourceName); | ||
| PartitionField field = new PartitionField( | ||
| sourceColumn.fieldId(), targetName, Transforms.day(sourceColumn.type())); | ||
| sourceColumn.fieldId(), incrementAndGetPartitionFieldId(), targetName, Transforms.day(sourceColumn.type())); | ||
| checkForRedundantPartitions(field); | ||
| fields.add(field); | ||
| return this; | ||
|
|
@@ -430,7 +435,7 @@ public Builder hour(String sourceName, String targetName) { | |
| checkAndAddPartitionName(targetName); | ||
| Types.NestedField sourceColumn = findSourceColumn(sourceName); | ||
| PartitionField field = new PartitionField( | ||
| sourceColumn.fieldId(), targetName, Transforms.hour(sourceColumn.type())); | ||
| sourceColumn.fieldId(), incrementAndGetPartitionFieldId(), targetName, Transforms.hour(sourceColumn.type())); | ||
| checkForRedundantPartitions(field); | ||
| fields.add(field); | ||
| return this; | ||
|
|
@@ -444,7 +449,8 @@ public Builder bucket(String sourceName, int numBuckets, String targetName) { | |
| checkAndAddPartitionName(targetName); | ||
| Types.NestedField sourceColumn = findSourceColumn(sourceName); | ||
| fields.add(new PartitionField( | ||
| sourceColumn.fieldId(), targetName, Transforms.bucket(sourceColumn.type(), numBuckets))); | ||
| sourceColumn.fieldId(), incrementAndGetPartitionFieldId(), targetName, | ||
| Transforms.bucket(sourceColumn.type(), numBuckets))); | ||
| return this; | ||
| } | ||
|
|
||
|
|
@@ -456,19 +462,20 @@ public Builder truncate(String sourceName, int width, String targetName) { | |
| checkAndAddPartitionName(targetName); | ||
| Types.NestedField sourceColumn = findSourceColumn(sourceName); | ||
| fields.add(new PartitionField( | ||
| sourceColumn.fieldId(), targetName, Transforms.truncate(sourceColumn.type(), width))); | ||
| sourceColumn.fieldId(), incrementAndGetPartitionFieldId(), targetName, | ||
| Transforms.truncate(sourceColumn.type(), width))); | ||
| return this; | ||
| } | ||
|
|
||
| public Builder truncate(String sourceName, int width) { | ||
| return truncate(sourceName, width, sourceName + "_trunc"); | ||
| } | ||
|
|
||
| Builder add(int sourceId, String name, String transform) { | ||
| Builder add(int sourceId, int partitionFieldId, String name, String transform) { | ||
| Types.NestedField column = schema.findField(sourceId); | ||
| checkAndAddPartitionName(name, column.fieldId()); | ||
| Preconditions.checkNotNull(column, "Cannot find source column: %d", sourceId); | ||
| fields.add(new PartitionField(sourceId, name, Transforms.fromString(column.type(), transform))); | ||
| fields.add(new PartitionField(sourceId, partitionFieldId, name, Transforms.fromString(column.type(), transform))); | ||
| return this; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,7 @@ private PartitionSpecParser() { | |
| private static final String SPEC_ID = "spec-id"; | ||
| private static final String FIELDS = "fields"; | ||
| private static final String SOURCE_ID = "source-id"; | ||
| private static final String FIELD_ID = "field-id"; | ||
| private static final String TRANSFORM = "transform"; | ||
| private static final String NAME = "name"; | ||
|
|
||
|
|
@@ -101,6 +102,7 @@ static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOE | |
| generator.writeStringField(NAME, field.name()); | ||
| generator.writeStringField(TRANSFORM, field.transform().toString()); | ||
| generator.writeNumberField(SOURCE_ID, field.sourceId()); | ||
| generator.writeNumberField(FIELD_ID, field.fieldId()); | ||
| generator.writeEndObject(); | ||
| } | ||
| generator.writeEndArray(); | ||
|
|
@@ -138,6 +140,8 @@ private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode | |
| "Cannot parse partition spec fields, not an array: %s", json); | ||
|
|
||
| Iterator<JsonNode> elements = json.elements(); | ||
|
|
||
| int partitionFieldId = PartitionSpec.PARTITION_DATA_ID_START - 1; | ||
| while (elements.hasNext()) { | ||
| JsonNode element = elements.next(); | ||
| Preconditions.checkArgument(element.isObject(), | ||
|
|
@@ -146,8 +150,13 @@ private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode | |
| String name = JsonUtil.getString(NAME, element); | ||
| String transform = JsonUtil.getString(TRANSFORM, element); | ||
| int sourceId = JsonUtil.getInt(SOURCE_ID, element); | ||
|
|
||
| builder.add(sourceId, name, transform); | ||
| // to handle the backward compatibility where partitionFieldId was not part of the partitionSpec schema. | ||
| if (element.has(FIELD_ID)) { | ||
| partitionFieldId = JsonUtil.getInt(FIELD_ID, element); | ||
| } else { | ||
| partitionFieldId = partitionFieldId + 1; | ||
| } | ||
| builder.add(sourceId, partitionFieldId, name, transform); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems odd that [inline code reference lost in extraction]. I'd prefer to keep the logic for those cases separate to make this easier to follow. It isn't a good practice to rely on a hidden assumption that either all fields have ids or none do. |
||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ID will be reused, but assignment is consistent because we assume that partition specs are not modified before the addition of partition field IDs. That means that tables start with only one spec that might not have IDs. Because we assign incrementally, IDs will always match when assigned using the default (1000, 1001, etc.).
Because we do have consistent IDs, I think this should check field ID here.