From 7837b0c6052fa20bd1a6cf823947e95379d6d3b8 Mon Sep 17 00:00:00 2001 From: Aaditya Ramesh Date: Tue, 21 Feb 2017 21:05:48 -0800 Subject: [PATCH 1/3] [SPARK-19525][CORE] Compressing checkpoints. Spark's performance improves greatly if we enable compression of checkpoints. --- .../apache/spark/io/CompressionCodec.scala | 1 + .../spark/rdd/ReliableCheckpointRDD.scala | 29 +++++- .../org/apache/spark/CheckpointSuite.scala | 99 +++++++++++++++++-- 3 files changed, 119 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 2e991ce394c4..c94b0017d9e1 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -95,6 +95,7 @@ private[spark] object CompressionCodec { val FALLBACK_COMPRESSION_CODEC = "snappy" val DEFAULT_COMPRESSION_CODEC = "lz4" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq + val ALL_COMPRESSION_CODECS_SHORT: Set[String] = shortCompressionCodecNames.keySet } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index e0a29b48314f..ba03049ed038 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{SerializableConfiguration, Utils} /** @@ -133,9 +134,13 @@ private[spark] object ReliableCheckpointRDD extends Logging { val broadcastedConf = sc.broadcast( new SerializableConfiguration(sc.hadoopConfiguration)) // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582) + logInfo(s"The checkpoint compression codec is " + + s"${sc.conf.get("spark.checkpoint.compress.codec", "none")}.") + val startTime = System.currentTimeMillis() sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) + logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTime} ms.") if (originalRDD.partitioner.nonEmpty) { writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) } @@ -169,7 +174,15 @@ private[spark] object ReliableCheckpointRDD extends Logging { val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileOutputStream = if (blockSize < 0) { - fs.create(tempOutputPath, false, bufferSize) + val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none") + val fileStream = fs.create(tempOutputPath, false, bufferSize) + if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) { + val compressionCodec = CompressionCodec.createCodec(env.conf, checkpointCodec) + logInfo(s"Compressing using $checkpointCodec.") + compressionCodec.compressedOutputStream(fileStream) + } else { + fileStream + } } else { // This is mainly for testing purpose fs.create(tempOutputPath, false, bufferSize, @@ -177,6 +190,8 @@ private[spark] object ReliableCheckpointRDD extends Logging { } val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) + logInfo(s"Starting to write to checkpoint file $tempOutputPath.") + val startTimeMs = System.currentTimeMillis() 
Utils.tryWithSafeFinally { serializeStream.writeAll(iterator) } { @@ -197,6 +212,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { } } } + logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTimeMs} ms.") } /** @@ -273,9 +289,16 @@ private[spark] object ReliableCheckpointRDD extends Logging { val env = SparkEnv.get val fs = path.getFileSystem(broadcastedConf.value.value) val bufferSize = env.conf.getInt("spark.buffer.size", 65536) - val fileInputStream = fs.open(path, bufferSize) + val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none") + val fileStream = fs.open(path, bufferSize) + val inputStream = + if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) { + CompressionCodec.createCodec(env.conf, checkpointCodec).compressedInputStream(fileStream) + } else { + fileStream + } val serializer = env.serializer.newInstance() - val deserializeStream = serializer.deserializeStream(fileInputStream) + val deserializeStream = serializer.deserializeStream(inputStream) // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener(context => deserializeStream.close()) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index b117c7709b46..b09028e1dcb0 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -21,12 +21,15 @@ import java.io.File import scala.reflect.ClassTag +import com.google.common.io.ByteStreams import org.apache.hadoop.fs.Path +import org.apache.spark.io.CompressionCodec import org.apache.spark.rdd._ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId} import org.apache.spark.util.Utils + trait RDDCheckpointTester { self: SparkFunSuite => protected val partitioner = new HashPartitioner(2) @@ -238,6 +241,16 @@ trait RDDCheckpointTester { self: SparkFunSuite => protected def generateFatPairRDD(): RDD[(Int, Int)] = { new FatPairRDD(sparkContext.makeRDD(1 to 100, 4), partitioner).mapValues(x => x) } + + protected def testBasicCheckpoint(sc: SparkContext, reliableCheckpoint: Boolean): Unit = { + val parCollection = sc.makeRDD(1 to 4) + val flatMappedRDD = parCollection.flatMap(x => 1 to x) + checkpoint(flatMappedRDD, reliableCheckpoint) + assert(flatMappedRDD.dependencies.head.rdd === parCollection) + val result = flatMappedRDD.collect() + assert(flatMappedRDD.dependencies.head.rdd != parCollection) + assert(flatMappedRDD.collect() === result) + } } /** @@ -251,10 +264,14 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS super.beforeEach() checkpointDir = File.createTempFile("temp", "", Utils.createTempDir()) checkpointDir.delete() + } + + private def startSparkContext(): Unit = { sc = new SparkContext("local", "test") sc.setCheckpointDir(checkpointDir.toString) } + override def afterEach(): Unit = { try { Utils.deleteRecursively(checkpointDir) @@ -266,13 +283,68 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS override def sparkContext: SparkContext = sc runTest("basic checkpointing") { reliableCheckpoint: Boolean => - val parCollection = sc.makeRDD(1 to 4) - val flatMappedRDD = parCollection.flatMap(x => 1 to x) - checkpoint(flatMappedRDD, reliableCheckpoint) - assert(flatMappedRDD.dependencies.head.rdd === parCollection) - val result = flatMappedRDD.collect() - assert(flatMappedRDD.dependencies.head.rdd != parCollection) - 
assert(flatMappedRDD.collect() === result) + startSparkContext() + testBasicCheckpoint(sc, reliableCheckpoint) + } + + runTest("compression with snappy", skipLocalCheckpoint = true) { reliableCheckpoint: Boolean => + val sparkConf = new SparkConf() + sparkConf.set("spark.checkpoint.compress.codec", "snappy") + sc = new SparkContext("local", "test", sparkConf) + sc.setCheckpointDir(checkpointDir.toString) + testBasicCheckpoint(sc, reliableCheckpoint = true) + } + + runTest("compression with lz4", skipLocalCheckpoint = true) { reliableCheckpoint: Boolean => + val sparkConf = new SparkConf() + sparkConf.set("spark.checkpoint.compress.codec", "lz4") + sc = new SparkContext("local", "test", sparkConf) + sc.setCheckpointDir(checkpointDir.toString) + testBasicCheckpoint(sc, reliableCheckpoint = true) + } + + runTest("compression with lzf", skipLocalCheckpoint = true) { reliableCheckpoint: Boolean => + val sparkConf = new SparkConf() + sparkConf.set("spark.checkpoint.compress.codec", "lzf") + sc = new SparkContext("local", "test", sparkConf) + sc.setCheckpointDir(checkpointDir.toString) + testBasicCheckpoint(sc, reliableCheckpoint = true) + } + + private def testCompression(compressionCodec: String): Unit = { + val sparkConf = new SparkConf() + sparkConf.set("spark.checkpoint.compress.codec", compressionCodec) + sc = new SparkContext("local", "test", sparkConf) + sc.setCheckpointDir(checkpointDir.toString) + val initialSize = 20 + // Use just one partition for now since compression works best on large data sets. + val collection = sc.makeRDD(1 to initialSize, numSlices = 1) + val flatMappedRDD = collection.flatMap(x => 1 to x) + checkpoint(flatMappedRDD, reliableCheckpoint = true) + assert(flatMappedRDD.collect().length == initialSize * (initialSize + 1)/2, + "The checkpoint was lossy!") + val checkpointPath = new Path(flatMappedRDD.getCheckpointFile.get) + val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration) + val fileStatus = fs.listStatus(checkpointPath).find(_.getPath.getName.startsWith("part-")).get + val compressedSize = fileStatus.getLen + assert(compressedSize > 0, "The checkpoint file was not written!") + val compressedInputStream = CompressionCodec.createCodec(sparkConf, compressionCodec) + .compressedInputStream(fs.open(fileStatus.getPath)) + val uncompressedSize = ByteStreams.toByteArray(compressedInputStream).length + compressedInputStream.close() + assert(compressedSize < uncompressedSize, "The compression was not successful!") + } + + runTest("compression size snappy", skipLocalCheckpoint = true) { _: Boolean => + testCompression("snappy") + } + + runTest("compression size lzf", skipLocalCheckpoint = true) { _: Boolean => + testCompression("lzf") + } + + runTest("compression size lz4", skipLocalCheckpoint = true) { _: Boolean => + testCompression("lz4") } runTest("checkpointing partitioners", skipLocalCheckpoint = true) { _: Boolean => @@ -312,6 +384,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } } + startSparkContext() testPartitionerCheckpointing(partitioner) // Test that corrupted partitioner file does not prevent recovery of RDD @@ -319,6 +392,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("RDDs with one-to-one dependencies") { reliableCheckpoint: Boolean => + startSparkContext() testRDD(_.map(x => x.toString), reliableCheckpoint) testRDD(_.flatMap(x => 1 to x), reliableCheckpoint) testRDD(_.filter(_ % 2 == 0), reliableCheckpoint) @@ -332,6 +406,7 @@ class CheckpointSuite extends 
SparkFunSuite with RDDCheckpointTester with LocalS } runTest("ParallelCollectionRDD") { reliableCheckpoint: Boolean => + startSparkContext() val parCollection = sc.makeRDD(1 to 4, 2) val numPartitions = parCollection.partitions.size checkpoint(parCollection, reliableCheckpoint) @@ -348,6 +423,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("BlockRDD") { reliableCheckpoint: Boolean => + startSparkContext() val blockId = TestBlockId("id") val blockManager = SparkEnv.get.blockManager blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY) @@ -365,6 +441,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("ShuffleRDD") { reliableCheckpoint: Boolean => + startSparkContext() testRDD(rdd => { // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD new ShuffledRDD[Int, Int, Int](rdd.map(x => (x % 2, 1)), partitioner) @@ -372,12 +449,14 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("UnionRDD") { reliableCheckpoint: Boolean => + startSparkContext() def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1) testRDD(_.union(otherRDD), reliableCheckpoint) testRDDPartitions(_.union(otherRDD), reliableCheckpoint) } runTest("CartesianRDD") { reliableCheckpoint: Boolean => + startSparkContext() def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1) testRDD(new CartesianRDD(sc, _, otherRDD), reliableCheckpoint) testRDDPartitions(new CartesianRDD(sc, _, otherRDD), reliableCheckpoint) @@ -401,6 +480,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("CoalescedRDD") { reliableCheckpoint: Boolean => + startSparkContext() testRDD(_.coalesce(2), reliableCheckpoint) testRDDPartitions(_.coalesce(2), reliableCheckpoint) @@ -423,6 +503,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("CoGroupedRDD") { reliableCheckpoint: Boolean => + startSparkContext() val longLineageRDD1 = generateFatPairRDD() // Collect the RDD as sequences instead of arrays to enable equality tests in testRDD @@ -441,6 +522,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("ZippedPartitionsRDD") { reliableCheckpoint: Boolean => + startSparkContext() testRDD(rdd => rdd.zip(rdd.map(x => x)), reliableCheckpoint) testRDDPartitions(rdd => rdd.zip(rdd.map(x => x)), reliableCheckpoint) @@ -466,6 +548,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("PartitionerAwareUnionRDD") { reliableCheckpoint: Boolean => + startSparkContext() testRDD(rdd => { new PartitionerAwareUnionRDD[(Int, Int)](sc, Array( generateFatPairRDD(), @@ -500,6 +583,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean => + startSparkContext() val rdd = new BlockRDD[Int](sc, Array.empty[BlockId]) assert(rdd.partitions.size === 0) assert(rdd.isCheckpointed === false) @@ -514,6 +598,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("checkpointAllMarkedAncestors") { reliableCheckpoint: Boolean => + startSparkContext() testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = true) testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = false) } From 18e7ba6be042f344e5254fba7a0e55f41eca6d8c Mon Sep 17 00:00:00 2001 From: Aaditya 
Ramesh Date: Wed, 22 Feb 2017 16:03:56 -0800 Subject: [PATCH 2/3] [SPARK-19525][CORE] Addressing comments. --- .../apache/spark/io/CompressionCodec.scala | 1 - .../spark/rdd/ReliableCheckpointRDD.scala | 43 +++++++------ .../org/apache/spark/CheckpointSuite.scala | 62 ++++++++++--------- 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index c94b0017d9e1..2e991ce394c4 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -95,7 +95,6 @@ private[spark] object CompressionCodec { val FALLBACK_COMPRESSION_CODEC = "snappy" val DEFAULT_COMPRESSION_CODEC = "lz4" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq - val ALL_COMPRESSION_CODECS_SHORT: Set[String] = shortCompressionCodecNames.keySet } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index ba03049ed038..368b4f3936fe 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.io.{FileNotFoundException, IOException} +import java.io.{FileNotFoundException, InputStream, IOException, OutputStream} import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -30,6 +30,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{SerializableConfiguration, Utils} + + /** * An RDD that reads from checkpoint files previously written to reliable storage. 
*/ @@ -134,13 +136,14 @@ private[spark] object ReliableCheckpointRDD extends Logging { val broadcastedConf = sc.broadcast( new SerializableConfiguration(sc.hadoopConfiguration)) // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582) - logInfo(s"The checkpoint compression codec is " + - s"${sc.conf.get("spark.checkpoint.compress.codec", "none")}.") val startTime = System.currentTimeMillis() sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTime} ms.") + sc.conf.getOption("spark.checkpoint.compress.codec").foreach(codec => { + logInfo(s"The checkpoint compression codec is $codec.") + }) if (originalRDD.partitioner.nonEmpty) { writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) } @@ -161,7 +164,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { def writePartitionToCheckpointFile[T: ClassTag]( path: String, broadcastedConf: Broadcast[SerializableConfiguration], - blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { + blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]): Unit = { val env = SparkEnv.get val outputDir = new Path(path) val fs = outputDir.getFileSystem(broadcastedConf.value.value) @@ -174,14 +177,13 @@ private[spark] object ReliableCheckpointRDD extends Logging { val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileOutputStream = if (blockSize < 0) { - val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none") - val fileStream = fs.create(tempOutputPath, false, bufferSize) - if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) { - val compressionCodec = CompressionCodec.createCodec(env.conf, checkpointCodec) - logInfo(s"Compressing using $checkpointCodec.") - compressionCodec.compressedOutputStream(fileStream) - } else { - fileStream + lazy val fileStream: OutputStream = fs.create(tempOutputPath, false, bufferSize) + env.conf.getOption("spark.checkpoint.compress.codec").fold(fileStream) { + codec => { + logDebug(s"Compressing using $codec.") + CompressionCodec.createCodec(env.conf, codec) + .compressedOutputStream(fileStream) + } } } else { // This is mainly for testing purpose @@ -190,7 +192,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { } val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) - logInfo(s"Starting to write to checkpoint file $tempOutputPath.") + logTrace(s"Starting to write to checkpoint file $tempOutputPath.") val startTimeMs = System.currentTimeMillis() Utils.tryWithSafeFinally { serializeStream.writeAll(iterator) @@ -289,13 +291,14 @@ private[spark] object ReliableCheckpointRDD extends Logging { val env = SparkEnv.get val fs = path.getFileSystem(broadcastedConf.value.value) val bufferSize = env.conf.getInt("spark.buffer.size", 65536) - val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none") - val fileStream = fs.open(path, bufferSize) - val inputStream = - if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) { - CompressionCodec.createCodec(env.conf, checkpointCodec).compressedInputStream(fileStream) - } else { - fileStream + lazy val fileStream: InputStream = fs.open(path, bufferSize) + val inputStream: InputStream = + env.conf.getOption("spark.checkpoint.compress.codec").fold(fileStream) { + codec => { + logDebug(s"Decompressing using $codec.") + 
CompressionCodec.createCodec(env.conf, codec) + .compressedInputStream(fileStream) + } } val serializer = env.serializer.newInstance() val deserializeStream = serializer.deserializeStream(inputStream) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index b09028e1dcb0..6f0adb2d7567 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -251,6 +251,32 @@ trait RDDCheckpointTester { self: SparkFunSuite => assert(flatMappedRDD.dependencies.head.rdd != parCollection) assert(flatMappedRDD.collect() === result) } + + protected def testCompression(checkpointDir: File, compressionCodec: String): Unit = { + val sparkConf = new SparkConf() + sparkConf.set("spark.checkpoint.compress.codec", compressionCodec) + val sc = new SparkContext("local", "test", sparkConf) + sc.setCheckpointDir(checkpointDir.toString) + val initialSize = 20 + // Use just one partition for now since compression works best on large data sets. + val collection = sc.makeRDD(1 to initialSize, numSlices = 1) + val flatMappedRDD = collection.flatMap(x => 1 to x) + checkpoint(flatMappedRDD, reliableCheckpoint = true) + assert(flatMappedRDD.collect().length == initialSize * (initialSize + 1)/2, + "The checkpoint was lossy!") + sc.stop() + val checkpointPath = new Path(flatMappedRDD.getCheckpointFile.get) + val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration) + val fileStatus = fs.listStatus(checkpointPath).find(_.getPath.getName.startsWith("part-")).get + val compressedSize = fileStatus.getLen + assert(compressedSize > 0, "The checkpoint file was not written!") + val compressedInputStream = CompressionCodec.createCodec(sparkConf, compressionCodec) + .compressedInputStream(fs.open(fileStatus.getPath)) + val uncompressedSize = ByteStreams.toByteArray(compressedInputStream).length + compressedInputStream.close() + assert(compressedSize < uncompressedSize, "The compression was not successful!") + } + } /** @@ -287,7 +313,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS testBasicCheckpoint(sc, reliableCheckpoint) } - runTest("compression with snappy", skipLocalCheckpoint = true) { reliableCheckpoint: Boolean => + runTest("compression with snappy", skipLocalCheckpoint = true) { _: Boolean => val sparkConf = new SparkConf() sparkConf.set("spark.checkpoint.compress.codec", "snappy") sc = new SparkContext("local", "test", sparkConf) @@ -295,7 +321,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS testBasicCheckpoint(sc, reliableCheckpoint = true) } - runTest("compression with lz4", skipLocalCheckpoint = true) { reliableCheckpoint: Boolean => + runTest("compression with lz4", skipLocalCheckpoint = true) { _: Boolean => val sparkConf = new SparkConf() sparkConf.set("spark.checkpoint.compress.codec", "lz4") sc = new SparkContext("local", "test", sparkConf) @@ -303,7 +329,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS testBasicCheckpoint(sc, reliableCheckpoint = true) } - runTest("compression with lzf", skipLocalCheckpoint = true) { reliableCheckpoint: Boolean => + runTest("compression with lzf", skipLocalCheckpoint = true) { _: Boolean => val sparkConf = new SparkConf() sparkConf.set("spark.checkpoint.compress.codec", "lzf") sc = new SparkContext("local", "test", sparkConf) @@ -311,40 +337,16 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester 
with LocalS testBasicCheckpoint(sc, reliableCheckpoint = true) } - private def testCompression(compressionCodec: String): Unit = { - val sparkConf = new SparkConf() - sparkConf.set("spark.checkpoint.compress.codec", compressionCodec) - sc = new SparkContext("local", "test", sparkConf) - sc.setCheckpointDir(checkpointDir.toString) - val initialSize = 20 - // Use just one partition for now since compression works best on large data sets. - val collection = sc.makeRDD(1 to initialSize, numSlices = 1) - val flatMappedRDD = collection.flatMap(x => 1 to x) - checkpoint(flatMappedRDD, reliableCheckpoint = true) - assert(flatMappedRDD.collect().length == initialSize * (initialSize + 1)/2, - "The checkpoint was lossy!") - val checkpointPath = new Path(flatMappedRDD.getCheckpointFile.get) - val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration) - val fileStatus = fs.listStatus(checkpointPath).find(_.getPath.getName.startsWith("part-")).get - val compressedSize = fileStatus.getLen - assert(compressedSize > 0, "The checkpoint file was not written!") - val compressedInputStream = CompressionCodec.createCodec(sparkConf, compressionCodec) - .compressedInputStream(fs.open(fileStatus.getPath)) - val uncompressedSize = ByteStreams.toByteArray(compressedInputStream).length - compressedInputStream.close() - assert(compressedSize < uncompressedSize, "The compression was not successful!") - } - runTest("compression size snappy", skipLocalCheckpoint = true) { _: Boolean => - testCompression("snappy") + testCompression(checkpointDir, "snappy") } runTest("compression size lzf", skipLocalCheckpoint = true) { _: Boolean => - testCompression("lzf") + testCompression(checkpointDir, "lzf") } runTest("compression size lz4", skipLocalCheckpoint = true) { _: Boolean => - testCompression("lz4") + testCompression(checkpointDir, "lz4") } runTest("checkpointing partitioners", skipLocalCheckpoint = true) { _: Boolean => From dab44edce2cb9b49b44983fc944fbc1ebbf67ce7 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 27 Apr 2017 13:45:06 -0700 Subject: [PATCH 3/3] Finish the PR --- .../spark/internal/config/package.scala | 6 + .../spark/rdd/ReliableCheckpointRDD.scala | 50 +++---- .../org/apache/spark/CheckpointSuite.scala | 138 ++++++------------ 3 files changed, 74 insertions(+), 120 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2f0a3064be11..7f7921d56f49 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -272,4 +272,10 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val CHECKPOINT_COMPRESS = + ConfigBuilder("spark.checkpoint.compress") + .doc("Whether to compress RDD checkpoints. Generally a good idea. 
Compression will use " + + "spark.io.compression.codec.") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 368b4f3936fe..37c67cee55f9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -17,7 +17,8 @@ package org.apache.spark.rdd -import java.io.{FileNotFoundException, InputStream, IOException, OutputStream} +import java.io.{FileNotFoundException, IOException} +import java.util.concurrent.TimeUnit import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -27,11 +28,10 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.CHECKPOINT_COMPRESS import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{SerializableConfiguration, Utils} - - /** * An RDD that reads from checkpoint files previously written to reliable storage. */ @@ -122,6 +122,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { originalRDD: RDD[T], checkpointDir: String, blockSize: Int = -1): ReliableCheckpointRDD[T] = { + val checkpointStartTimeNs = System.nanoTime() val sc = originalRDD.sparkContext @@ -136,18 +137,17 @@ private[spark] object ReliableCheckpointRDD extends Logging { val broadcastedConf = sc.broadcast( new SerializableConfiguration(sc.hadoopConfiguration)) // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582) - val startTime = System.currentTimeMillis() sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) - logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTime} ms.") - sc.conf.getOption("spark.checkpoint.compress.codec").foreach(codec => { - logInfo(s"The checkpoint compression codec is $codec.") - }) if (originalRDD.partitioner.nonEmpty) { writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) } + val checkpointDurationMs = + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs) + logInfo(s"Checkpointing took $checkpointDurationMs ms.") + val newRDD = new ReliableCheckpointRDD[T]( sc, checkpointDirPath.toString, originalRDD.partitioner) if (newRDD.partitions.length != originalRDD.partitions.length) { @@ -164,7 +164,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { def writePartitionToCheckpointFile[T: ClassTag]( path: String, broadcastedConf: Broadcast[SerializableConfiguration], - blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]): Unit = { + blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) val fs = outputDir.getFileSystem(broadcastedConf.value.value) @@ -177,13 +177,11 @@ private[spark] object ReliableCheckpointRDD extends Logging { val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileOutputStream = if (blockSize < 0) { - lazy val fileStream: OutputStream = fs.create(tempOutputPath, false, bufferSize) - env.conf.getOption("spark.checkpoint.compress.codec").fold(fileStream) { - codec => { - logDebug(s"Compressing using $codec.") - CompressionCodec.createCodec(env.conf, codec) - .compressedOutputStream(fileStream) - } + val fileStream = fs.create(tempOutputPath, false, bufferSize) + if (env.conf.get(CHECKPOINT_COMPRESS)) { + 
CompressionCodec.createCodec(env.conf).compressedOutputStream(fileStream) + } else { + fileStream } } else { // This is mainly for testing purpose @@ -192,8 +190,6 @@ private[spark] object ReliableCheckpointRDD extends Logging { } val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) - logTrace(s"Starting to write to checkpoint file $tempOutputPath.") - val startTimeMs = System.currentTimeMillis() Utils.tryWithSafeFinally { serializeStream.writeAll(iterator) } { @@ -214,7 +210,6 @@ private[spark] object ReliableCheckpointRDD extends Logging { } } } - logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTimeMs} ms.") } /** @@ -291,17 +286,16 @@ private[spark] object ReliableCheckpointRDD extends Logging { val env = SparkEnv.get val fs = path.getFileSystem(broadcastedConf.value.value) val bufferSize = env.conf.getInt("spark.buffer.size", 65536) - lazy val fileStream: InputStream = fs.open(path, bufferSize) - val inputStream: InputStream = - env.conf.getOption("spark.checkpoint.compress.codec").fold(fileStream) { - codec => { - logDebug(s"Decompressing using $codec.") - CompressionCodec.createCodec(env.conf, codec) - .compressedInputStream(fileStream) - } + val fileInputStream = { + val fileStream = fs.open(path, bufferSize) + if (env.conf.get(CHECKPOINT_COMPRESS)) { + CompressionCodec.createCodec(env.conf).compressedInputStream(fileStream) + } else { + fileStream } + } val serializer = env.serializer.newInstance() - val deserializeStream = serializer.deserializeStream(inputStream) + val deserializeStream = serializer.deserializeStream(fileInputStream) // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener(context => deserializeStream.close()) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 6f0adb2d7567..ee70a3399efe 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -29,7 +29,6 @@ import org.apache.spark.rdd._ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId} import org.apache.spark.util.Utils - trait RDDCheckpointTester { self: SparkFunSuite => protected val partitioner = new HashPartitioner(2) @@ -241,42 +240,6 @@ trait RDDCheckpointTester { self: SparkFunSuite => protected def generateFatPairRDD(): RDD[(Int, Int)] = { new FatPairRDD(sparkContext.makeRDD(1 to 100, 4), partitioner).mapValues(x => x) } - - protected def testBasicCheckpoint(sc: SparkContext, reliableCheckpoint: Boolean): Unit = { - val parCollection = sc.makeRDD(1 to 4) - val flatMappedRDD = parCollection.flatMap(x => 1 to x) - checkpoint(flatMappedRDD, reliableCheckpoint) - assert(flatMappedRDD.dependencies.head.rdd === parCollection) - val result = flatMappedRDD.collect() - assert(flatMappedRDD.dependencies.head.rdd != parCollection) - assert(flatMappedRDD.collect() === result) - } - - protected def testCompression(checkpointDir: File, compressionCodec: String): Unit = { - val sparkConf = new SparkConf() - sparkConf.set("spark.checkpoint.compress.codec", compressionCodec) - val sc = new SparkContext("local", "test", sparkConf) - sc.setCheckpointDir(checkpointDir.toString) - val initialSize = 20 - // Use just one partition for now since compression works best on large data sets. 
- val collection = sc.makeRDD(1 to initialSize, numSlices = 1) - val flatMappedRDD = collection.flatMap(x => 1 to x) - checkpoint(flatMappedRDD, reliableCheckpoint = true) - assert(flatMappedRDD.collect().length == initialSize * (initialSize + 1)/2, - "The checkpoint was lossy!") - sc.stop() - val checkpointPath = new Path(flatMappedRDD.getCheckpointFile.get) - val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration) - val fileStatus = fs.listStatus(checkpointPath).find(_.getPath.getName.startsWith("part-")).get - val compressedSize = fileStatus.getLen - assert(compressedSize > 0, "The checkpoint file was not written!") - val compressedInputStream = CompressionCodec.createCodec(sparkConf, compressionCodec) - .compressedInputStream(fs.open(fileStatus.getPath)) - val uncompressedSize = ByteStreams.toByteArray(compressedInputStream).length - compressedInputStream.close() - assert(compressedSize < uncompressedSize, "The compression was not successful!") - } - } /** @@ -290,14 +253,10 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS super.beforeEach() checkpointDir = File.createTempFile("temp", "", Utils.createTempDir()) checkpointDir.delete() - } - - private def startSparkContext(): Unit = { sc = new SparkContext("local", "test") sc.setCheckpointDir(checkpointDir.toString) } - override def afterEach(): Unit = { try { Utils.deleteRecursively(checkpointDir) @@ -309,44 +268,13 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS override def sparkContext: SparkContext = sc runTest("basic checkpointing") { reliableCheckpoint: Boolean => - startSparkContext() - testBasicCheckpoint(sc, reliableCheckpoint) - } - - runTest("compression with snappy", skipLocalCheckpoint = true) { _: Boolean => - val sparkConf = new SparkConf() - sparkConf.set("spark.checkpoint.compress.codec", "snappy") - sc = new SparkContext("local", "test", sparkConf) - sc.setCheckpointDir(checkpointDir.toString) - testBasicCheckpoint(sc, reliableCheckpoint = true) - } - - runTest("compression with lz4", skipLocalCheckpoint = true) { _: Boolean => - val sparkConf = new SparkConf() - sparkConf.set("spark.checkpoint.compress.codec", "lz4") - sc = new SparkContext("local", "test", sparkConf) - sc.setCheckpointDir(checkpointDir.toString) - testBasicCheckpoint(sc, reliableCheckpoint = true) - } - - runTest("compression with lzf", skipLocalCheckpoint = true) { _: Boolean => - val sparkConf = new SparkConf() - sparkConf.set("spark.checkpoint.compress.codec", "lzf") - sc = new SparkContext("local", "test", sparkConf) - sc.setCheckpointDir(checkpointDir.toString) - testBasicCheckpoint(sc, reliableCheckpoint = true) - } - - runTest("compression size snappy", skipLocalCheckpoint = true) { _: Boolean => - testCompression(checkpointDir, "snappy") - } - - runTest("compression size lzf", skipLocalCheckpoint = true) { _: Boolean => - testCompression(checkpointDir, "lzf") - } - - runTest("compression size lz4", skipLocalCheckpoint = true) { _: Boolean => - testCompression(checkpointDir, "lz4") + val parCollection = sc.makeRDD(1 to 4) + val flatMappedRDD = parCollection.flatMap(x => 1 to x) + checkpoint(flatMappedRDD, reliableCheckpoint) + assert(flatMappedRDD.dependencies.head.rdd === parCollection) + val result = flatMappedRDD.collect() + assert(flatMappedRDD.dependencies.head.rdd != parCollection) + assert(flatMappedRDD.collect() === result) } runTest("checkpointing partitioners", skipLocalCheckpoint = true) { _: Boolean => @@ -386,7 +314,6 @@ class CheckpointSuite extends 
SparkFunSuite with RDDCheckpointTester with LocalS } } - startSparkContext() testPartitionerCheckpointing(partitioner) // Test that corrupted partitioner file does not prevent recovery of RDD @@ -394,7 +321,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("RDDs with one-to-one dependencies") { reliableCheckpoint: Boolean => - startSparkContext() testRDD(_.map(x => x.toString), reliableCheckpoint) testRDD(_.flatMap(x => 1 to x), reliableCheckpoint) testRDD(_.filter(_ % 2 == 0), reliableCheckpoint) @@ -408,7 +334,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("ParallelCollectionRDD") { reliableCheckpoint: Boolean => - startSparkContext() val parCollection = sc.makeRDD(1 to 4, 2) val numPartitions = parCollection.partitions.size checkpoint(parCollection, reliableCheckpoint) @@ -425,7 +350,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("BlockRDD") { reliableCheckpoint: Boolean => - startSparkContext() val blockId = TestBlockId("id") val blockManager = SparkEnv.get.blockManager blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY) @@ -443,7 +367,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("ShuffleRDD") { reliableCheckpoint: Boolean => - startSparkContext() testRDD(rdd => { // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD new ShuffledRDD[Int, Int, Int](rdd.map(x => (x % 2, 1)), partitioner) @@ -451,14 +374,12 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("UnionRDD") { reliableCheckpoint: Boolean => - startSparkContext() def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1) testRDD(_.union(otherRDD), reliableCheckpoint) testRDDPartitions(_.union(otherRDD), reliableCheckpoint) } runTest("CartesianRDD") { reliableCheckpoint: Boolean => - startSparkContext() def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1) testRDD(new CartesianRDD(sc, _, otherRDD), reliableCheckpoint) testRDDPartitions(new CartesianRDD(sc, _, otherRDD), reliableCheckpoint) @@ -482,7 +403,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("CoalescedRDD") { reliableCheckpoint: Boolean => - startSparkContext() testRDD(_.coalesce(2), reliableCheckpoint) testRDDPartitions(_.coalesce(2), reliableCheckpoint) @@ -505,7 +425,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("CoGroupedRDD") { reliableCheckpoint: Boolean => - startSparkContext() val longLineageRDD1 = generateFatPairRDD() // Collect the RDD as sequences instead of arrays to enable equality tests in testRDD @@ -524,7 +443,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("ZippedPartitionsRDD") { reliableCheckpoint: Boolean => - startSparkContext() testRDD(rdd => rdd.zip(rdd.map(x => x)), reliableCheckpoint) testRDDPartitions(rdd => rdd.zip(rdd.map(x => x)), reliableCheckpoint) @@ -550,7 +468,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("PartitionerAwareUnionRDD") { reliableCheckpoint: Boolean => - startSparkContext() testRDD(rdd => { new PartitionerAwareUnionRDD[(Int, Int)](sc, Array( generateFatPairRDD(), @@ -585,7 +502,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean => - 
startSparkContext() val rdd = new BlockRDD[Int](sc, Array.empty[BlockId]) assert(rdd.partitions.size === 0) assert(rdd.isCheckpointed === false) @@ -600,7 +516,6 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS } runTest("checkpointAllMarkedAncestors") { reliableCheckpoint: Boolean => - startSparkContext() testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = true) testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = false) } @@ -667,3 +582,42 @@ object CheckpointSuite { ).asInstanceOf[RDD[(K, Array[Iterable[V]])]] } } + +class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext { + + test("checkpoint compression") { + val checkpointDir = Utils.createTempDir() + try { + val conf = new SparkConf() + .set("spark.checkpoint.compress", "true") + .set("spark.ui.enabled", "false") + sc = new SparkContext("local", "test", conf) + sc.setCheckpointDir(checkpointDir.toString) + val rdd = sc.makeRDD(1 to 20, numSlices = 1) + rdd.checkpoint() + assert(rdd.collect().toSeq === (1 to 20)) + + // Verify that RDD is checkpointed + assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]]) + + val checkpointPath = new Path(rdd.getCheckpointFile.get) + val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration) + val checkpointFile = + fs.listStatus(checkpointPath).map(_.getPath).find(_.getName.startsWith("part-")).get + + // Verify the checkpoint file is compressed, in other words, can be decompressed + val compressedInputStream = CompressionCodec.createCodec(conf) + .compressedInputStream(fs.open(checkpointFile)) + try { + ByteStreams.toByteArray(compressedInputStream) + } finally { + compressedInputStream.close() + } + + // Verify that the compressed content can be read back + assert(rdd.collect().toSeq === (1 to 20)) + } finally { + Utils.deleteRecursively(checkpointDir) + } + } +}
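
For reference, a minimal sketch of how an application would enable the feature as it lands in the final patch: checkpoint compression is controlled by the boolean spark.checkpoint.compress (default false) and reuses whatever codec spark.io.compression.codec resolves to, rather than the per-checkpoint codec setting used in the earlier revisions of this series. The app name, master, and checkpoint directory below are placeholders, not values taken from the patches.

    import org.apache.spark.{SparkConf, SparkContext}

    object CheckpointCompressionExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("checkpoint-compression-example")  // placeholder name
          .setMaster("local[2]")                         // placeholder master for a local run
          .set("spark.checkpoint.compress", "true")      // turn on compressed checkpoints
          .set("spark.io.compression.codec", "lz4")      // optional; lz4 is already the default

        val sc = new SparkContext(conf)
        sc.setCheckpointDir("/tmp/spark-checkpoints")    // placeholder; use HDFS or another reliable store in production

        // Checkpointing is lazy: checkpoint() only marks the RDD, and the next action
        // materializes it, writing the (now compressed) part- files to the checkpoint dir.
        val rdd = sc.parallelize(1 to 1000000, numSlices = 4).map(x => (x % 100, x))
        rdd.checkpoint()
        rdd.count()

        sc.stop()
      }
    }

With compression enabled, reading the checkpoint back is transparent: ReliableCheckpointRDD wraps the opened part- file in the codec's compressedInputStream before deserializing, so consumers of the checkpointed RDD need no changes.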