-
Notifications
You must be signed in to change notification settings - Fork 29k
[WIP][SPARK-30694][SHUFFLE]If exception occured while fetching blocks by ExternalBlockClient, fail early when External Shuffle Service is not alive #27419
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @cloud-fan But this module didn't dependency on And maybe we should add a check to make sure External Shuffle Service down |
|
Can one of the admins verify this patch? |
| (blockIds1, listener1) -> { | ||
| // Unless this client is closed. | ||
| if (clientFactory != null) { | ||
| TransportClient client = clientFactory.createClient(host, port); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we only add try-catch to wrap this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we only add try-catch to wrap this line?
It's OK to only wrap this line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we only add try-catch to wrap this line?
Updated, hope for review current change
| } | ||
| } catch (IOException e) { | ||
| logger.info("The relative remote external shuffle service (host: " + host + "," + | ||
| "port: "+ port + "), which maintains the block data can't been connected."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so what you do is just printing a log when error happens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so what you do is just printing a log when error happens?
Sorry, I miss the way to stop retry.
May be we should add some way to check external shuffle service alive?
| } catch (IOException e) { | ||
| logger.info("The relative remote external shuffle service (host: " + host + "," + | ||
| "port: "+ port + "), which maintains the block data can't been connected."); | ||
| throw e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @cloud-fan
I want to throw exception like https://github.com/AngersZhuuuu/spark/blob/e2dbe4bca387542e2043abc0801190531e805684/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L130
But this module don't dependency spark-core, Add deps and throw exception like
/**
* Exception throw when can't connect to External Shuffle Service
*/
private[spark] case class ExternalShuffleServiceLostException(message: String)
extends SparkException(message)
Is what I want to do.
And seem we don't have way to check external shuffle service is alive.
| try { | ||
| RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = | ||
| (blockIds1, listener1) -> { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we revert this unnecessary line?
| } else { | ||
| logger.info("This clientFactory was closed. Skipping further block fetch retries."); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert?
| } | ||
| new OneForOneBlockFetcher(client, appId, execId, | ||
| blockIds1, listener1, conf, downloadFileManager).start(); | ||
| blockIds1, listener1, conf, downloadFileManager).start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems we use 2 indents in java?
| TransportClient client = null; | ||
| try { | ||
| client = clientFactory.createClient(host, port); | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure that any exception implies the lost external shuffle service?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure that any exception implies the lost external shuffle service?
No, that's why I say that we may need a way to check External Shuffle Service alive.
Any good suggestions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have any good idea yet.
And if we disallow retry for any exception, why shouldn't we just set to not retry at the beginning?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have any good idea yet.
And if we disallow retry for any exception, why shouldn't we just set to not retry at the beginning?
Connect failed because of network should retry, but if because of server down, shouldn't retry, I need to find way to check external shuffle service is down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have any good idea yet.
And if we disallow retry for any exception, why shouldn't we just set to not retry at the beginning?
How about current check , check host reachable and port can be connected.
| } catch (InterruptedException e) { | ||
| throw e; | ||
| } catch (IOException e) { | ||
| if (!isHostReachable(host, 3000) || !isHostPortConnectable(host, port)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also possible that Host is unreachable or connectable when network is unstable, right?
|
If you skip retry, how can you know it's not transient network problem? |
Yea, hard to choose,how about add a manager of External Shuffle Service and check service with heart beat, then when this error happen, ask for this manager? but keep this manager always alive is a big problem. |
What changes were proposed in this pull request?
Like SPARK-27637,
When we fetch data use External Shuffle Service, we use ExternalBlockStoreClient to fetch blocks
from External Shuffle Service, in real huge cluster, we always need to offline some bad machine, and remote External Shuffle Service will down and we will can't create TransportClient to connect.
It will throw IOException, but in current way, we will only retry until run out of retry times.
I think we should throw this error early like SPARK-27637
Why are the changes needed?
Old solution not comprehensive. Throw exception early and reduce unnecessary retry.
Does this PR introduce any user-facing change?
NO
How was this patch tested?
WIP