Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<!--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import java.io.Closeable;

import org.apache.spark.annotation.Private;

/**
* The local KV storage used to persist the shuffle state,
* the implementations may include LevelDB, RocksDB, etc.
*/
@Private
public interface DB extends Closeable {
/**
* Set the DB entry for "key" to "value".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Iterator;
import java.util.Map;

import org.apache.spark.annotation.Private;

@Private
public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ public void seek(byte[] key) {
it.seek(key);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

private Map.Entry<byte[], byte[]> loadNext() {
boolean hasNext = it.hasNext();
if (!hasNext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,13 @@ public ShuffleIndexInformation load(String filePath) throws IOException {
.weigher((Weigher<String, ShuffleIndexInformation>)
(filePath, indexInfo) -> indexInfo.getRetainedMemorySize())
.build(indexCacheLoader);
DBBackend dbBackend = null;
if (registeredExecutorFile != null) {
String dbBackendName =
conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
dbBackend = DBBackend.byName(dbBackendName);
logger.info("Configured {} as {} and actually used value {}",
Constants.SHUFFLE_SERVICE_DB_BACKEND, dbBackendName, dbBackend);
}
String dbBackendName =
conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
DBBackend dbBackend = DBBackend.byName(dbBackendName);
db = DBProvider.initDB(dbBackend, this.registeredExecutorFile, CURRENT_VERSION, mapper);
Copy link
Contributor

@mridulm mridulm Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review note: Recovery need not be enabled for node managers - in which case registeredExecutorFile will be null (in addition to tests).

DBProvider.initDB does handle null input though.

So the main change in this file and RemoteBlockPushResolver is moving the log message into if (db != null)

if (db != null) {
logger.info("Use {} as the implementation of {}",
dbBackend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
executors = reloadRegisteredExecutors(db);
} else {
executors = Maps.newConcurrentMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,13 @@ public ShuffleIndexInformation load(String filePath) throws IOException {
(filePath, indexInfo) -> indexInfo.getRetainedMemorySize())
.build(indexCacheLoader);
this.recoveryFile = recoveryFile;
DBBackend dbBackend = null;
if (recoveryFile != null) {
String dbBackendName =
conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
dbBackend = DBBackend.byName(dbBackendName);
logger.info("Configured {} as {} and actually used value {}",
Constants.SHUFFLE_SERVICE_DB_BACKEND, dbBackendName, dbBackend);
}
String dbBackendName =
conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
DBBackend dbBackend = DBBackend.byName(dbBackendName);
db = DBProvider.initDB(dbBackend, this.recoveryFile, CURRENT_VERSION, mapper);
if (db != null) {
logger.info("Use {} as the implementation of {}",
dbBackend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
reloadAndCleanUpAppShuffleInfo(db);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ protected void serviceInit(Configuration externalConf) throws Exception {
String dbBackendName = _conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND,
DBBackend.LEVELDB.name());
dbBackend = DBBackend.byName(dbBackendName);
logger.info("Configured {} as {} and actually used value {}",
Constants.SHUFFLE_SERVICE_DB_BACKEND, dbBackendName, dbBackend);
logger.info("Use {} as the implementation of {}",
dbBackend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
protected def newShuffleBlockHandler(conf: TransportConf): ExternalBlockHandler = {
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
val dBBackend = DBBackend.byName(shuffleDBName)
logInfo(s"Configured ${config.SHUFFLE_SERVICE_DB_BACKEND.key} as $shuffleDBName " +
s"and actually used value ${dBBackend.name()} ")
val dbBackend = DBBackend.byName(shuffleDBName)
logInfo(s"Use ${dbBackend.name()} as the implementation of " +
s"${config.SHUFFLE_SERVICE_DB_BACKEND.key}")
new ExternalBlockHandler(conf,
findRegisteredExecutorsDBFile(dBBackend.fileName(registeredExecutorsDB)))
findRegisteredExecutorsDBFile(dbBackend.fileName(registeredExecutorsDB)))
} else {
new ExternalBlockHandler(conf, null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
if (registeredExecFile != null && execStateCopy != null) {
val dbBackendName = conf.get(SHUFFLE_SERVICE_DB_BACKEND.key)
val dbBackend = DBBackend.byName(dbBackendName)
logWarning(s"Configured ${SHUFFLE_SERVICE_DB_BACKEND.key} as $dbBackendName " +
s"and actually used value ${dbBackend.name()}")
logWarning(s"Use ${dbBackend.name()} as the implementation of " +
s"${SHUFFLE_SERVICE_DB_BACKEND.key}")
FileUtils.copyDirectory(registeredExecFile, execStateCopy)
assert(!ShuffleTestAccessor
.reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.SecurityManager
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config._
import org.apache.spark.network.server.BlockPushNonFatalFailure
import org.apache.spark.network.shuffle.{MergedShuffleFileManager, NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
import org.apache.spark.network.shuffle.{Constants, MergedShuffleFileManager, NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.network.shuffledb.DBBackend
Expand Down Expand Up @@ -1068,6 +1068,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {

test("create remote block push resolver instance") {
val mockConf = mock(classOf[TransportConf])
when(mockConf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name()))
.thenReturn(shuffleDBBackend().name())
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
"org.apache.spark.network.shuffle.RemoteBlockPushResolver")
val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf, null)
Expand Down