diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 404d8d93092e..bd7d3647b2ea 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -93,7 +93,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport { // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion // Additionally, we have to explicitly wrap around resulting [[RDD]] into the one - // injecting [[SQLConf]], which by default isn't propgated by Spark to the executor(s). + // injecting [[SQLConf]], which by default isn't propagated by Spark to the executor(s). // [[SQLConf]] is required by [[AvroSerializer]] injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows => if (rows.isEmpty) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala similarity index 95% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala index 07a0ce7f239b..68b25fafe038 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala @@ -34,20 +34,17 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable - import org.apache.avro.Schema import 
org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf - +import org.apache.hudi.util.CachingIterator import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType import java.io.Closeable import java.util.Properties - import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.util.Try @@ -61,7 +58,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, requiredSchema: HoodieTableSchema, tableState: HoodieTableState, config: Configuration) - extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport { + extends CachingIterator[InternalRow] with Closeable with AvroDeserializerSupport { protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) @@ -78,8 +75,6 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) - protected var recordToLoad: InternalRow = _ - private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema) // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe @@ -107,7 +102,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = logRecords.remove(key) - override def hasNext: Boolean = hasNextInternal + protected def doHasNext: Boolean = hasNextInternal // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure // that recursion is unfolded into a loop to avoid stack overflows while @@ -120,14 +115,12 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, this.hasNextInternal } else { val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get) - 
recordToLoad = deserialize(projectedAvroRecord) + nextRecord = deserialize(projectedAvroRecord) true } } } - override final def next(): InternalRow = recordToLoad - override def close(): Unit = if (logScanner != null) { try { @@ -155,13 +148,13 @@ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, private val baseFileIterator = baseFileReader(split.dataFile.get) - override def hasNext: Boolean = { + override def doHasNext: Boolean = { if (baseFileIterator.hasNext) { // No merge is required, simply load current row and project into required schema - recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next()) + nextRecord = requiredSchemaUnsafeProjection(baseFileIterator.next()) true } else { - super[LogFileIterator].hasNext + super[LogFileIterator].doHasNext } } } @@ -196,7 +189,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, private val baseFileIterator = baseFileReader(split.dataFile.get) - override def hasNext: Boolean = hasNextInternal + override def doHasNext: Boolean = hasNextInternal // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure // that recursion is unfolded into a loop to avoid stack overflows while @@ -208,7 +201,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, val updatedRecordOpt = removeLogRecord(curKey) if (updatedRecordOpt.isEmpty) { // No merge is required, simply load current row and project into required schema - recordToLoad = requiredSchemaUnsafeProjection(curRow) + nextRecord = requiredSchemaUnsafeProjection(curRow) true } else { val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) @@ -218,12 +211,12 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, } else { val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder) - recordToLoad = deserialize(projectedAvroRecord) + nextRecord = deserialize(projectedAvroRecord) true } } } 
else { - super[LogFileIterator].hasNext + super[LogFileIterator].doHasNext } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala new file mode 100644 index 000000000000..8c8fb1023a7f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/CachingIterator.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.util + +/** + * Extension of the [[Iterator]] allowing for caching of the underlying record produced + * during iteration to provide for the idempotency of the [[hasNext]] invocation: + * meaning, that invoking [[hasNext]] multiple times consecutively (w/o invoking [[next]] + * in between) will only make iterator step over a single element + * + * NOTE: [[hasNext]] and [[next]] are purposefully marked as final, requiring iteration + * semantics to be implemented through overriding of a single [[doHasNext]] method + */ +trait CachingIterator[T >: Null] extends Iterator[T] { + + protected var nextRecord: T = _ + + protected def doHasNext: Boolean + + override final def hasNext: Boolean = nextRecord != null || doHasNext + + override final def next: T = { + val record = nextRecord + nextRecord = null + record + } + +}