Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ public MapData getMap(int ordinal) {
/**
* Returns the decimal for rowId.
*/
public final Decimal getDecimal(int rowId, int precision, int scale) {
public Decimal getDecimal(int rowId, int precision, int scale) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
return Decimal.createUnsafe(getInt(rowId), precision, scale);
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
Expand All @@ -661,7 +661,7 @@ public final Decimal getDecimal(int rowId, int precision, int scale) {
}


public final void putDecimal(int rowId, Decimal value, int precision) {
public void putDecimal(int rowId, Decimal value, int precision) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
putInt(rowId, (int) value.toUnscaledLong());
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
Expand All @@ -675,7 +675,7 @@ public final void putDecimal(int rowId, Decimal value, int precision) {
/**
* Returns the UTF8String for rowId.
*/
public final UTF8String getUTF8String(int rowId) {
public UTF8String getUTF8String(int rowId) {
if (dictionary == null) {
ColumnVector.Array a = getByteArray(rowId);
return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
Expand All @@ -688,7 +688,7 @@ public final UTF8String getUTF8String(int rowId) {
/**
* Returns the byte array for rowId.
*/
public final byte[] getBinary(int rowId) {
public byte[] getBinary(int rowId) {
if (dictionary == null) {
ColumnVector.Array array = getByteArray(rowId);
byte[] bytes = new byte[array.length];
Expand Down Expand Up @@ -956,7 +956,7 @@ public final int appendStruct(boolean isNull) {
/**
* Data type for this column.
*/
protected final DataType type;
protected DataType type;

/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
Expand Down Expand Up @@ -988,17 +988,17 @@ public final int appendStruct(boolean isNull) {
/**
* If this is a nested type (array or struct), the column for the child data.
*/
protected final ColumnVector[] childColumns;
protected ColumnVector[] childColumns;

/**
* Reusable Array holder for getArray().
*/
protected final Array resultArray;
protected Array resultArray;

/**
* Reusable Struct holder for getStruct().
*/
protected final ColumnarBatch.Row resultStruct;
protected ColumnarBatch.Row resultStruct;

/**
* The Dictionary for this column.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.vectorized;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.types.*;

/**
* An abstract class for read-only column vector.
*/
public abstract class ReadOnlyColumnVector extends ColumnVector {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to refactor ColumnVector into classes that separate reading/writing so you could just extend the read portion instead of making this class that throws exceptions on writes? e.g.

ColumnVector -> ColumnVectorWritable -> ColumnVectorReadable
ArrowColumnVector -> ColumnVectorReadable

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it'd be better to refactor ColumnVector, but I think ColumnVector is related to ColumnarBatch or other classes, so we should do it, and also refactor ColumnarBatch at the same time, in the future PRs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on separating the read/write, we should definitely do this before we publish the ColumnVector interfaces.


protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
super(capacity, DataTypes.NullType, memMode);
this.type = type;
isConstant = true;
}

//
// APIs dealing with nulls
//

@Override
public final void putNotNull(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public final void putNull(int rowId) {
throw new UnsupportedOperationException();
}

@Override
public final void putNulls(int rowId, int count) {
throw new UnsupportedOperationException();
}

@Override
public final void putNotNulls(int rowId, int count) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Booleans
//

@Override
public final void putBoolean(int rowId, boolean value) {
throw new UnsupportedOperationException();
}

@Override
public final void putBooleans(int rowId, int count, boolean value) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Bytes
//

@Override
public final void putByte(int rowId, byte value) {
throw new UnsupportedOperationException();
}

@Override
public final void putBytes(int rowId, int count, byte value) {
throw new UnsupportedOperationException();
}

@Override
public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Shorts
//

@Override
public final void putShort(int rowId, short value) {
throw new UnsupportedOperationException();
}

@Override
public final void putShorts(int rowId, int count, short value) {
throw new UnsupportedOperationException();
}

@Override
public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Ints
//

@Override
public final void putInt(int rowId, int value) {
throw new UnsupportedOperationException();
}

@Override
public final void putInts(int rowId, int count, int value) {
throw new UnsupportedOperationException();
}

@Override
public final void putInts(int rowId, int count, int[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

@Override
public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Longs
//

@Override
public final void putLong(int rowId, long value) {
throw new UnsupportedOperationException();
}

@Override
public final void putLongs(int rowId, int count, long value) {
throw new UnsupportedOperationException();
}

@Override
public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

@Override
public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with floats
//

@Override
public final void putFloat(int rowId, float value) {
throw new UnsupportedOperationException();
}

@Override
public final void putFloats(int rowId, int count, float value) {
throw new UnsupportedOperationException();
}

@Override
public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

@Override
public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with doubles
//

@Override
public final void putDouble(int rowId, double value) {
throw new UnsupportedOperationException();
}

@Override
public final void putDoubles(int rowId, int count, double value) {
throw new UnsupportedOperationException();
}

@Override
public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

@Override
public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Arrays
//

@Override
public final void putArray(int rowId, int offset, int length) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Byte Arrays
//

@Override
public final int putByteArray(int rowId, byte[] value, int offset, int count) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Decimals
//

@Override
public final void putDecimal(int rowId, Decimal value, int precision) {
throw new UnsupportedOperationException();
}

//
// Other APIs
//

@Override
public final void setDictionary(Dictionary dictionary) {
throw new UnsupportedOperationException();
}

@Override
public final ColumnVector reserveDictionaryIds(int capacity) {
throw new UnsupportedOperationException();
}

@Override
protected final void reserveInternal(int newCapacity) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,6 @@ private[sql] object ArrowPayload {

private[sql] object ArrowConverters {

/**
* Map a Spark DataType to ArrowType.
*/
/**
 * Map a Spark SQL [[DataType]] to the equivalent [[ArrowType]].
 *
 * Only primitive, string and binary types are supported; anything else raises
 * an UnsupportedOperationException.
 */
private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
  dataType match {
    case BooleanType => ArrowType.Bool.INSTANCE
    // Integral types map to signed Arrow ints of the matching bit width.
    case ByteType => new ArrowType.Int(8, true)
    case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    case StringType => ArrowType.Utf8.INSTANCE
    case BinaryType => ArrowType.Binary.INSTANCE
    case unsupported =>
      throw new UnsupportedOperationException(s"Unsupported data type: $unsupported")
  }
}

/**
* Convert a Spark Dataset schema to Arrow schema.
*/
/**
 * Convert a Spark Dataset [[StructType]] schema to an Arrow [[Schema]].
 *
 * Each StructField becomes an Arrow Field with no children, preserving the
 * field name and nullability.
 */
private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
  val arrowFields = schema.fields.toList.map { field =>
    new Field(field.name, field.nullable, sparkTypeToArrowType(field.dataType), List.empty[Field].asJava)
  }
  new Schema(arrowFields.asJava)
}

/**
* Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
Expand Down Expand Up @@ -178,7 +150,7 @@ private[sql] object ArrowConverters {
batch: ArrowRecordBatch,
schema: StructType,
allocator: BufferAllocator): Array[Byte] = {
val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
val arrowSchema = ArrowUtils.toArrowSchema(schema)
val root = VectorSchemaRoot.create(arrowSchema, allocator)
val out = new ByteArrayOutputStream()
val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
Expand Down Expand Up @@ -410,7 +382,7 @@ private[arrow] object ColumnWriter {
* Create an Arrow ColumnWriter given the type and ordinal of row.
*/
def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = {
val dtype = ArrowConverters.sparkTypeToArrowType(dataType)
val dtype = ArrowUtils.toArrowType(dataType)
dataType match {
case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator)
case ShortType => new ShortColumnWriter(dtype, ordinal, allocator)
Expand Down
Loading