Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ private[yarn] class YarnAllocator(
*
* @see SPARK-12864
*/
private var executorIdCounter: Int =
private lazy val initialExecutorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
private var executorIdCounter: Int = 0

// Queue to store the timestamp of failed executors
private val failedExecutorsTimeStamps = new Queue[Long]()
Expand Down Expand Up @@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
val executorId = (initialExecutorIdCounter + executorIdCounter).toString

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems a bit strange to me to "add" the Ids?
@vanzin @jerryshao

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial problem was that initialExecutorIdCounter comes from the driver, which has already stopped. Making this lazy solved this. The other integer is necessary because making it a lazy var is not possible.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the point of the fix, but it also seems a little strange to me.

Besides, do we really need to fix your issue? As far as I know, the case here is not a normal one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is an issue only when the application is quite fast. Do you have concerns about solving this in general, or about the fix in the first commit? I'm asking because pausing the user class thread would definitely be better, as I've written below.

assert(container.getResource.getMemory >= resource.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.deploy.yarn

import java.util.concurrent.RejectedExecutionException

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
Expand All @@ -33,6 +35,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
import org.apache.spark.util.ManualClock

class MockResolver extends SparkRackResolver {
Expand Down Expand Up @@ -83,6 +86,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter

def createAllocator(
maxExecutors: Int = 5,
driverRef: RpcEndpointRef = mock(classOf[RpcEndpointRef]),
rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
val args = Array(
"--jar", "somejar.jar",
Expand All @@ -94,7 +98,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
.set("spark.executor.memory", "2048")
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
driverRef,
conf,
sparkConfClone,
rmClient,
Expand Down Expand Up @@ -284,7 +288,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
// Internally we track the set of blacklisted nodes, but yarn wants us to send *changes*
// to the blacklist. This makes sure we are sending the right updates.
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
val handler = createAllocator(4, mockAmClient)
val driverRef = mock(classOf[RpcEndpointRef])
val handler = createAllocator(4, driverRef, mockAmClient)
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava)

Expand Down Expand Up @@ -350,4 +355,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock.advance(50 * 1000L)
handler.getNumExecutorsFailed should be (0)
}

test("SPARK-23660: allocator should be created even if the driver not reachable") {
  // Stub a driver endpoint that rejects the request for the last allocated executor id.
  val unreachableDriver = mock(classOf[RpcEndpointRef])
  when(unreachableDriver.askSync[Int](RetrieveLastAllocatedExecutorId))
    .thenThrow(new RejectedExecutionException)

  // No explicit assertion: the test passes as long as building the allocator
  // with this driver reference does not throw.
  createAllocator(maxExecutors = 4, driverRef = unreachableDriver)
}
}