diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java
new file mode 100644
index 0000000000000..3b2060cb74a47
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffledb;
+
+import java.io.Closeable;
+
+/**
+ * The local KV storage used to persist the shuffle state;
+ * implementations may include LevelDB, RocksDB, etc.
+ */
+public interface DB extends Closeable {
+  /**
+   * Set the DB entry for "key" to "value".
+   */
+  void put(byte[] key, byte[] value);
+
+  /**
+   * Return a new byte array storing the value associated with
+   * the specified input key, if any.
+   */
+  byte[] get(byte[] key);
+
+  /**
+   * Delete the DB entry (if any) for "key".
+   */
+  void delete(byte[] key);
+
+  /**
+   * Return an iterator over the contents of the DB.
+   */
+  DBIterator iterator();
+}
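Taken together, the four methods above mirror the small subset of the LevelDB API that the shuffle service actually uses. A minimal sketch of a caller exercising the interface (the key/value strings are illustrative only; obtaining the handle via `DBProvider.initDB` appears later in this patch):

```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.shuffledb.DB;

public class DbUsageSketch {
  // Exercises put/get/delete on an already-opened handle.
  public static void demo(DB db) throws Exception {
    byte[] key = "app_20221001_0001".getBytes(StandardCharsets.UTF_8);
    byte[] value = "registered".getBytes(StandardCharsets.UTF_8);

    db.put(key, value);            // persist one shuffle-state entry
    byte[] loaded = db.get(key);   // returns null when the key is absent
    assert "registered".equals(new String(loaded, StandardCharsets.UTF_8));

    db.delete(key);                // remove the entry again
    db.close();                    // DB extends Closeable
  }
}
```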
diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java
new file mode 100644
index 0000000000000..f8db8a16f16df
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffledb;
+
+import java.util.Locale;
+
+/**
+ * The enum `DBBackend` is used to specify the disk-based store used by the shuffle service
+ * local DB. Only LEVELDB is supported now.
+ */
+public enum DBBackend {
+  LEVELDB(".ldb");
+
+  private final String fileSuffix;
+
+  DBBackend(String fileSuffix) {
+    this.fileSuffix = fileSuffix;
+  }
+
+  public String fileName(String prefix) {
+    return prefix + fileSuffix;
+  }
+
+  public static DBBackend byName(String value) {
+    return DBBackend.valueOf(value.toUpperCase(Locale.ROOT));
+  }
+}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBIterator.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBIterator.java
new file mode 100644
index 0000000000000..4607faa070a9d
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBIterator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffledb;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+
+public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {
+
+  /**
+   * Position at the first entry in the source whose key is at or past target.
+   */
+  void seek(byte[] key);
+
+  default void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+}
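A quick illustration of the two helpers on the enum, assuming nothing beyond what is defined above; the `registeredExecutors` prefix anticipates the recovery-file rename made later in this patch:

```java
import org.apache.spark.network.shuffledb.DBBackend;

public class DBBackendSketch {
  public static void main(String[] args) {
    // byName() upper-cases its argument, so lookup is case-insensitive.
    DBBackend backend = DBBackend.byName("leveldb");      // DBBackend.LEVELDB

    // fileName() appends the backend-specific suffix to a caller-supplied prefix.
    System.out.println(backend.fileName("registeredExecutors")); // registeredExecutors.ldb
  }
}
```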
diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDB.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDB.java
new file mode 100644
index 0000000000000..62e6450a9c7c9
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDB.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffledb;
+
+import java.io.IOException;
+
+public class LevelDB implements DB {
+  private final org.iq80.leveldb.DB db;
+
+  public LevelDB(org.iq80.leveldb.DB db) {
+    this.db = db;
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    db.put(key, value);
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return db.get(key);
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    db.delete(key);
+  }
+
+  @Override
+  public void close() throws IOException {
+    db.close();
+  }
+
+  @Override
+  public DBIterator iterator() {
+    return new LevelDBIterator(db.iterator());
+  }
+}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java
new file mode 100644
index 0000000000000..7f80928e461fc
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffledb;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Throwables;
+
+public class LevelDBIterator implements DBIterator {
+
+  private final org.iq80.leveldb.DBIterator it;
+
+  private boolean checkedNext;
+
+  private boolean closed;
+
+  private Map.Entry<byte[], byte[]> next;
+
+  public LevelDBIterator(org.iq80.leveldb.DBIterator it) {
+    this.it = it;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public Map.Entry<byte[], byte[]> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+    Map.Entry<byte[], byte[]> ret = next;
+    next = null;
+    return ret;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+      next = null;
+    }
+  }
+
+  @Override
+  public void seek(byte[] key) {
+    it.seek(key);
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  private Map.Entry<byte[], byte[]> loadNext() {
+    boolean hasNext = it.hasNext();
+    if (!hasNext) {
+      return null;
+    }
+    return it.next();
+  }
+}
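One subtlety worth noting in the class above: `hasNext()` closes the underlying iterator as soon as it is exhausted, so a full scan releases its native resources even if the caller forgets to call `close()`. A sketch of a prefix scan relying on that contract (the key prefix is hypothetical, and try-with-resources still covers the early-exit path):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBIterator;

public class IteratorSketch {
  public static void scanPrefix(DB db, String prefix) throws Exception {
    byte[] start = prefix.getBytes(StandardCharsets.UTF_8);
    try (DBIterator it = db.iterator()) {
      it.seek(start);                          // jump to the first key at or past the prefix
      while (it.hasNext()) {                   // auto-closes once the source is exhausted
        Map.Entry<byte[], byte[]> entry = it.next();
        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
        if (!key.startsWith(prefix)) {
          break;                               // left the prefix range; stop early
        }
        // ... process entry.getValue() ...
      }
    }
  }
}
```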
diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/StoreVersion.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/StoreVersion.java
new file mode 100644
index 0000000000000..c138163d21e18
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/StoreVersion.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffledb;
+
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Used to identify the version of data stored in local shuffle state DB.
+ */
+public class StoreVersion {
+
+  public static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
+
+  public final int major;
+  public final int minor;
+
+  @JsonCreator
+  public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
+    this.major = major;
+    this.minor = minor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    StoreVersion that = (StoreVersion) o;
+
+    return major == that.major && minor == that.minor;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = major;
+    result = 31 * result + minor;
+    return result;
+  }
+}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
new file mode 100644
index 0000000000000..9563fa9ec354f
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.network.shuffledb.DB;
+import org.apache.spark.network.shuffledb.DBBackend;
+import org.apache.spark.network.shuffledb.LevelDB;
+import org.apache.spark.network.shuffledb.StoreVersion;
+
+public class DBProvider {
+  public static DB initDB(
+      DBBackend dbBackend,
+      File dbFile,
+      StoreVersion version,
+      ObjectMapper mapper) throws IOException {
+    if (dbFile != null) {
+      // TODO: SPARK-38888, add rocksdb implementation.
+      switch (dbBackend) {
+        case LEVELDB:
+          org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper);
+          return levelDB != null ? new LevelDB(levelDB) : null;
+        default:
+          throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public static DB initDB(DBBackend dbBackend, File file) throws IOException {
+    if (file != null) {
+      // TODO: SPARK-38888, add rocksdb implementation.
+      switch (dbBackend) {
+        case LEVELDB: return new LevelDB(LevelDBProvider.initLevelDB(file));
+        default:
+          throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
+      }
+    }
+    return null;
+  }
+}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
index f96d068cf3d59..b27e3beb77ef9 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
@@ -19,11 +19,9 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
@@ -31,6 +29,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.network.shuffledb.StoreVersion;
+
 /**
  * LevelDB utility class available in the network package.
  */
@@ -85,6 +85,14 @@ public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper map
     return tmpDb;
   }
 
+  @VisibleForTesting
+  static DB initLevelDB(File file) throws IOException {
+    Options options = new Options();
+    options.createIfMissing(true);
+    JniDBFactory factory = new JniDBFactory();
+    return factory.open(file, options);
+  }
+
   private static class LevelDBLogger implements org.iq80.leveldb.Logger {
     private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
 
@@ -118,35 +126,4 @@ public static void storeVersion(DB db, StoreVersion version, ObjectMapper mapper
       throws IOException {
     db.put(StoreVersion.KEY, mapper.writeValueAsBytes(version));
   }
-
-  public static class StoreVersion {
-
-    static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
-
-    public final int major;
-    public final int minor;
-
-    @JsonCreator
-    public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
-      this.major = major;
-      this.minor = minor;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      StoreVersion that = (StoreVersion) o;
-
-      return major == that.major && minor == that.minor;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = major;
-      result = 31 * result + minor;
-      return result;
-    }
-  }
 }
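The division of labor above is: `DBProvider` picks the backend, while `LevelDBProvider` keeps all leveldbjni-specific setup, including the version check it performed before this patch. A hedged sketch of the production entry point; the file path is illustrative only:

```java
import java.io.File;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.StoreVersion;
import org.apache.spark.network.util.DBProvider;

public class DBProviderSketch {
  public static DB open() throws Exception {
    // Callers in this patch pass recovery files resolved by the shuffle service,
    // e.g. "registeredExecutors.ldb"; this path is purely illustrative.
    File dbFile = new File("/tmp/spark/registeredExecutors.ldb");

    // Returns null when dbFile is null; otherwise dispatches on the backend enum.
    return DBProvider.initDB(
      DBBackend.LEVELDB, dbFile, new StoreVersion(1, 0), new ObjectMapper());
  }
}
```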
+ */ + public static final String SHUFFLE_SERVICE_DB_BACKEND = + "spark.shuffle.service.db.backend"; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 3d18d20518410..ff49eb6244c8a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -38,8 +38,6 @@ import com.google.common.cache.LoadingCache; import com.google.common.cache.Weigher; import com.google.common.collect.Maps; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.DBIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +46,11 @@ import org.apache.spark.network.shuffle.checksum.Cause; import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; -import org.apache.spark.network.util.LevelDBProvider; -import org.apache.spark.network.util.LevelDBProvider.StoreVersion; +import org.apache.spark.network.shuffledb.DB; +import org.apache.spark.network.shuffledb.DBBackend; +import org.apache.spark.network.shuffledb.DBIterator; +import org.apache.spark.network.shuffledb.StoreVersion; +import org.apache.spark.network.util.DBProvider; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.NettyUtils; import org.apache.spark.network.util.TransportConf; @@ -124,7 +125,15 @@ public ShuffleIndexInformation load(String filePath) throws IOException { .weigher((Weigher) (filePath, indexInfo) -> indexInfo.getRetainedMemorySize()) .build(indexCacheLoader); - db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper); + DBBackend dbBackend = null; + if (registeredExecutorFile != null) { + String dbBackendName = + conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name()); + dbBackend = DBBackend.byName(dbBackendName); + logger.info("Configured {} as {} and actually used value {}", + Constants.SHUFFLE_SERVICE_DB_BACKEND, dbBackendName, dbBackend); + } + db = DBProvider.initDB(dbBackend, this.registeredExecutorFile, CURRENT_VERSION, mapper); if (db != null) { executors = reloadRegisteredExecutors(db); } else { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 9483e48ca446c..103b2f0e3baa0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -56,8 +56,6 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.DBIterator; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,8 +70,12 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffledb.DB; +import org.apache.spark.network.shuffledb.DBBackend; +import org.apache.spark.network.shuffledb.DBIterator; +import 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 9483e48ca446c..103b2f0e3baa0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -56,8 +56,6 @@
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.DBIterator;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,8 +70,12 @@
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.shuffledb.DB;
+import org.apache.spark.network.shuffledb.DBBackend;
+import org.apache.spark.network.shuffledb.DBIterator;
+import org.apache.spark.network.shuffledb.StoreVersion;
+import org.apache.spark.network.util.DBProvider;
 import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.LevelDBProvider;
 import org.apache.spark.network.util.NettyUtils;
 import org.apache.spark.network.util.TransportConf;
 
@@ -107,8 +109,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   private static final String APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX =
     "AppAttemptShuffleFinalized";
   private static final String APP_ATTEMPT_PATH_KEY_PREFIX = "AppAttemptPathInfo";
-  private static final LevelDBProvider.StoreVersion
-    CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+  private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
 
   /**
    * A concurrent hashmap where the key is the applicationId, and the value includes
@@ -155,7 +156,15 @@ public ShuffleIndexInformation load(String filePath) throws IOException {
         (filePath, indexInfo) -> indexInfo.getRetainedMemorySize())
         .build(indexCacheLoader);
     this.recoveryFile = recoveryFile;
-    db = LevelDBProvider.initLevelDB(this.recoveryFile, CURRENT_VERSION, mapper);
+    DBBackend dbBackend = null;
+    if (recoveryFile != null) {
+      String dbBackendName =
+        conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
+      dbBackend = DBBackend.byName(dbBackendName);
+      logger.info("Configured {} as {} and actually used value {}",
+        Constants.SHUFFLE_SERVICE_DB_BACKEND, dbBackendName, dbBackend);
+    }
+    db = DBProvider.initDB(dbBackend, this.recoveryFile, CURRENT_VERSION, mapper);
     if (db != null) {
       reloadAndCleanUpAppShuffleInfo(db);
     }
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index af3f9b112fb98..1e2ff89480895 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -42,11 +42,14 @@
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.*;
+import org.apache.spark.network.shuffle.Constants;
 import org.apache.spark.network.shuffle.MergedShuffleFileManager;
 import org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager;
-import org.apache.spark.network.util.LevelDBProvider;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.DBIterator;
+import org.apache.spark.network.shuffledb.DB;
+import org.apache.spark.network.shuffledb.DBBackend;
+import org.apache.spark.network.shuffledb.DBIterator;
+import org.apache.spark.network.shuffledb.StoreVersion;
+import org.apache.spark.network.util.DBProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,10 +121,10 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
   private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
 
-  private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
-  private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
+  private static final String RECOVERY_FILE_NAME = "registeredExecutors";
+  private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery";
   @VisibleForTesting
-  static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME = "sparkShuffleMergeRecovery.ldb";
+  static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME = "sparkShuffleMergeRecovery";
 
   // Whether failure during service initialization should stop the NM.
   @VisibleForTesting
@@ -133,8 +136,7 @@ public class YarnShuffleService extends AuxiliaryService {
   static int boundPort = -1;
   private static final ObjectMapper mapper = new ObjectMapper();
   private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
-  private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
-    .StoreVersion(1, 0);
+  private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
 
   /**
    * The name of the resource to search for on the classpath to find a shuffle service-specific
@@ -187,6 +189,8 @@ public class YarnShuffleService extends AuxiliaryService {
 
   private DB db;
 
+  private DBBackend dbBackend = null;
+
   public YarnShuffleService() {
     // The name of the auxiliary service configured within the NodeManager
     // (`yarn.nodemanager.aux-services`) is treated as the source-of-truth, so this one can be
@@ -233,6 +237,14 @@ protected void serviceInit(Configuration externalConf) throws Exception {
 
     boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
 
+    if (_recoveryPath != null) {
+      String dbBackendName = _conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND,
+        DBBackend.LEVELDB.name());
+      dbBackend = DBBackend.byName(dbBackendName);
+      logger.info("Configured {} as {} and actually used value {}",
+        Constants.SHUFFLE_SERVICE_DB_BACKEND, dbBackendName, dbBackend);
+    }
+
     try {
       // In case this NM was killed while there were running spark applications, we need to restore
       // lost state for the existing executors. We look for an existing file in the NM's local dirs.
@@ -240,8 +252,9 @@ protected void serviceInit(Configuration externalConf) throws Exception {
       // an application was stopped while the NM was down, we expect yarn to call stopApplication()
       // when it comes back
       if (_recoveryPath != null) {
-        registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
-        mergeManagerFile = initRecoveryDb(SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME);
+        registeredExecutorFile = initRecoveryDb(dbBackend.fileName(RECOVERY_FILE_NAME));
+        mergeManagerFile =
+          initRecoveryDb(dbBackend.fileName(SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME));
       }
 
       TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf));
@@ -331,13 +344,13 @@ static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
   }
 
   private void loadSecretsFromDb() throws IOException {
-    secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
+    secretsFile = initRecoveryDb(dbBackend.fileName(SECRETS_RECOVERY_FILE_NAME));
 
     // Make sure this is protected in case its not in the NM recovery dir
     FileSystem fs = FileSystem.getLocal(_conf);
     fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
 
-    db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
+    db = DBProvider.initDB(dbBackend, secretsFile, CURRENT_VERSION, mapper);
     logger.info("Recovery location is: " + secretsFile.getPath());
     if (db != null) {
       logger.info("Going to reload spark shuffle data");
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index eff1e15659fc4..a139f4c1708eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -30,6 +30,7 @@ import org.apache.spark.network.crypto.AuthServerBootstrap
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.ExternalBlockHandler
+import org.apache.spark.network.shuffledb.DBBackend
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
@@ -49,7 +50,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
-  private val registeredExecutorsDB = "registeredExecutors.ldb"
+  private val registeredExecutorsDB = "registeredExecutors"
 
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -79,7 +80,12 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   /** Create a new shuffle block handler. Factored out for subclasses to override. */
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalBlockHandler = {
     if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
-      new ExternalBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
+      val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
+      val dBBackend = DBBackend.byName(shuffleDBName)
+      logInfo(s"Configured ${config.SHUFFLE_SERVICE_DB_BACKEND.key} as $shuffleDBName " +
+        s"and actually used value ${dBBackend.name()}")
+      new ExternalBlockHandler(conf,
+        findRegisteredExecutorsDBFile(dBBackend.fileName(registeredExecutorsDB)))
     } else {
       new ExternalBlockHandler(conf, null)
     }
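End to end, a user opts into a backend on the SparkConf (or simply inherits the LEVELDB default); the ConfigBuilder definition that validates the value follows in the next file. A minimal sketch, assuming a standalone external shuffle service setup:

```java
import org.apache.spark.SparkConf;

public class ConfigSketch {
  public static SparkConf demo() {
    // "LEVELDB" is currently the only value accepted by the checkValues()
    // constraint declared on spark.shuffle.service.db.backend below.
    return new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.db.enabled", "true")
      .set("spark.shuffle.service.db.backend", "LEVELDB");
  }
}
```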
" + + "Only LEVELDB is supported now.") + .version("3.4.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(DBBackend.values.map(_.toString).toSet) + .createWithDefault(DBBackend.LEVELDB.name) + private[spark] val SHUFFLE_SERVICE_PORT = ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala index db001a946fddf..f0cb008321ae4 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark._ import org.apache.spark.internal.config._ +import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} import org.apache.spark.tags.ExtendedYarnTest @@ -30,8 +31,7 @@ import org.apache.spark.tags.ExtendedYarnTest * SPARK-34828: Integration test for the external shuffle service with an alternate name and * configs (by using a configuration overlay) */ -@ExtendedYarnTest -class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite { +abstract class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite { private[this] val shuffleServiceName = "custom_shuffle_service_name" @@ -77,3 +77,8 @@ class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite { } } } +@ExtendedYarnTest +class YarnShuffleAlternateNameConfigWithLevelDBBackendSuite + extends YarnShuffleAlternateNameConfigSuite { + override protected def dbBackend: DBBackend = DBBackend.LEVELDB +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index 547bfca2891f1..fb9d3cc520cfa 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -32,14 +32,16 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network._ import org.apache.spark.network.shuffle.ShuffleTestAccessor +import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} import org.apache.spark.tags.ExtendedYarnTest /** * Integration test for the external shuffle service with a yarn mini-cluster */ -@ExtendedYarnTest -class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { +abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { + + protected def dbBackend: DBBackend override def newYarnConfig(): YarnConfiguration = { val yarnConfig = new YarnConfiguration() @@ -47,6 +49,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), classOf[YarnShuffleService].getCanonicalName) yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0") + yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, dbBackend.name()) yarnConfig } @@ -58,7 +61,8 @@ class 
     Map(
       SHUFFLE_SERVICE_ENABLED.key -> "true",
       SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString,
-      MAX_EXECUTOR_FAILURES.key -> "1"
+      MAX_EXECUTOR_FAILURES.key -> "1",
+      SHUFFLE_SERVICE_DB_BACKEND.key -> dbBackend.name()
     )
   }
 
@@ -87,12 +91,16 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
   }
 }
 
+@ExtendedYarnTest
+class YarnShuffleIntegrationWithLevelDBBackendSuite
+  extends YarnShuffleIntegrationSuite {
+  override protected def dbBackend: DBBackend = DBBackend.LEVELDB
+}
+
 /**
  * Integration test for the external shuffle service with auth on.
 */
-@ExtendedYarnTest
-class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite {
-
+abstract class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite {
   override def newYarnConfig(): YarnConfiguration = {
     val yarnConfig = super.newYarnConfig()
     yarnConfig.set(NETWORK_AUTH_ENABLED.key, "true")
@@ -106,7 +114,11 @@ class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite {
       NETWORK_CRYPTO_ENABLED.key -> "true"
     )
   }
+}
 
+@ExtendedYarnTest
+class YarnShuffleAuthWithLevelDBBackendSuite extends YarnShuffleAuthSuite {
+  override protected def dbBackend: DBBackend = DBBackend.LEVELDB
 }
 
 private object YarnExternalShuffleDriver extends Logging with Matchers {
@@ -148,8 +160,13 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
       result = "success"
       // only one process can open a leveldb file at a time, so we copy the files
       if (registeredExecFile != null && execStateCopy != null) {
+        val dbBackendName = conf.get(SHUFFLE_SERVICE_DB_BACKEND.key)
+        val dbBackend = DBBackend.byName(dbBackendName)
+        logWarning(s"Configured ${SHUFFLE_SERVICE_DB_BACKEND.key} as $dbBackendName " +
+          s"and actually used value ${dbBackend.name()}")
         FileUtils.copyDirectory(registeredExecFile, execStateCopy)
-        assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty)
+        assert(!ShuffleTestAccessor
+          .reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty)
       }
     } finally {
       sc.stop()
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
index d67d372db2be7..a52fcae4f4c16 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
@@ -22,13 +22,13 @@ import java.util.List
 import java.util.concurrent.ConcurrentMap
 
 import org.apache.hadoop.yarn.api.records.ApplicationId
-import org.fusesource.leveldbjni.JniDBFactory
-import org.iq80.leveldb.{DB, Options}
 
 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId
 import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
 import org.apache.spark.network.shuffle.protocol.{ExecutorShuffleInfo, FinalizeShuffleMerge}
-import org.apache.spark.network.util.TransportConf
+import org.apache.spark.network.shuffledb.DB
+import org.apache.spark.network.shuffledb.DBBackend
+import org.apache.spark.network.util.{DBProvider, TransportConf}
 
 /**
  * just a cheat to get package-visible members in tests
@@ -68,7 +68,7 @@ object ShuffleTestAccessor {
     mergeManager.recoveryFile
   }
 
-  def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
+  def shuffleServiceDB(resolver: ExternalShuffleBlockResolver): DB = {
     resolver.db
   }
 
@@ -208,11 +208,9 @@ object ShuffleTestAccessor {
   }
 
   def reloadRegisteredExecutors(
+      dbBackend: DBBackend,
      file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
-    val options: Options = new Options
-    options.createIfMissing(true)
-    val factory = new JniDBFactory
-    val db = factory.open(file, options)
+    val db = DBProvider.initDB(dbBackend, file)
     val result = ExternalShuffleBlockResolver.reloadRegisteredExecutors(db)
     db.close()
     result
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index a23be6c6e7023..9a24ab8adff6a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -48,13 +48,14 @@ import org.apache.spark.network.server.BlockPushNonFatalFailure
 import org.apache.spark.network.shuffle.{MergedShuffleFileManager, NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
 import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.network.shuffledb.DBBackend
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.network.yarn.util.HadoopConfigProvider
 import org.apache.spark.tags.ExtendedLevelDBTest
 import org.apache.spark.util.Utils
 
-@ExtendedLevelDBTest
-class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
+abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
+
   private[yarn] var yarnConfig: YarnConfiguration = null
   private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
   private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1 =
@@ -70,6 +71,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
   private var recoveryLocalDir: File = _
   protected var tempDir: File = _
 
+  protected def shuffleDBBackend(): DBBackend
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     // Ensure that each test uses a fresh metrics system
@@ -158,10 +161,13 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
   private def createYarnShuffleServiceWithCustomMergeManager(
       createMergeManager: (TransportConf, File) => MergedShuffleFileManager): YarnShuffleService = {
     val shuffleService = createYarnShuffleService(false)
+    val dBBackend = shuffleDBBackend()
+    yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name())
     val transportConf = new TransportConf("shuffle", new HadoopConfigProvider(yarnConfig))
+    val dbName = dBBackend.fileName(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME)
     val testShuffleMergeManager = createMergeManager(
       transportConf,
-      shuffleService.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+      shuffleService.initRecoveryDb(dbName))
     shuffleService.setShuffleMergeManager(testShuffleMergeManager)
     shuffleService.init(yarnConfig)
     shuffleService
@@ -372,7 +378,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     prepareAppShufflePartition(mergeManager, partitionId3, 1, "3")
     prepareAppShufflePartition(mergeManager, partitionId4, 2, "4")
 
-    val blockResolverDB = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver)
+    val blockResolverDB = ShuffleTestAccessor.shuffleServiceDB(blockResolver)
     ShuffleTestAccessor.reloadRegisteredExecutors(blockResolverDB) should not be empty
     val mergeManagerDB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager)
     ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager, mergeManagerDB) should not be empty
@@ -1076,3 +1082,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
 }
+
+@ExtendedLevelDBTest
+class YarnShuffleServiceWithLevelDBBackendSuite extends YarnShuffleServiceSuite {
+  override protected def shuffleDBBackend(): DBBackend = DBBackend.LEVELDB
+}