@@ -172,10 +172,11 @@ public TransportClient createClient(String remoteHost, int remotePort)
 final long preResolveHost = System.nanoTime();
 final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
 final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
+final String resolvMsg = resolvedAddress.isUnresolved() ? "failed" : "succeed";
 if (hostResolveTimeMs > 2000) {
-  logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+  logger.warn("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs);
Member:

Arg, the line is too long and fails Java style checks. I'll fix. I don't see why the last test run didn't catch it ... the 'does not merge' message isn't related.

 } else {
-  logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+  logger.trace("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs);
 }

 synchronized (clientPool.locks[clientIndex]) {
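For reference, the hunk above only adds whether the lookup succeeded or failed to an existing timing log around InetSocketAddress resolution. A minimal standalone sketch of the same measure-then-log pattern, written in Scala against plain JDK and SLF4J APIs (the object name, method name, and the 2-second threshold are illustrative, not Spark's code):

```scala
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit

import org.slf4j.LoggerFactory

// Illustrative sketch, not the PR's code: time a DNS lookup and log the outcome.
object DnsTimingSketch {
  private val logger = LoggerFactory.getLogger(getClass)

  def resolveWithTiming(host: String, port: Int): InetSocketAddress = {
    val start = System.nanoTime()
    // Constructing InetSocketAddress with a hostname triggers DNS resolution.
    val address = new InetSocketAddress(host, port)
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    val outcome = if (address.isUnresolved) "failed" else "succeeded"
    if (elapsedMs > 2000) {
      // Slow lookups are worth a warning; Long.box satisfies SLF4J's Object varargs.
      logger.warn("DNS resolution {} for {} took {} ms", outcome, address, Long.box(elapsedMs))
    } else {
      logger.trace("DNS resolution {} for {} took {} ms", outcome, address, Long.box(elapsedMs))
    }
    address
  }
}
```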
@@ -283,7 +283,19 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 executorConf,
 new SecurityManager(executorConf),
 clientMode = true)
-val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
+
+var driver: RpcEndpointRef = null
+val nTries = 3
+for (i <- 0 until nTries if driver == null) {
+  try {
+    driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
+  } catch {
+    case e: Throwable => if (i == nTries - 1) {
+      throw e
+    }
+  }
Member:

This change applies not just to k8s, right?

Contributor Author:

Hi @felixcheung
It's the executor used for Kubernetes, but it can be called by different resource managers.
I labeled it as K8s because it also modifies the Docker images used in k8s. I'm not sure what the best component would be; for instance, YarnCoarseGrainedExecutorBackend extends this class.

Member:

Yes, that's what I mean. In the case of a shared class, the PR should tag all of the affected components.

Contributor Author (@jlpedrosa, May 27, 2019):

Hi @felixcheung
Then this patch belongs to Core and Kubernetes, due to the Java property in the Docker image. I'm looking at https://spark-prs.appspot.com/ for the categories.

Contributor Author:

Hi @felixcheung, any feedback?

Contributor Author:

Actually, if we don't add the condition in the for loop, it will keep trying to get connections even when they are succeeding, so I fixed it the other way.

Member:

Oops @jlpedrosa, I meant to retain the condition in the loop, but otherwise I find the code right above simpler. However, I like retaining the original exception. Just please pay attention to the style: the driver = indentation is off, and it needs to say if (i == nTries - 1).

Contributor Author:

@srowen
Fixed the indentation. Please double check.

Member:

It looks the same; did you push?

Contributor Author:

@srowen I think it's in the right place. I'm rewriting history to clean up the multiple commits.

+}
+
 val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
 val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
 fetcher.shutdown()
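As a side note for readers following the thread: the shape being discussed (retry a fixed number of times, stop as soon as it succeeds, and rethrow the original exception on the last attempt) can also be expressed as a small generic helper. A hedged sketch of that pattern in plain Scala; the retry name, maxTries parameter, and the usage in main are illustrative and not part of this PR:

```scala
// Illustrative retry helper, not Spark code: attempt an operation up to maxTries times,
// stop once it succeeds, and rethrow the original exception if every attempt fails.
object RetrySketch {
  def retry[T](maxTries: Int)(op: => T): T = {
    var result: Option[T] = None
    for (i <- 0 until maxTries if result.isEmpty) {
      try {
        result = Some(op)
      } catch {
        case e: Throwable =>
          // Only give up (and preserve the original exception) on the final attempt.
          if (i == maxTries - 1) {
            throw e
          }
      }
    }
    result.get
  }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    val value = retry(3) {
      attempts += 1
      if (attempts < 2) throw new RuntimeException("transient failure")
      "connected"
    }
    println(s"$value after $attempts attempts") // prints: connected after 2 attempts
  }
}
```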