2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2658,7 +2658,7 @@ object SparkContext extends Logging {
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url)
new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
} else {
new MesosSchedulerBackend(scheduler, sc, url)
}
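For illustration, stripPrefix removes only the scheme, leaving whatever master address follows (the URL below is a made-up example, not from this PR):

// Illustration only, with a hypothetical ZooKeeper-backed master URL:
val mesosUrl = "mesos://zk://host1:2181/mesos"
mesosUrl.stripPrefix("mesos://")  // yields "zk://host1:2181/mesos"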
core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy

import java.util.concurrent.CountDownLatch

import org.apache.spark.network.util.TransportConf

import scala.collection.JavaConversions._

import org.apache.spark.{Logging, SparkConf, SecurityManager}
@@ -37,15 +39,17 @@ import org.apache.spark.util.Utils
* Optionally requires SASL authentication in order to read. See [[SecurityManager]].
*/
private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
class ExternalShuffleService(
sparkConf: SparkConf,
securityManager: SecurityManager,
transportConf: TransportConf,
Review comment (Contributor): This interface is confusing: we pass in both a SparkConf and a TransportConf, even though one is created from another one.
blockHandler: ExternalShuffleBlockHandler)
Review comment (Contributor): I would do this a different way, just define a

protected def newBlockHandler(): ExternalShuffleBlockHandler = {
  new ExternalShuffleBlockHandler(transportConf)
}

and then have the subclass override it. Then you won't have the weird conf signature.

(A sketch of this suggestion appears after this file's diff.)
extends Logging {

private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler)

private var server: TransportServer = _
@@ -100,7 +104,9 @@ object ExternalShuffleService extends Logging {
// we override this value since this service is started from the command line
// and we assume the user really wants it to be running
sparkConf.set("spark.shuffle.service.enabled", "true")
server = new ExternalShuffleService(sparkConf, securityManager)
val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
val blockHandler = new ExternalShuffleBlockHandler(transportConf)
server = new ExternalShuffleService(sparkConf, securityManager, transportConf, blockHandler)
server.start()

installShutdownHook()
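A minimal sketch of the shape the reviewer suggests above: the handler comes from an overridable factory method rather than a constructor parameter, so the two-argument constructor survives. This is illustrative only; the factory name and its taking the conf as an argument are assumptions, not part of this diff.

private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
  extends Logging {

  protected val transportConf =
    SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)

  // Subclasses override this factory instead of passing a handler in.
  protected def newBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler =
    new ExternalShuffleBlockHandler(conf)

  private val blockHandler = newBlockHandler(transportConf)
  private val transportContext = new TransportContext(transportConf, blockHandler)
}

// The Mesos variant would then keep the plain two-argument constructor:
private[mesos] class MesosExternalShuffleService(conf: SparkConf, sm: SecurityManager)
  extends ExternalShuffleService(conf, sm) {

  protected override def newBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler =
    new MesosExternalShuffleBlockHandler(conf)
}

Because the factory is invoked from the base-class constructor, it should depend only on its argument; that is why the conf is passed in rather than read from a field.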
core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala (new file)
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.mesos

import java.net.SocketAddress
import java.util.concurrent.CountDownLatch

import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf}

import scala.collection.mutable


/**
 * MesosExternalShuffleServiceEndpoint is an RPC endpoint that receives
Review comment (Contributor): nit: just start with "An RPC endpoint that...". It's kinda verbose right now.
* registration requests from Spark drivers launched with Mesos.
 * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
*/
private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
extends ExternalShuffleBlockHandler(transportConf) with Logging {

// Stores a map of driver socket addresses to app ids
private val connectedApps = new mutable.HashMap[SocketAddress, String]

protected override def handleMessage(
message: BlockTransferMessage,
client: TransportClient,
callback: RpcResponseCallback): Unit = {
message match {
case RegisterDriverParam(appId) =>
val address = client.getSocketAddress()
logDebug(s"Received registration request from app $appId, address $address")
if (connectedApps.contains(address)) {
val existingAppId: String = connectedApps(address)
if (!existingAppId.equals(appId)) {
logError(s"A new app id $appId has connected to existing address $address" +
s", removing registered app $existingAppId")
applicationRemoved(existingAppId, true)
Review comment (Contributor): can you name this boolean parameter, e.g. cleanup = true
}
}
connectedApps(address) = appId
callback.onSuccess(new Array[Byte](0))
case _ => super.handleMessage(message, client, callback)
}
}

override def connectionTerminated(client: TransportClient): Unit = {
val address = client.getSocketAddress()
if (connectedApps.contains(address)) {
val appId = connectedApps(address)
logInfo(s"Application $appId disconnected (address was $address)")
applicationRemoved(appId, true)
connectedApps.remove(address)
} else {
logWarning(s"Address $address not found in mesos shuffle service")
}
}
}

/**
* An extractor object for matching RegisterDriver message.
*/
private[mesos] object RegisterDriverParam {
Review comment (Contributor): this could be strictly private. It could even go into the MesosExternalShuffleBlockHandler class since it's only used there.
def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId())
}

/**
* MesosExternalShuffleService wraps [[ExternalShuffleService]] which provides an additional
* endpoint for drivers to associate with. This allows the shuffle service to detect when
* a driver is terminated and can further clean up the cached shuffle data.
Review comment (Contributor): technically incorrect: shuffle data is not cached. Just say they clean up the shuffle files.
*/
private[mesos] class MesosExternalShuffleService(
conf: SparkConf,
securityManager: SecurityManager,
transportConf: TransportConf)
extends ExternalShuffleService(
conf, securityManager, transportConf, new MesosExternalShuffleBlockHandler(transportConf)) {
}

private[spark] object MesosExternalShuffleService extends Logging {
Review comment (Contributor): this whole thing is duplicated from ExternalShuffleService. We should at least attempt to reuse some of the code... I think if you make this object inherit the ExternalShuffleService object we do it.

Review comment (Contributor): never mind, you can't extend objects.

(A possible code-sharing sketch appears after this file's diff.)
val SYSTEM_NAME = "mesosExternalShuffleService"
val ENDPOINT_NAME = "mesosExternalShuffleServiceEndpoint"
Review comment (Contributor): not used, please clean up the code after an update

@volatile
private var server: MesosExternalShuffleService = _

private val barrier = new CountDownLatch(1)

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
Utils.loadDefaultSparkProperties(sparkConf)
val securityManager = new SecurityManager(sparkConf)

// we override this value since this service is started from the command line
// and we assume the user really wants it to be running
sparkConf.set("spark.shuffle.service.enabled", "true")
val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
server = new MesosExternalShuffleService(
sparkConf, securityManager, transportConf)
server.start()

installShutdownHook()

// keep running until the process is terminated
barrier.await()
}

private def installShutdownHook(): Unit = {
Runtime.getRuntime.addShutdownHook(
new Thread("Mesos External Shuffle Service shutdown thread") {
override def run() {
logInfo("Shutting down Mesos shuffle service.")
server.stop()
barrier.countDown()
}
})
}
}
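
As flagged above, main(), the latch, and the shutdown hook mirror ExternalShuffleService's companion object, and Scala objects cannot extend objects. One hedged alternative is to move the launcher logic into a small shared class; the ShuffleServiceLauncher name and its signature below are assumptions, not part of this PR.

// Hypothetical shared launcher; both companion objects would delegate to it.
private[spark] class ShuffleServiceLauncher(
    shutdownThreadName: String,
    newServer: (SparkConf, SecurityManager) => ExternalShuffleService)
  extends Logging {

  private val barrier = new CountDownLatch(1)
  @volatile private var server: ExternalShuffleService = _

  def run(): Unit = {
    val sparkConf = new SparkConf
    Utils.loadDefaultSparkProperties(sparkConf)
    val securityManager = new SecurityManager(sparkConf)
    // started from the command line, so assume the user wants it running
    sparkConf.set("spark.shuffle.service.enabled", "true")
    server = newServer(sparkConf, securityManager)
    server.start()
    Runtime.getRuntime.addShutdownHook(new Thread(shutdownThreadName) {
      override def run(): Unit = {
        server.stop()
        barrier.countDown()
      }
    })
    barrier.await()  // keep running until the process is terminated
  }
}

Under that sketch, MesosExternalShuffleService.main would shrink to something like:

def main(args: Array[String]): Unit = {
  new ShuffleServiceLauncher("Mesos External Shuffle Service shutdown thread",
    (conf, sm) => new MesosExternalShuffleService(
      conf, sm, SparkTransportConf.fromSparkConf(conf, numUsableCores = 0))).run()
}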


33 changes: 17 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,28 +17,26 @@

package org.apache.spark.deploy.worker

import java.io.File
import java.io.IOException
import java.io.{File, IOException}
import java.text.SimpleDateFormat
import java.util.{UUID, Date}
import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture, _}
import java.util.{Date, UUID}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
import scala.util.Random
import scala.util.control.NonFatal

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState, ExternalShuffleService}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.rpc._
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
import org.apache.spark.util.{SignalLogger, ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}

import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
import scala.util.Random
import scala.util.control.NonFatal
Review comment (Contributor): these imports should be organized (how many of these are not used?)

Review comment (Contributor): actually this file doesn't need to change if we don't modify the constructor as suggested above

private[deploy] class Worker(
override val rpcEnv: RpcEnv,
@@ -127,8 +125,11 @@ private[deploy] class Worker(
val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)

private val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 0)

// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)
private val shuffleService = new ExternalShuffleService(
conf, securityMgr, transportConf, new ExternalShuffleBlockHandler(transportConf))

private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -47,11 +47,11 @@ private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
*
* It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
*
* The lift-cycle will be:
* The life-cycle of an endpoint is:
*
* constructor onStart receive* onStop
* constructor -> onStart -> receive* -> onStop
*
* Note: `receive` can be called concurrently. If you want `receive` is thread-safe, please use
* Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
* [[ThreadSafeRpcEndpoint]]
*
* If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
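To make the corrected life-cycle concrete, here is a minimal endpoint sketch; EchoEndpoint is hypothetical, written against the RpcEndpoint API shown in this diff:

// Illustrative only: constructor -> onStart -> receive* -> onStop.
private[spark] class EchoEndpoint(override val rpcEnv: RpcEnv)
  extends ThreadSafeRpcEndpoint with Logging {

  override def onStart(): Unit = logInfo("started")  // once, before any message

  override def receive: PartialFunction[Any, Unit] = {
    case msg => logInfo(s"echo: $msg")  // once per message; serialized, since
                                        // this is a ThreadSafeRpcEndpoint
  }

  override def onStop(): Unit = logInfo("stopped")   // once, after the last message
}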
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -21,17 +21,20 @@ import java.io.File
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState, _}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}


/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -46,7 +49,7 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
private[spark] class CoarseMesosSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
master: String)
master: String,
securityManager: SecurityManager)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with MScheduler
with MesosSchedulerUtils {
@@ -62,6 +66,9 @@ private[spark] class CoarseMesosSchedulerBackend(

val slaveIdsWithExecutors = new HashSet[String]

// Mapping from slave ID to hostname. Only used when shuffle service is enabled.
val slaveIdsToHost = new HashMap[String, String]

val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
// How many times tasks on each slave failed
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
@@ -90,6 +97,12 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

private val mesosExternalShuffleClient = new MesosExternalShuffleClient(
SparkTransportConf.fromSparkConf(conf),
securityManager,
securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled())

var nextMesosTaskId = 0

@volatile var appId: String = _
@@ -188,6 +201,7 @@

override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
appId = frameworkId.getValue
mesosExternalShuffleClient.init(appId)
logInfo("Registered as framework ID " + appId)
markRegistered()
}
@@ -244,6 +258,9 @@

// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
if (conf.getBoolean("spark.shuffle.service.enabled", false)) {
slaveIdsToHost(offer.getSlaveId.getValue) = offer.getHostname
}
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(taskBuilder.build()), filters)
@@ -261,7 +278,24 @@
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo(s"Mesos task $taskId is now $state")
val slaveId: String = status.getSlaveId.getValue
stateLock.synchronized {
if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
Review comment (Contributor): this block of code needs a huge comment:

// If the shuffle service is enabled, have the driver register with each one
// of the shuffle services. This allows the shuffle services to clean up state
// associated with this application when the driver exits. There is currently
// not a great way to detect this through Mesos, since the shuffle services
// are set up independently.
slaveIdsToHost.contains(slaveId)) {
// If the shuffle service is enabled, have the driver register with each one
// of the shuffle services. This allows the shuffle services to clean up state
// associated with this application when the driver exits. There is currently
// not a great way to detect this through Mesos, since the shuffle services
// are set up independently.
// TODO: Remove this and allow the MesosExternalShuffleService to detect
// framework termination when new Mesos Framework HTTP API is available.
val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
val hostname = slaveIdsToHost.remove(slaveId).get
logDebug(s"Connecting to shuffle service on slave ${slaveId}, " +
s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
mesosExternalShuffleClient.registerDriverWithShuffleService(hostname, externalShufflePort)
}

if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
slaveIdsWithExecutors -= slaveId
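Taken together with the handler earlier in this PR, the handshake can be summarized from the calls visible in this diff (comments only; no API beyond what the diff already uses is assumed):

// Driver side (this file): once registered with Mesos, remember each slave's
// hostname when an offer is accepted, then on the first RUNNING status from
// that slave call:
//   mesosExternalShuffleClient.init(appId)
//   mesosExternalShuffleClient.registerDriverWithShuffleService(hostname, port)
// Service side (MesosExternalShuffleBlockHandler): record the driver's socket
// address -> appId, and on connectionTerminated(client) call
// applicationRemoved(appId, true) to delete that application's shuffle files.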
core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter

import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}

class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
@@ -59,7 +59,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
val securityManager = mock[SecurityManager]
val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,