From fc388bda71d3bf5f7748d2e473ee00a41efcc07f Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 22 Aug 2022 13:37:55 +0800 Subject: [PATCH 01/22] add rocksdb support --- common/network-common/pom.xml | 4 + .../spark/network/shuffledb/DBBackend.java | 4 +- .../spark/network/shuffledb/RocksDB.java | 71 ++++++++ .../network/shuffledb/RocksDBIterator.java | 100 +++++++++++ .../apache/spark/network/util/DBProvider.java | 13 +- .../spark/network/util/RocksDBProvider.java | 169 ++++++++++++++++++ .../spark/internal/config/package.scala | 2 +- .../YarnShuffleAlternateNameConfigSuite.scala | 6 + .../yarn/YarnShuffleIntegrationSuite.scala | 11 ++ .../yarn/YarnShuffleServiceSuite.scala | 4 + 10 files changed, 376 insertions(+), 8 deletions(-) create mode 100644 common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java create mode 100644 common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java create mode 100644 common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index ef7f9824e13c1..e9cf952ea69f2 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -82,6 +82,10 @@ leveldbjni-all 1.8 + + org.rocksdb + rocksdbjni + com.fasterxml.jackson.core diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java index f8db8a16f16df..e09ccd37b4d9b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java @@ -21,10 +21,10 @@ /** * The enum `DBBackend` use to specify a disk-based store used in shuffle service local db. - * Only LEVELDB is supported now. + * Support the use of LevelDB and RocksDB. */ public enum DBBackend { - LEVELDB(".ldb"); + LEVELDB(".ldb"), ROCKSDB(".rdb"); private final String fileSuffix; diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java new file mode 100644 index 0000000000000..d33895d6c2d62 --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffledb; + +import java.io.IOException; + +import com.google.common.base.Throwables; +import org.rocksdb.RocksDBException; + +/** + * RocksDB implementation of the local KV storage used to persist the shuffle state. 
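+ * Failures from the underlying org.rocksdb.RocksDB handle surface as checked
+ * RocksDBException; put/get/delete rethrow them as unchecked exceptions so the
+ * methods can satisfy the DB interface.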
+ */ +public class RocksDB implements DB { + private final org.rocksdb.RocksDB db; + + public RocksDB(org.rocksdb.RocksDB db) { + this.db = db; + } + + @Override + public void put(byte[] key, byte[] value) { + try { + db.put(key, value); + } catch (RocksDBException e) { + throw Throwables.propagate(e); + } + } + + @Override + public byte[] get(byte[] key) { + try { + return db.get(key); + } catch (RocksDBException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void delete(byte[] key) { + try { + db.delete(key); + } catch (RocksDBException e) { + throw Throwables.propagate(e); + } + } + + @Override + public DBIterator iterator() { + return new RocksDBIterator(db.newIterator()); + } + + @Override + public void close() throws IOException { + db.close(); + } +} diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java new file mode 100644 index 0000000000000..c14034a641892 --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffledb; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Map; +import java.util.NoSuchElementException; + +import com.google.common.base.Throwables; +import org.rocksdb.RocksIterator; + +/** + * RocksDB implementation of `DBIterator`. 
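+ * The iterator reads one entry ahead: hasNext() pre-loads the next key/value pair
+ * from the underlying RocksIterator, and the iterator closes itself automatically
+ * once it has been fully consumed.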
+ */ +public class RocksDBIterator implements DBIterator { + + private final RocksIterator it; + + private boolean checkedNext; + + private boolean closed; + + private Map.Entry next; + + public RocksDBIterator(RocksIterator it) { + this.it = it; + } + + @Override + public boolean hasNext() { + if (!checkedNext && !closed) { + next = loadNext(); + checkedNext = true; + } + if (!closed && next == null) { + try { + close(); + } catch (IOException ioe) { + throw Throwables.propagate(ioe); + } + } + return next != null; + } + + @Override + public Map.Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + checkedNext = false; + Map.Entry ret = next; + next = null; + return ret; + } + + @Override + public void close() throws IOException { + if (!closed) { + it.close(); + closed = true; + next = null; + } + } + + @Override + public void seek(byte[] key) { + it.seek(key); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private Map.Entry loadNext() { + if (it.isValid()) { + Map.Entry nextEntry = + new AbstractMap.SimpleEntry<>(it.key(), it.value()); + it.next(); + return nextEntry; + } + return null; + } +} diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java index 9563fa9ec354f..2a4afa736224d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java @@ -22,9 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import org.apache.spark.network.shuffledb.DB; import org.apache.spark.network.shuffledb.DBBackend; import org.apache.spark.network.shuffledb.LevelDB; -import org.apache.spark.network.shuffledb.DB; +import org.apache.spark.network.shuffledb.RocksDB; import org.apache.spark.network.shuffledb.StoreVersion; public class DBProvider { @@ -34,11 +35,13 @@ public static DB initDB( StoreVersion version, ObjectMapper mapper) throws IOException { if (dbFile != null) { - // TODO: SPARK-38888, add rocksdb implementation. switch (dbBackend) { case LEVELDB: org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper); return levelDB != null ? new LevelDB(levelDB) : null; + case ROCKSDB: + org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version, mapper); + return rocksDB != null ? new RocksDB(rocksDB) : null; default: throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend); } @@ -49,11 +52,11 @@ public static DB initDB( @VisibleForTesting public static DB initDB(DBBackend dbBackend, File file) throws IOException { if (file != null) { - // TODO: SPARK-38888, add rocksdb implementation. 
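+        // Callers select a backend via DBBackend and get back a uniform DB handle;
+        // e.g. DBProvider.initDB(DBBackend.ROCKSDB, file) yields a RocksDB-backed store.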
switch (dbBackend) { case LEVELDB: return new LevelDB(LevelDBProvider.initLevelDB(file)); - default: - throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend); + case ROCKSDB: return new RocksDB(RocksDBProvider.initRocksDB(file)); + default: + throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend); } } return null; diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java new file mode 100644 index 0000000000000..7eacc9d872d2e --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import java.io.File; +import java.io.IOException; +import java.util.Objects; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import org.rocksdb.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.shuffledb.StoreVersion; + +/** + * RocksDB utility class available in the network package. + */ +public class RocksDBProvider { + + static { + org.rocksdb.RocksDB.loadLibrary(); + } + + private static final Logger logger = LoggerFactory.getLogger(RocksDBProvider.class); + + public static RocksDB initRockDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws + IOException { + RocksDB tmpDb = null; + if (dbFile != null) { + BloomFilter fullFilter = + new BloomFilter(10.0D /* BloomFilter.DEFAULT_BITS_PER_KEY */, false); + BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig() + .setFilterPolicy(fullFilter) + .setEnableIndexCompression(false) + .setIndexBlockRestartInterval(8) + .setFormatVersion(5); + + Options dbOptions = new Options(); + RocksDBLogger rocksDBLogger = new RocksDBLogger(dbOptions); + + dbOptions.setCreateIfMissing(false); + dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION); + dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION); + dbOptions.setTableFormatConfig(tableFormatConfig); + dbOptions.setLogger(rocksDBLogger); + + try { + tmpDb = RocksDB.open(dbOptions, dbFile.toString()); + } catch (RocksDBException e) { + if (e.getStatus().getCode() == Status.Code.NotFound) { + logger.info("Creating state database at " + dbFile); + dbOptions.setCreateIfMissing(true); + try { + tmpDb = RocksDB.open(dbOptions, dbFile.toString()); + } catch (RocksDBException dbExc) { + throw new IOException("Unable to create state store", dbExc); + } + } else { + // the RocksDB file seems to be corrupt somehow. 
Let's just blow it away and create a new + // one, so we can keep processing new apps + logger.error("error opening rocksdb file {}. Creating new file, will not be able to " + + "recover state for existing applications", dbFile, e); + if (dbFile.isDirectory()) { + for (File f : Objects.requireNonNull(dbFile.listFiles())) { + if (!f.delete()) { + logger.warn("error deleting {}", f.getPath()); + } + } + } + if (!dbFile.delete()) { + logger.warn("error deleting {}", dbFile.getPath()); + } + dbOptions.setCreateIfMissing(true); + try { + tmpDb = RocksDB.open(dbOptions, dbFile.toString()); + } catch (RocksDBException dbExc) { + throw new IOException("Unable to create state store", dbExc); + } + } + } + try { + // if there is a version mismatch, we throw an exception, which means the service is unusable + checkVersion(tmpDb, version, mapper); + } catch (RocksDBException e) { + throw new IOException(e.getMessage(), e); + } + } + return tmpDb; + } + + @VisibleForTesting + static RocksDB initRocksDB(File file) throws IOException { + BloomFilter fullFilter = + new BloomFilter(10.0D /* BloomFilter.DEFAULT_BITS_PER_KEY */, false); + BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig() + .setFilterPolicy(fullFilter) + .setEnableIndexCompression(false) + .setIndexBlockRestartInterval(8) + .setFormatVersion(5); + + Options dbOptions = new Options(); + dbOptions.setCreateIfMissing(false); + dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION); + dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION); + dbOptions.setTableFormatConfig(tableFormatConfig); + try { + return RocksDB.open(dbOptions, file.toString()); + } catch (RocksDBException e) { + throw new IOException("Unable to open state store", e); + } + } + + private static class RocksDBLogger extends org.rocksdb.Logger { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogger.class); + + public RocksDBLogger(Options options) { + super(options); + } + + @Override + protected void log(InfoLogLevel infoLogLevel, String message) { + if (infoLogLevel == InfoLogLevel.INFO_LEVEL) { + LOG.info(message); + } + } + } + + /** + * Simple major.minor versioning scheme. Any incompatible changes should be across major + * versions. Minor version differences are allowed -- meaning we should be able to read + * dbs that are either earlier *or* later on the minor version. 
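+ * For example, a store stamped 1.0 can be opened by a service expecting 1.1 (the
+ * stamp is simply rewritten), while a major-version mismatch such as 2.0 fails the
+ * check and the service treats the db as unusable.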
+ */ + public static void checkVersion(RocksDB db, StoreVersion newversion, ObjectMapper mapper) throws + IOException, RocksDBException { + byte[] bytes = db.get(StoreVersion.KEY); + if (bytes == null) { + storeVersion(db, newversion, mapper); + } else { + StoreVersion version = mapper.readValue(bytes, StoreVersion.class); + if (version.major != newversion.major) { + throw new IOException("cannot read state DB with version " + version + ", incompatible " + + "with current version " + newversion); + } + storeVersion(db, newversion, mapper); + } + } + + public static void storeVersion(RocksDB db, StoreVersion version, ObjectMapper mapper) + throws IOException, RocksDBException { + db.put(StoreVersion.KEY, mapper.writeValueAsBytes(version)); + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 3cf13dbe0931f..1097a2c33ca6d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -718,7 +718,7 @@ package object config { private[spark] val SHUFFLE_SERVICE_DB_BACKEND = ConfigBuilder(Constants.SHUFFLE_SERVICE_DB_BACKEND) .doc("Specifies a disk-based store used in shuffle service local db. " + - "Only LEVELDB is supported now.") + "LEVELDB or ROCKSDB.") .version("3.4.0") .stringConf .transform(_.toUpperCase(Locale.ROOT)) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala index f0cb008321ae4..692980b96f208 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala @@ -82,3 +82,9 @@ class YarnShuffleAlternateNameConfigWithLevelDBBackendSuite extends YarnShuffleAlternateNameConfigSuite { override protected def dbBackend: DBBackend = DBBackend.LEVELDB } + +@ExtendedYarnTest +class YarnShuffleAlternateNameConfigWithRocksDBBackendSuite + extends YarnShuffleAlternateNameConfigSuite { + override protected def dbBackend: DBBackend = DBBackend.ROCKSDB +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index fb9d3cc520cfa..ecacd1c9d94ff 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -97,6 +97,12 @@ class YarnShuffleIntegrationWithLevelDBBackendSuite override protected def dbBackend: DBBackend = DBBackend.LEVELDB } +@ExtendedYarnTest +class YarnShuffleIntegrationWithRocksDBBackendSuite + extends YarnShuffleIntegrationSuite { + override protected def dbBackend: DBBackend = DBBackend.ROCKSDB +} + /** * Integration test for the external shuffle service with auth on. 
*/ @@ -121,6 +127,11 @@ class YarnShuffleAuthWithLevelDBBackendSuite extends YarnShuffleAuthSuite { override protected def dbBackend: DBBackend = DBBackend.LEVELDB } +@ExtendedYarnTest +class YarnShuffleAuthWithRocksDBBackendSuite extends YarnShuffleAuthSuite { + override protected def dbBackend: DBBackend = DBBackend.ROCKSDB +} + private object YarnExternalShuffleDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 9a24ab8adff6a..350e02cd9b66c 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -1087,3 +1087,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { class YarnShuffleServiceWithLevelDBBackendSuite extends YarnShuffleServiceSuite { override protected def shuffleDBBackend(): DBBackend = DBBackend.LEVELDB } + +class YarnShuffleServiceWithRocksDBBackendSuite extends YarnShuffleServiceSuite { + override protected def shuffleDBBackend(): DBBackend = DBBackend.ROCKSDB +} From e0a35c872fcafa9a65cf713e26638edf30ccc795 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 22 Aug 2022 15:51:14 +0800 Subject: [PATCH 02/22] test --- .../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 3 ++- .../apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 103b2f0e3baa0..4b39512a8bb3a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -132,7 +132,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { final File recoveryFile; @VisibleForTesting - final DB db; + DB db; @SuppressWarnings("UnstableApiUsage") public RemoteBlockPushResolver(TransportConf conf, File recoveryFile) throws IOException { @@ -802,6 +802,7 @@ public void close() { if (db != null) { try { db.close(); + db = null; } catch (IOException e) { logger.error("Exception closing leveldb with registered app paths info and " + "shuffle partition info", e); diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 350e02cd9b66c..285a85d110d76 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -88,6 +88,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath) yarnConfig.set("spark.shuffle.push.server.mergedShuffleFileManagerImpl", "org.apache.spark.network.shuffle.RemoteBlockPushResolver") + yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name()) recoveryLocalDir = Utils.createTempDir() tempDir = 
Utils.createTempDir() @@ -162,7 +163,6 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { createMergeManager: (TransportConf, File) => MergedShuffleFileManager): YarnShuffleService = { val shuffleService = createYarnShuffleService(false) val dBBackend = shuffleDBBackend() - yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name()) val transportConf = new TransportConf("shuffle", new HadoopConfigProvider(yarnConfig)) val dbName = dBBackend.fileName(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME) val testShuffleMergeManager = createMergeManager( From 036d761cbaddd46cf5a4fe0087a115a465ae7438 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 22 Aug 2022 16:31:47 +0800 Subject: [PATCH 03/22] fix format --- .../apache/spark/network/util/RocksDBProvider.java | 11 ++++++----- .../network/shuffle/RemoteBlockPushResolver.java | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java index 7eacc9d872d2e..2a8238972ed6f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java @@ -73,9 +73,9 @@ public static RocksDB initRockDB(File dbFile, StoreVersion version, ObjectMapper throw new IOException("Unable to create state store", dbExc); } } else { - // the RocksDB file seems to be corrupt somehow. Let's just blow it away and create a new - // one, so we can keep processing new apps - logger.error("error opening rocksdb file {}. Creating new file, will not be able to " + + // the RocksDB file seems to be corrupt somehow. Let's just blow it away and create + // a new one, so we can keep processing new apps + logger.error("error opening rocksdb file {}. 
Creating new file, will not be able to " + "recover state for existing applications", dbFile, e); if (dbFile.isDirectory()) { for (File f : Objects.requireNonNull(dbFile.listFiles())) { @@ -96,7 +96,8 @@ public static RocksDB initRockDB(File dbFile, StoreVersion version, ObjectMapper } } try { - // if there is a version mismatch, we throw an exception, which means the service is unusable + // if there is a version mismatch, we throw an exception, which means the service + // is unusable checkVersion(tmpDb, version, mapper); } catch (RocksDBException e) { throw new IOException(e.getMessage(), e); @@ -130,7 +131,7 @@ static RocksDB initRocksDB(File file) throws IOException { private static class RocksDBLogger extends org.rocksdb.Logger { private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogger.class); - public RocksDBLogger(Options options) { + RocksDBLogger(Options options) { super(options); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 4b39512a8bb3a..3337a704daa3a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -802,7 +802,7 @@ public void close() { if (db != null) { try { db.close(); - db = null; + // db = null; } catch (IOException e) { logger.error("Exception closing leveldb with registered app paths info and " + "shuffle partition info", e); From 5154462d536de1676039474f2c536c3ac52c7106 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 22 Aug 2022 16:51:29 +0800 Subject: [PATCH 04/22] set db to null --- .../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 3337a704daa3a..4b39512a8bb3a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -802,7 +802,7 @@ public void close() { if (db != null) { try { db.close(); - // db = null; + db = null; } catch (IOException e) { logger.error("Exception closing leveldb with registered app paths info and " + "shuffle partition info", e); From 1aaa1ff320c01397493533033b1367c5bc89d83f Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 22 Aug 2022 19:17:46 +0800 Subject: [PATCH 05/22] add md --- docs/spark-standalone.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 8d1d05fbbbe8e..37cb290b0bfff 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -328,6 +328,17 @@ SPARK_WORKER_OPTS supports the following system properties: 3.0.0 + + spark.shuffle.service.db.enabled + LEVELDB + + When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based + store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. + This only affects standalone mode (yarn always has this behavior enabled). 
+ The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. + + 3.4.0 + spark.storage.cleanupFilesAfterExecutorExit true From 580414007c39de1cd25b8f57a862f75a181c86f2 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 23 Aug 2022 10:49:09 +0800 Subject: [PATCH 06/22] fix doc --- docs/spark-standalone.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 37cb290b0bfff..7c23889324bb8 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -329,7 +329,7 @@ SPARK_WORKER_OPTS supports the following system properties: 3.0.0 - spark.shuffle.service.db.enabled + spark.shuffle.service.db.backend LEVELDB When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based From 2191659636427ea434876b303024af36acfb9b45 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 23 Aug 2022 11:30:24 +0800 Subject: [PATCH 07/22] ignore test --- .../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 3 +-- .../apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 4b39512a8bb3a..103b2f0e3baa0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -132,7 +132,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { final File recoveryFile; @VisibleForTesting - DB db; + final DB db; @SuppressWarnings("UnstableApiUsage") public RemoteBlockPushResolver(TransportConf conf, File recoveryFile) throws IOException { @@ -802,7 +802,6 @@ public void close() { if (db != null) { try { db.close(); - db = null; } catch (IOException e) { logger.error("Exception closing leveldb with registered app paths info and " + "shuffle partition info", e); diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 285a85d110d76..53f10f22a8ab0 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -687,7 +687,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { s1.stop() } - test("Finalized merged shuffle are written into DB and cleaned up after application stopped") { + // TODO: should enable this test after SPARK-40186 is resolved. 
+ ignore("Finalized merged shuffle are written into DB and cleaned up after application stopped") { s1 = createYarnShuffleService() val app1Id = ApplicationId.newInstance(0, 1) From fed1e60b6a754a442d96e0b35c99468cdf80fe71 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 23 Aug 2022 14:48:12 +0800 Subject: [PATCH 08/22] add more test --- .../deploy/ExternalShuffleServiceDbSuite.scala | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala index bc1d43d67330c..c16e705814541 100644 --- a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.{ExternalBlockHandler, ExternalShuffleBlockResolver} import org.apache.spark.network.shuffle.TestShuffleDataContext +import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.Utils @@ -34,8 +35,8 @@ import org.apache.spark.util.Utils * with #spark.shuffle.service.db.enabled = true or false * Note that failures in this suite may arise when#spark.shuffle.service.db.enabled = false */ -@ExtendedLevelDBTest -class ExternalShuffleServiceDbSuite extends SparkFunSuite { +abstract class ExternalShuffleServiceDbSuite extends SparkFunSuite { + val sortBlock0 = "Hello!" val sortBlock1 = "World!" val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager" @@ -48,6 +49,8 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite { var blockHandler: ExternalBlockHandler = _ var blockResolver: ExternalShuffleBlockResolver = _ + protected def shuffleDBBackend(): DBBackend + override def beforeAll(): Unit = { super.beforeAll() sparkConf = new SparkConf() @@ -78,6 +81,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite { def registerExecutor(): Unit = { try { sparkConf.set("spark.shuffle.service.db.enabled", "true") + sparkConf.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name()) externalShuffleService = new ExternalShuffleService(shuffleServiceConf, securityManager) // external Shuffle Service start @@ -99,6 +103,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite { "shuffle service restart") { try { sparkConf.set("spark.shuffle.service.db.enabled", "true") + sparkConf.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name()) externalShuffleService = new ExternalShuffleService(shuffleServiceConf, securityManager) // externalShuffleService restart externalShuffleService.start() @@ -143,3 +148,12 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite { } } } + +@ExtendedLevelDBTest +class ExternalShuffleServiceLevelDBSuite extends ExternalShuffleServiceDbSuite { + override protected def shuffleDBBackend(): DBBackend = DBBackend.LEVELDB +} + +class ExternalShuffleServiceRocksDBSuite extends ExternalShuffleServiceDbSuite { + override protected def shuffleDBBackend(): DBBackend = DBBackend.ROCKSDB +} From ce561e54c539d9b6cf9ef0bef0b2fb554047313f Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 23 Aug 2022 15:43:56 +0800 Subject: [PATCH 09/22] more test --- .../spark/deploy/worker/WorkerSuite.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 632abd9f566fb..a07d4f76905a7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -40,7 +40,9 @@ import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
 import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.SHUFFLE_SERVICE_DB_BACKEND
 import org.apache.spark.internal.config.Worker._
+import org.apache.spark.network.shuffledb.DBBackend
 import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID}
@@ -339,8 +341,14 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
   }
 
   test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
-    "spark.shuffle.service.db.enabled=true") {
-    testWorkDirCleanupAndRemoveMetadataWithConfig(true)
+    "spark.shuffle.service.db.enabled=true, spark.shuffle.service.db.backend=RocksDB") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(true, DBBackend.ROCKSDB)
+  }
+
+  test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
+    "spark.shuffle.service.db.enabled=true, spark.shuffle.service.db.backend=LevelDB") {
+    assume(!Utils.isMacOnAppleSilicon)
+    testWorkDirCleanupAndRemoveMetadataWithConfig(true, DBBackend.LEVELDB)
   }
 
   test("WorkDirCleanup cleans only app dirs when" +
@@ -348,8 +356,13 @@
     testWorkDirCleanupAndRemoveMetadataWithConfig(false)
   }
 
-  private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean): Unit = {
+  private def testWorkDirCleanupAndRemoveMetadataWithConfig(
+      dbCleanupEnabled: Boolean, shuffleDBBackend: DBBackend = null): Unit = {
     val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
+    if (dbCleanupEnabled) {
+      assert(shuffleDBBackend != null)
+      conf.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend.name())
+    }
     conf.set("spark.worker.cleanup.appDataTtl", "60")
     conf.set("spark.shuffle.service.enabled", "true")

From db90b273fb941e9f32010562f91a644777605825 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Tue, 23 Aug 2022 16:17:13 +0800
Subject: [PATCH 10/22] remove empty line

---
 .../org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
index c16e705814541..921175bd41038 100644
--- a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -36,7 +36,6 @@ import org.apache.spark.util.Utils
  * Note that failures in this suite may arise when#spark.shuffle.service.db.enabled = false
  */
 abstract class ExternalShuffleServiceDbSuite extends SparkFunSuite {
-
   val sortBlock0 = "Hello!"
   val sortBlock1 = "World!"
val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager" From 15b23255f955ef746fdd292b3fba67dff6c4cd4f Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 24 Aug 2022 15:20:23 +0800 Subject: [PATCH 11/22] docs/configuration.md --- docs/configuration.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 55e595ad30158..6abf0b75f4f9b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1007,6 +1007,28 @@ Apart from these, the following properties are also available, and may be useful 3.3.0 + + spark.shuffle.service.db.enabled + true + + Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will + automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior + enabled). You should also enable spark.worker.cleanup.enabled, to ensure that the state + eventually gets cleaned up. This config may be removed in the future. + + 3.0.0 + + + spark.shuffle.service.db.backend + LEVELDB + + When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based + store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. + This only affects standalone mode (yarn always has this behavior enabled). + The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. + + 3.4.0 + spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE From e124c845eee9c534dff913baa25c3b5a65c83113 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 25 Aug 2022 10:50:35 +0800 Subject: [PATCH 12/22] doc --- docs/configuration.md | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 6abf0b75f4f9b..9b6e24c9440d2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1007,28 +1007,6 @@ Apart from these, the following properties are also available, and may be useful 3.3.0 - - spark.shuffle.service.db.enabled - true - - Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will - automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior - enabled). You should also enable spark.worker.cleanup.enabled, to ensure that the state - eventually gets cleaned up. This config may be removed in the future. - - 3.0.0 - - - spark.shuffle.service.db.backend - LEVELDB - - When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based - store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. - This only affects standalone mode (yarn always has this behavior enabled). - The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. - - 3.4.0 - spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE @@ -1124,6 +1102,27 @@ Apart from these, the following properties are also available, and may be useful 3.0.0 + + spark.shuffle.service.db.enabled + true + + To determine whether external shuffle uses db to store shuffle state. + In standalone and Yarn modes, for more detail, see + this description. 
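The two rows being added here, spark.shuffle.service.db.enabled and spark.shuffle.service.db.backend just below, work as a pair. A minimal sketch of setting them together (hedged: SparkConf is used here only for brevity; in a real deployment these keys would normally go into the worker's spark-defaults.conf):

    import org.apache.spark.SparkConf;

    class ShuffleDbConfigSketch {
      // Both keys come from the docs added in this patch series; the values are examples.
      static SparkConf withRocksDbStateStore() {
        return new SparkConf()
            .set("spark.shuffle.service.enabled", "true")
            .set("spark.shuffle.service.db.enabled", "true")     // keep state across restarts
            .set("spark.shuffle.service.db.backend", "ROCKSDB"); // default is LEVELDB
      }
    }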
+ + 3.0.0 + + + spark.shuffle.service.db.backend + LEVELDB + + To specify a disk-based store used in shuffle service local db when + spark.shuffle.service.db.enabled is true, LEVELDB or ROCKSDB. + In standalone and Yarn modes, for more detail, see + this description. + + 3.4.0 + ### Spark UI From 4d5f44ce0b924e8f52f787843ad4759a1731dc4a Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 25 Aug 2022 10:55:24 +0800 Subject: [PATCH 13/22] reorder --- docs/spark-standalone.md | 44 ++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 7c23889324bb8..f0fddc3528669 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -317,28 +317,6 @@ SPARK_WORKER_OPTS supports the following system properties: 1.0.0 - - spark.shuffle.service.db.enabled - true - - Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will - automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior - enabled). You should also enable spark.worker.cleanup.enabled, to ensure that the state - eventually gets cleaned up. This config may be removed in the future. - - 3.0.0 - - - spark.shuffle.service.db.backend - LEVELDB - - When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based - store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. - This only affects standalone mode (yarn always has this behavior enabled). - The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. - - 3.4.0 - spark.storage.cleanupFilesAfterExecutorExit true @@ -362,6 +340,28 @@ SPARK_WORKER_OPTS supports the following system properties: 2.0.2 + + spark.shuffle.service.db.enabled + true + + Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will + automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior + enabled). You should also enable spark.worker.cleanup.enabled, to ensure that the state + eventually gets cleaned up. This config may be removed in the future. + + 3.0.0 + + + spark.shuffle.service.db.backend + LEVELDB + + When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based + store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. + This only affects standalone mode (yarn always has this behavior enabled). + The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. + + 3.4.0 + # Resource Allocation and Configuration Overview From 8907d7345e7bfe5b078559e0fd609fd131c6d822 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 25 Aug 2022 10:57:04 +0800 Subject: [PATCH 14/22] Revert "reorder" This reverts commit 4d5f44ce0b924e8f52f787843ad4759a1731dc4a. 
--- docs/spark-standalone.md | 44 ++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index f0fddc3528669..7c23889324bb8 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -317,6 +317,28 @@ SPARK_WORKER_OPTS supports the following system properties: 1.0.0 + + spark.shuffle.service.db.enabled + true + + Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will + automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior + enabled). You should also enable spark.worker.cleanup.enabled, to ensure that the state + eventually gets cleaned up. This config may be removed in the future. + + 3.0.0 + + + spark.shuffle.service.db.backend + LEVELDB + + When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based + store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. + This only affects standalone mode (yarn always has this behavior enabled). + The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. + + 3.4.0 + spark.storage.cleanupFilesAfterExecutorExit true @@ -340,28 +362,6 @@ SPARK_WORKER_OPTS supports the following system properties: 2.0.2 - - spark.shuffle.service.db.enabled - true - - Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will - automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior - enabled). You should also enable spark.worker.cleanup.enabled, to ensure that the state - eventually gets cleaned up. This config may be removed in the future. - - 3.0.0 - - - spark.shuffle.service.db.backend - LEVELDB - - When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based - store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. - This only affects standalone mode (yarn always has this behavior enabled). - The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. 
- - 3.4.0 - # Resource Allocation and Configuration Overview From 097d39e9540bd4588f2ec6932b5a9148bdec106c Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 26 Aug 2022 17:37:10 +0800 Subject: [PATCH 15/22] fix comments --- .../network/shuffledb/RocksDBIterator.java | 23 ++++++++----------- docs/configuration.md | 4 ++-- docs/spark-standalone.md | 1 - 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java index c14034a641892..78562f91a4b75 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java @@ -39,7 +39,7 @@ public class RocksDBIterator implements DBIterator { private Map.Entry next; public RocksDBIterator(RocksIterator it) { - this.it = it; + this.it = it; } @Override @@ -49,11 +49,11 @@ public boolean hasNext() { checkedNext = true; } if (!closed && next == null) { - try { - close(); - } catch (IOException ioe) { - throw Throwables.propagate(ioe); - } + try { + close(); + } catch (IOException ioe) { + throw Throwables.propagate(ioe); + } } return next != null; } @@ -72,9 +72,9 @@ public Map.Entry next() { @Override public void close() throws IOException { if (!closed) { - it.close(); - closed = true; - next = null; + it.close(); + closed = true; + next = null; } } @@ -83,11 +83,6 @@ public void seek(byte[] key) { it.seek(key); } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - private Map.Entry loadNext() { if (it.isValid()) { Map.Entry nextEntry = diff --git a/docs/configuration.md b/docs/configuration.md index 9b6e24c9440d2..701d014c27eee 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1106,8 +1106,8 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.service.db.enabled true - To determine whether external shuffle uses db to store shuffle state. - In standalone and Yarn modes, for more detail, see + To determine whether external shuffle service uses db to store shuffle state. + In standalone mode, for more detail, see this description. 3.0.0 diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 7c23889324bb8..8c306dd3ae8f7 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -334,7 +334,6 @@ SPARK_WORKER_OPTS supports the following system properties: When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value. - This only affects standalone mode (yarn always has this behavior enabled). The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now. 
   3.4.0

From 0920f0e000e638f15d283e750b639d41b3501fab Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 26 Aug 2022 18:10:31 +0800
Subject: [PATCH 16/22] link yarn

---
 docs/configuration.md   |  4 +++-
 docs/running-on-yarn.md | 10 ++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 701d014c27eee..f2028b42a7ce7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1118,8 +1118,10 @@ Apart from these, the following properties are also available, and may be useful
     To specify a disk-based store used in shuffle service local db when
     spark.shuffle.service.db.enabled is true, LEVELDB or ROCKSDB.
-    In standalone and Yarn modes, for more detail, see
+    In standalone mode, for more detail, see
     this description.
+    In Yarn mode, for more detail, see
+    this description.
   3.4.0

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 48b0c7dc315c4..34a75a4ea3e97 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -840,6 +840,16 @@ The following extra configuration options are available when the shuffle service
     would be a valid Java package or class name and not include spaces.
+
+  spark.shuffle.service.db.backend
+  LEVELDB
+
+    To specify the kind of disk-based store used in the shuffle state store. `LEVELDB` and `ROCKSDB` are supported,
+    with `LEVELDB` as the default.
+    The original data store in `LevelDB/RocksDB` will not be automatically converted to another kind of storage.
+
+  3.4.0
+
 Please note that the instructions above assume that the default shuffle service name,

From c76bc20adb0697ba901cad041efb94fb3a956fb2 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 26 Aug 2022 18:12:54 +0800
Subject: [PATCH 17/22] add service

---
 docs/running-on-yarn.md  | 2 +-
 docs/spark-standalone.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 34a75a4ea3e97..1388868c126ca 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -844,7 +844,7 @@ The following extra configuration options are available when the shuffle service
   spark.shuffle.service.db.backend
   LEVELDB
-    To specify the kind of disk-based store used in the shuffle state store. `LEVELDB` and `ROCKSDB` are supported,
+    To specify the kind of disk-based store used in the shuffle service state store. `LEVELDB` and `ROCKSDB` are supported,
     with `LEVELDB` as the default.
     The original data store in `LevelDB/RocksDB` will not be automatically converted to another kind of storage.

diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 8c306dd3ae8f7..559e3bca6c934 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -333,7 +333,7 @@ SPARK_WORKER_OPTS supports the following system properties:
   LEVELDB
     When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based
-    store used in shuffle state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value.
+    store used in the shuffle service state store. This supports `LEVELDB` and `ROCKSDB`, with `LEVELDB` as the default.
     The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now.
   3.4.0

From 7e73ad52a4dd85d6716849b033c71be51f67cd7a Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 26 Aug 2022 23:44:36 +0800
Subject: [PATCH 18/22] fix setCreateIfMissing

---
 .../java/org/apache/spark/network/util/RocksDBProvider.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
index 2a8238972ed6f..f1f702c44245a 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
@@ -117,7 +117,7 @@ static RocksDB initRocksDB(File file) throws IOException {
       .setFormatVersion(5);
 
     Options dbOptions = new Options();
-    dbOptions.setCreateIfMissing(false);
+    dbOptions.setCreateIfMissing(true);
     dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
     dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
     dbOptions.setTableFormatConfig(tableFormatConfig);

From 8acf4426bba8fbdda250c084bd89968ca8d6b1dd Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 29 Aug 2022 10:45:17 +0800
Subject: [PATCH 19/22] to one

---
 .../spark/network/util/TransportConf.java     |  9 ++++
 .../shuffle/RemoteBlockPushResolver.java      | 43 +++++++++++++++++--
 .../network/shuffle/ShuffleTestAccessor.scala |  4 ++
 .../yarn/YarnShuffleServiceSuite.scala        | 27 ++++++++++++
 4 files changed, 80 insertions(+), 3 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 57bd494db4feb..f2848c2d4c9a3 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -386,4 +386,13 @@ public long mergedIndexCacheSize() {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
+
+  /**
+   * The RemoteBlockPushResolver#mergedShuffleCleaner
+   * shutdown timeout, in seconds.
+ */ + public long mergedShuffleCleanerShutdownTimeout() { + return JavaUtils.timeStringAsSec( + conf.get("spark.shuffle.push.server.mergedShuffleCleaner.shutdown.timeout", "60s")); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 103b2f0e3baa0..4d2b3381ff298 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -36,8 +36,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -120,8 +121,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @VisibleForTesting final ConcurrentMap appsShuffleInfo; - private final Executor mergedShuffleCleaner; + private final ExecutorService mergedShuffleCleaner; + private final TransportConf conf; + + private final long cleanerShutdownTimeout; + private final int minChunkSize; private final int ioExceptionsThresholdDuringMerge; @@ -141,6 +146,7 @@ public RemoteBlockPushResolver(TransportConf conf, File recoveryFile) throws IOE this.mergedShuffleCleaner = Executors.newSingleThreadExecutor( // Add `spark` prefix because it will run in NM in Yarn mode. NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner")); + this.cleanerShutdownTimeout = conf.mergedShuffleCleanerShutdownTimeout(); this.minChunkSize = conf.minChunkSizeInMergedShuffleFile(); this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge(); CacheLoader indexCacheLoader = @@ -795,10 +801,33 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { } /** - * Close the DB during shutdown + * Shutdown mergedShuffleCleaner and close the DB during shutdown */ @Override public void close() { + if (!mergedShuffleCleaner.isShutdown()) { + // SPARK-40186:Use two phases shutdown refer to + // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html + // Use two phases shutdown can prevent new tasks and wait for executing tasks to + // complete gracefully, and once timeout is reached, we want to interrupt running tasks, + // so that they fail. This is to prevent updates to shuffle state db after it is closed. 
+ try { + mergedShuffleCleaner.shutdown(); + // Wait a while for existing tasks to terminate + if (!mergedShuffleCleaner.awaitTermination(cleanerShutdownTimeout, TimeUnit.SECONDS)) { + List unfinishedTasks = mergedShuffleCleaner.shutdownNow(); + logger.warn("There are still {} tasks not completed in mergedShuffleCleaner " + + "after {} seconds.", unfinishedTasks.size(), cleanerShutdownTimeout); + // Wait a while for tasks to respond to being cancelled + if (!mergedShuffleCleaner.awaitTermination(cleanerShutdownTimeout, TimeUnit.SECONDS)) { + logger.warn("mergedShuffleCleaner did not terminate"); + } + } + } catch (InterruptedException ignored) { + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } if (db != null) { try { db.close(); @@ -1030,6 +1059,14 @@ void submitCleanupTask(Runnable task) { mergedShuffleCleaner.execute(task); } + /** + * Check `mergedShuffleCleaner` is already shutdown. + */ + @VisibleForTesting + boolean isCleanerShutdown() { + return mergedShuffleCleaner.isShutdown(); + } + /** * Callback for push stream that handles blocks which are not already merged. */ diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala index a52fcae4f4c16..0bc6f0bb82784 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala @@ -76,6 +76,10 @@ object ShuffleTestAccessor { mergeManager.db } + def isMergedShuffleCleanerShutdown(mergeManager: RemoteBlockPushResolver): Boolean = { + mergeManager.isCleanerShutdown + } + def createMergeManagerWithSynchronizedCleanup( transportConf: TransportConf, file: File): MergedShuffleFileManager = { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 53f10f22a8ab0..14550bb1ef4af 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.attribute.PosixFilePermission._ import java.util.EnumSet +import java.util.concurrent.RejectedExecutionException import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -771,6 +772,32 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { s1.stop() } + test("SPARK-40186: shuffleMergeManager should have been shutdown before db closed") { + val maxId = 100 + s1 = createYarnShuffleService() + val resolver = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver] + (0 until maxId).foreach { id => + val appId = ApplicationId.newInstance(0, id) + val appInfo = makeAppInfo("user", appId) + s1.initializeApplication(appInfo) + val mergedShuffleInfo = + new ExecutorShuffleInfo( + Array(new File(tempDir, "foo/foo").getAbsolutePath, + new File(tempDir, "bar/bar").getAbsolutePath), 3, + SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1) + resolver.registerExecutor(appId.toString, mergedShuffleInfo) + } + + (0 until maxId).foreach { id => + val appId = ApplicationId.newInstance(0, id) + resolver.applicationRemoved(appId.toString, true) + } + 
+    s1.stop()
+
+    assert(ShuffleTestAccessor.isMergedShuffleCleanerShutdown(resolver))
+  }
+
   test("Dangling finalized merged partition info in DB will be removed during restart") {
     s1 = createYarnShuffleServiceWithCustomMergeManager(
       ShuffleTestAccessor.createMergeManagerWithNoOpAppShuffleDBCleanup)

From e06853097d307c74f784a58563dcb3f44c157e64 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 29 Aug 2022 18:04:53 +0800
Subject: [PATCH 20/22] Revert "to one"

This reverts commit 8acf4426bba8fbdda250c084bd89968ca8d6b1dd.

---
 .../spark/network/util/TransportConf.java | 9 ----
 .../shuffle/RemoteBlockPushResolver.java | 43 ++-----------------
 .../network/shuffle/ShuffleTestAccessor.scala | 4 --
 .../yarn/YarnShuffleServiceSuite.scala | 27 ------------
 4 files changed, 3 insertions(+), 80 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index f2848c2d4c9a3..57bd494db4feb 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -386,13 +386,4 @@ public long mergedIndexCacheSize() {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
-
-  /**
-   * The RemoteBlockPushResolver#mergedShuffleCleaner
-   * shutdown timeout, in seconds.
-   */
-  public long mergedShuffleCleanerShutdownTimeout() {
-    return JavaUtils.timeStringAsSec(
-      conf.get("spark.shuffle.push.server.mergedShuffleCleaner.shutdown.timeout", "60s"));
-  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 4d2b3381ff298..103b2f0e3baa0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -36,9 +36,8 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -121,12 +120,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @VisibleForTesting
   final ConcurrentMap<String, AppShuffleInfo> appsShuffleInfo;
-  private final ExecutorService mergedShuffleCleaner;
-
+  private final Executor mergedShuffleCleaner;
   private final TransportConf conf;
-
-  private final long cleanerShutdownTimeout;
-
   private final int minChunkSize;
   private final int ioExceptionsThresholdDuringMerge;
@@ -146,7 +141,6 @@ public RemoteBlockPushResolver(TransportConf conf, File recoveryFile) throws IOE
     this.mergedShuffleCleaner = Executors.newSingleThreadExecutor(
       // Add `spark` prefix because it will run in NM in Yarn mode.
      NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
-    this.cleanerShutdownTimeout = conf.mergedShuffleCleanerShutdownTimeout();
     this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
     this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
     CacheLoader indexCacheLoader =
@@ -801,33 +795,10 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
   }
 
   /**
-   * Shutdown mergedShuffleCleaner and close the DB during shutdown
+   * Close the DB during shutdown
    */
   @Override
   public void close() {
-    if (!mergedShuffleCleaner.isShutdown()) {
-      // SPARK-40186: Use two phases shutdown refer to
-      // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
-      // Use two phases shutdown can prevent new tasks and wait for executing tasks to
-      // complete gracefully, and once timeout is reached, we want to interrupt running tasks,
-      // so that they fail. This is to prevent updates to shuffle state db after it is closed.
-      try {
-        mergedShuffleCleaner.shutdown();
-        // Wait a while for existing tasks to terminate
-        if (!mergedShuffleCleaner.awaitTermination(cleanerShutdownTimeout, TimeUnit.SECONDS)) {
-          List<Runnable> unfinishedTasks = mergedShuffleCleaner.shutdownNow();
-          logger.warn("There are still {} tasks not completed in mergedShuffleCleaner " +
-            "after {} seconds.", unfinishedTasks.size(), cleanerShutdownTimeout);
-          // Wait a while for tasks to respond to being cancelled
-          if (!mergedShuffleCleaner.awaitTermination(cleanerShutdownTimeout, TimeUnit.SECONDS)) {
-            logger.warn("mergedShuffleCleaner did not terminate");
-          }
-        }
-      } catch (InterruptedException ignored) {
-        // Preserve interrupt status
-        Thread.currentThread().interrupt();
-      }
-    }
     if (db != null) {
       try {
         db.close();
@@ -1059,14 +1030,6 @@ void submitCleanupTask(Runnable task) {
     mergedShuffleCleaner.execute(task);
   }
 
-  /**
-   * Check whether `mergedShuffleCleaner` has already been shut down.
-   */
-  @VisibleForTesting
-  boolean isCleanerShutdown() {
-    return mergedShuffleCleaner.isShutdown();
-  }
-
   /**
    * Callback for push stream that handles blocks which are not already merged.
   */
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
index 0bc6f0bb82784..a52fcae4f4c16 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
@@ -76,10 +76,6 @@ object ShuffleTestAccessor {
     mergeManager.db
   }
 
-  def isMergedShuffleCleanerShutdown(mergeManager: RemoteBlockPushResolver): Boolean = {
-    mergeManager.isCleanerShutdown
-  }
-
   def createMergeManagerWithSynchronizedCleanup(
       transportConf: TransportConf,
       file: File): MergedShuffleFileManager = {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 14550bb1ef4af..53f10f22a8ab0 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermission._
 import java.util.EnumSet
-import java.util.concurrent.RejectedExecutionException
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -772,32 +771,6 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     s1.stop()
   }
 
-  test("SPARK-40186: shuffleMergeManager should have been shutdown before db closed") {
-    val maxId = 100
-    s1 = createYarnShuffleService()
-    val resolver = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
-    (0 until maxId).foreach { id =>
-      val appId = ApplicationId.newInstance(0, id)
-      val appInfo = makeAppInfo("user", appId)
-      s1.initializeApplication(appInfo)
-      val mergedShuffleInfo =
-        new ExecutorShuffleInfo(
-          Array(new File(tempDir, "foo/foo").getAbsolutePath,
-            new File(tempDir, "bar/bar").getAbsolutePath), 3,
-          SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
-      resolver.registerExecutor(appId.toString, mergedShuffleInfo)
-    }
-
-    (0 until maxId).foreach { id =>
-      val appId = ApplicationId.newInstance(0, id)
-      resolver.applicationRemoved(appId.toString, true)
-    }
-
-    s1.stop()
-
-    assert(ShuffleTestAccessor.isMergedShuffleCleanerShutdown(resolver))
-  }
-
   test("Dangling finalized merged partition info in DB will be removed during restart") {
     s1 = createYarnShuffleServiceWithCustomMergeManager(
       ShuffleTestAccessor.createMergeManagerWithNoOpAppShuffleDBCleanup)

From cb5148d6f267821f6f4c2d9f93773a83191409ac Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 31 Aug 2022 16:02:29 +0800
Subject: [PATCH 21/22] revert configuration.md

---
 docs/configuration.md | 23 -----------------------
 1 file changed, 23 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index f2028b42a7ce7..55e595ad30158 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1102,29 +1102,6 @@ Apart from these, the following properties are also available, and may be useful
   <td>3.0.0</td>
 </tr>
-<tr>
-  <td><code>spark.shuffle.service.db.enabled</code></td>
-  <td>true</td>
-  <td>
-    To determine whether external shuffle service uses db to store shuffle state.
-    In standalone mode, for more detail, see
-    this description.
-  </td>
-  <td>3.0.0</td>
-</tr>
-<tr>
-  <td><code>spark.shuffle.service.db.backend</code></td>
-  <td>LEVELDB</td>
-  <td>
-    To specify a disk-based store used in shuffle service local db when
-    <code>spark.shuffle.service.db.enabled</code> is true, LEVELDB or ROCKSDB.
-    In standalone mode, for more detail, see
-    this description.
-    In Yarn mode, for more detail, see
-    this description.
-  </td>
-  <td>3.4.0</td>
-</tr>
 
 ### Spark UI

From 52c1c7e11b98784ab695afe64d5bb5e519a7fcd5 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Thu, 8 Sep 2022 10:59:44 +0800
Subject: [PATCH 22/22] re-enable test in YarnShuffleServiceSuite

---
 .../apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 1884f3fcf660d..16fa42056921e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -687,8 +687,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     s1.stop()
   }
 
-  // TODO: should enable this test after SPARK-40186 is resolved.
-  ignore("Finalized merged shuffle are written into DB and cleaned up after application stopped") {
+  test("Finalized merged shuffle are written into DB and cleaned up after application stopped") {
     s1 = createYarnShuffleService()
     val app1Id = ApplicationId.newInstance(0, 1)
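
A note on the close() ordering exercised by the SPARK-40186 test above: the change follows the standard
two-phase shutdown recipe from the java.util.concurrent.ExecutorService javadoc, linked in the code comment.
Below is a minimal, self-contained sketch of that recipe for reviewers; the class name, the fixed 60-second
timeout, and the log strings are illustrative only and are not part of this patch series.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TwoPhaseShutdownSketch {
  public static void main(String[] args) {
    ExecutorService cleaner = Executors.newSingleThreadExecutor();
    cleaner.execute(() -> System.out.println("cleanup task running"));
    long timeoutSecs = 60L; // illustrative; the patch reads this from TransportConf

    // Phase 1: reject new tasks, but let queued and running tasks finish.
    cleaner.shutdown();
    try {
      if (!cleaner.awaitTermination(timeoutSecs, TimeUnit.SECONDS)) {
        // Phase 2: interrupt running tasks; shutdownNow() returns the queued
        // tasks that never started.
        List<Runnable> unfinished = cleaner.shutdownNow();
        System.err.println(unfinished.size() + " tasks never started");
        if (!cleaner.awaitTermination(timeoutSecs, TimeUnit.SECONDS)) {
          System.err.println("cleaner did not terminate");
        }
      }
    } catch (InterruptedException e) {
      // Re-cancel if the current thread was interrupted, then preserve the
      // interrupt status, mirroring the javadoc recipe.
      cleaner.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}

Once shutdown() has been called, any further execute() submission fails with
java.util.concurrent.RejectedExecutionException (the import the test suite briefly carried above); closing the
DB only after both phases complete is what prevents cleaner tasks from updating the shuffle state db after it
has been closed.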
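
The reverted configuration.md rows above document spark.shuffle.service.db.backend taking LEVELDB (the
default) or ROCKSDB. As a rough sketch of selecting between those two documented values: the Backend enum and
chooseBackend helper below are hypothetical stand-ins, not the shuffle service's actual classes, and the
case-insensitive fallback behavior is an assumption.

import java.util.Locale;

public class DbBackendSelectionSketch {
  // Hypothetical mirror of the two values the docs rows list.
  enum Backend { LEVELDB, ROCKSDB }

  // Hypothetical helper: fall back to the documented default when unset.
  static Backend chooseBackend(String configured) {
    String value = (configured == null || configured.isEmpty()) ? "LEVELDB" : configured;
    return Backend.valueOf(value.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(chooseBackend(null));      // LEVELDB
    System.out.println(chooseBackend("rocksdb")); // ROCKSDB
  }
}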