Skip to content

Commit 5b19c13

Browse files
committed
add raft framework, implement trigger election
1 parent 250d9a5 commit 5b19c13

File tree

8 files changed

+260
-0
lines changed

8 files changed

+260
-0
lines changed

build.rs

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fn main() -> Result<(), Box<dyn std::error::Error>> {
2+
tonic_build::compile_protos("proto/raftrpc.proto")?;
3+
Ok(())
4+
}

proto/raftrpc.proto

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
syntax = "proto3";
2+
3+
package raftrpc;
4+
5+
service Raft {
6+
// Invoked by candidate gather votes.
7+
rpc RequestVote (RequestVoteArgs) returns (RequestVoteReply);
8+
9+
// Invoked by leader to replicate log entries, also used as heartbeat.
10+
// Do a total-order broadcast.
11+
rpc AppendEntries (AppendEntriesArgs) returns (AppendEntriesReply);
12+
}
13+
14+
message RequestVoteArgs {
15+
uint64 term = 1;
16+
uint64 candidate_id = 2;
17+
uint64 last_log_index = 3;
18+
uint64 last_log_term = 4;
19+
}
20+
21+
message RequestVoteReply {
22+
uint64 term = 1;
23+
bool vote_granted = 2;
24+
}
25+
26+
message AppendEntriesArgs {
27+
}
28+
29+
message AppendEntriesReply {
30+
}

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod cmd;
22
mod engine;
3+
mod raft;
34
mod resp;
45
mod server;
56
mod util;

src/raft/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#[derive(thiserror::Error, Debug)]
2+
pub enum Error {}

src/raft/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
mod error;
2+
mod node;
3+
mod rpc;
4+
mod state;

src/raft/node.rs

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use super::rpc::*;
2+
use super::state::State;
3+
use std::sync::{Arc, Mutex};
4+
use tokio::{sync::mpsc::channel, time::Instant};
5+
use tonic::{Request, Response, Status};
6+
7+
pub struct RafterInner {
8+
// Manage all peers rpc client
9+
peers: Vec<RaftClient>,
10+
11+
// State of this node
12+
state: Arc<Mutex<State>>,
13+
14+
// next triggering election time
15+
election_instant: Instant,
16+
}
17+
18+
impl RafterInner {
19+
// Try to start a new election
20+
fn trigger_election(&mut self) {
21+
let mut state = self.state.lock().unwrap();
22+
// Leader always don't trigger election.
23+
if state.is_leader() {
24+
return;
25+
}
26+
27+
// Set as candidate and vote for myself
28+
state.set_as_candidate();
29+
let current_term = state.term();
30+
let me = state.me();
31+
32+
// Send request to all peers except this node
33+
let (tx, mut rx) = channel(self.peers.len() - 1);
34+
for (node_id, peer) in self.peers.iter().enumerate() {
35+
if node_id != me {
36+
let mut peer = peer.clone();
37+
let tx = tx.clone();
38+
39+
let args = RequestVoteArgs {
40+
term: current_term,
41+
candidate_id: me as u64,
42+
// TODO: check last log index
43+
last_log_index: 0,
44+
last_log_term: 0,
45+
};
46+
47+
tokio::spawn(async move {
48+
let vote_reply = peer.request_vote(args).await;
49+
tx.send(vote_reply).await.ok();
50+
});
51+
}
52+
}
53+
54+
// Async receiving the vote reply, and handle the election
55+
let voted_threshold = self.peers.len() / 2;
56+
let state = Arc::clone(&self.state);
57+
tokio::spawn(async move {
58+
// Candidates always voted for myself
59+
let mut voted = 1;
60+
61+
while let Some(Ok(vote_reply)) = rx.recv().await {
62+
let vote_reply = vote_reply.get_ref();
63+
if vote_reply.term == current_term && vote_reply.vote_granted {
64+
voted += 1;
65+
if voted > voted_threshold {
66+
// Quorum of peers voted for this node, break to set this node as leader
67+
break;
68+
}
69+
} else if current_term < vote_reply.term {
70+
// Find a new term
71+
// Set this node into a follower, give up election process in this term
72+
state
73+
.lock()
74+
.unwrap()
75+
.set_as_follower_in_new_term(vote_reply.term);
76+
return;
77+
}
78+
}
79+
80+
if voted > voted_threshold {
81+
// Election success, set this peer as leader if current term is not changed
82+
state
83+
.lock()
84+
.unwrap()
85+
.set_as_leader_in_this_term(current_term);
86+
}
87+
});
88+
}
89+
}
90+
91+
// Implementation for Raft algorithm
92+
pub struct Rafter {}
93+
94+
#[tonic::async_trait]
95+
impl Raft for Rafter {
96+
// Implement server side of request-vote rpc for raft
97+
async fn request_vote(
98+
&self,
99+
request: Request<RequestVoteArgs>,
100+
) -> Result<Response<RequestVoteReply>, Status> {
101+
todo!()
102+
}
103+
104+
// Implement server side of append-entries rpc for raft
105+
async fn append_entries(
106+
&self,
107+
request: Request<AppendEntriesArgs>,
108+
) -> Result<Response<AppendEntriesReply>, Status> {
109+
todo!()
110+
}
111+
}

src/raft/rpc.rs

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
pub use raftrpc::{
2+
raft_server::{Raft, RaftServer},
3+
AppendEntriesArgs, AppendEntriesReply, RequestVoteArgs, RequestVoteReply,
4+
};
5+
6+
pub type RaftClient = raftrpc::raft_client::RaftClient<tonic::transport::Channel>;
7+
8+
pub mod raftrpc {
9+
tonic::include_proto!("raftrpc");
10+
}

src/raft/state.rs

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#[derive(Default, Copy, Clone, Debug, PartialEq, Eq)]
2+
enum Role {
3+
#[default]
4+
Follower,
5+
Candidate,
6+
Leader,
7+
}
8+
9+
#[derive(Debug, Default)]
10+
struct Peer {
11+
// Index of the next log entry to send to that server
12+
// Initialized to leader last log index + 1
13+
next_index: u64,
14+
15+
// Index of highest log entry known to be replicated on server
16+
// Initialized to 0, increases monotonically
17+
match_index: u64,
18+
}
19+
20+
#[derive(Debug, Default)]
21+
pub struct State {
22+
// Role of this node
23+
role: Role,
24+
25+
// This node's index in peers
26+
me_index: usize,
27+
28+
// Latest term server has seen, increases monotonically
29+
current_term: u64,
30+
31+
// Candidate id (index of peers array) that received voted in current term
32+
// None if not voted.
33+
voted_for: Option<usize>,
34+
35+
// Log entries; each entry contains command for state machine,
36+
// and term when entry was received by leader
37+
log: Vec<u64>,
38+
39+
// Index of highest log entry known to be committed
40+
commit_index: u64,
41+
42+
// Index of highest log entry applied to state machine
43+
last_applied: u64,
44+
45+
// List of Peer meta data, reinitialized after election
46+
// Only available on leader node
47+
peers: Vec<Peer>,
48+
}
49+
50+
impl State {
51+
#[inline]
52+
pub fn is_leader(&self) -> bool {
53+
matches!(self.role, Role::Leader)
54+
}
55+
56+
#[inline]
57+
pub fn term(&self) -> u64 {
58+
self.current_term
59+
}
60+
61+
// Perform a CAS, set this node as leader, if term is matched.
62+
#[inline]
63+
pub fn set_as_leader_in_this_term(&mut self, term: u64) {
64+
if self.current_term == term {
65+
self.role = Role::Leader;
66+
}
67+
}
68+
69+
// Set this node as follower and update the term if term is gte than current term
70+
#[inline]
71+
pub fn set_as_follower_in_new_term(&mut self, term: u64) {
72+
if term >= self.current_term {
73+
self.current_term = term;
74+
self.role = Role::Follower;
75+
}
76+
}
77+
78+
// Set this node as candidate, increment current term
79+
// Always vote for myself
80+
#[inline]
81+
pub fn set_as_candidate(&mut self) {
82+
self.current_term += 1;
83+
self.role = Role::Candidate;
84+
self.voted_for = Some(self.me_index);
85+
}
86+
87+
// Vote for peer at peers[candidate_id]
88+
#[inline]
89+
pub fn vote_for(&mut self, candidate_id: u64) {
90+
self.voted_for = Some(candidate_id as usize);
91+
}
92+
93+
// Access me index
94+
#[inline]
95+
pub fn me(&self) -> usize {
96+
self.me_index
97+
}
98+
}

0 commit comments

Comments
 (0)