Conversation

@JeetKunDoug

This allows Broadcast Variables to be released properly

What changes were proposed in this pull request?

Broadcast variables, while usually used to broadcast data to executors, can also be used to control the scope and lifecycle of shared resources (e.g. connection pools). When creating and destroying those resources within a task is expensive, using a broadcast variable to keep them deserialized in memory for multiple tasks to share can make a huge difference in the efficiency of a Spark job.

In MemoryStore, check whether any values in a DeserializedMemoryEntry implement AutoCloseable and, if so, call close on those resources. This occurs in two places:

  • remove of an individual item
  • clear of the MemoryStore
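
As a rough illustration of the behavior described above, here is a minimal sketch of a store that closes AutoCloseable values on remove and on clear. It is not Spark's MemoryStore: `SketchStore`, `Tracker`, and the string block IDs are hypothetical names used only for this example, and failures from close() are simply reported and skipped.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for a memory store; NOT Spark's MemoryStore.
final class SketchStore {
  private val entries = mutable.LinkedHashMap.empty[String, Array[Any]]

  def put(blockId: String, values: Array[Any]): Unit = entries.update(blockId, values)

  // Close every value that implements AutoCloseable. A failing close() is
  // reported and skipped so the remaining values still get released.
  private def closeAll(values: Array[Any]): Unit = values.foreach {
    case c: AutoCloseable =>
      try c.close()
      catch { case NonFatal(e) => Console.err.println(s"close failed: ${e.getMessage}") }
    case _ => // not a closeable value, nothing to release
  }

  // Removing one block closes only the values stored under that block ID.
  def remove(blockId: String): Boolean = entries.remove(blockId) match {
    case Some(values) =>
      closeAll(values)
      true
    case None => false
  }

  // Clearing the store closes the values of every remaining block.
  def clear(): Unit = {
    entries.values.foreach(closeAll)
    entries.clear()
  }
}

// Hypothetical test helper that records whether close() was called.
final class Tracker(throwsOnClose: Boolean = false) extends AutoCloseable {
  @volatile var closed = false
  override def close(): Unit = {
    closed = true
    if (throwsOnClose) throw new RuntimeException("Throwing")
  }
}
```

Values are grouped by block ID here, so a single value inside a block cannot be removed on its own; only the whole block can.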

How was this patch tested?

Added tests to MemoryStoreSuite to check that we close resources and handle exceptions properly.

…Broadcast Variables can be released properly
@JeetKunDoug
Author

@dbtsai Here's the PR we discussed earlier.

@dbtsai
Member

dbtsai commented May 14, 2018

Could you change the title to [SPARK-24225][CORE] Support closing AutoClosable objects...? Thanks.

@dbtsai
Member

dbtsai commented May 14, 2018

Jenkins, add to whitelist.

@dbtsai
Member

dbtsai commented May 14, 2018

test this please

@gatorsmile
Member

cc @jiangxb1987 @cloud-fan

@JeetKunDoug JeetKunDoug changed the title [SPARK-24225] Support closing AutoClosable objects in MemoryStore [SPARK-24225][CORE] Support closing AutoClosable objects in MemoryStore May 14, 2018
Member

@dbtsai dbtsai left a comment

LGTM. Only some minor comments. Let's wait for more feedback from committers. Thanks.

private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
  entry match {
    case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
Member

Use DeserializedMemoryEntry(values, _, _) to match the style of the rest of the code.

  }
}

private def maybeCloseValues(objs: Array[Any]): Unit = {
Member

values: Array[Any]

override def close(): Unit = {
  closed = true
  if (throwsOnClosed) {
    throw new RuntimeException("Throwing")
Member

Could you add var isExceptionThrown = false, and check in the test whether the exception is thrown?

  }
}
def getClosed(): Boolean = {
  closed
Member

Since closed is public, you could use it directly. Or you could make closed private.

assert(tracker.getClosed())
}

test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
Member

Can you check that, if you have multiple AutoCloseable objects in an iterator and only one of them is removed, the rest of the objects are not closed?

Author

So if I understand the API correctly, there is no way to remove a single item that was put as part of a call to putIterator - because operations are conducted by blockId you would only be able to remove the whole group of entries, not a single part of an iterator.

@SparkQA

SparkQA commented May 14, 2018

Test build #90599 has finished for PR 21322 at commit f254f94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
  entry match {
    case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
Member

@viirya viirya May 14, 2018

As far as I know, broadcast variables can also be serialized to disk (BlockManager.doPutIterator) instead of being stored in MemoryStore. In that case, it seems AutoCloseable broadcast variables won't hit this release logic.

Author

I wouldn't expect a never-deserialized Memory Entry to be closed, as it was never really instantiated to begin with - so if it only lands on disk, I think that's reasonable (as the variable in question would never have had a chance to allocate anything either).

Member

In theory, you can have a working broadcast object that is not in MemoryStore.

When the merged object is stored in BlockManager by calling putSingle, it can end up in the disk store.

When the object is about to be used and we can't find it in the cache, we call BlockManager.getLocalValues to read it back from the disk store. That call tries to store it in MemoryStore, but it may not succeed.

I think the point is that the change assumes that if there is a deserialized broadcast object, it is definitely in MemoryStore. If I read the code correctly, that is not the case: you can have the serialized bytes of the object in the disk store and use a deserialized object at the same time.

Author

Ah, OK, I see where the issue is. In this case you may have a deserialized instance, but the memory store is full, so the put fails. Now we've got a live, deserialized object that is not in MemoryStore. Thanks for catching this. It looks like this case could be handled in MemoryStore.putIteratorAsValues when the putIterator call fails, which would cover several cases in BlockManager where we try (and fail) to put deserialized values. However, it means checking for potential AutoCloseable values any time we fail to put into MemoryStore, and I'm not sure of the performance impact of that.

Author

Actually, digging further, there are other places where we may deserialize an object from the disk store and never put it into the memory store. It seems like it may make the most sense to punt on a guarantee that your AutoCloseable object is closed and make this a best-effort thing when calling BlockManager.removeBroadcast (which is how we used it in the case that led us to put together this PR). It'll still be better than depending on GC and a finalizer to get the resources cleaned up when the driver can call Broadcast#destroy, but we can document it as a best practice to have a finalizer just in case the close() call doesn't happen due to edge cases.


private def maybeCloseValues(objs: Array[Any]): Unit = {
objs.foreach {
case closable: AutoCloseable =>
Member

indent style: two spaces.

@cloud-fan
Contributor

cloud-fan commented May 15, 2018

I think users are responsible for calling Broadcast#destroy, which unpersists broadcast blocks from the block manager and runs user-defined driver-side cleanup.

It is a valid use case to allow users to define some executor-side cleanup via AutoCloseable. However, I don't think we should always detect AutoCloseable when removing a block, as it may break existing programs (because users may not expect close to be called when an RDD block is evicted) and cause a perf regression. We should only do it for broadcast blocks.

A good place to do it seems to be BlockManager.removeBroadcast

@viirya
Member

viirya commented May 15, 2018

One concern I have is that we now keep broadcast variables in BroadcastManager.cachedValues using weak references. So if an AutoCloseable broadcast variable is reclaimed by GC before we call Broadcast#destroy, we still can't properly release its resources.

@JeetKunDoug
Author

@viirya it seems from my admittedly cursory look at where we use the cachedValues reference map that we should be OK in this case - if there's a deserialized version of the variable (the only case we really care about), it'll have a hard reference in an entry in MemoryStore, and, when removed (which occurs if BlockManager drops it from memory) we can do the cleanup there explicitly. Perhaps I missed a way to have a broadcast variable in deserialized form in the cache but not in MemoryStore.entries?

@viirya
Member

viirya commented May 15, 2018

@JeetKunDoug This is the same issue we discussed above. I think that even if there's a deserialized version of the variable, it may not be in MemoryStore; there may be only serialized bytes in the disk store.

The reason is that we use getLocalValues to retrieve the deserialized object. If it was stored in the disk store by putSingle, we read it back and call maybeCacheDiskValuesInMemory to try to cache it in MemoryStore. But that is not guaranteed to succeed. When it fails, you still only have the serialized value in the disk store.

@JeetKunDoug
Author

JeetKunDoug commented May 15, 2018

@cloud-fan So your suggestion makes sense - it seems like the best path forward is to check the isBroadcast flag on the BlockId passed in to MemoryStore.remove and release resources only if it's a broadcast ID - does that make sense to you? Note that it appears this would also catch the data from the MapOutputTracker, but that's the only other usage of broadcasts I saw in the codebase.

- Also fixed indentation issue.
@JeetKunDoug
Author

@cloud-fan I added the above-mentioned check for isBroadcast and only release resources if it's a broadcast ID. This will affect the MapOutputTracker as well, but I think it's a pretty limited change and should work well. If you're not crazy about that, I can start at Broadcast.destroy and pass down a flag to control the behavior, but it's a much more invasive (but doable) change.

@SparkQA

SparkQA commented May 15, 2018

Test build #90651 has finished for PR 21322 at commit 62d46d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 15, 2018

Test build #90652 has finished for PR 21322 at commit 6a08c43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

In this case, what happens when the blockId is not a broadcast ID? The existing cleanup will not be called.

Author

@dbtsai thanks - That whole "my day job vs. OSS" rush to fix. Will take care of it correctly and push an update.

Member

Should we check if the keys are blockId.isBroadcast == true?

Member

@viirya viirya May 18, 2018

If we do it in remove, I don't think we can avoid the issue I mentioned before. If you have a deserialized value in the broadcast cache, it can be cleaned up by GC if this broadcast value isn't stored as a deserialized entry in MemoryStore.

If the object has already claimed some resources that we want to release through the AutoCloseable interface, we don't properly release them when it's cleaned up by GC. That happens before remove is called, and the object is not in the memory store.

@cloud-fan
Contributor

According to TorrentBroadcast.writeBlocks and TorrentBroadcast.readBroadcastBlock, broadcast data is always written into the block manager in MEMORY_AND_DISK_SER mode. When a task wants to get the broadcast value, it needs to get the serialized bytes from the block manager (maybe memory, maybe disk), deserialize them into an object, put the object into a cache, and return the object to the user function.

That said, on the executor side we may create the broadcast object multiple times: e.g. if it's evicted from the cache and GCed, we read and deserialize it from the block manager again. A simple way to do executor-side cleanup is to implement a finalize method in the class you want to broadcast.

Alternatively, we can improve the cache and watch the eviction event. If eviction happens, do the cleanup.
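
To make the finalize suggestion above concrete, here is a minimal sketch of a resource whose close() is idempotent and whose finalizer acts only as a safety net. `SharedResource` and `releaseCount` are hypothetical names for illustration. Note that the JVM does not guarantee when, or whether, a finalizer runs, and that finalize is deprecated in newer JDKs, so this is a fallback rather than a primary cleanup path.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical resource standing in for something expensive, e.g. a connection pool.
class SharedResource extends AutoCloseable {
  private val closedFlag = new AtomicBoolean(false)

  // Counts actual releases so that double-closing is observable.
  val releaseCount = new AtomicInteger(0)

  def isClosed: Boolean = closedFlag.get()

  // Idempotent: only the first caller (explicit close or the finalizer)
  // performs the release; later calls are no-ops.
  override def close(): Unit = {
    if (closedFlag.compareAndSet(false, true)) {
      releaseCount.incrementAndGet() // real code would free the resource here
    }
  }

  // Safety net only: runs, if at all, some time after the object becomes unreachable.
  override protected def finalize(): Unit = {
    try close()
    finally super.finalize()
  }
}
```

Making close() idempotent matters because an explicit close and the finalizer may both run for the same instance.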

@JeetKunDoug
Author

JeetKunDoug commented May 18, 2018

@cloud-fan as to eviction, it ends up calling BlockManager.dropFromMemory, which will in fact call MemoryStore.remove and does the cleanup appropriately, so that should already be handled.

I think you're right in that the TorrentBroadcast.readBroadcastBlock will only succeed if it can, in fact, push the unblockified object to the MemoryStore, but I think @viirya is right that there's still a possibility that it's been spilled to disk (and appropriately closed), but then we deserialize it again in BlockManager.getLocalValues but fail to write it back to the MemoryStore there, and then it's possible we can return the value to the client but not add it to the MemoryStore. In that case, a finalize method can handle the cleanup. However, I wonder if we should always store it back to the blockManager in the Some() case when we read from getLocalValues just to ensure it's there? Since we refuse to allow it to be used if we can't save it in the case we read it from another node, why would we allow it here?

Regardless, I'm pushing some fix-ups for the other issues @dbtsai raised, which I think will handle all of the cases we'd need in order to consider this a "best-effort" cleanup while still requiring a finalizer. Please let me know your thoughts on forcing the save to the memory store in readBroadcastBlock's Some block, where it manages to read from local values (before TorrentBroadcast.scala:218 in my branch).

Need to do the check further down the call stack so the original release still works.
@JeetKunDoug JeetKunDoug force-pushed the handle-autoclosable-objects branch from 6a08c43 to 790c906 Compare May 18, 2018 15:03
@SparkQA

SparkQA commented May 18, 2018

Test build #90796 has finished for PR 21322 at commit 790c906.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member

kiszk commented May 19, 2018

retest this please

@SparkQA

SparkQA commented May 19, 2018

Test build #90824 has finished for PR 21322 at commit 790c906.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

IIUC it's impossible to clean up broadcasted object in MemoryStore. The life cycle of the broadcasted object is:

  1. The first task that tries to get the broadcast object reads the bytes from the block manager and deserializes them into an object. The object is put into the executor-wide cache.
  2. Other tasks in this executor try to get the broadcast object, read from cache if it's there.
  3. Other tasks in this executor try to get the broadcast object, redo step 1 if it has been evicted from cache.
  4. Job finishes, remove the value in block manager and cache.

If we do cleanup in MemoryStore, we just rebuild the object from bytes and call its close. This doesn't help as we need to do cleanup for all the objects that have been created during the job.
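
The life cycle above can be modeled with a small sketch, under heavy simplification: `ExecutorModel` and `Handle` are hypothetical, deserialization is simulated by constructing a new object, and cache eviction is simulated by an explicit call instead of a weak reference being cleared by GC. It only illustrates why closing one rebuilt object does not clean up the instances that tasks actually used.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical closeable value; each instance models one deserialized copy.
final class Handle(val id: Int) extends AutoCloseable {
  @volatile var closed = false
  override def close(): Unit = { closed = true }
}

// Hypothetical model of one executor: the serialized bytes always exist,
// and deserialized objects are rebuilt on demand.
final class ExecutorModel {
  private var nextId = 0
  private var cache: Option[Handle] = None // stands in for the weak-reference cache

  // Every object ever created, so the outcome can be inspected.
  val created: ArrayBuffer[Handle] = ArrayBuffer.empty[Handle]

  // Simulates "read bytes from the block manager and deserialize".
  private def deserialize(): Handle = {
    nextId += 1
    val h = new Handle(nextId)
    created += h
    h
  }

  // Steps 1 to 3: read from the cache, or rebuild if the entry is gone.
  def value(): Handle = cache.getOrElse {
    val h = deserialize()
    cache = Some(h)
    h
  }

  // Simulates the cached object being evicted and garbage collected.
  def evict(): Unit = { cache = None }

  // Store-level cleanup as described above: rebuild from bytes, close that copy.
  def closeViaStore(): Handle = {
    val h = deserialize()
    h.close()
    h
  }
}
```

In this model, calling value(), evict(), value(), and then closeViaStore() leaves two earlier instances unclosed; only the freshly rebuilt copy is closed.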

@JeetKunDoug
Author

@cloud-fan So I see what's been messing with me - it's the whole Broadcast variable cache/weak reference thing. I originally wrote this internally for Spark 1.6.3 (still supporting some older Spark installations), which didn't have this concept. I'll take another look at how we may be able to better handle the executor-wide cache in a more coherent way (pun intended) and get back to you. You may be right that we will have to depend on a combination of a finalizer + AutoCloseable object handled more directly in Broadcast#destroy for the happy-path case where we're not running into memory pressure on the MemoryStore that is causing the variable to churn.... I just hate having to depend on a finalizer to do this if there's a way around it.

@AmplabJenkins

Can one of the admins verify this patch?

@JeetKunDoug
Author

@cloud-fan (Also left on the JIRA ticket) - Sorry this has dropped off my radar for so long - work + life took me away from it for a while. Looking at the PR review comments and better understanding Broadcast Variable behavior (and some of the changes that took place in the 2.X series), it seems like simply trying to close Broadcast variables won't work as intended. However, I believe the underlying concept (driver-scoped shared variables, where the variable lives until the job is done or the driver removes it) is still worth pursuing. The goal is to be able to scope shared resources such as DB connection pools, which may need to change per phase of a job or be disposed of early in a process, which makes static variables not useful. Given that, I'd like to propose we add a new concept, similar to Broadcast Variables, called, perhaps, Scoped Variables. The intent would be for these to be scoped by the driver, be relatively small from a memory-consumption perspective (unlike broadcast variables, which can be much larger), and to be held in memory until explicitly removed by the driver. Most of the infrastructure work for broadcast variables supports this use case, but we'd need to have either a "non-purgeable" type in the MemoryStore, or some other store specific to these new scoped variables, in order to prevent them from being evicted like cached items are.

Thoughts on this? I'll start working on updating the PR to support something like this sometime today, but it might still take a while to get something workable put together, so I'd appreciate any feedback when someone has the time.


private def maybeCloseValues(values: Array[Any], blockId: BlockId): Unit = {
  if (blockId.isBroadcast) {
    values.foreach(value => Utils.tryClose(value))
Member

Just a style thing, but could be values.foreach(Utils.tryClose)


private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
  entry match {
    case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
Member

Why not just make these case classes Closeable and then you can close them consistently

  }
}

def tryClose(value: Any): Unit = {
Member

This should accept at best AnyRef. It doesn't really seem like we need a new global utility method for this. It's a little unusual to try closing things that aren't Closeable and we can try to rationalize that in the callers above if possible.
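
One way to read the suggestion above is that the caller selects the closeable values itself, so no general-purpose "try to close anything" utility is needed. A minimal sketch, assuming hypothetical names (`CloseSketch`, `Flag`) that are not part of Spark:

```scala
object CloseSketch {
  // The caller narrows the values to AutoCloseable before closing anything.
  def closeables(values: Array[Any]): Array[AutoCloseable] =
    values.collect { case c: AutoCloseable => c }

  // Closes the selected values and returns how many were closed.
  // Exceptions thrown by close() propagate to the caller in this sketch.
  def closeAll(values: Array[Any]): Int = {
    val selected = closeables(values)
    selected.foreach(_.close())
    selected.length
  }
}

// Hypothetical closeable used to observe the effect.
final class Flag extends AutoCloseable {
  var closed = false
  override def close(): Unit = { closed = true }
}
```

With this shape, non-closeable values are never passed to a close helper at all, which keeps the type of the closing code honest.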

@cloud-fan
Contributor

Introducing a new concept which is similar to broadcast seems like an overkill. We can just update broadcast, to allow it to be memory-only.

However, there might be simpler solutions to fit your case, without broadcast. e.g.

val myObj = ...
rdd.mapPartitions { it =>
  try {
    // process data
  } finally {
    myObj.close()
  }
}

I think we need to clearly define the use case and think about whether we need a new API or not.

@JeetKunDoug
Author

JeetKunDoug commented Oct 16, 2018

@cloud-fan The use-case we have in mind (and are currently using Broadcast + finalizers for) is the case where you have, for example, a connection pool that is, for one reason or another, scoped to a particular stage in the job. In this case, the pool itself is expensive to create, and can be shared across tasks, which makes closing the object in a try/finally for a single task, or even a single partition, inefficient as you'd end up potentially closing the resource early, and having to rebuild it several times. The fundamental trick is to figure out a way to allow the driver to define the scope of the shared resource (like a broadcast variable) and ensure it's really memory-only, so if there's a better way to use the existing broadcast variable infrastructure to do this, and prevent this kind of broadcast variable from being purged from the MemoryStore, then I'm all for it.
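
The cost difference described above can be illustrated with a small single-JVM sketch. It does not model Spark executors or stages: `CountingPool` and `ScopeComparison` are hypothetical, and "processing a partition" is a no-op. It only counts how often the expensive object is built under each scoping choice.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical pool; construction is the expensive step being counted.
final class CountingPool(built: AtomicInteger) extends AutoCloseable {
  built.incrementAndGet()
  override def close(): Unit = ()
}

object ScopeComparison {
  // Pool created and closed inside each partition's try/finally.
  def perPartition(partitions: Int): Int = {
    val built = new AtomicInteger(0)
    (1 to partitions).foreach { _ =>
      val pool = new CountingPool(built)
      try {
        // process the partition with the pool (omitted)
      } finally pool.close()
    }
    built.get()
  }

  // Pool owned by a wider scope and shared by all partitions in that scope.
  def shared(partitions: Int): Int = {
    val built = new AtomicInteger(0)
    val pool = new CountingPool(built)
    try {
      (1 to partitions).foreach { _ =>
        // process the partition with the shared pool (omitted)
      }
    } finally pool.close()
    built.get()
  }
}
```

For eight partitions, the per-partition variant builds the pool eight times, while the shared variant builds it once.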

@cloud-fan
Contributor

Ah, I see. It's kind of a broadcast, but it has a much smaller scope and its lifecycle is bound to a stage. I'm looking forward to a full design of it, thanks!

@JeetKunDoug
Author

@cloud-fan yes, essentially... the only thing I'd add is that it's bound to "whatever scope the driver sees fit to bind it to", if that makes sense. In my particular case, it happens to be a stage, but it's possible that others may find there are longer-lived cases, all of which should be supportable... As this is my side-project for work, it may take me a bit to get something more together, but I'll see what I can sketch out. Would you prefer some sort of design doc, or is an update to this PR (or a new one) appropriate?

@cloud-fan
Contributor

I think a design doc is better, to make sure we are on the same page before the actual coding, which saves time.

@JeetKunDoug
Author

Sounds good - it may take me a while to get the time to work this up, but I'll get something put together and attached to the underlying issue as soon as I can. If possible, can you point me at what you would consider a "good" design document, so I have a starting point to work with?

@cloud-fan
Contributor

@HyukjinKwon
Member

Shall we leave this PR closed and start from a design doc? Let me suggest closing this for now while I am looking through old PRs.

@JeetKunDoug, please feel free to create a clone of this PR if there's any reason to keep this open that I missed. No objection.

Thanks.

@JeetKunDoug
Author

@HyukjinKwon makes sense - I'll try to get a design doc put together as soon as I can, but my "day job" is preventing me from working on it right now.


10 participants