Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zookeeper lock #111

Merged
merged 15 commits into from
Jul 13, 2021
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
// Lock
"mosn.io/layotto/components/lock"
lock_redis "mosn.io/layotto/components/lock/redis"
lock_zookeeper "mosn.io/layotto/components/lock/zookeeper"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"

// Actuator
Expand Down Expand Up @@ -241,6 +242,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_lock.NewFactory("redis", func() lock.LockStore {
return lock_redis.NewStandaloneRedisLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("zookeeper", func() lock.LockStore {
return lock_zookeeper.NewZookeeperLock(log.DefaultLogger)
}),
),
)
// 4. check if unhealthy
Expand Down
3 changes: 3 additions & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ require (
github.com/alicebob/miniredis/v2 v2.13.3
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/go-redis/redis/v8 v8.8.0
github.com/go-zookeeper/zk v1.0.2
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/uuid v1.1.1
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions components/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GO
github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw=
github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
Expand Down
194 changes: 194 additions & 0 deletions components/lock/zookeeper/zookeeper_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package zookeeper

import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"mosn.io/layotto/components/lock"
"mosn.io/pkg/log"
"strconv"
"strings"
"time"
)

const (
host = "zookeeperHosts"
password = "zookeeperPassword"
sessionTimeout = "sessionTimeout"
defaultSessionTimeout = 5
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
)

type ZKConnection interface {
Get(path string) ([]byte, *zk.Stat, error)
Delete(path string, version int32) error
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Close()
NewConnection(expire time.Duration, meta metadata) (ZKConnection, error)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}

type ZkConnectionImpl struct {
connection *zk.Conn
}

func (z *ZkConnectionImpl) Get(path string) ([]byte, *zk.Stat, error) {
return z.connection.Get(path)
}

func (z *ZkConnectionImpl) Delete(path string, version int32) error {
return z.connection.Delete(path, version)
}
func (z *ZkConnectionImpl) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) {
return z.connection.Create(path, data, flags, acl)

}
func (z *ZkConnectionImpl) Close() {
z.connection.Close()
}
func (z *ZkConnectionImpl) NewConnection(expire time.Duration, meta metadata) (ZKConnection, error) {

conn, _, err := zk.Connect(meta.hosts, expire*time.Second, zk.WithLogger(defaultLogger{}))
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
return &ZkConnectionImpl{connection: conn}, nil
}

type ZookeeperLock struct {
conn ZKConnection
metadata metadata
logger log.ErrorLogger
}

func NewZookeeperLock(logger log.ErrorLogger) *ZookeeperLock {
lock := &ZookeeperLock{
logger: logger,
}
return lock
}

type defaultLogger struct{}

//silent the default logger
func (defaultLogger) Printf(format string, a ...interface{}) {
ZLBer marked this conversation as resolved.
Show resolved Hide resolved

}

func (p *ZookeeperLock) newConnection(expire time.Duration) (ZKConnection, error) {
//make sure a lock and a connection
connection, err := p.conn.NewConnection(expire, p.metadata)

if err != nil {
return nil, err
}
return connection, nil
}

func (p *ZookeeperLock) Init(metadata lock.Metadata) error {

m, err := parseZookeeperMetadata(metadata)
if err != nil {
return err
}
//nil to this
p.conn = &ZkConnectionImpl{
connection: nil,
}
p.metadata = m

return nil
}

func (p *ZookeeperLock) Features() []lock.Feature {
return nil
}
func (p *ZookeeperLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {

conn, err := p.newConnection(time.Duration(req.Expire))
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return &lock.TryLockResponse{}, err
}
//1.create zk ephemeral node
_, err = conn.Create("/"+req.ResourceId, []byte(req.LockOwner), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))

//2.1 create node fail ,indicates lock fail
if err != nil {
//make sure close connetion in time
conn.Close()
return &lock.TryLockResponse{
Success: false,
}, nil
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}

//2.2 create node success, asyn to make sure zkclient alive for need time
go func() {
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
//can also
//time.Sleep(time.Second * time.Duration(req.Expire))
timeAfterTrigger := time.After(time.Second * time.Duration(req.Expire))
<-timeAfterTrigger
// make sure close connecion
conn.Close()
}()

return &lock.TryLockResponse{
Success: true,
}, nil

}
func (p *ZookeeperLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {

conn, err := p.newConnection(p.metadata.sessionTimeout)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
defer conn.Close()

if err != nil {
return &lock.UnlockResponse{Status: lock.INTERNAL_ERROR}, err
}

path := "/" + req.ResourceId
owner, state, err := conn.Get(path)
//1. node does not exist, indicates this lock has expired or wrong unlock
if err != nil {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
}
//2. node exist ,but owner not this, indicates this lock has expired or wrong unlock
if string(owner) != req.LockOwner {
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
}
err = conn.Delete(path, state.Version)
//3.owner is this, but delete fail, indicates this lock has expired
//this step contains problem, the lock maybe also UNEXIST
if err != nil {
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
}
//4.delete success, unlock succes
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
}

type metadata struct {
hosts []string
password string
sessionTimeout time.Duration
}

func parseZookeeperMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[host]; ok && val != "" {
split := strings.Split(val, ";")
m.hosts = append(m.hosts, split...)
} else {
return m, errors.New("zookeeper store error: missing host address")
}

if val, ok := meta.Properties[password]; ok && val != "" {
m.password = val
}

m.sessionTimeout = defaultSessionTimeout
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
if val, ok := meta.Properties[sessionTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("zookeeper store error: can't parse sessionTimeout field: %s", err)
}
m.sessionTimeout = time.Duration(parsedVal)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}
return m, nil
}
108 changes: 108 additions & 0 deletions components/lock/zookeeper/zookeeper_lock_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading