diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 9915c102f7..6a9d21dfb0 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -81,6 +81,7 @@ import ( "mosn.io/layotto/components/lock" lock_etcd "mosn.io/layotto/components/lock/etcd" lock_redis "mosn.io/layotto/components/lock/redis" + lock_zookeeper "mosn.io/layotto/components/lock/zookeeper" runtime_lock "mosn.io/layotto/pkg/runtime/lock" // Actuator @@ -245,6 +246,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_lock.NewFactory("redis", func() lock.LockStore { return lock_redis.NewStandaloneRedisLock(log.DefaultLogger) }), + runtime_lock.NewFactory("zookeeper", func() lock.LockStore { + return lock_zookeeper.NewZookeeperLock(log.DefaultLogger) + }), runtime_lock.NewFactory("etcd", func() lock.LockStore { return lock_etcd.NewEtcdLock(log.DefaultLogger) }), diff --git a/components/go.mod b/components/go.mod index 8dd9ea4944..8bcc9408cb 100644 --- a/components/go.mod +++ b/components/go.mod @@ -6,6 +6,8 @@ require ( github.com/alicebob/miniredis/v2 v2.13.3 github.com/apache/dubbo-go-hessian2 v1.7.0 github.com/go-redis/redis/v8 v8.8.0 + github.com/go-zookeeper/zk v1.0.2 + github.com/golang/mock v1.4.4 github.com/google/uuid v1.1.2 github.com/stretchr/testify v1.7.0 github.com/valyala/fasthttp v1.26.0 diff --git a/components/go.sum b/components/go.sum index e6176cdc39..1b88c21441 100644 --- a/components/go.sum +++ b/components/go.sum @@ -155,6 +155,8 @@ github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GO github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw= github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= +github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= diff --git a/components/lock/zookeeper/zookeeper_lock.go b/components/lock/zookeeper/zookeeper_lock.go new file mode 100644 index 0000000000..284f267380 --- /dev/null +++ b/components/lock/zookeeper/zookeeper_lock.go @@ -0,0 +1,194 @@ +package zookeeper + +import ( + "errors" + "fmt" + "github.com/go-zookeeper/zk" + "mosn.io/layotto/components/lock" + "mosn.io/pkg/log" + "mosn.io/pkg/utils" + "strconv" + "strings" + "time" +) + +const ( + host = "zookeeperHosts" + password = "zookeeperPassword" + sessionTimeout = "sessionTimeout" + logInfo = "logInfo" + defaultSessionTimeout = 5 * time.Second +) + +type ConnectionFactory interface { + NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) +} + +type ConnectionFactoryImpl struct { +} + +func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) { + conn, _, err := zk.Connect(meta.hosts, expire, zk.WithLogInfo(meta.logInfo)) + if err != nil { + return nil, err + } + return conn, nil +} + +type ZKConnection interface { + Get(path string) ([]byte, *zk.Stat, error) + Delete(path string, version int32) error + Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) + Close() +} + +type ZookeeperLock struct { + //trylock reestablish connection every time + factory ConnectionFactory + //unlock reuse this conneciton + unlockConn ZKConnection + metadata metadata + logger log.ErrorLogger +} + +func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock { + lock := &ZookeeperLock{ + logger: logger, + } + return lock +} + +func (p *ZookeeperLock) Init(metadata lock.Metadata) error { + + m, err := parseZookeeperMetadata(metadata) + if err != nil { + return err + } + + p.metadata = m + p.factory = &ConnectionFactoryImpl{} + + //init unlock connection + zkConn, err := p.factory.NewConnection(p.metadata.sessionTimeout, p.metadata) + if err != nil { + return err + } + p.unlockConn = zkConn + return nil +} + +func (p *ZookeeperLock) Features() []lock.Feature { + return nil +} +func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) { + + conn, err := p.factory.NewConnection(time.Duration(req.Expire)*time.Second, p.metadata) + if err != nil { + return &lock.TryLockResponse{}, err + } + //1.create zk ephemeral node + _, err = conn.Create("/"+req.ResourceId, []byte(req.LockOwner), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + + //2.1 create node fail ,indicates lock fail + if err != nil { + defer conn.Close() + //the node exists,lock fail + if err == zk.ErrNodeExists { + return &lock.TryLockResponse{ + Success: false, + }, nil + } + //other err + return nil, err + } + + //2.2 create node success, asyn to make sure zkclient alive for need time + utils.GoWithRecover(func() { + //can also + //time.Sleep(time.Second * time.Duration(req.Expire)) + timeAfterTrigger := time.After(time.Second * time.Duration(req.Expire)) + <-timeAfterTrigger + // make sure close connecion + conn.Close() + }, nil) + + return &lock.TryLockResponse{ + Success: true, + }, nil + +} +func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) { + + conn := p.unlockConn + + path := "/" + req.ResourceId + owner, state, err := conn.Get(path) + + if err != nil { + //node does not exist, indicates this lock has expired + if err == zk.ErrNoNode { + return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil + } + //other err + return nil, err + } + //node exist ,but owner not this, indicates this lock has occupied or wrong unlock + if string(owner) != req.LockOwner { + return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil + } + err = conn.Delete(path, state.Version) + //owner is this, but delete fail + if err != nil { + // delete no node , indicates this lock has expired + if err == zk.ErrNoNode { + return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil + // delete version error , indicates this lock has occupied by others + } else if err == zk.ErrBadVersion { + return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil + //other error + } else { + return nil, err + } + } + //delete success, unlock success + return &lock.UnlockResponse{Status: lock.SUCCESS}, nil +} + +type metadata struct { + hosts []string + password string + sessionTimeout time.Duration + logInfo bool +} + +func parseZookeeperMetadata(meta lock.Metadata) (metadata, error) { + m := metadata{} + if val, ok := meta.Properties[host]; ok && val != "" { + split := strings.Split(val, ";") + m.hosts = append(m.hosts, split...) + } else { + return m, errors.New("zookeeper store error: missing host address") + } + + if val, ok := meta.Properties[password]; ok && val != "" { + m.password = val + } + + m.sessionTimeout = defaultSessionTimeout + if val, ok := meta.Properties[sessionTimeout]; ok && val != "" { + parsedVal, err := strconv.Atoi(val) + if err != nil { + return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err) + } + m.sessionTimeout = time.Duration(parsedVal) * time.Second + } + + if val, ok := meta.Properties[logInfo]; ok && val != "" { + b, err := strconv.ParseBool(val) + if err != nil { + return metadata{}, err + } + m.logInfo = b + } + return m, nil +} diff --git a/components/lock/zookeeper/zookeeper_lock_mock.go b/components/lock/zookeeper/zookeeper_lock_mock.go new file mode 100644 index 0000000000..3e1f1b9c5b --- /dev/null +++ b/components/lock/zookeeper/zookeeper_lock_mock.go @@ -0,0 +1,131 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: zookeeper_lock.go + +// Package zookeeper is a generated GoMock package. +package zookeeper + +import ( + reflect "reflect" + time "time" + + zk "github.com/go-zookeeper/zk" + gomock "github.com/golang/mock/gomock" +) + +// MockConnectionFactory is a mock of ConnectionFactory interface. +type MockConnectionFactory struct { + ctrl *gomock.Controller + recorder *MockConnectionFactoryMockRecorder +} + +// MockConnectionFactoryMockRecorder is the mock recorder for MockConnectionFactory. +type MockConnectionFactoryMockRecorder struct { + mock *MockConnectionFactory +} + +// NewMockConnectionFactory creates a new mock instance. +func NewMockConnectionFactory(ctrl *gomock.Controller) *MockConnectionFactory { + mock := &MockConnectionFactory{ctrl: ctrl} + mock.recorder = &MockConnectionFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockConnectionFactory) EXPECT() *MockConnectionFactoryMockRecorder { + return m.recorder +} + +// NewConnection mocks base method. +func (m *MockConnectionFactory) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewConnection", expire, meta) + ret0, _ := ret[0].(ZKConnection) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewConnection indicates an expected call of NewConnection. +func (mr *MockConnectionFactoryMockRecorder) NewConnection(expire, meta interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewConnection", reflect.TypeOf((*MockConnectionFactory)(nil).NewConnection), expire, meta) +} + +// MockZKConnection is a mock of ZKConnection interface. +type MockZKConnection struct { + ctrl *gomock.Controller + recorder *MockZKConnectionMockRecorder +} + +// MockZKConnectionMockRecorder is the mock recorder for MockZKConnection. +type MockZKConnectionMockRecorder struct { + mock *MockZKConnection +} + +// NewMockZKConnection creates a new mock instance. +func NewMockZKConnection(ctrl *gomock.Controller) *MockZKConnection { + mock := &MockZKConnection{ctrl: ctrl} + mock.recorder = &MockZKConnectionMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockZKConnection) EXPECT() *MockZKConnectionMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockZKConnection) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockZKConnectionMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockZKConnection)(nil).Close)) +} + +// Create mocks base method. +func (m *MockZKConnection) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", path, data, flags, acl) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Create indicates an expected call of Create. +func (mr *MockZKConnectionMockRecorder) Create(path, data, flags, acl interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockZKConnection)(nil).Create), path, data, flags, acl) +} + +// Delete mocks base method. +func (m *MockZKConnection) Delete(path string, version int32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", path, version) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockZKConnectionMockRecorder) Delete(path, version interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockZKConnection)(nil).Delete), path, version) +} + +// Get mocks base method. +func (m *MockZKConnection) Get(path string) ([]byte, *zk.Stat, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", path) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(*zk.Stat) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// Get indicates an expected call of Get. +func (mr *MockZKConnectionMockRecorder) Get(path interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockZKConnection)(nil).Get), path) +} diff --git a/components/lock/zookeeper/zookeeper_lock_test.go b/components/lock/zookeeper/zookeeper_lock_test.go new file mode 100644 index 0000000000..0fc31da119 --- /dev/null +++ b/components/lock/zookeeper/zookeeper_lock_test.go @@ -0,0 +1,176 @@ +package zookeeper + +import ( + "github.com/go-zookeeper/zk" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "mosn.io/layotto/components/lock" + "mosn.io/pkg/log" + "testing" + "time" +) + +const resouseId = "resoure_1" +const lockOwerA = "p1" +const lockOwerB = "p2" +const expireTime = 5 + +var cfg = lock.Metadata{ + Properties: make(map[string]string), +} + +func TestMain(m *testing.M) { + + cfg.Properties["zookeeperHosts"] = "127.0.0.1;127.0.0.1" + cfg.Properties["zookeeperPassword"] = "" + m.Run() + +} + +// A lock ,A unlock +func TestZookeeperLock_ALock_AUnlock(t *testing.T) { + + comp := NewZookeeperLock(log.DefaultLogger) + comp.Init(cfg) + + //mock + ctrl := gomock.NewController(t) + unlockConn := NewMockZKConnection(ctrl) + lockConn := NewMockZKConnection(ctrl) + factory := NewMockConnectionFactory(ctrl) + path := "/" + resouseId + factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(2) + + lockConn.EXPECT().Create(path, []byte(lockOwerA), int32(zk.FlagEphemeral), zk.WorldACL(zk.PermAll)).Return("", nil).Times(1) + lockConn.EXPECT().Close().Return().Times(1) + + unlockConn.EXPECT().Get(path).Return([]byte(lockOwerA), &zk.Stat{Version: 123}, nil).Times(1) + unlockConn.EXPECT().Delete(path, int32(123)).Return(nil).Times(1) + + comp.unlockConn = unlockConn + comp.factory = factory + + tryLock, err := comp.TryLock(&lock.TryLockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerA, + Expire: expireTime, + }) + assert.NoError(t, err) + assert.Equal(t, tryLock.Success, true) + unlock, _ := comp.Unlock(&lock.UnlockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerA, + }) + assert.NoError(t, err) + assert.Equal(t, unlock.Status, lock.SUCCESS) + +} + +// A lock ,B unlock +func TestZookeeperLock_ALock_BUnlock(t *testing.T) { + + comp := NewZookeeperLock(log.DefaultLogger) + comp.Init(cfg) + + //mock + ctrl := gomock.NewController(t) + unlockConn := NewMockZKConnection(ctrl) + lockConn := NewMockZKConnection(ctrl) + factory := NewMockConnectionFactory(ctrl) + path := "/" + resouseId + factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(2) + + lockConn.EXPECT().Create(path, []byte(lockOwerA), int32(zk.FlagEphemeral), zk.WorldACL(zk.PermAll)).Return("", nil).Times(1) + lockConn.EXPECT().Close().Return().Times(1) + + unlockConn.EXPECT().Get(path).Return([]byte(lockOwerA), &zk.Stat{Version: 123}, nil).Times(1) + unlockConn.EXPECT().Delete(path, int32(123)).Return(nil).Times(1) + + comp.unlockConn = unlockConn + comp.factory = factory + + tryLock, err := comp.TryLock(&lock.TryLockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerA, + Expire: expireTime, + }) + assert.NoError(t, err) + assert.Equal(t, tryLock.Success, true) + unlock, err := comp.Unlock(&lock.UnlockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerB, + }) + assert.NoError(t, err) + assert.Equal(t, unlock.Status, lock.LOCK_BELONG_TO_OTHERS) + +} + +// A lock , B lock ,A unlock ,B lock,B unlock +func TestZookeeperLock_ALock_BLock_AUnlock_BLock_BUnlock(t *testing.T) { + + comp := NewZookeeperLock(log.DefaultLogger) + comp.Init(cfg) + + //mock + ctrl := gomock.NewController(t) + unlockConn := NewMockZKConnection(ctrl) + lockConn := NewMockZKConnection(ctrl) + factory := NewMockConnectionFactory(ctrl) + path := "/" + resouseId + + factory.EXPECT().NewConnection(time.Duration(expireTime)*time.Second, comp.metadata).Return(lockConn, nil).Times(3) + + lockConn.EXPECT().Create(path, []byte(lockOwerA), int32(zk.FlagEphemeral), zk.WorldACL(zk.PermAll)).Return("", nil).Times(1) + lockConn.EXPECT().Create(path, []byte(lockOwerB), int32(zk.FlagEphemeral), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists).Times(1) + lockConn.EXPECT().Create(path, []byte(lockOwerB), int32(zk.FlagEphemeral), zk.WorldACL(zk.PermAll)).Return("", nil).Times(1) + lockConn.EXPECT().Close().Return().Times(5) + + unlockConn.EXPECT().Get(path).Return([]byte(lockOwerA), &zk.Stat{Version: 123}, nil).Times(1) + unlockConn.EXPECT().Get(path).Return([]byte(lockOwerB), &zk.Stat{Version: 124}, nil).Times(1) + unlockConn.EXPECT().Delete(path, int32(123)).Return(nil).Times(2) + unlockConn.EXPECT().Delete(path, int32(124)).Return(nil).Times(2) + + comp.unlockConn = unlockConn + comp.factory = factory + + //A lock + tryLock, err := comp.TryLock(&lock.TryLockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerA, + Expire: expireTime, + }) + assert.NoError(t, err) + assert.Equal(t, true, tryLock.Success) + //B lock + tryLock, err = comp.TryLock(&lock.TryLockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerB, + Expire: expireTime, + }) + assert.NoError(t, err) + assert.Equal(t, false, tryLock.Success) + //A unlock + unlock, _ := comp.Unlock(&lock.UnlockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerA, + }) + assert.NoError(t, err) + assert.Equal(t, lock.SUCCESS, unlock.Status) + + //B lock + tryLock, err = comp.TryLock(&lock.TryLockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerB, + Expire: expireTime, + }) + assert.NoError(t, err) + assert.Equal(t, true, tryLock.Success) + + //B unlock + unlock, _ = comp.Unlock(&lock.UnlockRequest{ + ResourceId: resouseId, + LockOwner: lockOwerB, + }) + assert.NoError(t, err) + assert.Equal(t, lock.SUCCESS, unlock.Status) +} diff --git a/configs/config_lock_zookeeper.json b/configs/config_lock_zookeeper.json new file mode 100644 index 0000000000..5f77776fbd --- /dev/null +++ b/configs/config_lock_zookeeper.json @@ -0,0 +1,77 @@ +{ + "servers": [ + { + "default_log_path": "stdout", + "default_log_level": "DEBUG", + "routers": [ + { + "router_config_name": "actuator_dont_need_router" + } + ], + "listeners": [ + { + "name": "grpc", + "address": "127.0.0.1:34904", + "bind_port": true, + "filter_chains": [ + { + "filters": [ + { + "type": "grpc", + "config": { + "server_name": "runtime", + "grpc_config": { + "hellos": { + "helloworld": { + "hello": "greeting" + } + }, + "lock": { + "zookeeper": { + "metadata": { + "zookeeperHosts": "127.0.0.1", + "zookeeperPassword": "", + "sessionTimeout": "3", + "logInfo":"false" + } + } + }, + "app": { + "app_id": "app1", + "grpc_callback_port": 9999 + } + } + } + } + ] + } + ] + }, + { + "name": "actuator", + "address": "127.0.0.1:34999", + "bind_port": true, + "filter_chains": [ + { + "filters": [ + { + "type": "proxy", + "config": { + "downstream_protocol": "Http1", + "upstream_protocol": "Http1", + "router_config_name": "actuator_dont_need_router" + } + } + ] + } + ], + "stream_filters": [ + { + "type": "actuator_filter" + } + ] + } + ] + } + ] +} diff --git a/demo/lock/zookeeper/client.go b/demo/lock/zookeeper/client.go new file mode 100644 index 0000000000..6c36944dbd --- /dev/null +++ b/demo/lock/zookeeper/client.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "fmt" + "github.com/google/uuid" + "mosn.io/layotto/sdk/go-sdk/client" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" + "sync" +) + +const ( + resourceId = "resource_a" + storeName = "zookeeper" +) + +func main() { + cli, err := client.NewClient() + if err != nil { + panic(err) + } + defer cli.Close() + ctx := context.Background() + // 1. Client trylock + owner1 := uuid.New().String() + fmt.Println("client1 prepare to tryLock...") + resp, err := cli.TryLock(ctx, &runtimev1pb.TryLockRequest{ + StoreName: storeName, + ResourceId: resourceId, + LockOwner: owner1, + Expire: 1000, + }) + if err != nil { + panic(err) + } + if !resp.Success { + panic("TryLock failed") + } + fmt.Printf("client1 got lock!ResourceId is %s\n", resourceId) + var wg sync.WaitGroup + wg.Add(1) + // 2. Client2 tryLock fail + go func() { + fmt.Println("client2 prepare to tryLock...") + owner2 := uuid.New().String() + resp, err := cli.TryLock(ctx, &runtimev1pb.TryLockRequest{ + StoreName: storeName, + ResourceId: resourceId, + LockOwner: owner2, + Expire: 10, + }) + if err != nil { + panic(err) + } + if resp.Success { + panic("client2 got lock?!") + } + fmt.Printf("client2 failed to get lock.ResourceId is %s\n", resourceId) + wg.Done() + }() + wg.Wait() + // 3. client 1 unlock + fmt.Println("client1 prepare to unlock...") + unlockResp, err := cli.Unlock(ctx, &runtimev1pb.UnlockRequest{ + StoreName: storeName, + ResourceId: resourceId, + LockOwner: owner1, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + }) + if err != nil { + panic(err) + } + if unlockResp.Status != 0 { + panic("client1 failed to unlock!") + } + fmt.Println("client1 succeeded in unlocking") + // 4. client 2 get lock + wg.Add(1) + go func() { + fmt.Println("client2 prepare to tryLock...") + owner2 := uuid.New().String() + resp, err := cli.TryLock(ctx, &runtimev1pb.TryLockRequest{ + StoreName: storeName, + ResourceId: resourceId, + LockOwner: owner2, + Expire: 10, + }) + if err != nil { + panic(err) + } + if !resp.Success { + panic("client2 failed to get lock?!") + } + fmt.Printf("client2 got lock.ResourceId is %s\n", resourceId) + // 5. client2 unlock + unlockResp, err := cli.Unlock(ctx, &runtimev1pb.UnlockRequest{ + StoreName: storeName, + ResourceId: resourceId, + LockOwner: owner2, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + }) + if err != nil { + panic(err) + } + if unlockResp.Status != 0 { + panic("client2 failed to unlock!") + } + fmt.Println("client2 succeeded in unlocking") + wg.Done() + }() + wg.Wait() + fmt.Println("Demo success!") +} diff --git a/docs/_sidebar.md b/docs/_sidebar.md index 9d281d7830..3b9085776f 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -27,6 +27,7 @@ - [Redis](en/component_specs/pubsub/redis.md) - Distributed Lock - [Redis](en/component_specs/lock/redis.md) + - [Zookeeper](zh/component_specs/lock/zookeeper.md) - Configuration - [Etcd](en/component_specs/configuration/etcd.md) - Contributing diff --git a/docs/en/component_specs/lock/zookeeper.md b/docs/en/component_specs/lock/zookeeper.md new file mode 100644 index 0000000000..760485afa1 --- /dev/null +++ b/docs/en/component_specs/lock/zookeeper.md @@ -0,0 +1,20 @@ +# Zookeeper + +## metadata fields +Example: configs/config_lock_zookeeper.json + +| Field | Required | Description | +| --- | --- | --- | +| zookeeperHosts | Y | zookeeper server address, such as localhost:6380 | +| zookeeperPassword | Y | zookeeper Password | +| sessionTimeout | N | Session timeout,Unit second, same as zookeeper's sessionTimeout| +|logInfo|N|true if zookeeper information messages are logged; false if only zookeeper errors are logged| + +## How to start Redis +If you want to run the zookeeper demo, you need to start a Zookeeper server with Docker first. + +command: +```shell +docker pull zookeeper +docker run --privileged=true -d --name zookeeper --publish 2181:2181 -d zookeeper:latest +``` diff --git a/docs/zh/_sidebar.md b/docs/zh/_sidebar.md index 90984f8440..901221c897 100644 --- a/docs/zh/_sidebar.md +++ b/docs/zh/_sidebar.md @@ -27,6 +27,7 @@ - [Redis](zh/component_specs/pubsub/redis.md) - Distributed Lock - [Redis](zh/component_specs/lock/redis.md) + - [Zookeeper](zh/component_specs/lock/zookeeper.md) - Configuration - [Etcd](zh/component_specs/configuration/etcd.md) - 贡献指南 diff --git a/docs/zh/component_specs/lock/zookeeper.md b/docs/zh/component_specs/lock/zookeeper.md new file mode 100644 index 0000000000..8654764a41 --- /dev/null +++ b/docs/zh/component_specs/lock/zookeeper.md @@ -0,0 +1,23 @@ +package lock + +# Zookeeper + +## 配置项说明 + +示例:configs/config_lock_zookeeper.json + +| 字段 | 必填 | 说明 | +| --- | --- | --- | +| zookeeperHosts | Y | zookeeper服务器地址,支持配置zk集群, 例如: 127.0.0.1:2181;127.0.0.2:2181 | +| zookeeperPassword | Y | zookeeper password| +| sessionTimeout | N | 会话的超时时间,单位秒,同zookeeper的sessionTimeout| +|logInfo|N|true会打印zookeeper操作的所有信息,false只会打印zookeeper的错误信息| + +## 怎么启动Zookeeper + +如果想启动zookeeper的demo,需要先用Docker启动一个Zookeeper 命令: + +```shell +docker pull zookeeper +docker run --privileged=true -d --name zookeeper --publish 2181:2181 -d zookeeper:latest +``` diff --git a/go.sum b/go.sum index 2eda6641da..0f3f856d5d 100644 --- a/go.sum +++ b/go.sum @@ -435,6 +435,8 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= +github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=