Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Arrays

import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1.StageStatus
import org.apache.spark.util.Utils

/**
* Low-level status reporting APIs for monitoring job and stage progress.
Expand Down Expand Up @@ -103,10 +104,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore
*/
def getExecutorInfos: Array[SparkExecutorInfo] = {
store.executorList(true).map { exec =>
val (host, port) = exec.hostPort.split(":", 2) match {
case Array(h, p) => (h, p.toInt)
case Array(h) => (h, -1)
}
val (host, port) = Utils.parseHostPort(exec.hostPort)
val cachedMem = exec.memoryMetrics.map { mem =>
mem.usedOnHeapStorageMemory + mem.usedOffHeapStorageMemory
}.getOrElse(0L)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
import org.apache.spark.status.api.v1
import org.apache.spark.storage.{RDDInfo, StorageLevel}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.{AccumulatorContext, Utils}
import org.apache.spark.util.collection.OpenHashSet

/**
Expand Down Expand Up @@ -307,7 +307,7 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
// peak values for executor level metrics
val peakExecutorMetrics = new ExecutorMetrics()

def hostname: String = if (host != null) host else hostPort.split(":")(0)
def hostname: String = if (host != null) host else Utils.parseHostPort(hostPort)._1

override protected def doUpdate(): Any = {
val memoryMetrics = if (totalOnHeap >= 0) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/status/storeTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1._
import org.apache.spark.ui.scope._
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.KVIndex

private[spark] case class AppStatusStoreMetadata(version: Long)
Expand Down Expand Up @@ -57,7 +58,7 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
private def active: Boolean = info.isActive

@JsonIgnore @KVIndex("host")
val host: String = info.hostPort.split(":")(0)
val host: String = Utils.parseHostPort(info.hostPort)._1

}

Expand Down
48 changes: 37 additions & 11 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1026,13 +1026,27 @@ private[spark] object Utils extends Logging {
customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
}

/**
* Checks that the host contains only a valid hostname/IP address, without a port.
* NOTE: In case of an IPv6 address, it must be enclosed in [].
*/
def checkHost(host: String): Unit = {
assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")
if (host != null && host.split(":").length > 2) {
assert(host.startsWith("[") && host.endsWith("]"),
s"Expected hostname or IPv6 IP enclosed in [] but got $host")
} else {
assert(host != null && host.indexOf(':') == -1, s"Expected hostname or IP but got $host")
}
}

def checkHostPort(hostPort: String): Unit = {
assert(hostPort != null && hostPort.indexOf(':') != -1,
s"Expected host and port but got $hostPort")
if (hostPort != null && hostPort.split(":").length > 2) {
assert(hostPort != null && hostPort.indexOf("]:") != -1,
s"Expected host and port but got $hostPort")
} else {
assert(hostPort != null && hostPort.indexOf(':') != -1,
s"Expected host and port but got $hostPort")
}
}

// Typically, this will be of order of number of nodes in cluster
Expand All @@ -1046,18 +1060,30 @@ private[spark] object Utils extends Logging {
return cached
}

val indx: Int = hostPort.lastIndexOf(':')
// This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
// but then hadoop does not support ipv6 right now.
// For now, we assume that if port exists, then it is valid - not check if it is an int > 0
if (-1 == indx) {
def setDefaultPortValue: (String, Int) = {
val retval = (hostPort, 0)
hostPortParseResults.put(hostPort, retval)
return retval
retval
}
// Checks whether hostPort contains an IPv6 address and, if so, parses the host and port
if (hostPort != null && hostPort.split(":").length > 2) {
val indx: Int = hostPort.lastIndexOf("]:")
if (-1 == indx) {
return setDefaultPortValue
}
val port = hostPort.substring(indx + 2).trim()
val retval = (hostPort.substring(0, indx + 1).trim(), if (port.isEmpty) 0 else port.toInt)
hostPortParseResults.putIfAbsent(hostPort, retval)
} else {
val indx: Int = hostPort.lastIndexOf(':')
if (-1 == indx) {
return setDefaultPortValue
}
val port = hostPort.substring(indx + 1).trim()
val retval = (hostPort.substring(0, indx).trim(), if (port.isEmpty) 0 else port.toInt)
hostPortParseResults.putIfAbsent(hostPort, retval)
}

val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
hostPortParseResults.putIfAbsent(hostPort, retval)
hostPortParseResults.get(hostPort)
}

Expand Down
97 changes: 97 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,103 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]")
assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]")
}

test("checkHost supports both IPV4 and IPV6") {
// IPV4 ips
Utils.checkHost("0.0.0.0")
var e: AssertionError = intercept[AssertionError] {
Utils.checkHost("0.0.0.0:0")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, it would be great if we checked the error message to prevent future regressions.

assert(e.getMessage.contains("Expected hostname or IP but got 0.0.0.0:0"))
e = intercept[AssertionError] {
Utils.checkHost("0.0.0.0:")
}
assert(e.getMessage.contains("Expected hostname or IP but got 0.0.0.0:"))
// IPV6 ips
Utils.checkHost("[::1]")
e = intercept[AssertionError] {
Utils.checkHost("[::1]:0")
}
assert(e.getMessage.contains("Expected hostname or IPv6 IP enclosed in [] but got [::1]:0"))
e = intercept[AssertionError] {
Utils.checkHost("[::1]:")
}
assert(e.getMessage.contains("Expected hostname or IPv6 IP enclosed in [] but got [::1]:"))
// hostname
Utils.checkHost("localhost")
e = intercept[AssertionError] {
Utils.checkHost("localhost:0")
}
assert(e.getMessage.contains("Expected hostname or IP but got localhost:0"))
e = intercept[AssertionError] {
Utils.checkHost("localhost:")
}
assert(e.getMessage.contains("Expected hostname or IP but got localhost:"))
}

// Exercises Utils.checkHostPort with three kinds of input: an IPv4 address, a bracketed
// IPv6 address and a plain hostname. For each kind, the "host:port" form is expected to
// pass, while the same host without a port must raise an AssertionError whose message
// names the offending input.
test("checkHostPort support IPV6 and IPV4") {
// IPv4 address: accepted with a port, rejected without one
Utils.checkHostPort("0.0.0.0:0")
var e: AssertionError = intercept[AssertionError] {
Utils.checkHostPort("0.0.0.0")
}
// Pin the message text so a change to the assertion wording is caught
assert(e.getMessage.contains("Expected host and port but got 0.0.0.0"))

// IPv6 address enclosed in []: accepted with a port, rejected without one
Utils.checkHostPort("[::1]:0")
e = intercept[AssertionError] {
Utils.checkHostPort("[::1]")
}
assert(e.getMessage.contains("Expected host and port but got [::1]"))

// Hostname: accepted with a port, rejected without one
Utils.checkHostPort("localhost:0")
e = intercept[AssertionError] {
Utils.checkHostPort("localhost")
}
assert(e.getMessage.contains("Expected host and port but got localhost"))
}

// Exercises Utils.parseHostPort with IPv4 addresses, bracketed IPv6 addresses and plain
// hostnames. Each kind of host is checked in three forms: with an explicit port, with no
// port at all, and with a trailing ':' but an empty port. The last two forms are expected
// to yield port 0.
test("parseHostPort support IPV6 and IPV4") {
// IPv4 address
var hostnamePort = Utils.parseHostPort("0.0.0.0:80")
assert(hostnamePort._1.equals("0.0.0.0"))
assert(hostnamePort._2 === 80)

// No port separator: the whole input is the host and the port is 0
hostnamePort = Utils.parseHostPort("0.0.0.0")
assert(hostnamePort._1.equals("0.0.0.0"))
assert(hostnamePort._2 === 0)

// Trailing ':' with an empty port: the port is 0
hostnamePort = Utils.parseHostPort("0.0.0.0:")
assert(hostnamePort._1.equals("0.0.0.0"))
assert(hostnamePort._2 === 0)

// IPv6 address: the returned host keeps its enclosing []
hostnamePort = Utils.parseHostPort("[::1]:80")
assert(hostnamePort._1.equals("[::1]"))
assert(hostnamePort._2 === 80)

hostnamePort = Utils.parseHostPort("[::1]")
assert(hostnamePort._1.equals("[::1]"))
assert(hostnamePort._2 === 0)

hostnamePort = Utils.parseHostPort("[::1]:")
assert(hostnamePort._1.equals("[::1]"))
assert(hostnamePort._2 === 0)

// Hostname
hostnamePort = Utils.parseHostPort("localhost:80")
assert(hostnamePort._1.equals("localhost"))
assert(hostnamePort._2 === 80)

hostnamePort = Utils.parseHostPort("localhost")
assert(hostnamePort._1.equals("localhost"))
assert(hostnamePort._2 === 0)

hostnamePort = Utils.parseHostPort("localhost:")
assert(hostnamePort._1.equals("localhost"))
assert(hostnamePort._2 === 0)
}
}

private class SimpleExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

/**
* Handles registering and unregistering the application with the YARN ResourceManager.
Expand Down Expand Up @@ -107,7 +108,7 @@ private[spark] class YarnRMClient extends Logging {
// so not all stable releases have it.
val prefix = WebAppUtils.getHttpSchemePrefix(conf)
val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
val hosts = proxies.asScala.map(_.split(":").head)
val hosts = proxies.asScala.map(proxy => Utils.parseHostPort(proxy)._1)
val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
val params =
Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
Expand Down