1 change: 1 addition & 0 deletions .gitignore
@@ -34,6 +34,7 @@ sdist/
coverage.xml
.pytest_cache/
spark/tmp/
spark-warehouse/
spark/spark-warehouse/
spark2/spark-warehouse/
spark3/spark-warehouse/
23 changes: 23 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/CloseableIterator.java
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public interface CloseableIterator<T> extends Iterator<T>, Closeable {

@@ -54,4 +56,25 @@ public E next() {
}
};
}

static <I, O> CloseableIterator<O> transform(CloseableIterator<I> iterator, Function<I, O> transform) {
Preconditions.checkNotNull(transform, "Cannot apply a null transform");

return new CloseableIterator<O>() {
@Override
public void close() throws IOException {
iterator.close();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public O next() {
return transform.apply(iterator.next());
}
};
}
}
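
As a hedged usage sketch (not part of this change; the class and values below are made up), the new transform helper composes with the existing withClose factory and keeps the close-through behaviour:

import java.io.IOException;
import java.util.Arrays;
import org.apache.iceberg.io.CloseableIterator;

public class CloseableIteratorTransformExample {
  public static void main(String[] args) throws IOException {
    // Wrap a plain Iterator so it participates in try-with-resources.
    CloseableIterator<Integer> ints =
        CloseableIterator.withClose(Arrays.asList(1, 2, 3).iterator());
    // transform is lazy: each element is mapped in next(), and closing the
    // transformed iterator closes the wrapped one.
    try (CloseableIterator<String> rows = CloseableIterator.transform(ints, i -> "row-" + i)) {
      while (rows.hasNext()) {
        System.out.println(rows.next()); // row-1, row-2, row-3
      }
    }
  }
}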
19 changes: 18 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -127,6 +127,8 @@ public static class ReadBuilder {
private boolean caseSensitive = true;

private Function<TypeDescription, OrcRowReader<?>> readerFunc;
private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE;

private ReadBuilder(InputFile file) {
Preconditions.checkNotNull(file, "Input file cannot be null");
@@ -168,6 +170,8 @@ public ReadBuilder config(String property, String value) {
}

public ReadBuilder createReaderFunc(Function<TypeDescription, OrcRowReader<?>> readerFunction) {
Preconditions.checkArgument(this.batchedReaderFunc == null,
"Reader function cannot be set since the batched version is already set");
this.readerFunc = readerFunction;
return this;
}
@@ -177,9 +181,22 @@ public ReadBuilder filter(Expression newFilter) {
return this;
}

public ReadBuilder createBatchedReaderFunc(Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction) {
Preconditions.checkArgument(this.readerFunc == null,
"Batched reader function cannot be set since the non-batched version is already set");
this.batchedReaderFunc = batchReaderFunction;
return this;
}

public ReadBuilder recordsPerBatch(int numRecordsPerBatch) {
this.recordsPerBatch = numRecordsPerBatch;
return this;
}

public <D> CloseableIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
return new OrcIterable<>(file, conf, schema, start, length, readerFunc, caseSensitive, filter);
return new OrcIterable<>(file, conf, schema, start, length, readerFunc, caseSensitive, filter, batchedReaderFunc,
recordsPerBatch);
}
}

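A minimal sketch of how the extended builder might be called (illustrative, not taken from this PR; it assumes the builder's existing project(Schema) method and a caller-supplied batch reader factory):

import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.OrcBatchReader;
import org.apache.orc.TypeDescription;

class OrcBatchScanSketch {
  // "batchReaderFor" stands in for a real factory such as the Spark one this PR enables.
  static <T> CloseableIterable<T> scan(InputFile file, Schema projectSchema,
                                       Function<TypeDescription, OrcBatchReader<?>> batchReaderFor) {
    return ORC.read(file)
        .project(projectSchema)
        .createBatchedReaderFunc(batchReaderFor) // mutually exclusive with createReaderFunc
        .recordsPerBatch(5000)                   // defaults to VectorizedRowBatch.DEFAULT_SIZE
        .build();
  }
}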
35 changes: 35 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/OrcBatchReader.java
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

/**
* Used for implementing ORC batch readers.
*/
@FunctionalInterface
public interface OrcBatchReader<T> {

/**
* Reads a row batch.
*/
T read(VectorizedRowBatch batch);

}
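Since the interface is a @FunctionalInterface, an implementation can be a single lambda; the row-counting reader below is purely illustrative and not part of this change:

import org.apache.iceberg.orc.OrcBatchReader;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

class RowCountingBatchReaderSketch {
  // VectorizedRowBatch.size holds the number of valid rows in the current batch,
  // so this reader reduces every batch to its row count.
  static final OrcBatchReader<Long> ROW_COUNTER = batch -> (long) batch.size;

  static long countRows(VectorizedRowBatch batch) {
    return ROW_COUNTER.read(batch);
  }
}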
37 changes: 26 additions & 11 deletions orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.orc;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
@@ -49,10 +48,13 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private final Function<TypeDescription, OrcRowReader<?>> readerFunction;
private final Expression filter;
private final boolean caseSensitive;
private final Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction;
private final int recordsPerBatch;

OrcIterable(InputFile file, Configuration config, Schema schema,
Long start, Long length,
Function<TypeDescription, OrcRowReader<?>> readerFunction, boolean caseSensitive, Expression filter) {
Function<TypeDescription, OrcRowReader<?>> readerFunction, boolean caseSensitive, Expression filter,
Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction, int recordsPerBatch) {
this.schema = schema;
this.readerFunction = readerFunction;
this.file = file;
@@ -61,6 +63,8 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
this.config = config;
this.caseSensitive = caseSensitive;
this.filter = (filter == Expressions.alwaysTrue()) ? null : filter;
this.batchReaderFunction = batchReaderFunction;
this.recordsPerBatch = recordsPerBatch;
}

@SuppressWarnings("unchecked")
Expand All @@ -75,16 +79,22 @@ public CloseableIterator<T> iterator() {
Expression boundFilter = Binder.bind(schema.asStruct(), filter, caseSensitive);
sarg = ExpressionToSearchArgument.convert(boundFilter, readOrcSchema);
}
Iterator<T> iterator = new OrcIterator(
newOrcIterator(file, readOrcSchema, start, length, orcFileReader, sarg),
readerFunction.apply(readOrcSchema));
return CloseableIterator.withClose(iterator);

VectorizedRowBatchIterator rowBatchIterator = newOrcIterator(file, readOrcSchema, start, length, orcFileReader,
sarg, recordsPerBatch);
if (batchReaderFunction != null) {
OrcBatchReader<T> batchReader = (OrcBatchReader<T>) batchReaderFunction.apply(readOrcSchema);
return CloseableIterator.transform(rowBatchIterator, batchReader::read);
} else {
return new OrcRowIterator<>(rowBatchIterator, (OrcRowReader<T>) readerFunction.apply(readOrcSchema));
}
}

private static VectorizedRowBatchIterator newOrcIterator(InputFile file,
TypeDescription readerSchema,
Long start, Long length,
Reader orcFileReader, SearchArgument sarg) {
Reader orcFileReader, SearchArgument sarg,
int recordsPerBatch) {
final Reader.Options options = orcFileReader.options();
if (start != null) {
options.range(start, length);
@@ -93,21 +103,22 @@ private static VectorizedRowBatchIterator newOrcIterator(InputFile file,
options.searchArgument(sarg, new String[]{});

try {
return new VectorizedRowBatchIterator(file.location(), readerSchema, orcFileReader.rows(options));
return new VectorizedRowBatchIterator(file.location(), readerSchema, orcFileReader.rows(options),
recordsPerBatch);
} catch (IOException ioe) {
throw new RuntimeIOException(ioe, "Failed to get ORC rows for file: %s", file);
}
}

private static class OrcIterator<T> implements Iterator<T> {
private static class OrcRowIterator<T> implements CloseableIterator<T> {

private int nextRow;
private VectorizedRowBatch current;

private final VectorizedRowBatchIterator batchIter;
private final OrcRowReader<T> reader;

OrcIterator(VectorizedRowBatchIterator batchIter, OrcRowReader<T> reader) {
OrcRowIterator(VectorizedRowBatchIterator batchIter, OrcRowReader<T> reader) {
this.batchIter = batchIter;
this.reader = reader;
current = null;
@@ -128,6 +139,10 @@ public T next() {

return this.reader.read(current, nextRow++);
}
}

@Override
public void close() throws IOException {
batchIter.close();
}
}
}
orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java
@@ -19,10 +19,9 @@

package org.apache.iceberg.orc;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -32,16 +31,16 @@
* Because the same VectorizedRowBatch is reused on each call to next,
* it gets changed when hasNext or next is called.
*/
public class VectorizedRowBatchIterator implements Iterator<VectorizedRowBatch>, Closeable {
public class VectorizedRowBatchIterator implements CloseableIterator<VectorizedRowBatch> {
private final String fileLocation;
private final RecordReader rows;
private final VectorizedRowBatch batch;
private boolean advanced = false;

VectorizedRowBatchIterator(String fileLocation, TypeDescription schema, RecordReader rows) {
VectorizedRowBatchIterator(String fileLocation, TypeDescription schema, RecordReader rows, int recordsPerBatch) {
this.fileLocation = fileLocation;
this.rows = rows;
this.batch = schema.createRowBatch();
this.batch = schema.createRowBatch(recordsPerBatch);
}

@Override
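For context, a small sketch of what the new recordsPerBatch argument feeds into (illustrative only; the schema string is made up): ORC's TypeDescription.createRowBatch(int) sizes the single reusable batch, where the previous no-argument call always used the default capacity of 1024.

import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

class RowBatchCapacitySketch {
  public static void main(String[] args) {
    TypeDescription schema = TypeDescription.fromString("struct<id:bigint,data:string>");
    // The capacity of the reused batch now follows the builder's recordsPerBatch value.
    VectorizedRowBatch batch = schema.createRowBatch(5000);
    System.out.println(batch.getMaxSize()); // 5000
  }
}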
spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -32,7 +32,6 @@
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.Decimal;

/**
* Converts the OrcIterator, which returns ORC's VectorizedRowBatch to a
@@ -103,11 +102,7 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
case TIMESTAMP_INSTANT:
return SparkOrcValueReaders.timestampTzs();
case DECIMAL:
if (primitive.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
return new SparkOrcValueReaders.Decimal18Reader(primitive.getPrecision(), primitive.getScale());
} else {
return new SparkOrcValueReaders.Decimal38Reader(primitive.getPrecision(), primitive.getScale());
}
return SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale());
case CHAR:
case VARCHAR:
case STRING:
spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -42,19 +42,26 @@
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;


class SparkOrcValueReaders {
public class SparkOrcValueReaders {
private SparkOrcValueReaders() {
}

static OrcValueReader<UTF8String> utf8String() {
public static OrcValueReader<UTF8String> utf8String() {
return StringReader.INSTANCE;
}

static OrcValueReader<?> timestampTzs() {
public static OrcValueReader<Long> timestampTzs() {
return TimestampTzReader.INSTANCE;
}

public static OrcValueReader<Decimal> decimals(int precision, int scale) {
if (precision <= Decimal.MAX_LONG_DIGITS()) {
return new SparkOrcValueReaders.Decimal18Reader(precision, scale);
} else {
return new SparkOrcValueReaders.Decimal38Reader(precision, scale);
}
}

static OrcValueReader<?> struct(
List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
return new StructReader(readers, struct, idToConstant);
@@ -164,7 +171,7 @@ public Long nonNullRead(ColumnVector vector, int row) {
}
}

static class Decimal18Reader implements OrcValueReader<Decimal> {
private static class Decimal18Reader implements OrcValueReader<Decimal> {
// TODO: these fields appear to be unused; check for a bug
private final int precision;
private final int scale;
@@ -181,7 +188,7 @@ public Decimal nonNullRead(ColumnVector vector, int row) {
}
}

static class Decimal38Reader implements OrcValueReader<Decimal> {
private static class Decimal38Reader implements OrcValueReader<Decimal> {
private final int precision;
private final int scale;

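A hedged sketch of the new public decimals factory (illustrative, not from the PR; the package in the import is assumed): Spark's Decimal.MAX_LONG_DIGITS() is 18, so the requested precision decides which of the two now-private readers comes back.

import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.spark.data.SparkOrcValueReaders;
import org.apache.spark.sql.types.Decimal;

class DecimalReaderSelectionSketch {
  // precision <= 18 fits in a long, so it gets the long-backed Decimal18Reader;
  // anything wider falls back to the HiveDecimal-backed Decimal38Reader.
  static final OrcValueReader<Decimal> SMALL = SparkOrcValueReaders.decimals(10, 2);
  static final OrcValueReader<Decimal> WIDE = SparkOrcValueReaders.decimals(38, 10);
}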