Skip to content

Commit fff4d99

Browse files
authored
sequencer zookeeper (#156)
1 parent c978116 commit fff4d99

File tree

13 files changed

+537
-95
lines changed

13 files changed

+537
-95
lines changed

cmd/layotto/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ import (
8989
// Sequencer
9090
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"
9191
sequencer_redis "mosn.io/layotto/components/sequencer/redis"
92+
sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"
9293
// Actuator
9394
_ "mosn.io/layotto/pkg/actuator"
9495
"mosn.io/layotto/pkg/actuator/health"
@@ -266,6 +267,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
266267
runtime_sequencer.NewFactory("redis", func() sequencer.Store {
267268
return sequencer_redis.NewStandaloneRedisSequencer(log.DefaultLogger)
268269
}),
270+
runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store {
271+
return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger)
272+
}),
269273
))
270274
// 4. check if unhealthy
271275
if err != nil {
+9-81
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,20 @@
11
package zookeeper
22

33
import (
4-
"errors"
5-
"fmt"
64
"github.com/go-zookeeper/zk"
75
"mosn.io/layotto/components/lock"
6+
"mosn.io/layotto/components/pkg/utils"
87
"mosn.io/pkg/log"
9-
"mosn.io/pkg/utils"
10-
"strconv"
11-
"strings"
8+
util "mosn.io/pkg/utils"
129
"time"
1310
)
1411

15-
const (
16-
host = "zookeeperHosts"
17-
password = "zookeeperPassword"
18-
sessionTimeout = "sessionTimeout"
19-
logInfo = "logInfo"
20-
defaultSessionTimeout = 5 * time.Second
21-
)
22-
23-
type ConnectionFactory interface {
24-
NewConnection(expire time.Duration, meta metadata) (ZKConnection, error)
25-
}
26-
27-
type ConnectionFactoryImpl struct {
28-
}
29-
30-
func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) {
31-
conn, _, err := zk.Connect(meta.hosts, expire, zk.WithLogInfo(meta.logInfo))
32-
if err != nil {
33-
return nil, err
34-
}
35-
return conn, nil
36-
}
37-
38-
type ZKConnection interface {
39-
Get(path string) ([]byte, *zk.Stat, error)
40-
Delete(path string, version int32) error
41-
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
42-
Close()
43-
}
44-
4512
type ZookeeperLock struct {
4613
//trylock reestablish connection every time
47-
factory ConnectionFactory
14+
factory utils.ConnectionFactory
4815
//unlock reuse this conneciton
49-
unlockConn ZKConnection
50-
metadata metadata
16+
unlockConn utils.ZKConnection
17+
metadata utils.ZookeeperMetadata
5118
logger log.ErrorLogger
5219
}
5320

@@ -60,16 +27,16 @@ func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock {
6027

6128
func (p *ZookeeperLock) Init(metadata lock.Metadata) error {
6229

63-
m, err := parseZookeeperMetadata(metadata)
30+
m, err := utils.ParseZookeeperMetadata(metadata.Properties)
6431
if err != nil {
6532
return err
6633
}
6734

6835
p.metadata = m
69-
p.factory = &ConnectionFactoryImpl{}
36+
p.factory = &utils.ConnectionFactoryImpl{}
7037

7138
//init unlock connection
72-
zkConn, err := p.factory.NewConnection(p.metadata.sessionTimeout, p.metadata)
39+
zkConn, err := p.factory.NewConnection(p.metadata.SessionTimeout, p.metadata)
7340
if err != nil {
7441
return err
7542
}
@@ -103,7 +70,7 @@ func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse
10370
}
10471

10572
//2.2 create node success, asyn to make sure zkclient alive for need time
106-
utils.GoWithRecover(func() {
73+
util.GoWithRecover(func() {
10774
//can also
10875
//time.Sleep(time.Second * time.Duration(req.Expire))
10976
timeAfterTrigger := time.After(time.Second * time.Duration(req.Expire))
@@ -153,42 +120,3 @@ func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, e
153120
//delete success, unlock success
154121
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
155122
}
156-
157-
type metadata struct {
158-
hosts []string
159-
password string
160-
sessionTimeout time.Duration
161-
logInfo bool
162-
}
163-
164-
func parseZookeeperMetadata(meta lock.Metadata) (metadata, error) {
165-
m := metadata{}
166-
if val, ok := meta.Properties[host]; ok && val != "" {
167-
split := strings.Split(val, ";")
168-
m.hosts = append(m.hosts, split...)
169-
} else {
170-
return m, errors.New("zookeeper store error: missing host address")
171-
}
172-
173-
if val, ok := meta.Properties[password]; ok && val != "" {
174-
m.password = val
175-
}
176-
177-
m.sessionTimeout = defaultSessionTimeout
178-
if val, ok := meta.Properties[sessionTimeout]; ok && val != "" {
179-
parsedVal, err := strconv.Atoi(val)
180-
if err != nil {
181-
return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err)
182-
}
183-
m.sessionTimeout = time.Duration(parsedVal) * time.Second
184-
}
185-
186-
if val, ok := meta.Properties[logInfo]; ok && val != "" {
187-
b, err := strconv.ParseBool(val)
188-
if err != nil {
189-
return metadata{}, err
190-
}
191-
m.logInfo = b
192-
}
193-
return m, nil
194-
}

components/lock/zookeeper/zookeeper_lock_test.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/golang/mock/gomock"
66
"github.com/stretchr/testify/assert"
77
"mosn.io/layotto/components/lock"
8+
"mosn.io/layotto/components/pkg/utils"
89
"mosn.io/pkg/log"
910
"testing"
1011
"time"
@@ -35,9 +36,9 @@ func TestZookeeperLock_ALock_AUnlock(t *testing.T) {
3536

3637
//mock
3738
ctrl := gomock.NewController(t)
38-
unlockConn := NewMockZKConnection(ctrl)
39-
lockConn := NewMockZKConnection(ctrl)
40-
factory := NewMockConnectionFactory(ctrl)
39+
unlockConn := utils.NewMockZKConnection(ctrl)
40+
lockConn := utils.NewMockZKConnection(ctrl)
41+
factory := utils.NewMockConnectionFactory(ctrl)
4142
path := "/" + resouseId
4243
factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(2)
4344

@@ -74,9 +75,9 @@ func TestZookeeperLock_ALock_BUnlock(t *testing.T) {
7475

7576
//mock
7677
ctrl := gomock.NewController(t)
77-
unlockConn := NewMockZKConnection(ctrl)
78-
lockConn := NewMockZKConnection(ctrl)
79-
factory := NewMockConnectionFactory(ctrl)
78+
unlockConn := utils.NewMockZKConnection(ctrl)
79+
lockConn := utils.NewMockZKConnection(ctrl)
80+
factory := utils.NewMockConnectionFactory(ctrl)
8081
path := "/" + resouseId
8182
factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(2)
8283

@@ -113,9 +114,9 @@ func TestZookeeperLock_ALock_BLock_AUnlock_BLock_BUnlock(t *testing.T) {
113114

114115
//mock
115116
ctrl := gomock.NewController(t)
116-
unlockConn := NewMockZKConnection(ctrl)
117-
lockConn := NewMockZKConnection(ctrl)
118-
factory := NewMockConnectionFactory(ctrl)
117+
unlockConn := utils.NewMockZKConnection(ctrl)
118+
lockConn := utils.NewMockZKConnection(ctrl)
119+
factory := utils.NewMockConnectionFactory(ctrl)
119120
path := "/" + resouseId
120121

121122
factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(3)

components/pkg/utils/zookeeper.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package utils
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/go-zookeeper/zk"
7+
"strconv"
8+
"strings"
9+
"time"
10+
)
11+
12+
const (
13+
zkHost = "zookeeperHosts"
14+
zkPassword = "zookeeperPassword"
15+
sessionTimeout = "SessionTimeout"
16+
logInfo = "LogInfo"
17+
defaultSessionTimeout = 5 * time.Second
18+
)
19+
20+
type ConnectionFactory interface {
21+
NewConnection(expire time.Duration, meta ZookeeperMetadata) (ZKConnection, error)
22+
}
23+
24+
type ConnectionFactoryImpl struct {
25+
}
26+
27+
func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta ZookeeperMetadata) (ZKConnection, error) {
28+
29+
if expire == 0 {
30+
expire = meta.SessionTimeout
31+
}
32+
33+
conn, _, err := zk.Connect(meta.Hosts, expire, zk.WithLogInfo(meta.LogInfo))
34+
if err != nil {
35+
return nil, err
36+
}
37+
return conn, nil
38+
}
39+
40+
type ZKConnection interface {
41+
Get(path string) ([]byte, *zk.Stat, error)
42+
Set(path string, data []byte, version int32) (*zk.Stat, error)
43+
Delete(path string, version int32) error
44+
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
45+
Close()
46+
}
47+
48+
type ZookeeperMetadata struct {
49+
Hosts []string
50+
Password string
51+
SessionTimeout time.Duration
52+
LogInfo bool
53+
}
54+
55+
func ParseZookeeperMetadata(properties map[string]string) (ZookeeperMetadata, error) {
56+
m := ZookeeperMetadata{}
57+
if val, ok := properties[zkHost]; ok && val != "" {
58+
split := strings.Split(val, ";")
59+
m.Hosts = append(m.Hosts, split...)
60+
} else {
61+
return m, errors.New("zookeeper store error: missing zkHost address")
62+
}
63+
64+
if val, ok := properties[zkPassword]; ok && val != "" {
65+
m.Password = val
66+
}
67+
68+
m.SessionTimeout = defaultSessionTimeout
69+
if val, ok := properties[sessionTimeout]; ok && val != "" {
70+
parsedVal, err := strconv.Atoi(val)
71+
if err != nil {
72+
return m, fmt.Errorf("zookeeper store error: can't parse SessionTimeout field: %s", err)
73+
}
74+
m.SessionTimeout = time.Duration(parsedVal) * time.Second
75+
}
76+
77+
if val, ok := properties[logInfo]; ok && val != "" {
78+
b, err := strconv.ParseBool(val)
79+
if err != nil {
80+
return ZookeeperMetadata{}, err
81+
}
82+
m.LogInfo = b
83+
}
84+
return m, nil
85+
}

components/lock/zookeeper/zookeeper_lock_mock.go renamed to components/pkg/utils/zookeeper_mock.go

+19-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)