Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zookeeper lock #111

Merged
merged 15 commits into from
Jul 13, 2021
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
// Lock
"mosn.io/layotto/components/lock"
lock_redis "mosn.io/layotto/components/lock/redis"
lock_zookeeper "mosn.io/layotto/components/lock/zookeeper"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"

// Actuator
Expand Down Expand Up @@ -241,6 +242,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_lock.NewFactory("redis", func() lock.LockStore {
return lock_redis.NewStandaloneRedisLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("zookeeper", func() lock.LockStore {
return lock_zookeeper.NewZookeeperLock(log.DefaultLogger)
}),
),
)
// 4. check if unhealthy
Expand Down
3 changes: 3 additions & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ require (
github.com/alicebob/miniredis/v2 v2.13.3
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/go-redis/redis/v8 v8.8.0
github.com/go-zookeeper/zk v1.0.2
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/uuid v1.1.1
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions components/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GO
github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw=
github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
Expand Down
191 changes: 191 additions & 0 deletions components/lock/zookeeper/zookeeper_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package zookeeper

import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"mosn.io/layotto/components/lock"
"mosn.io/pkg/log"
"mosn.io/pkg/utils"
"strconv"
"strings"
"time"
)

const (
host = "zookeeperHosts"
password = "zookeeperPassword"
sessionTimeout = "sessionTimeout"
defaultSessionTimeout = 5
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
)

type ConnectionFactory interface {
NewConnection(expire time.Duration, meta metadata) (ZKConnection, error)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}

type ConnectionFactoryImpl struct {
}

func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) {
conn, _, err := zk.Connect(meta.hosts, expire*time.Second, zk.WithLogger(defaultLogger{}))
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
return conn, nil
}

type ZKConnection interface {
Get(path string) ([]byte, *zk.Stat, error)
Delete(path string, version int32) error
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Close()
}

type ZookeeperLock struct {
//trylock reestablish connection every time
factory ConnectionFactory
//unlock reuse this conneciton
unlockConn ZKConnection
metadata metadata
logger log.ErrorLogger
}

func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock {
lock := &ZookeeperLock{
logger: logger,
}
return lock
}

type defaultLogger struct{}

//silent the default logger
func (defaultLogger) Printf(format string, a ...interface{}) {
ZLBer marked this conversation as resolved.
Show resolved Hide resolved

}

func (p *ZookeeperLock) Init(metadata lock.Metadata) error {

m, err := parseZookeeperMetadata(metadata)
if err != nil {
return err
}

p.metadata = m
p.factory = &ConnectionFactoryImpl{}

//init unlock connection
zkConn, err := p.factory.NewConnection(p.metadata.sessionTimeout*time.Second, p.metadata)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
p.unlockConn = zkConn
return nil
}

func (p *ZookeeperLock) Features() []lock.Feature {
return nil
}
func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {

conn, err := p.factory.NewConnection(time.Duration(req.Expire)*time.Second, p.metadata)
if err != nil {
return &lock.TryLockResponse{}, err
}
//1.create zk ephemeral node
_, err = conn.Create("/"+req.ResourceId, []byte(req.LockOwner), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))

//2.1 create node fail ,indicates lock fail
if err != nil {
defer conn.Close()
//the node exists,lock fail
if err == zk.ErrNodeExists {
return &lock.TryLockResponse{
Success: false,
}, nil
}
//other err
return &lock.TryLockResponse{}, err
}

//2.2 create node success, asyn to make sure zkclient alive for need time
utils.GoWithRecover(func() {
//can also
//time.Sleep(time.Second * time.Duration(req.Expire))
timeAfterTrigger := time.After(time.Second * time.Duration(req.Expire))
<-timeAfterTrigger
// make sure close connecion
conn.Close()
}, nil)

return &lock.TryLockResponse{
Success: true,
}, nil

}
func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {

conn := p.unlockConn

path := "/" + req.ResourceId
owner, state, err := conn.Get(path)

if err != nil {
//node does not exist, indicates this lock has expired
if err == zk.ErrNoNode {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
}
//other err
return &lock.UnlockResponse{}, err
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}
//node exist ,but owner not this, indicates this lock has occupied or wrong unlock
if string(owner) != req.LockOwner {
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
}
err = conn.Delete(path, state.Version)
//owner is this, but delete fail
if err != nil {
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
// delete no node , indicates this lock has expired
if err == zk.ErrNoNode {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
// delete version error , indicates this lock has occupied by others
} else if err == zk.ErrBadVersion {
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
//other error
} else {
return &lock.UnlockResponse{}, err
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}
}
//delete success, unlock success
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
}

type metadata struct {
hosts []string
password string
sessionTimeout time.Duration
}

func parseZookeeperMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[host]; ok && val != "" {
split := strings.Split(val, ";")
m.hosts = append(m.hosts, split...)
} else {
return m, errors.New("zookeeper store error: missing host address")
}

if val, ok := meta.Properties[password]; ok && val != "" {
m.password = val
}

m.sessionTimeout = defaultSessionTimeout
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if val, ok := meta.Properties[sessionTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err)
}
m.sessionTimeout = time.Duration(parsedVal)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}
return m, nil
}
131 changes: 131 additions & 0 deletions components/lock/zookeeper/zookeeper_lock_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading