@@ -272,4 +272,10 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val CHECKPOINT_COMPRESS =
ConfigBuilder("spark.checkpoint.compress")
.doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " +
"spark.io.compression.codec.")
.booleanConf
.createWithDefault(false)
}
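
For context, a minimal usage sketch of the new flag from application code (not part of this diff; the app name, master, and checkpoint path are illustrative). The property name and its false default come from the config entry above, and the codec follows spark.io.compression.codec as the doc string notes.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("checkpoint-compress-example")   // illustrative name
  .setMaster("local[2]")                       // illustrative master, for a self-contained run
  .set("spark.checkpoint.compress", "true")    // new flag added above; defaults to false
  .set("spark.io.compression.codec", "lz4")    // codec used for the compressed checkpoint files
val sc = new SparkContext(conf)
sc.setCheckpointDir("/tmp/checkpoints")        // illustrative path
val rdd = sc.parallelize(1 to 1000)
rdd.checkpoint()
rdd.count()                                    // the first action materializes the checkpoint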
@@ -18,6 +18,7 @@
package org.apache.spark.rdd

import java.io.{FileNotFoundException, IOException}
import java.util.concurrent.TimeUnit

import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -27,6 +28,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CHECKPOINT_COMPRESS
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
@@ -119,6 +122,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()

val sc = originalRDD.sparkContext

@@ -140,6 +144,10 @@
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}

val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
logInfo(s"Checkpointing took $checkpointDurationMs ms.")

val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
@@ -169,7 +177,12 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
val fileStream = fs.create(tempOutputPath, false, bufferSize)
if (env.conf.get(CHECKPOINT_COMPRESS)) {
CompressionCodec.createCodec(env.conf).compressedOutputStream(fileStream)
[Inline review comment from a Contributor on the line above]
A question I had even with the earlier PR was: should we add the extension to either the directory or the file indicating the compression type?
} else {
fileStream
}
} else {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize,
@@ -273,7 +286,14 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val env = SparkEnv.get
val fs = path.getFileSystem(broadcastedConf.value.value)
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileInputStream = fs.open(path, bufferSize)
val fileInputStream = {
val fileStream = fs.open(path, bufferSize)
if (env.conf.get(CHECKPOINT_COMPRESS)) {
CompressionCodec.createCodec(env.conf).compressedInputStream(fileStream)
} else {
fileStream
}
}
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)

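Not part of the diff: a minimal round-trip sketch of the codec-wrapping pattern used by the writer and reader above, with an in-memory stream standing in for the HDFS file. Like the suite below, it assumes the code lives under package org.apache.spark, since CompressionCodec's factory is spark-internal; the object name is illustrative.

package org.apache.spark

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.google.common.io.ByteStreams

import org.apache.spark.io.CompressionCodec

object CheckpointCompressionRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.io.compression.codec", "lz4")
    val codec = CompressionCodec.createCodec(conf)

    // Compress, mirroring how the checkpoint writer wraps fs.create(...)
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(buffer)
    out.write("checkpoint payload".getBytes("UTF-8"))
    out.close()  // finishes the compressed stream

    // Decompress, mirroring how the reader wraps fs.open(...)
    val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val roundTripped = new String(ByteStreams.toByteArray(in), "UTF-8")
    in.close()

    assert(roundTripped == "checkpoint payload")
  }
}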
core/src/test/scala/org/apache/spark/CheckpointSuite.scala (41 additions, 0 deletions)
@@ -21,8 +21,10 @@ import java.io.File

import scala.reflect.ClassTag

import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.Path

import org.apache.spark.io.CompressionCodec
import org.apache.spark.rdd._
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
@@ -580,3 +582,42 @@ object CheckpointSuite {
).asInstanceOf[RDD[(K, Array[Iterable[V]])]]
}
}

class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {

test("checkpoint compression") {
val checkpointDir = Utils.createTempDir()
try {
val conf = new SparkConf()
.set("spark.checkpoint.compress", "true")
.set("spark.ui.enabled", "false")
sc = new SparkContext("local", "test", conf)
sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
rdd.checkpoint()
assert(rdd.collect().toSeq === (1 to 20))

// Verify that RDD is checkpointed
assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])

val checkpointPath = new Path(rdd.getCheckpointFile.get)
val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration)
val checkpointFile =
fs.listStatus(checkpointPath).map(_.getPath).find(_.getName.startsWith("part-")).get

// Verify the checkpoint file is compressed, in other words, can be decompressed
val compressedInputStream = CompressionCodec.createCodec(conf)
.compressedInputStream(fs.open(checkpointFile))
try {
ByteStreams.toByteArray(compressedInputStream)
} finally {
compressedInputStream.close()
}

// Verify that the compressed content can be read back
assert(rdd.collect().toSeq === (1 to 20))
} finally {
Utils.deleteRecursively(checkpointDir)
}
}
}
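
One way to run the new suite locally (assuming the standard Spark sbt build; the exact invocation may differ per setup) is:

build/sbt "core/testOnly *CheckpointCompressionSuite"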