From 617ff2f83810767655179d7dac2a6756831e73e2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 5 Apr 2022 14:31:24 -0700 Subject: [PATCH] Fixed stack overflows in Record Iterators --- .../apache/hudi/HoodieMergeOnReadRDD.scala | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 05c98e3aeb7ba..d40cf4943499a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -50,6 +50,7 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskCont import java.io.Closeable import java.util.Properties +import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.util.Try @@ -188,17 +189,23 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = logRecords.remove(key) - override def hasNext: Boolean = + override def hasNext: Boolean = hasNextInternal + + // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure + // that recursion is unfolded into a loop to avoid stack overflows while + // handling records + @tailrec private def hasNextInternal: Boolean = { logRecordsIterator.hasNext && { val avroRecordOpt = logRecordsIterator.next() if (avroRecordOpt.isEmpty) { // Record has been deleted, skipping - this.hasNext + this.hasNextInternal } else { recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get)) true } } + } override final def next(): InternalRow = recordToLoad @@ -257,7 +264,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) - override def hasNext: Boolean = { + override def hasNext: Boolean = hasNextInternal + + // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure + // that recursion is unfolded into a loop to avoid stack overflows while + // handling records + @tailrec private def hasNextInternal: Boolean = { if (baseFileIterator.hasNext) { val curRowRecord = baseFileIterator.next() val curKey = curRowRecord.getString(recordKeyOrdinal) @@ -270,7 +282,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get) if (mergedAvroRecordOpt.isEmpty) { // Record has been deleted, skipping - this.hasNext + this.hasNextInternal } else { // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c // record from the Delta Log will bear (full) Table schema, while record from the Base file