From 3403ca4edac8e6cc57825a82d85496c3a668b1f2 Mon Sep 17 00:00:00 2001
From: Yimin
Date: Mon, 27 Dec 2021 22:09:51 -0800
Subject: [PATCH 1/6] [SPARK-37728][SQL] Reading nested columns with ORC vectorized reader can cause ArrayIndexOutOfBoundsException

### What changes were proposed in this pull request?

When an OrcColumnarBatchReader is created, its initBatch method is called only once. Inside initBatch:

`orcVectorWrappers[i] = OrcColumnVectorUtils.toOrcColumnVector(dt, wrap.batch().cols[colId]);`

When the second argument of toOrcColumnVector is a ListColumnVector/MapColumnVector, orcVectorWrappers[i] is initialized with the ListColumnVector's or MapColumnVector's offsets and lengths arrays. However, each time the nextBatch method of OrcColumnarBatchReader is called, the ensureSize method of ColumnVector (and of its subclasses, such as MultiValuedColumnVector) may be called, after which the ListColumnVector/MapColumnVector's offsets and lengths fields can refer to newly allocated arrays. The wrappers, still holding references to the old, smaller arrays, can then fail with an ArrayIndexOutOfBoundsException.
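To make the failure mode concrete, here is a minimal sketch. It is not Spark code: the object name and the sizes 1024/4096 are illustrative, while `ListColumnVector`, `LongColumnVector`, and `ensureSize` are the actual Hive vector classes and API the ORC reader wraps.

```scala
import org.apache.hadoop.hive.ql.exec.vector.{ListColumnVector, LongColumnVector}

object StaleOffsetsDemo extends App {
  // A list vector sized for 1024 rows (sizes chosen only for illustration).
  val list = new ListColumnVector(1024, new LongColumnVector())
  val cachedOffsets = list.offsets       // captured once, as the old wrappers did in initBatch
  list.ensureSize(4096, false)           // MultiValuedColumnVector allocates new offsets/lengths arrays
  assert(cachedOffsets ne list.offsets)  // the cached reference now points at the orphaned array
  // Indexing cachedOffsets at a row id >= 1024 throws ArrayIndexOutOfBoundsException,
  // even though list.offsets is a valid 4096-element array.
}
```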
This PR makes OrcArrayColumnVector.getArray and OrcMapColumnVector.getMap always read offsets and lengths from the underlying ColumnVector, which resolves the issue.
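This works because each wrapper keeps a reference to the Hive vector object itself (the `baseData` field), and that object is stable across `ensureSize` calls; only the arrays inside it are replaced. A sketch of the read-through pattern, under the same illustrative setup as above:

```scala
import org.apache.hadoop.hive.ql.exec.vector.{ListColumnVector, LongColumnVector}

object ReadThroughDemo extends App {
  val list = new ListColumnVector(1024, new LongColumnVector())
  list.ensureSize(4096, false)           // swaps list.offsets / list.lengths for larger arrays
  // Re-reading the fields on every access always sees the live arrays,
  // which is what the patched getArray/getMap do through baseData:
  val offset = list.offsets(2000).toInt  // indexes the current 4096-element array: safe
  val length = list.lengths(2000).toInt
  println(s"offset=$offset, length=$length")
}
```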
### Why are the changes needed?

Bugfix

### Does this introduce _any_ user-facing change?

No

### How was this patch tested?

Pass the CIs with the newly added test case.

Closes #35002 from yym1995/fix-nested.

Lead-authored-by: Yimin
Co-authored-by: Yimin <26797163+yym1995@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun
---
 .../datasources/orc/OrcArrayColumnVector.java | 13 +++++------
 .../datasources/orc/OrcColumnVector.java      |  2 +-
 .../datasources/orc/OrcColumnVectorUtils.java |  6 ++---
 .../datasources/orc/OrcMapColumnVector.java   | 13 +++++------
 .../datasources/orc/OrcSourceSuite.scala      | 22 +++++++++++++++++++
 5 files changed, 35 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
index 6e13e97b4cbc..b0c818f5a4df 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.DataType;
@@ -31,26 +32,22 @@
  */
 public class OrcArrayColumnVector extends OrcColumnVector {
   private final OrcColumnVector data;
-  private final long[] offsets;
-  private final long[] lengths;
 
   OrcArrayColumnVector(
       DataType type,
       ColumnVector vector,
-      OrcColumnVector data,
-      long[] offsets,
-      long[] lengths) {
+      OrcColumnVector data) {
 
     super(type, vector);
 
     this.data = data;
-    this.offsets = offsets;
-    this.lengths = lengths;
   }
 
   @Override
   public ColumnarArray getArray(int rowId) {
-    return new ColumnarArray(data, (int) offsets[rowId], (int) lengths[rowId]);
+    int offsets = (int) ((ListColumnVector) baseData).offsets[rowId];
+    int lengths = (int) ((ListColumnVector) baseData).lengths[rowId];
+    return new ColumnarArray(data, offsets, lengths);
   }
 
   @Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 0becd2572f99..7fe1b306142e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -29,7 +29,7 @@
  * this column vector is used to adapt Hive ColumnVector with Spark ColumnarVector.
  */
 public abstract class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
-  private final ColumnVector baseData;
+  protected final ColumnVector baseData;
   private int batchSize;
 
   OrcColumnVector(DataType type, ColumnVector vector) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java
index 3bc7cc8f8014..89f6996e4610 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java
@@ -53,15 +53,13 @@ static OrcColumnVector toOrcColumnVector(DataType type, ColumnVector vector) {
       ListColumnVector listVector = (ListColumnVector) vector;
       OrcColumnVector dataVector = toOrcColumnVector(
         ((ArrayType) type).elementType(), listVector.child);
-      return new OrcArrayColumnVector(
-        type, vector, dataVector, listVector.offsets, listVector.lengths);
+      return new OrcArrayColumnVector(type, vector, dataVector);
     } else if (vector instanceof MapColumnVector) {
       MapColumnVector mapVector = (MapColumnVector) vector;
       MapType mapType = (MapType) type;
       OrcColumnVector keysVector = toOrcColumnVector(mapType.keyType(), mapVector.keys);
       OrcColumnVector valuesVector = toOrcColumnVector(mapType.valueType(), mapVector.values);
-      return new OrcMapColumnVector(
-        type, vector, keysVector, valuesVector, mapVector.offsets, mapVector.lengths);
+      return new OrcMapColumnVector(type, vector, keysVector, valuesVector);
     } else {
       throw new IllegalArgumentException(
         String.format("OrcColumnVectorUtils.toOrcColumnVector should not take %s as type " +
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
index ace8d157792d..7eedd8b59412 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -32,28 +33,24 @@
 public class OrcMapColumnVector extends OrcColumnVector {
   private final OrcColumnVector keys;
   private final OrcColumnVector values;
-  private final long[] offsets;
-  private final long[] lengths;
 
   OrcMapColumnVector(
       DataType type,
       ColumnVector vector,
       OrcColumnVector keys,
-      OrcColumnVector values,
-      long[] offsets,
-      long[] lengths) {
+      OrcColumnVector values) {
 
     super(type, vector);
 
     this.keys = keys;
     this.values = values;
-    this.offsets = offsets;
-    this.lengths = lengths;
   }
 
   @Override
   public ColumnarMap getMap(int ordinal) {
-    return new ColumnarMap(keys, values, (int) offsets[ordinal], (int) lengths[ordinal]);
+    int offsets = (int) ((MapColumnVector) baseData).offsets[ordinal];
+    int lengths = (int) ((MapColumnVector) baseData).lengths[ordinal];
+    return new ColumnarMap(keys, values, offsets, lengths);
   }
 
   @Override
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 348ef6f9eca6..a12aafe4ca1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -644,6 +644,28 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
     }
   }
 
+  test("SPARK-37728: Reading nested columns with ORC vectorized reader should not " +
+    "cause ArrayIndexOutOfBoundsException") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(100).map { _ =>
+        val arrayColumn = (0 until 50).map(_ => (0 until 1000).map(k => k.toString))
+        arrayColumn
+      }.toDF("record").repartition(1)
+      df.write.format("orc").save(path)
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+        val readDf = spark.read.orc(path)
+        val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
+          case scan: FileSourceScanExec => scan.supportsColumnar
+          case _ => false
+        }.isDefined
+        assert(vectorizationEnabled)
+        checkAnswer(readDf, df)
+      }
+    }
+  }
+
   test("SPARK-34897: Support reconcile schemas based on index after nested column pruning") {
     withTable("t1") {
       spark.sql(

From 1708b681e93426b8b01cbae398a881e57c23f329 Mon Sep 17 00:00:00 2001
From: Yimin
Date: Wed, 29 Dec 2021 11:08:31 +0800
Subject: [PATCH 2/6] update

---
 .../spark/sql/execution/datasources/orc/OrcSourceSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index a12aafe4ca1e..c69152fbdb70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, SchemaMergeUtils}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
@@ -657,7 +658,7 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
       withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
         val readDf = spark.read.orc(path)
         val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
-          case scan: FileSourceScanExec => scan.supportsColumnar
+          case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
           case _ => false
         }.isDefined
         assert(vectorizationEnabled)

From de4fd40c3e0099904c24c9c1e41174c1340dc529 Mon Sep 17 00:00:00 2001
From: Yimin
Date: Wed, 29 Dec 2021 19:40:01 +0800
Subject: [PATCH 3/6] update

---
 .../datasources/orc/OrcQuerySuite.scala   | 24 +++++++++++++++++++
 .../datasources/orc/OrcSourceSuite.scala  | 23 ------------------
 2 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index ead2c2cf1b70..a4050bde216a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -34,7 +34,9 @@ import org.apache.orc.mapreduce.OrcInputFormat
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -713,6 +715,28 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-37728: Reading nested columns with ORC vectorized reader should not " +
+    "cause ArrayIndexOutOfBoundsException") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(100).map { _ =>
+        val arrayColumn = (0 until 50).map(_ => (0 until 1000).map(k => k.toString))
+        arrayColumn
+      }.toDF("record").repartition(1)
+      df.write.format("orc").save(path)
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+        val readDf = spark.read.orc(path)
+        val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
+          case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
+          case _ => false
+        }.isDefined
+        assert(vectorizationEnabled)
+        checkAnswer(readDf, df)
+      }
+    }
+  }
 }
 
 class OrcV1QuerySuite extends OrcQuerySuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index c69152fbdb70..348ef6f9eca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, SchemaMergeUtils}
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
@@ -645,28 +644,6 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
     }
   }
 
-  test("SPARK-37728: Reading nested columns with ORC vectorized reader should not " +
-    "cause ArrayIndexOutOfBoundsException") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      val df = spark.range(100).map { _ =>
-        val arrayColumn = (0 until 50).map(_ => (0 until 1000).map(k => k.toString))
-        arrayColumn
-      }.toDF("record").repartition(1)
-      df.write.format("orc").save(path)
-
-      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
-        val readDf = spark.read.orc(path)
-        val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
-          case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
-          case _ => false
-        }.isDefined
-        assert(vectorizationEnabled)
-        checkAnswer(readDf, df)
-      }
-    }
-  }
-
   test("SPARK-34897: Support reconcile schemas based on index after nested column pruning") {
     withTable("t1") {
       spark.sql(

From 5f2e85d2cdf49b2530e4a960a31545d8c8787e6c Mon Sep 17 00:00:00 2001
From: Yimin
Date: Thu, 30 Dec 2021 17:10:03 +0800
Subject: [PATCH 4/6] update

---
 .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index a4050bde216a..686ef72f359b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -729,7 +728,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
         val readDf = spark.read.orc(path)
         val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
-          case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
+          case scan @ (_: FileSourceScanExec) => scan.supportsColumnar
           case _ => false
         }.isDefined
         assert(vectorizationEnabled)

From be7a155976a4bb5136902e0221b29ddb15e875f5 Mon Sep 17 00:00:00 2001
From: Yimin
Date: Fri, 31 Dec 2021 15:39:12 +0800
Subject: [PATCH 5/6] update

---
 .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 686ef72f359b..e9aa9adf5b91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -725,10 +726,11 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       }.toDF("record").repartition(1)
       df.write.format("orc").save(path)
 
-      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+        SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10000") {
         val readDf = spark.read.orc(path)
         val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
-          case scan @ (_: FileSourceScanExec) => scan.supportsColumnar
+          case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
           case _ => false
         }.isDefined
         assert(vectorizationEnabled)

From 294b02ba7ccbcde85a76b18948faa2dbb0d8fbcc Mon Sep 17 00:00:00 2001
From: Yimin
Date: Fri, 31 Dec 2021 21:31:35 +0800
Subject: [PATCH 6/6] update

---
 .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index e9aa9adf5b91..6a91a9cfe934 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -726,14 +726,12 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       }.toDF("record").repartition(1)
       df.write.format("orc").save(path)
 
-      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
-        SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10000") {
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
         val readDf = spark.read.orc(path)
         val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
           case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
           case _ => false
         }.isDefined
-        assert(vectorizationEnabled)
         checkAnswer(readDf, df)
       }
     }
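For reference, an end-to-end sketch of the scenario the final test exercises, runnable outside the test harness. The config string `spark.sql.orc.enableNestedColumnVectorizedReader` is assumed to be the key behind `SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED`; the output path is illustrative.

```scala
import org.apache.spark.sql.SparkSession

object Spark37728Repro extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("SPARK-37728 repro")
    // Assumed key for SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED;
    // verify against your Spark version.
    .config("spark.sql.orc.enableNestedColumnVectorizedReader", "true")
    .getOrCreate()
  import spark.implicits._

  // Same shape of data as the test: 100 rows, each an array of 50 arrays of 1000 strings.
  val df = spark.range(100).map { _ =>
    (0 until 50).map(_ => (0 until 1000).map(_.toString))
  }.toDF("record").repartition(1)
  df.write.mode("overwrite").format("orc").save("/tmp/spark-37728")

  // Before the fix this read could throw ArrayIndexOutOfBoundsException once a batch
  // forced the Hive list vectors to grow; with the fix it returns 100 rows.
  println(spark.read.orc("/tmp/spark-37728").count())
  spark.stop()
}
```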