58 commits
181a597
Introduce LocalDB
LuciferYang Apr 14, 2022
1e5654a
refactor ShuffleTestAccessor
LuciferYang Apr 14, 2022
a4ae04c
refactor ShuffleTestAccessor
LuciferYang Apr 14, 2022
29a7a1e
refactor ShuffleTestAccessor
LuciferYang Apr 14, 2022
eacc159
reformat import
LuciferYang Apr 14, 2022
ea21178
reformat import
LuciferYang Apr 14, 2022
f57578e
reformat import
LuciferYang Apr 14, 2022
ad5b74f
remove ;
LuciferYang Apr 14, 2022
4d5739b
add APL
LuciferYang Apr 15, 2022
610e04b
Merge branch 'master' of github.com:apache/spark into local-db
LuciferYang Apr 15, 2022
c66c483
Merge branch 'upmaster' into local-db
LuciferYang Apr 20, 2022
6e0a6f7
rename
LuciferYang Apr 24, 2022
9ba181b
rename
LuciferYang Apr 24, 2022
c80024b
Merge branch 'upmaster' into local-db
LuciferYang Apr 27, 2022
1c3f09d
Merge branch 'master' of github.com:apache/spark into local-db
LuciferYang Apr 29, 2022
f03ea29
Merge branch 'upmaster' into local-db
LuciferYang May 10, 2022
ecba36a
Merge branch 'upmaster' into local-db
LuciferYang May 13, 2022
23753f5
Merge branch 'upmaster' into local-db
LuciferYang May 16, 2022
4630651
Merge branch 'upmaster' into local-db
LuciferYang May 23, 2022
712f99d
Merge branch 'upmaster' into local-db
LuciferYang May 30, 2022
01a6da1
Merge branch 'upmaster' into local-db
LuciferYang May 30, 2022
8e11695
Merge branch 'upmaster' into local-db
LuciferYang Jun 6, 2022
9c553a8
Merge branch 'upmaster' into local-db
LuciferYang Jun 13, 2022
797f4ca
Merge branch 'upmaster' into local-db
LuciferYang Jun 20, 2022
d5bcce9
Merge branch 'upmaster' into local-db
LuciferYang Jun 24, 2022
c6f6fa0
Merge branch 'upmaster' into local-db
LuciferYang Jun 27, 2022
768f207
Merge branch 'upmaster' into local-db
LuciferYang Jul 5, 2022
217df56
Merge branch 'upmaster' into local-db
LuciferYang Aug 2, 2022
9b20939
fix compile
LuciferYang Aug 2, 2022
515204d
add get method to DB
LuciferYang Aug 2, 2022
28ee973
comments
LuciferYang Aug 3, 2022
298c737
ident
LuciferYang Aug 3, 2022
a851c88
ident
LuciferYang Aug 3, 2022
1832b06
refactor test
LuciferYang Aug 3, 2022
e147c77
refactor test
LuciferYang Aug 3, 2022
004c4c9
remove unused import
LuciferYang Aug 3, 2022
a56f3c8
Merge branch 'master' of github.com:apache/spark into local-db
LuciferYang Aug 4, 2022
9df4fe5
fix format
LuciferYang Aug 4, 2022
7e1e562
fix compile
LuciferYang Aug 4, 2022
0f7b348
refactor to use iterator
LuciferYang Aug 5, 2022
c56a8f8
re-order import
LuciferYang Aug 5, 2022
78d163e
revert change in SPARK-39988
LuciferYang Aug 5, 2022
bee0737
revert changes
LuciferYang Aug 5, 2022
944c290
revert changes
LuciferYang Aug 5, 2022
cf7658c
remove get
LuciferYang Aug 5, 2022
e3e37a7
fix compile
LuciferYang Aug 5, 2022
d5a53a5
Merge branch 'master' of github.com:apache/spark into local-db
LuciferYang Aug 8, 2022
f8cb374
add config for DBBackend
LuciferYang Aug 8, 2022
35bcced
pass by config
LuciferYang Aug 8, 2022
2dc68c3
Merge branch 'upmaster' into local-db
LuciferYang Aug 9, 2022
ee1f283
remove DB.get method
LuciferYang Aug 9, 2022
5e856f6
fix comments LevelDBIterator
LuciferYang Aug 9, 2022
9590e97
add dBBackend.fileName
LuciferYang Aug 9, 2022
b299c55
Revert "remove DB.get method"
LuciferYang Aug 9, 2022
94a2831
remove unnecessary RuntimeEexception throws declaration
LuciferYang Aug 9, 2022
d4ce629
fix format
LuciferYang Aug 9, 2022
9889de3
IllegalArgumentException for DBProvider
LuciferYang Aug 9, 2022
abb0a58
add comments
LuciferYang Aug 15, 2022
org/apache/spark/network/shuffledb/DB.java
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffledb;

import java.io.Closeable;

/**
 * The local KV storage used to persist the shuffle state;
 * implementations may include LevelDB, RocksDB, etc.
 */
public interface DB extends Closeable {
Member: Is this a developer API or a user API? I didn't see how a custom DB could be plugged in with this PR.

Contributor Author: A developer API. The RocksDB implementation is under review: #37610.

Member: So could you add the @DeveloperApi annotation to it?

Contributor Author: OK

Contributor (@mridulm, Aug 26, 2022): We do not support users plugging in a different DB instance - it comes from within Spark code. Supported DBs come from DBBackend, and DBProvider.initDB has an explicit switch for the supported types. It won't be @DeveloperApi.

Member: @mridulm It's a public interface. I think it should either be a user API or a developer API, no?

Contributor Author: Hmm... should we use @Private, as kvstore.KVStore does?

@Private
public interface KVStore extends Closeable {

Contributor (@mridulm, Aug 26, 2022): The configuration is public @Ngone51, not the implementation itself. To put it differently, there is no way for users to leverage any of this - unless they modify Spark code.

I am fine with marking it as @Private if we need to make the intent clearer.

It is the same for most submodules in common/* - except probably for common/kvstore, where Marcelo marked the interfaces as @Private :-)


/**
* Set the DB entry for "key" to "value".
*/
void put(byte[] key, byte[] value);

/**
 * Return a new byte array storing the value associated
 * with the specified key, if any.
 */
byte[] get(byte[] key);

/**
* Delete the DB entry (if any) for "key".
*/
void delete(byte[] key);

/**
* Return an iterator over the contents of the DB.
*/
DBIterator iterator();
}
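To make the contract above concrete, here is a hypothetical in-memory stand-in for the interface, backed by a `TreeMap` with a `byte[]`-aware comparator. The class name `InMemoryDB` and the sample key/value strings are illustrative, not part of the PR:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the DB contract; illustration only.
public class InMemoryDB implements Closeable {
    // TreeMap keeps keys sorted, mirroring LevelDB's ordered key space.
    private final TreeMap<byte[], byte[]> store = new TreeMap<>(Arrays::compare);

    public void put(byte[] key, byte[] value) { store.put(key, value); }

    public byte[] get(byte[] key) { return store.get(key); }

    public void delete(byte[] key) { store.remove(key); }

    @Override
    public void close() throws IOException { store.clear(); }

    public static void main(String[] args) throws IOException {
        try (InMemoryDB db = new InMemoryDB()) {
            byte[] key = "app_1".getBytes();
            db.put(key, "executor-state".getBytes());
            System.out.println(new String(db.get(key)));  // executor-state
            db.delete(key);
            System.out.println(db.get(key));              // null
        }
    }
}
```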
org/apache/spark/network/shuffledb/DBBackend.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffledb;

import java.util.Locale;

/**
 * Enum used to specify the disk-based store backing the shuffle service local DB.
 * Only LEVELDB is supported now.
 */
public enum DBBackend {
LEVELDB(".ldb");

private final String fileSuffix;

DBBackend(String fileSuffix) {
this.fileSuffix = fileSuffix;
}

public String fileName(String prefix) {
return prefix + fileSuffix;
}

public static DBBackend byName(String value) {
return DBBackend.valueOf(value.toUpperCase(Locale.ROOT));
}
}
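A quick sketch of how the enum above is used: config values are matched case-insensitively, and the backend derives the on-disk file name from a caller-supplied prefix. The enum body is a standalone copy of the one above so this compiles on its own; the `registeredExecutors` prefix is an illustrative example:

```java
import java.util.Locale;

// Standalone copy of the DBBackend enum above, so this sketch compiles on its own.
enum DBBackend {
    LEVELDB(".ldb");

    private final String fileSuffix;

    DBBackend(String fileSuffix) { this.fileSuffix = fileSuffix; }

    public String fileName(String prefix) { return prefix + fileSuffix; }

    public static DBBackend byName(String value) {
        return DBBackend.valueOf(value.toUpperCase(Locale.ROOT));
    }
}

public class DBBackendDemo {
    public static void main(String[] args) {
        // byName normalizes case via Locale.ROOT, so "levelDb" resolves fine.
        DBBackend backend = DBBackend.byName("levelDb");
        // The suffix is appended to whatever prefix the caller supplies.
        System.out.println(backend.fileName("registeredExecutors"));  // registeredExecutors.ldb
    }
}
```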
org/apache/spark/network/shuffledb/DBIterator.java
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffledb;

import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;

/**
 * An iterator over the key/value entries of a {@code DB}.
 */
public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {

/**
 * Position the iterator at the first entry in the source whose key is at or past {@code key}.
 */
void seek(byte[] key);

default void remove() {
throw new UnsupportedOperationException();
}

}
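The `seek` contract (position at the first key at or past the target) can be sketched with a sorted map, where `tailMap(target, true)` plays the role of `seek(target)`. `SeekDemo` and its sample keys are illustrative only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Illustrative sketch of the seek() contract over a sorted key space:
// seeking to `target` positions the iterator at the first key >= target.
public class SeekDemo {

    // tailMap(target, true) mirrors seek(target) on a sorted store.
    public static List<String> keysAtOrAfter(TreeMap<byte[], byte[]> store, byte[] target) {
        List<String> keys = new ArrayList<>();
        for (byte[] k : store.tailMap(target, true).keySet()) {
            keys.add(new String(k));
        }
        return keys;
    }

    public static void main(String[] args) {
        TreeMap<byte[], byte[]> store = new TreeMap<>(Arrays::compare);
        store.put("a1".getBytes(), "v1".getBytes());
        store.put("b1".getBytes(), "v2".getBytes());
        store.put("b2".getBytes(), "v3".getBytes());
        // "a1" is skipped: it sorts before the seek target "b".
        System.out.println(keysAtOrAfter(store, "b".getBytes()));  // [b1, b2]
    }
}
```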
org/apache/spark/network/shuffledb/LevelDB.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffledb;

import java.io.IOException;

/**
 * LevelDB implementation of the local KV storage {@link DB}.
 */
public class LevelDB implements DB {
Member: I'm wondering if we can reuse the following.

public class LevelDB implements KVStore {

Contributor: KVStore currently provides a slightly higher-level abstraction, while the existing shuffle implementation makes more low-level use of LevelDB :-)

Contributor: +1 to the idea of leveraging the existing code in kvstore. Right now that code is only used by the HistoryServer, but I think it is reasonable to extend or refactor it so it can also be used for the DBs in shuffle.

Contributor Author: A little difficult; as @mridulm said:

KVStore currently provides a slightly higher level abstraction - while existing implementation in shuffle does a more low level use of LevelDB :-)

private final org.iq80.leveldb.DB db;

public LevelDB(org.iq80.leveldb.DB db) {
this.db = db;
}

@Override
public void put(byte[] key, byte[] value) {
db.put(key, value);
}

@Override
public byte[] get(byte[] key) {
return db.get(key);
}

@Override
public void delete(byte[] key) {
db.delete(key);
}

@Override
public void close() throws IOException {
db.close();
}

@Override
public DBIterator iterator() {
return new LevelDBIterator(db.iterator());
}
}
org/apache/spark/network/shuffledb/LevelDBIterator.java
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffledb;

import java.io.IOException;
import java.util.Map;
import java.util.NoSuchElementException;

import com.google.common.base.Throwables;

/**
 * LevelDB implementation of {@link DBIterator}.
 */
public class LevelDBIterator implements DBIterator {

private final org.iq80.leveldb.DBIterator it;

private boolean checkedNext;

private boolean closed;

private Map.Entry<byte[], byte[]> next;

public LevelDBIterator(org.iq80.leveldb.DBIterator it) {
this.it = it;
}

@Override
public boolean hasNext() {
if (!checkedNext && !closed) {
next = loadNext();
checkedNext = true;
}
if (!closed && next == null) {
try {
close();
} catch (IOException ioe) {
throw Throwables.propagate(ioe);
}
}
return next != null;
}

@Override
public Map.Entry<byte[], byte[]> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
checkedNext = false;
Map.Entry<byte[], byte[]> ret = next;
next = null;
return ret;
}

@Override
public void close() throws IOException {
if (!closed) {
it.close();
closed = true;
next = null;
}
}

@Override
public void seek(byte[] key) {
it.seek(key);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

private Map.Entry<byte[], byte[]> loadNext() {
boolean hasNext = it.hasNext();
if (!hasNext) {
return null;
}
return it.next();
}
}
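`LevelDBIterator` uses a common look-ahead pattern: `hasNext()` eagerly loads and caches the next entry, and the iterator marks itself closed once the underlying source is exhausted. A generic, standalone sketch of the same pattern (`LookaheadIterator` is a hypothetical name; the `closed` flag stands in for the real `close()` on the LevelDB handle):

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Generic look-ahead iterator: hasNext() loads and caches the next element,
// and the iterator flags itself closed on exhaustion, mirroring LevelDBIterator.
public class LookaheadIterator<T> implements Iterator<T> {
    private final Iterator<T> underlying;
    private boolean checkedNext;
    private boolean closed;
    private T next;

    public LookaheadIterator(Iterator<T> underlying) { this.underlying = underlying; }

    @Override
    public boolean hasNext() {
        if (!checkedNext && !closed) {
            next = underlying.hasNext() ? underlying.next() : null;
            checkedNext = true;
        }
        if (!closed && next == null) {
            closed = true;  // stands in for releasing the underlying resource
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        checkedNext = false;  // force hasNext() to load a fresh element next time
        T ret = next;
        next = null;
        return ret;
    }

    public static void main(String[] args) {
        LookaheadIterator<String> it = new LookaheadIterator<>(List.of("a", "b").iterator());
        while (it.hasNext()) {
            System.out.println(it.next());  // a, then b
        }
    }
}
```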
org/apache/spark/network/shuffledb/StoreVersion.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffledb;

import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Used to identify the version of data stored in local shuffle state DB.
*/
public class StoreVersion {

public static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);

public final int major;
public final int minor;

@JsonCreator
public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
this.major = major;
this.minor = minor;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

StoreVersion that = (StoreVersion) o;

return major == that.major && minor == that.minor;
}

@Override
public int hashCode() {
int result = major;
result = 31 * result + minor;
return result;
}
}
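`StoreVersion` implements value equality so the version read back from the DB can be compared directly with the version the code expects. A standalone sketch of that semantics (the compatibility-check framing is illustrative; the real provider also serializes the version with Jackson):

```java
// Standalone copy of StoreVersion's value semantics; illustration only.
class StoreVersion {
    final int major;
    final int minor;

    StoreVersion(int major, int minor) { this.major = major; this.minor = minor; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof StoreVersion)) return false;
        StoreVersion that = (StoreVersion) o;
        return major == that.major && minor == that.minor;
    }

    @Override
    public int hashCode() { return 31 * major + minor; }
}

public class StoreVersionDemo {
    public static void main(String[] args) {
        StoreVersion onDisk = new StoreVersion(1, 0);
        StoreVersion expected = new StoreVersion(1, 0);
        // Value equality, not reference equality, decides compatibility.
        System.out.println(onDisk.equals(expected));                // true
        System.out.println(onDisk.equals(new StoreVersion(2, 0)));  // false
    }
}
```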
org/apache/spark/network/util/DBProvider.java
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.util;

import java.io.File;
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;

import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.LevelDB;
import org.apache.spark.network.shuffledb.StoreVersion;

/**
 * Provider for creating the {@link DB} instance used by the shuffle service local DB.
 */
public class DBProvider {
public static DB initDB(
DBBackend dbBackend,
File dbFile,
StoreVersion version,
ObjectMapper mapper) throws IOException {
if (dbFile != null) {
// TODO: SPARK-38888, add rocksdb implementation.
switch (dbBackend) {
case LEVELDB:
org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper);
return levelDB != null ? new LevelDB(levelDB) : null;
default:
throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
}
Contributor: Instead of relying on the extension, we should make it explicit - for example, what is done in SPARK-37680. Any thoughts on how to refactor this out, @dongjoon-hyun?

Contributor Author: Let me think about it.

Contributor Author: cc314e6 adds a new config, SHUFFLE_SERVICE_DB_BACKEND, to specify the disk-based store used in the shuffle service local DB; only LEVELDB is supported now.

Contributor Author: Currently, the DBBackend type is read in YarnShuffleService and ExternalShuffleService through SparkConf/Hadoop Configuration and explicitly passed as a method parameter, so many files have been changed.

If we instead re-parse the DBBackend type from SparkConf/TransportConf/Hadoop Configuration at every place it is required, it should reduce the changes, especially for test cases (such as MesosExternalShuffleService, NoOpMergedShuffleFileManager, CleanupNonShuffleServiceServedFilesSuite, etc.).

Do you have any suggestions for this? @mridulm

Contributor Author: 35bcced changes it to pass the DBBackend by config.

}
return null;
}

@VisibleForTesting
public static DB initDB(DBBackend dbBackend, File file) throws IOException {
if (file != null) {
// TODO: SPARK-38888, add rocksdb implementation.
switch (dbBackend) {
case LEVELDB: return new LevelDB(LevelDBProvider.initLevelDB(file));
default:
throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
}
}
return null;
}
}
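The dispatch shape above - an explicit switch over the supported backends, with `IllegalArgumentException` for anything else - can be sketched standalone. `DBProviderDemo`, the single-constant enum copy, and the string "handle" stand in for the real `DB`/`LevelDB` types; note that an unknown config value already fails fast in `Enum.valueOf` before the switch is ever reached:

```java
import java.util.Locale;

public class DBProviderDemo {
    // Standalone single-constant copy of DBBackend for this sketch.
    public enum DBBackend { LEVELDB }

    // Hypothetical stand-in for DBProvider.initDB: the real method returns a DB handle.
    public static String initDB(DBBackend backend) {
        switch (backend) {
            case LEVELDB:
                return "leveldb-handle";
            default:
                throw new IllegalArgumentException("Unsupported DBBackend: " + backend);
        }
    }

    public static void main(String[] args) {
        System.out.println(initDB(DBBackend.LEVELDB));  // leveldb-handle
        try {
            // An unknown config value fails fast in valueOf, before initDB runs.
            DBBackend.valueOf("ROCKSDB".toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```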