Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 44 additions & 91 deletions elasticsearch.tla
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ CONSTANTS Nil
\* Set of in-flight requests and responses sent between data nodes.
VARIABLE messages

(* Another failure modeled in the specification is the crashing of nodes.
The following variable denotes the set of nodes that are crashed. Only non-crashed nodes
can accept requests.
*)
VARIABLE crashedNodes


(* Beside managing and broadcasting the routing table, the master also tracks if
a primary failed and/or a replica was promoted to primary, incrementing a number called the
"primary term" whenever this happens. Each new primary operates under a new term, allowing nodes
Expand Down Expand Up @@ -289,15 +282,6 @@ Replicas(routingTable) == {n \in DOMAIN routingTable : routingTable[n] = Replica
\* Returns shards that are marked as Primary or Replica in routing table
Assigned(routingTable) == {n \in DOMAIN routingTable : routingTable[n] /= Unassigned}

\* Whether the routing table has exactly one primary
HasUniquePrimary(routingTable) == Cardinality(Primaries(routingTable)) = 1

\* Selects a node that has a primary assigned in the routing table
ChoosePrimary(routingTable) == CHOOSE n \in DOMAIN routingTable : n \in Primaries(routingTable)

\* Selects a node that has a replica assigned in the routing table
ChooseReplica(routingTable) == CHOOSE n \in DOMAIN routingTable : n \in Replicas(routingTable)

\* Determines whether the shard on node n was promoted to primary when a cluster state update occurs
ShardWasPromotedToPrimary(n, incomingRoutingTable, localRoutingTable) ==
LET oldPrimaries == Primaries(localRoutingTable)
Expand All @@ -306,24 +290,28 @@ ShardWasPromotedToPrimary(n, incomingRoutingTable, localRoutingTable) ==
/\ n \in newPrimaries

\* Calculates new cluster state based on shard failure on node n
FailShard(n, clusterState) ==
LET rt == clusterState.routingTable
IN IF rt[n] = Unassigned THEN
clusterState
ELSE
LET
\* increase primary term on primary failure
newPt == IF rt[n] = Primary THEN
clusterState.primaryTerm + 1
ELSE
clusterState.primaryTerm
newRt == IF rt[n] = Primary /\ Cardinality(Replicas(rt)) > 0 THEN
\* promote replica to primary
[rt EXCEPT ![n] = Unassigned, ![ChooseReplica(rt)] = Primary]
FailShardOnMaster(n) ==
LET rt == clusterStateOnMaster.routingTable
IN
IF rt[n] = Unassigned THEN
UNCHANGED <<clusterStateOnMaster>>
ELSE
\* increase primary term on primary failure
LET newPt == IF rt[n] = Primary THEN
clusterStateOnMaster.primaryTerm + 1
ELSE
[rt EXCEPT ![n] = Unassigned]
IN [clusterState EXCEPT !.routingTable = newRt, !.primaryTerm = newPt]

clusterStateOnMaster.primaryTerm
IN
IF rt[n] = Primary /\ Cardinality(Replicas(rt)) > 0 THEN
\* promote replica to primary
\E r \in Replicas(rt):
clusterStateOnMaster' = [clusterStateOnMaster EXCEPT
!.routingTable = [rt EXCEPT ![n] = Unassigned, ![r] = Primary],
!.primaryTerm = newPt]
ELSE
clusterStateOnMaster' = [clusterStateOnMaster EXCEPT
!.routingTable[n] = Unassigned,
!.primaryTerm = newPt]

----

Expand Down Expand Up @@ -500,7 +488,6 @@ InitialClusterStates == { [routingTable |-> rt, primaryTerm |-> 1] : rt \in Init

Init == /\ clusterStateOnMaster \in InitialClusterStates
/\ messages = {}
/\ crashedNodes = {}
/\ nextClientValue = 1
/\ clientResponses = {}
/\ nextRequestId = 1
Expand All @@ -516,7 +503,6 @@ Init == /\ clusterStateOnMaster \in InitialClusterStates

\* Index request arrives on node n with document id docId
ClientRequest(n, docId) ==
/\ n \notin crashedNodes \* only non-crashed nodes can accept requests
/\ clusterStateOnNode[n].routingTable[n] = Primary \* node believes itself to be the primary
/\ LET
replicas == Replicas(clusterStateOnNode[n].routingTable)
Expand Down Expand Up @@ -561,8 +547,7 @@ ClientRequest(n, docId) ==
ELSE
\* replication requests sent out, wait for responses before acking to client
/\ UNCHANGED <<clientResponses>>
/\ UNCHANGED <<clusterStateOnMaster, clusterStateOnNode, crashedNodes,
globalCheckPoint, currentTerm>>
/\ UNCHANGED <<clusterStateOnMaster, clusterStateOnNode, globalCheckPoint, currentTerm>>

\* Helper function for marking translog entries as pending confirmation if incoming term is higher
\* than current term on node n.
Expand All @@ -577,15 +562,13 @@ MaybeMarkPC(incomingTerm, n) ==

\* Replication request arrives on node n with message m
HandleReplicationRequest(n, m) ==
/\ n \notin crashedNodes
/\ m.request = TRUE
/\ m.method = Replication
/\ IF m.rterm < currentTerm[n] THEN
\* don't accept replication requests with lower term than we have
\* lower term means that it's coming from a primary that has since been demoted
/\ Reply(FailedResponse(m), m)
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars,
crashedNodes, nodeVars>>
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, nodeVars>>
ELSE
/\ LET
tlogEntry == [id |-> m.id,
Expand All @@ -602,11 +585,10 @@ HandleReplicationRequest(n, m) ==
/\ globalCheckPoint' = [globalCheckPoint EXCEPT ![n] = Max({@, m.globalCP})]
/\ Reply([DefaultResponse(m) EXCEPT !.localCP = localCP], m)
/\ UNCHANGED <<clusterStateOnMaster, clusterStateOnNode, nextClientValue, nextRequestId,
clientResponses, crashedNodes, localCheckPoint>>
clientResponses, localCheckPoint>>

\* Trim translog request arrives on node n with message m
HandleTrimTranslogRequest(n, m) ==
/\ n \notin crashedNodes
/\ m.request = TRUE
/\ m.method = TrimTranslog
/\ IF m.term < currentTerm[n] THEN
Expand All @@ -620,7 +602,7 @@ HandleTrimTranslogRequest(n, m) ==
/\ currentTerm' = [currentTerm EXCEPT ![n] = m.term]
/\ Reply(DefaultResponse(m), m)
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, localCheckPoint,
clusterStateOnNode, crashedNodes, globalCheckPoint>>
clusterStateOnNode, globalCheckPoint>>

\* Helper function for handling replication responses
FinishIfNeeded(m) ==
Expand All @@ -644,7 +626,6 @@ FinishAsFailed(m) ==

\* Replication response arrives on node n from node rn with message m
HandleReplicationResponse(n, rn, m) ==
/\ n \notin crashedNodes
/\ m.request = FALSE
/\ m.method = Replication
\* are we still interested in the response or already marked the overall client request as failed?
Expand Down Expand Up @@ -673,30 +654,28 @@ HandleReplicationResponse(n, rn, m) ==
/\ UNCHANGED <<clusterStateOnMaster>>
ELSE
\* fail shard and respond to client
/\ clusterStateOnMaster' = FailShard(rn, clusterStateOnMaster)
/\ FailShardOnMaster(rn)
/\ FinishIfNeeded(m)
/\ UNCHANGED <<localCheckPoint, globalCheckPoint>>
/\ messages' = messages \ {m}
/\ UNCHANGED <<nextClientValue, nextRequestId, crashedNodes, tlog, clusterStateOnNode,
/\ UNCHANGED <<nextClientValue, nextRequestId, tlog, clusterStateOnNode,
currentTerm>>


\* Trim translog response arrives on node n from node rn with message m
HandleTrimTranslogResponse(n, rn, m) ==
/\ n \notin crashedNodes
/\ m.request = FALSE
/\ m.method = TrimTranslog
/\ messages' = messages \ {m}
/\ IF m.success = FALSE /\ m.term >= clusterStateOnMaster.primaryTerm THEN
\* fail shard
clusterStateOnMaster' = FailShard(rn, clusterStateOnMaster)
FailShardOnMaster(rn)
ELSE
UNCHANGED <<clusterStateOnMaster>>
/\ UNCHANGED <<nextClientValue, nextRequestId, crashedNodes, nodeVars, clientResponses>>
/\ UNCHANGED <<nextClientValue, nextRequestId, nodeVars, clientResponses>>

\* Cluster state propagated from master is applied to node n
ApplyClusterStateFromMaster(n) ==
/\ n \notin crashedNodes
/\ clusterStateOnNode[n] /= clusterStateOnMaster
/\ clusterStateOnNode' = [clusterStateOnNode EXCEPT ![n] = clusterStateOnMaster]
\* Java implementation should update currentTerm to Max({@, clusterStateOnMaster.primaryTerm})
Expand Down Expand Up @@ -751,50 +730,28 @@ ApplyClusterStateFromMaster(n) ==
ELSE
UNCHANGED <<tlog>>
/\ UNCHANGED <<messages, nextRequestId, localCheckPoint>>
/\ UNCHANGED <<crashedNodes, clusterStateOnMaster, clientVars,
/\ UNCHANGED <<clusterStateOnMaster, clientVars,
globalCheckPoint>>


\* Fail request message
FailRequestMessage(m) ==
/\ m.request = TRUE
/\ m.dest \notin crashedNodes \* pointless to fail request going to crashed node
/\ Reply(FailedResponse(m), m)
/\ UNCHANGED <<crashedNodes, clusterStateOnMaster, nextRequestId,
clientVars, nodeVars>>
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, nodeVars>>

\* Fail response message
FailResponseMessage(m) ==
/\ m.request = FALSE
/\ m.success = TRUE
/\ m.dest \notin crashedNodes \* pointless to fail response going to crashed node
/\ Reply([m EXCEPT !.success = FALSE], m)
/\ UNCHANGED <<crashedNodes, clusterStateOnMaster, nextRequestId,
clientVars, nodeVars>>
/\ UNCHANGED <<clusterStateOnMaster, nextRequestId, clientVars, nodeVars>>

\* Node fault detection on master finds node n to be isolated from the cluster
\* Node fault detection on master finds node n to be isolated from the cluster or crashed
NodeFaultDetectionKicksNodeOut(n) ==
/\ n \notin crashedNodes
/\ clusterStateOnMaster.routingTable[n] /= Unassigned \* not already unassigned
/\ clusterStateOnMaster' = FailShard(n, clusterStateOnMaster)
/\ UNCHANGED <<crashedNodes, messages, nextRequestId, clientVars,
nodeVars>>

\* Node n crashes
CrashNode(n) ==
/\ n \notin crashedNodes
/\ crashedNodes' = crashedNodes \cup {n}
/\ UNCHANGED <<clusterStateOnMaster, messages, clientVars, nextRequestId,
nodeVars>>

\* Master removes crashed node n from its cluster state
\* (could be combined with NodeFaultDetectionKicksNodeOut rule)
RemoveCrashedNodeFromClusterState(n) ==
/\ n \in crashedNodes
/\ clusterStateOnMaster.routingTable[n] /= Unassigned \* not already unassigned
/\ clusterStateOnMaster' = FailShard(n, clusterStateOnMaster)
/\ UNCHANGED <<crashedNodes, messages, nextRequestId, clientVars,
nodeVars>>
/\ FailShardOnMaster(n)
/\ UNCHANGED <<messages, nextRequestId, clientVars, nodeVars>>

\* Defines how the variables may transition.
Next == \/ \E n \in Nodes : \E docId \in DocumentIds : ClientRequest(n, docId)
Expand All @@ -805,34 +762,29 @@ Next == \/ \E n \in Nodes : \E docId \in DocumentIds : ClientRequest(n, docId)
\/ \E m \in messages : FailRequestMessage(m)
\/ \E m \in messages : FailResponseMessage(m)
\/ \E n \in Nodes : ApplyClusterStateFromMaster(n)
\/ \E n \in Nodes : CrashNode(n)
\/ \E n \in Nodes : RemoveCrashedNodeFromClusterState(n)
\/ \E n \in Nodes : NodeFaultDetectionKicksNodeOut(n)

----

\* `^\Large\bf Helper functions for making assertions ^'

\* no active messages
NoActiveMessages == { m \in messages : m.dest \notin crashedNodes } = {}

\* cluster state on master has been applied to non-crashed nodes
ClusterStateAppliedOnAllNodes ==
\A n \in Nodes : n \notin crashedNodes => clusterStateOnNode[n] = clusterStateOnMaster

\* crashed node has been handled by master
CrashHandled == \A n \in crashedNodes : clusterStateOnMaster.routingTable[n] = Unassigned
NoActiveMessages == messages = {}

\* shard that is considered active by the master
ActiveShard(n) == clusterStateOnMaster.routingTable[n] /= Unassigned

\* cluster state on master has been applied to all nodes that are still supposed to have an active shard
ClusterStateAppliedOnAllNodesWithActiveShards ==
\A n \in Nodes : ActiveShard(n) => clusterStateOnNode[n] = clusterStateOnMaster

\* everything in the translog up to and including slot i
UpToSlot(ntlog, i) == [j \in 1..i |-> ntlog[j]]

\* copy of translog, where we ignore the pending confirmation marker
ExceptPC(ntlog) == [j \in DOMAIN ntlog |-> [r \in DOMAIN ntlog[j] \ {"pc"} |-> ntlog[j][r] ]]

\* all shard copies of non-crashed nodes contain same data
\* all shard copies contain same data
AllCopiesSameContents ==
\A n1, n2 \in Nodes:
/\ n1 /= n2
Expand All @@ -858,8 +810,7 @@ SameTranslogUpToGlobalCheckPoint ==
\* checks if the translog for all nodes is eventually the same
AllCopiesSameContentsOnQuietDown ==
(/\ NoActiveMessages
/\ CrashHandled
/\ ClusterStateAppliedOnAllNodes)
/\ ClusterStateAppliedOnAllNodesWithActiveShards)
=> AllCopiesSameContents

\* checks if all (acked) responses to client are successfully and correctly stored
Expand All @@ -885,4 +836,6 @@ LocalCheckPointMatchesMaxConfirmedSeq ==
\* routing table is well-formed (has at most one primary)
WellFormedRoutingTable(routingTable) == Cardinality(Primaries(routingTable)) <= 1

StateConstraint == nextClientValue <= 3 /\ Cardinality(messages) <= 5

=============================================================================
5 changes: 3 additions & 2 deletions elasticsearch.toolbox/elasticsearch___model.launch
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<booleanAttribute key="dfidMode" value="false"/>
<intAttribute key="distributedFPSetCount" value="0"/>
<stringAttribute key="distributedNetworkInterface" value="192.168.178.34"/>
<intAttribute key="distributedNodesCount" value="1"/>
<stringAttribute key="distributedTLC" value="off"/>
<stringAttribute key="distributedTLCVMArgs" value=""/>
<intAttribute key="fpBits" value="0"/>
Expand All @@ -18,7 +19,7 @@
<stringAttribute key="modelBehaviorNext" value="Next"/>
<stringAttribute key="modelBehaviorSpec" value="Spec"/>
<intAttribute key="modelBehaviorSpecType" value="2"/>
<stringAttribute key="modelBehaviorVars" value="nextClientValue, clusterStateOnMaster, globalCheckPoint, tlog, messages, clientResponses, crashedNodes, currentTerm, clusterStateOnNode, nextRequestId, localCheckPoint"/>
<stringAttribute key="modelBehaviorVars" value="nextClientValue, clusterStateOnMaster, globalCheckPoint, tlog, messages, clientResponses, currentTerm, clusterStateOnNode, nextRequestId, localCheckPoint"/>
<stringAttribute key="modelComments" value=""/>
<booleanAttribute key="modelCorrectnessCheckDeadlock" value="false"/>
<listAttribute key="modelCorrectnessInvariants">
Expand Down Expand Up @@ -46,7 +47,7 @@
<listEntry value="Replication;;Replication;1;0"/>
<listEntry value="TrimTranslog;;TrimTranslog;1;0"/>
</listAttribute>
<stringAttribute key="modelParameterContraint" value="nextClientValue &lt;= 3"/>
<stringAttribute key="modelParameterContraint" value="StateConstraint"/>
<listAttribute key="modelParameterDefinitions"/>
<stringAttribute key="modelParameterModelValues" value="{}"/>
<stringAttribute key="modelParameterNewDefinitions" value=""/>
Expand Down