
Commit 52dfa8f

Author: Kostas Sakellis
[SPARK-4079] [CORE] Default to LZF if Snappy not available
By default, Snappy is the compression codec Spark uses. If Snappy is not available, Spark currently throws a stack trace. With this change, Spark falls back to LZF when Snappy is not available on the cluster and logs a warning message. The one exception is when the user has explicitly set spark.io.compression.codec=snappy: in that case, if Snappy is not available, an IllegalArgumentException is thrown. Because of the way the Snappy library uses static initialization, it was very difficult to simulate Snappy being unavailable in a unit test; the only way I could think of was to create multiple classloaders, which seemed excessive. As a result, most of this was tested ad hoc on a test cluster by setting the system property org.xerial.snappy.use.systemlib=true, which prevents Snappy from loading and thus triggers this logic.
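
As a usage-level illustration of the behavior described above (a hedged sketch, not part of the commit; it relies only on CompressionCodec.createCodec and the spark.io.compression.codec key visible in the diff below):

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// No codec configured: createCodec tries the default ("snappy") and, if its
// native library cannot load, logs a warning and falls back to "lzf".
val defaultCodec = CompressionCodec.createCodec(new SparkConf())

// Codec configured explicitly: unavailability becomes an error instead.
val conf = new SparkConf().set("spark.io.compression.codec", "snappy")
// Throws IllegalArgumentException if Snappy is not available.
val explicitCodec = CompressionCodec.createCodec(conf)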
Parent: 2b233f5

File tree: 2 files changed, 47 insertions(+), 7 deletions(-)

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 41 additions & 7 deletions
@@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
+import org.apache.spark.Logging
 
 /**
  * :: DeveloperApi ::
@@ -42,28 +43,52 @@ trait CompressionCodec {
   def compressedOutputStream(s: OutputStream): OutputStream
 
   def compressedInputStream(s: InputStream): InputStream
-}
 
+  def isAvailable() : Boolean = true
+}
 
-private[spark] object CompressionCodec {
+private[spark] object CompressionCodec extends Logging {
 
+  private val configKey = "spark.io.compression.codec"
   private val shortCompressionCodecNames = Map(
     "lz4" -> classOf[LZ4CompressionCodec].getName,
     "lzf" -> classOf[LZFCompressionCodec].getName,
     "snappy" -> classOf[SnappyCompressionCodec].getName)
 
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
+    conf.getOption(configKey)
+      .map(createCodec(conf, _))
+      .orElse(createCodecFromName(conf, DEFAULT_COMPRESSION_CODEC))
+      .orElse({
+        logWarning("Default codec " + DEFAULT_COMPRESSION_CODEC +
+          " is unavailable. Falling back to " + FALLBACK_COMPRESSION_CODEC)
+        createCodecFromName(conf, FALLBACK_COMPRESSION_CODEC)
+      })
+      .getOrElse(throw new IllegalArgumentException("The codec [" +
+        FALLBACK_COMPRESSION_CODEC + "] is not available."))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
+    createCodecFromName(conf, codecName)
+      .getOrElse(throw new IllegalArgumentException("The specified codec [" +
+        codecName + "] is not available."))
+  }
+
+  private def createCodecFromName(conf: SparkConf, codecName : String)
+    : Option[CompressionCodec] = {
     val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
-    val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
-      .getConstructor(classOf[SparkConf])
-    ctor.newInstance(conf).asInstanceOf[CompressionCodec]
+    try {
+      val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
+        .getConstructor(classOf[SparkConf])
+      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+        .filter(_.isAvailable())
+    } catch {
+      case e: ClassNotFoundException => None
+    }
   }
 
   val DEFAULT_COMPRESSION_CODEC = "snappy"
+  val FALLBACK_COMPRESSION_CODEC = "lzf"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
 
@@ -126,4 +151,13 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
+
+  override def isAvailable() = {
+    try {
+      Snappy.getNativeLibraryVersion
+      true
+    } catch {
+      case e: Error => false
+    }
+  }
 }
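
The isAvailable probe above catches Error rather than Exception: snappy-java surfaces native-load failures from static initialization as java.lang.Error subtypes (for example UnsatisfiedLinkError). A standalone sketch of the same idiom, assuming only the org.xerial.snappy.Snappy class already imported in the diff:

import org.xerial.snappy.Snappy

// Touching the native library forces it to load; a load failure arrives as
// an Error (not an Exception), so that is what the probe must catch.
def snappyIsAvailable: Boolean =
  try {
    Snappy.getNativeLibraryVersion
    true
  } catch {
    case _: Error => false
  }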

core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala

Lines changed: 6 additions & 0 deletions
@@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite {
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
+
+  test("bad compression codec") {
+    intercept[IllegalArgumentException] {
+      CompressionCodec.createCodec(conf, "foobar")
+    }
+  }
 }
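
Since the commit message notes that Snappy's static initialization makes the unavailable case hard to reach from a unit test, here is a hedged sketch of the ad-hoc cluster check it describes instead; the object name is hypothetical, and the run assumes -Dorg.xerial.snappy.use.systemlib=true on a host with no system libsnappy so the native load fails:

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// Hypothetical driver: with Snappy's native load forced to fail, the default
// codec's availability probe returns false and createCodec should hand back
// the LZF fallback rather than throwing.
object SnappyFallbackCheck {
  def main(args: Array[String]): Unit = {
    val codec = CompressionCodec.createCodec(new SparkConf())
    println(s"Selected codec: ${codec.getClass.getName}") // expect LZFCompressionCodec
  }
}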
