From 50d6d7558d098a8423de0ae7570b4863c778dd14 Mon Sep 17 00:00:00 2001 From: Xianjin YE Date: Thu, 29 Aug 2019 15:30:01 +0800 Subject: [PATCH 1/2] [SPARK-28907] Review invalid usage of new Configuration() --- .../org/apache/spark/input/PortableDataStream.scala | 2 +- .../spark/input/WholeTextFileRecordReader.scala | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 5b33c110154d6..6a4af01475646 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -172,7 +172,7 @@ class PortableDataStream( @transient private lazy val conf = { val bais = new ByteArrayInputStream(confBytes) - val nconf = new Configuration() + val nconf = new Configuration(false) nconf.readFields(new DataInputStream(bais)) nconf } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 6b7f086678e93..28fd1ff1b77ca 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -69,7 +69,7 @@ private[spark] class WholeTextFileRecordReader( override def nextKeyValue(): Boolean = { if (!processed) { - val conf = new Configuration + val conf = getConf val factory = new CompressionCodecFactory(conf) val codec = factory.getCodec(path) // infers from file ext. val fileIn = fs.open(path) @@ -108,8 +108,17 @@ private[spark] class ConfigurableCombineFileRecordReader[K, V]( override def initNextRecordReader(): Boolean = { val r = super.initNextRecordReader() if (r) { - this.curReader.asInstanceOf[HConfigurable].setConf(getConf) + if (getConf != null) { + this.curReader.asInstanceOf[HConfigurable].setConf(getConf) + } } r } + + override def setConf(c: Configuration): Unit = { + super.setConf(c) + if (this.curReader != null) { + this.curReader.asInstanceOf[HConfigurable].setConf(c) + } + } } From 149de72c220cbc094f0b8756c535cf1bd796a48e Mon Sep 17 00:00:00 2001 From: Xianjin YE Date: Fri, 30 Aug 2019 14:28:56 +0800 Subject: [PATCH 2/2] call setConf for WholeTextFileReader to avoid NPE. --- .../sql/execution/datasources/HadoopFileWholeTextReader.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala index f5724f7c5955d..0e6d803f02d4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala @@ -45,6 +45,7 @@ class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) val reader = new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0) + reader.setConf(hadoopAttemptContext.getConfiguration) reader.initialize(fileSplit, hadoopAttemptContext) new RecordReaderIterator(reader) }