Skip to content

Commit aa9bc97

Browse files
authored
Add etcd lock (#128)
1 parent 571dad3 commit aa9bc97

File tree

13 files changed

+1127
-8
lines changed

13 files changed

+1127
-8
lines changed

cmd/layotto/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ import (
7777

7878
// Lock
7979
"mosn.io/layotto/components/lock"
80+
lock_etcd "mosn.io/layotto/components/lock/etcd"
8081
lock_redis "mosn.io/layotto/components/lock/redis"
8182
runtime_lock "mosn.io/layotto/pkg/runtime/lock"
8283

@@ -241,6 +242,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
241242
runtime_lock.NewFactory("redis", func() lock.LockStore {
242243
return lock_redis.NewStandaloneRedisLock(log.DefaultLogger)
243244
}),
245+
runtime_lock.NewFactory("etcd", func() lock.LockStore {
246+
return lock_etcd.NewEtcdLock(log.DefaultLogger)
247+
}),
244248
),
245249
)
246250
// 4. check if unhealthy

components/go.mod

+12-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,22 @@ go 1.14
55
require (
66
github.com/alicebob/miniredis/v2 v2.13.3
77
github.com/apache/dubbo-go-hessian2 v1.7.0
8+
github.com/coreos/etcd v3.3.25+incompatible // indirect
89
github.com/go-redis/redis/v8 v8.8.0
9-
github.com/golang/mock v1.4.4
10-
github.com/google/uuid v1.1.1
10+
github.com/golang/mock v1.4.4 // indirect
11+
github.com/google/uuid v1.1.2
1112
github.com/stretchr/testify v1.7.0
1213
github.com/valyala/fasthttp v1.26.0
1314
github.com/zouyx/agollo/v4 v4.0.7
15+
go.etcd.io/etcd v3.3.25+incompatible // indirect
16+
go.etcd.io/etcd/client/v3 v3.5.0
17+
go.etcd.io/etcd/server/v3 v3.5.0
18+
go.uber.org/atomic v1.8.0 // indirect
19+
go.uber.org/multierr v1.7.0 // indirect
20+
go.uber.org/zap v1.18.1 // indirect
21+
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
22+
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
23+
google.golang.org/genproto v0.0.0-20210707164411-8c882eb9abba // indirect
1424
mosn.io/api v0.0.0-20210414070543-8a0686b03540
1525
mosn.io/mosn v0.22.1-0.20210425073346-b6880db4669c
1626
mosn.io/pkg v0.0.0-20210401090620-f0e0d1a3efce

components/go.sum

+212
Large diffs are not rendered by default.

components/lock/etcd/etcd_lock.go

+250
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package etcd
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"errors"
8+
"fmt"
9+
"io/ioutil"
10+
"strconv"
11+
"strings"
12+
"time"
13+
14+
"go.etcd.io/etcd/client/v3"
15+
16+
"mosn.io/layotto/components/lock"
17+
"mosn.io/pkg/log"
18+
)
19+
20+
const (
21+
defaultDialTimeout = 5
22+
defaultKeyPrefix = "/layotto/"
23+
24+
prefixKey = "keyPrefix"
25+
usernameKey = "username"
26+
passwordKey = "password"
27+
dialTimeoutKey = "dialTimeout"
28+
endpointsKey = "endpoints"
29+
tlsCertPathKey = "tlsCert"
30+
tlsCertKeyPathKey = "tlsCertKey"
31+
tlsCaPathKey = "tlsCa"
32+
)
33+
34+
type EtcdLock struct {
35+
client *clientv3.Client
36+
metadata metadata
37+
38+
features []lock.Feature
39+
logger log.ErrorLogger
40+
41+
ctx context.Context
42+
cancel context.CancelFunc
43+
}
44+
45+
// NewEtcdLock returns a new etcd lock
46+
func NewEtcdLock(logger log.ErrorLogger) *EtcdLock {
47+
s := &EtcdLock{
48+
features: make([]lock.Feature, 0),
49+
logger: logger,
50+
}
51+
52+
return s
53+
}
54+
55+
func (e *EtcdLock) Init(metadata lock.Metadata) error {
56+
// 1. parse config
57+
m, err := parseEtcdMetadata(metadata)
58+
if err != nil {
59+
return err
60+
}
61+
e.metadata = m
62+
// 2. construct client
63+
if e.client, err = e.newClient(m); err != nil {
64+
return err
65+
}
66+
67+
e.ctx, e.cancel = context.WithCancel(context.Background())
68+
69+
return err
70+
}
71+
72+
func (e *EtcdLock) Features() []lock.Feature {
73+
return e.features
74+
}
75+
76+
func (e *EtcdLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
77+
var leaseId clientv3.LeaseID
78+
//1.Create new lease
79+
lease := clientv3.NewLease(e.client)
80+
if leaseGrantResp, err := lease.Grant(e.ctx, int64(req.Expire)); err != nil {
81+
return &lock.TryLockResponse{}, fmt.Errorf("[etcdLock]: Create new lease returned error: %s.ResourceId: %s", err, req.ResourceId)
82+
} else {
83+
leaseId = leaseGrantResp.ID
84+
}
85+
86+
key := e.getKey(req.ResourceId)
87+
88+
//2.Create new KV
89+
kv := clientv3.NewKV(e.client)
90+
//3.Create txn
91+
txn := kv.Txn(e.ctx)
92+
txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).Then(
93+
clientv3.OpPut(key, req.LockOwner, clientv3.WithLease(leaseId))).Else(
94+
clientv3.OpGet(key))
95+
//4.Commit and try get lock
96+
txnResponse, err := txn.Commit()
97+
if err != nil {
98+
return &lock.TryLockResponse{}, fmt.Errorf("[etcdLock]: Creat lock returned error: %s.ResourceId: %s", err, req.ResourceId)
99+
}
100+
101+
return &lock.TryLockResponse{
102+
Success: txnResponse.Succeeded,
103+
}, nil
104+
}
105+
106+
func (e *EtcdLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
107+
key := e.getKey(req.ResourceId)
108+
109+
kv := clientv3.NewKV(e.client)
110+
txn := kv.Txn(e.ctx)
111+
txn.If(clientv3.Compare(clientv3.Value(key), "=", req.LockOwner)).Then(
112+
clientv3.OpDelete(key)).Else(
113+
clientv3.OpGet(key))
114+
txnResponse, err := txn.Commit()
115+
if err != nil {
116+
return newInternalErrorUnlockResponse(), fmt.Errorf("[etcdLock]: Unlock returned error: %s.ResourceId: %s", err, req.ResourceId)
117+
}
118+
119+
if txnResponse.Succeeded {
120+
return &lock.UnlockResponse{Status: lock.SUCCESS}, nil
121+
} else {
122+
resp := txnResponse.Responses[0].GetResponseRange()
123+
if len(resp.Kvs) == 0 {
124+
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
125+
}
126+
127+
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
128+
}
129+
}
130+
131+
func (e *EtcdLock) Close() error {
132+
e.cancel()
133+
134+
return e.client.Close()
135+
}
136+
137+
func (e *EtcdLock) newClient(meta metadata) (*clientv3.Client, error) {
138+
139+
config := clientv3.Config{
140+
Endpoints: meta.endpoints,
141+
DialTimeout: time.Second * time.Duration(meta.dialTimeout),
142+
Username: meta.username,
143+
Password: meta.password,
144+
}
145+
146+
if meta.tlsCa != "" || meta.tlsCert != "" || meta.tlsCertKey != "" {
147+
//enable tls
148+
cert, err := tls.LoadX509KeyPair(meta.tlsCert, meta.tlsCertKey)
149+
if err != nil {
150+
return nil, fmt.Errorf("error reading tls certificate, cert: %s, certKey: %s, err: %s", meta.tlsCert, meta.tlsCertKey, err)
151+
}
152+
153+
caData, err := ioutil.ReadFile(meta.tlsCa)
154+
if err != nil {
155+
return nil, fmt.Errorf("error reading tls ca %s, err: %s", meta.tlsCa, err)
156+
}
157+
158+
pool := x509.NewCertPool()
159+
pool.AppendCertsFromPEM(caData)
160+
161+
tlsConfig := &tls.Config{
162+
Certificates: []tls.Certificate{cert},
163+
RootCAs: pool,
164+
}
165+
config.TLS = tlsConfig
166+
}
167+
168+
if client, err := clientv3.New(config); err != nil {
169+
return nil, err
170+
} else {
171+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(meta.dialTimeout))
172+
defer cancel()
173+
//ping
174+
_, err = client.Get(ctx, "ping")
175+
if err != nil {
176+
return nil, fmt.Errorf("etcd lock error: connect to etcd timeoout %s", meta.endpoints)
177+
}
178+
179+
return client, nil
180+
}
181+
}
182+
183+
func (e *EtcdLock) getKey(resourceId string) string {
184+
return fmt.Sprintf("%s%s", e.metadata.keyPrefix, resourceId)
185+
}
186+
187+
func newInternalErrorUnlockResponse() *lock.UnlockResponse {
188+
return &lock.UnlockResponse{
189+
Status: lock.INTERNAL_ERROR,
190+
}
191+
}
192+
193+
func parseEtcdMetadata(meta lock.Metadata) (metadata, error) {
194+
m := metadata{}
195+
var err error
196+
197+
if val, ok := meta.Properties[endpointsKey]; ok && val != "" {
198+
m.endpoints = strings.Split(val, ";")
199+
} else {
200+
return m, errors.New("etcd lock error: missing endpoints address")
201+
}
202+
203+
if val, ok := meta.Properties[dialTimeoutKey]; ok && val != "" {
204+
if m.dialTimeout, err = strconv.Atoi(val); err != nil {
205+
return m, fmt.Errorf("etcd lock error: ncorrect dialTimeout value %s", val)
206+
}
207+
} else {
208+
m.dialTimeout = defaultDialTimeout
209+
}
210+
211+
if val, ok := meta.Properties[prefixKey]; ok && val != "" {
212+
m.keyPrefix = val
213+
} else {
214+
m.keyPrefix = defaultKeyPrefix
215+
}
216+
217+
if val, ok := meta.Properties[usernameKey]; ok && val != "" {
218+
m.username = val
219+
}
220+
221+
if val, ok := meta.Properties[passwordKey]; ok && val != "" {
222+
m.password = val
223+
}
224+
225+
if val, ok := meta.Properties[tlsCaPathKey]; ok && val != "" {
226+
m.tlsCa = val
227+
}
228+
229+
if val, ok := meta.Properties[tlsCertPathKey]; ok && val != "" {
230+
m.tlsCert = val
231+
}
232+
233+
if val, ok := meta.Properties[tlsCertKeyPathKey]; ok && val != "" {
234+
m.tlsCertKey = val
235+
}
236+
237+
return m, nil
238+
}
239+
240+
type metadata struct {
241+
keyPrefix string
242+
dialTimeout int
243+
endpoints []string
244+
username string
245+
password string
246+
247+
tlsCa string
248+
tlsCert string
249+
tlsCertKey string
250+
}

0 commit comments

Comments
 (0)