Skip to content

Commit 0e66f16

Browse files
committed
=act Extract AtomicCancellable in Scheduler.
1 parent 861a188 commit 0e66f16

File tree

2 files changed

+49
-60
lines changed

2 files changed

+49
-60
lines changed

akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import scala.util.control.NonFatal
1616

1717
import com.typesafe.config.Config
1818

19+
import akka.actor.Scheduler.AtomicCancellable
1920
import akka.dispatch.AbstractNodeQueue
2021
import akka.event.LoggingAdapter
2122
import akka.util.Helpers
@@ -102,48 +103,23 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
102103
override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(
103104
implicit executor: ExecutionContext): Cancellable = {
104105
checkMaxDelay(roundUp(delay).toNanos)
105-
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self =>
106-
compareAndSet(
107-
InitialRepeatMarker,
106+
new AtomicCancellable(InitialRepeatMarker) { self =>
107+
final override protected def next(): Cancellable =
108108
schedule(
109109
executor,
110110
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
111111
override def run(): Unit = {
112112
try {
113113
runnable.run()
114114
val driftNanos = clock() - getAndAdd(delay.toNanos)
115-
if (self.get != null)
115+
if (self.get() != null)
116116
swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
117117
} catch {
118118
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
119119
}
120120
}
121121
},
122-
roundUp(initialDelay)))
123-
124-
@tailrec private def swap(c: Cancellable): Unit = {
125-
get match {
126-
case null => if (c != null) c.cancel()
127-
case old => if (!compareAndSet(old, c)) swap(c)
128-
}
129-
}
130-
131-
final def cancel(): Boolean = {
132-
@tailrec def tailrecCancel(): Boolean = {
133-
get match {
134-
case null => false
135-
case c =>
136-
if (c.cancel()) compareAndSet(c, null)
137-
else compareAndSet(c, null) || tailrecCancel()
138-
}
139-
}
140-
141-
tailrecCancel()
142-
}
143-
144-
override def isCancelled: Boolean = get == null
145-
} catch {
146-
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
122+
roundUp(initialDelay))
147123
}
148124
}
149125

akka-actor/src/main/scala/akka/actor/Scheduler.scala

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import scala.concurrent.ExecutionContext
1212
import scala.concurrent.duration._
1313
import scala.util.control.NoStackTrace
1414

15+
import akka.actor.Scheduler.AtomicCancellable
1516
import akka.annotation.InternalApi
1617
import akka.util.JavaDurationConverters
1718

@@ -70,51 +71,25 @@ trait Scheduler {
7071
* Note: For scheduling within actors `with Timers` should be preferred.
7172
*/
7273
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)(
73-
implicit executor: ExecutionContext): Cancellable = {
74-
try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self =>
75-
compareAndSet(
76-
Cancellable.initialNotCancelled,
74+
implicit executor: ExecutionContext): Cancellable =
75+
new AtomicCancellable(Cancellable.initialNotCancelled) {
76+
final override protected def next(): Cancellable =
7777
scheduleOnce(
7878
initialDelay,
7979
new Runnable {
8080
override def run(): Unit = {
8181
try {
8282
runnable.run()
83-
if (self.get != null)
83+
if (get != null)
8484
swap(scheduleOnce(delay, this))
8585
} catch {
8686
// ignore failure to enqueue or terminated target actor
8787
case _: SchedulerException =>
8888
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
8989
}
9090
}
91-
}))
92-
93-
@tailrec private def swap(c: Cancellable): Unit = {
94-
get match {
95-
case null => if (c != null) c.cancel()
96-
case old => if (!compareAndSet(old, c)) swap(c)
97-
}
98-
}
99-
100-
final def cancel(): Boolean = {
101-
@tailrec def tailrecCancel(): Boolean = {
102-
get match {
103-
case null => false
104-
case c =>
105-
if (c.cancel()) compareAndSet(c, null)
106-
else compareAndSet(c, null) || tailrecCancel()
107-
}
108-
}
109-
110-
tailrecCancel()
111-
}
112-
113-
override def isCancelled: Boolean = get == null
114-
} catch {
115-
case SchedulerException(msg) => throw new IllegalStateException(msg)
91+
})
11692
}
117-
}
11893

11994
/**
12095
* Java API: Schedules a `Runnable` to be run repeatedly with an initial delay and
@@ -561,4 +536,42 @@ object Scheduler {
561536
* a custom implementation of `Scheduler` must also implement this.
562537
*/
563538
trait TaskRunOnClose extends Runnable
539+
540+
private[akka] abstract class AtomicCancellable(initialValue: Cancellable)
541+
extends AtomicReference[Cancellable](initialValue)
542+
with Cancellable { self =>
543+
544+
try {
545+
compareAndSet(initialValue, next())
546+
} catch {
547+
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
548+
}
549+
550+
protected def next(): Cancellable
551+
552+
@tailrec final protected def swap(c: Cancellable): Unit = {
553+
get match {
554+
case null => if (c != null) c.cancel()
555+
case old =>
556+
if (!compareAndSet(old, c))
557+
swap(c)
558+
}
559+
}
560+
561+
final def cancel(): Boolean = {
562+
@tailrec def tailrecCancel(): Boolean = {
563+
get match {
564+
case null => false
565+
case c =>
566+
if (c.cancel()) compareAndSet(c, null)
567+
else compareAndSet(c, null) || tailrecCancel()
568+
}
569+
}
570+
571+
tailrecCancel()
572+
}
573+
574+
final override def isCancelled: Boolean = get == null
575+
576+
}
564577
}

0 commit comments

Comments
 (0)