Conversation

@jinxing64

@jinxing64 jinxing64 commented Jun 22, 2017

What changes were proposed in this pull request?

A shuffle service can serve blocks from multiple apps/tasks, so the shuffle service can suffer high memory usage when lots of shuffle reads happen at the same time. In my cluster, OOM always happens on the shuffle service. Analyzing a heap dump, the memory consumed by Netty (ChannelOutboundBuffer$Entry) can reach 2~3 GB. It might make sense to reject "open blocks" requests when memory usage is high on the shuffle service.

93dd0c5 and 85c6ce6 tried to alleviate the memory pressure on the shuffle service but do not solve the root cause. This PR proposes to control the concurrency of shuffle reads.
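For illustration only, a minimal sketch of the kind of admission check described above; the class, field, and supplier names (OpenBlocksGuard, memCostWaterMark, currentMemoryUsed) are hypothetical and are not taken from this patch:

import java.util.function.LongSupplier;

// Hypothetical sketch, not the patch's actual code.
public class OpenBlocksGuard {
  private final long memCostWaterMark;          // configured threshold in bytes
  private final LongSupplier currentMemoryUsed; // bytes currently held by the Netty allocator

  public OpenBlocksGuard(long memCostWaterMark, LongSupplier currentMemoryUsed) {
    this.memCostWaterMark = memCostWaterMark;
    this.currentMemoryUsed = currentMemoryUsed;
  }

  // Serve an incoming "open blocks" request only while allocator usage is
  // below the water mark; otherwise reject it so the reducer can retry later.
  public boolean admitOpenBlocks() {
    return currentMemoryUsed.getAsLong() < memCostWaterMark;
  }
}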

How was this patch tested?

Added unit test.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78439 has finished for PR 18388 at commit ed889b9.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class PooledByteBufAllocatorWithMetrics extends PooledByteBufAllocator
  • public class OpenBlocksFailed extends BlockTransferMessage

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78440 has finished for PR 18388 at commit 0a2bcee.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class PooledByteBufAllocatorWithMetrics extends PooledByteBufAllocator
  • public class OpenBlocksFailed extends BlockTransferMessage

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78455 has finished for PR 18388 at commit f4856c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class PooledByteBufAllocatorWithMetrics extends PooledByteBufAllocator
  • public class OpenBlocksFailed extends BlockTransferMessage

@jinxing64
Author

jinxing64 commented Jun 27, 2017

@cloud-fan @vanzin @tgravescs
What do you think about this idea?

@tgravescs
Contributor

Haven't looked at the patch in detail yet. High-level questions/thoughts:
So you say the memory usage is from the Netty chunks, so my assumption is this happens during the actual transfer? Failing the open-blocks requests isn't necessarily going to solve that. If a bunch of reducers all do open blocks at once, it won't reject any, and when they all start to transfer it could still run out of memory. It could help in the normal case where some run OpenBlocks while other transfers are going on, though. Have you been running this patch, and what are the results?

So an alternative to this is limiting the number of blocks each reducer is fetching at once. Instead of calling open blocks with 500 at once, do them in chunks of, say, 20. We are working on a patch for that and should have it available in the next couple of days. Again, this doesn't guarantee anything, but it allows you to throttle down the number of blocks each reducer would get at once. MapReduce/Tez actually do this with a lot of success.
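For illustration, a rough sketch of the count-based batching described above; the helper is hypothetical, not Spark's implementation:

import java.util.ArrayList;
import java.util.List;

public class BlockBatcher {
  // Splits block ids into fixed-size batches, e.g. 500 blocks into batches of 20,
  // so each OpenBlocks message carries only a small group of blocks.
  public static List<List<String>> batch(List<String> blockIds, int batchSize) {
    List<List<String>> batches = new ArrayList<>();
    for (int i = 0; i < blockIds.size(); i += batchSize) {
      int end = Math.min(i + batchSize, blockIds.size());
      batches.add(new ArrayList<>(blockIds.subList(i, end)));
    }
    return batches;
  }
}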

@jinxing64
Author

Thanks a lot for the quick reply :)

Yes, this patch doesn't guarantee avoiding the OOM on the shuffle service when all reducers are opening blocks at the same time. But we can alleviate this by adjusting spark.reducer.maxSizeInFlight. ShuffleBlockFetcherIterator will break the blocks into several requests (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L240), e.g. 500 blocks can be split into 20 requests, which will be sent one by one to the shuffle service. If memory cost is high on the shuffle service, subsequent requests will be rejected. In the normal case this is pretty useful.
Also, if the OpenBlocks request is rejected, the reducer can sleep for a random duration, say 2s~5s, which helps avoid all reducers opening blocks at the same time.
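For illustration, a rough sketch of the size-based grouping referred to above; the helper below is hypothetical and only mimics the idea of accumulating blocks into a request until a size target derived from spark.reducer.maxSizeInFlight is reached:

import java.util.ArrayList;
import java.util.List;

public class FetchRequestSplitter {
  // Accumulates blocks into one request until the running size reaches the
  // target, then starts a new request, so a reducer sends several smaller
  // OpenBlocks messages instead of one huge one.
  public static List<List<String>> split(
      List<String> blockIds, List<Long> blockSizes, long targetRequestSize) {
    List<List<String>> requests = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentSize = 0L;
    for (int i = 0; i < blockIds.size(); i++) {
      current.add(blockIds.get(i));
      currentSize += blockSizes.get(i);
      if (currentSize >= targetRequestSize) {
        requests.add(current);
        current = new ArrayList<>();
        currentSize = 0L;
      }
    }
    if (!current.isEmpty()) {
      requests.add(current);
    }
    return requests;
  }
}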

@jinxing64
Author

So an alternative to this is limiting the number of blocks each reducer is fetching at once

Is this related to spark.reducer.maxSizeInFlight?
Breaking OpenBlocks into more requests is helpful, but I really think we should have some defensive approach on the shuffle service side.

@tgravescs
Contributor

I think having both sides would probably be good: limit the reducer connections and simultaneous block calls, but a fail-safe on the shuffle server side where it can reject connections also makes sense.

Can you please give more details on what is using the memory? If it's the Netty blocks, is it when it's actually streaming the data back to the reducer? I thought it was using direct buffers for that, so it wouldn't show up on the heap. I'll have to look in more detail.

@cloud-fan
Contributor

cc @jiangxb1987

@jiangxb1987
Contributor

Will review this tomorrow. Thanks!

@jinxing64
Author

jinxing64 commented Jun 28, 2017

(screenshot of the heap dump, 2017-06-28)

As the screenshot shows, there are tons of io.netty.channel.ChannelOutboundBuffer$Entry instances. If I understand correctly, messages are written to the ChannelOutboundBuffer first and then flushed to the network. When a message is sent successfully, its io.netty.channel.ChannelOutboundBuffer$Entry will be released (recycled). The memory for the messages is allocated by PooledByteBufAllocator, thus we can have control by referring to this metric.
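As a hedged sketch (this is not the PooledByteBufAllocatorWithMetrics class added by the patch), assuming a Netty 4.1 release that exposes PooledByteBufAllocatorMetric, the allocator's current usage could be read like this and compared against a water mark:

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;

public class AllocatorUsage {
  // Total bytes currently held by the pooled allocator. Netty allocates on-heap
  // or off-heap depending on configuration (spark.shuffle.io.preferDirectBufs),
  // so both pools are counted.
  public static long usedBytes(PooledByteBufAllocator allocator) {
    PooledByteBufAllocatorMetric metric = allocator.metric();
    return metric.usedHeapMemory() + metric.usedDirectMemory();
  }
}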

Contributor

@jiangxb1987 jiangxb1987 left a comment

The code quality is pretty good; I only have some minor comments.

Contributor

Should we move these configs to org.apache.spark.internal.config?
In Java, you can use them like this:

import org.apache.spark.internal.config.package$;
...
package$.MODULE$.XXX()

Author

Sure, I will refine.

Author

@jinxing64 jinxing64 Jun 28, 2017

Yes, I do think it's good to put the config into org.apache.spark.internal.config.
But I found it hard, since org.apache.spark.internal.config is in the core module. I didn't find a good way to import it from the spark-network-yarn or spark-network-shuffle modules. Did I miss something?

Contributor

Oh, the core module relies on spark-network-common, so we don't have to change it here.

Member

This is only for the shuffle service, right? The config should start with spark.shuffle. I'm okay with one config since Netty uses either heap memory or off-heap memory.

Contributor

ditto

Contributor

ditto

Contributor

ditto

Contributor

Do we still need this trace?

Author

I'm not sure, just for debug :)

Contributor

What's the purpose of this change?

Author

MesosExternalShuffleService can use it.

Contributor

Oh, makes sense.

@jinxing64
Author

@jiangxb1987
Thanks a lot for taking the time to review this PR.
I will read your comments very carefully and refine it.

Contributor

nit: it's better to use the Iterator pattern here, as the input list may not be an indexed list and list.get(i) becomes O(n).
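A tiny illustration of the point, with a hypothetical helper:

import java.util.List;

public class IterateBlocks {
  // The enhanced for loop uses the List's iterator, so it stays O(n) overall
  // even for a non-indexed list such as LinkedList, whereas indexed get(i)
  // would be O(n) per call.
  public static long totalSize(List<Long> blockSizes) {
    long total = 0;
    for (long size : blockSizes) {
      total += size;
    }
    return total;
  }
}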

Author

Yes, I should refine.

Contributor

do we have a config for shuffle service JVM heap size? maybe we can use that.

Author

I'm hesitant, because Netty could use off-heap or on-heap memory for allocation (depending on spark.shuffle.io.preferDirectBufs).

Contributor

This is inconsistent with the executor memory configuration. For executor memory, we have spark.executor.memory for heap size and spark.memory.offHeap.size for off-heap size, and these two together are the total memory consumption for each executor process.

Now we have a single config for the shuffle service's total memory consumption, which seems better; shall we fix the executor memory config?

Author

@jinxing64 jinxing64 Jun 29, 2017

Sorry, you mean:

  1. the change (spark.network.netty.memCostWaterMark) in this PR is OK;
  2. we merge spark.executor.memory (heap size) and spark.memory.offHeap.size (off-heap size, used by Tungsten) into one executor memory config.

Do I understand correctly?

Author

Currently, I do think spark.memory.offHeap.size is quite confusing.

Contributor

Maybe we can create a JIRA and send it to the dev list to gather feedback. Would you mind doing this if you have time?

Author

Sure, really would love to :)

Author

@cloud-fan
I made a JIRA(https://issues.apache.org/jira/browse/SPARK-21270) about merging the memory configs. Please take a look when you have time and give some comments.

Contributor

shall we merge this and the above log into one log entry?

Contributor

nit: this.reason == o.reason?

Contributor

@cloud-fan cloud-fan Jun 29, 2017

can you add some comments to explain the test?

@cloud-fan
Contributor

LGTM except some minor comments, thanks for working on it!

@SparkQA

SparkQA commented Jun 29, 2017

Test build #78880 has finished for PR 18388 at commit c5a01aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 29, 2017

Test build #78897 has finished for PR 18388 at commit c5a01aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM. Can we add descriptions of these new configs in configuration.md? thanks!

@SparkQA

SparkQA commented Jun 29, 2017

Test build #78914 has finished for PR 18388 at commit 1d34578.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

cc @rxin @JoshRosen @zsxwing any suggestion for the config name?

@cloud-fan
Contributor

cloud-fan commented Jul 1, 2017

Does this patch require a server-side change for the shuffle service?

@jinxing64
Author

Yes, there is a change. The server side may return OpenBlocksFailed for the "open blocks" request, which means that an old client is not compatible with a new server. Is that acceptable?

@cloud-fan
Contributor

cc @zsxwing how strictly do we require shuffle service compatibility?

channel.close();
}
ManagedBuffer buf;

Member

nit: extra empty line

@SparkQA

SparkQA commented Jul 22, 2017

Test build #79855 has finished for PR 18388 at commit 4bfeabb.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 22, 2017

Test build #79856 has finished for PR 18388 at commit 5f622c3.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 22, 2017

Test build #79857 has finished for PR 18388 at commit 4de417f.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@zsxwing
Member

zsxwing commented Jul 22, 2017

@jinxing64 Sorry, I forgot to mention one request. Could you add a unit test? Right now it's disabled so the new code is not tested. It will help avoid some obvious mistakes, such as the missing return issue :)

@SparkQA

SparkQA commented Jul 22, 2017

Test build #79858 has finished for PR 18388 at commit 4de417f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 23, 2017

Test build #79879 has finished for PR 18388 at commit 8ee60f8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 23, 2017

Test build #79886 has finished for PR 18388 at commit 8ee60f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 25, 2017

Test build #79923 has finished for PR 18388 at commit 3a018b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 799e131 Jul 25, 2017
@jinxing64
Author

Thanks for merging!

@tgravescs
Contributor

Sorry I didn't get a chance to review this. I started but kept getting distracted by other higher-priority things. I think we should expand the description of the config to say what happens when the limit is hit. Since it's not using real flow control, a user might set this thinking nothing bad will happen, but it's dropping connections, so it could cause failures if the retries don't work.

I'll file a separate jira for that.

Also, what was the issue with implementing the actual flow control part? Was it just adding a queueing-type mechanism? We should file a separate JIRA so we can add that later.

@tgravescs
Contributor

@jinxing64
Author

@tgravescs
Thanks for help.

I think we should expand the description of the config to say what happens when the limit is hit. Since it's not using real flow control, a user might set this thinking nothing bad will happen, but it's dropping connections, so it could cause failures if the retries don't work.

Could you give the link for the JIRA? I'm happy to work on a follow-up PR if possible.

For the flow control part, I'm just worried the queue will be too large and cause memory issues.

@cloud-fan
Contributor

If it's OK to break shuffle service backward compatibility (by default this config is off), I think we should introduce a new response type to tell the client that the shuffle service is still up but just short on memory, so please do not give up and keep retrying.

Currently we just close the connection, so the client has no idea what's going on and may mistakenly report FetchFailure and fail the stage/job.

@tgravescs
Contributor

It's not OK to break shuffle service backward compatibility though, especially not in a minor release. We may choose to do it in something like a 3.0 release, but even then it makes upgrading very hard for users.

@cloud-fan
Contributor

OK then let's go with the flow control direction.

For the flow control part, I'm just worrying the queue will be too large and causing memory issue.

We can make an external queue, i.e. if it's too large, spill to disk.

Another concern is that, with flow control, the shuffle service may hold a request for a long time and cause the client to time out and fail. It's better than just closing the connection, but there is still a chance that the client mistakenly reports a FetchFailure.

@tgravescs
Contributor

The idea of the queue is not to queue entire requests, it's just to flow control the number of chunks being sent at once. For example, you only create 5 outgoing chunks at a time per connection; once one of those has been sent, you add another one. This limits the amount of memory being used by those outgoing chunks. This should not affect closing the connection, at least not change it from the current behavior.
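A minimal sketch of this chunk-level flow control, assuming a single event-loop thread and hypothetical names (this is not Spark's actual API):

import java.util.ArrayDeque;
import java.util.Queue;

public class ChunkFlowControl {
  private final int maxInFlight;  // e.g. 5 outgoing chunks per connection
  private int inFlight = 0;
  private final Queue<Runnable> pending = new ArrayDeque<>();

  public ChunkFlowControl(int maxInFlight) {
    this.maxInFlight = maxInFlight;
  }

  // Enqueue a chunk writer; it only runs once an in-flight slot is free.
  public void submit(Runnable writeChunk) {
    pending.add(writeChunk);
    drain();
  }

  // Called when a previously written chunk has been fully flushed to the socket.
  public void onChunkSent() {
    inFlight--;
    drain();
  }

  private void drain() {
    while (inFlight < maxInFlight && !pending.isEmpty()) {
      inFlight++;
      pending.poll().run();
    }
  }
}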

@cloud-fan
Contributor

Oh I see, it's orthogonal to the current approach. Makes sense.
