Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ private[spark] class PythonAccumulatorV2(
private val serverPort: Int)
extends CollectionAccumulator[Array[Byte]] {

Utils.checkHost(serverHost, "Expected hostname")
Utils.checkHost(serverHost)

val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private[deploy] object DeployMessages {
memory: Int,
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)
}

Expand Down Expand Up @@ -131,7 +131,7 @@ private[deploy] object DeployMessages {

// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
Utils.checkHostPort(hostPort, "Required hostport")
Utils.checkHostPort(hostPort)
}

case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
Expand Down Expand Up @@ -183,7 +183,7 @@ private[deploy] object DeployMessages {
completedDrivers: Array[DriverInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)

def uri: String = "spark://" + host + ":" + port
Expand All @@ -201,7 +201,7 @@ private[deploy] object DeployMessages {
drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {

Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private[deploy] class Master(
private val waitingDrivers = new ArrayBuffer[DriverInfo]
private var nextDriverNumber = 0

Utils.checkHost(address.host, "Expected hostname")
Utils.checkHost(address.host)

private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) exte
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[spark] class WorkerInfo(
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
Utils.checkHost(host)
assert (port > 0)

@transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private[deploy] class Worker(
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port

Utils.checkHost(host, "Expected hostname")
Utils.checkHost(host)
assert (port > 0)

// A scheduled executor used to send messages at the specified time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[spark] class Executor(
private val conf = env.conf

// No ip or host:port - just hostname
Utils.checkHost(executorHostname, "Expected executed slave to be a hostname")
Utils.checkHost(executorHostname)
// must not have port specified.
assert (0 == Utils.parseHostPort(executorHostname)._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BlockManagerId private (
def executorId: String = executorId_

if (null != host_) {
Utils.checkHost(host_, "Expected hostname")
Utils.checkHost(host_)
assert (port_ > 0)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/RpcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[spark] object RpcUtils {
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
Utils.checkHost(driverHost)
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}

Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -933,12 +933,13 @@ private[spark] object Utils extends Logging {
customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
}

def checkHost(host: String, message: String = "") {
assert(host.indexOf(':') == -1, message)
def checkHost(host: String) {
assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")
}

def checkHostPort(hostPort: String, message: String = "") {
assert(hostPort.indexOf(':') != -1, message)
def checkHostPort(hostPort: String) {
assert(hostPort != null && hostPort.indexOf(':') != -1,
s"Expected host and port but got $hostPort")
}

// Typically, this will be of order of number of nodes in cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

Expand Down