Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ require (
github.com/muesli/termenv v0.16.0
github.com/neticdk/go-bitbucket v1.0.3
github.com/oklog/ulid/v2 v2.1.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.0
github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -173,6 +172,7 @@ require (
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
Expand Down
13 changes: 0 additions & 13 deletions server/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ func NewMemoryQueue(ctx context.Context) Queue {
return q
}

// Push pushes a task to the tail of this queue.
func (q *fifo) Push(_ context.Context, task *model.Task) error {
q.Lock()
q.pending.PushBack(task)
q.Unlock()
return nil
}

// PushAtOnce pushes multiple tasks to the tail of this queue.
func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
q.Lock()
Expand Down Expand Up @@ -153,11 +145,6 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e
return nil
}

// Evict removes a pending task from the queue.
func (q *fifo) Evict(ctx context.Context, taskID string) error {
return q.EvictAtOnce(ctx, []string{taskID})
}

// EvictAtOnce removes multiple pending tasks from the queue.
func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error {
q.Lock()
Expand Down
36 changes: 6 additions & 30 deletions server/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestFifo(t *testing.T) {
q := NewMemoryQueue(ctx)
dummyTask := genDummyTask()

assert.NoError(t, q.Push(ctx, dummyTask))
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask}))
waitForProcess()
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue")
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestFifoExpire(t *testing.T) {
dummyTask := genDummyTask()

q.extension = 0
assert.NoError(t, q.Push(ctx, dummyTask))
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask}))
waitForProcess()
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue")
Expand All @@ -100,7 +100,7 @@ func TestFifoWait(t *testing.T) {

dummyTask := genDummyTask()

assert.NoError(t, q.Push(ctx, dummyTask))
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask}))

waitForProcess()
got, err := q.Poll(ctx, 1, filterFnTrue)
Expand All @@ -119,30 +119,6 @@ func TestFifoWait(t *testing.T) {
wg.Wait()
}

func TestFifoEvict(t *testing.T) {
ctx, cancel := context.WithCancelCause(t.Context())
t.Cleanup(func() { cancel(nil) })

q := NewMemoryQueue(ctx)
dummyTask := genDummyTask()

assert.NoError(t, q.Push(ctx, dummyTask))

waitForProcess()
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue")

err := q.Evict(ctx, dummyTask.ID)
assert.NoError(t, err)

waitForProcess()
info = q.Info(ctx)
assert.Len(t, info.Pending, 0)

err = q.Evict(ctx, dummyTask.ID)
assert.ErrorIs(t, err, ErrNotFound)
}

func TestFifoDependencies(t *testing.T) {
ctx, cancel := context.WithCancelCause(t.Context())
t.Cleanup(func() { cancel(nil) })
Expand Down Expand Up @@ -442,7 +418,7 @@ func TestFifoPause(t *testing.T) {

q.Pause()
t0 := time.Now()
assert.NoError(t, q.Push(ctx, dummyTask))
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask}))
waitForProcess()
q.Resume()

Expand All @@ -452,7 +428,7 @@ func TestFifoPause(t *testing.T) {
assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume")

q.Pause()
assert.NoError(t, q.Push(ctx, dummyTask))
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask}))
q.Resume()
_, _ = q.Poll(ctx, 1, filterFnTrue)
}
Expand All @@ -467,7 +443,7 @@ func TestFifoPauseResume(t *testing.T) {
dummyTask := genDummyTask()

q.Pause()
assert.NoError(t, q.Push(ctx, dummyTask))
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{dummyTask}))
q.Resume()

_, _ = q.Poll(ctx, 1, filterFnTrue)
Expand Down
24 changes: 0 additions & 24 deletions server/queue/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package queue
import (
"context"

"github.com/pkg/errors"
"github.com/rs/zerolog/log"

"go.woodpecker-ci.org/woodpecker/v3/server/model"
Expand All @@ -40,20 +39,6 @@ type persistentQueue struct {
store store.Store
}

// Push pushes a task to the tail of this queue.
func (q *persistentQueue) Push(c context.Context, task *model.Task) error {
if err := q.store.TaskInsert(task); err != nil {
return err
}
err := q.Queue.Push(c, task)
if err != nil {
if err2 := q.store.TaskDelete(task.ID); err2 != nil {
err = errors.Wrapf(err, "delete task '%s' failed: %v", task.ID, err2)
}
}
return err
}

// PushAtOnce pushes multiple tasks to the tail of this queue.
func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*model.Task) error {
// TODO: invent store.NewSession who return context including a session and make TaskInsert & TaskDelete use it
Expand Down Expand Up @@ -87,15 +72,6 @@ func (q *persistentQueue) Poll(c context.Context, agentID int64, f FilterFn) (*m
return task, err
}

// Evict removes a pending task from the queue.
func (q *persistentQueue) Evict(c context.Context, id string) error {
err := q.Queue.Evict(c, id)
if err == nil {
return q.store.TaskDelete(id)
}
return err
}

// EvictAtOnce removes multiple pending tasks from the queue.
func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error {
if err := q.Queue.EvictAtOnce(c, ids); err != nil {
Expand Down
6 changes: 0 additions & 6 deletions server/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ type FilterFn func(*model.Task) (bool, int)
// Queue defines a task queue for scheduling tasks among
// a pool of workers.
type Queue interface {
// Push pushes a task to the tail of this queue.
Push(c context.Context, task *model.Task) error

// PushAtOnce pushes multiple tasks to the tail of this queue.
PushAtOnce(c context.Context, tasks []*model.Task) error

Expand All @@ -98,9 +95,6 @@ type Queue interface {
// ErrorAtOnce signals multiple done are complete with an error.
ErrorAtOnce(c context.Context, ids []string, err error) error

// Evict removes a pending task from the queue.
Evict(c context.Context, id string) error

// EvictAtOnce removes multiple pending tasks from the queue.
EvictAtOnce(c context.Context, ids []string) error

Expand Down