Shard state and active nodes do not get updated when the node fails on recovery (#120)

Shard state and active nodes do not get updated when the node fails on recovery.
Helena Edelson committed Jan 31, 2018
1 parent 4a502ed commit fddc62e
Showing 17 changed files with 531 additions and 439 deletions.
@@ -77,10 +77,10 @@ final class FilodbCluster(val system: ExtendedActorSystem) extends Extension wit

/** Idempotent. */
def kamonInit(role: ClusterRole): ActorRef =
Await.result((guardian ? CreateTraceLogger(role)).mapTo[TraceLoggerRef], DefaultTaskTimeout).ref
Await.result((guardian ? CreateTraceLogger(role)).mapTo[TraceLoggerRef], DefaultTaskTimeout).identity

def coordinatorActor: ActorRef = _coordinatorActor.get.getOrElse {
val actor = Await.result((guardian ? CreateCoordinator).mapTo[CoordinatorRef], DefaultTaskTimeout).ref
val actor = Await.result((guardian ? CreateCoordinator).mapTo[CoordinatorIdentity], DefaultTaskTimeout).identity
logger.info(s"NodeCoordinatorActor created: $actor")
actor
}
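This hunk only changes the ack types (CoordinatorRef becomes CoordinatorIdentity, and .ref becomes .identity); the resolve-and-block pattern around them is unchanged. A minimal, self-contained sketch of that pattern, with hypothetical Create/Created messages and a GuardianSketch actor standing in for FiloDB's guardian:

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

// Hypothetical messages mirroring the CreateCoordinator / CoordinatorIdentity shapes above.
case object Create
final case class Created(identity: ActorRef)

class GuardianSketch extends Actor {
  def receive: Receive = {
    case Create =>
      // create the child once, then keep acking the same identity
      val child = context.child("worker").getOrElse(context.actorOf(Props.empty, "worker"))
      sender() ! Created(child)
  }
}

object AskSketch extends App {
  val system = ActorSystem("sketch")
  implicit val timeout: Timeout = Timeout(10.seconds)
  val guardian = system.actorOf(Props[GuardianSketch], "guardian")

  // the same ask-then-block resolution used by FilodbCluster above
  val worker: ActorRef =
    Await.result((guardian ? Create).mapTo[Created], timeout.duration).identity

  println(worker)
  system.terminate()
}

Blocking with Await.result keeps one-time startup wiring simple; outside of initialization the ask would normally be composed as a Future instead.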
@@ -165,7 +165,7 @@ final class FilodbCluster(val system: ExtendedActorSystem) extends Extension wit
def clusterSingleton(role: ClusterRole, watcher: Option[ActorRef]): ActorRef =
_clusterActor.get.getOrElse {
val e = CreateClusterSingleton(role.roleName, watcher)
val actor = Await.result((guardian ? e).mapTo[ClusterSingletonRef], DefaultTaskTimeout).ref
val actor = Await.result((guardian ? e).mapTo[ClusterSingletonIdentity], DefaultTaskTimeout).identity
_clusterActor.set(Some(actor))
_isInitialized.set(true)
actor
@@ -41,7 +41,7 @@ final class FilodbSettings(val conf: Config) {
lazy val DatasetDefinitions = config.as[Option[Map[String, Config]]]("dataset-definitions")
.getOrElse(Map.empty[String, Config])

/** The timeout to use to resolve NodeCoordinatorActor refs for new nodes. */
/** The timeout to use to resolve an actor ref for new nodes. */
val ResolveActorTimeout = config.as[FiniteDuration]("tasks.timeouts.resolve-actor")

}
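The comment above now describes a generic actor-ref resolution timeout rather than one tied to NodeCoordinatorActor. A hypothetical illustration of how such a timeout is commonly applied with actorSelection/resolveOne; the path and value below are assumptions, not taken from this diff:

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.{ActorRef, ActorSystem}

object ResolveSketch extends App {
  val system = ActorSystem("sketch")
  import system.dispatcher

  // assumption: stands in for tasks.timeouts.resolve-actor from the settings above
  val resolveActorTimeout: FiniteDuration = 10.seconds

  // hypothetical path of an actor on a newly joined node
  val nodeCoordinatorPath = "akka://sketch/user/coordinator"

  val resolved: Future[ActorRef] =
    system.actorSelection(nodeCoordinatorPath).resolveOne(resolveActorTimeout)

  resolved.failed.foreach(e => println(s"Could not resolve: $e"))
  system.terminate()
}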
114 changes: 60 additions & 54 deletions coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala
@@ -1,6 +1,6 @@
package filodb.coordinator

import scala.collection.mutable.{HashMap, HashSet}
import scala.collection.mutable.{HashMap => MutableHashMap, HashSet => MutableHashSet}
import scala.concurrent.Future

import akka.actor._
@@ -92,6 +92,7 @@ object NodeClusterActor {
* INTERNAL MESSAGES
*/
private[coordinator] final case class AddCoordinator(roles: Set[String], addr: Address, coordinator: ActorRef)

/** Lets each NodeCoordinator know about the `clusterActor` so it can send it updates.
* Would not be necessary except coordinator currently is created before the clusterActor.
*/
@@ -107,14 +108,14 @@ object NodeClusterActor {
/**
* Creates a new NodeClusterActor.
*
* @param nodeCoordRole String, for the role containing the NodeCoordinatorActor or ingestion nodes
* @param localRole String, for the role containing the NodeCoordinatorActor or ingestion nodes
*/
def props(settings: FilodbSettings,
nodeCoordRole: String,
localRole: String,
metaStore: MetaStore,
assignmentStrategy: ShardAssignmentStrategy,
actors: ActorArgs): Props =
Props(new NodeClusterActor(settings, nodeCoordRole, metaStore, assignmentStrategy, actors))
Props(new NodeClusterActor(settings, localRole, metaStore, assignmentStrategy, actors))

class RemoteAddressExtension(system: ExtendedActorSystem) extends Extension {
def address: Address = system.provider.getDefaultAddress
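The hunk shows the RemoteAddressExtension class but not its ExtensionId companion, which is how localRemoteAddr is obtained further down. A sketch of the conventional wiring, renamed RemoteAddressExt to make clear this is an illustration rather than FiloDB's actual companion:

import akka.actor.{Address, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}

// Exposes the system's default (remote) address, as used for localRemoteAddr below.
class RemoteAddressExt(system: ExtendedActorSystem) extends Extension {
  def address: Address = system.provider.getDefaultAddress
}

// assumed companion wiring, following the standard Akka extension pattern
object RemoteAddressExt extends ExtensionId[RemoteAddressExt] with ExtensionIdProvider {
  override def lookup = RemoteAddressExt
  override def createExtension(system: ExtendedActorSystem): RemoteAddressExt =
    new RemoteAddressExt(system)
}

// usage: val selfAddress = RemoteAddressExt(context.system).address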
@@ -160,7 +161,7 @@ object NodeClusterActor {
* - It tracks dataset shard assignments and coordinates new dataset setup
*/
private[filodb] class NodeClusterActor(settings: FilodbSettings,
nodeCoordRole: String,
localRole: String,
metaStore: MetaStore,
assignmentStrategy: ShardAssignmentStrategy,
actors: NodeClusterActor.ActorArgs
@@ -169,23 +170,22 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
import ActorName._, NodeClusterActor._, ShardSubscriptions._
import actors._
import settings._
import context.dispatcher

val cluster = Cluster(context.system)

val memberRefs = new HashMap[Address, ActorRef]
val roleToCoords = new HashMap[String, Set[ActorRef]]().withDefaultValue(Set.empty[ActorRef])
val datasets = new HashMap[DatasetRef, Dataset]
val sources = new HashMap[DatasetRef, IngestionSource]
val initDatasets = new MutableHashSet[DatasetRef]
val roleToCoords = new MutableHashMap[String, Set[ActorRef]]().withDefaultValue(Set.empty[ActorRef])
val datasets = new MutableHashMap[DatasetRef, Dataset]
val sources = new MutableHashMap[DatasetRef, IngestionSource]
val shardManager = new ShardManager(assignmentStrategy)
val localRemoteAddr = RemoteAddressExtension(context.system).address
var everybodyLeftSender: Option[ActorRef] = None

private val initDatasets = new HashSet[DatasetRef]

import context.dispatcher

// subscribe to cluster changes, re-subscribe when restart
override def preStart(): Unit = {
super.preStart()
watcher ! NodeProtocol.PreStart(self.path)
watcher ! NodeProtocol.PreStart(singletonProxy, cluster.selfAddress)

// Restore previously set up datasets and shards. This happens in a very specific order so that
// shard and dataset state can be recovered correctly. First all the datasets are set up.
// Then shard state is recovered, and finally cluster membership events are replayed.
@@ -204,12 +204,12 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
override def postStop(): Unit = {
super.postStop()
cluster.unsubscribe(self)
watcher ! NodeProtocol.PostStop(self.path)
watcher ! NodeProtocol.PostStop(singletonProxy, cluster.selfAddress)
}

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
watcher ! NodeProtocol.PreRestart(self.path, reason)
watcher ! NodeProtocol.PreRestart(singletonProxy, cluster.selfAddress, reason)
}

private def withRole(role: String, requester: ActorRef)(f: Set[ActorRef] => Unit): Unit =
@@ -218,9 +218,6 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
case Some(refs) => f(refs)
}

val localRemoteAddr = RemoteAddressExtension(context.system).address
var everybodyLeftSender: Option[ActorRef] = None

def membershipHandler: Receive = LoggingReceive {
case MemberUp(member) =>
logger.info(s"Member ${member.status}: ${member.address} with roles ${member.roles}")
@@ -236,21 +233,26 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
case UnreachableMember(member) =>
logger.info(s"Member detected as unreachable: $member")

case MemberRemoved(member, previousStatus) =>
case e @ MemberRemoved(member, previousStatus) =>
logger.info(s"Member is Removed: ${member.address} after $previousStatus")
memberRefs.remove(member.address) match {
case Some(removedCoordinator) =>
// roleToCoords(role) = roleToCoords(role).filterNot { ref =>
// // if we don't do this cannot properly match when self node is asked to leave
// val addr = if (ref.path.address.hasLocalScope) localRemoteAddr else ref.path.address
// addr == member.address
// }
roleToCoords.transform { case (_, refs) => refs - removedCoordinator }
roleToCoords.retain { case (role, refs) => refs.nonEmpty }

shardManager.removeMember(removedCoordinator)

shardManager.removeMember(member.address) match {
case Some(ref) =>
// roleToCoords(role) = roleToCoords(role).filterNot { ref =>
// // if we don't do this cannot properly match when self node is asked to leave
// val addr = if (ref.path.address.hasLocalScope) localRemoteAddr else ref.path.address
// addr == member.address
// }
roleToCoords.transform { case (_, refs) => refs - ref }
roleToCoords.retain { case (role, refs) => refs.nonEmpty }
case _ =>
logger.warn(s"UNABLE TO REMOVE ${member.address} FROM memberRefs")
/* Recovery during ClusterSingleton handoff: replayed from cluster.subscribe events.
If this is a downed node that was assigned shards, it can be in the mapper, stale.
Or after downing the oldest/first deployed node, which may never be assigned shards
based on the number of nodes joined and shards to assign, no action is needed. */
watcher ! e // only TestProbe
logger.warn(s"MemberRemoved(${member.address}) may be stale, attempting to remove and update.")
shardManager remove member.address
}

if (roleToCoords.isEmpty) {
@@ -269,10 +271,11 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,

// The initial recovery handler: recover dataset setup/ingestion config first
def datasetInitHandler: Receive = LoggingReceive {
case e: SetupDataset => setupDataset(e, sender()) map { _ =>
initDatasets -= e.ref
if (initDatasets.isEmpty) initiateShardStateRecovery()
}
case e: SetupDataset =>
setupDataset(e, sender()) map { _ =>
initDatasets -= e.ref
if (initDatasets.isEmpty) initiateShardStateRecovery()
}
}

private def initiateShardStateRecovery(): Unit = {
@@ -282,14 +285,14 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
}

def shardMapRecoveryHandler: Receive = LoggingReceive {
case ms: NodeProtocol.MapsAndSubscriptions =>
ms.shardMaps foreach { case (ref, map) => shardManager.recoverShardState(ref, map) }
shardManager.recoverSubscriptions(ms.subscriptions)
case NodeProtocol.ClusterState(mappers, subscriptions) =>
// never includes downed nodes, which come through cluster.subscribe event replay
mappers foreach { case (ref, map) => shardManager.recoverShards(ref, map) }
shardManager.recoverSubscriptions(subscriptions)

// NOW, subscribe to cluster membership state and then switch to normal receiver
logger.info("Now subscribing to cluster events and switching to normalReceive")
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent])
logger.info("Subscribing to cluster events and switching to normalReceive")
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent])
context.become(normalReceive)
}
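This handler completes the staged startup: the actor starts out answering only recovery messages and becomes normalReceive once the recovered ClusterState arrives, and cluster.subscribe is deferred until that point so replayed membership events are handled against already-recovered shard state. A self-contained sketch of the same context.become / orElse staging, using hypothetical DatasetsReady, StateRecovered and Query messages in place of FiloDB's protocol:

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.LoggingReceive

// Hypothetical stand-ins for the recovery milestones in NodeClusterActor.
case object DatasetsReady    // last SetupDataset handled
case object StateRecovered   // NodeProtocol.ClusterState received
case object Query            // any normal-operation message

class StagedStartupActor extends Actor {
  // phase 1: only dataset restoration is handled
  def datasetInit: Receive = LoggingReceive {
    case DatasetsReady =>
      println("datasets restored, recovering shard state")
      context.become(shardRecovery)
  }

  // phase 2: wait for recovered cluster state, then switch to normal operation
  def shardRecovery: Receive = LoggingReceive {
    case StateRecovered =>
      println("shard state recovered, switching to normalReceive")
      context.become(normalReceive)
  }

  // phase 3: handlers chained with orElse, as in the diff
  def normalReceive: Receive = queryHandler orElse fallback

  def queryHandler: Receive = LoggingReceive {
    case Query => println("serving normally")
  }

  def fallback: Receive = {
    case other => println(s"unhandled: $other")
  }

  def receive: Receive = datasetInit
}

object StagedStartupSketch extends App {
  val system = ActorSystem("sketch")
  val actor = system.actorOf(Props[StagedStartupActor], "staged")
  actor ! DatasetsReady
  actor ! StateRecovered
  actor ! Query
  Thread.sleep(500)  // give the actor time to process before shutdown
  system.terminate()
}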

@@ -307,7 +310,7 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,

/** Send a message to recover the current shard maps and subscriptions from the Guardian of local node. */
private def sendShardStateRecoveryMessage(): Unit =
guardian ! NodeProtocol.GetShardMapsSubscriptions
guardian ! NodeProtocol.GetClusterState

/** If the dataset is registered as a subscription, a `CurrentShardSnapshot` is sent
* to the subscriber, otherwise a `DatasetUnknown` is returned.
@@ -328,18 +331,14 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
context watch subscriber
}

/** Sets up the new coordinator, forwards to shard status actor to complete
* its subscription setup. The coordinator is sent an ack.
* The shard stats actor responds with a `SendDatasetSetup` to handle the updates.
*/
/** Sets up the new coordinator which is sent an ack. */
private def addCoordinator(e: AddCoordinator): Unit = {
e.roles.foreach { role => roleToCoords(role) += e.coordinator }
logger.debug(s"Updated roleToCoords: $roleToCoords")

if (e.roles contains nodeCoordRole) {
if (e.roles contains localRole) {
e.coordinator ! CoordinatorRegistered(singletonProxy)
shardManager.addMember(e.coordinator, e.addr)
memberRefs(e.addr) = e.coordinator
shardManager.addMember(e.addr, e.coordinator)
}
}
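addCoordinator now registers members as shardManager.addMember(e.addr, e.coordinator), address first, and the separate memberRefs map is gone; together with removeMember(member.address) in the MemberRemoved handler above, membership is keyed by the cluster Address. An illustrative registry showing that keying (this is not FiloDB's ShardManager):

import scala.collection.mutable

import akka.actor.{ActorRef, Address}

// Illustrative only: coordinators are keyed by the node address reported in cluster
// events, so a MemberRemoved(address) can always find and drop the right entry.
final class MemberRegistry {
  private val coordinators = new mutable.HashMap[Address, ActorRef]

  def addMember(addr: Address, coordinator: ActorRef): Unit =
    coordinators(addr) = coordinator

  /** Returns the removed coordinator if the address was known, mirroring the Option match above. */
  def removeMember(addr: Address): Option[ActorRef] =
    coordinators.remove(addr)

  def activeCoordinators: Seq[ActorRef] = coordinators.values.toSeq
}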

@@ -394,7 +393,14 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
origin ! NodeProtocol.StateReset
}

def normalReceive: Receive = membershipHandler orElse shardMapHandler orElse infoHandler orElse
routerEvents orElse subscriptionHandler
def receive: Receive = datasetInitHandler orElse subscriptionHandler
def normalReceive: Receive =
membershipHandler orElse
shardMapHandler orElse
infoHandler orElse
routerEvents orElse
subscriptionHandler

def receive: Receive =
datasetInitHandler orElse
subscriptionHandler
}
88 changes: 48 additions & 40 deletions coordinator/src/main/scala/filodb.coordinator/NodeGuardian.scala
@@ -43,22 +43,25 @@ final class NodeGuardian(val settings: FilodbSettings,
case CreateCoordinator => createCoordinator(sender())
case e: CreateClusterSingleton => createSingleton(e, sender())
case e: ShardEvent => shardEvent(e)
case s: CurrentShardSnapshot => setShardMap(s)
case s: CurrentShardSnapshot => shardSnapshot(s)
case e: ShardSubscriptions => subscriptions = e
case GetShardMapsSubscriptions => getMapsSubscriptions(sender())
case GetClusterState => state(sender())
case e: ListenerRef => failureAware ! e
}

override def receive: Actor.Receive = guardianReceive orElse super.receive

private def setShardMap(s: CurrentShardSnapshot): Unit = {
logger.debug(s"Guardian setting shardmap for ref ${s.ref}")
/** Sends the current state of local `ShardMapper`
* and `ShardSubscription` collections to the `requestor`.
*/
private def state(requestor: ActorRef): Unit =
requestor ! ClusterState(shardMappers, subscriptions)

private def shardSnapshot(s: CurrentShardSnapshot): Unit = {
logger.debug(s"Updating shardMappers for ref ${s.ref}")
shardMappers(s.ref) = s.map
}

private def getMapsSubscriptions(requestor: ActorRef): Unit =
requestor ! MapsAndSubscriptions(shardMappers, subscriptions)

private def shardEvent(e: ShardEvent): Unit = {
logger.debug(s"Updating shard mapper for ref ${e.ref} with event $e")
for {
@@ -95,7 +98,7 @@ final class NodeGuardian(val settings: FilodbSettings,
val props = NodeCoordinatorActor.props(metaStore, memStore, settings.config)
context.actorOf(props, CoordinatorName) }

requester ! CoordinatorRef(actor)
requester ! CoordinatorIdentity(actor)
}

/** Creates a singleton NodeClusterActor and returns a proxy ActorRef to it.
@@ -120,7 +123,7 @@ final class NodeGuardian(val settings: FilodbSettings,
logger.info(s"Created ClusterSingletonManager for NodeClusterActor [mgr=$mgr, role=${e.role}]")
}

requester ! ClusterSingletonRef(proxy)
requester ! ClusterSingletonIdentity(proxy)
}
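createSingleton acks with the proxy rather than the ClusterSingletonManager itself. A hedged sketch of the standard akka-cluster-tools wiring behind such a proxy; the names and settings here are assumptions, not FiloDB's exact values, and it requires a cluster-enabled ActorSystem with akka-cluster-tools on the classpath:

import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props}
import akka.cluster.singleton.{
  ClusterSingletonManager, ClusterSingletonManagerSettings,
  ClusterSingletonProxy, ClusterSingletonProxySettings
}

object SingletonWiringSketch {
  /** Starts a singleton manager on nodes with `role` and returns a proxy that
    * routes to wherever the singleton currently lives.
    */
  def startSingleton(system: ActorSystem, singletonProps: Props,
                     name: String, role: String): ActorRef = {
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = singletonProps,
        terminationMessage = PoisonPill,
        settings = ClusterSingletonManagerSettings(system).withRole(role)),
      name = name)

    system.actorOf(
      ClusterSingletonProxy.props(
        singletonManagerPath = s"/user/$name",
        settings = ClusterSingletonProxySettings(system).withRole(role)),
      name = s"$name-proxy")
  }
}

Handing out the proxy means callers keep a stable ActorRef even when the singleton moves during handoff, which is the situation the recovery comments in this diff are concerned with.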

/** Returns reference to the cluster actor. The proxy
@@ -160,54 +163,59 @@ private[filodb] object NodeGuardian {
object NodeProtocol {

/** Commands to start a task. */
sealed trait TaskCommand
@SerialVersionUID(1)
sealed trait TaskCommand extends Serializable
/* Acked on task complete */
sealed trait TaskAck
@SerialVersionUID(1)
sealed trait TaskAck extends Serializable

sealed trait CreationCommand extends TaskCommand
sealed trait LifecycleCommand extends TaskCommand
sealed trait StateCommand extends TaskCommand

sealed trait LifecycleAck extends TaskAck
sealed trait StateTaskAck extends TaskAck

/**
* @param role the role to assign
* @param watcher the guardian actor. In Test this can include a probe.
*/
private[coordinator] final case class CreateClusterSingleton(role: String,
watcher: Option[ActorRef]
) extends CreationCommand

private[coordinator] final case class CreateTraceLogger(role: ClusterRole) extends CreationCommand
private[coordinator] case object CreateCoordinator extends CreationCommand

sealed trait CreationAck extends TaskAck
private[coordinator] final case class CoordinatorRef(ref: ActorRef) extends CreationAck
private[coordinator] final case class ClusterSingletonRef(ref: ActorRef) extends CreationAck
private[coordinator] final case class TraceLoggerRef(ref: ActorRef) extends CreationAck
private[coordinator] final case class ListenerRef(ref: ActorRef) extends CreationAck

sealed trait RecoveryCommand extends TaskCommand
private[coordinator] case object GetShardMapsSubscriptions extends RecoveryCommand
) extends LifecycleCommand

sealed trait RecoveryAck extends TaskAck
private[coordinator] final case class MapsAndSubscriptions(shardMaps: MMap[DatasetRef, ShardMapper],
subscriptions: ShardSubscriptions) extends RecoveryAck

sealed trait LifecycleCommand
private[coordinator] final case class CreateTraceLogger(role: ClusterRole) extends LifecycleCommand
private[coordinator] case object CreateCoordinator extends LifecycleCommand
private[coordinator] case object GracefulShutdown extends LifecycleCommand

sealed trait LifecycleAck extends TaskAck
private[coordinator] final case class ShutdownComplete(ref: ActorRef) extends LifecycleAck

sealed trait StateCommand
/** Identity ACK. */
@SerialVersionUID(1)
sealed trait ActorIdentity extends Serializable {
def identity: ActorRef
}

/* Specific load-time 'create' task completion ACKs: */
private[coordinator] final case class CoordinatorIdentity(identity: ActorRef) extends ActorIdentity
private[coordinator] final case class ClusterSingletonIdentity(identity: ActorRef) extends ActorIdentity
private[coordinator] final case class TraceLoggerRef(identity: ActorRef) extends ActorIdentity
private[coordinator] final case class ListenerRef(identity: ActorRef) extends ActorIdentity

private[filodb] case object ResetState extends StateCommand
private[coordinator] case object GetClusterState extends StateCommand

sealed trait StateTaskAck extends TaskAck
private[filodb] case object StateReset extends StateTaskAck

/** For watchers aware of specific actor transitions in lifecycle. */
sealed trait ActorLifecycle extends LifecycleCommand {
def identity: ActorPath
}
private[coordinator] final case class PreStart(identity: ActorPath) extends ActorLifecycle
private[coordinator] final case class PreRestart(identity: ActorPath, reason: Throwable) extends ActorLifecycle
private[coordinator] final case class PostStop(identity: ActorPath) extends ActorLifecycle
private[coordinator] final case class ClusterState(shardMaps: MMap[DatasetRef, ShardMapper],
subscriptions: ShardSubscriptions) extends StateTaskAck

/** Self-generated events for watchers of specific actor lifecycle transitions such as
* restarts of an actor on error, or restarts of one actor on a different node than
* it was first created on, for host or new instance verification.
*/
sealed trait ActorLifecycleEvent extends ActorIdentity
final case class PreStart(identity: ActorRef, address: Address) extends ActorLifecycleEvent
final case class PreRestart(identity: ActorRef, address: Address, reason: Throwable) extends ActorLifecycleEvent
final case class PostStop(identity: ActorRef, address: Address) extends ActorLifecycleEvent

}
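The lifecycle events now carry the concrete ActorRef plus the node Address instead of an ActorPath, so a watcher can verify both which actor moved and on which host. A sketch of a consumer; in FiloDB these events go to a TestProbe, so the LifecycleWatcher actor and the local event copies below are purely illustrative:

import akka.actor.{Actor, ActorLogging, ActorRef, Address, Props}

// Stand-ins mirroring the NodeProtocol lifecycle events above (the real ones are
// private[coordinator]; these copies exist only so this sketch is self-contained).
final case class PreStart(identity: ActorRef, address: Address)
final case class PreRestart(identity: ActorRef, address: Address, reason: Throwable)
final case class PostStop(identity: ActorRef, address: Address)

// Hypothetical watcher: in FiloDB's tests the watcher is a TestProbe, so the
// assertions would live in the test rather than in an actor like this.
class LifecycleWatcher extends Actor with ActorLogging {
  def receive: Receive = {
    case PreStart(ref, addr) =>
      log.info("{} started on {}", ref.path, addr)
    case PreRestart(ref, addr, reason) =>
      log.warning("{} restarting on {}: {}", ref.path, addr, reason.getMessage)
    case PostStop(ref, addr) =>
      log.info("{} stopped on {}", ref.path, addr)
  }
}

object LifecycleWatcher {
  def props: Props = Props(new LifecycleWatcher)
}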