@@ -35,6 +35,8 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}

import java.io.Closeable

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
@@ -58,7 +60,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
mergeOnReadPartition.split match {
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
@@ -84,6 +86,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
s"spark partition Index: ${mergeOnReadPartition.index}" +
s"merge type: ${mergeOnReadPartition.split.mergeType}")
}
if (iter.isInstanceOf[Closeable]) {
// Register a callback to close the logScanner; it is executed on task completion.
// When the task finishes, this listener is invoked and the scanner's resources are released.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
Member: would you add some comments about this? Not quite sure how this works.

}
iter
}

override protected def getPartitions: Array[Partition] = {
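For context on the question raised in the review thread above, here is a minimal, self-contained sketch of the pattern this change relies on. Apart from TaskContext.addTaskCompletionListener (the real Spark API used verbatim in the diff), every name below (TaskCleanupSketch, DummyScanner, ScannerBackedIterator, compute) is an illustrative placeholder, not actual Hudi code.

```scala
import java.io.Closeable

import org.apache.spark.TaskContext

object TaskCleanupSketch {

  // Illustrative stand-in for a resource-backed log scanner.
  class DummyScanner extends Closeable {
    def records: Iterator[Int] = Iterator(1, 2, 3)
    override def close(): Unit = println("scanner closed")
  }

  // An iterator that owns the scanner and exposes close() itself, mirroring the
  // `Iterator[InternalRow] with Closeable` instances introduced in this PR.
  class ScannerBackedIterator(scanner: DummyScanner) extends Iterator[Int] with Closeable {
    private val underlying = scanner.records
    override def hasNext: Boolean = underlying.hasNext
    override def next(): Int = underlying.next()
    override def close(): Unit = scanner.close()
  }

  def compute(): Iterator[Int] = {
    val iter: Iterator[Int] = new ScannerBackedIterator(new DummyScanner)
    // If the concrete iterator is Closeable, ask Spark to close it when the task
    // completes, whether it succeeds, fails, or is cancelled. TaskContext.get()
    // returns null outside a running task, hence the Option guard.
    iter match {
      case closeable: Closeable =>
        Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeable.close()))
      case _ =>
    }
    iter
  }
}
```

Because the listener is tied to task completion rather than to iterator exhaustion, the scanner is released even if the task fails or is killed before the iterator is fully consumed.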
@@ -112,7 +120,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,

private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] {
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
@@ -121,7 +129,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
private val logRecords = logScanner.getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala

private var recordToLoad: InternalRow = _
@@ -146,12 +155,22 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def next(): InternalRow = {
recordToLoad
}

override def close(): Unit = {
if (logScanner != null) {
try {
logScanner.close()
} finally {
logScanner = null
}
}
}
}

private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] {
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
@@ -160,7 +179,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
private val logRecords = logScanner.getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala

private var recordToLoad: InternalRow = _
@@ -192,12 +212,22 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def next(): InternalRow = {
recordToLoad
}

override def close(): Unit = {
if (logScanner != null) {
try {
logScanner.close()
} finally {
logScanner = null
}
}
}
}

private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] {
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
@@ -207,7 +237,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val requiredDeserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
private val logRecords = logScanner.getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
private val keyToSkip = mutable.Set.empty[String]
private val recordKeyPosition = if (recordKeyFieldOpt.isEmpty) HOODIE_RECORD_KEY_COL_POS else tableState.tableStructSchema.fieldIndex(recordKeyFieldOpt.get)
@@ -276,6 +307,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,

override def next(): InternalRow = recordToLoad

override def close(): Unit = {
if (logScanner != null) {
try {
logScanner.close()
} finally {
logScanner = null
}
}
}

private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
val posIterator = requiredFieldPosition.iterator
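One note on the shape of the change (illustrative only, this is not something the PR does): the null-guarded close() body is repeated verbatim in all three iterators, so it could be factored into a small mixin along these lines.

```scala
import java.io.Closeable

// Hypothetical mixin, not part of this PR: centralizes the null-guarded,
// idempotent close() that each of the three iterators currently repeats.
trait ClosesLogScanner extends Closeable {
  // Implementing iterators assign their scanner here; only Closeable is needed for cleanup.
  protected var logScanner: Closeable

  override def close(): Unit = {
    if (logScanner != null) {
      try {
        logScanner.close()
      } finally {
        // Nulling out makes a second close() call a harmless no-op.
        logScanner = null
      }
    }
  }
}
```

Whether that refactor is worth it is a style call; the per-iterator close() shown in the diff is equally correct.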