-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-4188] [Core] Perform network-level retry of shuffle file fetches #3101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
66e5a24
05ff43c
6f594cd
e80e4c2
c7fd107
72a2a32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,11 @@ public TransportConf(ConfigProvider conf) { | |
| /** IO mode: nio or epoll */ | ||
| public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase(); } | ||
|
|
||
| /** If true, we will prefer allocating off-heap byte buffers within Netty. */ | ||
| public boolean preferDirectBufs() { | ||
| return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true); | ||
| } | ||
|
|
||
| /** Connect timeout in secs. Default 120 secs. */ | ||
| public int connectionTimeoutMs() { | ||
| return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000; | ||
|
|
@@ -58,4 +63,16 @@ public int connectionTimeoutMs() { | |
|
|
||
| /** Timeout for a single round trip of SASL token exchange, in milliseconds. */ | ||
| public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); } | ||
|
|
||
| /** | ||
| * Max number of times we will retry after IO exceptions (such as connection timeouts) per request. | ||
| * If set to 0, we will not do any retries. | ||
| */ | ||
| public int maxIORetries() { return conf.getInt("spark.shuffle.io.maxIORetries", 3); } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. spark.shuffle.io.maxRetries or maxNetworkRetries? |
||
|
|
||
| /** | ||
| * Time (in milliseconds) that we will wait in order to perform a retry after an IOException. | ||
| * Only relevant if maxIORetries > 0. | ||
| */ | ||
| public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.ioRetryWaitTime", 5000); } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. spark.shuffle.io.retryWaitMs ? |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,7 +57,7 @@ public void tearDown() { | |
| } | ||
|
|
||
| @Test | ||
| public void createAndReuseBlockClients() throws TimeoutException { | ||
| public void createAndReuseBlockClients() throws Exception { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. are we throwing more than Timeout now?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IOException instead of TimeoutException |
||
| TransportClientFactory factory = context.createClientFactory(); | ||
| TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
| TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
|
|
@@ -88,7 +88,7 @@ public void neverReturnInactiveClients() throws Exception { | |
| } | ||
|
|
||
| @Test | ||
| public void closeBlockClientsWithFactory() throws TimeoutException { | ||
| public void closeBlockClientsWithFactory() throws Exception { | ||
| TransportClientFactory factory = context.createClientFactory(); | ||
| TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
| TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why heap explicitly here? Wouldn't your spark.shuffle.io.preferDirectBufs flag already control it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The header is a very small buffer, so I thought it was overkill to try to get direct byte bufs for it.