Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import scala.util.control.NonFatal

import com.typesafe.config.Config

import akka.actor.Scheduler.AtomicCancellable
import akka.dispatch.AbstractNodeQueue
import akka.event.LoggingAdapter
import akka.util.Helpers
Expand Down Expand Up @@ -102,48 +103,23 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable = {
checkMaxDelay(roundUp(delay).toNanos)
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self =>
compareAndSet(
InitialRepeatMarker,
new AtomicCancellable(InitialRepeatMarker) { self =>
final override protected def scheduleFirst(): Cancellable =
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get != null)
if (self.get() != null)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self refs to the AtomicReference.

swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
}
},
roundUp(initialDelay)))

@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old => if (!compareAndSet(old, c)) swap(c)
}
}

final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}

tailrecCancel()
}

override def isCancelled: Boolean = get == null
} catch {
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
roundUp(initialDelay))
}
}

Expand Down
78 changes: 47 additions & 31 deletions akka-actor/src/main/scala/akka/actor/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NoStackTrace

import akka.actor.Scheduler.AtomicCancellable
import akka.annotation.InternalApi
import akka.util.JavaDurationConverters

Expand Down Expand Up @@ -70,51 +71,25 @@ trait Scheduler {
* Note: For scheduling within actors `with Timers` should be preferred.
*/
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable = {
try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self =>
compareAndSet(
Cancellable.initialNotCancelled,
implicit executor: ExecutionContext): Cancellable =
new AtomicCancellable(Cancellable.initialNotCancelled) {
final override protected def scheduleFirst(): Cancellable =
scheduleOnce(
initialDelay,
new Runnable {
override def run(): Unit = {
try {
runnable.run()
if (self.get != null)
if (get != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the other place it is

if (self.get() != null)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is self => used there but not here?

Copy link
Contributor Author

@He-Pin He-Pin Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I need to access the AtomicReference there but not the outer AtomicLong, and here we can access it directly.

swap(scheduleOnce(delay, this))
} catch {
// ignore failure to enqueue or terminated target actor
case _: SchedulerException =>
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}
}))

@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old => if (!compareAndSet(old, c)) swap(c)
}
}

final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}

tailrecCancel()
}

override def isCancelled: Boolean = get == null
} catch {
case SchedulerException(msg) => throw new IllegalStateException(msg)
})
}
}

/**
* Java API: Schedules a `Runnable` to be run repeatedly with an initial delay and
Expand Down Expand Up @@ -561,4 +536,45 @@ object Scheduler {
* a custom implementation of `Scheduler` must also implement this.
*/
trait TaskRunOnClose extends Runnable

/**
* INTERNAL API
*/
@InternalApi
private[akka] abstract class AtomicCancellable(initialValue: Cancellable)
extends AtomicReference[Cancellable](initialValue)
with Cancellable {
try {
compareAndSet(initialValue, scheduleFirst())
} catch {
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
}

protected def scheduleFirst(): Cancellable

@tailrec final protected def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old =>
if (!compareAndSet(old, c))
swap(c)
}
}

final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}

tailrecCancel()
}

final override def isCancelled: Boolean = get == null

}
}