Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,28 +514,37 @@ public int hashCode() {
public static class StructType extends NestedType {
private static final Joiner FIELD_SEP = Joiner.on(", ");

public static StructType of(boolean isUnion, NestedField... fields) {
return of(Arrays.asList(fields), isUnion);
}

public static StructType of(NestedField... fields) {
return of(Arrays.asList(fields));
}

public static StructType of(List<NestedField> fields) {
return new StructType(fields);
return new StructType(fields, false);
}

private final NestedField[] fields;
public static StructType of(List<NestedField> fields, boolean convertedFromUnionSchema) {
return new StructType(fields, convertedFromUnionSchema);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't think that Iceberg needs to model that this was converted. The conversion back to a union schema is not something that Iceberg should support. Iceberg must only write Iceberg data files and those are not allowed to contain Avro unions. The use case that we aim to support is reading data files with existing unions, if I understand correctly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no other way to pass this information for GenericAvroWriter as it needs to know that input generic record has union schema and valuewriters needs to be created accordingly. otherwise we get cast exception. this avoids creating new generic object with record schema ( which is really expensive IMO). what alternative is possible if don't want to add convertedFromUnionSchema?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no other way to pass this information for GenericAvroWriter

I understand that there is no other way to pass the information. What I'm saying is that GenericAvroWriter cannot write non-option unions anyway. Iceberg must not write anything that is not allowed by the spec. This feature can be used to read existing Avro data files, but cannot be used to write Avro data files with unions.

@rdsr rdsr May 26, 2020

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 We shouldn't support writing unions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but cannot be used to write Avro data files with unions. this feature is converting union schema to record similar to spark-avro and allows iceberg to ingest data even if there is union schema (similar to spark avro). there is open questions about schema evolution which needs to be addressed.
why we need this : avro support union schema and writing such schema to iceberg fails as it is not supported. with this feature we will support writing and transformation will be taken care by iceberg client instead of performing transformation externally.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading data written with a union schema is okay, but Iceberg cannot write data files with a union schema (other than options).

}

private final NestedField[] fields;
private final boolean convertedFromUnionSchema;
// lazy values
private transient List<NestedField> fieldList = null;
private transient Map<String, NestedField> fieldsByName = null;
private transient Map<String, NestedField> fieldsByLowerCaseName = null;
private transient Map<Integer, NestedField> fieldsById = null;

private StructType(List<NestedField> fields) {
private StructType(List<NestedField> fields, boolean convertedFromUnionSchema) {
Preconditions.checkNotNull(fields, "Field list cannot be null");
this.fields = new NestedField[fields.size()];
for (int i = 0; i < this.fields.length; i += 1) {
this.fields[i] = fields.get(i);
}
this.convertedFromUnionSchema = convertedFromUnionSchema;
}

@Override
Expand Down Expand Up @@ -641,6 +650,13 @@ private Map<Integer, NestedField> lazyFieldsById() {
}
return fieldsById;
}

/**
* @return true if struct represents union schema converted to struct type
*/
public boolean isConvertedFromUnionSchema() {
return convertedFromUnionSchema;
}
}

public static class ListType extends NestedType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ private AvroSchemaUtil() {}
public static final String VALUE_ID_PROP = "value-id";
public static final String ELEMENT_ID_PROP = "element-id";
public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc";
public static final String UNION_SCHEMA_TO_RECORD = "union-schema-to-record";

private static final Schema NULL = Schema.create(Schema.Type.NULL);
private static final Schema.Type MAP = Schema.Type.MAP;
Expand Down Expand Up @@ -120,10 +121,10 @@ public static boolean isTimestamptz(Schema schema) {
}

public static boolean isOptionSchema(Schema schema) {
if (schema.getType() == UNION && schema.getTypes().size() == 2) {
if (schema.getType() == UNION && schema.getTypes().size() >= 2) {
Comment thread
sudssf marked this conversation as resolved.
if (schema.getTypes().get(0).getType() == Schema.Type.NULL) {
return true;
} else if (schema.getTypes().get(1).getType() == Schema.Type.NULL) {
} else if (schema.getTypes().size() == 2 && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;

Expand Down Expand Up @@ -52,7 +55,12 @@ private WriteBuilder() {

@Override
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
return ValueWriters.record(fields);
Object isUnionSchema = record.getObjectProp(AvroSchemaUtil.UNION_SCHEMA_TO_RECORD);
if (isUnionSchema != null && (boolean) isUnionSchema) {
return new UnionSchemaWriter<>(record, fields);
} else {
return ValueWriters.record(fields);
}
}

@Override
Expand Down Expand Up @@ -133,4 +141,38 @@ public ValueWriter<?> primitive(Schema primitive) {
}
}
}

public static class UnionSchemaWriter<V extends Object> implements ValueWriter<V> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to support writing out union types? Iceberg types does not support union and almost all compute engines do not support unions as well. I'm not sure if there's a usecase here to support it.

On the read side it makes sense to us to support reading unions, because that data could have been written by non Iceberg writers.

What are your thoughts?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct,
since iceberg does not support union type but avro does, this approach is similar to spark-avro where union types are converted to record.
I dont think it make sense to support union types as it will mean adding support in schema for that which is larger change and has more blast radius.
spark does not support union types so it make sense to align with spark and keep scope limited IMO

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Files written by Iceberg must always conform to the Iceberg spec. Iceberg tables can allow reading data that is imported, but should never write data that cannot be read by a generic implementation of the Iceberg spec.

private final ValueWriter<Object>[] writers;
private final Schema schema;

@SuppressWarnings("unchecked")
protected UnionSchemaWriter(Schema schema, List<ValueWriter<?>> writers) {
this.schema = Schema.createUnion(schema.getFields()
.stream()
.flatMap(x -> x.schema().getTypes().stream())
.filter(x -> x.getType() != Schema.Type.NULL) // only process non-null types
.collect(Collectors.toList()));
this.writers = (ValueWriter<Object>[]) Array.newInstance(ValueWriter.class, writers.size());
for (int i = 0; i < this.writers.length; i += 1) {
this.writers[i] = (ValueWriter<Object>) writers.get(i);
}
}

public ValueWriter<?> writer(int pos) {
return writers[pos];
}

@Override
public void write(V row, Encoder encoder) throws IOException {
int index = GenericData.get().resolveUnion(schema, row);
for (int i = 0; i < this.writers.length; i += 1) {
if (i == index) {
writers[i].write(row, encoder);
} else {
writers[i].write(null, encoder);
}
}
}
}
}
24 changes: 20 additions & 4 deletions core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,27 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
public Type union(Schema union, List<Type> options) {
Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union),
"Unsupported type: non-option union: %s", union);
// records, arrays, and maps will check nullability later
if (options.get(0) == null) {
return options.get(1);
} else {
if (options.size() == 1) {
return options.get(0);
} else if (options.size() == 2) {
if (options.get(0) == null) {
return options.get(1);
} else {
return options.get(0);
}
} else {
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior of the spark Avro SchemaConverter
List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(options.size());
for (int i = 0; i < options.size(); i += 1) {
Type fieldType = options.get(i);
if (fieldType == null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would fieldType be null?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for avro type null , fieldType is null. this basically ignore null from union

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I see. The reason is that all branches of the union are optional and there is no way to encode whether one branch (let alone only one branch) will be non-null?

continue;
}
// All fields are optional because only one of them is set at a time
fields.add(Types.NestedField.optional(allocateId(), "member" + i, fieldType));
}
return Types.StructType.of(fields, true);
}
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
}

recordSchema = Schema.createRecord(recordName, null, null, false, fields);

if (struct.isConvertedFromUnionSchema()) {
recordSchema.addProp(AvroSchemaUtil.UNION_SCHEMA_TO_RECORD, true);
}
results.put(struct, recordSchema);

return recordSchema;
Expand Down Expand Up @@ -160,7 +162,6 @@ public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema);
mapSchema.addProp(AvroSchemaUtil.KEY_ID_PROP, map.keyId());
mapSchema.addProp(AvroSchemaUtil.VALUE_ID_PROP, map.valueId());

} else {
mapSchema = AvroSchemaUtil.createMap(map.keyId(), keySchema,
map.valueId(), map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class AvroDataUnionRecordTest {

@Rule
public TemporaryFolder temp = new TemporaryFolder();

protected void writeAndValidate(
List<GenericData.Record> actualWrite,
List<GenericData.Record> expectedRead,
Schema icebergSchema) throws IOException {
File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(testFile))
.schema(icebergSchema)
.named("test")
.build()) {
for (GenericData.Record rec : actualWrite) {
writer.add(rec);
}
}

List<GenericData.Record> rows;
try (AvroIterable<GenericData.Record> reader = Avro.read(Files.localInput(testFile))
.project(icebergSchema)
.build()) {
rows = Lists.newArrayList(reader);
}

for (int i = 0; i < expectedRead.size(); i += 1) {
AvroTestHelpers.assertEquals(icebergSchema.asStruct(), expectedRead.get(i), rows.get(i));
}
}

@Test
public void testMapOfUnionValues() throws IOException {
String schema1 = "{\n" +
" \"name\": \"MapOfUnion\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"map\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": [\n" +
" \"null\",\n" +
" \"boolean\",\n" +
" \"int\",\n" +
" \"long\",\n" +
" \"float\",\n" +
" \"double\",\n" +
" \"bytes\",\n" +
" \"string\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema1);
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
org.apache.avro.Schema avroSchemaUnionRecord = AvroSchemaUtil.convert(icebergSchema, "test");
org.apache.avro.Schema unionRecordSchema =
avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1);

List<GenericData.Record> expectedRead = new ArrayList<>();
List<GenericData.Record> actualWrite = new ArrayList<>();

for (long i = 0; i < 10; i++) {
Map<String, Object> map = new HashMap<>();
Map<String, Object> mapRead = new HashMap<>();
updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i);
GenericData.Record recordRead = new GenericRecordBuilder(avroSchema)
.set("map", mapRead)
.build();
GenericData.Record record = new GenericRecordBuilder(avroSchema)
.set("map", map)
.build();
actualWrite.add(record);
expectedRead.add(recordRead);
}
writeAndValidate(actualWrite, expectedRead, icebergSchema);
}

private void updateMapsForUnionSchema(
org.apache.avro.Schema unionRecordSchema,
Map<String, Object> map,
Map<String, Object> mapRead,
Long index) {
map.put("boolean", index % 2 == 0);
map.put("int", index.intValue());
map.put("long", index);
map.put("float", index.floatValue());
map.put("double", index.doubleValue());
map.put("bytes", ByteBuffer.wrap(("bytes_" + index).getBytes()));
map.put("string", "string_" + index);

map.entrySet().stream().forEach(e -> {
String key = e.getKey();
GenericData.Record record = getGenericRecordForUnionType(unionRecordSchema, map, key);
mapRead.put(key, record);
});
}

private GenericData.Record getGenericRecordForUnionType(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know what types of union evolution are allowed by Avro but not supported here?

If we convert a schema with a union and create a table from it, then for the union's struct we will generate a name mapping to read existing data files, like {"id" 6, "member1"}, {"id" 7, "member2"}, . . .. If the union drops member2, this assignment becomes invalid because member3 will now convert to use the name member2.

If we are going to support reading unions, it would be good to note that you can't remove members, they must have a stable order, and members can only be added to the end. Anything else that I'm missing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since schema get stored with avro file I don't think ordering matters. reader will always reassignIds from writer schema. member names are hardcoded for test for union type it should work. I will check if I can add test for schema evolution.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order does matter. If the second option in the union is always named "member2" then what happens if the actual type of member2 is different between two file schemas?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file schema should match original schema version, all rules for struct datatype evolution still apply. GenericAvroReader will always resolve schema against reader schema
fileSchema <- schema used by avro writer when avro file was created
readSchema <- schema of iceberg table or projection which may be reflect new fields after schema evolution.

return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, null);

my understanding is reassignIds should take care of schema evolution. may be I am wrong?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data files with unions must be written by non-Iceberg producers because Iceberg will not write invalid data files. A union schema cannot contain Iceberg field IDs because there is no defined way to encode them -- union has no extra properties and while types could be replaced with a JSON object with properties, there isn't anything that pulls IDs out.

For external data files, what would happen is the user would convert a schema and create a table from it, at which point the IDs get assigned. Next, the user would create a name mapping with that schema's names to Iceberg field IDs. That mapping is how we recover IDs from field names. That works because Avro is name-based so we can map its schema evolution into Iceberg.

But name-based doesn't work with unions that are converted to structs because we are automatically assigning the names using a counter. If the definition of member2 changes, we will still map it to the same Iceberg ID by name.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data files with unions must be written by non-Iceberg producers because Iceberg , usecase here is we are writing to iceberg table using iceberg client ( avro writer) and input stream has union schema. current alternative is to transform incoming generic record on-fly to record schema and write. with this approach no such transformation is needed.
since this code path is totally new , none of existing functionality will be changed.

I think I understand issue with naming based on counter. simple solution is support evolution by adding field to end of the supported types. let me add test case to validate behaviour

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean the input stream has a union schema? The records that you're trying to write?

Incoming records are required to match the write schema. Because the write schema cannot contain a union, the records also cannot contain a union. The union must be represented in memory as a struct of the union types.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The union must be represented in memory as a struct of the union types.
since generic records generated by producer are generated with union schema we are transforming records to struct of union schema in stream processing client which writes to iceberg table.
transformation is expensive at high volume as it needs to generate new generic record with iceberg table schema and copy all fields from incoming generic record except union schema field.
with this PR ( proof of concept) , GenericAvroWriter is adding support so that no transformation is needed.
"The union must be represented in memory as a struct of the union types." is not possible unless avro deserialization supports this. ( we are using kafka for stream and kafka consumer gets generic record which gets ingested to iceberg table)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to deserialize directly to that representation, you can use the same readers to read an single encoded message. Here's the Iceberg generics decoder, which is similar but uses Iceberg generics: https://github.com/apache/incubator-iceberg/blob/master/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think it will work without some of above changes as call to AvroSchemaUtil.convert(writeSchema, "table") will fail on union schema but I will take a look thanks for the link. I will try to document problem statement and motive behind proposing this change.

org.apache.avro.Schema unionRecordSchema,
Map<String, Object> map,
String key) {
GenericRecordBuilder rec = new GenericRecordBuilder(unionRecordSchema);
switch (key) {
case "boolean":
return rec
.set("member1", map.get(key))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use "option_1" instead of "member1"? Avro refers to these as union options.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

.build();
case "int":
return rec
.set("member2", map.get(key))
.build();
case "long":
return rec
.set("member3", map.get(key))
.build();
case "float":
return rec
.set("member4", map.get(key))
.build();
case "double":
return rec
.set("member5", map.get(key))
.build();
case "bytes":
return rec
.set("member6", map.get(key))
.build();
case "string":
return rec
.set("member7", map.get(key))
.build();
default:
throw new IllegalStateException("key mapping not found for " + key);
}
}
}
Loading