@@ -19,6 +19,7 @@ package org.apache.spark.io
1919
2020import java .io .{ByteArrayInputStream , ByteArrayOutputStream }
2121
22+ import com .google .common .io .ByteStreams
2223import org .scalatest .FunSuite
2324
2425import org .apache .spark .SparkConf
@@ -62,6 +63,14 @@ class CompressionCodecSuite extends FunSuite {
6263 testCodec(codec)
6364 }
6465
test(" lz4 does not support concatenation of serialized streams") {
  // Build the codec from its fully-qualified class name and confirm that the
  // factory returned exactly the LZ4 implementation.
  val lz4ClassName = classOf[LZ4CompressionCodec].getName
  val lz4Codec = CompressionCodec.createCodec(conf, lz4ClassName)
  assert(lz4Codec.getClass === classOf[LZ4CompressionCodec])

  // The concatenation check is expected to fail for this codec; any Exception
  // raised by the helper satisfies the test.
  intercept[Exception] {
    testConcatenationOfSerializedStreams(lz4Codec)
  }
}
73+
6574 test(" lzf compression codec" ) {
6675 val codec = CompressionCodec .createCodec(conf, classOf [LZFCompressionCodec ].getName)
6776 assert(codec.getClass === classOf [LZFCompressionCodec ])
@@ -74,6 +83,12 @@ class CompressionCodecSuite extends FunSuite {
7483 testCodec(codec)
7584 }
7685
test(" lzf supports concatenation of serialized streams") {
  // Build the codec from its fully-qualified class name and confirm that the
  // factory returned exactly the LZF implementation.
  val lzfClassName = classOf[LZFCompressionCodec].getName
  val lzfCodec = CompressionCodec.createCodec(conf, lzfClassName)
  assert(lzfCodec.getClass === classOf[LZFCompressionCodec])

  // For this codec the concatenation check must complete without throwing.
  testConcatenationOfSerializedStreams(lzfCodec)
}
91+
7792 test(" snappy compression codec" ) {
7893 val codec = CompressionCodec .createCodec(conf, classOf [SnappyCompressionCodec ].getName)
7994 assert(codec.getClass === classOf [SnappyCompressionCodec ])
@@ -86,9 +101,38 @@ class CompressionCodecSuite extends FunSuite {
86101 testCodec(codec)
87102 }
88103
test(" snappy does not support concatenation of serialized streams") {
  // Build the codec from its fully-qualified class name and confirm that the
  // factory returned exactly the Snappy implementation.
  val snappyClassName = classOf[SnappyCompressionCodec].getName
  val snappyCodec = CompressionCodec.createCodec(conf, snappyClassName)
  assert(snappyCodec.getClass === classOf[SnappyCompressionCodec])

  // The concatenation check is expected to fail for this codec; any Exception
  // raised by the helper satisfies the test.
  intercept[Exception] {
    testConcatenationOfSerializedStreams(snappyCodec)
  }
}
111+
test(" bad compression codec") {
  // Asking the factory for this codec name must be rejected with an
  // IllegalArgumentException.
  val bogusCodecName = " foobar"
  intercept[IllegalArgumentException] {
    CompressionCodec.createCodec(conf, bogusCodecName)
  }
}
117+
/**
 * Checks that two independently compressed streams can be concatenated and then
 * read back through a single decompressing stream.
 *
 * The byte values 0 to 64 and 65 to 127 are compressed separately with `codec`,
 * the two compressed arrays are joined, and the result is decompressed in one
 * pass. The assertion passes only if the 128 decompressed bytes are 0 to 127 in
 * order.
 *
 * @param codec the compression codec under test
 */
private def testConcatenationOfSerializedStreams(codec: CompressionCodec): Unit = {
  // Compresses the given byte values into a standalone, fully closed stream.
  def compress(values: Range): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(baos)
    try {
      values.foreach(v => out.write(v))
    } finally {
      // Close before reading the buffer so that any trailing data is flushed,
      // and so that the stream is released even if a write fails.
      out.close()
    }
    baos.toByteArray
  }

  val bytes1: Array[Byte] = compress(0 to 64)
  val bytes2: Array[Byte] = compress(65 to 127)

  val concatenatedBytes = codec.compressedInputStream(new ByteArrayInputStream(bytes1 ++ bytes2))
  val decompressed: Array[Byte] = new Array[Byte](128)
  try {
    // readFully fails if fewer than 128 bytes can be read from the stream.
    ByteStreams.readFully(concatenatedBytes, decompressed)
  } finally {
    concatenatedBytes.close()
  }

  // Compare bytes with bytes rather than relying on Byte/Int numeric equality.
  val expected: Seq[Byte] = (0 to 127).map(_.toByte)
  assert(decompressed.toSeq === expected)
}
94138}
0 commit comments