Skip to content

Commit

Permalink
add: client
Browse files Browse the repository at this point in the history
  • Loading branch information
summer-boythink committed Mar 17, 2024
1 parent fe70bca commit f2a2cda
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 49 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
build
build
.vscode
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
all:build

build:fmt
go build -o ./build/server ./example/server/ &&
go build -o ./build/server ./example/server/
go build -o ./build/client ./example/client/

run-server:build
./build/server
./build/server --local http://127.0.0.1:8080 --peer http://127.0.0.1:8081 --peer http://127.0.0.1:8082

fmt:
go mod tidy && gofmt -w .
28 changes: 14 additions & 14 deletions args.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package raftgo

type AppendEntriesArgs struct {
Term int
LeaderID int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
Term int `json:"term"`
LeaderID int `json:"leader_id"`
PrevLogIndex int `json:"prev_log_index"`
PrevLogTerm int `json:"prev_log_term"`
Entries []LogEntry `json:"entries"`
LeaderCommit int `json:"leader_commit"`
}

type AppendEntriesReply struct {
Term int
Success bool
Term int `json:"term"`
Success bool `json:"success"`
}

type RequestVoteArgs struct {
Term int
CandidateID int
LastLogIndex int
LastLogTerm int
Term int `json:"term"`
CandidateID int `json:"candidate_id"`
LastLogIndex int `json:"last_log_index"`
LastLogTerm int `json:"last_log_term"`
}

type RequestVoteReply struct {
Term int
VoteGranted bool
Term int `json:"term"`
VoteGranted bool `json:"vote_granted"`
}
101 changes: 101 additions & 0 deletions example/client/main.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,103 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"

"github.com/spf13/cobra"

"net/http"
"time"
)

type Option struct {
Addr string
}

type Command struct {
Type string `json:"type"`
Key string `json:"key"`
Value string `json:"value,omitempty"`
}

type Response struct {
Success bool `json:"success"`
Value string `json:"value,omitempty"`
}

var rootCmd = &cobra.Command{
Use: "client",
Short: "A client for raft",
}

var setCmd = &cobra.Command{
Use: "set [key] [value]",
Short: "Set the value of a string key to a string",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
addr, _ := cmd.Flags().GetString("addr")
key := args[0]
value := args[1]
set(Option{Addr: addr}, key, value)
},
}

var getCmd = &cobra.Command{
Use: "get [key]",
Short: "Get the string value of a given string key",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
addr, _ := cmd.Flags().GetString("addr")
key := args[0]
get(Option{Addr: addr}, key)
},
}

var rmCmd = &cobra.Command{
Use: "rm [key]",
Short: "Remove a given key",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
addr, _ := cmd.Flags().GetString("addr")
key := args[0]
rm(Option{Addr: addr}, key)
},
}

func main() {
rootCmd.PersistentFlags().StringP("addr", "a", "", "server addr.")
rootCmd.AddCommand(setCmd, getCmd, rmCmd)
cobra.CheckErr(rootCmd.Execute())
}

func set(op Option, key string, value string) {
command := Command{Type: "set", Key: key, Value: value}
send("POST", op.Addr, "/append", command)
}

func get(op Option, key string) {
send("GET", op.Addr, "/get?key="+key, nil)
}

func rm(op Option, key string) {
command := Command{Type: "rm", Key: key}
send("POST", op.Addr, "/append", command)
}

func send(method string, addr string, path string, body interface{}) {
start := time.Now()
jsonBody, _ := json.Marshal(body)
resp, err := http.Post(addr+path, "application/json", bytes.NewBuffer(jsonBody))
if err != nil {
fmt.Println("Error:", err)
return
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
var result Response
json.Unmarshal(data, &result)
fmt.Println("Response:", result)
fmt.Println("Latency:", time.Since(start))
}
45 changes: 29 additions & 16 deletions example/server/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
// "encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"sort"
Expand Down Expand Up @@ -70,30 +72,27 @@ var rootCmd = &cobra.Command{

func httpServe(raft *raftgo.Raft, stateMachine *raftgo.MemStateMachine, port int) {
r := gin.Default()
r.Use(func(ctx *gin.Context) {
if raft.LeaderID == nil {
ctx.JSON(500, "no leader")
return
}

if !raft.IsLeader() {
ctx.JSON(400, "it now is no leader")
return
}
})
r.POST("/append_entries", func(c *gin.Context) {
var body any
var body raftgo.AppendEntriesArgs
c.BindJSON(&body)
c.JSON(200, raft.HandleAppendEntries(body.(raftgo.AppendEntriesArgs)))

c.JSON(200, raft.HandleAppendEntries(body))
})

r.POST("/request_vote", func(c *gin.Context) {
var body any
c.BindJSON(&body)
c.JSON(200, raft.HandleRequestVote(body.(raftgo.RequestVoteArgs)))
var body raftgo.RequestVoteArgs

if err := c.BindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
c.JSON(200, raft.HandleRequestVote(body))
})

r.POST("/append", func(c *gin.Context) {
checkLeader(c, raft)

var body any
c.BindJSON(&body)
commandBase64 := body.(CommandBody).command
Expand All @@ -106,14 +105,28 @@ func httpServe(raft *raftgo.Raft, stateMachine *raftgo.MemStateMachine, port int
}
})

r.GET("/get", func(c *gin.Context) {
r.POST("/get", func(c *gin.Context) {
checkLeader(c, raft)

key := c.Query("key")
c.JSON(200, map[string]interface{}{"value": stateMachine.Get(key)})
})

r.Run(fmt.Sprintf(":%d", port))
}

func checkLeader(c *gin.Context, raft *raftgo.Raft) {
if raft.LeaderID == nil {
c.JSON(500, "no leader")
return
}

if !raft.IsLeader() {
c.JSON(400, "it now is no leader")
return
}
}

func main() {
rootCmd.Flags().StringVarP(&local, "local", "l", "", "the raft server local url")
rootCmd.MarkFlagRequired("local")
Expand Down
35 changes: 21 additions & 14 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func NewRaft(id int, logs *Logs, peers map[int]Peer, config Config) *Raft {
return r
}

func (r *Raft) SetCurrentTerm(term int) {
r.VotedFor = nil
r.CurrentTerm = term
}

func (r *Raft) HandleAppendEntries(aea AppendEntriesArgs) AppendEntriesReply {
if aea.Term < r.CurrentTerm {
return AppendEntriesReply{Term: r.CurrentTerm, Success: false}
Expand Down Expand Up @@ -135,10 +140,10 @@ func (r *Raft) run() {
func (r *Raft) runFollower() {
log.Printf("entering follower state. id: %d term: %d\n", r.ID, r.CurrentTerm)
r.heartbeatTimeout.Reset()
for r.Status == Leader {
for r.Status == Follower {
select {
case <-r.heartChannel:
r.Status = Follower
r.Status = Candidate
return
case <-r.shutdownChannel:
return
Expand Down Expand Up @@ -275,22 +280,24 @@ func (r *Raft) leaderSendHeartbeat(nextIndex map[int]int) []Reply {
return replies
}

func (r *Raft) electSelf() chan []RequestVoteReply {
func (r *Raft) electSelf() <-chan []RequestVoteReply {
r.CurrentTerm++
r.VotedFor = &r.ID
last := r.Logs.Last()
var votes []RequestVoteReply
var res chan []RequestVoteReply
for _, peer := range r.Peers {
vote, _ := peer.RequestVote(RequestVoteArgs{
Term: r.CurrentTerm,
CandidateID: r.ID,
LastLogIndex: last.LogIndex,
LastLogTerm: last.LogTerm,
}, time.Duration(r.Config.RPCTimeout)*time.Millisecond)
votes = append(votes, vote)
}
res <- votes
var res = make(chan []RequestVoteReply)
go func() {
for _, peer := range r.Peers {
vote, _ := peer.RequestVote(RequestVoteArgs{
Term: r.CurrentTerm,
CandidateID: r.ID,
LastLogIndex: last.LogIndex,
LastLogTerm: last.LogTerm,
}, time.Duration(r.Config.RPCTimeout)*time.Millisecond)
votes = append(votes, vote)
}
res <- votes
}()
return res
}

Expand Down
19 changes: 17 additions & 2 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"log"
"net/http"
"time"
)
Expand Down Expand Up @@ -34,7 +35,21 @@ func (p *HttpPeer) RequestVote(rv RequestVoteArgs, timeout time.Duration) (Reque
if err != nil {
return RequestVoteReply{}, err
}
return res.(RequestVoteReply), nil
ress := MapToJsonStruct1(res)
return ress.(RequestVoteReply), nil
}

func MapToJsonStruct1(res interface{}) interface{} {
jsonBytes, err := json.Marshal(res)
if err != nil {
log.Fatal(err)
}
var reply RequestVoteReply
err = json.Unmarshal(jsonBytes, &reply)
if err != nil {
log.Fatal(err)
}
return reply
}

func (p *HttpPeer) post(method string, data interface{}, timeout time.Duration) (interface{}, error) {
Expand All @@ -60,10 +75,10 @@ func (p *HttpPeer) post(method string, data interface{}, timeout time.Duration)
defer resp.Body.Close()

var result interface{}

err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return nil, err
}

return result, nil
}

0 comments on commit f2a2cda

Please sign in to comment.