Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b06b088
Ensure that conflict status is reset if there are no conflicts
zeripath Feb 11, 2022
db1931c
Add some more trace and debug logging to process and testpatch
zeripath Feb 11, 2022
bd066ca
Add doctor commands for looking at the level DB
zeripath Feb 11, 2022
2bce719
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Feb 13, 2022
1475c62
As per review
zeripath Feb 13, 2022
93d8275
refix process manager trace ended
zeripath Feb 13, 2022
53232c4
No revive - we want this to stutter
zeripath Feb 13, 2022
1500a0d
Update modules/doctor/queue.go
zeripath Mar 27, 2022
a9ae5c3
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Mar 27, 2022
4e02994
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jun 4, 2022
a639e44
remove trace logging from process manager
zeripath Jun 4, 2022
2821ddd
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jul 12, 2022
bd5ca04
use constants for queues
zeripath Jul 13, 2022
bbe2915
Merge branch 'main' into reset-conflict-status
6543 Jul 13, 2022
bdfa176
Merge branch 'main' into reset-conflict-status
zeripath Jul 16, 2022
4680195
placate linter
zeripath Jul 17, 2022
760c257
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jul 17, 2022
fc7301d
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Jan 31, 2023
2e9cdfb
placate the bloody linter
zeripath Jan 31, 2023
ff707de
replacate lint.
zeripath Feb 1, 2023
9e8f61b
Merge remote-tracking branch 'origin/main' into reset-conflict-status
zeripath Feb 26, 2023
900bb10
remove mismerged files
zeripath Feb 26, 2023
99c2b4b
add actions job queue name
zeripath Feb 26, 2023
71e16d6
restore licenses deleted by mistake
zeripath Feb 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions modules/doctor/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package doctor

import (
"context"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/nosql"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"gitea.com/lunny/levelqueue"
)

var uniqueQueueNames = []string{
"code_indexer",
"repo_stats_update",
"mirror",
"pr_patch_checker",
"repo-archive",
}

var queueNames = []string{
"issue_indexer",
"notification-service",
"mail",
"push_update",
"task",
}

var levelqueueTypes = []string{
string(queue.PersistableChannelQueueType),
string(queue.PersistableChannelUniqueQueueType),
string(queue.LevelQueueType),
string(queue.LevelUniqueQueueType),
}

func checkUniqueQueues(ctx context.Context, logger log.Logger, autofix bool) error {
for _, name := range uniqueQueueNames {
q := setting.GetQueueSettings(name)
if q.Type == "" {
q.Type = string(queue.PersistableChannelQueueType)
}
found := false
for _, typ := range levelqueueTypes {
if typ == q.Type {
found = true
break
}
}
if !found {
logger.Info("Queue: %s\nType: %s\nNo LevelDB", q.Name, q.Type)
continue
}

connection := q.ConnectionString
if connection == "" {
connection = q.DataDir
}

db, err := nosql.GetManager().GetLevelDB(connection)
if err != nil {
logger.Error("Queue: %s\nUnable to open DB connection %s: %v", q.Name, connection, err)
return err
}
defer db.Close()

prefix := q.Name

iQueue, err := levelqueue.NewQueue(db, []byte(prefix), false)
if err != nil {
logger.Error("Queue: %s\nUnable to open Queue component: %v", q.Name, err)
return err
}

iSet, err := levelqueue.NewSet(db, []byte(prefix+"-unique"), false)
if err != nil {
logger.Error("Queue: %s\nUnable to open Set component: %v", q.Name, err)
return err
}

qLen := iQueue.Len()
sMembers, err := iSet.Members()
if err != nil {
logger.Error("Queue: %s\nUnable to get members of Set component: %v", q.Name, err)
return err
}
sLen := len(sMembers)

if int(qLen) == sLen {
if qLen == 0 {
logger.Info("Queue: %s\nType: %s\nLevelDB: %s", q.Name, q.Type, "empty")
} else {
logger.Info("Queue: %s\nType: %s\nLevelDB contains: %d entries", q.Name, q.Type, qLen)
}
continue
}
logger.Warn("Queue: %s\nType: %s\nContains different numbers of elements in Queue component %d to Set component %d", q.Name, q.Type, qLen, sLen)
if !autofix {
continue
}

// Empty out the old set members
for _, member := range sMembers {
_, err := iSet.Remove(member)
if err != nil {
logger.Error("Queue: %s\nUnable to remove Set member %s: %v", q.Name, string(member), err)
return err
}
}

// Now iterate across the queue
for i := int64(0); i < qLen; i++ {
// Pop from the left
qData, err := iQueue.LPop()
if err != nil {
logger.Error("Queue: %s\nUnable to LPop out: %v", q.Name, err)
return err
}
// And add to the right
err = iQueue.RPush(qData)
if err != nil {
logger.Error("Queue: %s\nUnable to RPush back: %v", q.Name, err)
return err
}
// And add back to the set
_, err = iSet.Add(qData)
if err != nil {
logger.Error("Queue: %s\nUnable to add back in to Set: %v", q.Name, err)
return err
}
}
}
return nil
}

func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error {
connections := []string{}

for _, name := range append(uniqueQueueNames, queueNames...) {
q := setting.GetQueueSettings(name)
if q.Type == "" {
q.Type = string(queue.PersistableChannelQueueType)
}
found := false
for _, typ := range levelqueueTypes {
if typ == q.Type {
found = true
break
}
}
if !found {
continue
}
if q.ConnectionString != "" {
found := false
for _, connection := range connections {
if connection == q.ConnectionString {
found = true
}
}
if !found {
connections = append(connections, q.ConnectionString)
}
continue
}
found = false
for _, connection := range connections {
if connection == q.DataDir {
found = true
}
}
if !found {
connections = append(connections, q.DataDir)
}
}

for _, connection := range connections {
logger.Info("LevelDB: %s", connection)
db, err := nosql.GetManager().GetLevelDB(connection)
if err != nil {
logger.Error("Connection: %s Unable to open DB: %v", connection, err)
return err
}
defer db.Close()
iter := db.NewIterator(nil, nil)
for iter.Next() {
logger.Info("%s\n%s", log.NewColoredIDValue(string(iter.Key())), string(iter.Value()))
}
iter.Release()
}
return nil
}

func init() {
Register(&Check{
Title: "Check if there are corrupt level uniquequeues",
Name: "uniquequeues-corrupt",
IsDefault: false,
Run: checkUniqueQueues,
AbortIfFailed: false,
SkipDatabaseInitialization: false,
Priority: 1,
})
Register(&Check{
Title: "List all entries in leveldb",
Name: "queues-listdb",
IsDefault: false,
Run: queueListDB,
AbortIfFailed: false,
SkipDatabaseInitialization: false,
Priority: 1,
})
}
4 changes: 4 additions & 0 deletions modules/process/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"strconv"
"sync"
"time"

"code.gitea.io/gitea/modules/log"
)

// TODO: This packages still uses a singleton for the Manager.
Expand Down Expand Up @@ -103,6 +105,7 @@ func (pm *Manager) AddContextTimeout(parent context.Context, timeout time.Durati
func (pm *Manager) Add(parentPID IDType, description string, cancel context.CancelFunc) (IDType, FinishedFunc) {
pm.mutex.Lock()
start, pid := pm.nextPID()
log.Trace("Adding Process[%s:%s] %s", parentPID, pid, description)

parent := pm.processes[parentPID]
if parent == nil {
Expand All @@ -120,6 +123,7 @@ func (pm *Manager) Add(parentPID IDType, description string, cancel context.Canc
finished := func() {
cancel()
pm.remove(process)
log.Trace("Finished Process[%d:%d] %s", parentPID, pid, description)
}

if parent != nil {
Expand Down
10 changes: 5 additions & 5 deletions modules/queue/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
case <-paused:
log.Trace("Worker for Queue %d Pausing", p.qid)
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data)
if unhandled := p.handle(data...); unhandled != nil {
log.Error("Unhandled Data in queue %d", p.qid)
}
Expand All @@ -507,7 +507,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
// go back around
case <-ctx.Done():
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data)
if unhandled := p.handle(data...); unhandled != nil {
log.Error("Unhandled Data in queue %d", p.qid)
}
Expand All @@ -519,7 +519,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
if !ok {
// the dataChan has been closed - we should finish up:
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data)
if unhandled := p.handle(data...); unhandled != nil {
log.Error("Unhandled Data in queue %d", p.qid)
}
Expand All @@ -532,7 +532,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
util.StopTimer(timer)

if len(data) >= p.batchLength {
log.Trace("Handling: %d data, %v", len(data), data)
log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data)
if unhandled := p.handle(data...); unhandled != nil {
log.Error("Unhandled Data in queue %d", p.qid)
}
Expand All @@ -544,7 +544,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
case <-timer.C:
delay = time.Millisecond * 100
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
log.Trace("Queue[%d] Handling: %d data, %v", p.qid, len(data), data)
if unhandled := p.handle(data...); unhandled != nil {
log.Error("Unhandled Data in queue %d", p.qid)
}
Expand Down
8 changes: 8 additions & 0 deletions services/pull/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) {
}

if !has {
log.Trace("Updating PR[%d] in %d: Status:%d Conflicts:%s Protected:%s", pr.ID, pr.BaseRepoID, pr.Status, pr.ConflictedFiles, pr.ChangedProtectedFiles)
if err := pr.UpdateColsIfNotMerged("merge_base", "status", "conflicted_files", "changed_protected_files"); err != nil {
log.Error("Update[%d]: %v", pr.ID, err)
}
} else {
log.Trace("Not updating PR[%d] in %d as still in the queue", pr.ID, pr.BaseRepoID)
}
}

Expand Down Expand Up @@ -234,12 +237,15 @@ func testPR(id int64) {
log.Error("GetPullRequestByID[%d]: %v", id, err)
return
}
log.Trace("Testing PR[%d] in %d", pr.ID, pr.BaseRepoID)

if pr.HasMerged {
log.Trace("PR[%d] in %d: already merged", pr.ID, pr.BaseRepoID)
return
}

if manuallyMerged(ctx, pr) {
log.Trace("PR[%d] in %d: manually merged", pr.ID, pr.BaseRepoID)
return
}

Expand All @@ -251,6 +257,8 @@ func testPR(id int64) {
}
return
}
log.Trace("PR[%d] in %d: patch tested new Status:%d ConflictedFiles:%s ChangedProtectedFiles:%s", pr.ID, pr.BaseRepoID, pr.Status, pr.ConflictedFiles, pr.ChangedProtectedFiles)

checkAndUpdateStatus(pr)
}

Expand Down
6 changes: 5 additions & 1 deletion services/pull/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,21 @@ func checkConflicts(ctx context.Context, pr *models.PullRequest, gitRepo *git.Re
if !conflict {
treeHash, err := git.NewCommand(ctx, "write-tree").RunInDir(tmpBasePath)
if err != nil {
log.Debug("Unable to write unconflicted tree for PR[%d] %s/%s#%d. Error: %v", pr.ID, pr.BaseRepo.OwnerName, pr.BaseRepo.Name, pr.Index, err)
return false, err
}
treeHash = strings.TrimSpace(treeHash)
baseTree, err := gitRepo.GetTree("base")
if err != nil {
log.Debug("Unable to get base tree for PR[%d] %s/%s#%d. Error: %v", pr.ID, pr.BaseRepo.OwnerName, pr.BaseRepo.Name, pr.Index, err)
return false, err
}
pr.Status = models.PullRequestStatusMergeable
pr.ConflictedFiles = []string{}

if treeHash == baseTree.ID.String() {
log.Debug("PullRequest[%d]: Patch is empty - ignoring", pr.ID)
pr.Status = models.PullRequestStatusEmpty
pr.ConflictedFiles = []string{}
pr.ChangedProtectedFiles = []string{}
}

Expand Down