
Commit ca03559

Use reflection to access JobContext/TaskAttemptContext.getConfiguration
1 parent fa40db0 commit ca03559

File tree: 5 files changed, 24 additions & 4 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 12 additions & 0 deletions
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
@@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging {
       Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
     statisticsDataClass.getDeclaredMethod(methodName)
   }
+
+  /**
+   * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
+   * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes
+   * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+
+   * while it's interface in Hadoop 2.+.
+   */
+  def getConfigurationFromJobContext(context: JobContext): Configuration = {
+    val method = context.getClass.getMethod("getConfiguration")
+    method.invoke(context).asInstanceOf[Configuration]
+  }
 }
 
 object SparkHadoopUtil {
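Note: a hedged usage sketch of the new helper (the object name below is illustrative, not part of this commit). Any call site holding a TaskAttemptContext, which is also a JobContext on both Hadoop lines, can fetch the Configuration through the reflective helper instead of compiling a direct getConfiguration call, which would be emitted as invokevirtual against Hadoop 1.x's class but invokeinterface against Hadoop 2.x's interface.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical call site: resolve the Hadoop Configuration through the reflective
// helper so the same compiled code works against both Hadoop 1.x and 2.x binaries.
object ConfFromContextSketch {
  def hadoopConf(context: TaskAttemptContext): Configuration =
    SparkHadoopUtil.get.getConfigurationFromJobContext(context)
}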

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala

Lines changed: 3 additions & 1 deletion

@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * Custom Input Format for reading and splitting flat binary files that contain records,
 
@@ -33,7 +34,8 @@ private[spark] object FixedLengthBinaryInputFormat {
 
   /** Retrieves the record length property from a Hadoop configuration */
   def getRecordLength(context: JobContext): Int = {
-    context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).
+      get(RECORD_LENGTH_PROPERTY).toInt
   }
 }
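Note: a small, hedged sketch of how getRecordLength reads that property back. It assumes Hadoop 2.x's Job.getInstance factory (on Hadoop 1.x the Job would be constructed directly) and sits in the org.apache.spark.input package because the companion object is private[spark]; the ConfiguredRecordLength object name is illustrative.

package org.apache.spark.input

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

object ConfiguredRecordLength {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Store a fixed record length under the companion's RECORD_LENGTH_PROPERTY key.
    conf.set(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, "8")
    // Job implements JobContext, so it can be handed straight to getRecordLength,
    // which now goes through SparkHadoopUtil.get.getConfigurationFromJobContext.
    val recordLength = FixedLengthBinaryInputFormat.getRecordLength(Job.getInstance(conf))
    assert(recordLength == 8)
  }
}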

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala

Lines changed: 2 additions & 1 deletion

@@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
 import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
 
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader
     // the actual file we will be reading from
     val file = fileSplit.getPath
     // job configuration
-    val job = context.getConfiguration
+    val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
     // check compression
     val codec = new CompressionCodecFactory(job).getCodec(file)
     if (codec != null) {

core/src/main/scala/org/apache/spark/input/PortableDataStream.scala

Lines changed: 4 additions & 1 deletion

@@ -19,6 +19,8 @@ package org.apache.spark.input
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 
+import org.apache.spark.deploy.SparkHadoopUtil
+
 import scala.collection.JavaConversions._
 
 import com.google.common.io.ByteStreams
 
@@ -145,7 +147,8 @@ class PortableDataStream(
 
   private val confBytes = {
     val baos = new ByteArrayOutputStream()
-    context.getConfiguration.write(new DataOutputStream(baos))
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).
+      write(new DataOutputStream(baos))
     baos.toByteArray
   }
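Note: confBytes serializes the Hadoop Configuration by hand because Configuration is a Hadoop Writable rather than java.io.Serializable. A minimal sketch of that round-trip (the object and method names below are illustrative, not part of this commit):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.conf.Configuration

object ConfRoundTripSketch {
  // Serialize a Configuration into bytes, mirroring what confBytes does above.
  def toBytes(conf: Configuration): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    conf.write(new DataOutputStream(baos))
    baos.toByteArray
  }

  // Rebuild a Configuration from those bytes when the data is later read back.
  def fromBytes(bytes: Array[Byte]): Configuration = {
    val conf = new Configuration(false)
    conf.readFields(new DataInputStream(new ByteArrayInputStream(bytes)))
    conf
  }
}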

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 3 additions & 1 deletion

@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 /**
 
@@ -51,7 +52,8 @@ private[spark] class WholeTextFileRecordReader(
   extends RecordReader[String, String] with Configurable {
 
   private[this] val path = split.getPath(index)
-  private[this] val fs = path.getFileSystem(context.getConfiguration)
+  private[this] val fs = path.getFileSystem(
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context))
 
   // True means the current file has been processed, then skip it.
   private[this] var processed = false
