Skip to content

Commit faedd27

Browse files
authored
consul lock (#140)
* consul lock * demo and docs
1 parent 5b5b8be commit faedd27

File tree

13 files changed

+792
-1
lines changed

13 files changed

+792
-1
lines changed

cmd/layotto/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ import (
9494

9595
// Lock
9696
"mosn.io/layotto/components/lock"
97+
lock_consul "mosn.io/layotto/components/lock/consul"
9798
lock_etcd "mosn.io/layotto/components/lock/etcd"
9899
lock_redis "mosn.io/layotto/components/lock/redis"
99100
lock_zookeeper "mosn.io/layotto/components/lock/zookeeper"
@@ -289,6 +290,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
289290
runtime_lock.NewFactory("etcd", func() lock.LockStore {
290291
return lock_etcd.NewEtcdLock(log.DefaultLogger)
291292
}),
293+
runtime_lock.NewFactory("consul", func() lock.LockStore {
294+
return lock_consul.NewConsulLock(log.DefaultLogger)
295+
}),
292296
),
293297

294298
// bindings

components/go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ require (
1313
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
1414
github.com/go-redis/redis/v8 v8.8.0
1515
github.com/go-zookeeper/zk v1.0.2
16-
github.com/golang/mock v1.4.4
16+
github.com/golang/mock v1.6.0
1717
github.com/google/uuid v1.2.0
18+
github.com/hashicorp/consul/api v1.3.0
1819
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
1920
github.com/minio/minio-go/v7 v7.0.15
2021
github.com/mitchellh/mapstructure v1.3.3 // indirect

components/go.sum

+3
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,8 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
379379
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
380380
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
381381
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
382+
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
383+
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
382384
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
383385
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
384386
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -1310,6 +1312,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
13101312
golang.org/x/tools v0.0.0-20201014170642-d1624618ad65/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
13111313
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
13121314
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
1315+
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
13131316
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
13141317
golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
13151318
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=

components/lock/consul/consul_lock.go

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
//
2+
// Copyright 2021 Layotto Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package consul
15+
16+
import (
17+
"github.com/hashicorp/consul/api"
18+
"mosn.io/layotto/components/lock"
19+
"mosn.io/layotto/components/pkg/utils"
20+
msync "mosn.io/mosn/pkg/sync"
21+
"mosn.io/pkg/log"
22+
"runtime"
23+
"strconv"
24+
"sync"
25+
"time"
26+
)
27+
28+
type ConsulLock struct {
29+
metadata utils.ConsulMetadata
30+
logger log.ErrorLogger
31+
client utils.ConsulClient
32+
sessionFactory utils.SessionFactory
33+
kv utils.ConsulKV
34+
sMap sync.Map
35+
workPool msync.WorkerPool
36+
}
37+
38+
func NewConsulLock(logger log.ErrorLogger) *ConsulLock {
39+
consulLock := &ConsulLock{logger: logger}
40+
return consulLock
41+
}
42+
43+
func (c *ConsulLock) Init(metadata lock.Metadata) error {
44+
consulMetadata, err := utils.ParseConsulMetadata(metadata)
45+
if err != nil {
46+
return err
47+
}
48+
c.metadata = consulMetadata
49+
client, err := api.NewClient(&api.Config{
50+
Address: consulMetadata.Address,
51+
Scheme: consulMetadata.Scheme,
52+
})
53+
c.client = client
54+
c.sessionFactory = client.Session()
55+
c.kv = client.KV()
56+
c.workPool = msync.NewWorkerPool(runtime.NumCPU())
57+
return nil
58+
}
59+
func (c *ConsulLock) Features() []lock.Feature {
60+
return nil
61+
}
62+
63+
func getTTL(expire int32) string {
64+
//session TTL must be between [10s=24h0m0s]
65+
if expire < 10 {
66+
expire = 10
67+
}
68+
return strconv.Itoa(int(expire)) + "s"
69+
}
70+
71+
func (c *ConsulLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
72+
73+
// create a session TTL
74+
session, _, err := c.sessionFactory.Create(&api.SessionEntry{
75+
TTL: getTTL(req.Expire),
76+
LockDelay: 0,
77+
Behavior: "delete", //Controls the behavior to delete when a session is invalidated.
78+
}, nil)
79+
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
// put a new KV pair with ttl session
85+
p := &api.KVPair{Key: req.ResourceId, Value: []byte(req.LockOwner), Session: session}
86+
//acquire lock
87+
acquire, _, err := c.kv.Acquire(p, nil)
88+
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
if acquire {
94+
//bind lockOwner+resourceId and session
95+
c.sMap.Store(req.LockOwner+"-"+req.ResourceId, session)
96+
c.workPool.Schedule(func() {
97+
time.Sleep(time.Second * time.Duration(req.Expire))
98+
//may delete the second lock,but not affect the result
99+
c.sMap.Delete(req.LockOwner + "-" + req.ResourceId)
100+
})
101+
return &lock.TryLockResponse{
102+
Success: true,
103+
}, nil
104+
} else {
105+
return &lock.TryLockResponse{
106+
Success: false,
107+
}, nil
108+
}
109+
110+
}
111+
func (c *ConsulLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
112+
113+
session, ok := c.sMap.Load(req.LockOwner + "-" + req.ResourceId)
114+
115+
if !ok {
116+
return &lock.UnlockResponse{Status: lock.LOCK_UNEXIST}, nil
117+
}
118+
// put a new KV pair with ttl session
119+
p := &api.KVPair{Key: req.ResourceId, Value: []byte(req.LockOwner), Session: session.(string)}
120+
//release lock
121+
release, _, err := c.kv.Release(p, nil)
122+
123+
if err != nil {
124+
return &lock.UnlockResponse{Status: lock.INTERNAL_ERROR}, nil
125+
}
126+
127+
if release {
128+
c.sMap.Delete(req.LockOwner + "-" + req.ResourceId)
129+
_, err = c.sessionFactory.Destroy(session.(string), nil)
130+
if err != nil {
131+
c.logger.Errorf("consul lock session destroy error: %v", err)
132+
}
133+
return &lock.UnlockResponse{Status: lock.
134+
SUCCESS}, nil
135+
} else {
136+
return &lock.UnlockResponse{Status: lock.LOCK_BELONG_TO_OTHERS}, nil
137+
}
138+
}
+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
//
2+
// Copyright 2021 Layotto Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package consul
15+
16+
import (
17+
"github.com/golang/mock/gomock"
18+
"github.com/hashicorp/consul/api"
19+
"github.com/stretchr/testify/assert"
20+
"mosn.io/layotto/components/lock"
21+
"mosn.io/layotto/components/pkg/utils"
22+
"mosn.io/pkg/log"
23+
"testing"
24+
)
25+
26+
const resouseId = "resoure_1"
27+
const lockOwerA = "p1"
28+
const lockOwerB = "p2"
29+
const expireTime = 5
30+
31+
//A lock A unlock
32+
func TestConsulLock_TryLock(t *testing.T) {
33+
//mock
34+
ctrl := gomock.NewController(t)
35+
client := utils.NewMockConsulClient(ctrl)
36+
factory := utils.NewMockSessionFactory(ctrl)
37+
kv := utils.NewMockConsulKV(ctrl)
38+
39+
comp := NewConsulLock(log.DefaultLogger)
40+
cfg := lock.Metadata{
41+
Properties: make(map[string]string),
42+
}
43+
cfg.Properties["address"] = "127.0.0.1:8500"
44+
err := comp.Init(cfg)
45+
comp.client = client
46+
comp.sessionFactory = factory
47+
comp.kv = kv
48+
factory.EXPECT().Create(&api.SessionEntry{TTL: getTTL(expireTime), LockDelay: 0, Behavior: "delete"}, nil).
49+
Return("session1", nil, nil).Times(1)
50+
factory.EXPECT().Destroy("session1", nil).Return(nil, nil).Times(1)
51+
kv.EXPECT().Acquire(&api.KVPair{Key: resouseId, Value: []byte(lockOwerA), Session: "session1"}, nil).
52+
Return(true, nil, nil).Times(1)
53+
kv.EXPECT().Release(&api.KVPair{Key: resouseId, Value: []byte(lockOwerA), Session: "session1"}, nil).
54+
Return(true, nil, nil).Times(1)
55+
56+
tryLock, err := comp.TryLock(&lock.TryLockRequest{
57+
ResourceId: resouseId,
58+
LockOwner: lockOwerA,
59+
Expire: expireTime,
60+
})
61+
62+
assert.NoError(t, err)
63+
assert.Equal(t, true, tryLock.Success)
64+
65+
unlock, err := comp.Unlock(&lock.UnlockRequest{
66+
ResourceId: resouseId,
67+
LockOwner: lockOwerA,
68+
})
69+
70+
assert.NoError(t, err)
71+
assert.Equal(t, lock.SUCCESS, unlock.Status)
72+
73+
}
74+
75+
//A lock B lock
76+
func TestConsulLock_ALock_BLock(t *testing.T) {
77+
78+
//mock
79+
ctrl := gomock.NewController(t)
80+
client := utils.NewMockConsulClient(ctrl)
81+
factory := utils.NewMockSessionFactory(ctrl)
82+
kv := utils.NewMockConsulKV(ctrl)
83+
84+
comp := NewConsulLock(log.DefaultLogger)
85+
cfg := lock.Metadata{
86+
Properties: make(map[string]string),
87+
}
88+
cfg.Properties["address"] = "127.0.0.1:8500"
89+
err := comp.Init(cfg)
90+
comp.client = client
91+
comp.sessionFactory = factory
92+
comp.kv = kv
93+
factory.EXPECT().Create(&api.SessionEntry{TTL: getTTL(expireTime), LockDelay: 0, Behavior: "delete"}, nil).
94+
Return("session1", nil, nil).Times(1)
95+
factory.EXPECT().Create(&api.SessionEntry{TTL: getTTL(expireTime), LockDelay: 0, Behavior: "delete"}, nil).
96+
Return("session2", nil, nil).Times(1)
97+
kv.EXPECT().Acquire(&api.KVPair{Key: resouseId, Value: []byte(lockOwerA), Session: "session1"}, nil).
98+
Return(true, nil, nil).Times(1)
99+
kv.EXPECT().Acquire(&api.KVPair{Key: resouseId, Value: []byte(lockOwerB), Session: "session2"}, nil).
100+
Return(false, nil, nil).Times(1)
101+
102+
tryLock, _ := comp.TryLock(&lock.TryLockRequest{
103+
ResourceId: resouseId,
104+
LockOwner: lockOwerA,
105+
Expire: expireTime,
106+
})
107+
108+
assert.NoError(t, err)
109+
assert.Equal(t, true, tryLock.Success)
110+
111+
bLock, _ := comp.TryLock(&lock.TryLockRequest{
112+
ResourceId: resouseId,
113+
LockOwner: lockOwerB,
114+
Expire: expireTime,
115+
})
116+
117+
assert.NoError(t, err)
118+
assert.Equal(t, false, bLock.Success)
119+
120+
}
121+
122+
// A lock B unlock A unlock
123+
func TestConsulLock_ALock_BUnlock(t *testing.T) {
124+
//mock
125+
ctrl := gomock.NewController(t)
126+
client := utils.NewMockConsulClient(ctrl)
127+
factory := utils.NewMockSessionFactory(ctrl)
128+
kv := utils.NewMockConsulKV(ctrl)
129+
130+
comp := NewConsulLock(log.DefaultLogger)
131+
cfg := lock.Metadata{
132+
Properties: make(map[string]string),
133+
}
134+
cfg.Properties["address"] = "127.0.0.1:8500"
135+
err := comp.Init(cfg)
136+
comp.client = client
137+
comp.sessionFactory = factory
138+
comp.kv = kv
139+
factory.EXPECT().Create(&api.SessionEntry{TTL: getTTL(expireTime), LockDelay: 0, Behavior: "delete"}, nil).
140+
Return("session1", nil, nil).Times(1)
141+
factory.EXPECT().Destroy("session1", nil).Return(nil, nil).Times(1)
142+
kv.EXPECT().Acquire(&api.KVPair{Key: resouseId, Value: []byte(lockOwerA), Session: "session1"}, nil).
143+
Return(true, nil, nil).Times(1)
144+
kv.EXPECT().Release(&api.KVPair{Key: resouseId, Value: []byte(lockOwerA), Session: "session1"}, nil).
145+
Return(true, nil, nil).Times(1)
146+
147+
tryLock, _ := comp.TryLock(&lock.TryLockRequest{
148+
ResourceId: resouseId,
149+
LockOwner: lockOwerA,
150+
Expire: expireTime,
151+
})
152+
153+
assert.NoError(t, err)
154+
assert.Equal(t, true, tryLock.Success)
155+
156+
unlock, _ := comp.Unlock(&lock.UnlockRequest{
157+
ResourceId: resouseId,
158+
LockOwner: lockOwerB,
159+
})
160+
161+
assert.NoError(t, err)
162+
assert.Equal(t, lock.LOCK_UNEXIST, unlock.Status)
163+
164+
unlock2, err := comp.Unlock(&lock.UnlockRequest{
165+
ResourceId: resouseId,
166+
LockOwner: lockOwerA,
167+
})
168+
169+
assert.NoError(t, err)
170+
assert.Equal(t, lock.SUCCESS, unlock2.Status)
171+
}

0 commit comments

Comments
 (0)