From 105a3210a06181740d048011a29a7565dae71af7 Mon Sep 17 00:00:00 2001
From: weixiuli
Date: Fri, 28 Dec 2018 09:58:17 +0800
Subject: [PATCH 01/15] add initRegisteredExecutorsDB

---
 .../shuffle/ExternalShuffleBlockHandler.java  |   5 +
 .../shuffle/ExternalShuffleBlockResolver.java |  10 +
 .../spark/deploy/ExternalShuffleService.scala |  50 ++++-
 .../spark/internal/config/package.scala       |   6 +
 .../ExternalShuffleServiceDbSuite.scala       | 193 ++++++++++++++++++
 5 files changed, 263 insertions(+), 1 deletion(-)
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index b25e48a164e6..705130a27f27 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -66,6 +66,11 @@ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFi
       new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
   }

+  /** ForTesting */
+  public ExternalShuffleBlockResolver getBlockResolver() {
+    return blockManager;
+  }
+
   /** Enables mocking out the StreamManager and BlockManager. */
   @VisibleForTesting
   public ExternalShuffleBlockHandler(
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 0b7a27402369..0e94ebdf4c97 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -321,6 +321,16 @@ void close() {
     }
   }

+  /**ForTesting**/
+  public void closeForTest() {
+    close();
+  }
+
+  /**ForTesting**/
+  public static File getFileForTest(String[] localDirs, int subDirsPerLocalDir, String filename) {
+    return getFile(localDirs, subDirsPerLocalDir, filename);
+  }
+
   /**
    * This method is needed to avoid the situation when multiple File instances for the
    * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 12ed1893ed61..79dd0521d6de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.deploy

+import java.io.{File, IOException}
 import java.util.concurrent.CountDownLatch

 import scala.collection.JavaConverters._
@@ -32,6 +33,7 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}

+
 /**
  * Provides a server from which Executors can read shuffle files (rather than reading directly from
  * each other), to provide uninterrupted access to the files in the face of executors being turned
@@ -49,6 +51,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)

+  private val registeredExecutorsDB = "registeredExecutors.ldb"
+
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
   private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -56,11 +60,55 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

   private var server: TransportServer = _

+  private final val MAX_DIR_CREATION_ATTEMPTS = 10
+
   private val shuffleServiceSource = new ExternalShuffleServiceSource

+  protected def createDirectory(root: String, name: String): File = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var dir: File = null
+    while (dir == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
+          maxAttempts + " attempts!")
+      }
+      try {
+        dir = new File(root, "registeredExecutors")
+        if (!dir.exists() && !dir.mkdirs()) {
+          dir = null
+        }
+      } catch { case e: SecurityException => dir = null; }
+    }
+    logInfo(s"registeredExecutorsDb path is ${dir.getAbsolutePath}")
+    new File(dir.getAbsolutePath, name)
+  }
+
+  protected def initRegisteredExecutorsDB(dbName: String): File = {
+    val localDirs = sparkConf.get("spark.local.dir", "").split(",")
+    if (localDirs.length >= 1 && !"".equals(localDirs(0))) {
+      createDirectory(localDirs(0), dbName)
+    }
+    else {
+      logWarning(s"'spark.local.dir' should be set first.")
+      null
+    }
+  }
+
+  /** ForTesting */
+  def getBlockHandler: ExternalShuffleBlockHandler = {
+    blockHandler
+  }
+
   /** Create a new shuffle block handler. Factored out for subclasses to override. */
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
-    new ExternalShuffleBlockHandler(conf, null)
+    if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
+      new ExternalShuffleBlockHandler(conf, initRegisteredExecutorsDB(registeredExecutorsDB))
+    }
+    else {
+      new ExternalShuffleBlockHandler(conf, null)
+    }
   }

   /** Starts the external shuffle service if the user has configured us to.
    */
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 850d6845684b..dbb880f530ab 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -358,6 +358,12 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)

+  private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
+    ConfigBuilder("spark.shuffle.service.db.enabled")
+      .doc("Whether to use db in ExternalShuffleService.")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
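For reference, the new flag reads like any other internal config entry. A minimal sketch of toggling it from user code, assuming a plain SparkConf (illustrative only, not part of the patch):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  // Default is true: executor registrations are persisted to a LevelDB file
  // so they survive a shuffle-service restart.
  .set("spark.shuffle.service.db.enabled", "true")
// Setting spark.shuffle.service.db.enabled to "false" restores the previous
// behavior: registrations are kept only in memory and are lost on restart.
```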
diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
new file mode 100644
index 000000000000..0002425dd3a9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.{CharStreams, Closeables, Files}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.{SecurityManager, ShuffleSuite, SparkConf, SparkException}
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+
+/**
+ * This suite gets BlockData when the ExternalShuffleService is restarted
+ * with spark.shuffle.service.db.enabled = true or false.
+ * Note that getBlockData is expected to fail when
+ * spark.shuffle.service.db.enabled = false.
+ */
+class ExternalShuffleServiceDbSuite extends ShuffleSuite with BeforeAndAfterAll {
+  val sortBlock0 = "Hello!"
+  val sortBlock1 = "World!"
+  val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
+
+  var sparkConf: SparkConf = _
+  var dataContext: TestShuffleDataContext = _
+
+  var securityManager: SecurityManager = _
+  var externalShuffleService: ExternalShuffleService = _
+  var blockHandler: ExternalShuffleBlockHandler = _
+  var blockResolver: ExternalShuffleBlockResolver = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    sparkConf = new SparkConf()
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    Utils.loadDefaultSparkProperties(sparkConf, null)
+    securityManager = new SecurityManager(sparkConf)
+
+    dataContext = new TestShuffleDataContext(2, 5)
+    dataContext.create()
+    // Write some sort data.
+    dataContext.insertSortShuffleData(0, 0,
+      Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
+        sortBlock1.getBytes(StandardCharsets.UTF_8)))
+    registerExecutor()
+  }
+
+  override def afterAll() {
+    try {
+      dataContext.cleanup()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  def registerExecutor(): Unit = {
+    sparkConf.set("spark.shuffle.service.db.enabled", "true")
+    externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+
+    // Start the external shuffle service.
+    externalShuffleService.start()
+    blockHandler = externalShuffleService.getBlockHandler
+    blockResolver = blockHandler.getBlockResolver
+    blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
+    blockResolver.closeForTest()
+    // Stop the external shuffle service.
+    externalShuffleService.stop()
+  }
+
+  // getBlockData should succeed when the external shuffle service is restarted
+  // with the registeredExecutors DB.
+  test("restart External Shuffle Service With InitRegisteredExecutorsDB") {
+    sparkConf.set("spark.shuffle.service.db.enabled", "true")
+    externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+    // Restart the external shuffle service.
+    externalShuffleService.start()
+    blockHandler = externalShuffleService.getBlockHandler
+    blockResolver = blockHandler.getBlockResolver
+
+    val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+    val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
+    block0Stream.close()
+    assert(sortBlock0 == block0)
+    blockResolver.closeForTest()
+    // Stop the external shuffle service.
+    externalShuffleService.stop()
+  }
+
+ test("restart External Shuffle Service Without InitRegisteredExecutorsDB") { + sparkConf.set("spark.shuffle.service.db.enabled", "false") + externalShuffleService = new ExternalShuffleService(sparkConf, securityManager) + // externalShuffleService restart + externalShuffleService.start() + bockHandler = externalShuffleService.getBlockHandler + blockResolver = bockHandler.getBlockResolver + + val error = intercept[RuntimeException] { + val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream + val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8)) + block0Stream.close() + assert(sortBlock0 == block0) + }.getMessage + + assert(error.contains("not registered")) + blockResolver.closeForTest() + // externalShuffleService stop + externalShuffleService.stop() + } + + /** + * Manages some sort-shuffle data, including the creation + * and cleanup of directories that can be read by the + * + * Copy from org.apache.spark.network.shuffle.TestShuffleDataContext + */ + class TestShuffleDataContext(val numLocalDirs: Int, val subDirsPerLocalDir: Int) { + val localDirs: Array[String] = new Array[String](numLocalDirs) + + def create(): Unit = { + for (i <- 0 to numLocalDirs - 1) { + localDirs(i) = Files.createTempDir().getAbsolutePath() + for (p <- 0 to subDirsPerLocalDir - 1) { + new File(localDirs(i), "%02x".format(p)).mkdirs() + } + } + } + + def cleanup(): Unit = { + for (i <- 0 to numLocalDirs - 1) { + try { + JavaUtils.deleteRecursively(new File(localDirs(i))) + } + catch { + case e: IOException => + logError("Unable to cleanup localDir = " + localDirs(i), e) + } + } + } + + // Creates reducer blocks in a sort-based data format within our local dirs. + def insertSortShuffleData(shuffleId: Int, mapId: Int, blocks: Array[Array[Byte]]): Unit = { + val blockId = "shuffle_" + shuffleId + "_" + mapId + "_0" + var dataStream: FileOutputStream = null + var indexStream: DataOutputStream = null + var suppressExceptionsDuringClose = true + try { + dataStream = new FileOutputStream(ExternalShuffleBlockResolver.getFileForTest(localDirs, + subDirsPerLocalDir, blockId + ".data")) + indexStream = new DataOutputStream(new FileOutputStream(ExternalShuffleBlockResolver + .getFileForTest(localDirs, subDirsPerLocalDir, blockId + ".index"))) + var offset = 0 + indexStream.writeLong(offset) + for (block <- blocks) { + offset += block.length + dataStream.write(block) + indexStream.writeLong(offset) + } + suppressExceptionsDuringClose = false + } finally { + Closeables.close(dataStream, suppressExceptionsDuringClose) + Closeables.close(indexStream, suppressExceptionsDuringClose) + } + } + + // Creates an ExecutorShuffleInfo object based on the given shuffle manager + // which targets this context's directories. 
From b9ff5aaa9e45399ee21e6497811a52a605b9bb93 Mon Sep 17 00:00:00 2001
From: weixiuli
Date: Thu, 24 Jan 2019 11:53:45 +0800
Subject: [PATCH 02/15] Check all the dirs and use the dir which has been used

---
 .../spark/deploy/ExternalShuffleService.scala | 41 +++++++------------
 1 file changed, 15 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 79dd0521d6de..d212e4a892b2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy

-import java.io.{File, IOException}
+import java.io.File
 import java.util.concurrent.CountDownLatch

 import scala.collection.JavaConverters._
@@ -60,35 +60,24 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

   private var server: TransportServer = _

-  private final val MAX_DIR_CREATION_ATTEMPTS = 10
-
   private val shuffleServiceSource = new ExternalShuffleServiceSource

-  protected def createDirectory(root: String, name: String): File = {
-    var attempts = 0
-    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
-    var dir: File = null
-    while (dir == null) {
-      attempts += 1
-      if (attempts > maxAttempts) {
-        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
-          maxAttempts + " attempts!")
-      }
-      try {
-        dir = new File(root, "registeredExecutors")
-        if (!dir.exists() && !dir.mkdirs()) {
-          dir = null
-        }
-      } catch { case e: SecurityException => dir = null; }
-    }
-    logInfo(s"registeredExecutorsDb path is ${dir.getAbsolutePath}")
-    new File(dir.getAbsolutePath, name)
-  }
-
   protected def initRegisteredExecutorsDB(dbName: String): File = {
     val localDirs = sparkConf.get("spark.local.dir", "").split(",")
-    if (localDirs.length >= 1 && !"".equals(localDirs(0))) {
-      createDirectory(localDirs(0), dbName)
+    var dbFile: File = null
+    if (localDirs.length >= 1) {
+      for (dir <- localDirs) {
+        val tmpFile = new File(dir, dbName)
+        if (tmpFile.exists()) {
+          dbFile = tmpFile
+        }
+      }
+      if (dbFile != null) {
+        dbFile
+      }
+      else {
+        new File(localDirs(0), dbName)
+      }
     }
     else {
       logWarning(s"'spark.local.dir' should be set first.")

From 628ffa5b9995a2d052dc1e8214d7162c44765568 Mon Sep 17 00:00:00 2001
From: weixiuli
Date: Fri, 15 Mar 2019 12:07:38 +0800
Subject: [PATCH 03/15] Remove stale registeredExecutors entries from the DB in
 the external shuffle service

---
 .../scala/org/apache/spark/deploy/worker/Worker.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 52892c329a50..a0664b3fdd2e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -466,6 +466,15 @@ private[deploy] class Worker(
       }.foreach { dir =>
         logInfo(s"Removing directory: ${dir.getPath}")
         Utils.deleteRecursively(dir)
+
+        // Remove the application's entry from the registered-executors DB used by the
+        // external shuffle service when spark.shuffle.service.db.enabled=true. If an
+        // application is stopped while the external shuffle service is down, it leaves
+        // a stale entry in the DB, so remove that entry here.
+        if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
+            conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+          shuffleService.applicationRemoved(dir.getName)
+        }
       }
     }(cleanupThreadExecutor)
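The cleanup above relies on the Worker naming each application directory after its application id, so dir.getName can be passed straight to applicationRemoved. A hedged distillation of the rule (the helper and its signature are hypothetical, for illustration only):

```scala
import java.io.File

object WorkerCleanupSketch {
  // When a finished application's local dir is deleted, its entry should also
  // leave the shuffle service's registered-executors DB; otherwise an app that
  // stopped while the service was down would leave a stale entry behind.
  def cleanupAppDir(dir: File, dbEnabled: Boolean, serviceEnabled: Boolean)(
      applicationRemoved: String => Unit): Unit = {
    if (dbEnabled && serviceEnabled) {
      // The Worker names each application directory after the application id.
      applicationRemoved(dir.getName)
    }
  }
}
```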
From d1b54fc72a963adf820993827a1358c0aac75cf4 Mon Sep 17 00:00:00 2001
From: weixiuli
Date: Tue, 5 Mar 2019 13:45:21 +0800
Subject: [PATCH 04/15] VisibleForTesting

---
 .../spark/network/shuffle/ExternalShuffleBlockHandler.java | 2 +-
 .../network/shuffle/ExternalShuffleBlockResolver.java      | 7 +------
 .../org/apache/spark/deploy/ExternalShuffleService.scala   | 2 +-
 3 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 705130a27f27..70dcc8b8b8b6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -66,7 +66,7 @@ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFi
       new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
   }

-  /** ForTesting */
+  @VisibleForTesting
   public ExternalShuffleBlockResolver getBlockResolver() {
     return blockManager;
   }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 0e94ebdf4c97..6a5c523e09b0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -321,12 +321,7 @@ void close() {
     }
   }

-  /**ForTesting**/
-  public void closeForTest() {
-    close();
-  }
-
-  /**ForTesting**/
+  @VisibleForTesting
   public static File getFileForTest(String[] localDirs, int subDirsPerLocalDir, String filename) {
     return getFile(localDirs, subDirsPerLocalDir, filename);
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index d212e4a892b2..b02faac62e99 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -85,7 +85,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
     }
   }

-  /** ForTesting */
+  /** Get blockhandler. */
   def getBlockHandler: ExternalShuffleBlockHandler = {
     blockHandler
   }
From 2ba36954ce6684a501da925e5255592727b04af2 Mon Sep 17 00:00:00 2001
From: weixiuli
Date: Wed, 6 Mar 2019 14:12:36 +0800
Subject: [PATCH 05/15] findRegisteredExecutorsDBFile

---
 .../org/apache/spark/deploy/ExternalShuffleService.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index b02faac62e99..6724603c87e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -62,10 +62,10 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

   private val shuffleServiceSource = new ExternalShuffleServiceSource

-  protected def initRegisteredExecutorsDB(dbName: String): File = {
+  protected def findRegisteredExecutorsDBFile(dbName: String): File = {
     val localDirs = sparkConf.get("spark.local.dir", "").split(",")
     var dbFile: File = null
-    if (localDirs.length >= 1) {
+    if (localDirs.length > 1 || (localDirs.length == 1 && localDirs(0).nonEmpty)) {
       for (dir <- localDirs) {
         val tmpFile = new File(dir, dbName)
         if (tmpFile.exists()) {
@@ -85,7 +85,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
     }
   }

-  /** Get blockhandler. */
+  /** Get blockhandler */
   def getBlockHandler: ExternalShuffleBlockHandler = {
     blockHandler
   }
@@ -93,7 +93,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   /** Create a new shuffle block handler. Factored out for subclasses to override. */
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
     if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
-      new ExternalShuffleBlockHandler(conf, initRegisteredExecutorsDB(registeredExecutorsDB))
+      new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
     }
     else {
       new ExternalShuffleBlockHandler(conf, null)

From 8424852896fcdf1dc6240b6a8d473b971fce078b Mon Sep 17 00:00:00 2001
From: weixiuli
Date: Wed, 6 Mar 2019 14:14:18 +0800
Subject: [PATCH 06/15] fix ExternalShuffleServiceDbSuite

---
 core/pom.xml                                  |   7 +
 .../ExternalShuffleServiceDbSuite.scala       | 171 +++++++-----------
 2 files changed, 68 insertions(+), 110 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index b9f78b2f5d37..45bda44916f5 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -372,6 +372,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
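The lookup rule introduced in patch 05, reuse whichever configured local dir already holds the DB file and otherwise fall back to the first one, can be summarized as follows. This is a standalone sketch of the selection logic, not the patched method itself:

```scala
import java.io.File

object DbFileLookupSketch {
  // Mirrors findRegisteredExecutorsDBFile: prefer an existing DB file in any
  // configured local dir (the patch's loop keeps the last match) so that a
  // restarted service finds its old state; otherwise create the DB in the
  // first dir. None mirrors the logWarning-and-null branch taken when
  // spark.local.dir is unset.
  def findDbFile(localDirs: Array[String], dbName: String): Option[File] = {
    if (localDirs.length > 1 || (localDirs.length == 1 && localDirs(0).nonEmpty)) {
      localDirs.map(new File(_, dbName)).filter(_.exists()).lastOption
        .orElse(Some(new File(localDirs(0), dbName)))
    } else {
      None
    }
  }
}
```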