@@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.vectorized;
[Contributor] Let's add the Apache license header similar to other files.

import java.math.BigDecimal;
import java.math.BigInteger;

import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

/**
 * This class adds constant support to ColumnVector.
 * It supports all data types and provides `set` APIs
 * that assign the exact same value to all rows.
 *
 * Capacity: the vector stores only one copy of the data,
 * regardless of how many rows it represents.
 */
[Contributor] Can we write a UT for this new vector?

[Contributor Author] Sure, working on it!

public class ConstantColumnVector extends ColumnVector {
[Contributor] I am wondering whether we should extend WritableColumnVector instead, so we can easily leverage the constant column vector to represent partition columns. For partition columns we currently copy the same value into every row (Parquet and ORC); a future improvement is to use the constant column vector introduced here to avoid that unnecessary work. @cloud-fan WDYT?

[Contributor Author] I was initially thinking of extending WritableColumnVector, but it seems we would need to implement some unnecessary public methods like putLongs(rowId, count, value).

// The data stored in this ConstantColumnVector; only one copy of the data is kept.
private byte nullData;
private byte byteData;
private short shortData;
private int intData;
private long longData;
private float floatData;
private double doubleData;
private UTF8String stringData;
private byte[] byteArrayData;
private ConstantColumnVector[] childData;
private ColumnarArray arrayData;
private ColumnarMap mapData;

private final int numRows;

/**
 * @param numRows the number of rows for this ConstantColumnVector
 * @param type the data type of this ConstantColumnVector
 */
public ConstantColumnVector(int numRows, DataType type) {
[Member] WritableColumnVector already has a way to set a constant via setIsConstant. Have you looked at it?

[Contributor] It seems setIsConstant only affects reset, and doesn't change how the data is stored.

[Contributor] Yeah, I looked at it as well. It seems more code changes would be needed if we want to utilize setIsConstant from WritableColumnVector; it's better to start with a separate new class ConstantColumnVector here.

[Member] Makes sense. Perhaps we can remove setIsConstant later and replace its usage with ConstantColumnVector.

super(type);
this.numRows = numRows;

if (type instanceof StructType) {
this.childData = new ConstantColumnVector[((StructType) type).fields().length];
} else if (type instanceof CalendarIntervalType) {
// Three child vectors: months as int, days as int, microseconds as long.
this.childData = new ConstantColumnVector[3];
} else {
this.childData = null;
}
}
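// Illustrative example (not from the PR): for a schema struct<i: int, s: string>,
// childData has length 2 and each field's constant vector is attached later via
// setChild(ordinal, child); for CalendarIntervalType, childData always holds three
// children (months: int, days: int, microseconds: long).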

@Override
public void close() {
byteArrayData = null;
// childData is only allocated for struct and calendar-interval types, and individual
// children may never have been set, so guard against null before closing.
if (childData != null) {
for (int i = 0; i < childData.length; i++) {
if (childData[i] != null) {
childData[i].close();
childData[i] = null;
}
}
childData = null;
}
arrayData = null;
mapData = null;
}

@Override
public boolean hasNull() {
return nullData == 1;
}

@Override
public int numNulls() {
return hasNull() ? numRows : 0;
}

@Override
public boolean isNullAt(int rowId) {
return nullData == 1;
}

/**
* Sets all rows as `null`
*/
public void setNull() {
nullData = (byte) 1;
}

/**
* Sets all rows as not `null`
*/
public void setNotNull() {
nullData = (byte) 0;
}

@Override
public boolean getBoolean(int rowId) {
return byteData == 1;
}

/**
* Sets the boolean `value` for all rows
*/
public void setBoolean(boolean value) {
byteData = (byte) ((value) ? 1 : 0);
}

@Override
public byte getByte(int rowId) {
return byteData;
}

/**
* Sets the byte `value` for all rows
*/
public void setByte(byte value) {
byteData = value;
}

@Override
public short getShort(int rowId) {
return shortData;
}

/**
* Sets the short `value` for all rows
*/
public void setShort(short value) {
shortData = value;
}

@Override
public int getInt(int rowId) {
return intData;
}

/**
* Sets the int `value` for all rows
*/
public void setInt(int value) {
intData = value;
}

@Override
public long getLong(int rowId) {
return longData;
}

/**
* Sets the long `value` for all rows
*/
public void setLong(long value) {
longData = value;
}

@Override
public float getFloat(int rowId) {
return floatData;
}

/**
* Sets the float `value` for all rows
*/
public void setFloat(float value) {
floatData = value;
}

@Override
public double getDouble(int rowId) {
return doubleData;
}

/**
* Sets the double `value` for all rows
*/
public void setDouble(double value) {
doubleData = value;
}

@Override
public ColumnarArray getArray(int rowId) {
[Member] Not sure if this can work properly. Looking at ColumnarArray, in some cases an offset is required from the underlying ColumnVector, for instance in copy, toBooleanArray, etc.

return arrayData;
}

/**
* Sets the `ColumnarArray` `value` for all rows
*/
public void setArray(ColumnarArray value) {
arrayData = value;
}

@Override
public ColumnarMap getMap(int ordinal) {
return mapData;
}

/**
* Sets the `ColumnarMap` `value` for all rows
*/
public void setMap(ColumnarMap value) {
mapData = value;
}

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
// copied and adapted from WritableColumnVector
if (precision <= Decimal.MAX_INT_DIGITS()) {
return Decimal.createUnsafe(getInt(rowId), precision, scale);
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
return Decimal.createUnsafe(getLong(rowId), precision, scale);
} else {
byte[] bytes = getBinary(rowId);
BigInteger bigInteger = new BigInteger(bytes);
BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
return Decimal.apply(javaDecimal, precision, scale);
}
}

/**
* Sets the `Decimal` `value` with the precision for all rows
*/
public void setDecimal(Decimal value, int precision) {
// copied and adapted from WritableColumnVector
if (precision <= Decimal.MAX_INT_DIGITS()) {
setInt((int) value.toUnscaledLong());
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
setLong(value.toUnscaledLong());
} else {
BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
setByteArray(bigInteger.toByteArray());
}
}
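// Worked example (added for illustration): Decimal.MAX_INT_DIGITS() is 9 and
// Decimal.MAX_LONG_DIGITS() is 18, so a DecimalType(5, 2) value such as 123.45
// (unscaled value 12345) takes the setInt path, a DecimalType(15, 2) value takes
// setLong, and a DecimalType(25, 2) value stores the unscaled BigInteger's bytes
// via setByteArray; getDecimal then rebuilds the same Decimal for any rowId.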

@Override
public UTF8String getUTF8String(int rowId) {
return stringData;
}

/**
* Sets the `UTF8String` `value` for all rows
*/
public void setUtf8String(UTF8String value) {
stringData = value;
}

[Member @sunchao, Jan 14, 2022] Maybe add putInterval (or setInterval) too.

[Contributor Author] Thanks for the suggestion. I just want to include minimal support in this PR (implementing all the necessary APIs inherited from ColumnVector) and will add follow-up PRs with more high-level APIs (setStruct, setCalendarInterval, etc.). Thanks!

/**
* Sets the byte array `value` for all rows
*/
private void setByteArray(byte[] value) {
byteArrayData = value;
}

@Override
public byte[] getBinary(int rowId) {
return byteArrayData;
}

/**
* Sets the binary `value` for all rows
*/
public void setBinary(byte[] value) {
setByteArray(value);
}

@Override
public ColumnVector getChild(int ordinal) {
return childData[ordinal];
}

/**
* Sets the child `ConstantColumnVector` `value` at the given ordinal for all rows
*/
public void setChild(int ordinal, ConstantColumnVector value) {
childData[ordinal] = value;
}
}
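To make the review request above concrete ("Can we write a UT for this new vector?"), here is a minimal, self-contained sketch of the kind of test the class invites. It is illustrative only: the class name ConstantColumnVectorExample and the plain assert statements are invented for this sketch, not taken from the PR.

import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

public class ConstantColumnVectorExample {
  public static void main(String[] args) {
    int numRows = 16;

    // A single set call makes every rowId observe the same value.
    ConstantColumnVector ints = new ConstantColumnVector(numRows, DataTypes.IntegerType);
    ints.setInt(7);
    assert ints.getInt(0) == 7 && ints.getInt(numRows - 1) == 7;

    // Nullness is all-or-nothing: numNulls() is either 0 or numRows.
    ints.setNull();
    assert ints.isNullAt(3) && ints.numNulls() == numRows;
    ints.setNotNull();
    assert !ints.hasNull() && ints.numNulls() == 0;

    // Strings are stored once as a UTF8String.
    ConstantColumnVector strings = new ConstantColumnVector(numRows, DataTypes.StringType);
    strings.setUtf8String(UTF8String.fromString("constant"));
    assert strings.getUTF8String(5).toString().equals("constant");

    ints.close();
    strings.close();
  }
}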
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
- import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+ import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -221,8 +221,8 @@ case class FileSourceScanExec(
requiredSchema = requiredSchema,
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf).map { vectorTypes =>
- // for column-based file format, append metadata struct column's vector type classes if any
- vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[OnHeapColumnVector].getName)
+ // for column-based file format, append metadata column's vector type classes if any
+ vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
}

private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.FileFormat._
- import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+ import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String
@@ -134,7 +134,7 @@ class FileScanRDD(
* For each partitioned file, metadata columns for each record in the file are exactly same.
* Only update metadata row when `currentFile` is changed.
*/
- private def updateMetadataRow(): Unit = {
+ private def updateMetadataRow(): Unit =
if (metadataColumns.nonEmpty && currentFile != null) {
val path = new Path(currentFile.filePath)
metadataColumns.zipWithIndex.foreach { case (attr, i) =>
@@ -149,44 +149,30 @@
}
}
- }
}

/**
- * Create a writable column vector containing all required metadata columns
+ * Create an array of constant column vectors containing all required metadata columns
 */
- private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
+ private def createMetadataColumnVector(c: ColumnarBatch): Array[ConstantColumnVector] = {
val path = new Path(currentFile.filePath)
- val filePathBytes = path.toString.getBytes
- val fileNameBytes = path.getName.getBytes
- var rowId = 0
metadataColumns.map(_.name).map {
case FILE_PATH =>
- val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
- rowId = 0
- // use a tight-loop for better performance
- while (rowId < c.numRows()) {
- columnVector.putByteArray(rowId, filePathBytes)
- rowId += 1
- }
+ val columnVector = new ConstantColumnVector(c.numRows(), StringType)
+ columnVector.setUtf8String(UTF8String.fromString(path.toString))
columnVector
case FILE_NAME =>
- val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
- rowId = 0
- // use a tight-loop for better performance
- while (rowId < c.numRows()) {
- columnVector.putByteArray(rowId, fileNameBytes)
- rowId += 1
- }
+ val columnVector = new ConstantColumnVector(c.numRows(), StringType)
+ columnVector.setUtf8String(UTF8String.fromString(path.getName))
columnVector
case FILE_SIZE =>
- val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
- columnVector.putLongs(0, c.numRows(), currentFile.fileSize)
+ val columnVector = new ConstantColumnVector(c.numRows(), LongType)
+ columnVector.setLong(currentFile.fileSize)
columnVector
case FILE_MODIFICATION_TIME =>
- val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+ val columnVector = new ConstantColumnVector(c.numRows(), LongType)
// the modificationTime from the file is in millisecond,
// while internally, the TimestampType is stored in microsecond
- columnVector.putLongs(0, c.numRows(), currentFile.modificationTime * 1000L)
+ columnVector.setLong(currentFile.modificationTime * 1000L)
columnVector
}.toArray
}
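For contrast, a small sketch of the before/after cost of the FILE_SIZE branch above, written in Java since ConstantColumnVector is a Java class (the surrounding FileScanRDD code is Scala). The class name MetadataVectorSketch and the numRows and fileSize values are invented for the example.

import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class MetadataVectorSketch {
  public static void main(String[] args) {
    int numRows = 4096;           // stand-in for c.numRows()
    long fileSize = 1024L * 1024; // stand-in for currentFile.fileSize

    // Before: the same long is written into every row's slot.
    OnHeapColumnVector onHeap = new OnHeapColumnVector(numRows, DataTypes.LongType);
    onHeap.putLongs(0, numRows, fileSize);

    // After: one set call, one stored copy, identical reads for every rowId.
    ConstantColumnVector constant = new ConstantColumnVector(numRows, DataTypes.LongType);
    constant.setLong(fileSize);

    assert onHeap.getLong(123) == constant.getLong(123);
    onHeap.close();
    constant.close();
  }
}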
@@ -198,10 +198,9 @@
private def addMetadataColumnsIfNeeded(nextElement: Object): Object = {
if (metadataColumns.nonEmpty) {
nextElement match {
- case c: ColumnarBatch =>
-   new ColumnarBatch(
-     Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),
-     c.numRows())
+ case c: ColumnarBatch => new ColumnarBatch(
+   Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),
+   c.numRows())
case u: UnsafeRow => projection.apply(new JoinedRow(u, metadataRow))
case i: InternalRow => new JoinedRow(i, metadataRow)
}