Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add etcd lock #128

Merged
merged 12 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import (

// Lock
"mosn.io/layotto/components/lock"
lock_etcd "mosn.io/layotto/components/lock/etcd"
lock_redis "mosn.io/layotto/components/lock/redis"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"

Expand Down Expand Up @@ -241,6 +242,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_lock.NewFactory("redis", func() lock.LockStore {
return lock_redis.NewStandaloneRedisLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("etcd", func() lock.LockStore {
return lock_etcd.NewEtcdLock(log.DefaultLogger)
}),
),
)
// 4. check if unhealthy
Expand Down
14 changes: 12 additions & 2 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ go 1.14
require (
github.com/alicebob/miniredis/v2 v2.13.3
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/go-redis/redis/v8 v8.8.0
github.com/golang/mock v1.4.4
github.com/google/uuid v1.1.1
github.com/golang/mock v1.4.4 // indirect
github.com/google/uuid v1.1.2
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.26.0
github.com/zouyx/agollo/v4 v4.0.7
go.etcd.io/etcd v3.3.25+incompatible // indirect
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
go.uber.org/atomic v1.8.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.18.1 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
google.golang.org/genproto v0.0.0-20210707164411-8c882eb9abba // indirect
mosn.io/api v0.0.0-20210414070543-8a0686b03540
mosn.io/mosn v0.22.1-0.20210425073346-b6880db4669c
mosn.io/pkg v0.0.0-20210401090620-f0e0d1a3efce
Expand Down
212 changes: 212 additions & 0 deletions components/go.sum

Large diffs are not rendered by default.

250 changes: 250 additions & 0 deletions components/lock/etcd/etcd_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package etcd

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"strconv"
"strings"
"time"

"go.etcd.io/etcd/client/v3"

"mosn.io/layotto/components/lock"
"mosn.io/pkg/log"
)

const (
defaultDialTimeout = 5
defaultKeyPrefix = "/layotto/"

prefixKey = "keyPrefix"
usernameKey = "username"
passwordKey = "password"
dialTimeoutKey = "dialTimeout"
endpointsKey = "endpoints"
tlsCertPathKey = "tlsCert"
tlsCertKeyPathKey = "tlsCertKey"
tlsCaPathKey = "tlsCa"
)

type EtcdLock struct {
client *clientv3.Client
metadata metadata

features []lock.Feature
logger log.ErrorLogger

ctx context.Context
cancel context.CancelFunc
}

// NewEtcdLock returns a new etcd lock
func NewEtcdLock(logger log.ErrorLogger) *EtcdLock {
s := &EtcdLock{
features: make([]lock.Feature, 0),
logger: logger,
}

return s
}

func (e *EtcdLock) Init(metadata lock.Metadata) error {
// 1. parse config
m, err := parseEtcdMetadata(metadata)
if err != nil {
return err
}
e.metadata = m
// 2. construct client
if e.client, err = e.newClient(m); err != nil {
return err
}

e.ctx, e.cancel = context.WithCancel(context.Background())

return err
}

func (e *EtcdLock) Features() []lock.Feature {
return e.features
}

func (e *EtcdLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
var leaseId clientv3.LeaseID
//1.Create new lease
lease := clientv3.NewLease(e.client)
if leaseGrantResp, err := lease.Grant(e.ctx, int64(req.Expire)); err != nil {
return &lock.TryLockResponse{}, fmt.Errorf("[etcdLock]: Create new lease returned error: %s.ResourceId: %s", err, req.ResourceId)
} else {
leaseId = leaseGrantResp.ID
}

key := e.getKey(req.ResourceId)

//2.Create new KV
kv := clientv3.NewKV(e.client)
//3.Create txn
txn := kv.Txn(e.ctx)
txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).Then(
clientv3.OpPut(key, req.LockOwner, clientv3.WithLease(leaseId))).Else(
clientv3.OpGet(key))
//4.Commit and try get lock
txnResponse, err := txn.Commit()
if err != nil {
return &lock.TryLockResponse{}, fmt.Errorf("[etcdLock]: Creat lock returned error: %s.ResourceId: %s", err, req.ResourceId)
}

return &lock.TryLockResponse{
Success: txnResponse.Succeeded,
}, nil
}

func (e *EtcdLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
key := e.getKey(req.ResourceId)

kv := clientv3.NewKV(e.client)
txn := kv.Txn(e.ctx)
txn.If(clientv3.Compare(clientv3.Value(key), "=", req.LockOwner)).Then(
clientv3.OpDelete(key)).Else(
clientv3.OpGet(key))
txnResponse, err := txn.Commit()
if err != nil {
return newInternalErrorUnlockResponse(), fmt.Errorf("[etcdLock]: Unlock returned error: %s.ResourceId: %s", err, req.ResourceId)
}

if txnResponse.Succeeded {
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
} else {
resp := txnResponse.Responses[0].GetResponseRange()
if len(resp.Kvs) == 0 {
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
}

return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
}
}

func (e *EtcdLock) Close() error {
e.cancel()

return e.client.Close()
}

func (e *EtcdLock) newClient(meta metadata) (*clientv3.Client, error) {

config := clientv3.Config{
Endpoints: meta.endpoints,
DialTimeout: time.Second * time.Duration(meta.dialTimeout),
Username: meta.username,
Password: meta.password,
}

if meta.tlsCa != "" || meta.tlsCert != "" || meta.tlsCertKey != "" {
//enable tls
cert, err := tls.LoadX509KeyPair(meta.tlsCert, meta.tlsCertKey)
if err != nil {
return nil, fmt.Errorf("error reading tls certificate, cert: %s, certKey: %s, err: %s", meta.tlsCert, meta.tlsCertKey, err)
}

caData, err := ioutil.ReadFile(meta.tlsCa)
if err != nil {
return nil, fmt.Errorf("error reading tls ca %s, err: %s", meta.tlsCa, err)
}

pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData)

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
}
config.TLS = tlsConfig
}

if client, err := clientv3.New(config); err != nil {
return nil, err
} else {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(meta.dialTimeout))
defer cancel()
//ping
_, err = client.Get(ctx, "ping")
if err != nil {
return nil, fmt.Errorf("etcd lock error: connect to etcd timeoout %s", meta.endpoints)
}

return client, nil
}
}

func (e *EtcdLock) getKey(resourceId string) string {
return fmt.Sprintf("%s%s", e.metadata.keyPrefix, resourceId)
}

func newInternalErrorUnlockResponse() *lock.UnlockResponse {
return &lock.UnlockResponse{
Status: lock.INTERNAL_ERROR,
}
}

func parseEtcdMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}
var err error

if val, ok := meta.Properties[endpointsKey]; ok && val != "" {
m.endpoints = strings.Split(val, ";")
} else {
return m, errors.New("etcd lock error: missing endpoints address")
}

if val, ok := meta.Properties[dialTimeoutKey]; ok && val != "" {
if m.dialTimeout, err = strconv.Atoi(val); err != nil {
return m, fmt.Errorf("etcd lock error: ncorrect dialTimeout value %s", val)
}
} else {
m.dialTimeout = defaultDialTimeout
}

if val, ok := meta.Properties[prefixKey]; ok && val != "" {
m.keyPrefix = val
} else {
m.keyPrefix = defaultKeyPrefix
}

if val, ok := meta.Properties[usernameKey]; ok && val != "" {
m.username = val
}

if val, ok := meta.Properties[passwordKey]; ok && val != "" {
m.password = val
}

if val, ok := meta.Properties[tlsCaPathKey]; ok && val != "" {
m.tlsCa = val
}

if val, ok := meta.Properties[tlsCertPathKey]; ok && val != "" {
m.tlsCert = val
}

if val, ok := meta.Properties[tlsCertKeyPathKey]; ok && val != "" {
m.tlsCertKey = val
}

return m, nil
}

type metadata struct {
keyPrefix string
dialTimeout int
endpoints []string
username string
password string

tlsCa string
tlsCert string
tlsCertKey string
}
Loading