Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@


public class HiveTypeToIcebergType extends HiveTypeUtil.HiveSchemaVisitor<Type> {
private static final String UNION_TO_STRUCT_CONVERSION_PREFIX = "tag_";
private static final String UNION_TO_STRUCT_CONVERSION_PREFIX = "field";
private int nextId = 1;

@Override
Expand All @@ -57,7 +57,8 @@ public Type list(ListTypeInfo list, Type elementResult) {
// Mimic the struct call behavior to construct a union converted struct type
@Override
public Type union(UnionTypeInfo union, List<Type> unionResults) {
List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(unionResults.size());
List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(unionResults.size() + 1);
fields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
for (int i = 0; i < unionResults.size(); i++) {
fields.add(Types.NestedField.optional(allocateId(), UNION_TO_STRUCT_CONVERSION_PREFIX + i, unionResults.get(i)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testConversions() {
"struct<" +
"length:int,count:int,list:array<struct<lastword:string,lastwordlength:int>>," +
"wordcounts:map<string,int>>");
check("struct<1: tag_0: optional int, 2: tag_1: optional string>", "uniontype<int,string>");
check("struct<1: tag: required int, 2: field0: optional int, 3: field1: optional string>", "uniontype<int,string>");
}

private static void check(String icebergTypeStr, String hiveTypeStr) {
Expand Down
4 changes: 3 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,11 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Typ
Map<Integer, OrcField> mapping) {
TypeDescription orcType;
OrcField orcField = mapping.getOrDefault(fieldId, null);
// this branch means the iceberg struct schema actually correspond to an underlying union

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: corresponds

if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
orcType = TypeDescription.createUnion();
for (Types.NestedField nestedField : type.asStructType().fields()) {
List<Types.NestedField> nestedFields = type.asStructType().fields();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe precondition checking for the list size before accessing the second element and forwards?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment below, I don't think it's necessary here since It's already known for sure the length would be at least 2, this is an internal implementation detail.

for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addUnionChild(childType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> vis
}

public String optionName(int ordinal) {
return "tag_" + ordinal;
return "field" + ordinal;
}

public String elementName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV
return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);

case UNION:
return visitUnion(iType, schema, visitor);
return visitor.visitUnion(iType, schema, visitor);

case LIST:
Types.ListType list = iType != null ? iType.asListType() : null;
Expand Down Expand Up @@ -71,12 +71,12 @@ protected T visitRecord(
return visitor.record(struct, record, names, results);
}

private static <T> T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor) {
protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this change needed?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on why this change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to align with the same code pattern of other visitor's method, see visitRecord, also, this change enables subclass overriding, which might be necessary in the future, since this OrcSchemaWithTypeVisitor is a very generic visitor.

List<TypeDescription> types = union.getChildren();
List<T> options = Lists.newArrayListWithCapacity(types.size());

for (int i = 0; i < types.size(); i += 1) {
options.add(visit(type.asStructType().fields().get(i).type(), types.get(i), visitor));
options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
}

return visitor.union(type, union, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,17 @@ private UnionReader(List<OrcValueReader<?>> readers) {

@Override
public InternalRow nonNullRead(ColumnVector vector, int row) {
InternalRow struct = new GenericInternalRow(readers.length);
InternalRow struct = new GenericInternalRow(readers.length + 1);
UnionColumnVector unionColumnVector = (UnionColumnVector) vector;

int fieldIndex = unionColumnVector.tags[row];
Object value = this.readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row);

for (int i = 0; i < readers.length; i += 1) {
struct.setNullAt(i);
struct.setNullAt(i + 1);
}
struct.update(fieldIndex, value);
struct.update(0, fieldIndex);
struct.update(fieldIndex + 1, value);

return struct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.spark.data.vectorized;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
Expand All @@ -36,6 +37,7 @@
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
Expand Down Expand Up @@ -443,11 +445,14 @@ public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector v
long batchOffsetInFile) {
UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
List<Types.NestedField> fields = structType.fields();
assert fields.size() == unionColumnVector.fields.length;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we preserve this kind of assertion in terms of field size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary since I already know for sure the length would match, this is internal implementation detail.

assert fields.size() == optionConverters.size();

List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());
for (int i = 0; i < fields.size(); i += 1) {

LongColumnVector longColumnVector = new LongColumnVector();
longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray();

fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector,
OrcValueReaders.ints(), batchOffsetInFile));
for (int i = 0; i < fields.size() - 1; i += 1) {
fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,23 @@ public void testComplexUnion() throws IOException {

Schema expectedSchema = new Schema(
Types.NestedField.optional(0, "unionCol", Types.StructType.of(
Types.NestedField.optional(1, "tag_0", Types.IntegerType.get()),
Types.NestedField.optional(2, "tag_1", Types.StringType.get())))
Types.NestedField.optional(3, "tag", Types.IntegerType.get()),
Types.NestedField.optional(1, "field0", Types.IntegerType.get()),
Types.NestedField.optional(2, "field1", Types.StringType.get())))
);

final InternalRow expectedFirstRow = new GenericInternalRow(1);
final InternalRow field1 = new GenericInternalRow(2);
final InternalRow field1 = new GenericInternalRow(3);
field1.update(0, 0);
field1.update(1, null);
field1.update(1, 0);
field1.update(2, null);
expectedFirstRow.update(0, field1);

final InternalRow expectedSecondRow = new GenericInternalRow(1);
final InternalRow field2 = new GenericInternalRow(2);
field2.update(0, null);
field2.update(1, UTF8String.fromString("stringtype1"));
final InternalRow field2 = new GenericInternalRow(3);
field2.update(0, 1);
field2.update(1, null);
field2.update(2, UTF8String.fromString("foo-1"));
expectedSecondRow.update(0, field2);

Configuration conf = new Configuration();
Expand All @@ -103,7 +106,7 @@ public void testComplexUnion() throws IOException {
for (int i = 0; i < NUM_OF_ROWS; i += 1) {
complexUnion.tags[i] = i % 2;
longColumnVector.vector[i] = i;
String stringValue = "stringtype" + i;
String stringValue = "foo-" + i;
bytesColumnVector.setVal(i, stringValue.getBytes(StandardCharsets.UTF_8));
}

Expand All @@ -115,106 +118,28 @@ public void testComplexUnion() throws IOException {
writer.close();

// Test non-vectorized reader
List<InternalRow> internalRows = Lists.newArrayList();
List<InternalRow> actualRows = Lists.newArrayList();
try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
.project(expectedSchema)
.createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
.build()) {
reader.forEach(internalRows::add);
reader.forEach(actualRows::add);

Assert.assertEquals(internalRows.size(), NUM_OF_ROWS);
assertEquals(expectedSchema, expectedFirstRow, internalRows.get(0));
assertEquals(expectedSchema, expectedSecondRow, internalRows.get(1));
Assert.assertEquals(actualRows.size(), NUM_OF_ROWS);
assertEquals(expectedSchema, expectedFirstRow, actualRows.get(0));
assertEquals(expectedSchema, expectedSecondRow, actualRows.get(1));
}

// Test vectorized reader
List<ColumnarBatch> columnarBatches = Lists.newArrayList();
try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
.project(expectedSchema)
.createBatchedReaderFunc(readOrcSchema ->
VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
.build()) {
reader.forEach(columnarBatches::add);
Iterator<InternalRow> rowIterator = columnarBatches.get(0).rowIterator();
final Iterator<InternalRow> actualRowsIt = batchesToRows(reader.iterator());

Assert.assertEquals(columnarBatches.get(0).numRows(), NUM_OF_ROWS);
assertEquals(expectedSchema, expectedFirstRow, rowIterator.next());
assertEquals(expectedSchema, expectedSecondRow, rowIterator.next());
}
}

@Test
public void testSingleComponentUnion() throws IOException {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why test for single component union is removed? How is [ "string" ] represented in the new schema?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already discussed before we won't support this schema and user should never use single type union cuz it doesn't make sense.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given it is a valid Avro schema, do we have a way to prevent user from creating this kinda of schema in the first place? What is the behavior if user do create it for current implementation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way to prevent it will be upon the due diligence of the user to not create such schema. We can just tell the user this case is not supported and behavior is undefined.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I have a second thought on this, @autumnust when gobblin writes/converts an ORC dataset and encounters an Avor [null, string] type, does it write just string type to ORC file, or does it write a union? My intuition is that you guys write a single string as ORC schema is nullable by itself.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So Gobblin does the transformation: https://github.com/apache/gobblin/blob/5bc22a5502ddeb810a35b0fb996dd9dfc8c81121/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java#L62

So a default-value representation in Avro shall not leave in a state as single-element union.

TypeDescription orcSchema =
TypeDescription.fromString("struct<unionCol:uniontype<int>>");

Schema expectedSchema = new Schema(
Types.NestedField.optional(0, "unionCol", Types.StructType.of(
Types.NestedField.optional(1, "tag_0", Types.IntegerType.get())))
);

final InternalRow expectedFirstRow = new GenericInternalRow(1);
final InternalRow field1 = new GenericInternalRow(1);
field1.update(0, 0);
expectedFirstRow.update(0, field1);

final InternalRow expectedSecondRow = new GenericInternalRow(1);
final InternalRow field2 = new GenericInternalRow(1);
field2.update(0, 3);
expectedSecondRow.update(0, field2);

Configuration conf = new Configuration();

File orcFile = temp.newFile();
Path orcFilePath = new Path(orcFile.getPath());

Writer writer = OrcFile.createWriter(orcFilePath,
OrcFile.writerOptions(conf)
.setSchema(orcSchema).overwrite(true));

VectorizedRowBatch batch = orcSchema.createRowBatch();
LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS);
UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector);
complexUnion.init();

for (int i = 0; i < NUM_OF_ROWS; i += 1) {
complexUnion.tags[i] = 0;
longColumnVector.vector[i] = 3 * i;
}

batch.size = NUM_OF_ROWS;
batch.cols[0] = complexUnion;

writer.addRowBatch(batch);
batch.reset();
writer.close();

// Test non-vectorized reader
List<InternalRow> internalRows = Lists.newArrayList();
try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
.project(expectedSchema)
.createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
.build()) {
reader.forEach(internalRows::add);

Assert.assertEquals(internalRows.size(), NUM_OF_ROWS);
assertEquals(expectedSchema, expectedFirstRow, internalRows.get(0));
assertEquals(expectedSchema, expectedSecondRow, internalRows.get(1));
}

// Test vectorized reader
List<ColumnarBatch> columnarBatches = Lists.newArrayList();
try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
.project(expectedSchema)
.createBatchedReaderFunc(readOrcSchema ->
VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
.build()) {
reader.forEach(columnarBatches::add);
Iterator<InternalRow> rowIterator = columnarBatches.get(0).rowIterator();

Assert.assertEquals(columnarBatches.get(0).numRows(), NUM_OF_ROWS);
assertEquals(expectedSchema, expectedFirstRow, rowIterator.next());
assertEquals(expectedSchema, expectedSecondRow, rowIterator.next());
assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next());
assertEquals(expectedSchema, expectedSecondRow, actualRowsIt.next());
}
}

Expand All @@ -225,23 +150,27 @@ public void testDeeplyNestedUnion() throws IOException {

Schema expectedSchema = new Schema(
Types.NestedField.optional(0, "c1", Types.StructType.of(
Types.NestedField.optional(1, "tag_0", Types.IntegerType.get()),
Types.NestedField.optional(2, "tag_1",
Types.NestedField.optional(100, "tag", Types.IntegerType.get()),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain how the value 100 is determined for the id field here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just a random id I picked.

Types.NestedField.optional(1, "field0", Types.IntegerType.get()),
Types.NestedField.optional(2, "field1",
Types.StructType.of(Types.NestedField.optional(3, "c2", Types.StringType.get()),
Types.NestedField.optional(4, "c3", Types.StructType.of(
Types.NestedField.optional(5, "tag_0", Types.IntegerType.get()),
Types.NestedField.optional(6, "tag_1", Types.StringType.get()))))))));
Types.NestedField.optional(101, "tag", Types.IntegerType.get()),
Types.NestedField.optional(5, "field0", Types.IntegerType.get()),
Types.NestedField.optional(6, "field1", Types.StringType.get()))))))));

final InternalRow expectedFirstRow = new GenericInternalRow(1);
final InternalRow inner1 = new GenericInternalRow(2);
inner1.update(0, null);
final InternalRow inner1 = new GenericInternalRow(3);
inner1.update(0, 1);
inner1.update(1, null);
final InternalRow inner2 = new GenericInternalRow(2);
inner2.update(0, UTF8String.fromString("foo0"));
final InternalRow inner3 = new GenericInternalRow(2);
final InternalRow inner3 = new GenericInternalRow(3);
inner3.update(0, 0);
inner3.update(1, null);
inner3.update(1, 0);
inner3.update(2, null);
inner2.update(1, inner3);
inner1.update(1, inner2);
inner1.update(2, inner2);
expectedFirstRow.update(0, inner1);

Configuration conf = new Configuration();
Expand Down Expand Up @@ -303,10 +232,9 @@ public void testDeeplyNestedUnion() throws IOException {
.createBatchedReaderFunc(readOrcSchema ->
VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
.build()) {
final Iterator<InternalRow> actualRows = batchesToRows(reader.iterator());
final InternalRow actualFirstRow = actualRows.next();
final Iterator<InternalRow> actualRowsIt = batchesToRows(reader.iterator());

assertEquals(expectedSchema, expectedFirstRow, actualFirstRow);
assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next());
}
}

Expand Down