Stream Raft Messages and Fix Check Quorum #3138

Merged
12 commits merged on Mar 20, 2019
conn/node.go: 154 changes (109 additions, 45 deletions)
@@ -25,6 +25,7 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/y"
@@ -35,6 +36,7 @@ import (
"github.com/golang/glog"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
)

@@ -143,7 +145,7 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
confChanges: make(map[uint64]chan error),
messages: make(chan sendmsg, 100),
peers: make(map[uint64]string),
requestCh: make(chan linReadReq),
requestCh: make(chan linReadReq, 100),
}
n.Applied.Init(nil)
// This should match up to the Applied index set above.
@@ -301,12 +303,18 @@ func (n *Node) PastLife() (uint64, bool, error) {
}

const (
messageBatchSoftLimit = 10000000
messageBatchSoftLimit = 10e6
)

type Stream struct {
msgCh chan []byte
alive int32
}

func (n *Node) BatchAndSendMessages() {
batches := make(map[uint64]*bytes.Buffer)
failedConn := make(map[uint64]bool)
streams := make(map[uint64]*Stream)

for {
totalSize := 0
sm := <-n.messages
@@ -342,59 +350,106 @@ func (n *Node) BatchAndSendMessages() {
if buf.Len() == 0 {
continue
}

addr, has := n.Peer(to)
pool, err := Get().Get(addr)
if !has || err != nil {
if exists := failedConn[to]; !exists {
// So that we print error only the first time we are not able to connect.
// Otherwise, the log is polluted with multiple errors.
glog.Warningf("No healthy connection to node Id: %#x addr: [%s], err: %v\n",
to, addr, err)
failedConn[to] = true
stream, ok := streams[to]
if !ok || atomic.LoadInt32(&stream.alive) <= 0 {
stream = &Stream{
msgCh: make(chan []byte, 100),
alive: 1,
}
continue
go n.streamMessages(to, stream)
streams[to] = stream
}

failedConn[to] = false
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
go n.doSendMessage(to, pool, data)
buf.Reset()

select {
case stream.msgCh <- data:
default:
}
}
}
}

func (n *Node) doSendMessage(to uint64, pool *Pool, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

client := pool.Get()

c := pb.NewRaftClient(client)
p := &api.Payload{Data: data}
batch := &pb.RaftBatch{
Context: n.RaftContext,
Payload: p,
func (n *Node) streamMessages(to uint64, stream *Stream) {
defer atomic.StoreInt32(&stream.alive, 0)

const dur = 10 * time.Second
deadline := time.Now().Add(dur)
var lastLog time.Time
// Exit after a thousand tries or at least 10s. Let BatchAndSendMessages create another
// goroutine, if needed.
for i := 0; ; i++ {
if err := n.doSendMessage(to, stream.msgCh); err != nil {
// Update lastLog so we print error only a few times if we are not able to connect.
// Otherwise, the log is polluted with repeated errors.
if time.Since(lastLog) > dur {
glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
}
lastLog = time.Now()
}
if i >= 1e3 {
if time.Now().After(deadline) {
return
}
i = 0
}
}
}

// We don't need to run this in a goroutine, because doSendMessage is
// already being run in one.
_, err := c.RaftMessage(ctx, batch)
func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
addr, has := n.Peer(to)
if !has {
return x.Errorf("Do not have address of peer %#x", to)
}
pool, err := Get().Get(addr)
if err != nil {
switch {
case strings.Contains(err.Error(), "TransientFailure"):
glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
n.Raft().ReportUnreachable(to)
pool.SetUnhealthy()
default:
glog.V(3).Infof("Error while sending Raft message to node with addr: %s, err: %v\n",
pool.Addr, err)
return err
}
c := pb.NewRaftClient(pool.Get())
mc, err := c.RaftMessage(context.Background())
if err != nil {
return err
}

slurp := func(batch *pb.RaftBatch) {
for {
if len(batch.Payload.Data) > messageBatchSoftLimit {
return
}
select {
case data := <-msgCh:
batch.Payload.Data = append(batch.Payload.Data, data...)
default:
return
}
}
}
ctx := mc.Context()
for {
select {
case data := <-msgCh:
batch := &pb.RaftBatch{
Context: n.RaftContext,
Payload: &api.Payload{Data: data},
}
slurp(batch) // Pick up more entries from msgCh, if present.
if err := mc.Send(batch); err != nil {
switch {
case strings.Contains(err.Error(), "TransientFailure"):
glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
n.Raft().ReportUnreachable(to)
pool.SetUnhealthy()
default:
}
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return
}

// Connects the node and makes its peerPool refer to the constructed pool and address
@@ -508,21 +563,29 @@ type linReadReq struct {
var errReadIndex = x.Errorf("Cannot get linearized read (time expired or no configured leader)")

func (n *Node) WaitLinearizableRead(ctx context.Context) error {
indexCh := make(chan uint64, 1)
span := otrace.FromContext(ctx)
span.Annotate(nil, "WaitLinearizableRead")

indexCh := make(chan uint64, 1)
select {
case n.requestCh <- linReadReq{indexCh: indexCh}:
span.Annotate(nil, "Pushed to requestCh")
case <-ctx.Done():
span.Annotate(nil, "Context expired")
return ctx.Err()
}

select {
case index := <-indexCh:
span.Annotatef(nil, "Received index: %d", index)
if index == 0 {
return errReadIndex
}
return n.Applied.WaitForMark(ctx, index)
err := n.Applied.WaitForMark(ctx, index)
span.Annotatef(nil, "Error from Applied.WaitForMark: %v", err)
return err
case <-ctx.Done():
span.Annotate(nil, "Context expired")
return ctx.Err()
}
}
@@ -532,7 +595,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
readIndex := func() (uint64, error) {
// Read Request can get rejected then we would wait indefinitely on the channel
// so have a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var activeRctx [8]byte
@@ -548,6 +611,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
return 0, errors.New("Closer has been called")
case rs := <-readStateCh:
if !bytes.Equal(activeRctx[:], rs.RequestCtx) {
glog.V(1).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx[:])
goto again
}
return rs.Index, nil
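
The encoding half of this framing sits in the collapsed portion of BatchAndSendMessages and is not visible in the hunk above. Judging from the decode loop in conn/raft_server.go below, each raft message appears to be appended as a 4-byte little-endian length prefix followed by the marshalled message bytes. A minimal sketch of that framing is below; the helper name appendMessage is hypothetical and not part of this PR.

package conn

import (
	"bytes"
	"encoding/binary"

	"go.etcd.io/etcd/raft/raftpb"
)

// appendMessage frames one raft message the way the decode loop in
// conn/raft_server.go expects: a 4-byte little-endian size, then the payload.
func appendMessage(buf *bytes.Buffer, msg raftpb.Message) error {
	data, err := msg.Marshal()
	if err != nil {
		return err
	}
	var sz [4]byte
	binary.LittleEndian.PutUint32(sz[:], uint32(len(data)))
	buf.Write(sz[:]) // Length prefix, read back via binary.LittleEndian.Uint32.
	buf.Write(data)  // Message bytes, unmarshalled into raftpb.Message on receipt.
	return nil
}
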
conn/raft_server.go: 76 changes (43 additions, 33 deletions)
@@ -187,48 +187,58 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
return &api.Payload{}, err
}

func (w *RaftServer) RaftMessage(ctx context.Context,
batch *pb.RaftBatch) (*api.Payload, error) {
func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
ctx := server.Context()
if ctx.Err() != nil {
return &api.Payload{}, ctx.Err()
return ctx.Err()
}

rc := batch.GetContext()
if rc != nil {
n := w.GetNode()
if n == nil || n.Raft() == nil {
return &api.Payload{}, ErrNoNode
}
n.Connect(rc.Id, rc.Addr)
n := w.GetNode()
if n == nil || n.Raft() == nil {
return ErrNoNode
}
if batch.GetPayload() == nil {
return &api.Payload{}, nil
}
data := batch.Payload.Data
raft := w.GetNode().Raft()

for idx := 0; idx < len(data); {
x.AssertTruef(len(data[idx:]) >= 4,
"Slice left of size: %v. Expected at least 4.", len(data[idx:]))

sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
idx += 4
msg := raftpb.Message{}
if idx+sz > len(data) {
return &api.Payload{}, x.Errorf(
"Invalid query. Specified size %v overflows slice [%v,%v)\n",
sz, idx, len(data))
for loop := 1; ; loop++ {
batch, err := server.Recv()
if err != nil {
return err
}
if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
x.Check(err)
if loop%1e6 == 0 {
glog.V(2).Infof("%d messages received by %#x", loop, n.Id)
}
if loop == 1 {
rc := batch.GetContext()
if rc != nil {
n.Connect(rc.Id, rc.Addr)
}
}
// This should be done in order, and not via a goroutine.
if err := raft.Step(ctx, msg); err != nil {
return &api.Payload{}, err
if batch.Payload == nil {
continue
}
data := batch.Payload.Data

for idx := 0; idx < len(data); {
x.AssertTruef(len(data[idx:]) >= 4,
"Slice left of size: %v. Expected at least 4.", len(data[idx:]))

sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
idx += 4
msg := raftpb.Message{}
if idx+sz > len(data) {
return x.Errorf(
"Invalid query. Specified size %v overflows slice [%v,%v)\n",
sz, idx, len(data))
}
if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
x.Check(err)
}
// This should be done in order, and not via a goroutine.
if err := raft.Step(ctx, msg); err != nil {
return err
}
idx += sz
}
idx += sz
}
return &api.Payload{}, nil
}

// Hello rpc call is used to check connection with other workers after worker
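
For reference, RaftMessage is now a client-streaming handler; the loop above returns whatever error Recv yields, including a clean end-of-stream, and leaves it to the sender to reopen the stream. The conventional gRPC-Go termination pattern looks like the sketch below. This is a generic illustration, not code from this PR; the import paths and the Recv/SendAndClose methods on the generated pb.Raft_RaftMessageServer follow standard protoc-gen-go conventions and are assumptions here.

package conn

import (
	"io"

	"github.com/dgraph-io/dgo/protos/api"
	"github.com/dgraph-io/dgraph/protos/pb"
)

// raftMessageSketch is a hypothetical handler showing how a client-streaming
// gRPC handler usually distinguishes a clean close from a transport error.
func (w *RaftServer) raftMessageSketch(srv pb.Raft_RaftMessageServer) error {
	for {
		batch, err := srv.Recv()
		if err == io.EOF {
			// The client closed its side of the stream; acknowledge and finish.
			return srv.SendAndClose(&api.Payload{})
		}
		if err != nil {
			return err
		}
		_ = batch // Decode and Step() the contained messages here, as above.
	}
}
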
dgraph/cmd/zero/raft.go: 13 changes (11 additions, 2 deletions)
@@ -523,24 +523,33 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
}
}

var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01))

func (n *node) checkQuorum(closer *y.Closer) {
defer closer.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

quorum := func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// Make this timeout 3x the timeout on RunReadIndexLoop.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

ctx, span := otrace.StartSpan(ctx, "Zero.checkQuorum", startOption)
defer span.End()
span.Annotatef(nil, "Node id: %d", n.Id)

if state, err := n.server.latestMembershipState(ctx); err == nil {
n.mu.Lock()
n.lastQuorum = time.Now()
n.mu.Unlock()
// Also do some connection cleanup.
conn.Get().RemoveInvalid(state)
span.Annotate(nil, "Updated lastQuorum")

} else if glog.V(1) {
glog.Warningf("Zero node: %#x unable to reach quorum.", n.Id)
span.Annotatef(nil, "Got error: %v", err)
glog.Warningf("Zero node: %#x unable to reach quorum. Error: %v", n.Id, err)
}
}

protos/pb.proto: 8 changes (4 additions, 4 deletions)
@@ -403,10 +403,10 @@ message RaftBatch {
}

service Raft {
rpc Heartbeat (api.Payload) returns (stream api.Payload) {}
rpc RaftMessage (RaftBatch) returns (api.Payload) {}
rpc JoinCluster (RaftContext) returns (api.Payload) {}
rpc IsPeer (RaftContext) returns (PeerResponse) {}
rpc Heartbeat (api.Payload) returns (stream api.Payload) {}
rpc RaftMessage (stream RaftBatch) returns (api.Payload) {}
rpc JoinCluster (RaftContext) returns (api.Payload) {}
rpc IsPeer (RaftContext) returns (PeerResponse) {}
}

service Zero {
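
Making RaftMessage accept a stream of RaftBatch changes the generated Go bindings. The sketch below shows roughly what protoc generates for the client and server stream types, assuming standard protoc-gen-go naming and Dgraph's import paths; the real generated pb package contains more methods and may differ in detail.

package pb

import (
	"context"

	"github.com/dgraph-io/dgo/protos/api"
	"google.golang.org/grpc"
)

// Client side: previously RaftMessage(ctx, *RaftBatch) returned (*api.Payload, error)
// in one round trip. Now the caller opens a stream and pushes many batches over it.
type RaftClient interface {
	RaftMessage(ctx context.Context, opts ...grpc.CallOption) (Raft_RaftMessageClient, error)
}

type Raft_RaftMessageClient interface {
	Send(*RaftBatch) error
	CloseAndRecv() (*api.Payload, error)
	grpc.ClientStream
}

// Server side: implemented by RaftServer.RaftMessage in conn/raft_server.go above.
type Raft_RaftMessageServer interface {
	SendAndClose(*api.Payload) error
	Recv() (*RaftBatch, error)
	grpc.ServerStream
}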