diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 3583b38a01333..51997acc6dffe 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -174,7 +174,7 @@ private[sql] object AvroUtils extends Logging {
     private[this] var currentRow: Option[InternalRow] = None
 
     def hasNextRow: Boolean = {
-      do {
+      while (!completed && currentRow.isEmpty) {
         val r = fileReader.hasNext && !fileReader.pastSync(stopPosition)
         if (!r) {
           fileReader.close()
@@ -182,15 +182,21 @@ private[sql] object AvroUtils extends Logging {
           currentRow = None
         } else {
           val record = fileReader.next()
+          // the row must be deserialized in hasNextRow, because AvroDeserializer#deserialize
+          // potentially filters rows
           currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]]
         }
-      } while (!completed && currentRow.isEmpty)
-
+      }
       currentRow.isDefined
     }
 
     def nextRow: InternalRow = {
-      currentRow.getOrElse {
+      if (currentRow.isEmpty) {
+        hasNextRow
+      }
+      val returnRow = currentRow
+      currentRow = None // free up hasNextRow to consume more Avro records, if not exhausted
+      returnRow.getOrElse {
         throw new NoSuchElementException("next on empty iterator")
       }
     }
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 52cab880ab897..4f4af97f1299f 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.avro
 
 import java.io._
-import java.net.URL
+import java.net.{URI, URL}
 import java.nio.file.{Files, Paths, StandardCopyOption}
 import java.sql.{Date, Timestamp}
 import java.util.{Locale, UUID}
@@ -31,16 +31,20 @@ import org.apache.avro.Schema.Type._
 import org.apache.avro.file.{DataFileReader, DataFileWriter}
 import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
+import org.apache.avro.mapred.FsInput
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.IntervalData
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
 import org.apache.spark.sql.execution.{FormattedMode, SparkPlan}
-import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, FilePartition}
+import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, FilePartition, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -1836,6 +1840,24 @@ abstract class AvroSuite
       }
     }
   }
+
+  test("SPARK-33314: RowReader doesn't over-consume when hasNextRow called twice") {
+    withTempPath { dir =>
+      Seq((1), (2), (3))
+        .toDF("index")
+        .write
+        .format("avro")
+        .save(dir.getCanonicalPath)
+      val df = spark
+        .read
+        .format("avro")
+        .load(dir.getCanonicalPath)
+        .orderBy("index")
+
+      checkAnswer(df,
+        Seq(Row(1), Row(2), Row(3)))
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
@@ -2005,3 +2027,61 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
     }
   }
 }
+
+class AvroRowReaderSuite
+  extends QueryTest
+  with SharedSparkSession {
+
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_LIST, "") // need this for BatchScanExec
+
+  test("SPARK-33314: hasNextRow and nextRow properly handle consecutive calls") {
+    withTempPath { dir =>
+      Seq((1), (2), (3))
+        .toDF("value")
+        .coalesce(1)
+        .write
+        .format("avro")
+        .save(dir.getCanonicalPath)
+
+      val df = spark.read.format("avro").load(dir.getCanonicalPath)
+      val fileScan = df.queryExecution.executedPlan collectFirst {
+        case BatchScanExec(_, f: AvroScan) => f
+      }
+      val filePath = fileScan.get.fileIndex.inputFiles(0)
+      val fileSize = new File(new URI(filePath)).length
+      val in = new FsInput(new Path(new URI(filePath)), new Configuration())
+      val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
+
+      val it = new Iterator[InternalRow] with AvroUtils.RowReader {
+        override val fileReader = reader
+        override val deserializer = new AvroDeserializer(
+          reader.getSchema,
+          StructType(new StructField("value", IntegerType, true) :: Nil),
+          CORRECTED,
+          new NoopFilters)
+        override val stopPosition = fileSize
+
+        override def hasNext: Boolean = hasNextRow
+
+        override def next: InternalRow = nextRow
+      }
+      assert(it.hasNext == true)
+      assert(it.next.getInt(0) == 1)
+      // test no intervening next
+      assert(it.hasNext == true)
+      assert(it.hasNext == true)
+      // test no intervening hasNext
+      assert(it.next.getInt(0) == 2)
+      assert(it.next.getInt(0) == 3)
+      assert(it.hasNext == false)
+      assertThrows[NoSuchElementException] {
+        it.next
+      }
+    }
+  }
+}
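Note (not part of the patch): the bug fixed here is a non-idempotent hasNext. The old do/while advanced the underlying Avro reader on every call to hasNextRow, so two consecutive hasNextRow calls silently dropped a row. The fix applies the standard look-ahead iterator pattern: hasNext buffers one deserialized row and becomes a no-op until next clears the buffer. Below is a minimal, self-contained sketch of that pattern; the names (LookAheadIterator, fetch) are illustrative, not from Spark, and filtering and end-of-stream are collapsed into a single Option for brevity.

    class LookAheadIterator[A](fetch: () => Option[A]) extends Iterator[A] {
      // One-element buffer: hasNext fills it, next drains it.
      private var cached: Option[A] = None
      private var exhausted = false

      override def hasNext: Boolean = {
        // Idempotent: once cached is non-empty, repeated calls consume nothing.
        while (!exhausted && cached.isEmpty) {
          fetch() match {
            case some @ Some(_) => cached = some // buffer the element, don't drop it
            case None => exhausted = true        // underlying source is drained
          }
        }
        cached.isDefined
      }

      override def next(): A = {
        if (cached.isEmpty) hasNext // fill the buffer if the caller skipped hasNext
        val result = cached
        cached = None // release the buffer so the next hasNext can advance
        result.getOrElse(throw new NoSuchElementException("next on empty iterator"))
      }
    }

The real RowReader additionally keeps looping inside hasNextRow when AvroDeserializer#deserialize returns None for a row removed by pushed-down filters, which is why the row must be deserialized in hasNextRow rather than in nextRow.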