Skip to content

Commit

Permalink
bugfix: WaitForReceiver didn't kill proxies correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzhe1991 committed Jul 20, 2015
1 parent a54c6b0 commit dd0083b
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 22 deletions.
31 changes: 10 additions & 21 deletions pkg/models/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ import (
"strconv"
"time"

"github.com/ngaut/zkhelper"
"github.com/wandoulabs/codis/pkg/utils"

"github.com/juju/errors"
"github.com/ngaut/go-zookeeper/zk"
log "github.com/ngaut/logging"
"github.com/ngaut/zkhelper"
)

type ActionType string
Expand Down Expand Up @@ -87,41 +85,32 @@ func WaitForReceiver(zkConn zkhelper.Conn, productName string, actionZkPath stri
}

times := 0
var proxyIds []string
var offlineProxyIds []string
proxyIds := make(map[string]bool)
for _, p := range proxies {
proxyIds = append(proxyIds, p.Id)
proxyIds[p.Id] = true
}
sort.Strings(proxyIds)
// check every 500ms
for times < 60 {
for times < 20 {
if times >= 6 && (times*500)%1000 == 0 {
log.Warning("abnormal waiting time for receivers", actionZkPath)
}
nodes, _, err := zkConn.Children(actionZkPath)
if err != nil {
return errors.Trace(err)
}
var confirmIds []string
for _, node := range nodes {
id := path.Base(node)
confirmIds = append(confirmIds, id)
delete(proxyIds, id)
}
if len(confirmIds) != 0 {
sort.Strings(confirmIds)
if utils.Strings(proxyIds).Eq(confirmIds) {
return nil
}
offlineProxyIds = proxyIds[len(confirmIds)-1:]
if len(proxyIds) == 0 {
return nil
}
times += 1
times++
time.Sleep(500 * time.Millisecond)
}
if len(offlineProxyIds) > 0 {
log.Error("proxies didn't responed: ", offlineProxyIds)
}
log.Warning("proxies didn't responed: ", proxyIds)
// set offline proxies
for _, id := range offlineProxyIds {
for id, _ := range proxyIds {
log.Errorf("mark proxy %s to PROXY_STATE_MARK_OFFLINE", id)
if err := SetProxyStatus(zkConn, productName, id, PROXY_STATE_MARK_OFFLINE); err != nil {
return err
Expand Down
123 changes: 122 additions & 1 deletion pkg/models/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package models
import (
"encoding/json"
"fmt"
"path"
"testing"
"time"

"github.com/juju/errors"
"github.com/ngaut/zkhelper"

"github.com/wandoulabs/codis/pkg/utils"
)

Expand Down Expand Up @@ -48,6 +49,126 @@ func TestNewAction(t *testing.T) {
}
}

func TestWaitForReceiverTimeout(t *testing.T) {
fakeZkConn := zkhelper.NewConn()
proxies := []ProxyInfo{}
for i := 0; i < 5; i++ {
proxies = append(proxies, ProxyInfo{
Id: fmt.Sprintf("proxy_%d", i),
Addr: fmt.Sprintf("localhost:%d", i+1234),
State: PROXY_STATE_ONLINE,
})
CreateProxyInfo(fakeZkConn, productName, &proxies[i])
}
zkhelper.CreateRecursive(fakeZkConn, GetActionResponsePath(productName)+"/1", "", 0, zkhelper.DefaultDirACLs())
go func() {
time.Sleep(time.Second * 2)
doResponseForTest(fakeZkConn, "1", &proxies[0])
doResponseForTest(fakeZkConn, "1", &proxies[2])
doResponseForTest(fakeZkConn, "1", &proxies[4])
for {
for i := 0; i < 5; i++ {
pname := fmt.Sprintf("proxy_%d", i)
p, _ := GetProxyInfo(fakeZkConn, productName, pname)
if p != nil && p.State == PROXY_STATE_MARK_OFFLINE {
zkhelper.DeleteRecursive(fakeZkConn, path.Join(GetProxyPath(productName), pname), -1)
}
}
}
}()
err := WaitForReceiver(fakeZkConn, productName, GetActionResponsePath(productName)+"/1", proxies)
if err != ErrReceiverTimeout {
t.Error("there is no timeout as expected")
}
p, _ := GetProxyInfo(fakeZkConn, productName, "proxy_0")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_0 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_1")
if p != nil {
t.Error("proxy_1 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_2")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_2 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_3")
if p != nil {
t.Error("proxy_3 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_4")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_4 status is not as expected")
}
}

func TestWaitForReceiver(t *testing.T) {
fakeZkConn := zkhelper.NewConn()
proxies := []ProxyInfo{}
for i := 0; i < 5; i++ {
proxies = append(proxies, ProxyInfo{
Id: fmt.Sprintf("proxy_%d", i),
Addr: fmt.Sprintf("localhost:%d", i+1234),
State: PROXY_STATE_ONLINE,
})
CreateProxyInfo(fakeZkConn, productName, &proxies[i])
}
zkhelper.CreateRecursive(fakeZkConn, GetActionResponsePath(productName)+"/1", "", 0, zkhelper.DefaultDirACLs())
go func() {
time.Sleep(time.Second * 2)
doResponseForTest(fakeZkConn, "1", &proxies[0])
doResponseForTest(fakeZkConn, "1", &proxies[1])
doResponseForTest(fakeZkConn, "1", &proxies[2])
doResponseForTest(fakeZkConn, "1", &proxies[3])
doResponseForTest(fakeZkConn, "1", &proxies[4])
for {
for i := 0; i < 5; i++ {
pname := fmt.Sprintf("proxy_%d", i)
p, _ := GetProxyInfo(fakeZkConn, productName, pname)
if p != nil && p.State == PROXY_STATE_MARK_OFFLINE {
zkhelper.DeleteRecursive(fakeZkConn, path.Join(GetProxyPath(productName), pname), -1)
}
}
}
}()
err := WaitForReceiver(fakeZkConn, productName, GetActionResponsePath(productName)+"/1", proxies)
if err != nil {
t.Error("there is error not as expected")
}
p, _ := GetProxyInfo(fakeZkConn, productName, "proxy_0")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_0 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_1")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_1 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_2")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_2 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_3")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_3 status is not as expected")
}
p, _ = GetProxyInfo(fakeZkConn, productName, "proxy_4")
if p == nil || p.State != PROXY_STATE_ONLINE {
t.Error("proxy_4 status is not as expected")
}
}

func doResponseForTest(conn zkhelper.Conn, seq string, pi *ProxyInfo) error {
actionPath := GetActionResponsePath(productName) + "/" + seq
data, err := json.Marshal(pi)
if err != nil {
return errors.Trace(err)
}

_, err = conn.Create(path.Join(actionPath, pi.Id), data,
0, zkhelper.DefaultFileACLs())
return err
}

func TestForceRemoveLock(t *testing.T) {
fakeZkConn := zkhelper.NewConn()
zkLock := utils.GetZkLock(fakeZkConn, productName)
Expand Down

0 comments on commit dd0083b

Please sign in to comment.