620 changes: 83 additions & 537 deletions data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java

Large diffs are not rendered by default.

414 changes: 414 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -23,6 +23,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,7 @@ public static class WriteBuilder {
     private final OutputFile file;
     private final Configuration conf;
     private Schema schema = null;
-    private Function<TypeDescription, OrcValueWriter<?>> createWriterFunc;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc;
     private Map<String, byte[]> metadata = new HashMap<>();

     private WriteBuilder(OutputFile file) {
@@ -81,7 +82,7 @@ public WriteBuilder config(String property, String value) {
       return this;
     }

-    public WriteBuilder createWriterFunc(Function<TypeDescription, OrcValueWriter<?>> writerFunction) {
+    public WriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> writerFunction) {
       this.createWriterFunc = writerFunction;
       return this;
     }
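The factory passed to createWriterFunc now receives both the Iceberg Schema and the ORC TypeDescription, so writers can resolve Iceberg field IDs against the file schema. A minimal sketch of the new call shape, mirroring the Spark call sites updated later in this diff (outputFile and writeSchema are placeholders):

    FileAppender<InternalRow> appender = ORC.write(outputFile)
        .schema(writeSchema)
        .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc))
        .build();

SparkOrcWriter only needs the ORC schema, so the Iceberg schema argument is ignored there; generic writers can make use of both.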
20 changes: 10 additions & 10 deletions orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -24,7 +24,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
+import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
@@ -47,24 +47,22 @@
  */
 class OrcFileAppender<D> implements FileAppender<D> {
   private final int batchSize;
-  private final Schema schema;
   private final OutputFile file;
   private final Writer writer;
   private final VectorizedRowBatch batch;
-  private final OrcValueWriter<D> valueWriter;
+  private final OrcRowWriter<D> valueWriter;
   private boolean isClosed = false;
   private final Configuration conf;

   OrcFileAppender(Schema schema, OutputFile file,
-                  Function<TypeDescription, OrcValueWriter<?>> createWriterFunc,
+                  BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc,
                   Configuration conf, Map<String, byte[]> metadata,
                   int batchSize) {
     this.conf = conf;
     this.file = file;
     this.batchSize = batchSize;
-    this.schema = schema;

-    TypeDescription orcSchema = ORCSchemaUtil.convert(this.schema);
+    TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
     this.batch = orcSchema.createRowBatch(this.batchSize);

     OrcFile.WriterOptions options = OrcFile.writerOptions(conf).useUTCTimestamp(true);
@@ -73,7 +71,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
     }
     options.setSchema(orcSchema);
     this.writer = newOrcWriter(file, options, metadata);
-    this.valueWriter = newOrcValueWriter(orcSchema, createWriterFunc);
+    this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
   }

   @Override
@@ -146,8 +144,10 @@ private static Writer newOrcWriter(OutputFile file,
   }

   @SuppressWarnings("unchecked")
-  private static <D> OrcValueWriter<D> newOrcValueWriter(
-      TypeDescription schema, Function<TypeDescription, OrcValueWriter<?>> createWriterFunc) {
-    return (OrcValueWriter<D>) createWriterFunc.apply(schema);
+  private static <D> OrcRowWriter<D> newOrcRowWriter(Schema schema,
+                                                     TypeDescription orcSchema,
+                                                     BiFunction<Schema, TypeDescription, OrcRowWriter<?>>
+                                                         createWriterFunc) {
+    return (OrcRowWriter<D>) createWriterFunc.apply(schema, orcSchema);
   }
 }
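For context, the appender's add path is not shown in the expanded hunks above; roughly, it hands each row to the OrcRowWriter and flushes full batches through the standard ORC Writer.addRowBatch and VectorizedRowBatch.reset APIs. A sketch with simplified error handling:

    @Override
    public void add(D datum) {
      try {
        valueWriter.write(datum, batch);    // row writer fills the vectors and advances batch.size
        if (batch.size == this.batchSize) {
          writer.addRowBatch(batch);        // hand the full batch to the ORC writer
          batch.reset();                    // clear the vectors for reuse
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }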
38 changes: 38 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import java.io.IOException;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

/**
* Write data value of a schema.
*/
public interface OrcRowWriter<T> {

/**
* Writes or appends a row to ORC's VectorizedRowBatch.
*
* @param row the row data value to write.
* @param output the VectorizedRowBatch to which the output will be written.
* @throws IOException if there's any IO error while writing the data value.
*/
void write(T row, VectorizedRowBatch output) throws IOException;
}
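For illustration, a hypothetical OrcRowWriter for rows carrying a single long value — not part of this PR, and assuming an ORC schema like struct<id:bigint> so that cols[0] is a LongColumnVector:

    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
    import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

    class SingleLongRowWriter implements OrcRowWriter<Long> {
      @Override
      public void write(Long row, VectorizedRowBatch output) {
        int rowId = output.size++;  // claim the next slot in the batch
        LongColumnVector col = (LongColumnVector) output.cols[0];
        if (row == null) {
          col.noNulls = false;
          col.isNull[rowId] = true;
        } else {
          col.isNull[rowId] = false;
          col.vector[rowId] = row;
        }
      }
    }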
28 changes: 18 additions & 10 deletions orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
@@ -19,20 +19,28 @@

 package org.apache.iceberg.orc;

-import java.io.IOException;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;

 /**
  * Write data value of a schema.
  */
 public interface OrcValueWriter<T> {

+  Class<?> getJavaClass();
+
   /**
-   * Writes the data.
+   * Take a value from the data value and add it to the ORC output.
    *
-   * @param value the data value to write.
-   * @param output the VectorizedRowBatch to which the output will be written.
-   * @throws IOException if there's any IO error while writing the data value.
+   * @param rowId the row in the ColumnVector
+   * @param data the data value to write.
+   * @param output the ColumnVector to put the value into
    */
-  void write(T value, VectorizedRowBatch output) throws IOException;
+  default void write(int rowId, T data, ColumnVector output) {
+    if (data == null) {
+      output.noNulls = false;
+      output.isNull[rowId] = true;
+    } else {
+      output.isNull[rowId] = false;
+      nonNullWrite(rowId, data, output);
+    }
+  }
+
+  void nonNullWrite(int rowId, T data, ColumnVector output);
 }
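The default write(...) above centralizes null handling, so a concrete writer only implements the non-null case. A minimal hypothetical example (not part of this PR):

    import org.apache.orc.storage.ql.exec.vector.ColumnVector;
    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

    class LongValueWriter implements OrcValueWriter<Long> {
      @Override
      public Class<?> getJavaClass() {
        return Long.class;
      }

      @Override
      public void nonNullWrite(int rowId, Long data, ColumnVector output) {
        ((LongColumnVector) output).vector[rowId] = data;  // data is guaranteed non-null here
      }
    }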
@@ -20,7 +20,7 @@
 package org.apache.iceberg.spark.data;

 import java.util.List;
-import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -42,7 +42,7 @@
  * This class acts as an adaptor from an OrcFileAppender to a
  * FileAppender&lt;InternalRow&gt;.
  */
-public class SparkOrcWriter implements OrcValueWriter<InternalRow> {
+public class SparkOrcWriter implements OrcRowWriter<InternalRow> {

   private final Converter[] converters;

@@ -70,7 +70,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor

       case ORC:
         return ORC.write(file)
-            .createWriterFunc(SparkOrcWriter::new)
+            .createWriterFunc((schema, typeDesc) -> new SparkOrcWriter(typeDesc))
             .setAll(properties)
             .schema(writeSchema)
             .overwrite()
@@ -50,7 +50,7 @@ public void splitOffsets() throws IOException {

     Iterable<InternalRow> rows = RandomData.generateSpark(SCHEMA, 1, 0L);
     FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
-        .createWriterFunc(SparkOrcWriter::new)
+        .createWriterFunc((schema, typeDesc) -> new SparkOrcWriter(typeDesc))
         .schema(SCHEMA)
         .build();

@@ -119,7 +119,7 @@ public void writeFile() throws IOException {
     Assert.assertTrue("Delete should succeed", testFile.delete());

     try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
-        .createWriterFunc(SparkOrcWriter::new)
+        .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc))
         .schema(DATA_SCHEMA)
         // write in such a way that the file contains 10 stripes each with 100 rows
         .config("iceberg.orc.vectorbatch.size", "100")
@@ -67,7 +67,7 @@ private void writeAndValidateRecords(Schema schema, Iterable<InternalRow> expect
     Assert.assertTrue("Delete should succeed", testFile.delete());

     try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
-        .createWriterFunc(SparkOrcWriter::new)
+        .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc))
         .schema(schema)
         .build()) {
       writer.addAll(expected);