Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,28 +514,37 @@ public int hashCode() {
public static class StructType extends NestedType {
private static final Joiner FIELD_SEP = Joiner.on(", ");

public static StructType of(boolean isUnion, NestedField... fields) {
return of(Arrays.asList(fields), isUnion);
}

public static StructType of(NestedField... fields) {
return of(Arrays.asList(fields));
}

public static StructType of(List<NestedField> fields) {
return new StructType(fields);
return new StructType(fields, false);
}

private final NestedField[] fields;
public static StructType of(List<NestedField> fields, boolean isUnionSchema) {
return new StructType(fields, isUnionSchema);
}

private final NestedField[] fields;
private final boolean isUnionSchema;
// lazy values
private transient List<NestedField> fieldList = null;
private transient Map<String, NestedField> fieldsByName = null;
private transient Map<String, NestedField> fieldsByLowerCaseName = null;
private transient Map<Integer, NestedField> fieldsById = null;

private StructType(List<NestedField> fields) {
private StructType(List<NestedField> fields, boolean isUnionSchema) {
Preconditions.checkNotNull(fields, "Field list cannot be null");
this.fields = new NestedField[fields.size()];
for (int i = 0; i < this.fields.length; i += 1) {
this.fields[i] = fields.get(i);
}
this.isUnionSchema = isUnionSchema;
}

@Override
Expand Down Expand Up @@ -641,6 +650,13 @@ private Map<Integer, NestedField> lazyFieldsById() {
}
return fieldsById;
}

/**
* @return true if struct represents union schema
*/
public boolean isUnionSchema() {

@rdsr rdsr May 21, 2020

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this in the main API? The API should not expose unions or info about union IMO.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will check if we can make it package private, Avro visitor needs it so that it can add property in schema to reflect union type.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Iceberg's type system should have no knowledge of union types because unions aren't supported.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is naming issue. this flag indicate that , field is storing union type by converting it to record/struct.

return isUnionSchema;
}
}

public static class ListType extends NestedType {
Expand Down
30 changes: 18 additions & 12 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,23 @@ private AvroSchemaUtil() {}
public static final String VALUE_ID_PROP = "value-id";
public static final String ELEMENT_ID_PROP = "element-id";
public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc";
public static final String UNION_SCHEMA_TO_RECORD = "union-schema-to-record";

private static final Schema NULL = Schema.create(Schema.Type.NULL);
private static final Schema.Type MAP = Schema.Type.MAP;
private static final Schema.Type ARRAY = Schema.Type.ARRAY;
private static final Schema.Type UNION = Schema.Type.UNION;
private static final Schema.Type RECORD = Schema.Type.RECORD;

public static Schema convert(org.apache.iceberg.Schema schema,
String tableName) {
public static Schema convert(
org.apache.iceberg.Schema schema,
String tableName) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: please revert non-functional changes like this one. That makes it easier to review and less likely to conflict with other commits.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack I did not do it intentionally , formatting style from .baseline applied these changes.

return convert(schema, ImmutableMap.of(schema.asStruct(), tableName));
}

public static Schema convert(org.apache.iceberg.Schema schema,
Map<Types.StructType, String> names) {
public static Schema convert(
org.apache.iceberg.Schema schema,
Map<Types.StructType, String> names) {
return TypeUtil.visit(schema, new TypeToSchema(names));
}

Expand Down Expand Up @@ -99,8 +102,9 @@ public static Schema pruneColumns(Schema schema, Set<Integer> selectedIds, NameM
return new PruneColumns(selectedIds, nameMapping).rootSchema(schema);
}

public static Schema buildAvroProjection(Schema schema, org.apache.iceberg.Schema expected,
Map<String, String> renames) {
public static Schema buildAvroProjection(
Schema schema, org.apache.iceberg.Schema expected,
Map<String, String> renames) {
return AvroCustomOrderSchemaVisitor.visit(schema, new BuildAvroProjection(expected, renames));
}

Expand All @@ -120,7 +124,7 @@ public static boolean isTimestamptz(Schema schema) {
}

public static boolean isOptionSchema(Schema schema) {
if (schema.getType() == UNION && schema.getTypes().size() == 2) {
if (schema.getType() == UNION && schema.getTypes().size() >= 2) {
Comment thread
sudssf marked this conversation as resolved.
if (schema.getTypes().get(0).getType() == Schema.Type.NULL) {
return true;
} else if (schema.getTypes().get(1).getType() == Schema.Type.NULL) {
Expand Down Expand Up @@ -166,8 +170,9 @@ public static boolean isKeyValueSchema(Schema schema) {
return schema.getType() == RECORD && schema.getFields().size() == 2;
}

static Schema createMap(int keyId, Schema keySchema,
int valueId, Schema valueSchema) {
static Schema createMap(
int keyId, Schema keySchema,
int valueId, Schema valueSchema) {
String keyValueName = "k" + keyId + "_v" + valueId;

Schema.Field keyField = new Schema.Field("key", keySchema, null, (Object) null);
Expand All @@ -181,9 +186,10 @@ static Schema createMap(int keyId, Schema keySchema,
keyValueName, null, null, false, ImmutableList.of(keyField, valueField))));
}

static Schema createProjectionMap(String recordName,
int keyId, String keyName, Schema keySchema,
int valueId, String valueName, Schema valueSchema) {
static Schema createProjectionMap(
String recordName,
int keyId, String keyName, Schema keySchema,
int valueId, String valueName, Schema valueSchema) {
String keyValueName = "k" + keyId + "_v" + valueId;

Schema.Field keyField = new Schema.Field("key", keySchema, null, (Object) null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;

Expand Down Expand Up @@ -52,7 +55,12 @@ private WriteBuilder() {

@Override
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
return ValueWriters.record(fields);
Object isUnionSchema = record.getObjectProp(AvroSchemaUtil.UNION_SCHEMA_TO_RECORD);
if (isUnionSchema != null && (boolean) isUnionSchema) {
return new UnionSchemaWriter<>(record, fields);
} else {
return ValueWriters.record(fields);
}
}

@Override
Expand Down Expand Up @@ -133,4 +141,38 @@ public ValueWriter<?> primitive(Schema primitive) {
}
}
}

public static class UnionSchemaWriter<V extends Object> implements ValueWriter<V> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to support writing out union types? Iceberg types does not support union and almost all compute engines do not support unions as well. I'm not sure if there's a usecase here to support it.

On the read side it makes sense to us to support reading unions, because that data could have been written by non Iceberg writers.

What are your thoughts?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct,
since iceberg does not support union type but avro does, this approach is similar to spark-avro where union types are converted to record.
I dont think it make sense to support union types as it will mean adding support in schema for that which is larger change and has more blast radius.
spark does not support union types so it make sense to align with spark and keep scope limited IMO

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Files written by Iceberg must always conform to the Iceberg spec. Iceberg tables can allow reading data that is imported, but should never write data that cannot be read by a generic implementation of the Iceberg spec.

private final ValueWriter<Object>[] writers;
private final Schema schema;

@SuppressWarnings("unchecked")
protected UnionSchemaWriter(Schema schema, List<ValueWriter<?>> writers) {
this.schema = Schema.createUnion(schema.getFields()
.stream()
.flatMap(x -> x.schema().getTypes().stream())
.filter(x -> x.getType() != Schema.Type.NULL) // only process non-null types
.collect(Collectors.toList()));
this.writers = (ValueWriter<Object>[]) Array.newInstance(ValueWriter.class, writers.size());
for (int i = 0; i < this.writers.length; i += 1) {
this.writers[i] = (ValueWriter<Object>) writers.get(i);
}
}

public ValueWriter<?> writer(int pos) {
return writers[pos];
}

@Override
public void write(V row, Encoder encoder) throws IOException {
int index = GenericData.get().resolveUnion(schema, row);
for (int i = 0; i < this.writers.length; i += 1) {
if (i == index) {
writers[i].write(row, encoder);
} else {
writers[i].write(null, encoder);
}
}
}
}
}
33 changes: 22 additions & 11 deletions core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,27 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
public Type union(Schema union, List<Type> options) {
Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union),
"Unsupported type: non-option union: %s", union);
// records, arrays, and maps will check nullability later
if (options.get(0) == null) {
return options.get(1);
} else {
if (options.size() == 1) {
return options.get(0);
} else if (options.size() == 2) {
if (options.get(0) == null) {
return options.get(1);
} else {
return options.get(0);
}
} else {
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior of the spark Avro SchemaConverter
List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(options.size());
for (int i = 0; i < options.size(); i += 1) {
Type fieldType = options.get(i);
if (fieldType == null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would fieldType be null?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for avro type null , fieldType is null. this basically ignore null from union

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I see. The reason is that all branches of the union are optional and there is no way to encode whether one branch (let alone only one branch) will be non-null?

continue;
}
// All fields are optional because only one of them is set at a time
fields.add(Types.NestedField.optional(allocateId(), "member" + i, fieldType));
}
return Types.StructType.of(fields, true);
}
}

Expand All @@ -133,7 +149,6 @@ public Type array(Schema array, Type elementType) {
return Types.MapType.ofRequired(
keyField.fieldId(), valueField.fieldId(), keyField.type(), valueField.type());
}

} else {
// normal array
Schema elementSchema = array.getElementType();
Expand Down Expand Up @@ -169,18 +184,15 @@ public Type primitive(Schema primitive) {
return Types.DecimalType.of(
((LogicalTypes.Decimal) logical).getPrecision(),
((LogicalTypes.Decimal) logical).getScale());

} else if (logical instanceof LogicalTypes.Date) {
return Types.DateType.get();

} else if (
logical instanceof LogicalTypes.TimeMillis ||
logical instanceof LogicalTypes.TimeMicros) {
logical instanceof LogicalTypes.TimeMicros) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: this file also has lots of formatting changes that should be reverted before committing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, I did not intentionally do these changes , formatting style from repo did these changes. I suggest same in our internal repo :) , I will revert these changes.

return Types.TimeType.get();

} else if (
logical instanceof LogicalTypes.TimestampMillis ||
logical instanceof LogicalTypes.TimestampMicros) {
logical instanceof LogicalTypes.TimestampMicros) {
Object adjustToUTC = primitive.getObjectProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP);
Preconditions.checkArgument(adjustToUTC instanceof Boolean,
"Invalid value for adjust-to-utc: %s", adjustToUTC);
Expand All @@ -189,7 +201,6 @@ public Type primitive(Schema primitive) {
} else {
return Types.TimestampType.withoutZone();
}

} else if (LogicalTypes.uuid().getName().equals(name)) {
return Types.UUIDType.get();
}
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
Types.NestedField structField = structFields.get(i);
String origFieldName = structField.name();
boolean isValidFieldName = AvroSchemaUtil.validAvroName(origFieldName);
String fieldName = isValidFieldName ? origFieldName : AvroSchemaUtil.sanitize(origFieldName);
String fieldName = isValidFieldName ? origFieldName : AvroSchemaUtil.sanitize(origFieldName);
Schema.Field field = new Schema.Field(
fieldName, fieldSchemas.get(i), null,
structField.isOptional() ? JsonProperties.NULL_VALUE : null);
Expand All @@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
}

recordSchema = Schema.createRecord(recordName, null, null, false, fields);

if (struct.isUnionSchema()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some reason why this needs to be a round-trip conversion? I think it should be fine to convert back to a record instead of a union schema. That's what we should be writing into a table with a schema converted from an Avro union schema anyway.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed when avro writer is java client which is writing directly by consuming avro records.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for spark , spark avro convert its into record so no changes needed there. I will add test for spark

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only way for GenericAvroWriter to know if record represents union schema type is to use property set here. otherwise ValueWriters will fail to write union schema record as they will expect new record schema instead of union record schema.

recordSchema.addProp(AvroSchemaUtil.UNION_SCHEMA_TO_RECORD, true);
}
results.put(struct, recordSchema);

return recordSchema;
Expand Down Expand Up @@ -160,7 +162,6 @@ public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema);
mapSchema.addProp(AvroSchemaUtil.KEY_ID_PROP, map.keyId());
mapSchema.addProp(AvroSchemaUtil.VALUE_ID_PROP, map.valueId());

} else {
mapSchema = AvroSchemaUtil.createMap(map.keyId(), keySchema,
map.valueId(), map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema);
Expand Down
Loading