Deal better with network partitions in leaders (hypermodeinc#2749)
Currently, if the Zero leader ends up in a network partition, the Alpha nodes get stuck indefinitely waiting for updates from it, making the Zero leader a single point of failure. Even after the network heals, it takes a while for the partitioned nodes to recover.

This PR fixes these issues by:
- Each Zero leader now sends a membership update every second.
- If an Alpha does not receive a membership update for over 10s, it disconnects from the leader and re-creates the connection to another (any) Zero server. Thus, every Alpha correctly picks up the new membership state and hence the new Zero leader (see the watchdog sketch after this list).
- Oracle delta stream: If the Zero leader changes or the Zero connection becomes unhealthy, the Alpha leader disconnects from the current leader and re-creates the connection to the new one, so it continues to receive updates correctly.
- Connection pool: It used to poll every 10s, with no timeout. It now polls every 1s, with a 1s timeout, so we learn about connection health issues quicker. This creates more network traffic (one Echo every second per connection, i.e. O(N^2) where N is the number of servers in the Dgraph cluster), but if and when that becomes a problem, we'll fix it.
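
To make the watchdog in the second bullet concrete, here is a minimal sketch. It is illustrative only: MembershipState, the membershipStream interface, and receiveMembershipUpdates are hypothetical stand-ins rather than Dgraph's actual generated gRPC types, and the shape only loosely follows the logic added to worker/groups.go below.

package membership

import (
	"context"
	"errors"
	"time"
)

// MembershipState is a stand-in for the real pb.MembershipState.
type MembershipState struct {
	Counter uint64
}

// membershipStream is a stand-in for the client side of Zero's Update stream.
type membershipStream interface {
	Recv() (*MembershipState, error)
	CloseSend() error
}

// receiveMembershipUpdates pumps updates from the stream into a channel and
// tears the stream down if nothing arrives for 10s, so the caller can dial
// another Zero server and pick up the new leader from its membership state.
func receiveMembershipUpdates(ctx context.Context, stream membershipStream,
	apply func(*MembershipState)) error {

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stateCh := make(chan *MembershipState, 10)
	errCh := make(chan error, 1)
	go func() {
		for {
			state, err := stream.Recv()
			if err != nil {
				errCh <- err
				return
			}
			select {
			case stateCh <- state:
			case <-ctx.Done():
				return
			}
		}
	}()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	lastRecv := time.Now()
	for {
		select {
		case state := <-stateCh:
			lastRecv = time.Now()
			apply(state)
		case <-ticker.C:
			if time.Since(lastRecv) > 10*time.Second {
				// Zero might be partitioned away; stop and let the caller reconnect.
				_ = stream.CloseSend()
				return errors.New("no membership update for 10s")
			}
		case err := <-errCh:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

When this returns, the caller would pick any known Zero address, dial it, and start a fresh stream -- which is how every Alpha converges on the new Zero leader.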

Commits:
* Added some code to cancel the recv from Zero if there is no update for x seconds (a simplified Zero-side companion sketch follows this list). Still need to ensure that Zero sends an update every second or so.
* The Alpha leader can reconnect to the new Zero leader after the existing Zero leader is partitioned away from the cluster.
* Fixed various partition-related issues. After partitioning off both the Zero and Alpha leaders, increments converge quickly under the new leaders, and when the partition heals, both recover quickly.
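
As a companion to the first commit above, here is a simplified, self-contained take on the Zero-side loop this PR adds in dgraph/cmd/zero/zero.go (see the hunk further down): push the latest state once on connect and then every second, instead of waiting for membership changes. The updateStream interface and the latest callback are hypothetical stand-ins for the generated stream type and s.latestMembershipState; MembershipState is reused from the sketch above.

package membership

import (
	"context"
	"time"
)

// updateStream is a stand-in for the server side of Zero's Update stream.
type updateStream interface {
	Send(*MembershipState) error
	Context() context.Context
}

// streamMembership sends the latest membership state once right away, then
// every second, so a follower that hears nothing for ~10s can safely assume
// the leader is partitioned away and reconnect elsewhere.
func streamMembership(stream updateStream,
	latest func() (*MembershipState, error)) error {

	ctx := stream.Context()

	send := func() error {
		ms, err := latest()
		if err != nil {
			return err
		}
		return stream.Send(ms)
	}

	// Send immediately so the connection is usable as soon as it is established.
	if err := send(); err != nil {
		return err
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := send(); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}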
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 7bcd888 commit e36cf7d
Showing 8 changed files with 79 additions and 87 deletions.
9 changes: 7 additions & 2 deletions conn/pool.go
@@ -37,7 +37,7 @@ var (
ErrUnhealthyConnection = fmt.Errorf("Unhealthy connection")
errNoPeerPoolEntry = fmt.Errorf("no peerPool entry")
errNoPeerPool = fmt.Errorf("no peerPool pool, could not connect")
echoDuration = 10 * time.Second
echoDuration = time.Second
)

// "Pool" is used to manage the grpc client connection(s) for communicating with other
@@ -167,7 +167,12 @@ func (p *Pool) UpdateHealthStatus(printError bool) error {
x.Check2(rand.Read(query.Data))

c := pb.NewRaftClient(conn)
resp, err := c.Echo(context.Background(), query)
// Ensure that we have a timeout here, otherwise a network partition could
// end up causing this RPC to get stuck forever.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

resp, err := c.Echo(ctx, query)
if err == nil {
x.AssertTruef(bytes.Equal(resp.Data, query.Data),
"non-matching Echo response value from %v", p.Addr)
24 changes: 11 additions & 13 deletions contrib/blockade/blockade.yml
@@ -18,8 +18,7 @@ containers:
- 5080
- 6080
command: /gobin/dgraph zero --my=zero1:5080 --replicas 3 --idx 1 --bindall --expose_trace --profile_mode block --block_rate 10 --logtostderr -v=2
volumes:
"/home/dmai/go/bin": "/gobin"
volumes: {"/home/mrjn/go/bin": "/gobin"}

zero2:
image: dgraph/dgraph:latest
@@ -32,8 +31,7 @@ containers:
- 5081
- 6081
command: /gobin/dgraph zero -o 1 --my=zero2:5081 --replicas 3 --peer=zero1:5080 --idx 2 --bindall --expose_trace --profile_mode block --block_rate 10 --logtostderr -v=2
volumes:
"/home/dmai/go/bin": "/gobin"
volumes: {"/home/mrjn/go/bin": "/gobin"}

zero3:
image: dgraph/dgraph:latest
@@ -47,7 +45,7 @@ containers:
- 6082
command: /gobin/dgraph zero -o 2 --my=zero3:5082 --replicas 3 --peer=zero1:5080 --idx 3 --bindall --expose_trace --profile_mode block --block_rate 10 --logtostderr -v=2
volumes:
"/home/dmai/go/bin": "/gobin"
"/home/mrjn/go/bin": "/gobin"

dg1:
image: dgraph/dgraph:latest
@@ -59,9 +57,9 @@ containers:
expose:
- 8180
- 9180
command: sh -xc '/gobin/increment -addr=dg1:9180 -num=10000 -wait=1s & /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2'
command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --logtostderr -v=2
volumes:
"/home/dmai/go/bin": "/gobin"
"/home/mrjn/go/bin": "/gobin"

dg2:
image: dgraph/dgraph:latest
@@ -73,10 +71,10 @@ containers:
expose:
- 8182
- 9182
start_delay: 10
command: sh -xc '/gobin/increment -addr=dg2:9182 -num=10000 -wait=1s & echo "Starting dg2"; /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2'
start_delay: 8
command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --logtostderr -v=2
volumes:
"/home/dmai/go/bin": "/gobin"
"/home/mrjn/go/bin": "/gobin"

dg3:
image: dgraph/dgraph:latest
@@ -88,10 +86,10 @@ containers:
expose:
- 8183
- 9183
start_deplay: 15
command: sh -xc '/gobin/increment -addr=dg3:9183 -num=10000 -wait=1s & echo "Starting dg3"; /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2'
start_delay: 16
command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --logtostderr -v=2
volumes:
"/home/dmai/go/bin": "/gobin"
"/home/mrjn/go/bin": "/gobin"

network:
driver: "udn"
3 changes: 2 additions & 1 deletion contrib/integration/increment/main.go
@@ -122,7 +122,8 @@ func main() {
time.Sleep(time.Second)
continue
}
fmt.Printf("Counter VAL: %d [ Ts: %d ]\n", cnt.Val, cnt.startTs)
fmt.Printf("%-17s Counter VAL: %d [ Ts: %d ]\n",
time.Now().UTC().Format("0102 03:04:05.999"), cnt.Val, cnt.startTs)
*num -= 1
time.Sleep(waitDur)
}
40 changes: 0 additions & 40 deletions dgraph/cmd/zero/raft.go
@@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"log"
"math/rand"
"time"

"google.golang.org/grpc"
@@ -48,41 +47,6 @@ type node struct {

var errReadIndex = x.Errorf("cannot get linerized read (time expired or no configured leader)")

func (n *node) RegisterForUpdates(ch chan struct{}) uint32 {
n.Lock()
defer n.Unlock()
if n.subscribers == nil {
n.subscribers = make(map[uint32]chan struct{})
}
for {
id := rand.Uint32()
if _, has := n.subscribers[id]; has {
continue
}
n.subscribers[id] = ch
return id
}
}

func (n *node) Deregister(id uint32) {
n.Lock()
defer n.Unlock()
delete(n.subscribers, id)
}

func (n *node) triggerUpdates() {
n.Lock()
defer n.Unlock()
for _, ch := range n.subscribers {
select {
case ch <- struct{}{}:
// We can ignore it and don't send a notification, because they are going to
// read a state version after now since ch is already full.
default:
}
}
}

func (n *node) AmLeader() bool {
if n.Raft() == nil {
return false
@@ -573,10 +537,6 @@ func (n *node) Run() {
for _, msg := range rd.Messages {
n.Send(msg)
}
// Need to send membership state to dgraph nodes on leader change also.
if rd.SoftState != nil || len(rd.CommittedEntries) > 0 {
n.triggerUpdates()
}
n.Raft().Advance()
}
}
21 changes: 6 additions & 15 deletions dgraph/cmd/zero/zero.go
@@ -560,16 +560,8 @@ func (s *Server) Update(stream pb.Zero_UpdateServer) error {
che <- s.receiveUpdates(stream)
}()

// Check every minute that whether we caught upto read index or not.
ticker := time.NewTicker(time.Minute)
// Send MembershipState right away. So, the connection is correctly established.
ctx := stream.Context()
// node sends struct{} on this channel whenever membership state is updated
changeCh := make(chan struct{}, 1)

id := s.Node.RegisterForUpdates(changeCh)
defer s.Node.Deregister(id)
// Send MembershipState immediately after registering. (Or there could be race
// condition between registering and change in membership state).
ms, err := s.latestMembershipState(ctx)
if err != nil {
return err
@@ -581,9 +573,13 @@ func (s *Server) Update(stream pb.Zero_UpdateServer) error {
}
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-changeCh:
case <-ticker.C:
// Send an update every second.
ms, err := s.latestMembershipState(ctx)
if err != nil {
return err
@@ -595,11 +591,6 @@ func (s *Server) Update(stream pb.Zero_UpdateServer) error {
case err := <-che:
// Error while receiving updates.
return err
case <-ticker.C:
// Check Whether we caught upto read index or not.
if _, err := s.latestMembershipState(ctx); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-s.shutDownCh:
5 changes: 3 additions & 2 deletions worker/draft.go
@@ -448,7 +448,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
if txn == nil {
return
}
err := x.RetryUntilSuccess(Config.MaxRetries, 10 * time.Millisecond, func() error {
err := x.RetryUntilSuccess(Config.MaxRetries, 10*time.Millisecond, func() error {
return txn.CommitToDisk(writer, commit)
})

@@ -477,7 +477,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
if txn == nil {
return
}
err := x.RetryUntilSuccess(Config.MaxRetries, 10 * time.Millisecond, func() error {
err := x.RetryUntilSuccess(Config.MaxRetries, 10*time.Millisecond, func() error {
return txn.CommitToMemory(commit)
})
if err != nil {
@@ -689,6 +689,7 @@ func (n *node) Run() {
break
}
glog.Errorf("While retrieving snapshot, error: %v. Retrying...", err)
time.Sleep(100 * time.Millisecond) // Wait for a bit.
}
glog.Infof("---> SNAPSHOT: %+v. Group %d. DONE.\n", snap, n.gid)
} else {
55 changes: 45 additions & 10 deletions worker/groups.go
@@ -492,15 +492,21 @@ func (g *groupi) triggerMembershipSync() {
}
}

// TODO: This function needs to be refactored into smaller functions. It gets hard to reason about.
func (g *groupi) periodicMembershipUpdate() {
defer g.closer.Done() // CLOSER:1

// Calculating tablet sizes is expensive, hence we do it only every 5 mins.
ticker := time.NewTicker(time.Minute * 5)
// Node might not be the leader when we are calculating size.
// We need to send immediately on start so no leader check inside calculatesize.
tablets := g.calculateTabletSizes()

// Calculating tablet sizes is expensive, hence we do it only every 5 mins.
slowTicker := time.NewTicker(time.Minute * 5)
defer slowTicker.Stop()

fastTicker := time.NewTicker(10 * time.Second)
defer fastTicker.Stop()

START:
select {
case <-g.closer.HasBeenClosed():
@@ -511,7 +517,7 @@
pl := g.AnyServer(0)
// We should always have some connection to dgraphzero.
if pl == nil {
glog.Warningln("WARNING: We don't have address of any Zero server.")
glog.Warningln("Membership update: No Zero server known.")
time.Sleep(time.Second)
goto START
}
@@ -526,8 +532,10 @@
goto START
}

stateCh := make(chan *pb.MembershipState, 10)
go func() {
for {
glog.Infof("Starting a new membership stream receive from %s.", pl.Addr)
for i := 0; ; i++ {
// Blocking, should return if sending on stream fails(Need to verify).
state, err := stream.Recv()
if err != nil || state == nil {
@@ -542,10 +550,18 @@
}
return
}
g.applyState(state)
if i == 0 {
glog.Infof("Received first state update from Zero: %+v", state)
}
select {
case stateCh <- state:
case <-ctx.Done():
return
}
}
}()

lastRecv := time.Now()
g.triggerMembershipSync() // Ticker doesn't start immediately
OUTER:
for {
@@ -556,7 +572,17 @@
case <-ctx.Done():
stream.CloseSend()
break OUTER

case state := <-stateCh:
lastRecv = time.Now()
g.applyState(state)
case <-fastTicker.C:
if time.Since(lastRecv) > 10*time.Second {
// Zero might have gone under partition. We should recreate our connection.
glog.Warningf("No membership update for 10s. Closing connection to Zero.")
stream.CloseSend()
cancel()
break OUTER
}
case <-g.triggerCh:
if !g.Node.AmLeader() {
tablets = nil
@@ -566,7 +592,7 @@
stream.CloseSend()
break OUTER
}
case <-ticker.C:
case <-slowTicker.C:
// dgraphzero just adds to the map so check that no data is present for the tablet
// before we remove it to avoid the race condition where a tablet is added recently
// and mutation has not been persisted to disk.
@@ -712,6 +738,9 @@ func (g *groupi) sendMembership(tablets map[string]*pb.Tablet,
func (g *groupi) processOracleDeltaStream() {
defer g.closer.Done() // CLOSER:1

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

blockingReceiveAndPropose := func() {
elog := trace.NewEventLog("Dgraph", "ProcessOracleStream")
defer elog.Finish()
@@ -720,7 +749,7 @@

pl := g.Leader(0)
if pl == nil {
glog.Warningln("WARNING: We don't have address of any dgraphzero leader.")
glog.Warningln("Oracle delta stream: No Zero leader known.")
elog.Errorf("Dgraph zero leader address unknown")
time.Sleep(time.Second)
return
@@ -776,6 +805,14 @@
return
}
batch++
case <-ticker.C:
newLead := g.Leader(0)
if newLead == nil || newLead.Addr != pl.Addr {
glog.Infof("Zero leadership changed. Renewing oracle delta stream.")
return
}
continue

case <-ctx.Done():
return
case <-g.closer.HasBeenClosed():
@@ -828,8 +865,6 @@
}
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-g.closer.HasBeenClosed():
9 changes: 5 additions & 4 deletions worker/mutation.go
@@ -563,14 +563,15 @@ func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error)
zc := pb.NewZeroClient(pl.Get())
tctx, err := zc.CommitOrAbort(ctx, tc)

if err != nil {
span.Annotatef(nil, "Error=%v", err)
return 0, err
}
var attributes []otrace.Attribute
attributes = append(attributes, otrace.Int64Attribute("commitTs", int64(tctx.CommitTs)))
attributes = append(attributes, otrace.BoolAttribute("committed", tctx.CommitTs > 0))
span.Annotatef(attributes, "Error=%v", err)
span.Annotate(attributes, "")

if err != nil {
return 0, err
}
if tctx.Aborted {
return 0, y.ErrAborted
}
