core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala (3 additions & 5 deletions)
@@ -213,15 +213,13 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

// TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD

val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
val existingBytesRead = inputMetrics.bytesRead

// Sets the thread local variable for the file's name
split.inputSplit.value match {
case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDDState.unsetInputFileName()
case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
case _ => InputFileNameHolder.unsetInputFileName()
}

// Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -271,7 +269,7 @@ class HadoopRDD[K, V](

     override def close() {
       if (reader != null) {
-        SqlNewHadoopRDDState.unsetInputFileName()
+        InputFileNameHolder.unsetInputFileName()
         // Close the reader and release it. Note: it's very important that we don't close the
         // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
         // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala → core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala (renamed)
@@ -20,10 +20,10 @@ package org.apache.spark.rdd
 import org.apache.spark.unsafe.types.UTF8String

 /**
- * State for SqlNewHadoopRDD objects. This is split this way because of the package splits.
- * TODO: Move/Combine this with org.apache.spark.sql.datasources.SqlNewHadoopRDD
+ * This holds file names of the current Spark task. This is used in HadoopRDD,
+ * FileScanRDD and InputFileName function in Spark SQL.
  */
-private[spark] object SqlNewHadoopRDDState {
+private[spark] object InputFileNameHolder {
   /**
    * The thread variable for the name of the current file being read. This is used by
    * the InputFileName function in Spark SQL.
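The rest of the renamed object is collapsed in this diff. As context only, here is a minimal sketch of a holder consistent with the visible doc comments; the InheritableThreadLocal choice and the empty-string default are assumptions, not shown by the diff itself.

    package org.apache.spark.rdd

    import org.apache.spark.unsafe.types.UTF8String

    private[spark] object InputFileNameHolder {
      // Thread-local holding the name of the file the current task is reading.
      // Inheritable so that child threads spawned by the task observe it as well.
      private[this] val inputFileName: InheritableThreadLocal[UTF8String] =
        new InheritableThreadLocal[UTF8String] {
          override protected def initialValue(): UTF8String = UTF8String.fromString("")
        }

      def getInputFileName(): UTF8String = inputFileName.get()

      def setInputFileName(file: String): Unit = inputFileName.set(UTF8String.fromString(file))

      def unsetInputFileName(): Unit = inputFileName.remove()
    }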
project/MimaExcludes.scala (0 additions & 5 deletions)
@@ -847,7 +846,6 @@ object MimaExcludes {
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"),
@@ -856,10 +855,8 @@
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"),
@@ -870,7 +867,6 @@
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"),
@@ -884,7 +880,6 @@
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -17,14 +17,14 @@

 package org.apache.spark.sql.catalyst.expressions

-import org.apache.spark.rdd.SqlNewHadoopRDDState
+import org.apache.spark.rdd.InputFileNameHolder
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types.{DataType, StringType}
 import org.apache.spark.unsafe.types.UTF8String

 /**
- * Expression that returns the name of the current file being read in using [[SqlNewHadoopRDD]]
+ * Expression that returns the name of the current file being read.
  */
 @ExpressionDescription(
   usage = "_FUNC_() - Returns the name of the current file being read if available",
@@ -40,12 +40,12 @@ case class InputFileName() extends LeafExpression with Nondeterministic {
   override protected def initInternal(): Unit = {}

   override protected def evalInternal(input: InternalRow): UTF8String = {
-    SqlNewHadoopRDDState.getInputFileName()
+    InputFileNameHolder.getInputFileName()
   }

   override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
     ev.isNull = "false"
     s"final ${ctx.javaType(dataType)} ${ev.value} = " +
-      "org.apache.spark.rdd.SqlNewHadoopRDDState.getInputFileName();"
+      "org.apache.spark.rdd.InputFileNameHolder.getInputFileName();"
   }
 }
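For reference, this expression surfaces to users as input_file_name() in org.apache.spark.sql.functions. A typical use, sketched against the DataFrame API with a hypothetical input path, looks like this:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.input_file_name

    val spark = SparkSession.builder().master("local[*]").appName("input-file-name-demo").getOrCreate()

    // Tag each row with the file it was read from; this works for both HadoopRDD
    // and FileScanRDD scans because both publish through InputFileNameHolder.
    spark.read.text("/tmp/input/*.txt")   // hypothetical path
      .withColumn("source_file", input_file_name())
      .show(truncate = false)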
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources

 import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
+import org.apache.spark.rdd.{InputFileNameHolder, RDD}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow

@@ -37,7 +37,6 @@ case class PartitionedFile(
   }
 }

-
 /**
  * A collection of files that should be read as a single task possibly from multiple partitioned
  * directories.
@@ -50,7 +49,7 @@
     @transient val sqlContext: SQLContext,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition])
-    extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
+  extends RDD[InternalRow](sqlContext.sparkContext, Nil) {

   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
@@ -65,17 +64,17 @@
         if (files.hasNext) {
           val nextFile = files.next()
           logInfo(s"Reading File $nextFile")
-          SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+          InputFileNameHolder.setInputFileName(nextFile.filePath)
           currentIterator = readFunction(nextFile)
           hasNext
         } else {
-          SqlNewHadoopRDDState.unsetInputFileName()
+          InputFileNameHolder.unsetInputFileName()
           false
         }
       }

       override def close() = {
-        SqlNewHadoopRDDState.unsetInputFileName()
+        InputFileNameHolder.unsetInputFileName()
       }
     }

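The set/read/unset discipline above is easy to see in isolation. Below is a self-contained sketch of the same pattern; FileScanPatternSketch and fileNameTL stand in for Spark's machinery, the file names and the per-file reader are fabricated, and nothing here touches Spark or a real filesystem.

    object FileScanPatternSketch {
      // Stand-in for InputFileNameHolder: a thread-local naming the file being read.
      private val fileNameTL = new InheritableThreadLocal[String] {
        override def initialValue(): String = ""
      }

      def main(args: Array[String]): Unit = {
        val files = Iterator("part-00000", "part-00001")   // fabricated file names
        val readFunction: String => Iterator[String] =
          name => Iterator(s"$name:row1", s"$name:row2")   // fake per-file reader

        var currentIterator: Iterator[String] = Iterator.empty
        val rows = new Iterator[String] {
          def hasNext: Boolean = currentIterator.hasNext || {
            if (files.hasNext) {
              val nextFile = files.next()
              fileNameTL.set(nextFile)     // set before the reader produces any rows
              currentIterator = readFunction(nextFile)
              hasNext
            } else {
              fileNameTL.remove()          // unset once every file is exhausted
              false
            }
          }
          def next(): String = s"${currentIterator.next()} (from ${fileNameTL.get()})"
        }
        rows.foreach(println)
      }
    }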