core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala (35 changes: 22 additions & 13 deletions)
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
configurable.setConf(conf)
case _ =>
}
- val reader = format.createRecordReader(
+ private var reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

@@ -141,6 +141,10 @@ class NewHadoopRDD[K, V](
override def hasNext: Boolean = {
if (!finished && !havePair) {
finished = !reader.nextKeyValue
+ if (finished) {
Contributor: Would it make sense if we just call close here?

+ // Close reader and release it
Contributor: Could you change this comment (and the one in SqlNewHadoopRDD) to say something like "Close and release the reader here; close() will also be called when the task completes, but for tasks that read from many files, it helps to release the resources early"? I'm just worried this could be removed later on if someone thinks it's redundant with the close() in the task completion listener.

Member (Author): Good suggestion. I added it.

+ close()
+ }
havePair = !finished
}
!finished
@@ -159,18 +163,23 @@ class NewHadoopRDD[K, V](

private def close() {
try {
- reader.close()
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
- split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
- // If we can't get the bytes read from the FS stats, fall back to the split size,
- // which may be inaccurate.
- try {
- inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
- } catch {
- case e: java.io.IOException =>
- logWarning("Unable to get input size to set InputMetrics for task", e)
+ if (reader != null) {
+ // Close reader and release it
+ reader.close()
+ reader = null
+
+ if (bytesReadCallback.isDefined) {
+ inputMetrics.updateBytesRead()
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+ split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+ // If we can't get the bytes read from the FS stats, fall back to the split size,
+ // which may be inaccurate.
+ try {
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
}
}
} catch {
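The thread above turns on when the reader should be closed: the task-completion listener already closes it when the task ends, but closing as soon as the iterator is exhausted releases file handles earlier, which matters for tasks that read many files, and nulling the reader field keeps the second close from failing. Below is a minimal, self-contained sketch of that pattern; RecordSource, EarlyClosingIterator, closeIfNeeded and registerOnCompletion are illustrative stand-ins, not Spark's RecordReader, Iterator wrapper or TaskContext APIs.

```scala
import scala.collection.mutable.ArrayBuffer

// A record source with the same shape as a Hadoop RecordReader: it hands out
// records until exhausted and must be closed to release file handles.
trait RecordSource[T] {
  def nextRecord(): Option[T] // None once the source is exhausted
  def close(): Unit
}

// Iterator that closes its source as soon as the last record has been read,
// and again (as a no-op) when the "task" completes.
class EarlyClosingIterator[T](
    private var source: RecordSource[T],
    registerOnCompletion: (() => Unit) => Unit) extends Iterator[T] {

  private var finished = false
  private var pending: Option[T] = None

  // Mirrors the task-completion listener mentioned in the review thread:
  // the same close also runs when the task ends.
  registerOnCompletion(() => closeIfNeeded())

  override def hasNext: Boolean = {
    if (!finished && pending.isEmpty) {
      pending = source.nextRecord()
      if (pending.isEmpty) {
        finished = true
        // Close and release the source here; it will also be closed at task
        // completion, but for tasks that read many files it helps to release
        // the resources early.
        closeIfNeeded()
      }
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    val value = pending.get
    pending = None
    value
  }

  // Nulling the reference makes close idempotent and lets the underlying
  // source be garbage collected before the task finishes.
  private def closeIfNeeded(): Unit = {
    if (source != null) {
      source.close()
      source = null
    }
  }
}

// Hypothetical usage with an in-memory source standing in for a file.
object EarlyCloseDemo extends App {
  val data = Iterator("a", "b", "c")
  val source = new RecordSource[String] {
    private var closed = false
    def nextRecord(): Option[String] =
      if (!closed && data.hasNext) Some(data.next()) else None
    def close(): Unit = { closed = true; println("source closed") }
  }

  val completionHooks = ArrayBuffer.empty[() => Unit]
  val it = new EarlyClosingIterator(source, hook => completionHooks += hook)

  println(it.toList)            // "source closed" prints as soon as the data is drained
  completionHooks.foreach(_())  // simulated task completion; the second close is a no-op
}
```

Because the reference is nulled inside the guarded close, the early call and the later task-completion call can coexist safely, which is the concern the comment wording above is meant to preserve.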
SqlNewHadoopRDD.scala

@@ -147,7 +147,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
configurable.setConf(conf)
case _ =>
}
- val reader = format.createRecordReader(
+ private var reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

@@ -160,6 +160,10 @@ private[sql] class SqlNewHadoopRDD[K, V](
override def hasNext: Boolean = {
if (!finished && !havePair) {
finished = !reader.nextKeyValue
+ if (finished) {
+ // Close reader and release it
+ close()
+ }
havePair = !finished
}
!finished
@@ -178,18 +182,22 @@ private[sql] class SqlNewHadoopRDD[K, V](

private def close() {
try {
- reader.close()
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
- split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
- // If we can't get the bytes read from the FS stats, fall back to the split size,
- // which may be inaccurate.
- try {
- inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
- } catch {
- case e: java.io.IOException =>
- logWarning("Unable to get input size to set InputMetrics for task", e)
+ if (reader != null) {
+ reader.close()
+ reader = null
+
+ if (bytesReadCallback.isDefined) {
+ inputMetrics.updateBytesRead()
+ } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+ split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+ // If we can't get the bytes read from the FS stats, fall back to the split size,
+ // which may be inaccurate.
+ try {
+ inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
}
}
} catch {
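Both versions of close() update the input metrics the same way: when a filesystem-statistics callback is available it supplies the byte count, otherwise the split length is used as a fallback, which may be inaccurate. The sketch below illustrates only that decision; InputMetricsLike, bytesReadCallback and splitLength are hypothetical stand-ins for Spark's internals, and the FileSplit/CombineFileSplit check from the diff is omitted for brevity.

```scala
import java.io.IOException

object InputMetricsFallback {

  // Hypothetical stand-in for Spark's InputMetrics.
  final class InputMetricsLike {
    private var bytes = 0L
    def incBytesRead(n: Long): Unit = { bytes += n }
    def bytesRead: Long = bytes
  }

  // Prefer the count from a filesystem-statistics callback; otherwise fall
  // back to the split length, which may be inaccurate. splitLength is by-name
  // so an IOException thrown while computing it can be caught, as in the diff.
  def updateBytesReadOnClose(
      metrics: InputMetricsLike,
      bytesReadCallback: Option[() => Long],
      splitLength: => Long): Unit = {
    bytesReadCallback match {
      case Some(readBytes) =>
        metrics.incBytesRead(readBytes())
      case None =>
        try {
          metrics.incBytesRead(splitLength)
        } catch {
          case e: IOException =>
            Console.err.println(s"Unable to get input size to set InputMetrics for task: $e")
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new InputMetricsLike
    updateBytesReadOnClose(m, bytesReadCallback = None, splitLength = 1024L)
    println(m.bytesRead) // 1024, taken from the split length fallback
  }
}
```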