
Conversation

@LuciferYang
Contributor

What changes were proposed in this pull request?

There are 2 initDB methods in DBProvider:

  1. DB initDB(DBBackend dbBackend, File dbFile, StoreVersion version, ObjectMapper mapper)
  2. DB initDB(DBBackend dbBackend, File file)

The first method is used in production code, while the second one is only used by ShuffleTestAccessor for YarnShuffleIntegrationSuite, as follows:

var result = "failure"
val execStateCopy = Option(registeredExecFile).map { file =>
  new File(file.getAbsolutePath + "_dup")
}.orNull
try {
  val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }.
    collect().toSet
  sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
  data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet)
  result = "success"
  // only one process can open a leveldb file at a time, so we copy the files
  if (registeredExecFile != null && execStateCopy != null) {
    val dbBackendName = conf.get(SHUFFLE_SERVICE_DB_BACKEND.key)
    val dbBackend = DBBackend.byName(dbBackendName)
    logWarning(s"Use ${dbBackend.name()} as the implementation of " +
      s"${SHUFFLE_SERVICE_DB_BACKEND.key}")
    FileUtils.copyDirectory(registeredExecFile, execStateCopy)
    assert(!ShuffleTestAccessor
      .reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty)
  }

This PR changes the test code to use the first method instead of the second one.
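
For illustration, a minimal sketch of the resulting call in ShuffleTestAccessor, written in Scala (the import paths, helper name, and the StoreVersion/ObjectMapper values are assumptions mirroring the production caller, not the exact diff):

import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.network.shuffledb.{DB, DBBackend, StoreVersion}
import org.apache.spark.network.util.DBProvider

// Minimal sketch, not the exact diff: open the DB via the production 4-arg
// initDB, so the store version is validated on open, instead of the 2-arg one.
def openRegisteredExecutorsDb(dbBackend: DBBackend, dbFile: File): DB = {
  // Before: DBProvider.initDB(dbBackend, dbFile)
  DBProvider.initDB(dbBackend, dbFile, new StoreVersion(1, 0), new ObjectMapper())
}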

Why are the changes needed?

Code clean up.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Pass GitHub Actions

@LuciferYang
Contributor Author

cc @mridulm @tgravescs and @squito for further discussion; the previous thread is here: #37648 (comment)

@LuciferYang
Contributor Author

The GA failure is not related to the current PR; we need to wait for #37815.

@Yikun
Member

Yikun commented Sep 8, 2022

GHCR has recovered; just retriggered.

return tmpDb;
}

@VisibleForTesting
Contributor


So, as @mridulm said in the previous review, this does add a bit of a behavior change: we could hide a corruption that likely would have failed the test before. That seems like a pretty rare case, though, and the test wasn't specifically trying to test that.

Honestly, I can go either way on this change. I'm OK with it, but at the same time I don't think these extra functions are much of a maintenance burden. Did you have other reasons for removing them?

Contributor Author


When I tried to create some failing cases to compare behavior before and after this PR, I found something interesting:

There are 3 test cases related to the code we discussed: YarnShuffleIntegrationWithLevelDBBackendSuite, YarnShuffleAuthWithLevelDBBackendSuite, and YarnShuffleAlternateNameConfigWithLevelDBBackendSuite.

But when I added println(s"registeredExecFile = $registeredExecFile") after val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService) (line 73, in the code below):

test("external shuffle service") {
val shuffleServicePort = YarnTestAccessor.getShuffleServicePort
val shuffleService = YarnTestAccessor.getShuffleServiceInstance
val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
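  // println(s"registeredExecFile = $registeredExecFile") // debug line described above; prints null in all 3 suites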
  val result = File.createTempFile("result", null, tempDir)
  val finalState = runSpark(
    false,
    mainClassName(YarnExternalShuffleDriver.getClass),
    appArgs = if (registeredExecFile != null) {
      Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
    } else {
      Seq(result.getAbsolutePath)
    },
    extraConf = extraSparkConf()
  )
  checkResult(finalState, result)

I found that all 3 cases print registeredExecFile = null, so the code we discussed was not actually executed, because registeredExecFile is always null...

I also tested these 3 cases with Spark 3.3, and the problem exists there too (registeredExecFile = null). I haven't found out in which version this code stopped executing, but it may be a very old bug.

@tgravescs @mridulm do you know when YarnShuffleService should call the setRecoveryPath method? It seems these tests did not call setRecoveryPath before serviceInit, so registeredExecFile is always null.

Contributor

@tgravescs tgravescs Sep 9, 2022


I don't remember in this particular case; it's called from YarnShuffleServiceSuite. Maybe it was never set up correctly, in which case maybe it doesn't matter and we aren't changing behavior.

Contributor Author

@LuciferYang LuciferYang Sep 9, 2022


Yes, I saw that YarnShuffleServiceSuite calls it. But I'm a little curious: how does the NodeManager set the recovery path? If it isn't set, is LevelDB never initialized? Let me investigate.

Contributor Author


From the context of #19032, whether LevelDB is enabled or disabled is related to YarnConfiguration.NM_RECOVERY_ENABLED.

Contributor Author


@mridulm It seems the LevelDB/RocksDB disk store is not always enabled when YarnShuffleService is used:

<td><code>spark.shuffle.service.db.enabled</code></td>
<td>true</td>
<td>
Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will
automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior
enabled). You should also enable <code>spark.worker.cleanup.enabled</code>, to ensure that the state
eventually gets cleaned up. This config may be removed in the future.
</td>
<td>3.0.0</td>
</tr>

The description ("yarn always has this behavior enabled") of spark.shuffle.service.db.enabled is incorrect; the behavior of YarnShuffleService is not controlled by this config.

Contributor Author


Filed a PR to fix this description: #37853

@LuciferYang
Contributor Author

LuciferYang commented Sep 10, 2022

Friendly ping @sunchao, do you know whether MiniYARNCluster can start with YarnConfiguration.NM_RECOVERY_ENABLED = true? I tried setting this in the YarnConfiguration, but MiniYARNCluster failed to start:

2022-09-10T11:44:42.1710230Z Cause: java.lang.ClassNotFoundException: org.apache.hadoop.shaded.org.iq80.leveldb.DBException
2022-09-10T11:44:42.1715234Z at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
2022-09-10T11:44:42.1719347Z at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
2022-09-10T11:44:42.1723090Z at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
2022-09-10T11:44:42.1726759Z at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
2022-09-10T11:44:42.1731028Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
2022-09-10T11:44:42.1735424Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
2022-09-10T11:44:42.1740303Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1745576Z at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
2022-09-10T11:44:42.1828858Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1829712Z at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
2022-09-10T11:44:42.1830633Z at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceInit(MiniYARNCluster.java:327)
2022-09-10T11:44:42.1831431Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1832279Z at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:112)

I found that org.iq80.leveldb.* is relocated to org.apache.hadoop.shaded.org.iq80.leveldb in NMLeveldbStateStoreService, but the relocated classes are not shaded into hadoop-client-minicluster-3.3.4.jar.

@LuciferYang
Contributor Author

override def newYarnConfig(): YarnConfiguration = {
  val yarnConfig = new YarnConfiguration()
  yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
  yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
    classOf[YarnShuffleService].getCanonicalName)
  yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
  yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, dbBackend.name())
  yarnConfig
}

Adding yarnConfig.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true) to the newYarnConfig method above and testing with hadoop-2.7:

build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-2

the suite then aborts as follows:

YarnShuffleIntegrationWithLevelDBBackendSuite:
org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite *** ABORTED ***
  java.lang.IllegalArgumentException: Cannot support recovery with an ephemeral server port. Check the setting of yarn.nodemanager.address
  at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceStart(ContainerManagerImpl.java:395)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceStart(MiniYARNCluster.java:560)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:278)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  ...
Run completed in 3 seconds, 992 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 1
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
*** 1 SUITE ABORTED ***

It seems we need to use a static port to support testing recovery.
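
For reference, a hedged sketch of what that could look like in the newYarnConfig method above (YarnConfiguration.NM_ADDRESS is the yarn.nodemanager.address key; the fixed port is an arbitrary illustrative choice):

// Sketch only: NM recovery rejects an ephemeral port, so the NodeManager
// address would have to be pinned to a fixed port before enabling recovery.
yarnConfig.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true)
yarnConfig.set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:8041") // arbitrary fixed port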

@LuciferYang
Contributor Author

LuciferYang commented Sep 10, 2022

So I think assert(!ShuffleTestAccessor.reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty) and the related code are currently unreachable... @tgravescs @mridulm

@sunchao
Member

sunchao commented Sep 10, 2022

Thanks for the ping @LuciferYang, I'll take a look at the shading issue in Hadoop 3.3.4.

@LuciferYang
Contributor Author

thanks @sunchao

@mridulm
Contributor

mridulm commented Sep 12, 2022

I am unsure about this test; I'll rely on @tgravescs to comment.

@LuciferYang
Contributor Author

@tgravescs I tried adding yarnConfig.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true) to make registeredExecFile non-null and re-enable the test that verifies reloading execStateCopy, but it failed due to the limitations of MiniYARNCluster itself. Do we have any other way to make the execStateCopy reload check work again?

@tgravescs
Contributor

The only other way I can think of is to have recovery enabled when a test config is set (spark.testing.XXXX). If that looks like a hassle, then I would say we just ignore the test as one of those things we can't easily unit test.
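
A hypothetical sketch of that suggestion, written in Scala for brevity (the spark.testing.shuffleRecoveryEnabled key and the temp-path wiring are invented for illustration; this is not code from the PR or an existing Spark config):

import java.nio.file.Files
import org.apache.hadoop.fs.Path

// Hypothetical: during serviceInit, fall back to a temp recovery path when a
// test-only flag is set, so registeredExecFile becomes non-null in the suites.
if (conf.getBoolean("spark.testing.shuffleRecoveryEnabled", false) && getRecoveryPath == null) {
  setRecoveryPath(new Path(Files.createTempDirectory("spark-shuffle-recovery").toUri))
}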

@LuciferYang
Contributor Author

LuciferYang commented Sep 17, 2022

The only other way I can think of is to have recovery enabled when a test config is set (spark.testing.XXXX). If that looks like a hassle, then I would say we just ignore the test as one of those things we can't easily unit test.

Let me try spark.testing.XXXX

@LuciferYang
Contributor Author

The only other way I can think of is to have recovery enabled when a test config is set (spark.testing.XXXX). If that looks like a hassle, then I would say we just ignore the test as one of those things we can't easily unit test.

Sorry for missing this comment
