diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1c66ef4fc5ea..6f336a7c299ab 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2658,7 +2658,7 @@ object SparkContext extends Logging {
         val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, sc, url)
+          new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
         } else {
           new MesosSchedulerBackend(scheduler, sc, url)
         }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 4089c3e771fa8..1ab40d0f15435 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConversions._
 
 import org.apache.spark.{Logging, SparkConf, SecurityManager}
 import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.util.TransportConf
@@ -37,15 +38,17 @@ import org.apache.spark.util.Utils
  * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
  */
 private[deploy]
-class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+class ExternalShuffleService(
+    sparkConf: SparkConf,
+    securityManager: SecurityManager,
+    transportConf: TransportConf,
+    blockHandler: ExternalShuffleBlockHandler)
   extends Logging {
 
   private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
 
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
-  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler)
 
   private var server: TransportServer = _
@@ -100,7 +103,9 @@ object ExternalShuffleService extends Logging {
     // we override this value since this service is started from the command line
     // and we assume the user really wants it to be running
     sparkConf.set("spark.shuffle.service.enabled", "true")
-    server = new ExternalShuffleService(sparkConf, securityManager)
+    val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
+    val blockHandler = new ExternalShuffleBlockHandler(transportConf)
+    server = new ExternalShuffleService(sparkConf, securityManager, transportConf, blockHandler)
     server.start()
 
     installShutdownHook()
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
new file mode 100644
index 0000000000000..128b5f2e00548
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.net.SocketAddress
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.mutable
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.Utils
+
+/**
+ * MesosExternalShuffleBlockHandler is an RPC handler that receives registration
+ * requests from Spark drivers launched with Mesos. It detects driver termination
+ * and invokes the clean-up callback on the parent [[ExternalShuffleBlockHandler]].
+ */
+private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
+  extends ExternalShuffleBlockHandler(transportConf) with Logging {
+
+  // Stores a map of driver socket addresses to app ids
+  private val connectedApps = new mutable.HashMap[SocketAddress, String]
+
+  protected override def handleMessage(
+      message: BlockTransferMessage,
+      client: TransportClient,
+      callback: RpcResponseCallback): Unit = {
+    message match {
+      case RegisterDriverParam(appId) =>
+        val address = client.getSocketAddress()
+        logDebug(s"Received registration request from app $appId, address $address")
+        if (connectedApps.contains(address)) {
+          val existingAppId = connectedApps(address)
+          if (!existingAppId.equals(appId)) {
+            logError(s"A new app id $appId has connected to existing address $address" +
+              s", removing registered app $existingAppId")
+            applicationRemoved(existingAppId, true /* cleanupLocalDirs */)
+          }
+        }
+        connectedApps(address) = appId
+        callback.onSuccess(new Array[Byte](0))
+      case _ => super.handleMessage(message, client, callback)
+    }
+  }
+
+  override def connectionTerminated(client: TransportClient): Unit = {
+    val address = client.getSocketAddress()
+    if (connectedApps.contains(address)) {
+      val appId = connectedApps(address)
+      logInfo(s"Application $appId disconnected (address was $address)")
+      applicationRemoved(appId, true /* cleanupLocalDirs */)
+      connectedApps.remove(address)
+    } else {
+      logWarning(s"Address $address not found in Mesos shuffle service")
+    }
+  }
+}
+
+/**
+ * An extractor object for matching [[RegisterDriver]] messages.
+ */
+private[mesos] object RegisterDriverParam {
+  def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId())
+}
+
+/**
+ * MesosExternalShuffleService wraps [[ExternalShuffleService]] and installs a block handler
+ * with which drivers register. This allows the shuffle service to detect when a driver
+ * terminates, so that it can clean up the shuffle data that the application cached.
+ */
+private[mesos] class MesosExternalShuffleService(
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    transportConf: TransportConf)
+  extends ExternalShuffleService(
+    conf, securityManager, transportConf, new MesosExternalShuffleBlockHandler(transportConf)) {
+}
+
+private[spark] object MesosExternalShuffleService extends Logging {
+
+  @volatile
+  private var server: MesosExternalShuffleService = _
+
+  private val barrier = new CountDownLatch(1)
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf
+    Utils.loadDefaultSparkProperties(sparkConf)
+    val securityManager = new SecurityManager(sparkConf)
+
+    // we override this value since this service is started from the command line
+    // and we assume the user really wants it to be running
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
+    server = new MesosExternalShuffleService(
+      sparkConf, securityManager, transportConf)
+    server.start()
+
+    installShutdownHook()
+
+    // keep running until the process is terminated
+    barrier.await()
+  }
+
+  private def installShutdownHook(): Unit = {
+    Runtime.getRuntime.addShutdownHook(
+      new Thread("Mesos External Shuffle Service shutdown thread") {
+        override def run() {
+          logInfo("Shutting down Mesos shuffle service.")
+          server.stop()
+          barrier.countDown()
+        }
+      })
+  }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c82a7ccab54dc..51f0df396dab0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,28 +17,30 @@
 package org.apache.spark.deploy.worker
 
 import java.io.File
 import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.{UUID, Date}
 import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
 import scala.util.Random
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.rpc._
 import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[deploy]
 class Worker(
     override val rpcEnv: RpcEnv,
@@ -127,8 +129,11 @@ private[deploy] class Worker(
   val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
     WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
 
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 0)
+
   // The shuffle service is not actually started unless configured.
-  private val shuffleService = new ExternalShuffleService(conf, securityMgr)
+  private val shuffleService = new ExternalShuffleService(
+    conf, securityMgr, transportConf, new ExternalShuffleBlockHandler(transportConf))
 
   private val publicAddress = {
     val envVar = conf.getenv("SPARK_PUBLIC_DNS")
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
index d2b2baef1d8c4..dfcbc51cdf616 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -47,11 +47,11 @@ private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
  *
  * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
  *
- * The lift-cycle will be:
+ * The life-cycle of an endpoint is:
  *
- *   constructor onStart receive* onStop
+ *   constructor -> onStart -> receive* -> onStop
  *
- * Note: `receive` can be called concurrently. If you want `receive` is thread-safe, please use
+ * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
  * [[ThreadSafeRpcEndpoint]]
  *
  * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index b7fde0d9b3265..50b14e9639bcc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -21,17 +21,19 @@ import java.io.File
 import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collections, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -46,7 +48,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    master: String)
+    master: String,
+    securityManager: SecurityManager)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
   with MScheduler
  with MesosSchedulerUtils {
@@ -62,6 +65,9 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val slaveIdsWithExecutors = new HashSet[String]
 
+  // Mapping from slave ID to hostname. Only used when the shuffle service is enabled.
+  val slaveIdsToHost = new HashMap[String, String]
+
   val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
   // How many times tasks on each slave failed
   val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
@@ -90,6 +96,12 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  private val mesosExternalShuffleClient = new MesosExternalShuffleClient(
+    SparkTransportConf.fromSparkConf(conf),
+    securityManager,
+    securityManager.isAuthenticationEnabled(),
+    securityManager.isSaslEncryptionEnabled())
+
   var nextMesosTaskId = 0
 
   @volatile var appId: String = _
@@ -188,6 +200,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
     appId = frameworkId.getValue
+    mesosExternalShuffleClient.init(appId)
     logInfo("Registered as framework ID " + appId)
     markRegistered()
   }
@@ -244,6 +257,9 @@ private[spark] class CoarseMesosSchedulerBackend(
             // accept the offer and launch the task
             logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+            if (conf.getBoolean("spark.shuffle.service.enabled", false)) {
+              slaveIdsToHost(offer.getSlaveId.getValue) = offer.getHostname
+            }
             d.launchTasks(
               Collections.singleton(offer.getId),
               Collections.singleton(taskBuilder.build()), filters)
@@ -261,7 +277,24 @@ private[spark] class CoarseMesosSchedulerBackend(
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
     logInfo(s"Mesos task $taskId is now $state")
+    val slaveId = status.getSlaveId.getValue
     stateLock.synchronized {
+      if (TaskState.fromMesos(state) == TaskState.RUNNING &&
+          slaveIdsToHost.contains(slaveId)) {
+        // If the shuffle service is enabled, have the driver register with each one
+        // of the shuffle services. This allows the shuffle services to clean up state
+        // associated with this application when the driver exits. There is currently
+        // not a great way to detect this through Mesos, since the shuffle services
+        // are set up independently.
+        // TODO: Remove this and allow the MesosExternalShuffleService to detect
+        // framework termination when the new Mesos framework HTTP API is available.
+        val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+        val hostname = slaveIdsToHost.remove(slaveId).get
+        logDebug(s"Connecting to shuffle service on slave $slaveId, " +
+          s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
+        mesosExternalShuffleClient.registerDriverWithShuffleService(hostname, externalShufflePort)
+      }
+
     if (TaskState.isFinished(TaskState.fromMesos(state))) {
       val slaveId = taskIdToSlaveId(taskId)
       slaveIdsWithExecutors -= slaveId
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 4b504df7b8851..00559bae86884 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 
 class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
@@ -59,7 +59,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
-    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
+    val securityManager = mock[SecurityManager]
+    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
           masterUrl: String,
           scheduler: Scheduler,
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 37f2e34ceb24d..a0d4936fbbe80 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -19,6 +19,7 @@
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -79,6 +80,11 @@ public boolean isActive() {
     return channel.isOpen() || channel.isActive();
   }
 
+  /** Returns the remote address that this client's channel is connected to. */
+  public SocketAddress getSocketAddress() {
+    return channel.remoteAddress();
+  }
+
   /**
    * Requests a single chunk from the remote side, from the pre-negotiated streamId.
   *
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index e4faaf8854fc7..db9dc4f17cee9 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -65,7 +65,13 @@ public ExternalShuffleBlockHandler(TransportConf conf) {
   @Override
   public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
     BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteArray(message);
+    handleMessage(msgObj, client, callback);
+  }
 
+  protected void handleMessage(
+      BlockTransferMessage msgObj,
+      TransportClient client,
+      RpcResponseCallback callback) {
     if (msgObj instanceof OpenBlocks) {
       OpenBlocks msg = (OpenBlocks) msgObj;
       List<ManagedBuffer> blocks = Lists.newArrayList();
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 612bce571a493..ea6d248d66be3 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -50,8 +50,8 @@ public class ExternalShuffleClient extends ShuffleClient {
   private final boolean saslEncryptionEnabled;
   private final SecretKeyHolder secretKeyHolder;
 
-  private TransportClientFactory clientFactory;
-  private String appId;
+  protected TransportClientFactory clientFactory;
+  protected String appId;
 
   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -71,6 +71,10 @@ public ExternalShuffleClient(
     this.saslEncryptionEnabled = saslEncryptionEnabled;
   }
 
+  protected void checkInit() {
+    assert appId != null : "Called before init()";
+  }
+
   @Override
   public void init(String appId) {
     this.appId = appId;
@@ -89,7 +93,7 @@ public void fetchBlocks(
       final String execId,
       String[] blockIds,
       BlockFetchingListener listener) {
-    assert appId != null : "Called before init()";
+    checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
@@ -132,7 +136,7 @@ public void registerWithShuffleServer(
       int port,
       String execId,
       ExecutorShuffleInfo executorInfo) throws IOException {
-    assert appId != null : "Called before init()";
+    checkInit();
     TransportClient client = clientFactory.createClient(host, port);
     byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
     client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
new file mode 100644
index 0000000000000..9d3d2fcd97936
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.mesos;
+
+import java.io.IOException;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.util.TransportConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MesosExternalShuffleClient extends ExternalShuffleClient {
+  private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
+
+  /**
+   * Creates a Mesos external shuffle client that wraps {@link ExternalShuffleClient}.
+   * Please refer to the docs on {@link ExternalShuffleClient} for more information.
+   */
+  public MesosExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      boolean saslEncryptionEnabled) {
+    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+  }
+
+  public void registerDriverWithShuffleService(String host, int port) throws IOException {
+    checkInit();
+    byte[] registerDriver = new RegisterDriver(appId).toByteArray();
+    TransportClient client = clientFactory.createClient(host, port);
+    client.sendRpc(registerDriver, new RpcResponseCallback() {
+      @Override
+      public void onSuccess(byte[] response) {
+        logger.info("Successfully registered app " + appId);
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        logger.warn("Unable to register app " + appId + " with external shuffle service. " +
+          "Please manually remove shuffle data after the driver exits. Error: " + e);
+      }
+    });
+  }
+}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 6c1210b33268a..fcb52363e632c 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -21,6 +21,7 @@
 import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
 
 /**
  * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
@@ -37,7 +38,7 @@ public abstract class BlockTransferMessage implements Encodable {
 
   /** Preceding every serialized message is its type, which allows us to deserialize it. */
   public static enum Type {
-    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3);
+    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
 
     private final byte id;
 
@@ -60,6 +61,7 @@ public static BlockTransferMessage fromByteArray(byte[] msg) {
       case 1: return UploadBlock.decode(buf);
       case 2: return RegisterExecutor.decode(buf);
       case 3: return StreamHandle.decode(buf);
+      case 4: return RegisterDriver.decode(buf);
       default: throw new IllegalArgumentException("Unknown message type: " + type);
     }
   }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
new file mode 100644
index 0000000000000..540758b21c719
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol.mesos;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+
+/**
+ * A message sent from the driver to register with a MesosExternalShuffleService.
+ */
+public class RegisterDriver extends BlockTransferMessage {
+  private final String appId;
+
+  public RegisterDriver(String appId) {
+    this.appId = appId;
+  }
+
+  public String getAppId() { return appId; }
+
+  @Override
+  protected Type type() { return Type.REGISTER_DRIVER; }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId);
+  }
+
+  // equals() must be kept consistent with hashCode() above.
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RegisterDriver)) {
+      return false;
+    }
+    return Objects.equal(appId, ((RegisterDriver) other).appId);
+  }
+
+  public static RegisterDriver decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    return new RegisterDriver(appId);
+  }
+}
diff --git a/sbin/start-mesos-shuffle-service.sh b/sbin/start-mesos-shuffle-service.sh
new file mode 100755
index 0000000000000..0bda3b7766756
--- /dev/null
+++ b/sbin/start-mesos-shuffle-service.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the Mesos external shuffle service on the machine this script is executed on.
+# The Mesos external shuffle service detects when an application exits and automatically
+# cleans up its shuffle files.
+#
+# Usage: start-mesos-shuffle-service.sh
+#
+# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle service configuration.
+#
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.mesos.MesosExternalShuffleService 1
diff --git a/sbin/stop-mesos-shuffle-service.sh b/sbin/stop-mesos-shuffle-service.sh
new file mode 100755
index 0000000000000..0e965d5ec5886
--- /dev/null
+++ b/sbin/stop-mesos-shuffle-service.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the Mesos external shuffle service on the machine this script is executed on.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.mesos.MesosExternalShuffleService 1
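
Reviewer note: the driver-side registration flow introduced by this patch can be exercised in isolation. The sketch below is illustrative only and is not part of the patch; the object name, host name, port, and app id are placeholders. It mirrors how CoarseMesosSchedulerBackend constructs the client and registers with a shuffle service in the hunks above (client.init() must run before registration, matching the checkInit() guard):

    import org.apache.spark.network.netty.SparkTransportConf
    import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
    import org.apache.spark.{SecurityManager, SparkConf}

    // Hypothetical driver-side usage of the new client (sketch, not shipped code).
    object RegisterWithMesosShuffleServiceExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val securityManager = new SecurityManager(conf)
        // Same constructor arguments the scheduler backend passes in this diff.
        val client = new MesosExternalShuffleClient(
          SparkTransportConf.fromSparkConf(conf),
          securityManager,
          securityManager.isAuthenticationEnabled(),
          securityManager.isSaslEncryptionEnabled())
        client.init("app-00000000-0000")  // must precede any registration call
        // Asynchronous RPC: success or failure is reported only via the logged callback.
        client.registerDriverWithShuffleService("shuffle-host.example.com", 7337)
      }
    }

The port 7337 is the spark.shuffle.service.port default used throughout the patch; in a real deployment it comes from the configuration.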
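The new RegisterDriver message rides on the existing BlockTransferMessage framing: a one-byte type tag (REGISTER_DRIVER = 4) followed by the app id, length-prefixed by Encoders.Strings. A small round-trip check, again a sketch under the assumption that only the classes added in this diff are on the classpath, can make that framing concrete:

    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
    import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver

    // Encode-then-decode sketch for the new message type (illustrative app id).
    object RegisterDriverRoundTrip {
      def main(args: Array[String]): Unit = {
        val msg = new RegisterDriver("app-00000000-0000")
        // toByteArray() prepends the REGISTER_DRIVER type tag before the payload.
        val bytes = msg.toByteArray
        val decoded = BlockTransferMessage.Decoder.fromByteArray(bytes)
          .asInstanceOf[RegisterDriver]
        assert(decoded.getAppId == "app-00000000-0000")
        println(s"decoded app id: ${decoded.getAppId}")
      }
    }

This is the same decode path MesosExternalShuffleBlockHandler relies on when a driver's registration RPC arrives at the shuffle service.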