Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ private[remote] object Decoder {
recipientPath: String,
inboundEnvelope: InboundEnvelope)

private object Tick
private case object Tick
private case object EvictActorRefResolveCache

/** Materialized value of [[Encoder]] which allows safely calling into the operator to interfact with compression tables. */
private[remote] trait InboundCompressionAccess {
Expand Down Expand Up @@ -359,7 +360,8 @@ private[remote] class Decoder(
inEnvelopePool: ObjectPool[ReusableInboundEnvelope])
extends GraphStageWithMaterializedValue[FlowShape[EnvelopeBuffer, InboundEnvelope], InboundCompressionAccess] {

import Decoder.Tick
import Decoder.{ EvictActorRefResolveCache, Tick }

val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
Expand Down Expand Up @@ -395,6 +397,9 @@ private[remote] class Decoder(
val tickDelay = 1.seconds
scheduleWithFixedDelay(Tick, tickDelay, tickDelay)

val evictDelay = 60.seconds // FIXME config
scheduleWithFixedDelay(EvictActorRefResolveCache, evictDelay, evictDelay)

if (settings.Advanced.Compression.ActorRefs.Enabled) {
val d = settings.Advanced.Compression.ActorRefs.AdvertisementInterval
scheduleWithFixedDelay(AdvertiseActorRefsCompressionTable, d, d)
Expand Down Expand Up @@ -584,6 +589,9 @@ private[remote] class Decoder(
tickMessageCount = messageCount
tickTimestamp = now

case EvictActorRefResolveCache =>
actorRefResolver.clearRemovedAssociations()

case AdvertiseActorRefsCompressionTable =>
compressions
.runNextActorRefAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag]
find(position = h & Mask, probeDistance = 0)
}

final def valuesIterator(): Iterator[V] =
values.iterator.filterNot(_ eq null)

final def stats: CacheStatistics = {
var i = 0
var sum = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.remote.serialization

import scala.concurrent.duration.Deadline
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import akka.actor.ActorRef
Expand Down Expand Up @@ -35,12 +37,20 @@ private[akka] object ActorRefResolveThreadLocalCache

override def createExtension(system: ExtendedActorSystem): ActorRefResolveThreadLocalCache =
new ActorRefResolveThreadLocalCache(system)

private final case class ActorRefResolveCacheHolder(cache: ActorRefResolveCache, evictDeadine: Deadline)
}

/**
* INTERNAL API
*/
private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSystem) extends Extension {
import ActorRefResolveThreadLocalCache.ActorRefResolveCacheHolder

val evictInterval: FiniteDuration = {
import scala.concurrent.duration._
60.seconds // FIXME config
}

private val provider = system.provider match {
case r: RemoteActorRefProvider => r
Expand All @@ -50,12 +60,21 @@ private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSys
s"not with ${system.provider.getClass}")
}

private val current = new ThreadLocal[ActorRefResolveCache] {
override def initialValue: ActorRefResolveCache = new ActorRefResolveCache(provider)
private val current = new ThreadLocal[ActorRefResolveCacheHolder] {
override def initialValue: ActorRefResolveCacheHolder = {
ActorRefResolveCacheHolder(new ActorRefResolveCache(provider), Deadline.now + evictInterval)
}
}

def threadLocalCache(@unused provider: RemoteActorRefProvider): ActorRefResolveCache =
current.get
def threadLocalCache(@unused provider: RemoteActorRefProvider): ActorRefResolveCache = {
val holder = current.get
val cache = holder.cache
if (holder.evictDeadine.isOverdue()) {
cache.clearRemovedAssociations()
current.set(ActorRefResolveCacheHolder(cache, Deadline.now + evictInterval))
}
cache
}

}

Expand Down Expand Up @@ -91,6 +110,19 @@ private[akka] abstract class AbstractActorRefResolveCache[R <: ActorRef: ClassTa
ref
}

/**
* Invalidate cachedAssociation in all RemoteActorRef entries where the `Association` is removed.
*/
def clearRemovedAssociations(): Unit = {
valuesIterator().foreach {
case r: RemoteActorRef =>
val cachedAssociation = r.cachedAssociation
if (cachedAssociation != null && cachedAssociation.isRemovedAfterQuarantined())
r.cachedAssociation = null
case _ =>
}
}

override protected def compute(k: String): R

override protected def hash(k: String): Int = Unsafe.fastHash(k)
Expand Down