Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {

override def beforeAll() {
super.beforeAll()
// Clear the local cache directory
// Once 'spark.local.dir' is set, it is cached. Unless the cache is manually cleared
// before/after a test, the same directory may be returned even if this property
// is reconfigured.
Utils.clearLocalRootDirs()
conf.set("spark.shuffle.manager", "sort")
}
Expand All @@ -52,7 +54,6 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
override def afterEach(): Unit = {
try {
Utils.deleteRecursively(tempDir)
// Clear the local cache directory
Utils.clearLocalRootDirs()
} finally {
super.afterEach()
Expand Down
111 changes: 62 additions & 49 deletions mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import scala.language.existentials
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter

import org.scalatest.BeforeAndAfterEach

import org.apache.spark._
Expand Down Expand Up @@ -748,79 +747,93 @@ class ALSSuite

class ALSCleanerSuite extends SparkFunSuite with BeforeAndAfterEach {

@transient var sc: SparkContext = _
private var localDir: File = _
private var checkpointDir: File = _
val conf = new SparkConf()
private val conf = new SparkConf()

override def beforeEach(): Unit = {
super.beforeEach()
// Once 'spark.local.dir' is set, it is cached. Unless the cache is manually cleared
// before/after a test, the same directory may be returned even if this property
// is reconfigured.
Utils.clearLocalRootDirs()
localDir = Utils.createTempDir()
checkpointDir = Utils.createTempDir()
conf.set("spark.local.dir", localDir.getAbsolutePath)
}

override def afterEach(): Unit = {
try {
sc.stop()
sc = null
Utils.deleteRecursively(localDir)
Utils.deleteRecursively(checkpointDir)
// Clear the local cache directory
Utils.clearLocalRootDirs()
} finally {
super.afterEach()
}
}

test("ALS shuffle cleanup standalone") {
val checkpointDir = Utils.createTempDir()
def getAllFiles: Set[File] =
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
conf.set("spark.local.dir", localDir.getAbsolutePath)
sc = new SparkContext("local[2]", "test", conf)
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Test checkpoint and clean parents
val input = sc.parallelize(1 to 1000)
val keyed = input.map(x => (x % 20, 1))
val shuffled = keyed.reduceByKey(_ + _)
val keysOnly = shuffled.keys
val deps = keysOnly.dependencies
keysOnly.count()
ALS.cleanShuffleDependencies(sc, deps, true)
val resultingFiles = getAllFiles
assert(resultingFiles === Set())
// Ensure running count again works fine even if we kill the shuffle files.
keysOnly.count()
try {
val sc = new SparkContext("local[2]", "test", conf)
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Test checkpoint and clean parents
val input = sc.parallelize(1 to 1000)
val keyed = input.map(x => (x % 20, 1))
val shuffled = keyed.reduceByKey(_ + _)
val keysOnly = shuffled.keys
val deps = keysOnly.dependencies
keysOnly.count()
ALS.cleanShuffleDependencies(sc, deps, true)
val resultingFiles = getAllFiles
assert(resultingFiles === Set())
// Ensure running count again works fine even if we kill the shuffle files.
keysOnly.count()
} finally {
sc.stop()
}
} finally {
Utils.deleteRecursively(checkpointDir)
}
}

test("ALS shuffle cleanup in algorithm") {
val checkpointDir = Utils.createTempDir()
def getAllFiles: Set[File] =
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
sc = new SparkContext("local[2]", "test", conf)
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Generate test data
val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
// Implicitly test the cleaning of parents during ALS training
val spark = SparkSession.builder
.master("local[2]")
.appName("ALSCleanerSuite")
.sparkContext(sc)
.getOrCreate()
import spark.implicits._
val als = new ALS()
.setRank(1)
.setRegParam(1e-5)
.setSeed(0)
.setCheckpointInterval(1)
.setMaxIter(7)
val model = als.fit(training.toDF())
val resultingFiles = getAllFiles
// We expect the last shuffle's files, block ratings, user factors, and item factors to be
// around but no more.
val pattern = "shuffle_(\\d+)_.+\\.data".r
val rddIds = resultingFiles.flatMap { f =>
pattern.findAllIn(f.getName()).matchData.map { _.group(1) } }
assert(rddIds.size === 4)
try {
val sc = new SparkContext("local[2]", "test", conf)
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Generate test data
val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
// Implicitly test the cleaning of parents during ALS training
val spark = SparkSession.builder
.master("local[2]")
.appName("ALSCleanerSuite")
.sparkContext(sc)
.getOrCreate()
import spark.implicits._
val als = new ALS()
.setRank(1)
.setRegParam(1e-5)
.setSeed(0)
.setCheckpointInterval(1)
.setMaxIter(7)
val model = als.fit(training.toDF())
val resultingFiles = getAllFiles
// We expect the last shuffle's files, block ratings, user factors, and item factors to be
// around but no more.
val pattern = "shuffle_(\\d+)_.+\\.data".r
val rddIds = resultingFiles.flatMap { f =>
pattern.findAllIn(f.getName()).matchData.map { _.group(1) } }
assert(rddIds.size === 4)
} finally {
sc.stop()
}
} finally {
Utils.deleteRecursively(checkpointDir)
}
}
}

Expand Down