@@ -66,6 +66,11 @@ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFi
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}

@VisibleForTesting
public ExternalShuffleBlockResolver getBlockResolver() {
return blockManager;
}

/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
public ExternalShuffleBlockHandler(
7 changes: 7 additions & 0 deletions core/pom.xml
@@ -372,6 +372,13 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
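<!-- Test-jar of the network-shuffle module; provides shuffle test utilities such as
TestShuffleDataContext used by ExternalShuffleServiceDbSuite. -->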
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
@@ -17,6 +17,7 @@

package org.apache.spark.deploy

import java.io.File
import java.util.concurrent.CountDownLatch

import scala.collection.JavaConverters._
@@ -49,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)

private val registeredExecutorsDB = "registeredExecutors.ldb"

private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -58,9 +61,29 @@

private val shuffleServiceSource = new ExternalShuffleServiceSource

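/**
* Location of the registered-executors DB file. Prefer a 'spark.local.dir' entry that already
* contains the DB (so existing state is reused across restarts); otherwise fall back to the
* first configured directory. Returns null and logs a warning if 'spark.local.dir' is not set.
*/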
protected def findRegisteredExecutorsDBFile(dbName: String): File = {
val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
if (localDirs.length >= 1) {
new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
} else {
logWarning(s"'spark.local.dir' should be set when the db is used in " +
s"ExternalShuffleService. Note that this only affects standalone mode.")
null
}
}

/** Get the block handler. */
def getBlockHandler: ExternalShuffleBlockHandler = {
blockHandler
}

/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
new ExternalShuffleBlockHandler(conf, null)
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
} else {
new ExternalShuffleBlockHandler(conf, null)
}
}

/** Starts the external shuffle service if the user has configured us to. */
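Since newShuffleBlockHandler is factored out for subclasses to override, here is a minimal hypothetical sketch of such a subclass. It is illustrative only and not part of this change; it assumes the imports shown and simply delegates to the default behavior:

package org.apache.spark.deploy

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.util.TransportConf

// Hypothetical subclass, for illustration only: it keeps the default handler
// (including the registeredExecutors.ldb lookup when spark.shuffle.service.db.enabled=true)
// and only adds a log line around its creation.
private[deploy] class LoggingExternalShuffleService(
    sparkConf: SparkConf,
    securityManager: SecurityManager)
  extends ExternalShuffleService(sparkConf, securityManager) {

  override protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
    logInfo("Creating shuffle block handler for the standalone external shuffle service")
    super.newShuffleBlockHandler(conf)
  }
}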
@@ -466,6 +466,15 @@ private[deploy] class Worker(
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)

// Also remove the application's entry from the external shuffle service's registered-executors
// DB when spark.shuffle.service.db.enabled=true. This covers the case where an application is
// stopped while the external shuffle service is down: its DB entry would otherwise survive the
// restart, so it is cleaned up here. The worker's application directories are named after the
// application ID, so dir.getName is the ID to remove.
if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
shuffleService.applicationRemoved(dir.getName)
}
}
}(cleanupThreadExecutor)

@@ -358,6 +358,13 @@ package object config {
private[spark] val SHUFFLE_SERVICE_ENABLED =
ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)

private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
ConfigBuilder("spark.shuffle.service.db.enabled")
.doc("Whether to use db in ExternalShuffleService. Note that this only affects " +
"standalone mode.")
.booleanConf
.createWithDefault(true)

private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)

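A minimal usage sketch for the new entry, mirroring how Worker and ExternalShuffleService read it. The wrapper object and package here are hypothetical; the accessor has to live under org.apache.spark because the entry is private[spark]:

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.internal.config

object ShuffleDbFlagExample {
  def main(args: Array[String]): Unit = {
    // Example values only: in a real deployment these come from the worker's configuration.
    val sparkConf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.db.enabled", "true")
    // Typed accessor; returns true by default if the key is unset.
    val dbEnabled: Boolean = sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED)
    println(s"shuffle service db enabled: $dbEnabled")
  }
}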
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.io._
import java.nio.charset.StandardCharsets

import com.google.common.io.CharStreams

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
import org.apache.spark.network.shuffle.TestShuffleDataContext
import org.apache.spark.util.Utils

/**
* This suite verifies whether block data can still be fetched after the ExternalShuffleService
* is restarted, with spark.shuffle.service.db.enabled = true or false.
* Note that recovery is expected to fail when spark.shuffle.service.db.enabled = false.
*/
class ExternalShuffleServiceDbSuite extends SparkFunSuite {
val sortBlock0 = "Hello!"
val sortBlock1 = "World!"
val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"

var sparkConf: SparkConf = _
var dataContext: TestShuffleDataContext = _

var securityManager: SecurityManager = _
var externalShuffleService: ExternalShuffleService = _
var blockHandler: ExternalShuffleBlockHandler = _
var blockResolver: ExternalShuffleBlockResolver = _

override def beforeAll() {
super.beforeAll()
sparkConf = new SparkConf()
sparkConf.set("spark.shuffle.service.enabled", "true")
sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
Utils.loadDefaultSparkProperties(sparkConf, null)
securityManager = new SecurityManager(sparkConf)

dataContext = new TestShuffleDataContext(2, 5)
dataContext.create()
// Write some sort data.
dataContext.insertSortShuffleData(0, 0,
Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
sortBlock1.getBytes(StandardCharsets.UTF_8)))
registerExecutor()
}

override def afterAll() {
try {
dataContext.cleanup()
} finally {
super.afterAll()
}
}

def registerExecutor(): Unit = {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)

// external Shuffle Service start
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver
blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
} finally {
blockHandler.close()
// external Shuffle Service stop
externalShuffleService.stop()
}
}

// The beforeAll ensures the shuffle data was already written, and then
// the shuffle service was stopped. Here we restart the shuffle service
// and make sure we can still read the shuffle data.
test("Recover shuffle data with spark.shuffle.service.db.enabled=true after " +
"shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver

val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
block0Stream.close()
assert(sortBlock0 == block0)
// pass
} finally {
blockHandler.close()
// externalShuffleService stop
externalShuffleService.stop()
}

}

// The beforeAll ensures the shuffle data was already written, and then
// the shuffle service was stopped. Here we restart the shuffle service,
// but the shuffle data can no longer be read.
test("Can't recover shuffle data with spark.shuffle.service.db.enabled=false after" +
" shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "false")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver

val error = intercept[RuntimeException] {
blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
}.getMessage

assert(error.contains("not registered"))
} finally {
blockHandler.close()
// externalShuffleService stop
externalShuffleService.stop()
}
}
}
@@ -17,24 +17,29 @@

package org.apache.spark.deploy.worker

import java.io.{File, IOException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Supplier

import scala.concurrent.duration._

import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.util.Utils

class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {

@@ -245,4 +250,48 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
assert(cleanupCalled.get() == value)
}

test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
"spark.shuffle.service.db.enabled=true") {
testWorkDirCleanupAndRemoveMetadataWithConfig(true)
}

test("WorkDirCleanup cleans only app dirs when " +
"spark.shuffle.service.db.enabled=false") {
testWorkDirCleanupAndRemoveMetadataWithConfig(false)
}

private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
conf.set("spark.worker.cleanup.appDataTtl", "60")
conf.set("spark.shuffle.service.enabled", "true")

val appId = "app1"
val execId = "exec1"
val cleanupCalled = new AtomicBoolean(false)
when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
override def answer(invocations: InvocationOnMock): Unit = {
cleanupCalled.set(true)
}
})
val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
override def get: ExternalShuffleService = shuffleService
}
val worker = makeWorker(conf, externalShuffleServiceSupplier)
val workDir = Utils.createTempDir(namePrefix = "work")
// initialize workers
worker.workDir = workDir
// Create the executor's working directory
val executorDir = new File(worker.workDir, appId + "/" + execId)

if (!executorDir.exists && !executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
worker.receive(WorkDirCleanup)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
assert(!executorDir.exists())
assert(cleanupCalled.get() == dbCleanupEnabled)
}
}
}
11 changes: 11 additions & 0 deletions docs/spark-standalone.md
@@ -240,6 +240,7 @@ SPARK_WORKER_OPTS supports the following system properties:
<td>
Enable periodic cleanup of worker / application directories. Note that this only affects standalone
mode, as YARN works differently. Only the directories of stopped applications are cleaned up.
This should be enabled if <code>spark.shuffle.service.db.enabled</code> is "true".
</td>
</tr>
<tr>
@@ -260,6 +261,16 @@
especially if you run jobs very frequently.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.db.enabled</code></td>
<td>true</td>
<td>
Store the external shuffle service state on local disk so that when the external shuffle service is restarted, it will
automatically reload information about current executors. This only affects standalone mode (YARN always has this behavior
enabled). You should also enable <code>spark.worker.cleanup.enabled</code> to ensure that the state
eventually gets cleaned up. This config may be removed in the future.
</tr>
<tr>
<td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
<td>true</td>
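For example, to pair the two settings described above when starting standalone workers, a sketch for conf/spark-env.sh (values are illustrative):

SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.shuffle.service.db.enabled=true"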