@@ -66,6 +66,11 @@ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFi
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}

@VisibleForTesting
public ExternalShuffleBlockResolver getBlockResolver() {
return blockManager;
}

/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
public ExternalShuffleBlockHandler(
@@ -321,6 +321,11 @@ void close() {
}
}

@VisibleForTesting
public static File getFileForTest(String[] localDirs, int subDirsPerLocalDir, String filename) {
return getFile(localDirs, subDirsPerLocalDir, filename);
}
Contributor
this is unused now, right? You can undo this change?

Contributor Author

@weixiuli weixiuli Mar 19, 2019
Yes, you are right, I have fixed it.
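
(For context, a minimal sketch of why the test-only wrapper is unnecessary: tests can go through the resolver's public getBlockData API instead. Identifiers such as blockResolver, "app0", and "exec0" mirror the new ExternalShuffleServiceDbSuite added later in this diff.)

// Sketch: read a sort-shuffle block via ExternalShuffleBlockResolver's public API,
// rather than resolving the on-disk file with a test-only getFile wrapper.
val blockStream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream()
try {
  // consume the block contents, e.g. compare them against the bytes written in setup
} finally {
  blockStream.close()
}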


/**
* This method is needed to avoid the situation when multiple File instances for the
* same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
7 changes: 7 additions & 0 deletions core/pom.xml
@@ -372,6 +372,13 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
@@ -17,6 +17,7 @@

package org.apache.spark.deploy

import java.io.File
import java.util.concurrent.CountDownLatch

import scala.collection.JavaConverters._
@@ -49,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)

private val registeredExecutorsDB = "registeredExecutors.ldb"

private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -58,9 +61,29 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

private val shuffleServiceSource = new ExternalShuffleServiceSource

protected def findRegisteredExecutorsDBFile(dbName: String): File = {
val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
if (localDirs.length >= 1) {
new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
} else {
logWarning(s"'spark.local.dir' should be set first when we use db in " +
s"ExternalShuffleService. Note that this only affects standalone mode.")
null
}
}

/** Get the block handler. */
def getBlockHandler: ExternalShuffleBlockHandler = {
blockHandler
}

/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
new ExternalShuffleBlockHandler(conf, null)
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
} else {
new ExternalShuffleBlockHandler(conf, null)
}
}

/** Starts the external shuffle service if the user has configured us to. */
@@ -466,6 +466,15 @@ private[deploy] class Worker(
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)

// When spark.shuffle.service.db.enabled=true, also remove this application's
// registeredExecutors entry from the external shuffle service's DB. Otherwise,
// an application stopped while the external shuffle service is down would leave
// a stale entry in the DB that is never cleaned up.
if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
shuffleService.applicationRemoved(dir.getName)
}
}
}(cleanupThreadExecutor)

@@ -30,7 +30,7 @@ private[spark] object Worker {

val WORKER_CLEANUP_ENABLED = ConfigBuilder("spark.worker.cleanup.enabled")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)
Contributor

why are you changing this default? Honestly I am much less comfortable merging it with the default changed, as I don't have much experience w/ standalone mode.

Contributor Author

This commit depends on WORKER_CLEANUP_ENABLED. We should keep the default value of spark.worker.cleanup.enabled = false, but make it clear in the docs that spark.worker.cleanup.enabled must be enabled if spark.shuffle.service.db.enabled is "true". All right?
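
To make the dependency between the two flags concrete, here is a minimal sketch of the standalone setup being discussed (only config keys touched by this PR; both flags set explicitly rather than relying on defaults):

import org.apache.spark.SparkConf

// The LevelDB-backed executor registry only stays bounded if worker cleanup is
// also enabled, so WorkDirCleanup can purge entries for stopped applications.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")     // run the external shuffle service
  .set("spark.shuffle.service.db.enabled", "true")  // persist registered executors in LevelDB
  .set("spark.worker.cleanup.enabled", "true")      // needed so stale DB entries get removed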


val WORKER_CLEANUP_INTERVAL = ConfigBuilder("spark.worker.cleanup.interval")
.longConf
@@ -358,6 +358,13 @@ package object config {
private[spark] val SHUFFLE_SERVICE_ENABLED =
ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)

private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
ConfigBuilder("spark.shuffle.service.db.enabled")
.doc("Whether to use db in ExternalShuffleService. Note that this only affects " +
"standalone mode.")
.booleanConf
.createWithDefault(true)

private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.io._
import java.nio.charset.StandardCharsets

import com.google.common.io.CharStreams

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
import org.apache.spark.network.shuffle.TestShuffleDataContext
import org.apache.spark.util.Utils

/**
* This suite gets BlockData when the ExternalShuffleService is restarted
* with #spark.shuffle.service.db.enabled = true or false.
* Note that failures in this suite may arise when #spark.shuffle.service.db.enabled = false.
*/
class ExternalShuffleServiceDbSuite extends SparkFunSuite {
val sortBlock0 = "Hello!"
val sortBlock1 = "World!"
val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"

var sparkConf: SparkConf = _
var dataContext: TestShuffleDataContext = _

var securityManager: SecurityManager = _
var externalShuffleService: ExternalShuffleService = _
var blockHandler: ExternalShuffleBlockHandler = _
var blockResolver: ExternalShuffleBlockResolver = _

override def beforeAll() {
super.beforeAll()
sparkConf = new SparkConf()
sparkConf.set("spark.shuffle.service.enabled", "true")
sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
Utils.loadDefaultSparkProperties(sparkConf, null)
securityManager = new SecurityManager(sparkConf)

dataContext = new TestShuffleDataContext(2, 5)
dataContext.create()
// Write some sort data.
dataContext.insertSortShuffleData(0, 0,
Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
sortBlock1.getBytes(StandardCharsets.UTF_8)))
registerExecutor()
}

override def afterAll() {
try {
dataContext.cleanup()
} finally {
super.afterAll()
}
}

def registerExecutor(): Unit = {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)

// external Shuffle Service start
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver
blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
} finally {
blockHandler.close()
// external Shuffle Service stop
externalShuffleService.stop()
}
}

// The beforeAll ensures the shuffle data was already written, and then
// the shuffle service was stopped. Here we restart the shuffle service
// and make sure we can read the shuffle data.
test("Recover shuffle data with spark.shuffle.service.db.enabled=true after " +
"shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver

val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
block0Stream.close()
assert(sortBlock0 == block0)
// pass
} finally {
blockHandler.close()
// externalShuffleService stop
externalShuffleService.stop()
}

}

// The beforeAll ensures the shuffle data was already written, and then
// the shuffle service was stopped. Here we restart the shuffle service,
// but we can't read the shuffle data.
test("Can't recover shuffle data with spark.shuffle.service.db.enabled=false after" +
" shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "false")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver

val error = intercept[RuntimeException] {
blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
}.getMessage

assert(error.contains("not registered"))
} finally {
blockHandler.close()
// externalShuffleService stop
externalShuffleService.stop()
}
}
}
@@ -17,24 +17,29 @@

package org.apache.spark.deploy.worker

import java.io.{File, IOException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Supplier

import scala.concurrent.duration._

import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.util.Utils

class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {

@@ -245,4 +250,48 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
assert(cleanupCalled.get() == value)
}

test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
"spark.shuffle.service.db.enabled=true") {
testWorkDirCleanupAndRemoveMetadataWithConfig(true)
}

test("WorkdDirCleanup cleans only app dirs when" +
"spark.shuffle.service.db.enabled=false") {
testWorkDirCleanupAndRemoveMetadataWithConfig(false)
}

private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
conf.set("spark.worker.cleanup.appDataTtl", "60")
conf.set("spark.shuffle.service.enabled", "true")

val appId = "app1"
val execId = "exec1"
val cleanupCalled = new AtomicBoolean(false)
when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
override def answer(invocations: InvocationOnMock): Unit = {
cleanupCalled.set(true)
}
})
val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
override def get: ExternalShuffleService = shuffleService
}
val worker = makeWorker(conf, externalShuffleServiceSupplier)
val workDir = Utils.createTempDir(namePrefix = "work")
// initialize workers
worker.workDir = workDir
// Create the executor's working directory
val executorDir = new File(worker.workDir, appId + "/" + execId)

if (!executorDir.exists && !executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
worker.receive(WorkDirCleanup)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
assert(!executorDir.exists())
assert(cleanupCalled.get() == dbCleanupEnabled)
}
}
}
14 changes: 13 additions & 1 deletion docs/spark-standalone.md
@@ -236,10 +236,11 @@ SPARK_WORKER_OPTS supports the following system properties:
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.worker.cleanup.enabled</code></td>
<td>false</td>
<td>true</td>
Contributor

Why have you changed this default value in the documentation? As I see it is still false.

val WORKER_CLEANUP_ENABLED = ConfigBuilder("spark.worker.cleanup.enabled")
.booleanConf
.createWithDefault(false)

Contributor Author

Sorry, I missed it.

Contributor

I believe Attila meant to revert your change to the docs, not to change the default.

<td>
Enable periodic cleanup of worker / application directories. Note that this only affects standalone
mode, as YARN works differently. Only the directories of stopped applications are cleaned up.
This must be enabled if <code>spark.shuffle.service.db.enabled</code> is "true".
</td>
</tr>
<tr>
@@ -260,6 +261,17 @@ SPARK_WORKER_OPTS supports the following system properties:
especially if you run jobs very frequently.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.db.enabled</code></td>
<td>true</td>
<td>
Whether to record RegisteredExecutors information in LevelDB so it can be reloaded and
used again when the external shuffle service is restarted. Note that this only affects standalone
mode; it is always on for YARN. <code>spark.worker.cleanup.enabled</code> should also be enabled so that
WorkDirCleanup removes stale entries (an application stopped while the external shuffle
service is down would otherwise leave an entry in the DB forever). This config may be removed in the future.
Contributor

Some minor rewordings:

Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will
automatically reload info on current executors.  This only affects standalone mode (yarn always has this behavior
enabled).  You should also enable <code>spark.worker.cleanup.enabled</code>, to ensure that the state
eventually gets cleaned up.  This config may be removed in the future.

Contributor Author

Very clear description, thank you.

</td>
</tr>
<tr>
<td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
<td>true</td>