@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.data;

import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;

interface SparkOrcValueWriter {

/**
* Take a value from the data and add it to the ORC output.
*
* @param rowId the row id in the ColumnVector.
* @param column the column number.
* @param data the data value to write.
* @param output the ColumnVector to put the value into.
*/
default void write(int rowId, int column, SpecializedGetters data, ColumnVector output) {
if (data.isNullAt(column)) {
output.noNulls = false;
output.isNull[rowId] = true;
} else {
output.isNull[rowId] = false;
nonNullWrite(rowId, column, data, output);
}
}

void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output);
}
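A minimal usage sketch (not part of this change), showing how the null-dispatch contract above is typically driven: a caller walks Spark rows and hands each one to a writer, letting the default write() route nulls into the vector's noNulls/isNull bookkeeping and everything else into nonNullWrite(). The helper class and method below are hypothetical; only the write()/nonNullWrite() split comes from the interface itself.

package org.apache.iceberg.spark.data;

import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.spark.sql.catalyst.InternalRow;

class SparkOrcValueWriterExample {
  // Hypothetical helper: fill one column of an ORC batch from Spark rows.
  // InternalRow implements SpecializedGetters, so each row can be passed
  // straight to the writer; null handling is done by the default write().
  static void fillColumn(SparkOrcValueWriter writer, InternalRow[] rows,
                         int column, VectorizedRowBatch batch) {
    for (int rowId = 0; rowId < rows.length; rowId++) {
      writer.write(rowId, column, rows[rowId], batch.cols[column]);
    }
    batch.size = rows.length;
  }
}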
@@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.data;

import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;

class SparkOrcValueWriters {
private SparkOrcValueWriters() {
}

static SparkOrcValueWriter booleans() {
return BooleanWriter.INSTANCE;
}

static SparkOrcValueWriter bytes() {
return ByteWriter.INSTANCE;
}

static SparkOrcValueWriter shorts() {
return ShortWriter.INSTANCE;
}

static SparkOrcValueWriter ints() {
return IntWriter.INSTANCE;
}

static SparkOrcValueWriter longs() {
return LongWriter.INSTANCE;
}

static SparkOrcValueWriter floats() {
return FloatWriter.INSTANCE;
}

static SparkOrcValueWriter doubles() {
return DoubleWriter.INSTANCE;
}

static SparkOrcValueWriter byteArrays() {
return BytesWriter.INSTANCE;
}

static SparkOrcValueWriter strings() {
return StringWriter.INSTANCE;
}

static SparkOrcValueWriter timestampTz() {
return TimestampTzWriter.INSTANCE;
}

static SparkOrcValueWriter decimal(int precision, int scale) {
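// an unscaled value with precision <= 18 always fits in a long, so it can be
// set on the DecimalColumnVector without materializing a BigDecimal;
// wider decimals go through HiveDecimal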
if (precision <= 18) {
return new Decimal18Writer(precision, scale);
} else {
return new Decimal38Writer(precision, scale);
}
}

static SparkOrcValueWriter list(SparkOrcValueWriter element) {
return new ListWriter(element);
}

static SparkOrcValueWriter map(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter) {
return new MapWriter(keyWriter, valueWriter);
}

private static class BooleanWriter implements SparkOrcValueWriter {
private static final BooleanWriter INSTANCE = new BooleanWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
}
}

private static class ByteWriter implements SparkOrcValueWriter {
private static final ByteWriter INSTANCE = new ByteWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = data.getByte(column);
}
}

private static class ShortWriter implements SparkOrcValueWriter {
private static final ShortWriter INSTANCE = new ShortWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = data.getShort(column);
}
}

private static class IntWriter implements SparkOrcValueWriter {
private static final IntWriter INSTANCE = new IntWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = data.getInt(column);
}
}

private static class LongWriter implements SparkOrcValueWriter {
private static final LongWriter INSTANCE = new LongWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = data.getLong(column);
}
}

private static class FloatWriter implements SparkOrcValueWriter {
private static final FloatWriter INSTANCE = new FloatWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
}
}

private static class DoubleWriter implements SparkOrcValueWriter {
private static final DoubleWriter INSTANCE = new DoubleWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
}
}

private static class BytesWriter implements SparkOrcValueWriter {
private static final BytesWriter INSTANCE = new BytesWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
// getBinary always makes a copy, so we don't need to worry about it
// being changed behind our back.
byte[] value = data.getBinary(column);
((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
}
}

private static class StringWriter implements SparkOrcValueWriter {
private static final StringWriter INSTANCE = new StringWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
byte[] value = data.getUTF8String(column).getBytes();
((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
}
}

private static class TimestampTzWriter implements SparkOrcValueWriter {
private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
TimestampColumnVector cv = (TimestampColumnVector) output;
long micros = data.getLong(column);
cv.time[rowId] = micros / 1_000; // millis
cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
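// e.g. micros = 1_650_000_123_456 -> time[rowId] = 1_650_000_123 (millis),
// nanos[rowId] = 123_456_000 (nanos within the second)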
}
}

private static class Decimal18Writer implements SparkOrcValueWriter {
private final int precision;
private final int scale;

Decimal18Writer(int precision, int scale) {
this.precision = precision;
this.scale = scale;
}

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
}
}

private static class Decimal38Writer implements SparkOrcValueWriter {
private final int precision;
private final int scale;

Decimal38Writer(int precision, int scale) {
this.precision = precision;
this.scale = scale;
}

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].set(
HiveDecimal.create(data.getDecimal(column, precision, scale)
.toJavaBigDecimal()));
}
}

private static class ListWriter implements SparkOrcValueWriter {
private final SparkOrcValueWriter writer;

ListWriter(SparkOrcValueWriter writer) {
this.writer = writer;
}

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
ArrayData value = data.getArray(column);
ListColumnVector cv = (ListColumnVector) output;
// record the length and start of the list elements
cv.lengths[rowId] = value.numElements();
cv.offsets[rowId] = cv.childCount;
cv.childCount += cv.lengths[rowId];
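// e.g. writing rows with 3 and then 2 elements produces
// lengths = {3, 2}, offsets = {0, 3}, childCount = 5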
// make sure the child is big enough
cv.child.ensureSize(cv.childCount, true);
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child);
}
}
}

private static class MapWriter implements SparkOrcValueWriter {
private final SparkOrcValueWriter keyWriter;
private final SparkOrcValueWriter valueWriter;

MapWriter(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter) {
this.keyWriter = keyWriter;
this.valueWriter = valueWriter;
}

@Override
public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
MapData map = data.getMap(column);
ArrayData key = map.keyArray();
ArrayData value = map.valueArray();
MapColumnVector cv = (MapColumnVector) output;
// record the length and start of the map entries
cv.lengths[rowId] = value.numElements();
cv.offsets[rowId] = cv.childCount;
cv.childCount += cv.lengths[rowId];
// make sure the child is big enough
cv.keys.ensureSize(cv.childCount, true);
cv.values.ensureSize(cv.childCount, true);
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
int pos = (int) (e + cv.offsets[rowId]);
keyWriter.write(pos, e, key, cv.keys);
valueWriter.write(pos, e, value, cv.values);
}
}
}
}
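As a usage sketch (not part of this change), the factory methods compose for nested Spark types; the column types below are made up for illustration, and the struct/root writer that would normally drive these writers row by row is elided.

package org.apache.iceberg.spark.data;

class SparkOrcValueWritersExample {
  // Hypothetical writer for a column of type map<string, decimal(38, 10)>:
  // the map writer maintains offsets/lengths/childCount on the MapColumnVector,
  // while the key and value writers fill its keys and values child vectors.
  static final SparkOrcValueWriter MAP_WRITER = SparkOrcValueWriters.map(
      SparkOrcValueWriters.strings(), SparkOrcValueWriters.decimal(38, 10));

  // Hypothetical writer for a column of type array<bigint>.
  static final SparkOrcValueWriter LIST_WRITER =
      SparkOrcValueWriters.list(SparkOrcValueWriters.longs());
}

A parent struct writer would then call, for example, MAP_WRITER.write(rowId, ordinal, row, mapVector) once per row, relying on the default null handling defined in SparkOrcValueWriter.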