Skip to content

Commit

Permalink
Zookeeper lock (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZLBer authored Jul 13, 2021
1 parent 34e8ff0 commit d6289c0
Show file tree
Hide file tree
Showing 13 changed files with 750 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import (
"mosn.io/layotto/components/lock"
lock_etcd "mosn.io/layotto/components/lock/etcd"
lock_redis "mosn.io/layotto/components/lock/redis"
lock_zookeeper "mosn.io/layotto/components/lock/zookeeper"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"

// Actuator
Expand Down Expand Up @@ -245,6 +246,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_lock.NewFactory("redis", func() lock.LockStore {
return lock_redis.NewStandaloneRedisLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("zookeeper", func() lock.LockStore {
return lock_zookeeper.NewZookeeperLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("etcd", func() lock.LockStore {
return lock_etcd.NewEtcdLock(log.DefaultLogger)
}),
Expand Down
2 changes: 2 additions & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
github.com/alicebob/miniredis/v2 v2.13.3
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/go-redis/redis/v8 v8.8.0
github.com/go-zookeeper/zk v1.0.2
github.com/golang/mock v1.4.4
github.com/google/uuid v1.1.2
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions components/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GO
github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw=
github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand Down
194 changes: 194 additions & 0 deletions components/lock/zookeeper/zookeeper_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package zookeeper

import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"mosn.io/layotto/components/lock"
"mosn.io/pkg/log"
"mosn.io/pkg/utils"
"strconv"
"strings"
"time"
)

const (
host = "zookeeperHosts"
password = "zookeeperPassword"
sessionTimeout = "sessionTimeout"
logInfo = "logInfo"
defaultSessionTimeout = 5 * time.Second
)

type ConnectionFactory interface {
NewConnection(expire time.Duration, meta metadata) (ZKConnection, error)
}

type ConnectionFactoryImpl struct {
}

func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) {
conn, _, err := zk.Connect(meta.hosts, expire, zk.WithLogInfo(meta.logInfo))
if err != nil {
return nil, err
}
return conn, nil
}

type ZKConnection interface {
Get(path string) ([]byte, *zk.Stat, error)
Delete(path string, version int32) error
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Close()
}

type ZookeeperLock struct {
//trylock reestablish connection every time
factory ConnectionFactory
//unlock reuse this conneciton
unlockConn ZKConnection
metadata metadata
logger log.ErrorLogger
}

func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock {
lock := &ZookeeperLock{
logger: logger,
}
return lock
}

func (p *ZookeeperLock) Init(metadata lock.Metadata) error {

m, err := parseZookeeperMetadata(metadata)
if err != nil {
return err
}

p.metadata = m
p.factory = &ConnectionFactoryImpl{}

//init unlock connection
zkConn, err := p.factory.NewConnection(p.metadata.sessionTimeout, p.metadata)
if err != nil {
return err
}
p.unlockConn = zkConn
return nil
}

func (p *ZookeeperLock) Features() []lock.Feature {
return nil
}
func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {

conn, err := p.factory.NewConnection(time.Duration(req.Expire)*time.Second, p.metadata)
if err != nil {
return &lock.TryLockResponse{}, err
}
//1.create zk ephemeral node
_, err = conn.Create("/"+req.ResourceId, []byte(req.LockOwner), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))

//2.1 create node fail ,indicates lock fail
if err != nil {
defer conn.Close()
//the node exists,lock fail
if err == zk.ErrNodeExists {
return &lock.TryLockResponse{
Success: false,
}, nil
}
//other err
return nil, err
}

//2.2 create node success, asyn to make sure zkclient alive for need time
utils.GoWithRecover(func() {
//can also
//time.Sleep(time.Second * time.Duration(req.Expire))
timeAfterTrigger := time.After(time.Second * time.Duration(req.Expire))
<-timeAfterTrigger
// make sure close connecion
conn.Close()
}, nil)

return &lock.TryLockResponse{
Success: true,
}, nil

}
func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {

conn := p.unlockConn

path := "/" + req.ResourceId
owner, state, err := conn.Get(path)

if err != nil {
//node does not exist, indicates this lock has expired
if err == zk.ErrNoNode {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
}
//other err
return nil, err
}
//node exist ,but owner not this, indicates this lock has occupied or wrong unlock
if string(owner) != req.LockOwner {
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
}
err = conn.Delete(path, state.Version)
//owner is this, but delete fail
if err != nil {
// delete no node , indicates this lock has expired
if err == zk.ErrNoNode {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
// delete version error , indicates this lock has occupied by others
} else if err == zk.ErrBadVersion {
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
//other error
} else {
return nil, err
}
}
//delete success, unlock success
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
}

type metadata struct {
hosts []string
password string
sessionTimeout time.Duration
logInfo bool
}

func parseZookeeperMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[host]; ok && val != "" {
split := strings.Split(val, ";")
m.hosts = append(m.hosts, split...)
} else {
return m, errors.New("zookeeper store error: missing host address")
}

if val, ok := meta.Properties[password]; ok && val != "" {
m.password = val
}

m.sessionTimeout = defaultSessionTimeout
if val, ok := meta.Properties[sessionTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err)
}
m.sessionTimeout = time.Duration(parsedVal) * time.Second
}

if val, ok := meta.Properties[logInfo]; ok && val != "" {
b, err := strconv.ParseBool(val)
if err != nil {
return metadata{}, err
}
m.logInfo = b
}
return m, nil
}
131 changes: 131 additions & 0 deletions components/lock/zookeeper/zookeeper_lock_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d6289c0

Please sign in to comment.