Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcRowReader;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
Expand All @@ -56,7 +56,7 @@
/**
* ORC reader for Generic Record.
*/
public class GenericOrcReader implements OrcValueReader<Record> {
public class GenericOrcReader implements OrcRowReader<Record> {

private final Schema schema;
private final List<TypeDescription> columns;
Expand All @@ -82,7 +82,7 @@ private Converter[] buildConverters() {
return newConverters;
}

public static OrcValueReader<Record> buildReader(Schema expectedSchema, TypeDescription fileSchema) {
/**
 * Creates a row reader that produces {@code Record} instances.
 *
 * @param expectedSchema schema handed to the {@code GenericOrcReader} constructor
 * @param fileSchema ORC type description handed to the {@code GenericOrcReader} constructor
 * @return a new {@code GenericOrcReader} built from the two arguments
 */
public static OrcRowReader<Record> buildReader(Schema expectedSchema, TypeDescription fileSchema) {
return new GenericOrcReader(expectedSchema, fileSchema);
}

Expand Down
4 changes: 2 additions & 2 deletions orc/src/main/java/org/apache/iceberg/orc/ORC.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static class ReadBuilder {
private Long start = null;
private Long length = null;

private Function<TypeDescription, OrcValueReader<?>> readerFunc;
private Function<TypeDescription, OrcRowReader<?>> readerFunc;

private ReadBuilder(InputFile file) {
Preconditions.checkNotNull(file, "Input file cannot be null");
Expand Down Expand Up @@ -163,7 +163,7 @@ public ReadBuilder config(String property, String value) {
return this;
}

public ReadBuilder createReaderFunc(Function<TypeDescription, OrcValueReader<?>> readerFunction) {
/**
 * Stores the function used to create an {@code OrcRowReader} from a {@code TypeDescription}.
 *
 * @param readerFunction function saved in this builder's {@code readerFunc} field
 * @return this builder, so calls can be chained
 */
public ReadBuilder createReaderFunc(Function<TypeDescription, OrcRowReader<?>> readerFunction) {
this.readerFunc = readerFunction;
return this;
}
Expand Down
8 changes: 7 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
orcType = convert(fieldId, type, false);
}
}

orcType.setAttribute(ICEBERG_ID_ATTRIBUTE, fieldId.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness' sake, should this also set ICEBERG_REQUIRED_ATTRIBUTE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this is actually causing failures

org.apache.iceberg.data.orc.TestGenericReadProjection > testRenamedAddedField FAILED
    java.lang.IllegalArgumentException: No conversion of type LONG to self needed
        at org.apache.orc.impl.ConvertTreeReaderFactory.createAnyIntegerConvertTreeReader(ConvertTreeReaderFactory.java:1671)
        at org.apache.orc.impl.ConvertTreeReaderFactory.createConvertTreeReader(ConvertTreeReaderFactory.java:2124)
        at org.apache.orc.impl.TreeReaderFactory.createTreeReader(TreeReaderFactory.java:2331)
        at org.apache.orc.impl.TreeReaderFactory$StructTreeReader.<init>(TreeReaderFactory.java:1961)
        at org.apache.orc.impl.TreeReaderFactory.createTreeReader(TreeReaderFactory.java:2371)
        at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:227)
        at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:752)
        at org.apache.iceberg.orc.OrcIterable.newOrcIterator(OrcIterable.java:80)
        at org.apache.iceberg.orc.OrcIterable.iterator(OrcIterable.java:65)
        at com.google.common.collect.Iterables.getOnlyElement(Iterables.java:254)
        at org.apache.iceberg.data.orc.TestGenericReadProjection.writeAndRead(TestGenericReadProjection.java:53)

I vaguely remember that we fixed a similar bug in ORC before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to know what's going on here. Since this is just a projection schema and the reader is built with the Iceberg schema (that has required/optional), I don't think it is really a blocker. But setting a property here shouldn't cause ORC to fail, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll file the necessary follow-ups.

return orcType;
}

Expand Down Expand Up @@ -382,6 +382,12 @@ private static Optional<Integer> icebergID(TypeDescription orcType) {
.map(Integer::parseInt);
}

/**
 * Returns the Iceberg field ID recorded on an ORC type.
 *
 * <p>The ID is read from the {@code ICEBERG_ID_ATTRIBUTE} attribute of {@code orcType}. The
 * attribute must be present; a missing attribute fails the {@code Preconditions.checkNotNull}
 * check with a message naming the attribute.
 *
 * @param orcType ORC type whose ID attribute is read
 * @return the attribute value parsed as an {@code int}
 * @throws NumberFormatException if the attribute value is not a parsable integer
 */
static int fieldId(TypeDescription orcType) {
  final String rawId = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE);
  Preconditions.checkNotNull(rawId, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE);
  final int parsedId = Integer.parseInt(rawId);
  return parsedId;
}

private static boolean isRequired(TypeDescription orcType) {
String isRequiredStr = orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE);
if (isRequiredStr != null) {
Expand Down
8 changes: 4 additions & 4 deletions orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private final InputFile file;
private final Long start;
private final Long length;
private final Function<TypeDescription, OrcValueReader<?>> readerFunction;
private final Function<TypeDescription, OrcRowReader<?>> readerFunction;

OrcIterable(InputFile file, Configuration config, Schema schema,
Long start, Long length,
Function<TypeDescription, OrcValueReader<?>> readerFunction) {
Function<TypeDescription, OrcRowReader<?>> readerFunction) {
this.schema = schema;
this.readerFunction = readerFunction;
this.file = file;
Expand Down Expand Up @@ -91,9 +91,9 @@ private static class OrcIterator<T> implements Iterator<T> {
private VectorizedRowBatch current;

private final VectorizedRowBatchIterator batchIter;
private final OrcValueReader<T> reader;
private final OrcRowReader<T> reader;

OrcIterator(VectorizedRowBatchIterator batchIter, OrcValueReader<T> reader) {
OrcIterator(VectorizedRowBatchIterator batchIter, OrcRowReader<T> reader) {
this.batchIter = batchIter;
this.reader = reader;
current = null;
Expand Down
34 changes: 34 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/OrcRowReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

/**
 * Used for implementing ORC row readers.
 *
 * <p>An implementation turns one row of a {@link VectorizedRowBatch} into a value of type
 * {@code T}.
 *
 * @param <T> type of the object produced for each row
 */
public interface OrcRowReader<T> {

/**
 * Reads a row.
 *
 * @param batch batch to read from
 * @param row row to read; presumably its index within {@code batch} (confirm against callers)
 * @return the value read for the row
 */
T read(VectorizedRowBatch batch, int row);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import com.google.common.collect.Lists;
import java.util.List;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;


/**
 * Visitor that walks an ORC {@link TypeDescription} alongside the corresponding Iceberg type.
 *
 * <p>Traversal is driven by the ORC schema. Children are visited before their parent, and the
 * child results are passed to the parent's callback. The Iceberg type handed to any callback
 * may be {@code null}: struct children are matched by the ID returned from
 * {@code ORCSchemaUtil.fieldId}, and a child whose ID is not found in the Iceberg struct is
 * visited with a {@code null} Iceberg type, as is everything nested below it.
 *
 * <p>Every callback returns {@code null} unless overridden.
 *
 * @param <T> type of the result produced for each visited node
 */
public abstract class OrcSchemaWithTypeVisitor<T> {

  /**
   * Visits an ORC schema together with an Iceberg schema.
   *
   * @param iSchema Iceberg schema; its struct type is paired with {@code schema}
   * @param schema ORC type to traverse
   * @param visitor callbacks invoked for each node
   * @return the result produced by the visitor for the root node
   */
  public static <T> T visit(
      org.apache.iceberg.Schema iSchema, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
    return visit(iSchema.asStruct(), schema, visitor);
  }

  /**
   * Visits an ORC type together with the matching Iceberg type.
   *
   * @param iType Iceberg type paired with {@code schema}, or {@code null} if there is none
   * @param schema ORC type to traverse
   * @param visitor callbacks invoked for each node
   * @return the result produced by the visitor for this node
   * @throws UnsupportedOperationException if {@code schema} is a UNION
   */
  public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
    switch (schema.getCategory()) {
      case STRUCT:
        return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);

      case UNION:
        throw new UnsupportedOperationException("Cannot handle " + schema);

      case LIST:
        Types.ListType list = iType != null ? iType.asListType() : null;
        // list is null when there is no matching Iceberg type; pass null down for the element
        // instead of dereferencing it, mirroring the MAP case below.
        return visitor.list(
            list, schema,
            visit(list != null ? list.elementType() : null, schema.getChildren().get(0), visitor));

      case MAP:
        Types.MapType map = iType != null ? iType.asMapType() : null;
        return visitor.map(
            map, schema,
            visit(map != null ? map.keyType() : null, schema.getChildren().get(0), visitor),
            visit(map != null ? map.valueType() : null, schema.getChildren().get(1), visitor));

      default:
        return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, schema);
    }
  }

  /**
   * Visits every child of an ORC struct, then invokes the record callback with the results.
   *
   * <p>Each ORC child is paired with the Iceberg field that has the same field ID, or with
   * {@code null} when the Iceberg struct is {@code null} or has no such field.
   */
  private static <T> T visitRecord(
      Types.StructType struct, TypeDescription record, OrcSchemaWithTypeVisitor<T> visitor) {
    List<TypeDescription> fields = record.getChildren();
    List<String> names = record.getFieldNames();
    List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
    for (TypeDescription field : fields) {
      int fieldId = ORCSchemaUtil.fieldId(field);
      Types.NestedField iField = struct != null ? struct.field(fieldId) : null;
      results.add(visit(iField != null ? iField.type() : null, field, visitor));
    }
    return visitor.record(struct, record, names, results);
  }

  /**
   * Called for an ORC struct after all of its children have been visited.
   *
   * @param iStruct matching Iceberg struct type, or {@code null}
   * @param record ORC struct type
   * @param names ORC field names of the struct
   * @param fields results of visiting each child, in ORC child order
   * @return {@code null} by default
   */
  public T record(Types.StructType iStruct, TypeDescription record, List<String> names, List<T> fields) {
    return null;
  }

  /**
   * Called for an ORC list after its element has been visited.
   *
   * @param iList matching Iceberg list type, or {@code null}
   * @param array ORC list type
   * @param element result of visiting the element type
   * @return {@code null} by default
   */
  public T list(Types.ListType iList, TypeDescription array, T element) {
    return null;
  }

  /**
   * Called for an ORC map after its key and value have been visited.
   *
   * @param iMap matching Iceberg map type, or {@code null}
   * @param map ORC map type
   * @param key result of visiting the key type
   * @param value result of visiting the value type
   * @return {@code null} by default
   */
  public T map(Types.MapType iMap, TypeDescription map, T key, T value) {
    return null;
  }

  /**
   * Called for every ORC type that is not a struct, union, list or map.
   *
   * @param iPrimitive matching Iceberg primitive type, or {@code null}
   * @param primitive ORC type
   * @return {@code null} by default
   */
  public T primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
    return null;
  }
}
20 changes: 11 additions & 9 deletions orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@

package org.apache.iceberg.orc;

import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;

/**
* Used for implementing ORC value readers.
*/
public interface OrcValueReader<T> {

/**
* Reads a value in row.
*/
T read(VectorizedRowBatch batch, int row);
public interface OrcValueReader<T> {

  /**
   * Reads the value for {@code row}, returning {@code null} when the entry is null.
   *
   * <p>When {@code vector.isRepeating} is set, index 0 is used regardless of {@code row}. The
   * null mask is consulted only when {@code vector.noNulls} is false. Non-null entries are
   * delegated to {@link #nonNullRead(ColumnVector, int)} with the resolved index.
   *
   * @param vector column vector to read from
   * @param row requested row
   * @return the value read, or {@code null} for a null entry
   */
  default T read(ColumnVector vector, int row) {
    final int index = vector.isRepeating ? 0 : row;
    final boolean entryIsNull = !vector.noNulls && vector.isNull[index];
    if (entryIsNull) {
      return null;
    }
    return nonNullRead(vector, index);
  }

  /**
   * Reads an entry that {@link #read(ColumnVector, int)} has determined is not null.
   *
   * @param vector column vector to read from
   * @param row index into the vector; when called from {@code read} this is already 0 for
   *     repeating vectors
   * @return the value read
   */
  T nonNullRead(ColumnVector vector, int row);
}
Loading