@@ -58,11 +58,6 @@ private[yarn] class YarnAllocationHandler(
5858 map : collection.Map [String , collection.Set [SplitInfo ]])
5959 extends YarnAllocator with Logging {
6060
61- private val ANY_HOST = " *"
62- // All requests are issued with same priority : we do not (yet) have any distinction between
63- // request types (like map/reduce in hadoop for example)
64- private val PRIORITY = 1
65-
6661 // These three are locked on allocatedHostToContainersMap. Complementary data structures
6762 // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
6863 // allocatedContainerToHostMap: container to host mapping.
@@ -362,7 +357,7 @@ private[yarn] class YarnAllocationHandler(
362357 for (container <- hostContainers) {
363358 val candidateHost = container.getHostName
364359 val candidateNumContainers = container.getNumContainers
365- assert(ANY_HOST != candidateHost)
360+ assert(YarnSparkHadoopUtil . ANY_HOST != candidateHost)
366361
367362 val rack = YarnSparkHadoopUtil .lookupRack(conf, candidateHost)
368363 if (rack != null ) {
@@ -376,7 +371,8 @@ private[yarn] class YarnAllocationHandler(
376371 new ArrayBuffer [ResourceRequest ](rackToCounts.size)
377372 for ((rack, count) <- rackToCounts){
378373 requestedContainers +=
379- createResourceRequest(AllocationType .RACK , rack, count, PRIORITY )
374+ createResourceRequest(AllocationType .RACK , rack, count,
375+ YarnSparkHadoopUtil .RM_REQUEST_PRIORITY )
380376 }
381377
382378 requestedContainers.toList
@@ -407,7 +403,7 @@ private[yarn] class YarnAllocationHandler(
407403 logDebug(" numExecutors: " + numExecutors + " , host preferences: " +
408404 preferredHostToCount.isEmpty)
409405 resourceRequests = List (createResourceRequest(
410- AllocationType .ANY , null , numExecutors, PRIORITY ))
406+ AllocationType .ANY , null , numExecutors, YarnSparkHadoopUtil . RM_REQUEST_PRIORITY ))
411407 } else {
412408 // request for all hosts in preferred nodes and for numExecutors -
413409 // candidates.size, request by default allocation policy.
@@ -421,7 +417,7 @@ private[yarn] class YarnAllocationHandler(
421417 AllocationType .HOST ,
422418 candidateHost,
423419 requiredCount,
424- PRIORITY )
420+ YarnSparkHadoopUtil . RM_REQUEST_PRIORITY )
425421 }
426422 }
427423 val rackContainerRequests : List [ResourceRequest ] = createRackResourceRequests(
@@ -431,7 +427,7 @@ private[yarn] class YarnAllocationHandler(
431427 AllocationType .ANY ,
432428 resource = null ,
433429 numExecutors,
434- PRIORITY )
430+ YarnSparkHadoopUtil . RM_REQUEST_PRIORITY )
435431
436432 val containerRequests : ArrayBuffer [ResourceRequest ] = new ArrayBuffer [ResourceRequest ](
437433 hostContainerRequests.size + rackContainerRequests.size + 1 )
@@ -481,7 +477,7 @@ private[yarn] class YarnAllocationHandler(
481477 // There must be a third request - which is ANY : that will be specially handled.
482478 requestType match {
483479 case AllocationType .HOST => {
484- assert(ANY_HOST != resource)
480+ assert(YarnSparkHadoopUtil . ANY_HOST != resource)
485481 val hostname = resource
486482 val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
487483
@@ -495,7 +491,7 @@ private[yarn] class YarnAllocationHandler(
495491 createResourceRequestImpl(rack, numExecutors, priority)
496492 }
497493 case AllocationType .ANY => createResourceRequestImpl(
498- ANY_HOST , numExecutors, priority)
494+ YarnSparkHadoopUtil . ANY_HOST , numExecutors, priority)
499495 case _ => throw new IllegalArgumentException (
500496 " Unexpected/unsupported request type: " + requestType)
501497 }
0 commit comments