Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mongo distributed lock #348

Merged
merged 21 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ import (
"mosn.io/layotto/components/lock"
lock_consul "mosn.io/layotto/components/lock/consul"
lock_etcd "mosn.io/layotto/components/lock/etcd"
lock_mongo "mosn.io/layotto/components/lock/mongo"
lock_redis "mosn.io/layotto/components/lock/redis"
lock_zookeeper "mosn.io/layotto/components/lock/zookeeper"
runtime_lock "mosn.io/layotto/pkg/runtime/lock"
Expand Down Expand Up @@ -299,6 +300,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_lock.NewFactory("consul", func() lock.LockStore {
return lock_consul.NewConsulLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("mongo", func() lock.LockStore {
return lock_mongo.NewMongoLock(log.DefaultLogger)
}),
),

// bindings
Expand Down
1 change: 1 addition & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
go.mongodb.org/mongo-driver v1.8.0
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect
google.golang.org/grpc v1.38.0
Expand Down
242 changes: 16 additions & 226 deletions components/go.sum

Large diffs are not rendered by default.

256 changes: 256 additions & 0 deletions components/lock/mongo/mongo_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mongo

import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/bsonx"
"mosn.io/layotto/components/lock"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/pkg/log"
"time"
)

const (
TRY_LOCK_SUCCESS = 1
TRY_LOCK_FAIL = 2
UNLOCK_SUCCESS = 3
UNLOCK_UNEXIST = 4
UNLOCK_BELONG_TO_OTHERS = 5
UNLOCK_FAIL = 6
)

// mongo lock store
type MongoLock struct {
factory utils.MongoFactory

client utils.MongoClient
session utils.MongoSession
collection utils.MongoCollection
metadata utils.MongoMetadata

features []lock.Feature
logger log.ErrorLogger

ctx context.Context
cancel context.CancelFunc
}

// NewMongoLock returns a new mongo lock
func NewMongoLock(logger log.ErrorLogger) *MongoLock {
s := &MongoLock{
features: make([]lock.Feature, 0),
logger: logger,
}
return s
}

func (e *MongoLock) Init(metadata lock.Metadata) error {
var client utils.MongoClient
// 1.parse config
m, err := utils.ParseMongoMetadata(metadata.Properties)
if err != nil {
return err
}
e.metadata = m

e.factory = &utils.MongoFactoryImpl{}

// 2. construct client
if client, err = e.factory.NewMongoClient(m); err != nil {
return err
}

e.ctx, e.cancel = context.WithCancel(context.Background())

if err := client.Ping(e.ctx, nil); err != nil {
return err
}

wc, err := utils.GetWriteConcernObject(e.metadata.WriteConcern)
if err != nil {
return err
}

rc, err := utils.GetReadConcrenObject(e.metadata.ReadConcern)
if err != nil {
return err
}

// set mongo options of collection
opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc)

// create database
database := client.Database(e.metadata.DatabaseName)

// create collection
e.collection = e.factory.NewMongoCollection(database, e.metadata.CollectionName, opts)

// create exprie time index
indexModel := mongo.IndexModel{
Keys: bsonx.Doc{{"Expire", bsonx.Int64(1)}},
Options: options.Index().SetExpireAfterSeconds(0),
}
e.collection.Indexes().CreateOne(e.ctx, indexModel)

e.client = client

return err
}

// Features is to get MongoLock's features
func (e *MongoLock) Features() []lock.Feature {
return e.features
}

func (e *MongoLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
var err error
// create mongo session
e.session, err = e.client.StartSession()
LXPWing marked this conversation as resolved.
Show resolved Hide resolved
txnOpts := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))

// check session
if err != nil {
return &lock.TryLockResponse{
Success: false,
}, fmt.Errorf("[mongoLock]: Create session return error: %s ResourceId: %s", err, req.ResourceId)
}

// close mongo session
defer e.session.EndSession(e.ctx)

// start transaction
status, err := e.session.WithTransaction(e.ctx, func(sessionContext mongo.SessionContext) (interface{}, error) {
var err error
var insertOneResult *mongo.InsertOneResult

// set exprie date
expireTime := time.Now().Add(time.Duration(req.Expire) * time.Second)

// insert mongo lock
insertOneResult, err = e.collection.InsertOne(e.ctx, bson.M{"_id": req.ResourceId, "LockOwner": req.LockOwner, "Expire": expireTime})

if err != nil {
_ = sessionContext.AbortTransaction(sessionContext)
return TRY_LOCK_FAIL, err
}

// commit and set status
if insertOneResult != nil && insertOneResult.InsertedID == req.ResourceId {
if err = sessionContext.CommitTransaction(sessionContext); err == nil {
return TRY_LOCK_SUCCESS, err
}
}

return TRY_LOCK_FAIL, err
}, txnOpts)

// check lock
if err != nil {
return &lock.TryLockResponse{}, fmt.Errorf("[mongoLock]: Create new lock return error: %s ResourceId: %s", err, req.ResourceId)
}

if status == TRY_LOCK_SUCCESS {
return &lock.TryLockResponse{
Success: true,
}, nil
} else {
return &lock.TryLockResponse{
Success: false,
}, nil
}
}

func (e *MongoLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
var err error
// create mongo session
e.session, err = e.client.StartSession()
LXPWing marked this conversation as resolved.
Show resolved Hide resolved
txnOpts := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))

// check session
if err != nil {
return newInternalErrorUnlockResponse(), fmt.Errorf("[mongoLock]: Create Session return error: %s ResourceId: %s", err, req.ResourceId)
ZLBer marked this conversation as resolved.
Show resolved Hide resolved
}

// close mongo session
defer e.session.EndSession(e.ctx)

// start transaction
status, err := e.session.WithTransaction(e.ctx, func(sessionContext mongo.SessionContext) (interface{}, error) {
var status int

// delete lock
result, err := e.collection.DeleteOne(e.ctx, bson.M{"_id": req.ResourceId, "LockOwner": req.LockOwner})

// check delete result
if result.DeletedCount == 1 && err == nil {
status = UNLOCK_SUCCESS
} else if result.DeletedCount == 0 && err == nil {
if cursor, err := e.collection.Find(e.ctx, bson.M{"_id": req.ResourceId}); cursor != nil && cursor.RemainingBatchLength() != 0 && err == nil {
status = UNLOCK_BELONG_TO_OTHERS
} else if cursor != nil && cursor.RemainingBatchLength() == 0 && err == nil {
status = UNLOCK_UNEXIST
}
}

if err != nil {
_ = sessionContext.AbortTransaction(sessionContext)
return UNLOCK_FAIL, err
}

// commit and set status
if err = sessionContext.CommitTransaction(sessionContext); err == nil {
return status, err
}

return UNLOCK_FAIL, err
}, txnOpts)

resp := lock.INTERNAL_ERROR

if err != nil {
return newInternalErrorUnlockResponse(), fmt.Errorf("[mongoLock]: Unlock returned error: %s ResourceId: %s", err, req.ResourceId)
}

if status == UNLOCK_SUCCESS {
resp = lock.SUCCESS
} else if status == UNLOCK_UNEXIST {
resp = lock.LOCK_UNEXIST
} else if status == UNLOCK_BELONG_TO_OTHERS {
resp = lock.LOCK_BELONG_TO_OTHERS
}
return &lock.UnlockResponse{
Status: resp,
}, nil
}

func newInternalErrorUnlockResponse() *lock.UnlockResponse {
return &lock.UnlockResponse{
Status: lock.INTERNAL_ERROR,
}
}

func (e *MongoLock) Close() error {
e.cancel()

return e.client.Disconnect(e.ctx)
}
Loading