diff --git a/build.gradle b/build.gradle index ae8920ee2961..1f677e0b97ea 100644 --- a/build.gradle +++ b/build.gradle @@ -76,7 +76,7 @@ subprojects { ext { hadoopVersion = '2.7.3' avroVersion = '1.8.2' - orcVersion = '1.4.2' + orcVersion = '1.5.5' parquetVersion = '1.10.0' hiveVersion = '1.2.1' @@ -264,6 +264,7 @@ project(':iceberg-spark') { compile project(':iceberg-api') compile project(':iceberg-common') compile project(':iceberg-core') + compile project(':iceberg-orc') compile project(':iceberg-parquet') compileOnly "org.apache.avro:avro:$avroVersion" @@ -367,11 +368,13 @@ project(':iceberg-presto-runtime') { dependencies { shadow project(':iceberg-api') shadow project(':iceberg-core') + shadow project(':iceberg-orc') shadow project(':iceberg-parquet') shadow project(':iceberg-hive') shadow "org.apache.parquet:parquet-avro:$parquetVersion" shadow "org.apache.avro:avro:$avroVersion" + shadow "org.apache.orc:orc-core:$orcVersion:nohive" shadow ("org.apache.hive:hive-metastore:$hiveVersion") { exclude group: 'org.apache.hadoop', module: 'hadoop-common' // exclude group: 'org.apache.orc', module: 'orc-core' diff --git a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java b/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java index 330554f63e14..16dc3b0294ca 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java @@ -1,17 +1,20 @@ /* - * Copyright 2018 Hortonworks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg.orc; diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 157a761f1752..c6cb03649733 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -1,38 +1,47 @@ /* - * Copyright 2018 Hortonworks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg.orc; import com.google.common.base.Preconditions; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.iceberg.Schema; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; -import org.apache.orc.Reader; import org.apache.orc.TypeDescription; +import static org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch.DEFAULT_SIZE; + public class ORC { + + static final String VECTOR_ROW_BATCH_SIZE = "iceberg.orc.vectorbatch.size"; + private ORC() { } @@ -44,6 +53,7 @@ public static class WriteBuilder { private final OutputFile file; private final Configuration conf; private Schema schema = null; + private Function> createWriterFunc; private Map metadata = new HashMap<>(); private WriteBuilder(OutputFile file) { @@ -65,15 +75,27 @@ public WriteBuilder config(String property, String value) { return this; } + public WriteBuilder createWriterFunc(Function> writerFunction) { + this.createWriterFunc = writerFunction; + return this; + } + + public WriteBuilder setAll(Map properties) { + properties.forEach(conf::set); + return this; + } + public WriteBuilder schema(Schema schema) { this.schema = schema; return this; } - public OrcFileAppender build() { - OrcFile.WriterOptions options = - OrcFile.writerOptions(conf); - return new OrcFileAppender(schema, file, options, metadata); + public FileAppender build() { + Preconditions.checkNotNull(schema, "Schema is required"); + OrcFile.WriterOptions options = OrcFile.writerOptions(conf); + return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()), + this.file, createWriterFunc, options, metadata, + conf.getInt(VECTOR_ROW_BATCH_SIZE, DEFAULT_SIZE)); } } @@ -88,6 +110,8 @@ public static class ReadBuilder { private Long start = null; private Long length = null; + private Function> readerFunction; + private ReadBuilder(InputFile file) { Preconditions.checkNotNull(file, "Input file cannot be null"); this.file = file; @@ -116,27 +140,24 @@ public ReadBuilder schema(org.apache.iceberg.Schema schema) { return this; } + public ReadBuilder caseSensitive(boolean caseSensitive) { + OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(this.conf, 
caseSensitive); + return this; + } + public ReadBuilder config(String property, String value) { conf.set(property, value); return this; } - public OrcIterator build() { + public ReadBuilder createReaderFunc(Function> readerFunction) { + this.readerFunction = readerFunction; + return this; + } + + public CloseableIterable build() { Preconditions.checkNotNull(schema, "Schema is required"); - try { - Path path = new Path(file.location()); - Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - ColumnIdMap columnIds = new ColumnIdMap(); - TypeDescription orcSchema = TypeConversion.toOrc(schema, columnIds); - Reader.Options options = reader.options(); - if (start != null) { - options.range(start, length); - } - options.schema(orcSchema); - return new OrcIterator(path, orcSchema, reader.rows(options)); - } catch (IOException e) { - throw new RuntimeException("Can't open " + file.location(), e); - } + return new OrcIterable<>(file, conf, schema, start, length, readerFunction); } } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java index 257e0848f2aa..60c738cd0678 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java @@ -1,18 +1,22 @@ /* - * Copyright 2018 Hortonworks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + package org.apache.iceberg.orc; import com.google.common.base.Preconditions; @@ -20,9 +24,9 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; import org.apache.hadoop.fs.Path; import org.apache.iceberg.Metrics; -import org.apache.iceberg.Schema; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; import org.apache.orc.ColumnStatistics; @@ -34,36 +38,40 @@ /** * Create a file appender for ORC. 
*/ -public class OrcFileAppender implements FileAppender { - private final Writer writer; +class OrcFileAppender implements FileAppender { + private final int batchSize; private final TypeDescription orcSchema; private final ColumnIdMap columnIds = new ColumnIdMap(); private final Path path; + private final Writer writer; + private final VectorizedRowBatch batch; + private final OrcValueWriter valueWriter; private boolean isClosed = false; - public static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids"; + private static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids"; - OrcFileAppender(Schema schema, - OutputFile file, - OrcFile.WriterOptions options, - Map metadata) { - orcSchema = TypeConversion.toOrc(schema, columnIds); - options.setSchema(orcSchema); + OrcFileAppender(TypeDescription schema, OutputFile file, + Function> createWriterFunc, + OrcFile.WriterOptions options, Map metadata, + int batchSize) { + orcSchema = schema; path = new Path(file.location()); - try { - writer = OrcFile.createWriter(path, options); - } catch (IOException e) { - throw new RuntimeException("Can't create file " + path, e); - } - writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize()); - metadata.forEach( - (key,value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + this.batchSize = batchSize; + batch = orcSchema.createRowBatch(this.batchSize); + + options.setSchema(orcSchema); + writer = newOrcWriter(file, columnIds, options, metadata); + valueWriter = newOrcValueWriter(orcSchema, createWriterFunc); } @Override - public void add(VectorizedRowBatch datum) { + public void add(D datum) { try { - writer.addRowBatch(datum); + valueWriter.write(datum, batch); + if (batch.size == this.batchSize) { + writer.addRowBatch(batch); + batch.reset(); + } } catch (IOException e) { throw new RuntimeException("Problem writing to ORC file " + path, e); } @@ -108,12 +116,39 @@ public long length() { @Override public void close() throws IOException { if (!isClosed) { - this.isClosed = true; - writer.close(); + try { + if (batch.size > 0) { + writer.addRowBatch(batch); + batch.reset(); + } + } finally { + writer.close(); + this.isClosed = true; + } } } - public TypeDescription getSchema() { - return orcSchema; + private static Writer newOrcWriter(OutputFile file, + ColumnIdMap columnIds, + OrcFile.WriterOptions options, Map metadata) { + final Path locPath = new Path(file.location()); + final Writer writer; + + try { + writer = OrcFile.createWriter(locPath, options); + } catch (IOException e) { + throw new RuntimeException("Can't create file " + locPath, e); + } + + writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize()); + metadata.forEach((key,value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); + + return writer; + } + + @SuppressWarnings("unchecked") + private static OrcValueWriter newOrcValueWriter(TypeDescription schema, + Function> createWriterFunc) { + return (OrcValueWriter) createWriterFunc.apply(schema); } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java new file mode 100644 index 000000000000..b4bed83dad80 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import java.io.IOException; +import java.util.Iterator; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +/** + * Iterable used to read rows from ORC. + */ +class OrcIterable extends CloseableGroup implements CloseableIterable { + private final Configuration config; + private final Schema schema; + private final InputFile file; + private final Long start; + private final Long length; + private final Function> readerFunction; + + OrcIterable(InputFile file, Configuration config, Schema schema, + Long start, Long length, + Function> readerFunction) { + this.schema = schema; + this.readerFunction = readerFunction; + this.file = file; + this.start = start; + this.length = length; + this.config = config; + } + + @SuppressWarnings("unchecked") + @Override + public Iterator iterator() { + return new OrcIterator( + newOrcIterator(file, TypeConversion.toOrc(schema, new ColumnIdMap()), + start, length, newFileReader(file, config)), + readerFunction.apply(schema)); + } + + private static VectorizedRowBatchIterator newOrcIterator(InputFile file, + TypeDescription readerSchema, + Long start, Long length, + Reader orcFileReader) { + final Reader.Options options = orcFileReader.options(); + if (start != null) { + options.range(start, length); + } + options.schema(readerSchema); + + try { + return new VectorizedRowBatchIterator(file.location(), readerSchema, orcFileReader.rows(options)); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Failed to get ORC rows for file: %s", file); + } + } + + private static Reader newFileReader(InputFile file, Configuration config) { + try { + return OrcFile.createReader(new Path(file.location()), + OrcFile.readerOptions(config)); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Failed to open file: %s", file); + } + } + + private static class OrcIterator implements Iterator { + + private int nextRow; + private VectorizedRowBatch current; + + final VectorizedRowBatchIterator batchIter; + final OrcValueReader reader; + + OrcIterator(VectorizedRowBatchIterator batchIter, OrcValueReader reader) { + this.batchIter = batchIter; + this.reader = reader; + current = null; + nextRow = 0; + } + + @Override + public boolean hasNext() { + return (current != null && nextRow < current.size) || batchIter.hasNext(); + } + + @Override + public T next() { + if (current == null || nextRow >= current.size) { + current = batchIter.next(); + nextRow = 0; + } + + return 
this.reader.read(current, nextRow++); + } + } + +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java new file mode 100644 index 000000000000..cfc9ebb8afc3 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +/** + * Used for implementing ORC value readers. + */ +public interface OrcValueReader { + + /** + * Reads a value in row. + */ + T read(VectorizedRowBatch batch, int row); + +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java new file mode 100644 index 000000000000..5f1e167fa4b3 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import java.io.IOException; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +/** + * Write data value of a schema. + */ +public interface OrcValueWriter { + + /** + * Writes the data. + * @param value the data value to write. + * @param output the VectorizedRowBatch to which the output will be written. + * @throws IOException if there's any IO error while writing the data value. 
+ */ + void write(T value, VectorizedRowBatch output) throws IOException; +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java b/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java index bc57f8d03ebe..f9839f637cd5 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java +++ b/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java @@ -1,17 +1,20 @@ /* - * Copyright 2018 Hortonworks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg.orc; diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java similarity index 55% rename from orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java rename to orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java index 589e5eea410f..ddc0bce97ca4 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java +++ b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java @@ -1,17 +1,20 @@ /* - * Copyright 2018 Hortonworks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iceberg.orc; @@ -19,7 +22,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Iterator; -import org.apache.hadoop.fs.Path; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; @@ -29,14 +31,14 @@ * Because the same VectorizedRowBatch is reused on each call to next, * it gets changed when hasNext or next is called. */ -public class OrcIterator implements Iterator, Closeable { - private final Path filename; +public class VectorizedRowBatchIterator implements Iterator, Closeable { + private final String fileLocation; private final RecordReader rows; private final VectorizedRowBatch batch; private boolean advanced = false; - OrcIterator(Path filename, TypeDescription schema, RecordReader rows) { - this.filename = filename; + VectorizedRowBatchIterator(String fileLocation, TypeDescription schema, RecordReader rows) { + this.fileLocation = fileLocation; this.rows = rows; this.batch = schema.createRowBatch(); } @@ -51,7 +53,7 @@ private void advance() { try { rows.nextBatch(batch); } catch (IOException e) { - throw new RuntimeException("Problem reading ORC file " + filename, e); + throw new RuntimeException("Problem reading ORC file " + fileLocation, e); } advanced = true; } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java new file mode 100644 index 000000000000..301a55064ae0 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -0,0 +1,782 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.data; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.ColumnIdMap; +import org.apache.iceberg.orc.OrcValueReader; +import org.apache.iceberg.orc.TypeConversion; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.storage.serde2.io.DateWritable; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; +import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter; +import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; +import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.Platform; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.List; + +/** + * Converts the OrcInterator, which returns ORC's VectorizedRowBatch to a + * set of Spark's UnsafeRows. + * + * It minimizes allocations by reusing most of the objects in the implementation. 
+ */ +public class SparkOrcReader implements OrcValueReader { + private final static int INITIAL_SIZE = 128 * 1024; + private final int numFields; + private final TypeDescription readSchema; + + public SparkOrcReader(Schema readSchema) { + this.readSchema = TypeConversion.toOrc(readSchema, new ColumnIdMap()); + numFields = readSchema.columns().size(); + } + + private Converter[] buildConverters(final UnsafeRowWriter writer) { + final Converter[] converters = new Converter[numFields]; + for(int c = 0; c < numFields; ++c) { + converters[c] = buildConverter(writer, readSchema.getChildren().get(c)); + } + return converters; + } + + @Override + public InternalRow read(VectorizedRowBatch batch, int row) { + final UnsafeRowWriter rowWriter = new UnsafeRowWriter(numFields, INITIAL_SIZE); + final Converter[] converters = buildConverters(rowWriter); + + rowWriter.reset(); + rowWriter.zeroOutNullBytes(); + for(int c=0; c < batch.cols.length; ++c) { + converters[c].convert(rowWriter, c, batch.cols[c], row); + } + return rowWriter.getRow(); + } + + private static String rowToString(SpecializedGetters row, TypeDescription schema) { + final List children = schema.getChildren(); + final StringBuilder rowBuilder = new StringBuilder("{"); + + for(int c = 0; c < children.size(); ++c) { + rowBuilder.append("\""); + rowBuilder.append(schema.getFieldNames().get(c)); + rowBuilder.append("\": "); + rowBuilder.append(rowEntryToString(row, c, children.get(c))); + if (c != children.size() - 1) { + rowBuilder.append(", "); + } + } + rowBuilder.append("}"); + return rowBuilder.toString(); + } + + private static String rowEntryToString(SpecializedGetters row, int ord, TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return Boolean.toString(row.getBoolean(ord)); + case BYTE: + return Byte.toString(row.getByte(ord)); + case SHORT: + return Short.toString(row.getShort(ord)); + case INT: + return Integer.toString(row.getInt(ord)); + case LONG: + return Long.toString(row.getLong(ord)); + case FLOAT: + return Float.toString(row.getFloat(ord)); + case DOUBLE: + return Double.toString(row.getDouble(ord)); + case CHAR: + case VARCHAR: + case STRING: + return "\"" + row.getUTF8String(ord) + "\""; + case BINARY: { + byte[] bin = row.getBinary(ord); + final StringBuilder binStr; + if (bin == null) { + binStr = new StringBuilder("null"); + } else { + binStr = new StringBuilder("["); + for (int i = 0; i < bin.length; ++i) { + if (i != 0) { + binStr.append(", "); + } + int v = bin[i] & 0xff; + if (v < 16) { + binStr.append("0"); + binStr.append(Integer.toHexString(v)); + } else { + binStr.append(Integer.toHexString(v)); + } + } + binStr.append("]"); + } + return binStr.toString(); + } + case DECIMAL: + return row.getDecimal(ord, schema.getPrecision(), schema.getScale()).toString(); + case DATE: + return "\"" + new DateWritable(row.getInt(ord)) + "\""; + case TIMESTAMP: + return "\"" + new Timestamp(row.getLong(ord)) + "\""; + case STRUCT: + return rowToString(row.getStruct(ord, schema.getChildren().size()), schema); + case LIST: { + TypeDescription child = schema.getChildren().get(0); + final StringBuilder listStr = new StringBuilder("["); + ArrayData list = row.getArray(ord); + for(int e=0; e < list.numElements(); ++e) { + if (e != 0) { + listStr.append(", "); + } + listStr.append(rowEntryToString(list, e, child)); + } + listStr.append("]"); + return listStr.toString(); + } + case MAP: { + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + 
MapData map = row.getMap(ord); + ArrayData keys = map.keyArray(); + ArrayData values = map.valueArray(); + StringBuilder mapStr = new StringBuilder("["); + for(int e=0; e < map.numElements(); ++e) { + if (e != 0) { + mapStr.append(", "); + } + mapStr.append(rowEntryToString(keys, e, keyType)); + mapStr.append(": "); + mapStr.append(rowEntryToString(values, e, valueType)); + } + mapStr.append("]"); + return mapStr.toString(); + } + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } + + private static int getArrayElementSize(TypeDescription type) { + switch (type.getCategory()) { + case BOOLEAN: + case BYTE: + return 1; + case SHORT: + return 2; + case INT: + case FLOAT: + return 4; + default: + return 8; + } + } + + /** + * The common interface for converting from a ORC ColumnVector to a Spark + * UnsafeRow. UnsafeRows need two different interfaces for writers and thus + * we have two methods the first is for structs (UnsafeRowWriter) and the + * second is for lists and maps (UnsafeArrayWriter). If Spark adds a common + * interface similar to SpecializedGetters we could that and a single set of + * methods. + */ + interface Converter { + void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row); + void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, int row); + } + + private static class BooleanConverter implements Converter { + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, ((LongColumnVector) vector).vector[row] != 0); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, ((LongColumnVector) vector).vector[row] != 0); + } + } + } + + private static class ByteConverter implements Converter { + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, (byte) ((LongColumnVector) vector).vector[row]); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, (byte) ((LongColumnVector) vector).vector[row]); + } + } + } + + private static class ShortConverter implements Converter { + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, (short) ((LongColumnVector) vector).vector[row]); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, (short) ((LongColumnVector) vector).vector[row]); + } + } + } + + private static class IntConverter implements Converter { + @Override + public void 
convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, (int) ((LongColumnVector) vector).vector[row]); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, (int) ((LongColumnVector) vector).vector[row]); + } + } + } + + private static class LongConverter implements Converter { + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, ((LongColumnVector) vector).vector[row]); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, ((LongColumnVector) vector).vector[row]); + } + } + } + + private static class FloatConverter implements Converter { + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, (float) ((DoubleColumnVector) vector).vector[row]); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, (float) ((DoubleColumnVector) vector).vector[row]); + } + } + } + + private static class DoubleConverter implements Converter { + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, ((DoubleColumnVector) vector).vector[row]); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, ((DoubleColumnVector) vector).vector[row]); + } + } + } + + private static class TimestampConverter implements Converter { + + private long convert(TimestampColumnVector vector, int row) { + // compute microseconds past 1970. 
+ return (vector.time[row]/1000) * 1_000_000 + vector.nanos[row] / 1000; + } + + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + writer.write(column, convert((TimestampColumnVector) vector, row)); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + writer.write(element, convert((TimestampColumnVector) vector, row)); + } + } + } + + private static class BinaryConverter implements Converter { + + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + BytesColumnVector v = (BytesColumnVector) vector; + writer.write(column, v.vector[row], v.start[row], v.length[row]); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + final BytesColumnVector v = (BytesColumnVector) vector; + writer.write(element, v.vector[row], v.start[row], v.length[row]); + } + } + } + + private static class Decimal18Converter implements Converter { + final int precision; + final int scale; + + Decimal18Converter(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row]; + writer.write(column, + new Decimal().set(v.serialize64(v.scale()), v.precision(), v.scale()), + precision, scale); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row]; + writer.write(element, + new Decimal().set(v.serialize64(v.scale()), v.precision(), v.scale()), + precision, scale); + } + } + } + + private static class Decimal38Converter implements Converter { + final int precision; + final int scale; + + Decimal38Converter(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + BigDecimal v = ((DecimalColumnVector) vector).vector[row] + .getHiveDecimal().bigDecimalValue(); + writer.write(column, + new Decimal().set(new scala.math.BigDecimal(v), precision, scale), + precision, scale); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + BigDecimal v = ((DecimalColumnVector) 
vector).vector[row] + .getHiveDecimal().bigDecimalValue(); + writer.write(element, + new Decimal().set(new scala.math.BigDecimal(v), precision, scale), + precision, scale); + } + } + } + + private static class StructConverter implements Converter { + private final Converter[] children; + private final UnsafeRowWriter childWriter; + + StructConverter(final UnsafeWriter parentWriter, final TypeDescription schema) { + children = new Converter[schema.getChildren().size()]; + for(int c=0; c < children.length; ++c) { + children[c] = buildConverter(parentWriter, schema.getChildren().get(c)); + } + childWriter = new UnsafeRowWriter(parentWriter, children.length); + } + + int writeStruct(StructColumnVector vector, int row) { + int start = childWriter.cursor(); + childWriter.resetRowWriter(); + for(int c=0; c < children.length; ++c) { + children[c].convert(childWriter, c, vector.fields[c], row); + } + return start; + } + + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + int start = writeStruct((StructColumnVector) vector, row); + writer.setOffsetAndSizeFromPreviousCursor(column, start); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + int start = writeStruct((StructColumnVector) vector, row); + writer.setOffsetAndSizeFromPreviousCursor(element, start); + } + } + } + + private static class ListConverter implements Converter { + private final Converter children; + private final UnsafeArrayWriter childWriter; + + ListConverter(final UnsafeWriter parentWriter, TypeDescription schema) { + TypeDescription child = schema.getChildren().get(0); + children = buildConverter(parentWriter, child); + childWriter = new UnsafeArrayWriter(parentWriter, getArrayElementSize(child)); + + } + + int writeList(ListColumnVector v, int row) { + int offset = (int) v.offsets[row]; + int length = (int) v.lengths[row]; + int start = childWriter.cursor(); + childWriter.initialize(length); + for(int c = 0; c < length; ++c) { + children.convert(childWriter, c, v.child, offset + c); + } + return start; + } + + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, + int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + int start = writeList((ListColumnVector) vector, row); + writer.setOffsetAndSizeFromPreviousCursor(column, start); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, + ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + int start = writeList((ListColumnVector) vector, row); + writer.setOffsetAndSizeFromPreviousCursor(element, start); + } + } + } + + private static class MapConverter implements Converter { + private final Converter keyConvert; + private final Converter valueConvert; + + private final UnsafeArrayWriter keyWriter; + private final UnsafeArrayWriter valueWriter; + + private final int keySize; + private final int valueSize; + + private final int KEY_SIZE_BYTES = 8; + + MapConverter(final UnsafeWriter parentWriter, TypeDescription schema) { + final TypeDescription keyType = 
schema.getChildren().get(0); + final TypeDescription valueType = schema.getChildren().get(1); + keyConvert = buildConverter(parentWriter, keyType); + keySize = getArrayElementSize(keyType); + keyWriter = new UnsafeArrayWriter(parentWriter, keySize); + valueConvert = buildConverter(parentWriter, valueType); + valueSize = getArrayElementSize(valueType); + valueWriter = new UnsafeArrayWriter(parentWriter, valueSize); + } + + int writeMap(MapColumnVector v, int row) { + final int offset = (int) v.offsets[row]; + final int length = (int) v.lengths[row]; + final int start = keyWriter.cursor(); + + // save room for the key size + keyWriter.grow(KEY_SIZE_BYTES); + keyWriter.increaseCursor(KEY_SIZE_BYTES); + + // serialize the keys + keyWriter.initialize(length); + for(int c = 0; c < length; ++c) { + keyConvert.convert(keyWriter, c, v.keys, offset + c); + } + // store the serialized size of the keys + Platform.putLong(keyWriter.getBuffer(), start, + keyWriter.cursor() - start - KEY_SIZE_BYTES); + + // serialize the values + valueWriter.initialize(length); + for(int c = 0; c < length; ++c) { + valueConvert.convert(valueWriter, c, v.values, offset + c); + } + return start; + } + + @Override + public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNullAt(column); + } else { + int start = writeMap((MapColumnVector) vector, row); + writer.setOffsetAndSizeFromPreviousCursor(column, start); + } + } + + @Override + public void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, int row) { + if (vector.isRepeating) { + row = 0; + } + if (!vector.noNulls && vector.isNull[row]) { + writer.setNull(element); + } else { + int start = writeMap((MapColumnVector) vector, row); + writer.setOffsetAndSizeFromPreviousCursor(element, start); + } + } + } + + static Converter buildConverter(final UnsafeWriter writer, final TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + return new ByteConverter(); + case SHORT: + return new ShortConverter(); + case DATE: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) { + return new Decimal18Converter(schema.getPrecision(), schema.getScale()); + } else { + return new Decimal38Converter(schema.getPrecision(), schema.getScale()); + } + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + return new BinaryConverter(); + case STRUCT: + return new StructConverter(writer, schema); + case LIST: + return new ListConverter(writer, schema); + case MAP: + return new MapConverter(writer, schema); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java new file mode 100644 index 000000000000..80e287835198 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; + +import java.util.List; + +/** + * This class acts as an adaptor from an OrcFileAppender to a + * FileAppender<InternalRow>. + */ +public class SparkOrcWriter implements OrcValueWriter { + + private final Converter[] converters; + + public SparkOrcWriter(TypeDescription schema) { + converters = buildConverters(schema); + } + + @Override + public void write(InternalRow value, VectorizedRowBatch output) { + int row = output.size++; + for(int c=0; c < converters.length; ++c) { + converters[c].addValue(row, c, value, output.cols[c]); + } + } + + /** + * The interface for the conversion from Spark's SpecializedGetters to + * ORC's ColumnVectors. + */ + interface Converter { + /** + * Take a value from the Spark data value and add it to the ORC output. + * @param rowId the row in the ColumnVector + * @param column either the column number or element number + * @param data either an InternalRow or ArrayData + * @param output the ColumnVector to put the value into + */ + void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output); + } + + static class BooleanConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 
1 : 0; + } + } + } + + static class ByteConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data.getByte(column); + } + } + } + + static class ShortConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data.getShort(column); + } + } + } + + static class IntConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data.getInt(column); + } + } + } + + static class LongConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data.getLong(column); + } + } + } + + static class FloatConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column); + } + } + } + + static class DoubleConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column); + } + } + } + + static class StringConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + byte[] value = data.getUTF8String(column).getBytes(); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); + } + } + } + + static class BytesConverter implements Converter { + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + // getBinary always makes a copy, so we don't need to worry about it + // being changed behind our back. 
+ byte[] value = data.getBinary(column); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); + } + } + } + + static class TimestampConverter implements Converter { + + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + TimestampColumnVector cv = (TimestampColumnVector) output; + long micros = data.getLong(column); + cv.time[rowId] = (micros / 1_000_000) * 1000; + int nanos = (int) (micros % 1_000_000) * 1000; + if (nanos < 0) { + nanos += 1_000_000_000; + cv.time[rowId] -= 1000; + } + cv.nanos[rowId] = nanos; + } + } + } + + static class Decimal18Converter implements Converter { + private final int precision; + private final int scale; + + Decimal18Converter(TypeDescription schema) { + precision = schema.getPrecision(); + scale = schema.getScale(); + } + + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale( + data.getDecimal(column, precision, scale).toUnscaledLong(), scale); + } + } + } + + static class Decimal38Converter implements Converter { + private final int precision; + private final int scale; + + Decimal38Converter(TypeDescription schema) { + precision = schema.getPrecision(); + scale = schema.getScale(); + } + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId].set( + HiveDecimal.create(data.getDecimal(column, precision, scale) + .toJavaBigDecimal())); + } + } + } + + static class StructConverter implements Converter { + private final Converter[] children; + + StructConverter(TypeDescription schema) { + children = new Converter[schema.getChildren().size()]; + for(int c=0; c < children.length; ++c) { + children[c] = buildConverter(schema.getChildren().get(c)); + } + } + + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + InternalRow value = data.getStruct(column, children.length); + StructColumnVector cv = (StructColumnVector) output; + for(int c=0; c < children.length; ++c) { + children[c].addValue(rowId, c, value, cv.fields[c]); + } + } + } + } + + static class ListConverter implements Converter { + private final Converter children; + + ListConverter(TypeDescription schema) { + children = buildConverter(schema.getChildren().get(0)); + } + + public void addValue(int rowId, int column, SpecializedGetters data, + ColumnVector output) { + if (data.isNullAt(column)) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ArrayData value = data.getArray(column); + ListColumnVector cv = (ListColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = value.numElements(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.child.ensureSize(cv.childCount, true); + // Add each element + for(int e=0; e < cv.lengths[rowId]; 
+          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
+        }
+      }
+    }
+  }
+
+  static class MapConverter implements Converter {
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+
+    MapConverter(TypeDescription schema) {
+      keyConverter = buildConverter(schema.getChildren().get(0));
+      valueConverter = buildConverter(schema.getChildren().get(1));
+    }
+
+    public void addValue(int rowId, int column, SpecializedGetters data,
+                         ColumnVector output) {
+      if (data.isNullAt(column)) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        MapData map = data.getMap(column);
+        ArrayData key = map.keyArray();
+        ArrayData value = map.valueArray();
+        MapColumnVector cv = (MapColumnVector) output;
+        // record the length and start of the list elements
+        cv.lengths[rowId] = value.numElements();
+        cv.offsets[rowId] = cv.childCount;
+        cv.childCount += cv.lengths[rowId];
+        // make sure the child is big enough
+        cv.keys.ensureSize(cv.childCount, true);
+        cv.values.ensureSize(cv.childCount, true);
+        // Add each element
+        for(int e=0; e < cv.lengths[rowId]; ++e) {
+          int pos = (int)(e + cv.offsets[rowId]);
+          keyConverter.addValue(pos, e, key, cv.keys);
+          valueConverter.addValue(pos, e, value, cv.values);
+        }
+      }
+    }
+  }
+
+  private static Converter buildConverter(TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanConverter();
+      case BYTE:
+        return new ByteConverter();
+      case SHORT:
+        return new ShortConverter();
+      case DATE:
+      case INT:
+        return new IntConverter();
+      case LONG:
+        return new LongConverter();
+      case FLOAT:
+        return new FloatConverter();
+      case DOUBLE:
+        return new DoubleConverter();
+      case BINARY:
+        return new BytesConverter();
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return new StringConverter();
+      case DECIMAL:
+        return schema.getPrecision() <= 18
+            ? new Decimal18Converter(schema)
+            : new Decimal38Converter(schema);
+      case TIMESTAMP:
+        return new TimestampConverter();
+      case STRUCT:
+        return new StructConverter(schema);
+      case LIST:
+        return new ListConverter(schema);
+      case MAP:
+        return new MapConverter(schema);
+    }
+    throw new IllegalArgumentException("Unhandled type " + schema);
+  }
+
+  private static Converter[] buildConverters(TypeDescription schema) {
+    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
+      throw new IllegalArgumentException("Top level must be a struct " + schema);
+    }
+    List<TypeDescription> children = schema.getChildren();
+    Converter[] result = new Converter[children.size()];
+    for(int c=0; c < children.size(); ++c) {
+      result[c] = buildConverter(children.get(c));
+    }
+    return result;
+  }
+
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a74d9cd43703..63a33f958000 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -51,6 +51,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -59,6 +60,7 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -433,6 +435,10 @@ private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
           iter = newAvroIterable(location, task, readSchema);
           break;
 
+        case ORC:
+          iter = newOrcIterable(location, task, readSchema);
+          break;
+
         default:
           throw new UnsupportedOperationException(
               "Cannot read unknown format: " + task.file().format());
@@ -465,6 +471,17 @@ private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
           .caseSensitive(caseSensitive)
           .build();
     }
+
+    private CloseableIterable<InternalRow> newOrcIterable(InputFile location,
+                                                          FileScanTask task,
+                                                          Schema readSchema) {
+      return ORC.read(location)
+          .schema(readSchema)
+          .split(task.start(), task.length())
+          .createReaderFunc(SparkOrcReader::new)
+          .caseSensitive(caseSensitive)
+          .build();
+    }
   }
 
   private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index a760455ba076..69d0d84c4cba 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -52,6 +52,7 @@
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import scala.collection.Seq;
@@ -594,6 +595,8 @@ private static void assertEquals(String context, DataType type, Object expected,
           actual instanceof MapData);
       assertEquals(context, (MapType) type, (MapData) expected, (MapData) actual);
+    } else if (type instanceof BinaryType) {
+      assertEqualBytes(context, (byte[]) expected, (byte[]) actual);
     } else {
       Assert.assertEquals("Value should match expected: " + context, expected, actual);
     }
   }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
new file mode 100644
index 000000000000..1a20ff8e59ba
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+
+public class TestSparkOrcReader extends AvroDataTest {
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    final Iterable<InternalRow> expected = RandomData
+        .generateSpark(schema, 100, 0L);
+
+    final File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
+        .createWriterFunc(SparkOrcWriter::new)
+        .schema(schema)
+        .build()) {
+      writer.addAll(expected);
+    }
+
+    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(testFile))
+        .schema(schema)
+        .createReaderFunc(SparkOrcReader::new)
+        .build()) {
+      final Iterator<InternalRow> actualRows = reader.iterator();
+      final Iterator<InternalRow> expectedRows = expected.iterator();
+      while (expectedRows.hasNext()) {
+        Assert.assertTrue("Should have expected number of rows", actualRows.hasNext());
+        assertEquals(schema, expectedRows.next(), actualRows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", actualRows.hasNext());
+    }
+  }
+}
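
Reviewer note (not part of the patch): the trickiest arithmetic in the writer above is TimestampConverter, which splits Spark's microseconds-since-epoch long into the two fields of ORC's TimestampColumnVector: whole seconds expressed in milliseconds, plus a non-negative sub-second remainder in nanoseconds. A minimal standalone sketch of that same arithmetic, with hypothetical names:

public class TimestampSplitSketch {
  // Mirrors TimestampConverter: micros -> {whole seconds as millis, nanos in [0, 1e9)}.
  static long[] split(long micros) {
    long timeMillis = (micros / 1_000_000) * 1000;  // whole seconds, expressed in millis
    int nanos = (int) (micros % 1_000_000) * 1000;  // sub-second remainder, in nanos
    if (nanos < 0) {                                // Java's % keeps the dividend's sign
      nanos += 1_000_000_000;                       // borrow one second from the time field
      timeMillis -= 1000;
    }
    return new long[] {timeMillis, nanos};
  }

  public static void main(String[] args) {
    // 1.5 s after epoch: time = 1000 ms, nanos = 500_000_000
    System.out.println(java.util.Arrays.toString(split(1_500_000L)));
    // 1 µs before epoch: time = -1000 ms, nanos = 999_999_000
    System.out.println(java.util.Arrays.toString(split(-1L)));
  }
}

The negative branch matters for pre-1970 timestamps, which would otherwise leave a negative nanos field in the vector.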
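
Reviewer note (not part of the patch): ListConverter and MapConverter flatten every row's elements into one shared child vector, recording an (offset, length) pair per row and advancing a running childCount. A dependency-free sketch of that layout using plain arrays in place of ORC ColumnVectors (all names hypothetical):

import java.util.Arrays;
import java.util.List;

public class ListVectorLayoutSketch {
  public static void main(String[] args) {
    List<List<String>> rows = List.of(List.of("a", "b"), List.of(), List.of("c", "d", "e"));

    long[] offsets = new long[rows.size()];
    long[] lengths = new long[rows.size()];
    String[] child = new String[8];   // grown on demand, like ColumnVector.ensureSize
    int childCount = 0;               // running total of flattened elements

    for (int rowId = 0; rowId < rows.size(); rowId++) {
      List<String> value = rows.get(rowId);
      lengths[rowId] = value.size();        // cv.lengths[rowId] = value.numElements()
      offsets[rowId] = childCount;          // cv.offsets[rowId] = cv.childCount
      childCount += value.size();           // cv.childCount += cv.lengths[rowId]
      if (childCount > child.length) {      // cv.child.ensureSize(cv.childCount, true)
        child = Arrays.copyOf(child, Math.max(childCount, child.length * 2));
      }
      for (int e = 0; e < value.size(); e++) {
        child[(int) (e + offsets[rowId])] = value.get(e);   // children.addValue(...)
      }
    }

    System.out.println("offsets = " + Arrays.toString(offsets));  // [0, 2, 2]
    System.out.println("lengths = " + Arrays.toString(lengths));  // [2, 0, 3]
    System.out.println("child   = " + Arrays.toString(Arrays.copyOf(child, childCount)));
  }
}

MapConverter uses the same bookkeeping, except it maintains two parallel child vectors (keys and values) that share the offsets and lengths.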
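
Reviewer note (not part of the patch): buildConverters accepts only a top-level STRUCT and picks one Converter per child based on its ORC category, with DECIMAL split at precision 18 because an unscaled value up to 10^18 - 1 still fits in a signed long. A small sketch of what that dispatch sees, assuming orc-core is on the classpath and using a hand-written TypeDescription rather than a converted Iceberg schema:

import java.util.List;
import org.apache.orc.TypeDescription;

public class ConverterDispatchSketch {
  public static void main(String[] args) {
    // A shape similar to what the Iceberg-to-ORC schema conversion might produce.
    TypeDescription schema = TypeDescription.fromString(
        "struct<id:bigint,data:string,amount:decimal(38,10),tags:array<string>,props:map<string,int>>");

    List<String> names = schema.getFieldNames();
    List<TypeDescription> children = schema.getChildren();
    for (int c = 0; c < children.size(); c++) {
      // buildConverter switches on this category; e.g. LONG -> LongConverter,
      // DECIMAL -> Decimal18Converter or Decimal38Converter depending on precision.
      System.out.println(names.get(c) + ": " + children.get(c).getCategory());
    }
  }
}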