@@ -172,7 +172,7 @@ class PortableDataStream(

  @transient private lazy val conf = {
    val bais = new ByteArrayInputStream(confBytes)
-   val nconf = new Configuration()
+   val nconf = new Configuration(false)
    nconf.readFields(new DataInputStream(bais))
    nconf
  }
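The one-character change above passes `loadDefaults = false` to Hadoop's `Configuration` constructor: the new object then skips loading local default resources (core-default.xml, core-site.xml, etc.), so after `readFields` it holds only the values that were serialized on the driver. A minimal self-contained sketch of why that matters, using a hypothetical `FakeConf` stand-in rather than the real Hadoop class:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for Hadoop's Configuration (NOT the real class):
// loadDefaults = true simulates loading local default resources such as
// core-default.xml from the executor's environment.
class FakeConf(loadDefaults: Boolean) {
  val settings = mutable.Map.empty[String, String]
  if (loadDefaults) settings("hadoop.tmp.dir") = "/local/tmp" // simulated local default

  def writeFields(out: DataOutputStream): Unit = {
    out.writeInt(settings.size)
    settings.foreach { case (k, v) => out.writeUTF(k); out.writeUTF(v) }
  }
  def readFields(in: DataInputStream): Unit = {
    val n = in.readInt()
    (1 to n).foreach { _ => settings(in.readUTF()) = in.readUTF() }
  }
}

// Driver side: serialize exactly what the user configured.
val sent = new FakeConf(loadDefaults = false)
sent.settings("fs.defaultFS") = "hdfs://driver-configured"
val baos = new ByteArrayOutputStream()
sent.writeFields(new DataOutputStream(baos))
val confBytes = baos.toByteArray

// Executor side, as in the patched code: start from an EMPTY conf, so only
// the deserialized fields are present.
val nconf = new FakeConf(loadDefaults = false)
nconf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))

// With loadDefaults = true, local executor defaults would leak into the
// deserialized result alongside the driver's settings.
val polluted = new FakeConf(loadDefaults = true)
polluted.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))
```

With `loadDefaults = false` the deserialized conf reflects the driver's settings and nothing else; with `true`, whatever happens to be on the executor's classpath mixes in.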
@@ -69,7 +69,7 @@ private[spark] class WholeTextFileRecordReader(

  override def nextKeyValue(): Boolean = {
    if (!processed) {
-     val conf = new Configuration
+     val conf = getConf
      val factory = new CompressionCodecFactory(conf)
      val codec = factory.getCodec(path) // infers from file ext.
      val fileIn = fs.open(path)
@@ -108,8 +108,17 @@ private[spark] class ConfigurableCombineFileRecordReader[K, V](
  override def initNextRecordReader(): Boolean = {
    val r = super.initNextRecordReader()
    if (r) {
-     this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
+     if (getConf != null) {
+       this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
Contributor Author:
This is needed because initNextRecordReader can be called from the constructor, in which case getConf would still be null.

We also have to override setConf to set the conf on the first reader.

+     }
    }
    r
  }

+ override def setConf(c: Configuration): Unit = {
+   super.setConf(c)
+   if (this.curReader != null) {
+     this.curReader.asInstanceOf[HConfigurable].setConf(c)
+   }
+ }
}
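The two changes in this hunk cooperate: the null guard in initNextRecordReader tolerates the conf not having arrived yet, and the new setConf override forwards the conf to the reader that was created before setConf was called. A self-contained sketch of the pattern, using a plain `Map[String, String]` as a hypothetical stand-in for Hadoop's `Configuration` and simplified reader classes rather than the real ones:

```scala
// Simplified stand-in for Hadoop's Configurable contract (illustrative only).
trait HConfigurable {
  def setConf(c: Map[String, String]): Unit
  def getConf: Map[String, String]
}

class ChildReader extends HConfigurable {
  private var conf: Map[String, String] = null
  override def setConf(c: Map[String, String]): Unit = conf = c
  override def getConf: Map[String, String] = conf
}

class CombineReader extends HConfigurable {
  private var conf: Map[String, String] = null
  var curReader: ChildReader = null
  override def getConf: Map[String, String] = conf

  // Mirrors the patched initNextRecordReader: this can run from the
  // constructor, before setConf, so getConf may still be null here.
  def initNextRecordReader(): Boolean = {
    curReader = new ChildReader
    if (getConf != null) curReader.setConf(getConf)
    true
  }

  // Mirrors the new setConf override: once the conf arrives, forward it
  // to the reader that was created before setConf was called.
  override def setConf(c: Map[String, String]): Unit = {
    conf = c
    if (curReader != null) curReader.setConf(c)
  }

  initNextRecordReader() // runs during construction, conf not yet set
}

val r = new CombineReader
val before = r.curReader.getConf      // null: conf had not arrived yet
r.setConf(Map("k" -> "v"))            // forwards to the already-created reader
val after = r.curReader.getConf
```

Without the setConf override, the first child reader (created in the constructor) would never receive a conf at all.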
@@ -45,6 +45,7 @@ class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration)

  val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
  val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
  val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0)
+ reader.setConf(hadoopAttemptContext.getConfiguration)
Contributor Author:
WholeTextFileRecordReader is Configurable, so setConf should be called after creation. This is why tests fail without this patch.

However, for org.apache.spark.input.WholeTextFileRecordReader and org.apache.spark.input.ConfigurableCombineFileRecordReader, we can already retrieve the config from org.apache.hadoop.mapreduce.TaskAttemptContext, so there is no need to make these classes Configurable.

I am wondering if we should remove the Configurable trait from the related classes all at once. What do you think @gatorsmile?
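The alternative this comment suggests can be sketched as follows, with simplified stand-in types (not the real Hadoop classes): the reader pulls its configuration from the TaskAttemptContext it is constructed with, so implementing Configurable and requiring a separate setConf call would be unnecessary.

```scala
// Hypothetical stand-in for org.apache.hadoop.mapreduce.TaskAttemptContext,
// which already exposes the job configuration via getConfiguration.
class FakeTaskAttemptContext(val getConfiguration: Map[String, String])

// The reader takes its conf from the context at construction time, so no
// Configurable / setConf plumbing is needed and the conf can never be null
// once the reader exists.
class ContextBackedReader(context: FakeTaskAttemptContext) {
  private val conf: Map[String, String] = context.getConfiguration

  // Toy analogue of looking up a compression codec for a path.
  def codecFor(path: String): Option[String] =
    conf.get("io.compression.codecs").filter(_ => path.endsWith(".gz"))
}

val ctx = new FakeTaskAttemptContext(
  Map("io.compression.codecs" -> "org.apache.hadoop.io.compress.GzipCodec"))
val reader = new ContextBackedReader(ctx)
val codec = reader.codecFor("part-00000.gz")
val noCodec = reader.codecFor("part-00000.txt")
```

Because the conf arrives with the context, the "setConf must happen before nextKeyValue" ordering problem discussed in this thread disappears by construction.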

Member:

Is there an existing test that fails without this change, as you mention? Should it be re-enabled?

Contributor Author:
(screenshot: test failures)
Some tests in WholeTextFileSuite and SaveLoadSuite fail without this change.

However, the failure is introduced by my change to WholeTextFileRecordReader:

override def nextKeyValue(): Boolean = {
  if (!processed) {
    val conf = getConf
    val factory = new CompressionCodecFactory(conf)

Since we use getConf instead of new Configuration, setConf must be called first.

Member:

I see, they're not failing in master but can fail if run in an env where Hadoop config files are present?

Contributor Author:

> I see, they're not failing in master but can fail if run in an env where Hadoop config files are present?

Eh, yes, they are not failing in master. The code in master normally won't even fail in an env where Hadoop configs are present. It could fail or give unexpected results only if the Hadoop configs are incorrectly configured in the executor env (such as yarn-cluster), even when the user supplies correct configs (passed to TaskAttemptContext).

  reader.initialize(fileSplit, hadoopAttemptContext)
  new RecordReaderIterator(reader)
}