[SPARK-38888][BUILD][CORE][YARN][DOCS] Add RocksDB support for shuffle service state store
#37610
Conversation
Testing first; will update the PR description later.
RocksDB support for shuffle state store
private static final Logger logger = LoggerFactory.getLogger(RocksDBProvider.class);

public static RocksDB initRockDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws
cc @dongjoon-hyun Do you have time to help review the RocksDB part?
mridulm left a comment:
Mostly looks good, except for a couple of comments.
In particular, I have not looked at RocksDBProvider - will let @dongjoon-hyun review that (in addition to the rest of the PR).
Force-pushed from 699530c to 15b2325.
Friendly ping @Ngone51
docs/configuration.md
eventually gets cleaned up. This config may be removed in the future.
</td>
<td>3.0.0</td>
</tr>
This should be in the section at the bottom - the "External Shuffle service (server) side configuration options" section.
Having said that, I was wrong about spark.shuffle.service.db.enabled - it is always enabled in yarn mode - so we cannot control it with the .enabled flag.
The newly introduced spark.shuffle.service.db.backend is relevant though; can we add a blurb that it is relevant for yarn and standalone - with the db enabled by default for yarn - and point to standalone.md for more details on how to configure it for standalone? Thanks
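To make that concrete, here is an illustrative standalone configuration; the property names come from this PR, but the exact documented wording lives in spark-standalone.md, so treat this as a sketch:

```
# spark-defaults.conf (standalone mode) - illustrative sketch only.
# The backend choice only matters once the state store is enabled;
# on yarn the store is always enabled, so only the backend line applies there.
spark.shuffle.service.db.enabled    true
spark.shuffle.service.db.backend    ROCKSDB
```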
Does e124c84 look better?
https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
Should this table entry be sorted by property name? It doesn't seem to follow that rule.
Actually, that section I mentioned is for push-based shuffle - so it probably does not make sense for our changes here.
There is a section here which looks more relevant for yarn.
We should perhaps move the push-based shuffle blurb to the yarn doc as well ... but let us do that in a later PR.
Move it back to after lines 1000 to 1008 in 6e62b93?

<td><code>spark.shuffle.service.removeShuffle</code></td>
<td>false</td>
<td>
Whether to use the ExternalShuffleService for deleting shuffle blocks for
deallocated executors when the shuffle is no longer needed. Without this enabled,
shuffle data on executors that are deallocated will remain on disk until the
application ends.
</td>
<td>3.3.0</td>
They have been added into spark-standalone.md and running-on-yarn.md. Is it necessary to keep them in configuration.md?
You are right, we need it only in the standalone and yarn docs for now - not in configuration.md.
removed
This reverts commit 4d5f44c.
cc @HeartSaVioR FYI
Thanks @HyukjinKwon
When <code>spark.shuffle.service.db.enabled</code> is true, user can use this to specify the kind of disk-based
store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value.
This only affects standalone mode (yarn always has this behavior enabled).
The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now.
Do you mean the data that is stored in LevelDB is not available after changing to RocksDB, for example?
Yes, there is no function to automatically convert *.ldb to *.rdb right now.
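Since no converter ships with Spark, an operator who really needed to migrate would have to copy the entries by hand. A purely hypothetical one-off sketch, assuming the leveldbjni and rocksdbjni Java APIs - the class and method names below are illustrative and not part of this PR:

```java
import java.io.File;
import java.util.Map;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.rocksdb.RocksDB;

// Hypothetical helper, not part of this PR: copy a LevelDB state store into RocksDB.
public final class StateStoreMigration {
  public static void copyLevelDbToRocksDb(File ldbDir, File rdbDir) throws Exception {
    RocksDB.loadLibrary();
    Options ldbOpts = new Options().createIfMissing(false);
    org.rocksdb.Options rdbOpts = new org.rocksdb.Options().setCreateIfMissing(true);
    try (DB src = JniDBFactory.factory.open(ldbDir, ldbOpts);
         RocksDB dst = RocksDB.open(rdbOpts, rdbDir.getAbsolutePath());
         DBIterator it = src.iterator()) {
      // Both stores are plain byte[] -> byte[] maps, so a verbatim copy suffices.
      for (it.seekToFirst(); it.hasNext(); ) {
        Map.Entry<byte[], byte[]> e = it.next();
        dst.put(e.getKey(), e.getValue());
      }
    }
  }
}
```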
}

@Override
public void remove() {
Wondering why we have a remove() API (with no parameters, even) for the iterator? What's the expected behavior of a possible implementation?
Will remove this - I found

default void remove() {
  throw new UnsupportedOperationException("remove");
}

in java.util.Iterator
done
| logger.error("error opening rocksdb file {}. Creating new file, will not be able to " + | ||
| "recover state for existing applications", dbFile, e); | ||
| if (dbFile.isDirectory()) { | ||
| for (File f : Objects.requireNonNull(dbFile.listFiles())) { |
Is it possible to have a sub-directory here?
The process here refers to LevelDBProvider:
spark/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
Lines 40 to 85 in a27238a
public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws
    IOException {
  DB tmpDb = null;
  if (dbFile != null) {
    Options options = new Options();
    options.createIfMissing(false);
    options.logger(new LevelDBLogger());
    try {
      tmpDb = JniDBFactory.factory.open(dbFile, options);
    } catch (NativeDB.DBException e) {
      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
        logger.info("Creating state database at " + dbFile);
        options.createIfMissing(true);
        try {
          tmpDb = JniDBFactory.factory.open(dbFile, options);
        } catch (NativeDB.DBException dbExc) {
          throw new IOException("Unable to create state store", dbExc);
        }
      } else {
        // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
        // one, so we can keep processing new apps
        logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
          "recover state for existing applications", dbFile, e);
        if (dbFile.isDirectory()) {
          for (File f : dbFile.listFiles()) {
            if (!f.delete()) {
              logger.warn("error deleting {}", f.getPath());
            }
          }
        }
        if (!dbFile.delete()) {
          logger.warn("error deleting {}", dbFile.getPath());
        }
        options.createIfMissing(true);
        try {
          tmpDb = JniDBFactory.factory.open(dbFile, options);
        } catch (NativeDB.DBException dbExc) {
          throw new IOException("Unable to create state store", dbExc);
        }
      }
    }
    // if there is a version mismatch, we throw an exception, which means the service is unusable
    checkVersion(tmpDb, version, mapper);
  }
  return tmpDb;
I think it is safer to follow the old code flow; could LevelDBProvider and RocksDBProvider be fixed together after further investigation?
I think the scenario here is a directory or file with the same name as dbFile that cannot be recovered and is not in the NotFound state - do I understand correctly, @tgravescs?
The case where it is a directory looks like a corner case.
> I think it is safer to follow the old code flow; could LevelDBProvider and RocksDBProvider be fixed together after further investigation?

Sounds good.
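For context, a condensed sketch of how the analogous RocksDB path might look, assuming the org.rocksdb JNI API; this mirrors the LevelDB flow quoted above rather than reproducing Spark's exact RocksDBProvider (which additionally distinguishes the not-found case and logs failed deletions):

```java
import java.io.File;
import java.io.IOException;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

final class RocksDbRecoverySketch {
  // Open an existing store; on failure, wipe it and start empty (old state is lost).
  static RocksDB openOrRecreate(File dbFile) throws IOException {
    RocksDB.loadLibrary();
    Options options = new Options().setCreateIfMissing(false);
    try {
      return RocksDB.open(options, dbFile.toString());
    } catch (RocksDBException e) {
      if (dbFile.isDirectory()) {
        File[] children = dbFile.listFiles();
        if (children != null) {
          for (File f : children) {
            f.delete(); // if this fails, the re-open below fails as well
          }
        }
      }
      dbFile.delete();
      options.setCreateIfMissing(true);
      try {
        return RocksDB.open(options, dbFile.toString());
      } catch (RocksDBException dbExc) {
        throw new IOException("Unable to create state store", dbExc);
      }
    }
  }
}
```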
if (!dbFile.delete()) {
  logger.warn("error deleting {}", dbFile.getPath());
}
dbOptions.setCreateIfMissing(true);
This will not take effect if dbFile.delete() fails?
If dbFile.delete() fails, the following RocksDB.open will also fail.
dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
dbOptions.setTableFormatConfig(tableFormatConfig);
try {
  return RocksDB.open(dbOptions, file.toString());
Why don't we have the setCreateIfMissing reset as we do in initRockDB(File dbFile, StoreVersion version, ObjectMapper mapper)? Is the call to this method supposed to be the first-time RocksDB creation?
This method is only used by tests (ShuffleTestAccessor), and the DB should already exist in the test.
Lines 210 to 217 in 43b02d7
def reloadRegisteredExecutors(
    dbBackend: DBBackend,
    file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
  val db = DBProvider.initDB(dbBackend, file)
  val result = ExternalShuffleBlockResolver.reloadRegisteredExecutors(db)
  db.close()
  result
}
@Ngone51 should we move it to a new TestUtils or ShuffleTestAccessor?
Looks good to me - this PR depends on other PRs, so we should wait for those to be merged.
+CC @Ngone51
LGTM
Changed the title: RocksDB support for shuffle state store → RocksDB support for shuffle service state store
Will wait for a day or so to give Dongjoon a chance to review the PR.
| test("Finalized merged shuffle are written into DB and cleaned up after application stopped") { | ||
| // TODO: should enable this test after SPARK-40186 is resolved. | ||
| ignore("Finalized merged shuffle are written into DB and cleaned up after application stopped") { |
Will re-enable this, since it should pass after SPARK-40186 is merged.
mvn clean test -pl resource-managers/yarn -Pyarn -DwildcardSuites=org.apache.spark.network.yarn.YarnShuffleServiceWithRocksDBBackendSuite
YarnShuffleServiceWithRocksDBBackendSuite:
- executor and merged shuffle state kept across NM restart
- removed applications should not be in registered executor file and merged shuffle file
- shuffle service should be robust to corrupt registered executor file
- get correct recovery path
- moving recovery file from NM local dir to recovery path
- service throws error if cannot start
- Consistency in AppPathInfo between in-memory hashmap and the DB
- Finalized merged shuffle are written into DB and cleaned up after application stopped
- SPARK-40186: shuffleMergeManager should have been shutdown before db closed
- Dangling finalized merged partition info in DB will be removed during restart
- Dangling application path or shuffle information in DB will be removed during restart
- Cleanup for former attempts local path info should be triggered in applicationRemoved
- recovery db should not be created if NM recovery is not enabled
- SPARK-31646: metrics should be registered into Node Manager's metrics system
- SPARK-34828: metrics should be registered with configured name
- create default merged shuffle file manager instance
- create remote block push resolver instance
- invalid class name of merge manager will use noop instance
Run completed in 9 seconds, 454 milliseconds.
Total number of tests run: 18
Suites: completed 2, aborted 0
Tests: succeeded 18, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
Merged to master.
Thank you, @LuciferYang, @mridulm, @Ngone51 and @HyukjinKwon.
What changes were proposed in this pull request?

This is extended work of SPARK-38909. In this PR, a RocksDB implementation is added for the shuffle local state store. This PR adds the following code:

- `shuffledb.RocksDB` and `shuffledb.RocksDBIterator`: RocksDB implementations corresponding to `shuffledb.DB` and `shuffledb.DBIterator` (a rough sketch follows this list)
- Add `ROCKSDB` to `shuffle.DBBackend`, with `.rdb` as the corresponding file suffix; the description of `SHUFFLE_SERVICE_DB_BACKEND` is also changed
- Add `RocksDBProvider` to build `RocksDB` instances, and extend `DBProvider` to produce the corresponding instances
- Add `rocksdbjni` to the `network-common` module
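As a rough illustration of the first bullet, an adapter like the following wraps the rocksdbjni handle behind the shuffledb interface. The `DB` interface shape shown here is assumed from SPARK-38909, and the error handling is a sketch, not the exact code in this PR:

```java
import java.io.Closeable;
import java.io.IOException;
import org.rocksdb.RocksDBException;

// Assumed interface shape from SPARK-38909; the real one lives in
// common/network-common under org.apache.spark.network.shuffledb.
interface DB extends Closeable {
  void put(byte[] key, byte[] value);
  byte[] get(byte[] key);
  void delete(byte[] key);
}

public class RocksDB implements DB {
  private final org.rocksdb.RocksDB db;

  public RocksDB(org.rocksdb.RocksDB db) {
    this.db = db;
  }

  @Override
  public void put(byte[] key, byte[] value) {
    try {
      db.put(key, value);
    } catch (RocksDBException e) {
      // Wrap the checked JNI exception so callers can stay storage-agnostic.
      throw new RuntimeException(e);
    }
  }

  @Override
  public byte[] get(byte[] key) {
    try {
      return db.get(key);
    } catch (RocksDBException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void delete(byte[] key) {
    try {
      db.delete(key);
    } catch (RocksDBException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void close() throws IOException {
    db.close();
  }
}
```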
Why are the changes needed?

Support using RocksDB as the shuffle local state store.
Does this PR introduce any user-facing change?

When the user configures `spark.shuffle.service.db.enabled` as true, they can use RocksDB as the shuffle local state store by specifying `SHUFFLE_SERVICE_DB_BACKEND` (`spark.shuffle.service.db.backend`) as `ROCKSDB` in `spark-defaults.conf` or `spark-shuffle-site.xml` (for yarn). Data originally stored in LevelDB/RocksDB will not be automatically converted to the other kind of storage for now.

How was this patch tested?
Added new tests.
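For the yarn case mentioned above, a hypothetical spark-shuffle-site.xml carrying the same setting might look like this; only the property name comes from this PR, the rest is illustrative:

```xml
<?xml version="1.0"?>
<configuration>
  <!-- Illustrative: select RocksDB as the shuffle service state store backend. -->
  <property>
    <name>spark.shuffle.service.db.backend</name>
    <value>ROCKSDB</value>
  </property>
</configuration>
```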