diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2a5d01e3772c..23636b26110c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -321,7 +321,11 @@ object MimaExcludes {
     // [SPARK-26616][MLlib] Expose document frequency in IDFModel
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
-    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf"),
+
+    // [SPARK-27650][SQL] separate the row iterator functionality from ColumnarBatch
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.vectorized.ColumnarBatch.getRow"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.vectorized.ColumnarBatch.rowIterator")
   )

   // Exclude rules for 2.4.x
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index f02861355c40..92720896c74c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -30,10 +30,7 @@
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.*;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -104,6 +101,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
  private ColumnarBatch columnarBatch;

+  private ColumnarBatchRowView rowView;
+
  private WritableColumnVector[] columnVectors;

  /**
@@ -168,7 +167,7 @@ public boolean nextKeyValue() throws IOException {
  @Override
  public Object getCurrentValue() {
    if (returnColumnarBatch) return columnarBatch;
-    return columnarBatch.getRow(batchIdx - 1);
+    return rowView.getRow(batchIdx - 1);
  }

  @Override
@@ -202,6 +201,7 @@ private void initBatch(
      columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
    }
    columnarBatch = new ColumnarBatch(columnVectors);
+    rowView = new ColumnarBatchRowView(columnarBatch);
    if (partitionColumns != null) {
      int partitionIdx = sparkSchema.fields().length;
      for (int i = 0; i < partitionColumns.fields().length; i++) {
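Note on the reader integration above: the view is created once in initBatch(), alongside the batch it wraps, so the row-at-a-time getCurrentValue() path allocates nothing per record. A minimal Scala sketch of the same caching pattern; MyColumnarReader and its members are illustrative stand-ins, not Spark API:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.{ColumnarBatchRowView, OnHeapColumnVector}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

class MyColumnarReader(schema: StructType, capacity: Int) {
  private val vectors = OnHeapColumnVector.allocateColumns(capacity, schema)
  private val batch = new ColumnarBatch(vectors.toArray[ColumnVector])
  // Allocate the row view once, next to the batch, as VectorizedParquetRecordReader now does;
  // batch.setNumRows(...) would be called each time the vectors are refilled.
  private val rowView = new ColumnarBatchRowView(batch)

  // Row-at-a-time equivalent of getCurrentValue(); the returned row is a reused view,
  // valid only until the batch contents are overwritten.
  def currentRow(rowInBatch: Int): InternalRow = rowView.getRow(rowInBatch)
}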
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatchRowView.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatchRowView.java
new file mode 100644
index 000000000000..786fe0c86276
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatchRowView.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * This class provides a row view of a {@link ColumnarBatch}, so that Spark can access the data
+ * row by row.
+ */
+public final class ColumnarBatchRowView {
+
+  private final ColumnarBatch batch;
+
+  // Staging row returned from `getRow`.
+  private final MutableColumnarRow row;
+
+  public ColumnarBatchRowView(ColumnarBatch batch) {
+    this.batch = batch;
+    this.row = new MutableColumnarRow(batch.columns());
+  }
+
+  /**
+   * Returns an iterator over the rows in this batch.
+   */
+  public Iterator<InternalRow> rowIterator() {
+    final int maxRows = batch.numRows();
+    final MutableColumnarRow row = new MutableColumnarRow(batch.columns());
+    return new Iterator<InternalRow>() {
+      int rowId = 0;
+
+      @Override
+      public boolean hasNext() {
+        return rowId < maxRows;
+      }
+
+      @Override
+      public InternalRow next() {
+        if (rowId >= maxRows) {
+          throw new NoSuchElementException();
+        }
+        row.rowId = rowId++;
+        return row;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+   */
+  public InternalRow getRow(int rowId) {
+    assert(rowId >= 0 && rowId < batch.numRows());
+    row.rowId = rowId;
+    return row;
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 07546a54013e..5ffbb02c9a25 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -16,25 +16,17 @@
  */
 package org.apache.spark.sql.vectorized;

-import java.util.*;
-
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;

 /**
- * This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this
- * batch so that Spark can access the data row by row. Instance of it is meant to be reused during
- * the entire data loading process.
+ * This class wraps multiple {@link ColumnVector}s as a table-like data batch. An instance of it
+ * is meant to be reused during the entire data loading process.
  */
 @Evolving
 public final class ColumnarBatch {
   private int numRows;
   private final ColumnVector[] columns;

-  // Staging row returned from `getRow`.
-  private final MutableColumnarRow row;
-
   /**
    * Called to close all the columns in this batch. It is not valid to access the data after
    * calling this. This must be called at the end to clean up memory allocations.
@@ -45,36 +37,6 @@ public void close() {
     }
   }

-  /**
-   * Returns an iterator over the rows in this batch.
-   */
-  public Iterator<InternalRow> rowIterator() {
-    final int maxRows = numRows;
-    final MutableColumnarRow row = new MutableColumnarRow(columns);
-    return new Iterator<InternalRow>() {
-      int rowId = 0;
-
-      @Override
-      public boolean hasNext() {
-        return rowId < maxRows;
-      }
-
-      @Override
-      public InternalRow next() {
-        if (rowId >= maxRows) {
-          throw new NoSuchElementException();
-        }
-        row.rowId = rowId++;
-        return row;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
   /**
    * Sets the number of rows in this batch.
    */
@@ -98,16 +60,13 @@ public void setNumRows(int numRows) {
   public ColumnVector column(int ordinal) { return columns[ordinal]; }

   /**
-   * Returns the row in this batch at `rowId`. Returned row is reused across calls.
+   * Returns all the columns of this batch.
    */
-  public InternalRow getRow(int rowId) {
-    assert(rowId >= 0 && rowId < numRows);
-    row.rowId = rowId;
-    return row;
+  public ColumnVector[] columns() {
+    return columns;
   }

   public ColumnarBatch(ColumnVector[] columns) {
     this.columns = columns;
-    this.row = new MutableColumnarRow(columns);
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index f9c4ecc14e6c..c56cf241d134 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator}
-import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, OnHeapColumnVector}
+import org.apache.spark.sql.execution.vectorized.{ColumnarBatchRowView, MutableColumnarRow, OnHeapColumnVector}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -212,7 +212,7 @@ class VectorizedHashMapGenerator(
     s"""
        |public java.util.Iterator<${classOf[InternalRow].getName}> rowIterator() {
        |  batch.setNumRows(numRows);
-       |  return batch.rowIterator();
+       |  return new ${classOf[ColumnarBatchRowView].getName}(batch).rowIterator();
        |}
      """.stripMargin
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 884dc8c6215f..8be17189f534 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -33,6 +33,7 @@
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
 import org.apache.spark.util.{ByteBufferOutputStream, Utils}
@@ -172,7 +173,7 @@ private[sql] object ArrowConverters {

       val batch = new ColumnarBatch(columns)
       batch.setNumRows(root.getRowCount)
-      batch.rowIterator().asScala
+      new ColumnarBatchRowView(batch).rowIterator().asScala
     }
   }
 }
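Two behaviors in the hunks above are worth keeping in mind at the call sites that follow: the new ColumnarBatch.columns() accessor returns the batch's internal array (no defensive copy), and the view's iterator hands back a single reused MutableColumnarRow, so rows must be copied if they outlive the iteration. A small self-contained Scala sketch combining both, assuming an already-populated batch:

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

def collectRows(batch: ColumnarBatch, expectedTypes: Seq[DataType]): Array[InternalRow] = {
  // columns() replaces the old (0 until batch.numCols()).map(i => batch.column(i)) loop;
  // treat the result as read-only, since it is the batch's own storage.
  val actualTypes = batch.columns().map(_.dataType()).toSeq
  require(expectedTypes == actualTypes, s"expected $expectedTypes, got $actualTypes")
  // copy() is required when retaining rows: without it, every element of the result
  // would alias the one staging row that the iterator mutates on each next().
  new ColumnarBatchRowView(batch).rowIterator().asScala.map(_.copy()).toArray
}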
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index bedfa9c4722c..711df6aaf3f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -37,6 +37,7 @@
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.python.BatchIterator
 import org.apache.spark.sql.execution.r.ArrowRRunner
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.GroupStateTimeout
 import org.apache.spark.sql.types._
@@ -247,17 +248,17 @@ case class MapPartitionsInRWithArrowExec(

       private var currentIter = if (columnarBatchIter.hasNext) {
         val batch = columnarBatchIter.next()
-        val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
+        val actualDataTypes = batch.columns().map(_.dataType()).toSeq
         assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
           s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
-        batch.rowIterator.asScala
+        new ColumnarBatchRowView(batch).rowIterator.asScala
       } else {
         Iterator.empty
       }

       override def hasNext: Boolean = currentIter.hasNext || {
         if (columnarBatchIter.hasNext) {
-          currentIter = columnarBatchIter.next().rowIterator.asScala
+          currentIter = new ColumnarBatchRowView(columnarBatchIter.next()).rowIterator.asScala
           hasNext
         } else {
           false
@@ -587,7 +588,9 @@ case class FlatMapGroupsInRWithArrowExec(
       // binary in a batch due to the limitation of R API. See also ARROW-4512.
       val columnarBatchIter = runner.compute(groupedByRKey, -1)
       val outputProject = UnsafeProjection.create(output, output)
-      columnarBatchIter.flatMap(_.rowIterator().asScala).map(outputProject)
+      columnarBatchIter
+        .flatMap(new ColumnarBatchRowView(_).rowIterator().asScala)
+        .map(outputProject)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 2ab7240556aa..c50a076c8ce1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -29,6 +29,7 @@
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
+import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.Utils
@@ -146,7 +147,7 @@ case class AggregateInPandasExec(
     val joined = new JoinedRow
     val resultProj = UnsafeProjection.create(resultExpressions, joinedAttributes)

-    columnarBatchIter.map(_.rowIterator.next()).map { aggOutputRow =>
+    columnarBatchIter.map(new ColumnarBatchRowView(_).getRow(0)).map { aggOutputRow =>
       val leftRow = queue.remove()
       val joinedRow = joined(leftRow, aggOutputRow)
       resultProj(joinedRow)
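One detail in the AggregateInPandasExec hunk above: the call site only ever consumed the first row of each batch, and getRow(0) now fetches it directly instead of going through a throwaway iterator (rowIterator() allocates a fresh MutableColumnarRow per call, while getRow reuses the view's staging row). A sketch of the single-row pattern, assuming the producer guarantees one row per batch:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
import org.apache.spark.sql.vectorized.ColumnarBatch

def singleResult(batch: ColumnarBatch): InternalRow = {
  require(batch.numRows() == 1, s"expected a single-row batch, got ${batch.numRows()}")
  // Equivalent to the old batch.rowIterator.next(), minus the iterator allocation.
  new ColumnarBatchRowView(batch).getRow(0)
}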
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 61b167f50fd6..f638465c6b9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -26,6 +26,7 @@
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.arrow.ArrowUtils
+import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
 import org.apache.spark.sql.types.StructType

 /**
@@ -101,17 +102,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi

     private var currentIter = if (columnarBatchIter.hasNext) {
       val batch = columnarBatchIter.next()
-      val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
+      val actualDataTypes = batch.columns().map(_.dataType()).toSeq
       assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: " +
         s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
-      batch.rowIterator.asScala
+      new ColumnarBatchRowView(batch).rowIterator().asScala
     } else {
       Iterator.empty
     }

     override def hasNext: Boolean = currentIter.hasNext || {
       if (columnarBatchIter.hasNext) {
-        currentIter = columnarBatchIter.next().rowIterator.asScala
+        currentIter = new ColumnarBatchRowView(columnarBatchIter.next()).rowIterator.asScala
         hasNext
       } else {
         false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index ce755ffb7c9f..259ef37922cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -28,8 +28,9 @@
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
+import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}

 /**
  * Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
@@ -154,7 +155,7 @@ case class FlatMapGroupsInPandasExec(
       val outputVectors = output.indices.map(structVector.getChild)
       val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
       flattenedBatch.setNumRows(batch.numRows())
-      flattenedBatch.rowIterator.asScala
+      new ColumnarBatchRowView(flattenedBatch).rowIterator.asScala
     }.map(unsafeProj)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index 1ce1215bfdd6..126c164bb169 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -30,6 +30,7 @@
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
+import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
 import org.apache.spark.sql.execution.window._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -394,11 +395,13 @@ case class WindowInPandasExec(

       val joined = new JoinedRow

-      windowFunctionResult.flatMap(_.rowIterator.asScala).map { windowOutput =>
-        val leftRow = queue.remove()
-        val joinedRow = joined(leftRow, windowOutput)
-        resultProj(joinedRow)
-      }
+      windowFunctionResult
+        .flatMap(new ColumnarBatchRowView(_).rowIterator.asScala)
+        .map { windowOutput =>
+          val leftRow = queue.remove()
+          val joinedRow = joined(leftRow, windowOutput)
+          resultProj(joinedRow)
+        }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index bd2470ee2066..e1a52b3aec34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -28,6 +28,7 @@
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.datasources.parquet.{SpecificParquetRecordReaderBase, VectorizedParquetRecordReader}
+import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
@@ -204,7 +205,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
             reader.initialize(p, ("id" :: Nil).asJava)
             val batch = reader.resultBatch()
             while (reader.nextBatch()) {
-              val it = batch.rowIterator()
+              val it = new ColumnarBatchRowView(batch).rowIterator()
               while (it.hasNext) {
                 val record = it.next()
                 if (!record.isNullAt(0)) aggregateValue(record)
@@ -459,7 +460,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
             reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
             val batch = reader.resultBatch()
             while (reader.nextBatch()) {
-              val rowIterator = batch.rowIterator()
+              val rowIterator = new ColumnarBatchRowView(batch).rowIterator()
               while (rowIterator.hasNext) {
                 val row = rowIterator.next()
                 val value = row.getUTF8String(0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index cbfd9d9b4b81..2fdb07e047d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -33,7 +33,7 @@
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.arrow.ArrowUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -974,9 +974,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
         allocate(capacity, field.dataType, memMode)
       }
       val batch = new ColumnarBatch(columns.toArray)
+      val rowView = new ColumnarBatchRowView(batch)
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 0)
-      assert(batch.rowIterator().hasNext == false)
+      assert(rowView.rowIterator().hasNext == false)

       // Add a row [1, 1.1, NULL]
       columns(0).putInt(0, 1)
@@ -988,8 +989,8 @@
       // Verify the results of the row.
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 1)
-      assert(batch.rowIterator().hasNext)
-      assert(batch.rowIterator().hasNext)
+      assert(rowView.rowIterator().hasNext)
+      assert(rowView.rowIterator().hasNext)

       assert(columns(0).getInt(0) == 1)
       assert(columns(0).isNullAt(0) == false)
@@ -999,7 +1000,7 @@
       assert(columns(3).getUTF8String(0).toString == "Hello")

       // Verify the iterator works correctly.
-      val it = batch.rowIterator()
+      val it = rowView.rowIterator()
       assert(it.hasNext())
       val row = it.next()
       assert(row.getInt(0) == 1)
@@ -1046,7 +1047,7 @@

       // Verify
       assert(batch.numRows() == 3)
-      val it2 = batch.rowIterator()
+      val it2 = rowView.rowIterator()
       rowEquals(it2.next(), Row(null, 2.2, 2, "abc"))
       rowEquals(it2.next(), Row(3, null, 3, ""))
       rowEquals(it2.next(), Row(4, 4.4, 4, "world"))
@@ -1143,7 +1144,7 @@
       assert(batch.numRows() == 2)
       assert(batch.numCols() == 5)

-      val it = batch.rowIterator()
+      val it = new ColumnarBatchRowView(batch).rowIterator()
       val referenceIt = rows.iterator
       while (it.hasNext) {
         compareStruct(schema, it.next(), referenceIt.next(), 0)
@@ -1185,7 +1186,7 @@
       val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
       assert(batch.numRows() == NUM_ROWS)

-      val it = batch.rowIterator()
+      val it = new ColumnarBatchRowView(batch).rowIterator()
       val referenceIt = rows.iterator
       var k = 0
       while (it.hasNext) {
@@ -1252,7 +1253,7 @@
       assert(batch.numCols() == 2)
       assert(batch.numRows() == 11)

-      val rowIter = batch.rowIterator().asScala
+      val rowIter = new ColumnarBatchRowView(batch).rowIterator().asScala
       rowIter.zipWithIndex.foreach { case (row, i) =>
         if (i == 10) {
           assert(row.isNullAt(0))
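Finally, on the two MiMa excludes at the top: getRow and rowIterator are removed from the Evolving ColumnarBatch API outright rather than deprecated, and their replacement lives in the internal org.apache.spark.sql.execution.vectorized package. A hedged before/after sketch of the mechanical migration an external caller would need:

import scala.collection.JavaConverters._
import org.apache.spark.sql.execution.vectorized.ColumnarBatchRowView
import org.apache.spark.sql.vectorized.ColumnarBatch

def migrated(batch: ColumnarBatch): Unit = {
  // Before this patch:
  //   val first = batch.getRow(0)
  //   val it = batch.rowIterator().asScala
  val view = new ColumnarBatchRowView(batch)
  val first = view.getRow(0)           // same reused-row semantics as the old getRow
  val it = view.rowIterator().asScala  // valid only while the batch is still open
  require(first.numFields == batch.numCols())
  it.foreach(row => require(row.numFields == batch.numCols()))
}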