Skip to content

Commit 6e5bb2e

Browse files
committed
=act Extract AtomicCancellable in Scheduler.
1 parent 8f5a5a3 commit 6e5bb2e

File tree

2 files changed

+59
-73
lines changed

2 files changed

+59
-73
lines changed

akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import scala.util.control.NonFatal
1616

1717
import com.typesafe.config.Config
1818

19+
import akka.actor.Scheduler.AtomicCancellable
1920
import akka.dispatch.AbstractNodeQueue
2021
import akka.event.LoggingAdapter
2122
import akka.util.Helpers
@@ -102,49 +103,24 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
102103
override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(
103104
implicit executor: ExecutionContext): Cancellable = {
104105
checkMaxDelay(roundUp(delay).toNanos)
105-
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self =>
106-
compareAndSet(
107-
InitialRepeatMarker,
106+
new AtomicCancellable(
107+
InitialRepeatMarker,
108+
stateRef =>
108109
schedule(
109110
executor,
110111
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
111112
override def run(): Unit = {
112113
try {
113114
runnable.run()
114115
val driftNanos = clock() - getAndAdd(delay.toNanos)
115-
if (self.get != null)
116-
swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
116+
if (stateRef.get != null)
117+
stateRef.swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
117118
} catch {
118119
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
119120
}
120121
}
121122
},
122123
roundUp(initialDelay)))
123-
124-
@tailrec private def swap(c: Cancellable): Unit = {
125-
get match {
126-
case null => if (c != null) c.cancel()
127-
case old => if (!compareAndSet(old, c)) swap(c)
128-
}
129-
}
130-
131-
final def cancel(): Boolean = {
132-
@tailrec def tailrecCancel(): Boolean = {
133-
get match {
134-
case null => false
135-
case c =>
136-
if (c.cancel()) compareAndSet(c, null)
137-
else compareAndSet(c, null) || tailrecCancel()
138-
}
139-
}
140-
141-
tailrecCancel()
142-
}
143-
144-
override def isCancelled: Boolean = get == null
145-
} catch {
146-
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
147-
}
148124
}
149125

150126
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(

akka-actor/src/main/scala/akka/actor/Scheduler.scala

Lines changed: 53 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ package akka.actor
66

77
import java.util.concurrent.atomic.AtomicReference
88

9+
import scala.annotation.nowarn
910
import scala.annotation.tailrec
1011
import scala.concurrent.ExecutionContext
1112
import scala.concurrent.duration._
1213
import scala.util.control.NoStackTrace
1314

14-
import scala.annotation.nowarn
15-
15+
import akka.actor.Scheduler.AtomicCancellable
1616
import akka.annotation.InternalApi
1717
import akka.util.JavaDurationConverters
1818

@@ -72,49 +72,23 @@ trait Scheduler {
7272
*/
7373
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)(
7474
implicit executor: ExecutionContext): Cancellable = {
75-
try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self =>
76-
compareAndSet(
77-
Cancellable.initialNotCancelled,
78-
scheduleOnce(
79-
initialDelay,
80-
new Runnable {
81-
override def run(): Unit = {
82-
try {
83-
runnable.run()
84-
if (self.get != null)
85-
swap(scheduleOnce(delay, this))
86-
} catch {
87-
// ignore failure to enqueue or terminated target actor
88-
case _: SchedulerException =>
89-
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
90-
}
75+
new AtomicCancellable(Cancellable.initialNotCancelled, stateRef => {
76+
scheduleOnce(
77+
initialDelay,
78+
new Runnable {
79+
override def run(): Unit = {
80+
try {
81+
runnable.run()
82+
if (stateRef.get != null)
83+
stateRef.swap(scheduleOnce(delay, this))
84+
} catch {
85+
// ignore failure to enqueue or terminated target actor
86+
case _: SchedulerException =>
87+
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
9188
}
92-
}))
93-
94-
@tailrec private def swap(c: Cancellable): Unit = {
95-
get match {
96-
case null => if (c != null) c.cancel()
97-
case old => if (!compareAndSet(old, c)) swap(c)
98-
}
99-
}
100-
101-
final def cancel(): Boolean = {
102-
@tailrec def tailrecCancel(): Boolean = {
103-
get match {
104-
case null => false
105-
case c =>
106-
if (c.cancel()) compareAndSet(c, null)
107-
else compareAndSet(c, null) || tailrecCancel()
10889
}
109-
}
110-
111-
tailrecCancel()
112-
}
113-
114-
override def isCancelled: Boolean = get == null
115-
} catch {
116-
case SchedulerException(msg) => throw new IllegalStateException(msg)
117-
}
90+
})
91+
})
11892
}
11993

12094
/**
@@ -562,4 +536,40 @@ object Scheduler {
562536
* a custom implementation of `Scheduler` must also implement this.
563537
*/
564538
trait TaskRunOnClose extends Runnable
539+
540+
private[akka] class AtomicCancellable(initialValue: Cancellable, next: AtomicCancellable => Cancellable)
541+
extends AtomicReference[Cancellable](initialValue)
542+
with Cancellable { self =>
543+
544+
try {
545+
compareAndSet(initialValue, next(self))
546+
} catch {
547+
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
548+
}
549+
550+
@tailrec final private[akka] def swap(c: Cancellable): Unit = {
551+
get match {
552+
case null => if (c != null) c.cancel()
553+
case old =>
554+
if (!compareAndSet(old, c))
555+
swap(c)
556+
}
557+
}
558+
559+
final def cancel(): Boolean = {
560+
@tailrec def tailrecCancel(): Boolean = {
561+
get match {
562+
case null => false
563+
case c =>
564+
if (c.cancel()) compareAndSet(c, null)
565+
else compareAndSet(c, null) || tailrecCancel()
566+
}
567+
}
568+
569+
tailrecCancel()
570+
}
571+
572+
final override def isCancelled: Boolean = get == null
573+
574+
}
565575
}

0 commit comments

Comments
 (0)