@@ -20,7 +20,6 @@
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
@@ -48,17 +47,4 @@ public static DB initDB(
    }
    return null;
  }

  @VisibleForTesting
  public static DB initDB(DBBackend dbBackend, File file) throws IOException {
    if (file != null) {
      switch (dbBackend) {
        case LEVELDB: return new LevelDB(LevelDBProvider.initLevelDB(file));
        case ROCKSDB: return new RocksDB(RocksDBProvider.initRocksDB(file));
        default:
          throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
      }
    }
    return null;
  }
}
@@ -21,7 +21,6 @@
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
@@ -85,14 +84,6 @@ public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper map
    return tmpDb;
  }

  @VisibleForTesting
Contributor

So, as @mridulm said in the previous review, this does add a bit of a behavior change: we could hide a corruption that likely would have failed the test before. That seems like a pretty rare case, though, and the test wasn't specifically trying to test that.

Honestly, I can go either way on this change. I'm OK with it, but at the same time I don't think these extra functions are much of a maintenance burden, unless you had other reasons for removing them?

Contributor Author

When I was creating some failing cases to compare the behavior before and after this PR, I found something interesting:

There are 3 test suites related to the code we discussed: YarnShuffleIntegrationWithLevelDBBackendSuite, YarnShuffleAuthWithLevelDBBackendSuite, and YarnShuffleAlternateNameConfigWithLevelDBBackendSuite.

But when I added println(s"registeredExecFile = $registeredExecFile") after val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService) (line 73 in the code below):

test("external shuffle service") {
val shuffleServicePort = YarnTestAccessor.getShuffleServicePort
val shuffleService = YarnTestAccessor.getShuffleServiceInstance
val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(
false,
mainClassName(YarnExternalShuffleDriver.getClass),
appArgs = if (registeredExecFile != null) {
Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
} else {
Seq(result.getAbsolutePath)
},
extraConf = extraSparkConf()
)
checkResult(finalState, result)

I found that all 3 cases print registeredExecFile = null, so the code we discussed was not actually executed, because registeredExecFile is always null...

I also tested these 3 cases with Spark 3.3, and the problem exists there too (registeredExecFile = null). I haven't found out in which version this code stopped being executed, but it may be a very old bug.

@tgravescs @mridulm do you know when YarnShuffleService's setRecoveryPath method should be called? It seems that these tests did not call setRecoveryPath before serviceInit, so registeredExecFile is always null.
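
For context, a minimal sketch of the ordering I would expect, assuming the Hadoop AuxiliaryService lifecycle; the recovery directory and this standalone call sequence are hypothetical, not something these suites actually do:

import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.network.yarn.YarnShuffleService

// Hypothetical sketch: the NodeManager is expected to call setRecoveryPath before
// init(); without that call the service has no recovery DB file, which matches
// the registeredExecFile = null output above.
val recoveryDir = new File(sys.props("java.io.tmpdir"), "nm-recovery") // hypothetical dir
val shuffleService = new YarnShuffleService()
shuffleService.setRecoveryPath(new Path(recoveryDir.toURI))
shuffleService.init(new YarnConfiguration())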

Contributor @tgravescs (Sep 9, 2022)

I don't remember in this particular case. It's called from YarnShuffleServiceSuite; maybe it was never set up correctly here, in which case maybe it doesn't matter and we aren't changing behavior.

Contributor Author @LuciferYang (Sep 9, 2022)

Yes, I saw that YarnShuffleServiceSuite calls it. But I'm a little curious: how does the NodeManager set the recovery path? If it is not set, will LevelDB never be initialized? Let me investigate.

Contributor Author

From the context of #19032, whether LevelDB is enabled or disabled is related to YarnConfiguration.NM_RECOVERY_ENABLED.
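
As I understand it, the NodeManager only hands a recovery path to the aux service when NM recovery is turned on. A minimal sketch of that relationship follows; NM_RECOVERY_ENABLED and NM_RECOVERY_DIR are real YarnConfiguration keys, but the directory value is hypothetical:

import org.apache.hadoop.yarn.conf.YarnConfiguration

// Hypothetical NodeManager-side settings; without NM recovery enabled the shuffle
// service never receives a recovery path, so no LevelDB/RocksDB store is created.
val yarnConfig = new YarnConfiguration()
yarnConfig.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true)
yarnConfig.set(YarnConfiguration.NM_RECOVERY_DIR, "/tmp/nm-recovery")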

Contributor Author

@mridulm It seems that the LevelDB/RocksDB disk store is not always enabled when YarnShuffleService is used:

<td><code>spark.shuffle.service.db.enabled</code></td>
<td>true</td>
<td>
Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will
automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior
enabled). You should also enable <code>spark.worker.cleanup.enabled</code>, to ensure that the state
eventually gets cleaned up. This config may be removed in the future.
</td>
<td>3.0.0</td>
</tr>

The description "yarn always has this behavior enabled" for spark.shuffle.service.db.enabled is incorrect; the behavior of YarnShuffleService is not controlled by this config.

Contributor Author

Opened a PR to fix this description: #37853

  static DB initLevelDB(File file) throws IOException {
    Options options = new Options();
    options.createIfMissing(true);
    JniDBFactory factory = new JniDBFactory();
    return factory.open(file, options);
  }

  private static class LevelDBLogger implements org.iq80.leveldb.Logger {
    private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);

@@ -22,7 +22,6 @@
import java.util.Objects;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.rocksdb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,28 +105,6 @@ public static RocksDB initRockDB(File dbFile, StoreVersion version, ObjectMapper
    return tmpDb;
  }

  @VisibleForTesting
  static RocksDB initRocksDB(File file) throws IOException {
    BloomFilter fullFilter =
      new BloomFilter(10.0D /* BloomFilter.DEFAULT_BITS_PER_KEY */, false);
    BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
      .setFilterPolicy(fullFilter)
      .setEnableIndexCompression(false)
      .setIndexBlockRestartInterval(8)
      .setFormatVersion(5);

    Options dbOptions = new Options();
    dbOptions.setCreateIfMissing(true);
    dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
    dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
    dbOptions.setTableFormatConfig(tableFormatConfig);
    try {
      return RocksDB.open(dbOptions, file.toString());
    } catch (RocksDBException e) {
      throw new IOException("Unable to open state store", e);
    }
  }

  private static class RocksDBLogger extends org.rocksdb.Logger {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogger.class);

@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -32,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network._
import org.apache.spark.network.shuffle.ShuffleTestAccessor
import org.apache.spark.network.shuffledb.DBBackend
import org.apache.spark.network.shuffledb.{DBBackend, StoreVersion}
import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
import org.apache.spark.tags.ExtendedYarnTest

@@ -176,8 +177,8 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
logWarning(s"Use ${dbBackend.name()} as the implementation of " +
s"${SHUFFLE_SERVICE_DB_BACKEND.key}")
FileUtils.copyDirectory(registeredExecFile, execStateCopy)
assert(!ShuffleTestAccessor
.reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty)
assert(!ShuffleTestAccessor.reloadRegisteredExecutors(
dbBackend, execStateCopy, new StoreVersion(1, 0), new ObjectMapper()).isEmpty)
}
} finally {
sc.stop()
@@ -21,13 +21,13 @@ import java.nio.channels.FileChannel
import java.util.List
import java.util.concurrent.ConcurrentMap

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.yarn.api.records.ApplicationId

import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId
import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
import org.apache.spark.network.shuffle.protocol.{ExecutorShuffleInfo, FinalizeShuffleMerge}
import org.apache.spark.network.shuffledb.DB
import org.apache.spark.network.shuffledb.DBBackend
import org.apache.spark.network.shuffledb.{DB, DBBackend, StoreVersion}
import org.apache.spark.network.util.{DBProvider, TransportConf}

/**
@@ -212,9 +212,12 @@ object ShuffleTestAccessor {
}

def reloadRegisteredExecutors(
dbBackend: DBBackend,
file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
val db = DBProvider.initDB(dbBackend, file)
dbBackend: DBBackend,
file: File,
version: StoreVersion,
mapper: ObjectMapper)
: ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
val db = DBProvider.initDB(dbBackend, file, version, mapper)
val result = ExternalShuffleBlockResolver.reloadRegisteredExecutors(db)
db.close()
result