From d1d6585e8114596c72fc1f1f54928d8c40d90020 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 25 Mar 2021 01:58:55 -0700 Subject: [PATCH 1/8] Support nested column in ORC vectorized reader --- .../apache/spark/sql/internal/SQLConf.scala | 10 ++ .../datasources/orc/OrcArrayColumnVector.java | 97 ++++++++++ .../orc/OrcAtomicColumnVector.java | 161 +++++++++++++++++ .../datasources/orc/OrcColumnVector.java | 168 ++---------------- .../datasources/orc/OrcColumnVectorUtils.java | 54 ++++++ .../orc/OrcColumnarBatchReader.java | 3 +- .../datasources/orc/OrcMapColumnVector.java | 101 +++++++++++ .../orc/OrcStructColumnVector.java | 88 +++++++++ .../datasources/orc/OrcFileFormat.scala | 17 +- .../datasources/orc/OrcSourceSuite.scala | 29 +++ 10 files changed, 571 insertions(+), 157 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ea3bb3b8790d..cd7fdae6c2c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -838,6 +838,13 @@ object SQLConf { .intConf .createWithDefault(4096) + val ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED = + buildConf("spark.sql.orc.enableNestedColumnVectorizedReader") + .doc("Enables vectorized orc decoding for nested column.") + .version("3.2.0") + .booleanConf + .createWithDefault(true) + val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") .version("1.4.0") @@ -3328,6 +3335,9 @@ class SQLConf extends Serializable with Logging { def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE) + def orcVectorizedReaderNestedColumnEnabled: Boolean = + getConf(ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED) + def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java new file mode 100644 index 000000000000..137dd3548390 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java @@ -0,0 +1,97 @@ +package org.apache.spark.sql.execution.datasources.orc; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column vector implementation for Spark's {@link ArrayType}. 
+ */ +public class OrcArrayColumnVector extends OrcColumnVector { + private final OrcColumnVector data; + private final long[] offsets; + private final long[] lengths; + + OrcArrayColumnVector( + DataType type, + ColumnVector vector, + OrcColumnVector data, + long[] offsets, + long[] lengths) { + + super(type, vector); + + this.data = data; + this.offsets = offsets; + this.lengths = lengths; + } + + @Override + public ColumnarArray getArray(int rowId) { + return new ColumnarArray(data, (int) offsets[rowId], (int) lengths[rowId]); + } + + @Override + public boolean getBoolean(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBinary(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarMap getMap(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { + throw new UnsupportedOperationException(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java new file mode 100644 index 000000000000..c2d8334d928c --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.math.BigDecimal; + +import org.apache.hadoop.hive.ql.exec.vector.*; + +import org.apache.spark.sql.catalyst.util.DateTimeUtils; +import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column vector implementation for Spark's AtomicType. + */ +public class OrcAtomicColumnVector extends OrcColumnVector { + private final boolean isTimestamp; + private final boolean isDate; + + // Column vector for each type. Only 1 is populated for any type. + private LongColumnVector longData; + private DoubleColumnVector doubleData; + private BytesColumnVector bytesData; + private DecimalColumnVector decimalData; + private TimestampColumnVector timestampData; + + OrcAtomicColumnVector(DataType type, ColumnVector vector) { + super(type, vector); + + if (type instanceof TimestampType) { + isTimestamp = true; + } else { + isTimestamp = false; + } + + if (type instanceof DateType) { + isDate = true; + } else { + isDate = false; + } + + if (vector instanceof LongColumnVector) { + longData = (LongColumnVector) vector; + } else if (vector instanceof DoubleColumnVector) { + doubleData = (DoubleColumnVector) vector; + } else if (vector instanceof BytesColumnVector) { + bytesData = (BytesColumnVector) vector; + } else if (vector instanceof DecimalColumnVector) { + decimalData = (DecimalColumnVector) vector; + } else if (vector instanceof TimestampColumnVector) { + timestampData = (TimestampColumnVector) vector; + } else { + throw new UnsupportedOperationException(); + } + } + + @Override + public boolean getBoolean(int rowId) { + return longData.vector[getRowIndex(rowId)] == 1; + } + + @Override + public byte getByte(int rowId) { + return (byte) longData.vector[getRowIndex(rowId)]; + } + + @Override + public short getShort(int rowId) { + return (short) longData.vector[getRowIndex(rowId)]; + } + + @Override + public int getInt(int rowId) { + int value = (int) longData.vector[getRowIndex(rowId)]; + if (isDate) { + return RebaseDateTime.rebaseJulianToGregorianDays(value); + } else { + return value; + } + } + + @Override + public long getLong(int rowId) { + int index = getRowIndex(rowId); + if (isTimestamp) { + return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index)); + } else { + return longData.vector[index]; + } + } + + @Override + public float getFloat(int rowId) { + return (float) doubleData.vector[getRowIndex(rowId)]; + } + + @Override + public double getDouble(int rowId) { + return doubleData.vector[getRowIndex(rowId)]; + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) return null; + BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue(); + return Decimal.apply(data, precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNullAt(rowId)) return null; + int index = getRowIndex(rowId); + BytesColumnVector col = bytesData; + return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]); + } + + @Override + public byte[] getBinary(int rowId) { + if (isNullAt(rowId)) return null; + int index = getRowIndex(rowId); + 
byte[] binary = new byte[bytesData.length[index]]; + System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, binary.length); + return binary; + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarMap getMap(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { + throw new UnsupportedOperationException(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java index 6e55fedfc4de..b75e51ee576c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java @@ -1,91 +1,28 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.spark.sql.execution.datasources.orc; -import java.math.BigDecimal; - -import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.spark.sql.catalyst.util.DateTimeUtils; -import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DateType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.TimestampType; -import org.apache.spark.sql.vectorized.ColumnarArray; -import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.sql.vectorized.ColumnarBatch; /** - * A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts - * Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with - * Spark ColumnarVector. + * A column vector interface wrapping Hive's {@link ColumnVector}. + * + * Because Spark {@link ColumnarBatch} only accepts Spark's vectorized.ColumnVector, + * this column vector is used to adapt Hive ColumnVector with Spark ColumnarVector. 
*/ -public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector { - private ColumnVector baseData; - private LongColumnVector longData; - private DoubleColumnVector doubleData; - private BytesColumnVector bytesData; - private DecimalColumnVector decimalData; - private TimestampColumnVector timestampData; - private final boolean isTimestamp; - private final boolean isDate; - +public abstract class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector { + private final ColumnVector baseData; private int batchSize; OrcColumnVector(DataType type, ColumnVector vector) { super(type); - if (type instanceof TimestampType) { - isTimestamp = true; - } else { - isTimestamp = false; - } - - if (type instanceof DateType) { - isDate = true; - } else { - isDate = false; - } - baseData = vector; - if (vector instanceof LongColumnVector) { - longData = (LongColumnVector) vector; - } else if (vector instanceof DoubleColumnVector) { - doubleData = (DoubleColumnVector) vector; - } else if (vector instanceof BytesColumnVector) { - bytesData = (BytesColumnVector) vector; - } else if (vector instanceof DecimalColumnVector) { - decimalData = (DecimalColumnVector) vector; - } else if (vector instanceof TimestampColumnVector) { - timestampData = (TimestampColumnVector) vector; - } else { - throw new UnsupportedOperationException(); - } - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; } @Override public void close() { - } @Override @@ -112,97 +49,18 @@ public int numNulls() { } } - /* A helper method to get the row index in a column. */ - private int getRowIndex(int rowId) { - return baseData.isRepeating ? 0 : rowId; - } - @Override public boolean isNullAt(int rowId) { return baseData.isNull[getRowIndex(rowId)]; } - @Override - public boolean getBoolean(int rowId) { - return longData.vector[getRowIndex(rowId)] == 1; - } - - @Override - public byte getByte(int rowId) { - return (byte) longData.vector[getRowIndex(rowId)]; - } - - @Override - public short getShort(int rowId) { - return (short) longData.vector[getRowIndex(rowId)]; - } - - @Override - public int getInt(int rowId) { - int value = (int) longData.vector[getRowIndex(rowId)]; - if (isDate) { - return RebaseDateTime.rebaseJulianToGregorianDays(value); - } else { - return value; - } - } - - @Override - public long getLong(int rowId) { - int index = getRowIndex(rowId); - if (isTimestamp) { - return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index)); - } else { - return longData.vector[index]; - } - } - @Override - public float getFloat(int rowId) { - return (float) doubleData.vector[getRowIndex(rowId)]; - } - - @Override - public double getDouble(int rowId) { - return doubleData.vector[getRowIndex(rowId)]; - } - - @Override - public Decimal getDecimal(int rowId, int precision, int scale) { - if (isNullAt(rowId)) return null; - BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue(); - return Decimal.apply(data, precision, scale); - } - - @Override - public UTF8String getUTF8String(int rowId) { - if (isNullAt(rowId)) return null; - int index = getRowIndex(rowId); - BytesColumnVector col = bytesData; - return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]); - } - - @Override - public byte[] getBinary(int rowId) { - if (isNullAt(rowId)) return null; - int index = getRowIndex(rowId); - byte[] binary = new byte[bytesData.length[index]]; - System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, 
binary.length); - return binary; - } - - @Override - public ColumnarArray getArray(int rowId) { - throw new UnsupportedOperationException(); - } - - @Override - public ColumnarMap getMap(int rowId) { - throw new UnsupportedOperationException(); + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } - @Override - public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { - throw new UnsupportedOperationException(); + /* A helper method to get the row index in a column. */ + protected int getRowIndex(int rowId) { + return baseData.isRepeating ? 0 : rowId; } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java new file mode 100644 index 000000000000..a988029d3603 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java @@ -0,0 +1,54 @@ +package org.apache.spark.sql.execution.datasources.orc; + +import org.apache.hadoop.hive.ql.exec.vector.*; + +import org.apache.spark.sql.types.*; + +/** + * Utility class for {@link OrcColumnVector}. + */ +class OrcColumnVectorUtils { + + /** + * Convert a Hive's {@link ColumnVector} to a Spark's {@link OrcColumnVector}. + * + * @param type The data type of column vector + * @param vector Hive's column vector + * @return Spark's column vector + */ + static OrcColumnVector toOrcColumnVector(DataType type, ColumnVector vector) { + if (vector instanceof LongColumnVector || + vector instanceof DoubleColumnVector || + vector instanceof BytesColumnVector || + vector instanceof DecimalColumnVector || + vector instanceof TimestampColumnVector) { + return new OrcAtomicColumnVector(type, vector); + } else if (vector instanceof StructColumnVector) { + StructColumnVector structVector = (StructColumnVector) vector; + OrcColumnVector[] fields = new OrcColumnVector[structVector.fields.length]; + int ordinal = 0; + for (StructField f : ((StructType) type).fields()) { + fields[ordinal] = toOrcColumnVector(f.dataType(), structVector.fields[ordinal]); + ordinal++; + } + return new OrcStructColumnVector(type, vector, fields); + } else if (vector instanceof ListColumnVector) { + ListColumnVector listVector = (ListColumnVector) vector; + OrcColumnVector dataVector = toOrcColumnVector( + ((ArrayType) type).elementType(), listVector.child); + return new OrcArrayColumnVector( + type, vector, dataVector, listVector.offsets, listVector.lengths); + } else if (vector instanceof MapColumnVector) { + MapColumnVector mapVector = (MapColumnVector) vector; + MapType mapType = (MapType) type; + OrcColumnVector keysVector = toOrcColumnVector(mapType.keyType(), mapVector.keys); + OrcColumnVector valuesVector = toOrcColumnVector(mapType.valueType(), mapVector.values); + return new OrcMapColumnVector( + type, vector, keysVector, valuesVector, mapVector.offsets, mapVector.lengths); + } else { + throw new IllegalArgumentException( + String.format("OrcColumnVectorUtils.toOrcColumnVector should not take %s as type " + + "and %s as vector", type, vector)); + } + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java index 6a4b116cdef0..40ed0b2454c1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java @@ -180,7 +180,8 @@ public void initBatch( missingCol.setIsConstant(); orcVectorWrappers[i] = missingCol; } else { - orcVectorWrappers[i] = new OrcColumnVector(dt, wrap.batch().cols[colId]); + orcVectorWrappers[i] = OrcColumnVectorUtils.toOrcColumnVector( + dt, wrap.batch().cols[colId]); } } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java new file mode 100644 index 000000000000..8ed4f0e7b264 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java @@ -0,0 +1,101 @@ +package org.apache.spark.sql.execution.datasources.orc; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column vector implementation for Spark's {@link MapType}. + */ +public class OrcMapColumnVector extends OrcColumnVector { + private final OrcColumnVector keys; + private final OrcColumnVector values; + private final long[] offsets; + private final long[] lengths; + + OrcMapColumnVector( + DataType type, + ColumnVector vector, + OrcColumnVector keys, + OrcColumnVector values, + long[] offsets, + long[] lengths) { + + super(type, vector); + + this.keys = keys; + this.values = values; + this.offsets = offsets; + this.lengths = lengths; + } + + @Override + public ColumnarMap getMap(int ordinal) { + return new ColumnarMap(keys, values, (int) offsets[ordinal], (int) lengths[ordinal]); + } + + @Override + public boolean getBoolean(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBinary(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { + throw new UnsupportedOperationException(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java new file mode 100644 index 000000000000..abc5a3fa61ed --- /dev/null +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java @@ -0,0 +1,88 @@ +package org.apache.spark.sql.execution.datasources.orc; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column vector implementation for Spark's {@link StructType}. + */ +public class OrcStructColumnVector extends OrcColumnVector { + private final OrcColumnVector[] fields; + + OrcStructColumnVector(DataType type, ColumnVector vector, OrcColumnVector[] fields) { + super(type, vector); + + this.fields = fields; + } + + @Override + public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { + return fields[ordinal]; + } + + @Override + public boolean getBoolean(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBinary(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnarMap getMap(int rowId) { + throw new UnsupportedOperationException(); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 8f4d1e509802..bc26ab3d4d00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -131,11 +131,26 @@ class OrcFileFormat } } + private def supportBatchForNestedColumn( + sparkSession: SparkSession, + schema: StructType): Boolean = { + val hasNestedColumn = schema.map(_.dataType).exists { + case _: ArrayType | _: MapType | _: StructType => true + case _ => false + } + if (hasNestedColumn) { + sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled + } else { + true + } + } + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { val conf = sparkSession.sessionState.conf conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields && - schema.forall(_.dataType.isInstanceOf[AtomicType]) + schema.forall(s => supportDataType(s.dataType)) && + supportBatchForNestedColumn(sparkSession, schema) } override def isSplitable( diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index c763f4c9428c..ad69bee4fb64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY} +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, SchemaMergeUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -542,6 +543,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll with CommonFileDa } class OrcSourceSuite extends OrcSuite with SharedSparkSession { + import testImplicits._ protected override def beforeAll(): Unit = { super.beforeAll() @@ -602,4 +604,31 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession { checkAnswer(spark.read.orc(path), Seq(Row(0), Row(1), Row(2))) } } + + test("SPARK-34862: Support ORC vectorized reader for nested column") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(10).map { x => + val stringColumn = s"$x" * 10 + val structColumn = (x, s"$x" * 100) + val arrayColumn = (0 until 5).map(i => (x + i, s"$x" * 5)) + val mapColumn = Map( + s"$x" -> (x * 0.1, (x, s"$x" * 100)), + (s"$x" * 2) -> (x * 0.2, (x, s"$x" * 200)), + (s"$x" * 3) -> (x * 0.3, (x, s"$x" * 300))) + (x, stringColumn, structColumn, arrayColumn, mapColumn) + }.toDF("int_col", "string_col", "struct_col", "array_col", "map_col") + df.write.format("orc").save(path) + + withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { + val readDf = spark.read.orc(path) + val vectorizationEnabled = readDf.queryExecution.executedPlan.find { + case scan: FileSourceScanExec => scan.supportsColumnar + case _ => false + }.isDefined + assert(vectorizationEnabled) + checkAnswer(readDf, df) + } + } + } } From fbc8c6c0a326a0d06e6be5bf53e0903d76dc91dd Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 25 Mar 2021 17:36:54 -0700 Subject: [PATCH 2/8] Add missing license header --- .../datasources/orc/OrcArrayColumnVector.java | 17 +++++++++++++++++ .../datasources/orc/OrcColumnVector.java | 17 +++++++++++++++++ .../datasources/orc/OrcColumnVectorUtils.java | 17 +++++++++++++++++ .../datasources/orc/OrcMapColumnVector.java | 17 +++++++++++++++++ .../datasources/orc/OrcStructColumnVector.java | 17 +++++++++++++++++ 5 files changed, 85 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java index 137dd3548390..685b1e08da55 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java index b75e51ee576c..0becd2572f99 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java index a988029d3603..3bc7cc8f8014 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.*; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java index 8ed4f0e7b264..207d160665e2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java index abc5a3fa61ed..48e540d22095 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; From c911e8940c87d3703f42ebf0c101c13953109ed6 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 26 Mar 2021 14:54:39 -0700 Subject: [PATCH 3/8] Add DirectAbstractMethodProblem of ColumnVector class in MimaExcludes.scala --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4f879b881a6f..ef5f8356eaf8 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -417,6 +417,9 @@ object MimaExcludes { case _ => true }, + // [SPARK-34862][SQL] Support nested column in ORC vectorized reader + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector"), + // [SPARK-27521][SQL] Move data source v2 to catalyst module ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnarBatch"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ArrowColumnVector"), From 3bfc03a06711c153c92e382925c2a964e0683f3a Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 26 Mar 2021 15:58:55 -0700 Subject: [PATCH 4/8] Try to update MimaExcludes to fix MiMa test --- project/MimaExcludes.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ef5f8356eaf8..39861a01376f 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -418,7 +418,19 @@ object MimaExcludes { }, // [SPARK-34862][SQL] Support nested column in ORC vectorized reader - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBoolean"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getByte"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getShort"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getInt"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getLong"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getFloat"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getDouble"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getDecimal"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getUTF8String"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBinary"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getArray"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getMap"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild"), // [SPARK-27521][SQL] Move data source v2 to catalyst module ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnarBatch"), From f30cc881a1f11c5b8bc521d9bc27e71ba065e760 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 26 Mar 2021 17:49:41 -0700 Subject: [PATCH 5/8] Do not allow UserDefinedType for 
vectorization to fix unit test failure --- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index bc26ab3d4d00..3a5097441ab3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -149,7 +149,8 @@ class OrcFileFormat val conf = sparkSession.sessionState.conf conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields && - schema.forall(s => supportDataType(s.dataType)) && + schema.forall(s => supportDataType(s.dataType) && + !s.dataType.isInstanceOf[UserDefinedType[_]]) && supportBatchForNestedColumn(sparkSession, schema) } From 9cd3bc573514a9e25f1e7364aacfb4c86c661552 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Sun, 28 Mar 2021 22:00:46 -0700 Subject: [PATCH 6/8] Address all comments --- project/MimaExcludes.scala | 32 +++++++++---------- .../apache/spark/sql/internal/SQLConf.scala | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 39861a01376f..dc11f338a006 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -40,7 +40,22 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.write.V1WriteBuilder"), // [SPARK-33955] Add latest offsets to source progress - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceProgress.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceProgress.this"), + + // [SPARK-34862][SQL] Support nested column in ORC vectorized reader + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBoolean"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getByte"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getShort"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getInt"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getLong"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getFloat"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getDouble"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getDecimal"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getUTF8String"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBinary"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getArray"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getMap"), + ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild") ) // Exclude rules for 3.1.x @@ -417,21 +432,6 @@ object MimaExcludes { case _ => true }, - // [SPARK-34862][SQL] Support 
nested column in ORC vectorized reader - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBoolean"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getByte"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getShort"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getInt"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getLong"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getFloat"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getDouble"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getDecimal"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getUTF8String"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBinary"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getArray"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getMap"), - ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild"), - // [SPARK-27521][SQL] Move data source v2 to catalyst module ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ColumnarBatch"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.vectorized.ArrowColumnVector"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cd7fdae6c2c7..647d1b6cc330 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -843,7 +843,7 @@ object SQLConf { .doc("Enables vectorized orc decoding for nested column.") .version("3.2.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") From fda6b126d7d57886e5b48254adcd466d5085c3fc Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 31 Mar 2021 03:37:14 -0700 Subject: [PATCH 7/8] Address indentation comments --- .../datasources/orc/OrcArrayColumnVector.java | 10 +++++----- .../datasources/orc/OrcMapColumnVector.java | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java index 685b1e08da55..57d4dd121f93 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java @@ -34,11 +34,11 @@ public class OrcArrayColumnVector extends OrcColumnVector { private final long[] lengths; OrcArrayColumnVector( - DataType type, - ColumnVector vector, - OrcColumnVector data, - long[] offsets, - long[] lengths) { + DataType type, + ColumnVector vector, + OrcColumnVector data, + long[] offsets, + long[] 
lengths) { super(type, vector); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java index 207d160665e2..ace8d157792d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java @@ -36,12 +36,12 @@ public class OrcMapColumnVector extends OrcColumnVector { private final long[] lengths; OrcMapColumnVector( - DataType type, - ColumnVector vector, - OrcColumnVector keys, - OrcColumnVector values, - long[] offsets, - long[] lengths) { + DataType type, + ColumnVector vector, + OrcColumnVector keys, + OrcColumnVector values, + long[] offsets, + long[] lengths) { super(type, vector); From 44feaccadc8fa81ad9f885686685431f3976f537 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 31 Mar 2021 13:09:05 -0700 Subject: [PATCH 8/8] Address comment for style --- .../sql/execution/datasources/orc/OrcArrayColumnVector.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java index 57d4dd121f93..6e13e97b4cbc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal;
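
A minimal end-to-end sketch of the feature this series adds, assuming a running SparkSession named `spark` (as in spark-shell); the output path is illustrative. Note that PATCH 6/8 flips the default of `spark.sql.orc.enableNestedColumnVectorizedReader` to false, so nested-column vectorization must be enabled explicitly. The scan check mirrors the assertion in the OrcSourceSuite test above; everything else uses only stock Spark APIs.

    import org.apache.spark.sql.execution.FileSourceScanExec

    // Write a small ORC file containing array, map, and struct columns.
    val df = spark.range(10)
      .selectExpr(
        "id",
        "array(id, id + 1) AS array_col",
        "map(cast(id AS string), id * 0.1) AS map_col",
        "named_struct('a', id, 'b', cast(id AS string)) AS struct_col")
    df.write.format("orc").mode("overwrite").save("/tmp/orc_nested")

    // Nested columns are decoded by the vectorized reader only when this
    // conf is enabled (it defaults to false as of PATCH 6/8).
    spark.conf.set("spark.sql.orc.enableNestedColumnVectorizedReader", "true")

    val readDf = spark.read.orc("/tmp/orc_nested")

    // Same check as the SPARK-34862 test: the scan reports columnar support
    // only when OrcFileFormat.supportBatch accepts the nested schema.
    val vectorized = readDf.queryExecution.executedPlan.find {
      case scan: FileSourceScanExec => scan.supportsColumnar
      case _ => false
    }.isDefined

    assert(vectorized)
    readDf.show()

With the conf left at its default (false), the same read falls back to the row-based ORC reader and the scan reports supportsColumnar = false, while the query results stay identical; the conf only selects the decoding path.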