Closed
Changes from 1 commit
@@ -21,6 +21,7 @@ import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.LinkedHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
}
}

  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    entry match {
      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
Member

Why not just make these case classes Closeable, so that you can close them consistently?
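A minimal sketch of what this suggestion could look like. The types below are hypothetical stand-ins, not Spark's actual MemoryEntry classes, and they are written as plain classes rather than case classes to keep the example short. Disposing the buffer is modelled by dropping the reference, and errors are printed instead of going through Spark's logging.

```scala
import java.io.Closeable
import java.nio.ByteBuffer

// Hypothetical stand-ins for the memory entry types; not the real MemoryStore classes.
sealed trait SketchEntry extends Closeable

// Serialized entry: owns a buffer. "Disposing" is modelled by dropping the reference.
final class SerializedSketchEntry(private var buffer: ByteBuffer) extends SketchEntry {
  def isDisposed: Boolean = buffer == null
  override def close(): Unit = { buffer = null }
}

// Deserialized entry: closes any stored values that are themselves AutoCloseable.
final class DeserializedSketchEntry(values: Array[Any]) extends SketchEntry {
  override def close(): Unit = values.foreach {
    case c: AutoCloseable =>
      try c.close()
      catch { case e: Exception => Console.err.println(s"Failed to close $c: $e") }
    case _ => // nothing to release for plain values
  }
}
```

With this shape, the caller no longer needs to pattern match on the entry type: remove and clear could call close() on whatever entry they hold.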

      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
Member

Use DeserializedMemoryEntry(values, _, _) to match the rest of the code style.

Member (@viirya, May 14, 2018)

As far as I know, broadcast variables can also be serialized to disk (BlockManager.doPutIterator) rather than stored in MemoryStore. In that case, it seems AutoCloseable broadcast variables won't hit this release logic.

Author

I wouldn't expect a never-deserialized Memory Entry to be closed, as it was never really instantiated to begin with - so if it only lands on disk, I think that's reasonable (as the variable in question would never have had a chance to allocate anything either).

Member

In theory, you can have a working broadcast object that is not in MemoryStore.

When the merged object is stored into BlockManager by calling putSingle, it can end up in the disk store.

Once the object is about to be used, if we can't find it in the cache, we call BlockManager.getLocalValues to read it back from the disk store. Although that call tries to store it in MemoryStore, it may not succeed.

I think the point is that the change assumes that if there is a deserialized broadcast object, it is definitely in MemoryStore. But if I read the code correctly, that is not the case: you can have the serialized bytes of the object in the disk store and use a deserialized object at the same time.

Author

Ah, OK, I see where the issue is. In this case you may have a deserialized instance, but the memory store is full, so the put fails. Now we've got a live, deserialized object that is not in MemoryStore. Thanks for catching this. It looks like this case could be handled in MemoryStore.putIteratorAsValues when the putIterator call fails, which would cover several cases in BlockManager where we try (and fail) to put deserialized values. But it means checking for potential AutoCloseable values any time we fail to put into MemoryStore, and I'm not sure of the performance impact of that.

Author

Actually, digging further, there are other places where we may deserialize an object from the disk store and never put it into the memory store. It seems like it may make the most sense to punt on a guarantee that your AutoCloseable object is closed and make this a best-effort thing when calling BlockManager.removeBroadcast (which is how we used it in the case that caused us to put together this PR). It'll still be better than depending on GC and a finalizer to get the resources cleaned up when the driver can call Broadcast#destroy, but we can document it as a best practice to have one just in case the close() call doesn't happen due to edge cases.
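To illustrate the "best-effort close plus a fallback" practice described above, here is a rough sketch of a broadcast payload that can be closed explicitly but also registers a fallback release action. It uses java.lang.ref.Cleaner (Java 9+) in place of the finalizer mentioned in the comment; that substitution, and every name in the block, is an assumption made for illustration and is not part of this PR.

```scala
import java.lang.ref.Cleaner
import java.util.concurrent.atomic.AtomicBoolean

// Shared cleaner for the sketch; one instance is enough for many objects.
object SketchCleaner {
  val instance: Cleaner = Cleaner.create()
}

// The release action must not reference the owning object,
// otherwise the owner would never become unreachable.
final class ReleaseAction(released: AtomicBoolean) extends Runnable {
  override def run(): Unit = released.set(true) // stand-in for freeing a real resource
}

// Hypothetical broadcast payload that owns a resource outside the JVM heap.
final class NativeHandleSketch extends AutoCloseable {
  val released = new AtomicBoolean(false)
  private val cleanable =
    SketchCleaner.instance.register(this, new ReleaseAction(released))

  // Explicit path: runs the release action now. The Cleaner runs the action
  // at most once, so a repeated close() call is harmless.
  override def close(): Unit = cleanable.clean()
}
```

If close() is never reached because of one of the edge cases discussed here, the registered action still runs some time after the object becomes unreachable, which is the "just in case" fallback.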

      case _ =>
    }
  }

  private def maybeCloseValues(objs: Array[Any]): Unit = {
Member

values: Array[Any]

objs.foreach {
case closable: AutoCloseable =>
Member

indent style: two spaces.

        safelyCloseValue(closable)
      case _ =>
    }
  }

  private def safelyCloseValue(closable: AutoCloseable): Unit = {
    try {
      closable.close()
    } catch {
      case ex: Exception => logWarning(s"Failed to close AutoCloseable $closable", ex)
    }
  }

  def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
Member (@viirya, May 18, 2018)

If we do it in remove, I don't think we can avoid the issue I mentioned before. If you have a deserialized value in the broadcast cache, it can be cleaned up by GC if this broadcast value isn't stored as a deserialized entry in MemoryStore.

If the object has already claimed resources that we want to release through the AutoCloseable interface, we don't properly release them when it's cleaned up by GC. That happens before remove is called, and the values are not in the memory store.

    val entry = entries.synchronized {
      entries.remove(blockId)
    }
    if (entry != null) {
      entry match {
        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
        case _ =>
      }
      maybeReleaseResources(entry)
      memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
      logDebug(s"Block $blockId of size ${entry.size} dropped " +
        s"from memory (free ${maxMemory - blocksMemoryUsed})")
@@ -404,6 +426,7 @@ private[spark] class MemoryStore(

  def clear(): Unit = memoryManager.synchronized {
    entries.synchronized {
      entries.values().asScala.foreach(maybeReleaseResources)
Member

Should we check that the keys are broadcast blocks (blockId.isBroadcast == true)?
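A rough sketch of what such a check could look like in clear(). The block id types and the map below are simplified, hypothetical stand-ins for Spark's BlockId hierarchy and the entries map; only the isBroadcast flag is taken from the question above.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the BlockId hierarchy; only isBroadcast matters here.
sealed trait SketchBlockId { def isBroadcast: Boolean }
final case class SketchBroadcastId(id: Long) extends SketchBlockId {
  val isBroadcast = true
}
final case class SketchRddId(rdd: Int, split: Int) extends SketchBlockId {
  val isBroadcast = false
}

object ClearSketch {
  // Closes AutoCloseable values of broadcast blocks only, then empties the map.
  def clearClosingBroadcasts(
      entries: mutable.LinkedHashMap[SketchBlockId, Array[Any]]): Unit = {
    entries.foreach { case (blockId, values) =>
      if (blockId.isBroadcast) {
        values.foreach {
          case c: AutoCloseable =>
            try c.close()
            catch { case e: Exception => Console.err.println(s"Failed to close $c: $e") }
          case _ => // not a closeable value
        }
      }
    }
    entries.clear()
  }
}
```

The trade-off is that AutoCloseable values cached under non-broadcast block ids would then be dropped without being closed, which may or may not be the intended behavior.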

      entries.clear()
    }
    onHeapUnrollMemoryMap.clear()
@@ -526,4 +526,84 @@ class MemoryStoreSuite
}
}
}

  test("[SPARK-24225]: remove should close AutoCloseable object") {

    val (store, _) = makeMemoryStore(12000)

    val id = BroadcastBlockId(0)
    val tracker = new CloseTracker()
    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    assert(store.contains(id))
    store.remove(id)
    assert(tracker.getClosed())
  }

  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {

    val (store, _) = makeMemoryStore(12000)

    val id = BroadcastBlockId(0)
    val tracker = new CloseTracker(true)
    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    assert(store.contains(id))
    store.remove(id)
    assert(tracker.getClosed())
  }

  test("[SPARK-24225]: clear should close AutoCloseable objects") {

    val (store, _) = makeMemoryStore(12000)

    val id = BroadcastBlockId(0)
    val tracker = new CloseTracker
    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    assert(store.contains(id))
    store.clear()
    assert(tracker.getClosed())
  }

  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
Member

Can you check that, if you have multiple AutoCloseable objects in an iterator and only one of them is removed, the rest of the objects will not be closed?

Author

So if I understand the API correctly, there is no way to remove a single item that was put as part of a call to putIterator - because operations are conducted by blockId you would only be able to remove the whole group of entries, not a single part of an iterator.
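The point about block-level granularity can be sketched with a simplified store. This is a hypothetical stand-in keyed by plain strings, not the real MemoryStore API: everything put through one iterator lives under a single key, so removal (and closing) applies to the whole group.

```scala
import scala.collection.mutable

// Hypothetical, simplified store: one entry per block id, as in the discussion above.
final class BlockKeyedStoreSketch {
  private val entries = mutable.LinkedHashMap.empty[String, Array[Any]]

  // The whole iterator is materialized and stored under a single key.
  def putIterator(blockId: String, values: Iterator[Any]): Unit = {
    entries(blockId) = values.toArray
  }

  // Removal is per block: every value stored under the key is dropped at once,
  // and each AutoCloseable among them is closed. Close failures are ignored here.
  def remove(blockId: String): Boolean = entries.remove(blockId) match {
    case Some(values) =>
      values.foreach {
        case c: AutoCloseable =>
          try c.close()
          catch { case _: Exception => () }
        case _ => // plain value, nothing to close
      }
      true
    case None => false
  }
}
```

There is no operation in this shape that addresses a single element of the stored array, which mirrors the reasoning in the reply.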


    val (store, _) = makeMemoryStore(12000)

    val id1 = BroadcastBlockId(1)
    val tracker2 = new CloseTracker
    val tracker1 = new CloseTracker
    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
    assert(store.contains(id1))
    store.clear()
    assert(tracker1.getClosed())
    assert(tracker2.getClosed())
  }

  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {

    val (store, _) = makeMemoryStore(12000)

    val id1 = BroadcastBlockId(1)
    val id2 = BroadcastBlockId(2)
    val tracker2 = new CloseTracker(true)
    val tracker1 = new CloseTracker(true)
    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
    assert(store.contains(id1))
    assert(store.contains(id2))
    store.clear()
    assert(tracker1.getClosed())
    assert(tracker2.getClosed())
  }
}

private class CloseTracker (val throwsOnClosed: Boolean = false) extends AutoCloseable {
  var closed = false
  override def close(): Unit = {
    closed = true
    if (throwsOnClosed) {
      throw new RuntimeException("Throwing")
Member

Could you add var isExceptionThrown = false, and check in the test whether the exception is thrown?

    }
  }
  def getClosed(): Boolean = {
    closed
Member

Since closed is public, you could use it directly. Alternatively, make closed private.

  }
}
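One possible shape for the helper with both review suggestions applied (a private closed field and a flag recording that the exception was thrown). This is only a sketch: the class name, the accessor names and the exceptionThrown field are hypothetical and do not come from the PR.

```scala
// Sketch of the test helper with both suggestions applied; all names are illustrative.
final class CloseTrackerSketch(throwsOnClose: Boolean = false) extends AutoCloseable {
  private var closed = false
  private var exceptionThrown = false

  override def close(): Unit = {
    closed = true
    if (throwsOnClose) {
      // Recorded just before throwing so a test can assert on it afterwards.
      exceptionThrown = true
      throw new RuntimeException("Throwing")
    }
  }

  def isClosed: Boolean = closed
  def isExceptionThrown: Boolean = exceptionThrown
}
```

A test for the throwing case could then assert both tracker.isClosed and tracker.isExceptionThrown after store.remove or store.clear returns, which would confirm that the exception was raised and swallowed rather than never triggered.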