diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 98f33b70fea23..121dfbd4f6838 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -153,24 +154,72 @@ public void write(Object value) throws Exception {
     try (WriteBatch batch = db().createWriteBatch()) {
       byte[] data = serializer.serialize(value);
       synchronized (ti) {
-        Object existing;
-        try {
-          existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
-        } catch (NoSuchElementException e) {
-          existing = null;
-        }
+        updateBatch(batch, value, data, value.getClass(), ti.naturalIndex(), ti.indices());
+        db().write(batch);
+      }
+    }
+  }
+
+  public void writeAll(List<?> values) throws Exception {
+    Preconditions.checkArgument(values != null && !values.isEmpty(),
+      "Non-empty values required.");
+
+    // Group by class, in case there are values from different classes in the list.
+    // The typical use case is for this to be a single class.
+    // A NullPointerException will be thrown if values contains a null object.
+    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
+        values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
+
+      final Iterator<?> valueIter = entry.getValue().iterator();
+      final Iterator<byte[]> serializedValueIter;
+
+      // Serialize outside the synchronized block.
+      List<byte[]> list = new ArrayList<>(entry.getValue().size());
+      for (Object value : entry.getValue()) {
+        list.add(serializer.serialize(value));
+      }
+      serializedValueIter = list.iterator();
+
+      final Class<?> klass = entry.getKey();
+      final LevelDBTypeInfo ti = getTypeInfo(klass);
 
-        PrefixCache cache = new PrefixCache(value);
-        byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
-        for (LevelDBTypeInfo.Index idx : ti.indices()) {
-          byte[] prefix = cache.getPrefix(idx);
-          idx.add(batch, value, existing, data, naturalKey, prefix);
+      synchronized (ti) {
+        final LevelDBTypeInfo.Index naturalIndex = ti.naturalIndex();
+        final Collection<LevelDBTypeInfo.Index> indices = ti.indices();
+
+        try (WriteBatch batch = db().createWriteBatch()) {
+          while (valueIter.hasNext()) {
+            updateBatch(batch, valueIter.next(), serializedValueIter.next(), klass,
+              naturalIndex, indices);
+          }
+          db().write(batch);
         }
-        db().write(batch);
       }
     }
   }
 
+  private void updateBatch(
+      WriteBatch batch,
+      Object value,
+      byte[] data,
+      Class<?> klass,
+      LevelDBTypeInfo.Index naturalIndex,
+      Collection<LevelDBTypeInfo.Index> indices) throws Exception {
+    Object existing;
+    try {
+      existing = get(naturalIndex.entityKey(null, value), klass);
+    } catch (NoSuchElementException e) {
+      existing = null;
+    }
+
+    PrefixCache cache = new PrefixCache(value);
+    byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
+    for (LevelDBTypeInfo.Index idx : indices) {
+      byte[] prefix = cache.getPrefix(idx);
+      idx.add(batch, value, existing, data, naturalKey, prefix);
+    }
+  }
+
   @Override
   public void delete(Class<?> type, Object naturalKey) throws Exception {
     Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
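As a point of reference, here is a minimal caller-side sketch of the new API (not part of the patch): write() builds and commits one WriteBatch per value, while writeAll() groups the values by class, serializes them before taking the lock, and flushes a single WriteBatch per class. The store path, the use of KVStoreSerializer, and the CustomType entity below are assumptions made for illustration only.

import java.io.File;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.util.kvstore.KVIndex;
import org.apache.spark.util.kvstore.KVStoreSerializer;
import org.apache.spark.util.kvstore.LevelDB;

public class WriteAllSketch {

  // Minimal entity with a natural index, modeled on the kvstore test fixtures.
  public static class CustomType {
    @KVIndex
    public String key;
    public String name;
  }

  private static CustomType entry(String key, String name) {
    CustomType t = new CustomType();
    t.key = key;
    t.name = name;
    return t;
  }

  public static void main(String[] args) throws Exception {
    File path = new File(System.getProperty("java.io.tmpdir"), "writeall-sketch");
    try (LevelDB store = new LevelDB(path, new KVStoreSerializer())) {
      List<CustomType> entries = Arrays.asList(entry("a", "first"), entry("b", "second"));

      // Existing path: each write() creates and commits its own WriteBatch.
      for (CustomType e : entries) {
        store.write(e);
      }

      // New path: values are grouped by class, serialized up front, and
      // written through one WriteBatch per class.
      store.writeAll(entries);
    }
  }
}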
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
index 96db86f8e745a..08db2bd0766c3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
+import com.google.common.collect.Lists
+
 import org.apache.spark.util.kvstore._
 
 /**
@@ -144,10 +146,9 @@ private[history] class HybridStore extends KVStore {
     backgroundThread = new Thread(() => {
       try {
         for (klass <- klassMap.keys().asScala) {
-          val it = inMemoryStore.view(klass).closeableIterator()
-          while (it.hasNext()) {
-            levelDB.write(it.next())
-          }
+          val values = Lists.newArrayList(
+            inMemoryStore.view(klass).closeableIterator())
+          levelDB.writeAll(values)
         }
         listener.onSwitchToLevelDBSuccess()
         shouldUseInMemoryStore.set(false)
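For the HybridStore side, a hedged sketch of the same copy pattern written directly against the kvstore classes: each class is drained from the InMemoryStore into a list and handed to LevelDB in one writeAll() call, replacing the former one write() per record. The helper name copyClass and the explicit try-with-resources around the iterator are illustrative choices, not taken from the patch.

import com.google.common.collect.Lists;

import org.apache.spark.util.kvstore.InMemoryStore;
import org.apache.spark.util.kvstore.KVStoreIterator;
import org.apache.spark.util.kvstore.LevelDB;

public class SwitchCopySketch {

  // Copies every record of one class from the in-memory store into LevelDB
  // with a single batched write, mirroring HybridStore's background thread.
  static void copyClass(InMemoryStore inMemoryStore, LevelDB levelDB, Class<?> klass)
      throws Exception {
    try (KVStoreIterator<?> it = inMemoryStore.view(klass).closeableIterator()) {
      // One writeAll() per class instead of one write() per record.
      levelDB.writeAll(Lists.newArrayList(it));
    }
  }
}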