Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequencial issue index numbering with pessimistic locking mechanism #9931

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
333aec5
Passes PSQL, SQLite and MSSQL
guillep2k Jan 20, 2020
70a2a8d
Move to upsert strategy; all tests work
guillep2k Jan 21, 2020
677e49f
Merge branch 'master' into bylock-indexes
guillep2k Jan 21, 2020
d834fb1
Use a LockedResource to numerate issues and prs
guillep2k Jan 21, 2020
58ac901
Fix tests and reserved keyword
guillep2k Jan 21, 2020
aa3797c
Fix unit tests
guillep2k Jan 21, 2020
95db48a
Fix export comments
guillep2k Jan 22, 2020
d8ad174
A little refactoring and better function naming
guillep2k Jan 22, 2020
6bec3d5
Merge branch 'master' into bylock-indexes
guillep2k Jan 22, 2020
e747a2c
Support LockType == "" and LockKey == 0
guillep2k Jan 22, 2020
afeb6f0
Merge branch 'master' into bylock-indexes
guillep2k Jan 22, 2020
c503f0d
Prepare for merge
guillep2k Jan 28, 2020
9b7ec1d
Merge branch 'master' into bylock-indexes
guillep2k Jan 28, 2020
1792664
Go simple
guillep2k Jan 28, 2020
ce6c24f
Improve test legibility
guillep2k Jan 30, 2020
15ffbb4
Fix typo
guillep2k Jan 30, 2020
ea9c875
Remove dead code
guillep2k Jan 30, 2020
9cb79c9
Merge branch 'master' into bylock-indexes
guillep2k Jan 30, 2020
d185a4f
Prepare for merge
guillep2k Feb 1, 2020
f46eaf5
Merge branch 'master' into bylock-indexes
guillep2k Feb 1, 2020
621c9d6
Prepare to merge
guillep2k Feb 12, 2020
17fa5e1
Merge branch 'master' into bylock-indexes
guillep2k Feb 12, 2020
b30094b
Merge branch 'master' into bylock-indexes
guillep2k Feb 15, 2020
299d313
Merge branch 'master' into bylock-indexes
guillep2k Feb 16, 2020
cea7c4f
Merge branch 'master' into bylock-indexes
guillep2k Feb 20, 2020
2311de3
Merge branch 'master' into bylock-indexes
guillep2k Feb 29, 2020
7e280a4
Merge branch 'master' into bylock-indexes
guillep2k May 2, 2020
15e407b
Code review suggestions by @lunny
guillep2k May 2, 2020
dd85873
Ignore SQLite3 integration when _txlock=immediate
guillep2k May 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions integrations/locked_resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package integrations

import (
"fmt"
"testing"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"

"github.com/stretchr/testify/assert"
)

const (
// The tests will fail if the waiter function takes less than
// blockerDelay minus tolerance to complete.
// Note: these values might require tuning in order to avoid
// false negatives.
waiterDelay = 100 * time.Millisecond
blockerDelay = 200 * time.Millisecond
tolerance = 50 * time.Millisecond // Should be <= (blockerDelay-waiterDelay)/2
)

type waitResult struct {
Waited time.Duration
Err error
}

func TestLockedResource(t *testing.T) {
defer prepareTestEnv(t)()

// We need to check whether two goroutines block each other
// Sadly, there's no way to ensure the second goroutine is
// waiting other than using a time delay. The longer the delay,
// the more certain we are the second goroutine is waiting.

// This check **must** fail as we're not blocking anything
assert.Error(t, blockTest("no block", func(ctx models.DBContext) error {
return nil
}))

models.AssertNotExistsBean(t, &models.LockedResource{LockType: "test-1", LockKey: 1})

// Test with creation (i.e. new lock type)
assert.NoError(t, blockTest("block-new", func(ctx models.DBContext) error {
_, err := models.GetLockedResourceCtx(ctx, "block-test-1", 1)
return err
}))

// Test without creation (i.e. lock type already exists)
assert.NoError(t, blockTest("block-existing", func(ctx models.DBContext) error {
_, err := models.GetLockedResourceCtx(ctx, "block-test-1", 1)
return err
}))

// Test with temporary record
assert.NoError(t, blockTest("block-temp", func(ctx models.DBContext) error {
return models.TemporarilyLockResourceKeyCtx(ctx, "temp-1", 1)
}))
}

func blockTest(name string, f func(ctx models.DBContext) error) error {
cb := make(chan waitResult)
cw := make(chan waitResult)
ref := time.Now()

go func() {
cb <- blockTestFunc(name, true, ref, f)
}()
go func() {
cw <- blockTestFunc(name, false, ref, f)
}()

resb := <-cb
resw := <-cw
if resb.Err != nil {
return resb.Err
}
if resw.Err != nil {
return resw.Err
}

if resw.Waited < blockerDelay-tolerance {
return fmt.Errorf("Waiter not blocked on %s; wait: %d ms, expected > %d ms",
name, resw.Waited.Milliseconds(), (blockerDelay - tolerance).Milliseconds())
}

return nil
}

func blockTestFunc(name string, blocker bool, ref time.Time, f func(ctx models.DBContext) error) (wr waitResult) {
if blocker {
name = fmt.Sprintf("blocker [%s]", name)
} else {
name = fmt.Sprintf("waiter [%s]", name)
}
err := models.WithTx(func(ctx models.DBContext) error {
log.Trace("Entering %s @%d", name, time.Since(ref).Milliseconds())
if !blocker {
log.Trace("Waiting on %s @%d", name, time.Since(ref).Milliseconds())
time.Sleep(waiterDelay)
log.Trace("Wait finished on %s @%d", name, time.Since(ref).Milliseconds())
}
if err := f(ctx); err != nil {
return err
}
if blocker {
log.Trace("Waiting on %s @%d", name, time.Since(ref).Milliseconds())
time.Sleep(blockerDelay)
log.Trace("Wait finished on %s @%d", name, time.Since(ref).Milliseconds())
} else {
wr.Waited = time.Since(ref)
}
log.Trace("Finishing %s @%d", name, time.Since(ref).Milliseconds())
return nil
})
if err != nil {
wr.Err = fmt.Errorf("error in %s: %v", name, err)
}
return
}
45 changes: 17 additions & 28 deletions models/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ var (

const issueTasksRegexpStr = `(^\s*[-*]\s\[[\sx]\]\s.)|(\n\s*[-*]\s\[[\sx]\]\s.)`
const issueTasksDoneRegexpStr = `(^\s*[-*]\s\[[x]\]\s.)|(\n\s*[-*]\s\[[x]\]\s.)`
const issueMaxDupIndexAttempts = 3

// IssueLockedEnumerator is the name of the locked_resource used to
// numerate issues in a repository.
const IssueLockedEnumerator = "repository-index"

func init() {
issueTasksPat = regexp.MustCompile(issueTasksRegexpStr)
Expand Down Expand Up @@ -898,19 +901,23 @@ func newIssue(e *xorm.Session, doer *User, opts NewIssueOptions) (err error) {
}

// Milestone validation should happen before insert actual object.
if _, err := e.SetExpr("`index`", "coalesce(MAX(`index`),0)+1").
Where("repo_id=?", opts.Issue.RepoID).
Insert(opts.Issue); err != nil {
return ErrNewIssueInsert{err}
}

inserted, err := getIssueByID(e, opts.Issue.ID)
// Obtain the next issue number for this repository, which will be locked
// and reserved for the remaining of the transaction. Should the transaction
// be rolled back, the previous value will be restored.
idxresource, err := GetLockedResource(e, IssueLockedEnumerator, opts.Issue.RepoID)
if err != nil {
return err
return fmt.Errorf("GetLockedResource(%s)", IssueLockedEnumerator)
}
idxresource.Counter++
if err := idxresource.UpdateValue(); err != nil {
return fmt.Errorf("locked.UpdateValue(%s)", IssueLockedEnumerator)
}
opts.Issue.Index = idxresource.Counter

// Patch Index with the value calculated by the database
opts.Issue.Index = inserted.Index
if _, err = e.Insert(opts.Issue); err != nil {
return err
}

if opts.Issue.MilestoneID > 0 {
if _, err = e.Exec("UPDATE `milestone` SET num_issues=num_issues+1 WHERE id=?", opts.Issue.MilestoneID); err != nil {
Expand Down Expand Up @@ -988,24 +995,6 @@ func newIssue(e *xorm.Session, doer *User, opts NewIssueOptions) (err error) {

// NewIssue creates new issue with labels for repository.
func NewIssue(repo *Repository, issue *Issue, labelIDs []int64, uuids []string) (err error) {
// Retry several times in case INSERT fails due to duplicate key for (repo_id, index); see #7887
i := 0
for {
if err = newIssueAttempt(repo, issue, labelIDs, uuids); err == nil {
return nil
}
if !IsErrNewIssueInsert(err) {
return err
}
if i++; i == issueMaxDupIndexAttempts {
break
}
log.Error("NewIssue: error attempting to insert the new issue; will retry. Original error: %v", err)
}
return fmt.Errorf("NewIssue: too many errors attempting to insert the new issue. Last error was: %v", err)
}

func newIssueAttempt(repo *Repository, issue *Issue, labelIDs []int64, uuids []string) (err error) {
sess := x.NewSession()
defer sess.Close()
if err = sess.Begin(); err != nil {
Expand Down
11 changes: 6 additions & 5 deletions models/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func TestIssue_SearchIssueIDsByKeyword(t *testing.T) {
assert.EqualValues(t, []int64{1}, ids)
}

func testInsertIssue(t *testing.T, title, content string) {
func testInsertIssue(t *testing.T, title, content string, idx int64) {
repo := AssertExistsAndLoadBean(t, &Repository{ID: 1}).(*Repository)
user := AssertExistsAndLoadBean(t, &User{ID: 2}).(*User)

Expand All @@ -311,8 +311,7 @@ func testInsertIssue(t *testing.T, title, content string) {
assert.True(t, has)
assert.EqualValues(t, issue.Title, newIssue.Title)
assert.EqualValues(t, issue.Content, newIssue.Content)
// there are 5 issues and max index is 5 on repository 1, so this one should 6
assert.EqualValues(t, 6, newIssue.Index)
assert.EqualValues(t, idx, newIssue.Index)

_, err = x.ID(issue.ID).Delete(new(Issue))
assert.NoError(t, err)
Expand All @@ -321,8 +320,10 @@ func testInsertIssue(t *testing.T, title, content string) {
func TestIssue_InsertIssue(t *testing.T) {
assert.NoError(t, PrepareTestDatabase())

testInsertIssue(t, "my issue1", "special issue's comments?")
testInsertIssue(t, `my issue2, this is my son's love \n \r \ `, "special issue's '' comments?")
// there are 5 issues and max index is 5 on repository 1, so this one should be 6
testInsertIssue(t, "my issue1", "special issue's comments?", 6)
// deleting an issue should not let a new issue reuse its index number; this one should be 7
testInsertIssue(t, `my issue2, this is my son's love \n \r \ `, "special issue's '' comments?", 7)
}

func TestIssue_ResolveMentions(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions models/issue_xref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,12 @@ func testCreateIssue(t *testing.T, repo, doer int64, title, content string, ispu
sess := x.NewSession()
defer sess.Close()
assert.NoError(t, sess.Begin())
_, err := sess.SetExpr("`index`", "coalesce(MAX(`index`),0)+1").Where("repo_id=?", repo).Insert(i)
idxresource, err := GetLockedResource(sess, IssueLockedEnumerator, repo)
assert.NoError(t, err)
i, err = getIssueByID(sess, i.ID)
idxresource.Counter++
assert.NoError(t, idxresource.UpdateValue())
i.Index = idxresource.Counter
_, err = sess.Insert(i)
assert.NoError(t, err)
assert.NoError(t, i.addCrossReferences(sess, d, false))
assert.NoError(t, sess.Commit())
Expand Down
121 changes: 121 additions & 0 deletions models/locked_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package models

import (
"fmt"

"code.gitea.io/gitea/modules/setting"
)

// LockedResource represents the locking key for a pessimistic
// lock that can hold a counter
type LockedResource struct {
LockType string `xorm:"pk VARCHAR(30)"`
LockKey int64 `xorm:"pk"`
Counter int64 `xorm:"NOT NULL DEFAULT 0"`

engine Engine `xorm:"-"`
guillep2k marked this conversation as resolved.
Show resolved Hide resolved
}

// GetLockedResource gets or creates a pessimistic lock on the given type and key
func GetLockedResource(e Engine, lockType string, lockKey int64) (*LockedResource, error) {
resource := &LockedResource{LockType: lockType, LockKey: lockKey}

if err := upsertLockedResource(e, resource); err != nil {
return nil, fmt.Errorf("upsertLockedResource: %v", err)
}

// Read back the record we've created or locked to get the current Counter value
if has, err := e.Table(resource).NoCache().NoAutoCondition().AllCols().
Where("lock_type = ? AND lock_key = ?", lockType, lockKey).Get(resource); err != nil {
return nil, fmt.Errorf("get locked resource %s:%d: %v", lockType, lockKey, err)
} else if !has {
return nil, fmt.Errorf("unexpected upsert fail %s:%d", lockType, lockKey)
}

// Once active, the locked resource is tied to a specific session
resource.engine = e

return resource, nil
}

// UpdateValue updates the value of the counter of a locked resource
func (r *LockedResource) UpdateValue() error {
guillep2k marked this conversation as resolved.
Show resolved Hide resolved
// Bypass ORM to support lock_type == "" and lock_key == 0
_, err := r.engine.Exec("UPDATE locked_resource SET counter = ? WHERE lock_type = ? AND lock_key = ?",
r.Counter, r.LockType, r.LockKey)
return err
}

// Delete deletes the locked resource from the database,
// but the key remains locked until the end of the transaction
func (r *LockedResource) Delete() error {
// Bypass ORM to support lock_type == "" and lock_key == 0
_, err := r.engine.Exec("DELETE FROM locked_resource WHERE lock_type = ? AND lock_key = ?", r.LockType, r.LockKey)
return err
}

// DeleteLockedResourceKey deletes a locked resource by key
func DeleteLockedResourceKey(e Engine, lockType string, lockKey int64) error {
// Bypass ORM to support lock_type == "" and lock_key == 0
_, err := e.Exec("DELETE FROM locked_resource WHERE lock_type = ? AND lock_key = ?", lockType, lockKey)
return err
}

// TemporarilyLockResourceKey locks the given key but does not leave a permanent record
func TemporarilyLockResourceKey(e Engine, lockType string, lockKey int64) error {
// Temporary locked resources should not exist in the table.
// This allows us to use a simple INSERT to lock the key.
_, err := e.Exec("INSERT INTO locked_resource (lock_type, lock_key) VALUES (?, ?)", lockType, lockKey)
if err == nil {
_, err = e.Exec("DELETE FROM locked_resource WHERE lock_type = ? AND lock_key = ?", lockType, lockKey)
}
return err
}

// GetLockedResourceCtx gets or creates a pessimistic lock on the given type and key
func GetLockedResourceCtx(ctx DBContext, lockType string, lockKey int64) (*LockedResource, error) {
return GetLockedResource(ctx.e, lockType, lockKey)
}

// DeleteLockedResourceKeyCtx deletes a locked resource by key
func DeleteLockedResourceKeyCtx(ctx DBContext, lockType string, lockKey int64) error {
return DeleteLockedResourceKey(ctx.e, lockType, lockKey)
}

// TemporarilyLockResourceKeyCtx locks the given key but does not leave a permanent record
func TemporarilyLockResourceKeyCtx(ctx DBContext, lockType string, lockKey int64) error {
return TemporarilyLockResourceKey(ctx.e, lockType, lockKey)
}

// upsertLockedResource will create or lock the given key in the database.
// the function will not return until it acquires the lock or receives an error.
func upsertLockedResource(e Engine, resource *LockedResource) (err error) {
// An atomic UPSERT operation (INSERT/UPDATE) is the only operation
// that ensures that the key is actually locked.
guillep2k marked this conversation as resolved.
Show resolved Hide resolved
switch {
case setting.Database.UseSQLite3 || setting.Database.UsePostgreSQL:
_, err = e.Exec("INSERT INTO locked_resource (lock_type, lock_key) "+
"VALUES (?,?) ON CONFLICT(lock_type, lock_key) DO UPDATE SET lock_key = ?",
resource.LockType, resource.LockKey, resource.LockKey)
case setting.Database.UseMySQL:
_, err = e.Exec("INSERT INTO locked_resource (lock_type, lock_key) "+
"VALUES (?,?) ON DUPLICATE KEY UPDATE lock_key = lock_key",
resource.LockType, resource.LockKey)
case setting.Database.UseMSSQL:
// https://weblogs.sqlteam.com/dang/2009/01/31/upsert-race-condition-with-merge/
_, err = e.Exec("MERGE locked_resource WITH (HOLDLOCK) as target "+
"USING (SELECT ? AS lock_type, ? AS lock_key) AS src "+
"ON src.lock_type = target.lock_type AND src.lock_key = target.lock_key "+
"WHEN MATCHED THEN UPDATE SET target.lock_key = target.lock_key "+
"WHEN NOT MATCHED THEN INSERT (lock_type, lock_key) "+
"VALUES (src.lock_type, src.lock_key);",
resource.LockType, resource.LockKey)
default:
return fmt.Errorf("database type not supported")
}
return
}
Loading