Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ package org.apache.spark.deploy.rest

import java.io.{DataOutputStream, FileNotFoundException}
import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
import java.util.concurrent.TimeoutException
import javax.servlet.http.HttpServletResponse

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.io.Source

import com.fasterxml.jackson.core.JsonProcessingException
import com.google.common.base.Charsets

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf}

/**
* A client that submits applications to a [[RestSubmissionServer]].
Expand Down Expand Up @@ -225,7 +228,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
* Exposed for testing.
*/
private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
try {
import scala.concurrent.ExecutionContext.Implicits.global
val responseFuture = Future {
val dataStream =
if (connection.getResponseCode == HttpServletResponse.SC_OK) {
connection.getInputStream
Expand All @@ -251,11 +255,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
throw new SubmitRestProtocolException(
s"Message received from server was not a response:\n${unexpected.toJson}")
}
} catch {
}

try { Await.result(responseFuture, 10.seconds) } catch {
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
throw new SubmitRestProtocolException("Malformed response received from server", malformed)
case timeout: TimeoutException =>
throw new SubmitRestConnectionException("No response from server", timeout)
}
}

Expand Down