From 4e5a347b4b36f78c25c63104fe5fd66a3f1a57bd Mon Sep 17 00:00:00 2001 From: yorksity Date: Thu, 24 Nov 2022 12:12:45 +0800 Subject: [PATCH 1/2] Solve the problem of RddId negative --- .../scala/org/apache/spark/SparkContext.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5cbf2e833717..dd8a468ca347 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2570,10 +2570,23 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() - private val nextRddId = new AtomicInteger(0) + private var nextRddId = new AtomicInteger(0) /** Register a new RDD, returning its RDD ID */ - private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + private[spark] def newRddId(): Int = { + var id = nextRddId.getAndIncrement() + if (id >= 0) { + return id + } + this.synchronized { + id = nextRddId.getAndIncrement() + if (id < 0) { + nextRddId = new AtomicInteger(0) + id = nextRddId.getAndIncrement() + } + } + id + } /** * Registers listeners specified in spark.extraListeners, then starts the listener bus. From 9e96549130c8258464247a6edac47265a418aaf6 Mon Sep 17 00:00:00 2001 From: yorksity Date: Sun, 11 Dec 2022 21:25:38 +0800 Subject: [PATCH 2/2] Solve the problem of RddId negative --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index dd8a468ca347..1f35897a1ed3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2579,11 +2579,8 @@ class SparkContext(config: SparkConf) extends Logging { return id } this.synchronized { + nextRddId = new AtomicInteger(0) id = nextRddId.getAndIncrement() - if (id < 0) { - nextRddId = new AtomicInteger(0) - id = nextRddId.getAndIncrement() - } } id }