Skip to content

Commit

Permalink
rename /doozer/* to new structure
Browse files Browse the repository at this point in the history
kr committed Mar 23, 2011
1 parent e0ad5a8 commit 1e46868
Showing 13 changed files with 82 additions and 82 deletions.
2 changes: 1 addition & 1 deletion bin/test-cluster
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ done
i=2
while test $i -le $c
do
true | doozer -a 127.0.0.1:8041 set /doozer/slot/$i 0
true | doozer -a 127.0.0.1:8041 set /ctl/cal/$i 0
i=$(expr $i + 1)
done

22 changes: 11 additions & 11 deletions src/pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -376,7 +376,7 @@ func (c *conn) monitorAddrs(cl *Client) {
}
}

addrGlob := pb.String("/doozer/info/*/public-addr")
addrGlob := pb.String("/ctl/node/*/public-addr")
watchAddr, err := c.events(&T{Verb: watch, Path: addrGlob})
if err != nil {
log.Println(err)
@@ -402,7 +402,7 @@ init:
}
}

glob := pb.String("/doozer/slot/*")
glob := pb.String("/ctl/cal/*")

watch, err := c.events(&T{Verb: watch, Path: glob})
if err != nil {
@@ -416,7 +416,7 @@ init:
return
}

slots := make(map[string]string)
cal := make(map[string]string)

for {
if watchAddr.C == nil && walk.C == nil && watch.C == nil {
@@ -438,13 +438,13 @@ init:

if len(ev.Body) > 0 {
sid := string(ev.Body)
path := "/doozer/info/" + sid + "/public-addr"
path := "/ctl/node/" + sid + "/public-addr"
addr := addrs[path]
slots[ev.Path] = addr
cal[ev.Path] = addr
cl.a <- addr
} else {
addr := slots[ev.Path]
slots[ev.Path] = "", false
addr := cal[ev.Path]
cal[ev.Path] = "", false
cl.r <- addr
}
case ev := <-watch.C:
@@ -455,13 +455,13 @@ init:

if len(ev.Body) > 0 {
sid := string(ev.Body)
path := "/doozer/info/" + sid + "/public-addr"
path := "/ctl/node/" + sid + "/public-addr"
addr := addrs[path]
slots[ev.Path] = addr
cal[ev.Path] = addr
cl.a <- addr
} else {
addr := slots[ev.Path]
slots[ev.Path] = "", false
addr := cal[ev.Path]
cal[ev.Path] = "", false
cl.r <- addr
}
}
16 changes: 8 additions & 8 deletions src/pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
@@ -14,8 +14,8 @@ func TestConsensusOne(t *testing.T) {
alpha := int64(1)
st := store.New()

st.Ops <- store.Op{1, store.MustEncodeSet("/doozer/info/"+self+"/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet("/doozer/slot/1", self, 0)}
st.Ops <- store.Op{1, store.MustEncodeSet("/ctl/node/"+self+"/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet("/ctl/cal/1", self, 0)}
<-st.Seqns

cmw := st.Watch(store.Any)
@@ -64,10 +64,10 @@ func TestConsensusTwo(t *testing.T) {
alpha := int64(1)
st := store.New()

st.Ops <- store.Op{1, store.MustEncodeSet("/doozer/info/"+a+"/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet("/doozer/slot/1", a, 0)}
st.Ops <- store.Op{3, store.MustEncodeSet("/doozer/info/"+b+"/addr", "x", 0)}
st.Ops <- store.Op{4, store.MustEncodeSet("/doozer/slot/2", b, 0)}
st.Ops <- store.Op{1, store.MustEncodeSet("/ctl/node/"+a+"/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet("/ctl/cal/1", a, 0)}
st.Ops <- store.Op{3, store.MustEncodeSet("/ctl/node/"+b+"/addr", "x", 0)}
st.Ops <- store.Op{4, store.MustEncodeSet("/ctl/cal/2", b, 0)}
snn := <-st.Seqns

acmw := st.Watch(store.Any)
@@ -131,8 +131,8 @@ func TestLearnedValueIsLearned(t *testing.T) {
alpha := int64(1)
st := store.New()

st.Ops <- store.Op{1, store.MustEncodeSet("/doozer/info/"+self+"/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet("/doozer/slot/1", self, 0)}
st.Ops <- store.Op{1, store.MustEncodeSet("/ctl/node/"+self+"/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet("/ctl/cal/1", self, 0)}
<-st.Seqns

cmw := st.Watch(store.Any)
12 changes: 6 additions & 6 deletions src/pkg/consensus/run.go
Original file line number Diff line number Diff line change
@@ -111,12 +111,12 @@ func generateRuns(alpha int64, w <-chan store.Event, runs chan<- *run, t run) {
}

func getCals(g store.Getter) []string {
slots := store.Getdir(g, "/doozer/slot")
cals := make([]string, len(slots))
ents := store.Getdir(g, "/ctl/cal")
cals := make([]string, len(ents))

i := 0
for _, slot := range slots {
id := store.GetString(g, "/doozer/slot/"+slot)
for _, cal := range ents {
id := store.GetString(g, "/ctl/cal/"+cal)
if id != "" {
cals[i] = id
i++
@@ -133,11 +133,11 @@ func getCals(g store.Getter) []string {
func getAddrs(g store.Getter) map[string]bool {
// TODO include only CALs, once followers use TCP for updates.

members := store.Getdir(g, "/doozer/info")
members := store.Getdir(g, "/ctl/node")
addrs := make(map[string]bool)

for _, member := range members {
addrs[store.GetString(g, "/doozer/info/"+member+"/addr")] = true
addrs[store.GetString(g, "/ctl/node/"+member+"/addr")] = true
}

return addrs
30 changes: 15 additions & 15 deletions src/pkg/consensus/run_test.go
Original file line number Diff line number Diff line change
@@ -10,8 +10,8 @@ import (


const (
info = "/doozer/info"
slot = "/doozer/slot"
node = "/ctl/node"
cal = "/ctl/cal"
)


@@ -29,9 +29,9 @@ func TestGetCalsFull(t *testing.T) {
st := store.New()
defer close(st.Ops)

st.Ops <- store.Op{Seqn: 1, Mut: store.MustEncodeSet(slot+"/1", "a", 0)}
st.Ops <- store.Op{Seqn: 2, Mut: store.MustEncodeSet(slot+"/2", "c", 0)}
st.Ops <- store.Op{Seqn: 3, Mut: store.MustEncodeSet(slot+"/3", "b", 0)}
st.Ops <- store.Op{Seqn: 1, Mut: store.MustEncodeSet(cal+"/1", "a", 0)}
st.Ops <- store.Op{Seqn: 2, Mut: store.MustEncodeSet(cal+"/2", "c", 0)}
st.Ops <- store.Op{Seqn: 3, Mut: store.MustEncodeSet(cal+"/3", "b", 0)}
<-st.Seqns

assert.Equal(t, []string{"a", "b", "c"}, getCals(st))
@@ -42,9 +42,9 @@ func TestGetCalsPartial(t *testing.T) {
st := store.New()
defer close(st.Ops)

st.Ops <- store.Op{Seqn: 1, Mut: store.MustEncodeSet(slot+"/1", "a", 0)}
st.Ops <- store.Op{Seqn: 2, Mut: store.MustEncodeSet(slot+"/2", "", 0)}
st.Ops <- store.Op{Seqn: 3, Mut: store.MustEncodeSet(slot+"/3", "", 0)}
st.Ops <- store.Op{Seqn: 1, Mut: store.MustEncodeSet(cal+"/1", "a", 0)}
st.Ops <- store.Op{Seqn: 2, Mut: store.MustEncodeSet(cal+"/2", "", 0)}
st.Ops <- store.Op{Seqn: 3, Mut: store.MustEncodeSet(cal+"/3", "", 0)}
<-st.Seqns

assert.Equal(t, []string{"a"}, getCals(st))
@@ -55,9 +55,9 @@ func TestGetAddrs(t *testing.T) {
st := store.New()
defer close(st.Ops)

st.Ops <- store.Op{1, store.MustEncodeSet(info+"/1/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet(info+"/2/addr", "y", 0)}
st.Ops <- store.Op{3, store.MustEncodeSet(info+"/3/addr", "z", 0)}
st.Ops <- store.Op{1, store.MustEncodeSet(node+"/1/addr", "x", 0)}
st.Ops <- store.Op{2, store.MustEncodeSet(node+"/2/addr", "y", 0)}
st.Ops <- store.Op{3, store.MustEncodeSet(node+"/3/addr", "z", 0)}
<-st.Seqns

assert.Equal(t, map[string]bool{"x": true, "y": true, "z": true}, getAddrs(st))
@@ -83,12 +83,12 @@ func alphaTest(t *testing.T, alpha int64) {

st.Ops <- store.Op{
Seqn: 1,
Mut: store.MustEncodeSet(info+"/a/addr", "x", 0),
Mut: store.MustEncodeSet(node+"/a/addr", "x", 0),
}

st.Ops <- store.Op{
Seqn: 2,
Mut: store.MustEncodeSet(slot+"/1", "a", 0),
Mut: store.MustEncodeSet(cal+"/1", "a", 0),
}

for 2 != <-st.Seqns {
@@ -154,7 +154,7 @@ func TestRunAfterWatch(t *testing.T) {

st.Ops <- store.Op{
Seqn: 1,
Mut: store.MustEncodeSet(info+"/b/addr", "y", 0),
Mut: store.MustEncodeSet(node+"/b/addr", "y", 0),
}

for 1 != <-st.Seqns {
@@ -170,7 +170,7 @@ func TestRunAfterWatch(t *testing.T) {

st.Ops <- store.Op{
Seqn: 2,
Mut: store.MustEncodeSet(slot+"/1", "b", 0),
Mut: store.MustEncodeSet(cal+"/1", "b", 0),
}

exp := &run{
28 changes: 14 additions & 14 deletions src/pkg/doozer.go
Original file line number Diff line number Diff line change
@@ -23,9 +23,9 @@ const (
sessionPollInterval = 1e9 // ns == 1s
)

const slot = "/doozer/slot"
const calDir = "/ctl/cal"

var slots = store.MustCompileGlob("/doozer/slot/*")
var calGlob = store.MustCompileGlob(calDir + "/*")


type proposer struct {
@@ -59,12 +59,12 @@ func Main(clusterName, attachAddr string, udpConn net.PacketConn, listener, webL
self := util.RandId()
st := store.New()
if attachAddr == "" { // we are the only node in a new cluster
set(st, "/doozer/info/"+self+"/addr", listenAddr, store.Missing)
set(st, "/doozer/info/"+self+"/public-addr", listenAddr, store.Missing)
set(st, "/doozer/info/"+self+"/hostname", os.Getenv("HOSTNAME"), store.Missing)
set(st, "/doozer/info/"+self+"/version", Version, store.Missing)
set(st, "/ctl/node/"+self+"/addr", listenAddr, store.Missing)
set(st, "/ctl/node/"+self+"/public-addr", listenAddr, store.Missing)
set(st, "/ctl/node/"+self+"/hostname", os.Getenv("HOSTNAME"), store.Missing)
set(st, "/ctl/node/"+self+"/version", Version, store.Missing)
set(st, "/doozer/members/"+self, listenAddr, store.Missing)
set(st, "/doozer/slot/"+"1", self, store.Missing)
set(st, "/ctl/cal/"+"1", self, store.Missing)
set(st, "/ping", "pong", store.Missing)

cal <- true
@@ -73,10 +73,10 @@ func Main(clusterName, attachAddr string, udpConn net.PacketConn, listener, webL
var cl *client.Client
cl = client.New("local", attachAddr) // TODO use real cluster name

setC(cl, "/doozer/info/"+self+"/addr", listenAddr, store.Clobber)
setC(cl, "/doozer/info/"+self+"/public-addr", listenAddr, store.Clobber)
setC(cl, "/doozer/info/"+self+"/hostname", os.Getenv("HOSTNAME"), store.Clobber)
setC(cl, "/doozer/info/"+self+"/version", Version, store.Clobber)
setC(cl, "/ctl/node/"+self+"/addr", listenAddr, store.Clobber)
setC(cl, "/ctl/node/"+self+"/public-addr", listenAddr, store.Clobber)
setC(cl, "/ctl/node/"+self+"/hostname", os.Getenv("HOSTNAME"), store.Clobber)
setC(cl, "/ctl/node/"+self+"/version", Version, store.Clobber)

rev, err := cl.Rev()
if err != nil {
@@ -212,10 +212,10 @@ func Main(clusterName, attachAddr string, udpConn net.PacketConn, listener, webL
}

func activate(st *store.Store, self string, c *client.Client) int64 {
w := store.NewWatch(st, slots)
w := store.NewWatch(st, calGlob)

for _, base := range store.Getdir(st, slot) {
p := slot + "/" + base
for _, base := range store.Getdir(st, calDir) {
p := calDir + "/" + base
v, rev := st.Get(p)
if rev != store.Dir && v[0] == "" {
seqn, err := c.Set(p, rev, []byte(self))
4 changes: 2 additions & 2 deletions src/pkg/doozer_test.go
Original file line number Diff line number Diff line change
@@ -464,10 +464,10 @@ func TestDoozerReconnect(t *testing.T) {

c0 := client.New("foo", a)

_, err := c0.Set("/doozer/slot/2", 0, []byte{})
_, err := c0.Set("/ctl/cal/2", 0, []byte{})
assert.Equal(t, nil, err)

_, err = c0.Set("/doozer/slot/3", 0, []byte{})
_, err = c0.Set("/ctl/cal/3", 0, []byte{})
assert.Equal(t, nil, err)

// Wait for the other members to become CALs.
2 changes: 1 addition & 1 deletion src/pkg/gc/pulse.go
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import (
)

func Pulse(node string, seqns <-chan int64, p consensus.Proposer, sleep int64) {
path := "/doozer/info/" + node + "/applied"
path := "/ctl/node/" + node + "/applied"
for {
seqn := strconv.Itoa64(<-seqns)
if closed(seqns) {
4 changes: 2 additions & 2 deletions src/pkg/gc/pulse_test.go
Original file line number Diff line number Diff line change
@@ -24,8 +24,8 @@ func TestGcPulse(t *testing.T) {
go Pulse("test", seqns, fs, 1)

seqns <- 0
assert.Equal(t, "-1:/doozer/info/test/applied=0", <-fs)
assert.Equal(t, "-1:/ctl/node/test/applied=0", <-fs)

seqns <- 1
assert.Equal(t, "-1:/doozer/info/test/applied=1", <-fs)
assert.Equal(t, "-1:/ctl/node/test/applied=1", <-fs)
}
10 changes: 5 additions & 5 deletions src/pkg/member/member.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import (
)

var (
slots = store.MustCompileGlob("/doozer/slot/*")
calGlob = store.MustCompileGlob("/ctl/cal/*")
)

func Clean(c chan string, st *store.Store, p consensus.Proposer) {
@@ -27,8 +27,8 @@ func Clean(c chan string, st *store.Store, p consensus.Proposer) {


func getId(addr string, g store.Getter) string {
for _, slot := range store.Getdir(g, "/doozer/slot") {
id := store.GetString(g, "/doozer/slot/"+slot)
for _, cal := range store.Getdir(g, "/ctl/cal") {
id := store.GetString(g, "/ctl/cal/"+cal)
if store.GetString(g, "/doozer/members/"+id) == addr {
return id
}
@@ -38,7 +38,7 @@ func getId(addr string, g store.Getter) string {


func clearSlot(p consensus.Proposer, g store.Getter, name string) {
store.Walk(g, slots, func(path, body string, rev int64) bool {
store.Walk(g, calGlob, func(path, body string, rev int64) bool {
if body == name {
consensus.Set(p, path, nil, rev)
}
@@ -55,7 +55,7 @@ func removeMember(p consensus.Proposer, g store.Getter, name string) {
}

func removeInfo(p consensus.Proposer, g store.Getter, name string) {
glob, err := store.CompileGlob("/doozer/info/" + name + "/**")
glob, err := store.CompileGlob("/ctl/node/" + name + "/**")
if err != nil {
log.Println(err)
return
16 changes: 8 additions & 8 deletions src/pkg/member/member_test.go
Original file line number Diff line number Diff line change
@@ -18,19 +18,19 @@ func TestMemberSimple(t *testing.T) {
// start our session
fp.Propose([]byte(store.MustEncodeSet("/ctl/sess/a", "foo", store.Missing)))

fp.Propose([]byte(store.MustEncodeSet("/doozer/info/a/x", "a", store.Missing)))
fp.Propose([]byte(store.MustEncodeSet("/doozer/info/a/y", "b", store.Missing)))
fp.Propose([]byte(store.MustEncodeSet("/ctl/node/a/x", "a", store.Missing)))
fp.Propose([]byte(store.MustEncodeSet("/ctl/node/a/y", "b", store.Missing)))
fp.Propose([]byte(store.MustEncodeSet("/doozer/members/a", "addr", store.Missing)))
fp.Propose([]byte(store.MustEncodeSet("/doozer/slot/0", "a", store.Missing)))
fp.Propose([]byte(store.MustEncodeSet("/ctl/cal/0", "a", store.Missing)))

slotCh := fp.Watch(store.MustCompileGlob("/doozer/slot/0"))
calCh := fp.Watch(store.MustCompileGlob("/ctl/cal/0"))
membCh := fp.Watch(store.MustCompileGlob("/doozer/members/a"))
infoCh := fp.Watch(store.MustCompileGlob("/doozer/info/a/?"))
nodeCh := fp.Watch(store.MustCompileGlob("/ctl/node/a/?"))

// end the session
go func() { c <- "addr" }()

ev := <-slotCh
ev := <-calCh
assert.T(t, ev.IsSet())
assert.Equal(t, "", ev.Body)

@@ -39,11 +39,11 @@ func TestMemberSimple(t *testing.T) {

cs := []int{}

ev = <-infoCh
ev = <-nodeCh
assert.T(t, ev.IsDel())
cs = append(cs, int(ev.Path[len(ev.Path)-1]))

ev = <-infoCh
ev = <-nodeCh
assert.T(t, ev.IsDel())
cs = append(cs, int(ev.Path[len(ev.Path)-1]))

6 changes: 3 additions & 3 deletions src/pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ const (
)


var slots = store.MustCompileGlob("/doozer/slot/*")
var calGlob = store.MustCompileGlob("/ctl/cal/*")


type T proto.Request
@@ -147,7 +147,7 @@ func (s *Server) Serve(l net.Listener, cal chan bool) {
func (sv *Server) cals() []string {
cals := make([]string, 0)
_, g := sv.St.Snap()
store.Walk(g, slots, func(_, body string, _ int64) bool {
store.Walk(g, calGlob, func(_, body string, _ int64) bool {
if len(body) > 0 {
cals = append(cals, body)
}
@@ -359,7 +359,7 @@ func (c *conn) redirect(t *T) {
}

cal := cals[rand.Intn(len(cals))]
parts, rev := c.s.St.Get("/doozer/info/" + cal + "/public-addr")
parts, rev := c.s.St.Get("/ctl/node/" + cal + "/public-addr")
if rev == store.Dir && rev == store.Missing {
c.respond(t, Valid|Done, nil, readonly)
return
12 changes: 6 additions & 6 deletions test/fail_test.go
Original file line number Diff line number Diff line change
@@ -55,13 +55,13 @@ func TestDoozerNodeFailure(t *testing.T) {
cl, err := client.Dial("127.0.0.1:8046")
assert.Equal(t, nil, err)

ch, err := cl.Watch("/doozer/slot/*")
ch, err := cl.Watch("/ctl/cal/*")
assert.Equal(t, nil, err)

cl.Set("/doozer/slot/2", "", "")
cl.Set("/ctl/cal/2", "", "")
<-ch
<-ch
cl.Set("/doozer/slot/3", "", "")
cl.Set("/ctl/cal/3", "", "")
<-ch
<-ch

@@ -98,13 +98,13 @@ func TestDoozerFiveNodeFailure(t *testing.T) {
cl, err := client.Dial("127.0.0.1:8040")
assert.Equal(t, nil, err)

ch, err := cl.Watch("/doozer/slot/*")
ch, err := cl.Watch("/ctl/cal/*")
assert.Equal(t, nil, err)

cl.Set("/doozer/slot/2", "", "")
cl.Set("/ctl/cal/2", "", "")
<-ch
<-ch
cl.Set("/doozer/slot/3", "", "")
cl.Set("/ctl/cal/3", "", "")
<-ch
<-ch

0 comments on commit 1e46868

Please sign in to comment.