
Add support for union record type#1046

Closed
sudssf wants to merge 7 commits into apache:master from sudssf:ISSUE-189

Conversation

@sudssf
Contributor

@sudssf sudssf commented May 18, 2020

#189
Add a test using a union record.
Reference: use a conversion similar to spark-avro, which converts union types of size > 2 to a struct type.
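The spark-avro style conversion the description refers to can be sketched as follows. This is a hypothetical illustration, not the PR's actual code: the real change operates on Avro Schema objects, and the member field names and null handling here are assumptions based on the discussion below.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a union with more than two branches becomes a struct
// whose fields are the non-null branches, named by a positional counter.
// Branch type names stand in for full Avro schemas.
public class UnionToStruct {
  static Map<String, String> toStructFields(List<String> unionBranches) {
    Map<String, String> fields = new LinkedHashMap<>();
    int i = 1;
    for (String branch : unionBranches) {
      if (branch.equals("null")) {
        continue; // a null branch only marks the struct fields optional
      }
      fields.put("member" + i, branch); // positional names, as in this PR
      i++;
    }
    return fields;
  }

  public static void main(String[] args) {
    System.out.println(toStructFields(List.of("null", "string", "int", "double")));
    // prints {member1=string, member2=int, member3=double}
  }
}
```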

@sudssf sudssf force-pushed the ISSUE-189 branch 2 times, most recently from b838a77 to 711b6e7 on May 18, 2020 02:28
@sudssf sudssf changed the title Add support for union record type ISSUE-189: Add support for union record type May 18, 2020
@sudssf sudssf force-pushed the ISSUE-189 branch 2 times, most recently from ff18d2f to accdd87 on May 18, 2020 15:26
@sudssf
Contributor Author

sudssf commented May 18, 2020

FYI @rdsr @rdblue

@sudssf sudssf force-pushed the ISSUE-189 branch 2 times, most recently from 44201eb to 0f30882 on May 20, 2020 00:30
Sudarshan S added 2 commits May 19, 2020 21:57
apache#189
Add test using union record
reference: use conversion similar to spark-avro which will convert union type of size > 2 to struct type.
/**
* @return true if struct represents union schema
*/
public boolean isUnionSchema() {
Contributor

@rdsr rdsr May 21, 2020

Why do we need this in the main API? The API should not expose unions or info about unions, IMO.

Contributor Author

I will check if we can make it package-private. The Avro visitor needs it so that it can add a property to the schema to reflect the union type.

Contributor

I agree. Iceberg's type system should have no knowledge of union types because unions aren't supported.

Contributor Author

I think this is a naming issue. This flag indicates that the field is storing a union type by converting it to a record/struct.

@rdsr
Contributor

rdsr commented May 21, 2020

Thanks @sudssf, there's some similar work done on this feature by @shardulm94. @shardulm94, please review and see if there's a possibility of collaboration here.

}
}

public static class UnionSchemaWriter<V extends Object> implements ValueWriter<V> {
Contributor

Do we need to support writing out union types? Iceberg's type system does not support unions, and almost all compute engines do not support them either. I'm not sure there's a use case here.

On the read side, it makes sense to support reading unions, because that data could have been written by non-Iceberg writers.

What are your thoughts?

Contributor Author

Correct. Since Iceberg does not support the union type but Avro does, this approach is similar to spark-avro, where union types are converted to a record.
I don't think it makes sense to support union types natively, as that would mean adding support for them in the schema, which is a larger change with a bigger blast radius.
Spark does not support union types either, so it makes sense to align with Spark and keep the scope limited, IMO.

Contributor

Files written by Iceberg must always conform to the Iceberg spec. Iceberg tables can allow reading data that is imported, but should never write data that cannot be read by a generic implementation of the Iceberg spec.

@sudssf
Contributor Author

sudssf commented May 21, 2020

Thanks @sudssf there's some similar work done on this feature by @shardulm94. @shardulm94 please review and see if there's possibility of collaboration here.

@shardulm94 can you please share the PR?

String tableName) {
public static Schema convert(
org.apache.iceberg.Schema schema,
String tableName) {
Contributor

Minor: please revert non-functional changes like this one. That makes it easier to review and less likely to conflict with other commits.

Contributor Author

Ack, I did not do it intentionally; the formatting style from .baseline applied these changes.

List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(options.size());
for (int i = 0; i < options.size(); i += 1) {
Type fieldType = options.get(i);
if (fieldType == null) {
Contributor

Why would fieldType be null?

Contributor Author

For the Avro type null, fieldType is null. This basically ignores null in the union.

Contributor

I think I see. The reason is that all branches of the union are optional and there is no way to encode whether one branch (let alone only one branch) will be non-null?

} else if (
logical instanceof LogicalTypes.TimeMillis ||
logical instanceof LogicalTypes.TimeMicros) {
logical instanceof LogicalTypes.TimeMicros) {
Contributor

Minor: this file also has lots of formatting changes that should be reverted before committing.

Contributor Author

Ack, I did not make these changes intentionally; the formatting style from the repo did. I suggest the same in our internal repo :) I will revert these changes.


recordSchema = Schema.createRecord(recordName, null, null, false, fields);

if (struct.isUnionSchema()) {
Contributor

Is there some reason why this needs to be a round-trip conversion? I think it should be fine to convert back to a record instead of a union schema. That's what we should be writing into a table with a schema converted from an Avro union schema anyway.

Contributor Author

This is needed when the Avro writer is a Java client that writes directly by consuming Avro records.

Contributor Author

For Spark, spark-avro converts it into a record, so no changes are needed there. I will add a test for Spark.

Contributor Author

The only way for GenericAvroWriter to know whether a record represents a union schema type is to use the property set here. Otherwise the ValueWriters will fail to write a union-schema record, because they will expect the new record schema instead of the union record schema.

switch (key) {
case "boolean":
return rec
.set("member1", map.get(key))
Contributor

Is it possible to use "option_1" instead of "member1"? Avro refers to these as union options.

Contributor Author

ack

});
}

private GenericData.Record getGenericRecordForUnionType(
Contributor

Do you know what types of union evolution are allowed by Avro but not supported here?

If we convert a schema with a union and create a table from it, then for the union's struct we will generate a name mapping to read existing data files, like {"id": 6, "member1"}, {"id": 7, "member2"}, .... If the union drops member2, this assignment becomes invalid because member3 will now convert to use the name member2.

If we are going to support reading unions, it would be good to note that you can't remove members, they must have a stable order, and members can only be added to the end. Anything else that I'm missing?
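The hazard described here can be illustrated with a small sketch of counter-based naming. The helper and names below are hypothetical illustrations, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the name-mapping hazard: positional names
// shift when a union branch is removed, so the same generated name can map
// to a different underlying type across schema versions.
public class PositionalNames {
  static List<String> names(List<String> branches) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < branches.size(); i++) {
      result.add("member" + (i + 1) + ":" + branches.get(i));
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(names(List.of("string", "int", "double")));
    // prints [member1:string, member2:int, member3:double]
    System.out.println(names(List.of("string", "double")));
    // prints [member1:string, member2:double]
    // after dropping int, "member2" names a different type
  }
}
```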

Contributor Author

Since the schema is stored with the Avro file, I don't think ordering matters; the reader will always reassignIds from the writer schema. The member names are hardcoded for the test, so for the union type it should work. I will check if I can add a test for schema evolution.

Contributor

The order does matter. If the second option in the union is always named "member2" then what happens if the actual type of member2 is different between two file schemas?

Contributor Author

The file schema should match the original schema version; all rules for struct datatype evolution still apply. GenericAvroReader will always resolve the file schema against the read schema:
fileSchema <- schema used by the Avro writer when the Avro file was created
readSchema <- schema of the Iceberg table or projection, which may reflect new fields after schema evolution

return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, null);

My understanding is that reassignIds should take care of schema evolution. Maybe I am wrong?

Contributor

Data files with unions must be written by non-Iceberg producers because Iceberg will not write invalid data files. A union schema cannot contain Iceberg field IDs because there is no defined way to encode them -- union has no extra properties and while types could be replaced with a JSON object with properties, there isn't anything that pulls IDs out.

For external data files, what would happen is the user would convert a schema and create a table from it, at which point the IDs get assigned. Next, the user would create a name mapping with that schema's names to Iceberg field IDs. That mapping is how we recover IDs from field names. That works because Avro is name-based so we can map its schema evolution into Iceberg.

But name-based doesn't work with unions that are converted to structs because we are automatically assigning the names using a counter. If the definition of member2 changes, we will still map it to the same Iceberg ID by name.

Contributor Author

"Data files with unions must be written by non-Iceberg producers because Iceberg ..." The use case here is that we are writing to an Iceberg table using an Iceberg client (Avro writer) and the input stream has a union schema. The current alternative is to transform each incoming generic record on the fly to the record schema and then write it; with this approach, no such transformation is needed. Since this code path is totally new, none of the existing functionality will be changed.

I think I understand the issue with counter-based naming. A simple solution is to support evolution by only adding fields to the end of the supported types. Let me add a test case to validate the behaviour.

Contributor

What do you mean the input stream has a union schema? The records that you're trying to write?

Incoming records are required to match the write schema. Because the write schema cannot contain a union, the records also cannot contain a union. The union must be represented in memory as a struct of the union types.
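A minimal sketch of the in-memory shape the reviewer describes, a union value carried as a struct where exactly one field is non-null. The helper and positional member names are hypothetical, not Iceberg API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: represent a union value as a struct of optional
// fields, one per union branch, with only the active branch set.
public class UnionAsStruct {
  static Map<String, Object> wrap(int branchIndex, Object value, int branchCount) {
    Map<String, Object> struct = new HashMap<>();
    for (int i = 1; i <= branchCount; i++) {
      struct.put("member" + i, i == branchIndex ? value : null);
    }
    return struct;
  }

  public static void main(String[] args) {
    // A union<string, int, double> value holding the int branch:
    System.out.println(wrap(2, 42, 3).get("member2")); // prints 42
  }
}
```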

Contributor Author

The union must be represented in memory as a struct of the union types.

Since the generic records generated by the producer use a union schema, we are transforming records into a struct-of-union schema in the stream-processing client that writes to the Iceberg table. The transformation is expensive at high volume because it needs to generate a new generic record with the Iceberg table schema and copy every field from the incoming generic record except the union-schema field. With this PR (a proof of concept), GenericAvroWriter adds support so that no transformation is needed. "The union must be represented in memory as a struct of the union types" is not possible unless Avro deserialization supports it. (We are using Kafka for the stream, and the Kafka consumer gets a generic record which is ingested into the Iceberg table.)

Contributor

If you want to deserialize directly to that representation, you can use the same readers to read a single encoded message. Here's the Iceberg generics decoder, which is similar but uses Iceberg generics: https://github.com/apache/incubator-iceberg/blob/master/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java

Contributor Author

I don't think it will work without some of the above changes, since the call to AvroSchemaUtil.convert(writeSchema, "table") will fail on a union schema, but I will take a look; thanks for the link. I will try to document the problem statement and the motivation behind proposing this change.

Sudarshan S added 4 commits May 24, 2020 11:35
null should always be first in union schema ( as per avro spec)
…uct conversion.

this should not impact existing code path.
public static StructType of(List<NestedField> fields, boolean isUnionSchema) {
return new StructType(fields, isUnionSchema);
public static StructType of(List<NestedField> fields, boolean convertedFromUnionSchema) {
return new StructType(fields, convertedFromUnionSchema);
Contributor

I still don't think that Iceberg needs to model that this was converted. The conversion back to a union schema is not something that Iceberg should support. Iceberg must only write Iceberg data files and those are not allowed to contain Avro unions. The use case that we aim to support is reading data files with existing unions, if I understand correctly.

Contributor Author

There is no other way to pass this information to GenericAvroWriter; it needs to know that the input generic record has a union schema so that the ValueWriters can be created accordingly. Otherwise we get a cast exception. This avoids creating a new generic object with the record schema (which is really expensive, IMO). What alternative is possible if we don't want to add convertedFromUnionSchema?

Contributor

there is no other way to pass this information for GenericAvroWriter

I understand that there is no other way to pass the information. What I'm saying is that GenericAvroWriter cannot write non-option unions anyway. Iceberg must not write anything that is not allowed by the spec. This feature can be used to read existing Avro data files, but cannot be used to write Avro data files with unions.

Contributor

@rdsr rdsr May 26, 2020

+1 We shouldn't support writing unions.

Contributor Author

"...but cannot be used to write Avro data files with unions." This feature converts a union schema to a record, similar to spark-avro, and allows Iceberg to ingest data even when there is a union schema. There are open questions about schema evolution that need to be addressed.
Why we need this: Avro supports union schemas, and writing such a schema to Iceberg currently fails because it is not supported. With this feature we will support writing, and the transformation will be handled by the Iceberg client instead of being performed externally.

Contributor

Reading data written with a union schema is okay, but Iceberg cannot write data files with a union schema (other than options).

@teabot

teabot commented May 28, 2020

Hello, can anyone point me to the discussion/reasoning for not supporting the union type within Iceberg? I do understand that traditionally support has been very poor in nearly all data processing frameworks, however, more recently the union appears to be a good solve for event sourcing type applications where ordering is important and events may be of disparate types. While it is possible to do this by modelling as a sparse record, it is contractually weaker - only business logic can ensure that only one field is non-null.

@shardulm94
Contributor

Thanks @rdsr for the tag here; however, I feel the motivation of this PR is a bit different from mine, which was to allow reading data files written by non-Iceberg writers that may contain union types. I have been thinking about this on and off for a while now and have tried to jot down my thoughts in https://docs.google.com/document/d/1Go2NrOoeCKfrDJw8MAZnsMYbE2KZbso1zkSY8cpLzaQ/edit#heading=h.gd2qof50gbzs. Since the motivation of this PR seems different, though, we may want to continue this conversation in #189.

@rdblue
Contributor

rdblue commented May 28, 2020

@teabot, I think the main problem with unions is the support in processing engines. It is unlikely that Spark, Presto, Impala, etc. will add support for unions. Since Iceberg is a format that we want to be suitable as a common at-rest store, it doesn't make sense to have a type that requires work-arounds in those engines but has only a small benefit (ensuring only one option is non-null).

I think it is unlikely that processing engines will support unions because it isn't clear how users would interact with them in SQL. For example, how do I filter to just records with a particular option of the union? That might seem easy, but it exposes underlying problems with unions and schema evolution, like identifying union fields. If we generate names based on position, what happens when that position changes? If we do it based on ID, then we're exposing internal IDs to users.

Also, what if a file is written with a version of the schema that has a new union option that isn't in the table schema? Do we choose another incorrect branch (null or default) or do we throw an exception?

I think it is the right choice to continue using the more standard and well-defined types rather than adding union, since it would make it much harder to integrate Iceberg into processing engines.

@teabot

teabot commented May 28, 2020

Thanks for the comprehensive reply @rdblue. The SQL experience argument is a compelling one, and I agree that if we look at formats that support union, we find that support for this type is often (and perhaps understandably) overlooked/avoided within integrations and that this would probably be the case with Iceberg also. In our platforms we've tended to 'prohibit' producers from using unions because they effectively end up creating unreadable batch datasets downstream. The choice that Iceberg has made seems a sensible one.

That said, we're now seeing use cases in the stream domain that, in my opinion, are better modelled with union (multi-type streams). In practice these are currently modelled using a loose set of independent schemas that have no complete explicit contract. I don't like this because it pushes elements of the producer/consumer schema contract outside of Avro and into an implicit convention that cannot enjoy compatibility checking, for example.

I digress - what I hope to distill from this kind of discourse are good patterns for bridging data in streams and at rest for these use cases. But I admit that this is not a problem that Iceberg should solve.

Apologies for the distraction and thanks for your time.

@rdblue
Contributor

rdblue commented May 28, 2020

Thanks for the context, @teabot.

For our streaming systems, we aren't supporting the case you're talking about and each stream (Kafka topic) is getting its own Iceberg schema so that schema evolution works mostly the same way for a topic as it does for a table. That aligns well with streaming SQL use cases, which we hope will make it easier to use streaming systems. It doesn't help the problem of event ordering across event types, though, which are required to be in separate streams unless they use the union-as-struct convention.

I think the trade-off is worth it. In most cases you don't need strict order guarantees across event types. Where you do, you can either join two streams with a common ordinal field or make the schema slightly more complex using the union-as-struct approach if you need to avoid the join.

@rdblue rdblue linked an issue Jul 4, 2020 that may be closed by this pull request
@rdblue
Contributor

rdblue commented Jul 4, 2020

Since there hasn't been any further progress on this, I'm going to close it. Feel free to reopen it if you'd like to continue working on a read path for existing data files with union types. I don't think that it is likely that we would make changes that allow writing data with unions, though.

@rdblue rdblue closed this Jul 4, 2020
@rdblue rdblue changed the title ISSUE-189: Add support for union record type Add support for union record type Jul 4, 2020


Development

Successfully merging this pull request may close these issues.

Allow reading non-optional unions as struct of optional fields

6 participants