Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions api/src/main/java/com/netflix/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ public class PartitionSpec implements Serializable {
private final Schema schema;

// this is ordered so that DataFile has a consistent schema
private final int specId;
private final PartitionField[] fields;
private transient Map<Integer, PartitionField> fieldsBySourceId = null;
private transient Map<String, PartitionField> fieldsByName = null;
private transient Class<?>[] javaClasses = null;
private transient List<PartitionField> fieldList = null;

private PartitionSpec(Schema schema, List<PartitionField> fields) {
private PartitionSpec(Schema schema, int specId, List<PartitionField> fields) {
this.schema = schema;
this.specId = specId;
this.fields = new PartitionField[fields.size()];
for (int i = 0; i < this.fields.length; i += 1) {
this.fields[i] = fields.get(i);
Expand All @@ -71,6 +73,13 @@ public Schema schema() {
return schema;
}

/**
 * Returns the ID that identifies this spec among all partition specs stored in the
 * table's metadata.
 *
 * @return the ID of this spec
 */
public int specId() {
return specId;
}

/**
* @return the list of {@link PartitionField partition fields} for this spec.
*/
Expand Down Expand Up @@ -146,6 +155,13 @@ public String partitionToPath(StructLike data) {
return sb.toString();
}

/**
* Returns true if this spec is equivalent to the other, with field names ignored. That is, if
* both specs have the same number of fields, field order, source columns, and transforms.
*
* @param other another PartitionSpec
* @return true if the specs have the same fields, source columns, and transforms.
*/
public boolean compatibleWith(PartitionSpec other) {
if (equals(other)) {
return true;
Expand Down Expand Up @@ -177,6 +193,9 @@ public boolean equals(Object other) {
}

PartitionSpec that = (PartitionSpec) other;
if (this.specId != that.specId) {
return false;
}
return Arrays.equals(fields, that.fields);
}

Expand Down Expand Up @@ -250,7 +269,7 @@ public String toString() {
}

private static final PartitionSpec UNPARTITIONED_SPEC =
new PartitionSpec(new Schema(), ImmutableList.of());
new PartitionSpec(new Schema(), 0, ImmutableList.of());

/**
* Returns a spec for unpartitioned tables.
Expand Down Expand Up @@ -280,6 +299,7 @@ public static class Builder {
private final Schema schema;
private final List<PartitionField> fields = Lists.newArrayList();
private final Set<String> partitionNames = Sets.newHashSet();
private int specId = 0;

private Builder(Schema schema) {
this.schema = schema;
Expand All @@ -293,6 +313,11 @@ private void checkAndAddPartitionName(String name) {
partitionNames.add(name);
}

/**
 * Sets the ID that the built {@link PartitionSpec} will report from its
 * {@code specId()} method. Defaults to 0 when not called.
 *
 * @param specId an integer ID for the spec being built
 * @return this builder for method chaining
 */
public Builder withSpecId(int specId) {
this.specId = specId;
return this;
}

private Types.NestedField findSourceColumn(String sourceName) {
Types.NestedField sourceColumn = schema.findField(sourceName);
Preconditions.checkNotNull(sourceColumn, "Cannot find source column: %s", sourceName);
Expand Down Expand Up @@ -371,7 +396,7 @@ public Builder add(int sourceId, String name, String transform) {
}

public PartitionSpec build() {
PartitionSpec spec = new PartitionSpec(schema, fields);
PartitionSpec spec = new PartitionSpec(schema, specId, fields);
checkCompatibility(spec, schema);
return spec;
}
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/com/netflix/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ private ManifestReader(InputFile file) {
throw new RuntimeIOException(e);
}
this.schema = SchemaParser.fromJson(metadata.get("schema"));
this.spec = PartitionSpecParser.fromJson(schema, metadata.get("partition-spec"));
int specId = TableMetadata.INITIAL_SPEC_ID;
String specProperty = metadata.get("partition-spec-id");
if (specProperty != null) {
specId = Integer.parseInt(specProperty);
}
this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
this.entries = null;
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/com/netflix/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ private static <D> FileAppender<D> newAppender(FileFormat format, PartitionSpec
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
.meta("partition-spec", PartitionSpecParser.toJson(spec))
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
.build();
default:
throw new IllegalArgumentException("Unsupported format: " + format);
Expand Down
94 changes: 68 additions & 26 deletions core/src/main/java/com/netflix/iceberg/PartitionSpecParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,18 @@ public class PartitionSpecParser {
private PartitionSpecParser() {
}

private static final String SPEC_ID = "spec-id";
private static final String FIELDS = "fields";
private static final String SOURCE_ID = "source-id";
private static final String TRANSFORM = "transform";
private static final String NAME = "name";

/**
 * Writes a partition spec to the generator as a JSON object of the form
 * {@code {"spec-id": <id>, "fields": [...]}}, delegating the field array to
 * {@link #toJsonFields(PartitionSpec, JsonGenerator)}.
 * <p>
 * (The diff rendering had concatenated the removed bare-array body with the added
 * object body, which would emit an array followed by an object; only the object
 * form — the one consistent with {@code fromJson}, which requires
 * {@code json.isObject()} and reads {@code SPEC_ID} and {@code FIELDS} — is kept.)
 *
 * @param spec the partition spec to serialize
 * @param generator the Jackson generator to write to; not flushed or closed here
 * @throws IOException if the generator fails to write
 */
public static void toJson(PartitionSpec spec, JsonGenerator generator) throws IOException {
  generator.writeStartObject();
  generator.writeNumberField(SPEC_ID, spec.specId());
  generator.writeFieldName(FIELDS);
  toJsonFields(spec, generator);
  generator.writeEndObject();
}

public static String toJson(PartitionSpec spec) {
Expand All @@ -74,23 +72,10 @@ public static String toJson(PartitionSpec spec, boolean pretty) {
}

/**
 * Parses a partition spec from a JSON object of the form
 * {@code {"spec-id": <id>, "fields": [...]}}.
 * <p>
 * (The diff rendering had concatenated the removed array-parsing body with the added
 * object-parsing body — declaring {@code builder} twice and asserting both
 * {@code isArray} and {@code isObject} — which does not compile; only the new
 * object-parsing form is kept.)
 *
 * @param schema the table schema used to resolve field source columns
 * @param json a JSON object node with "spec-id" and "fields" entries
 * @return a PartitionSpec built from the JSON fields, carrying the parsed spec ID
 * @throws IllegalArgumentException if json is not an object or a field is malformed
 */
public static PartitionSpec fromJson(Schema schema, JsonNode json) {
  Preconditions.checkArgument(json.isObject(), "Cannot parse spec from non-object: %s", json);
  int specId = JsonUtil.getInt(SPEC_ID, json);
  PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
  buildFromJsonFields(builder, json.get(FIELDS));
  return builder.build();
}

Expand All @@ -113,4 +98,61 @@ public static PartitionSpec fromJson(Schema schema, String json) {
}
}
}

/**
 * Writes only the field list of a spec as a JSON array of
 * {@code {"name": ..., "transform": ..., "source-id": ...}} objects, without the
 * enclosing spec-id wrapper. Package-private so writers (e.g. manifest metadata,
 * which stores the spec ID in a separate key) can embed just the fields.
 *
 * @param spec the partition spec whose fields are serialized
 * @param generator the Jackson generator to write to; not flushed or closed here
 * @throws IOException if the generator fails to write
 */
static void toJsonFields(PartitionSpec spec, JsonGenerator generator) throws IOException {
generator.writeStartArray();
for (PartitionField field : spec.fields()) {
generator.writeStartObject();
generator.writeStringField(NAME, field.name());
// transform is serialized by its string form; parsing relies on Transforms.fromString
generator.writeStringField(TRANSFORM, field.transform().toString());
generator.writeNumberField(SOURCE_ID, field.sourceId());
generator.writeEndObject();
}
generator.writeEndArray();
}

/**
 * Serializes only the field list of a spec to a JSON array string; convenience
 * wrapper around {@link #toJsonFields(PartitionSpec, JsonGenerator)}.
 *
 * @param spec the partition spec whose fields are serialized
 * @return a JSON array string of the spec's fields
 * @throws RuntimeIOException if JSON generation fails
 */
static String toJsonFields(PartitionSpec spec) {
try {
StringWriter writer = new StringWriter();
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
toJsonFields(spec, generator);
// flush (not close) so the buffered output reaches the StringWriter
generator.flush();
return writer.toString();

} catch (IOException e) {
throw new RuntimeIOException(e);
}
}

/**
 * Builds a spec from a JSON array of fields plus an externally-supplied spec ID
 * (used when the ID is stored separately from the fields, e.g. in manifest metadata).
 *
 * @param schema the table schema used to resolve field source columns
 * @param specId the spec ID to assign to the resulting spec
 * @param json a JSON array node of partition field objects
 * @return a PartitionSpec with the given ID and parsed fields
 */
static PartitionSpec fromJsonFields(Schema schema, int specId, JsonNode json) {
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema).withSpecId(specId);
buildFromJsonFields(builder, json);
return builder.build();
}

/**
 * String-parsing variant of {@link #fromJsonFields(Schema, int, JsonNode)}.
 *
 * @param schema the table schema used to resolve field source columns
 * @param specId the spec ID to assign to the resulting spec
 * @param json a JSON array string of partition field objects
 * @return a PartitionSpec with the given ID and parsed fields
 * @throws RuntimeIOException if the string cannot be parsed as JSON
 */
static PartitionSpec fromJsonFields(Schema schema, int specId, String json) {
try {
return fromJsonFields(schema, specId, JsonUtil.mapper().readValue(json, JsonNode.class));
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to parse partition spec fields: " + json);
}
}

/**
 * Adds each field from a JSON array of {@code {"name", "transform", "source-id"}}
 * objects to the given builder. Shared by {@code fromJson} and
 * {@code fromJsonFields} so both spec formats parse fields identically.
 *
 * @param builder the spec builder to add parsed fields to
 * @param json a JSON array node; each element must be an object
 * @throws IllegalArgumentException if json is not an array or an element is not an
 *         object or is missing a required key
 */
private static void buildFromJsonFields(PartitionSpec.Builder builder, JsonNode json) {
Preconditions.checkArgument(json.isArray(),
"Cannot parse partition spec fields, not an array: %s", json);

Iterator<JsonNode> elements = json.elements();
while (elements.hasNext()) {
JsonNode element = elements.next();
Preconditions.checkArgument(element.isObject(),
"Cannot parse partition field, not an object: %s", element);

// field order in the JSON array determines field order in the built spec
String name = JsonUtil.getString(NAME, element);
String transform = JsonUtil.getString(TRANSFORM, element);
int sourceId = JsonUtil.getInt(SOURCE_ID, element);

builder.add(sourceId, name, transform);
}
}
}
Loading