Skip to content

Commit 64f3d27

Browse files
author
Kostas Sakellis
committed
[SPARK-4079] [CORE] Code review feedback
Removed the fallback logic and now just throws an IllegalArgumentException if you specify a bad codec or one that is not available like Snappy.
1 parent 52dfa8f commit 64f3d27

File tree

1 file changed

+4
-21
lines changed

1 file changed

+4
-21
lines changed

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,46 +49,29 @@ trait CompressionCodec {
4949

5050
private[spark] object CompressionCodec extends Logging {
5151

52-
private val configKey = "spark.io.compression.codec"
5352
private val shortCompressionCodecNames = Map(
5453
"lz4" -> classOf[LZ4CompressionCodec].getName,
5554
"lzf" -> classOf[LZFCompressionCodec].getName,
5655
"snappy" -> classOf[SnappyCompressionCodec].getName)
5756

5857
def createCodec(conf: SparkConf): CompressionCodec = {
59-
conf.getOption(configKey)
60-
.map(createCodec(conf, _))
61-
.orElse(createCodecFromName(conf, DEFAULT_COMPRESSION_CODEC))
62-
.orElse({
63-
logWarning("Default codec " + DEFAULT_COMPRESSION_CODEC +
64-
" is unavailable. Faling back to " + FALLBACK_COMPRESSION_CODEC)
65-
createCodecFromName(conf, FALLBACK_COMPRESSION_CODEC)
66-
})
67-
.getOrElse(throw new IllegalArgumentException("The codec [" +
68-
FALLBACK_COMPRESSION_CODEC + "] is not available."))
58+
createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
6959
}
7060

7161
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
72-
createCodecFromName(conf, codecName)
73-
.getOrElse(throw new IllegalArgumentException("The specified codec [" +
74-
codecName + "] is not available."))
75-
}
76-
77-
private def createCodecFromName(conf: SparkConf, codecName : String)
78-
: Option[CompressionCodec] = {
7962
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
80-
try {
63+
val codec = try {
8164
val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
8265
.getConstructor(classOf[SparkConf])
8366
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
84-
.filter(_.isAvailable())
8567
} catch {
8668
case e: ClassNotFoundException => None
8769
}
70+
codec.filter(_.isAvailable())
71+
.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available."))
8872
}
8973

9074
val DEFAULT_COMPRESSION_CODEC = "snappy"
91-
val FALLBACK_COMPRESSION_CODEC = "lzf"
9275
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
9376
}
9477

0 commit comments

Comments
 (0)