core/src/main/scala/org/apache/spark/SparkContext.scala (12 additions, 2 deletions)

@@ -2570,10 +2570,20 @@ class SparkContext(config: SparkConf) extends Logging {

private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()

- private val nextRddId = new AtomicInteger(0)
+ private var nextRddId = new AtomicInteger(0)
@schlosna (Nov 28, 2022):

What happens when the RDD ID overflows within a SparkContext? Are there tests that cover these cases?

Curious whether it would be better to switch to an AtomicLong and just take it modulo Integer.MAX_VALUE?

Suggested change:

- private var nextRddId = new AtomicInteger(0)
+ private[spark] def newRddId(): Int = (nextRddId.getAndIncrement() % Integer.MAX_VALUE).toInt
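For context, a hypothetical standalone sketch of the AtomicLong-plus-modulo idea (plain Java rather than Spark's Scala; the class and counter names here are invented for illustration, not the actual Spark change):

```java
import java.util.concurrent.atomic.AtomicLong;

public class LongRddIdDemo {
    // Hypothetical stand-in for SparkContext's counter, widened to a long.
    static final AtomicLong nextRddId = new AtomicLong(0);

    // A long counter incremented a billion times per second takes roughly
    // 292 years to overflow, and the modulo keeps the int result in
    // [0, Integer.MAX_VALUE), i.e. never negative in practice.
    static int newRddId() {
        return (int) (nextRddId.getAndIncrement() % Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        // Jump the counter to just below the int overflow point.
        nextRddId.set(Integer.MAX_VALUE - 1L);
        System.out.println(newRddId()); // 2147483646
        System.out.println(newRddId()); // 0 -- wraps without going negative
    }
}
```

Note that ids repeat after the wrap; the modulo only guarantees they stay non-negative.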

Contributor Author:

> What happens when the RDD ID overflows within a SparkContext?

When BlockManager generates a BlockId, BlockId only supports positive RDD ids, so BlockId generation fails once the counter wraps negative.

> switch to an AtomicLong?

That change would have a very broad impact across the codebase, so it would likely need extensive discussion first.
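To make the failure mode concrete, a minimal standalone sketch of the wraparound being described (plain Java; `AtomicInteger` is the same `java.util.concurrent.atomic` class `SparkContext` uses for `nextRddId`):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RddIdOverflowDemo {
    public static void main(String[] args) {
        // Start the counter at the last valid value to show what the
        // unpatched getAndIncrement() hands out next.
        AtomicInteger nextRddId = new AtomicInteger(Integer.MAX_VALUE);

        int last = nextRddId.getAndIncrement();    // hands out 2147483647
        int wrapped = nextRddId.getAndIncrement(); // int arithmetic wraps

        System.out.println(last);    // 2147483647
        System.out.println(wrapped); // -2147483648 -- a negative RDD id,
                                     // which (per the author's note above)
                                     // BlockId generation cannot handle
    }
}
```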

Contributor:

You don't need to make this a `var`; see the suggestion below.


  /** Register a new RDD, returning its RDD ID */
- private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+ private[spark] def newRddId(): Int = {
+   var id = nextRddId.getAndIncrement()
+   if (id >= 0) {
+     return id
+   }
+   this.synchronized {
+     nextRddId = new AtomicInteger(0)
+     id = nextRddId.getAndIncrement()
+   }
+   id
+ }

Contributor (on the line `id = nextRddId.getAndIncrement()` inside the synchronized block):

Can we avoid the duplicate call of `nextRddId.getAndIncrement()`?
Contributor (commenting on lines +2576 to +2586):

Something like this would remove the need for synchronization, resetting the counter, etc.:

Suggested change:

- private[spark] def newRddId(): Int = {
-   var id = nextRddId.getAndIncrement()
-   if (id >= 0) {
-     return id
-   }
-   this.synchronized {
-     nextRddId = new AtomicInteger(0)
-     id = nextRddId.getAndIncrement()
-   }
-   id
- }
+ private[spark] def newRddId(): Int = {
+   nextRddId.getAndUpdate { i =>
+     var nextValue = i + 1
+     if (nextValue < 0) {
+       nextValue = 0
+     }
+     nextValue
+   }
+ }
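A standalone Java rendering of this suggestion (hypothetical class name, for illustration only; `getAndUpdate` retries internally via compare-and-set, so no `synchronized` block or counter replacement is needed):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class GetAndUpdateDemo {
    static final AtomicInteger nextRddId = new AtomicInteger(0);

    // Lock-free variant: getAndUpdate applies the function atomically and
    // returns the previous value, so ids wrap to 0 instead of going negative.
    static int newRddId() {
        return nextRddId.getAndUpdate(i -> {
            int next = i + 1;
            return next < 0 ? 0 : next; // wrap back to 0 on int overflow
        });
    }

    public static void main(String[] args) {
        nextRddId.set(Integer.MAX_VALUE - 1);
        System.out.println(newRddId()); // 2147483646
        System.out.println(newRddId()); // 2147483647
        System.out.println(newRddId()); // 0 -- not Integer.MIN_VALUE
    }
}
```

After the wrap, ids restart at 0, so a very long-lived context could still see duplicate ids; the point of the suggestion is only to keep them non-negative without locking.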


/**
* Registers listeners specified in spark.extraListeners, then starts the listener bus.