Commits (25)
0a2d0d7  Remove shuffle blocks using the shuffle service for released executors (Kimahriman, Dec 22, 2021)
92af8ae  Check for existing executor (Kimahriman, Jan 4, 2022)
2a0dfac  Fix to work through the context cleaner (Kimahriman, Jan 12, 2022)
1190470  Create shuffle files as group writable (Kimahriman, Jan 14, 2022)
d12b6b2  Make sure external shuffle is used and clean some things up (Kimahriman, Jan 16, 2022)
b764141  Create disk block dirs as group writable rather than files (Kimahriman, Jan 16, 2022)
b837362  Add test for block manager sub dir being group writable (Kimahriman, Jan 16, 2022)
f6b7560  Use the create dir with 770 helper (Kimahriman, Jan 19, 2022)
3689b9e  World readable block file approach (Kimahriman, Jan 23, 2022)
7be49a0  Create final shuffle files correctly with world readable (Kimahriman, Jan 25, 2022)
4e62191  Update LocalDiskSingleSpillMapOutputWriter.java (Kimahriman, Jan 25, 2022)
940a934  Fix tests and lint (Kimahriman, Jan 26, 2022)
e4a71af  Rework some conditional checks (Kimahriman, Jan 29, 2022)
2b4373f  Add feature flag (Kimahriman, Feb 3, 2022)
9698b09  Change config name (Kimahriman, Feb 27, 2022)
126955b  Only change permissions if removing shuffle through external shuffle … (Kimahriman, Mar 17, 2022)
cc79384  Update description, add to markdown, and reorder logic (Kimahriman, Mar 17, 2022)
02057b8  Enable settings for test (Kimahriman, Mar 17, 2022)
b55eb20  Add permission changing back to temp shuffle block (Kimahriman, Mar 17, 2022)
84f2929  Apply suggestions for comment change (Kimahriman, Mar 18, 2022)
61aa9f0  Fix typo, add negative test, and remove duplicate check (Kimahriman, Mar 18, 2022)
d34be20  Default config to false (Kimahriman, Mar 18, 2022)
3d2fd7b  Merge branch 'shuffle-service-remove-shuffle-blocks' of github.com:Ki… (Kimahriman, Mar 18, 2022)
ad0f9eb  Update permissions for RDD blocks if shuffle service fetching is enab… (Kimahriman, Mar 22, 2022)
c39cdf3  Update comments and use locking withMapStatuses (Kimahriman, Mar 24, 2022)
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

@@ -78,8 +78,12 @@ private[spark] class DiskBlockManager(
 
   private val shutdownHook = addShutdownHook()
 
-  private val shuffleServiceRemoveShuffleEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
-    conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
+  // If either of these features is enabled, we must change permissions on block manager
+  // directories and files to accommodate the shuffle service deleting files in a secure environment
+  private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && (
+    conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) ||
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+  )
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
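For context, the gate above only fires when the external shuffle service is on and at least one of the two features that need relaxed permissions is enabled. A minimal sketch of how those flags are toggled, mirroring the style of the test suite later in this diff; note this is an assumption about wiring, not part of the PR, and the ConfigEntry overload of SparkConf.set is private[spark], so it only compiles inside Spark's own modules:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config

    // Hypothetical wiring, not part of the PR: enable the shuffle service plus the two
    // features that make DiskBlockManager relax directory/file permissions.
    val conf = new SparkConf()
      .set(config.SHUFFLE_SERVICE_ENABLED, true)
      .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, true)  // flag added by this PR
      .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)       // SPARK-25888 RDD fetch feature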
@@ -100,7 +104,7 @@ private[spark] class DiskBlockManager(
         if (!newDir.exists()) {
           val path = newDir.toPath
           Files.createDirectory(path)
-          if (shuffleServiceRemoveShuffleEnabled) {
+          if (permissionChangingRequired) {
             // SPARK-37618: Create dir as group writable so files within can be deleted by the
             // shuffle service in a secure setup. This will remove the setgid bit so files created
             // within won't be created with the parent folder group.
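The commit history mentions a "create dir with 770" helper for exactly this step. As an illustration only (not necessarily the actual Spark helper), a directory can be made group writable after creation with java.nio; setting the permissions explicitly after the directory exists means the process umask cannot strip the group bits:

    import java.nio.file.{Files, Path, Paths}
    import java.nio.file.attribute.PosixFilePermissions

    // Illustrative sketch: create a block-manager sub-directory, then explicitly set
    // rwxrwx--- (770) so the shuffle service's group can delete files inside it.
    def createGroupWritableDir(path: Path): Path = {
      val dir = Files.createDirectory(path)
      Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString("rwxrwx---"))
      dir
    }

    // e.g. createGroupWritableDir(Paths.get("/tmp/blockmgr-example/0a"))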
@@ -201,7 +205,7 @@ private[spark] class DiskBlockManager(
   */
  def createTempFileWith(file: File): File = {
    val tmpFile = Utils.tempFileWith(file)
-    if (shuffleServiceRemoveShuffleEnabled) {
+    if (permissionChangingRequired) {
      // SPARK-37618: we need to make the file world readable because the parent will
      // lose the setgid bit when making it group writable. Without this the shuffle
      // service can't read the shuffle files in a secure setup.
@@ -226,7 +230,7 @@ private[spark] class DiskBlockManager(
      blockId = new TempShuffleBlockId(UUID.randomUUID())
    }
    val tmpFile = getFile(blockId)
-    if (shuffleServiceRemoveShuffleEnabled) {
+    if (permissionChangingRequired) {
      // SPARK-37618: we need to make the file world readable because the parent will
      // lose the setgid bit when making it group writable. Without this the shuffle
      // service can't read the shuffle files in a secure setup.
core/src/main/scala/org/apache/spark/storage/DiskStore.scala (10 additions, 0 deletions)

@@ -50,6 +50,9 @@ private[spark] class DiskStore(
   private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
   private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
 
+  private val shuffleServiceFetchRddEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+
   def getSize(blockId: BlockId): Long = blockSizes.get(blockId)
 
   /**
@@ -71,6 +74,13 @@ private[spark] class DiskStore(
     logDebug(s"Attempting to put block $blockId")
     val startTimeNs = System.nanoTime()
     val file = diskManager.getFile(blockId)
+
+    // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must
+    // make the file world readable, as it will not be owned by the group running the shuffle
+    // service in a secure environment
+    if (shuffleServiceFetchRddEnabled) {
+      diskManager.createWorldReadableFile(file)
+    }
     val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
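The new branch above delegates to diskManager.createWorldReadableFile, whose body is not shown in this excerpt; the surrounding comments describe its intent. A rough sketch of how a file can be given the missing "others read" bit with java.nio, as an illustration under that assumption rather than the Spark implementation:

    import java.io.File
    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermission

    // Illustrative sketch: create the file if needed, then add o+r on top of whatever the
    // umask left, so a shuffle service running under another user/group can still read it.
    def makeWorldReadable(file: File): Unit = {
      if (!file.exists()) {
        Files.createFile(file.toPath)
      }
      val perms = new java.util.HashSet(Files.getPosixFilePermissions(file.toPath))
      perms.add(PosixFilePermission.OTHERS_READ)
      Files.setPosixFilePermissions(file.toPath, perms)
    }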
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

@@ -18,6 +18,8 @@
 package org.apache.spark
 
 import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 
 import scala.concurrent.Promise
 import scala.concurrent.duration.Duration
@@ -106,7 +108,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
   }
 
   test("SPARK-25888: using external shuffle service fetching disk persisted blocks") {
-    val confWithRddFetchEnabled = conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    val confWithRddFetchEnabled = conf.clone
+      .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
@@ -118,13 +122,42 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       rdd.count()
 
       val blockId = RDDBlockId(rdd.id, 0)
-      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val bms = eventually(timeout(2.seconds), interval(100.milliseconds)) {
         val locations = sc.env.blockManager.master.getLocations(blockId)
         assert(locations.size === 2)
         assert(locations.map(_.port).contains(server.getPort),
           "external shuffle service port should be contained")
+        locations
       }
 
+      val dirManager = sc.env.blockManager.hostLocalDirManager
+        .getOrElse(fail("No host local dir manager"))
+
+      val promises = bms.map { case bmid =>
+        val promise = Promise[File]()
+        dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) {
+          case scala.util.Success(res) => res.foreach { case (eid, dirs) =>
+            val file = new File(ExecutorDiskUtils.getFilePath(dirs,
+              sc.env.blockManager.subDirsPerLocalDir, blockId.name))
+            promise.success(file)
+          }
+          case scala.util.Failure(error) => promise.failure(error)
+        }
+        promise.future
+      }
+      val filesToCheck = promises.map(p => ThreadUtils.awaitResult(p, Duration(2, "sec")))
+
+      filesToCheck.foreach(f => {
+        val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath)
+        assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+        // On most operating systems the default umask will make this test pass
+        // even if the permission isn't changed. To properly test this, run the
+        // test with a umask of 0027
+        val perms = Files.getPosixFilePermissions(f.toPath)
+        assert(perms.contains(PosixFilePermission.OTHERS_READ))
+      })
+
       sc.killExecutors(sc.getExecutorIds())
 
       eventually(timeout(2.seconds), interval(100.milliseconds)) {
@@ -191,6 +224,19 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
     assert(filesToCheck.length == 4)
     assert(filesToCheck.forall(_.exists()))
 
+    if (enabled) {
+      filesToCheck.foreach(f => {
+        val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath)
+        assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+        // On most operating systems the default umask will make this test pass
+        // even if the permission isn't changed. To properly test this, run the
+        // test with a umask of 0027
+        val perms = Files.getPosixFilePermissions(f.toPath)
+        assert(perms.contains(PosixFilePermission.OTHERS_READ))
+      })
+    }
+
     sc.killExecutors(sc.getExecutorIds())
     eventually(timeout(2.seconds), interval(100.milliseconds)) {
       assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty)
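The test comments above note that a permissive default umask (commonly 0022) would leave the OTHERS_READ bit set even without the PR's changes, which is why they suggest running with umask 0027. A small standalone snippet, not part of the PR, to see what your own umask leaves on a fresh file:

    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermission

    object UmaskCheck {
      def main(args: Array[String]): Unit = {
        // Create a throwaway file and report whether "others" can already read it.
        val f = Files.createTempFile("umask-check", ".tmp")
        val perms = Files.getPosixFilePermissions(f)
        println(s"Permissions: $perms, OTHERS_READ=${perms.contains(PosixFilePermission.OTHERS_READ)}")
        Files.delete(f)
      }
    }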