Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ func runGrpcServer(ctx context.Context, c *cli.Command, _store store.Store) erro
)

woodpeckerServer := server_rpc.NewWoodpeckerServer(
server.Config.Services.Queue,
server.Config.Services.Scheduler,
server.Config.Services.Logs,
server.Config.Services.Pubsub,
_store,
)
proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer)
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func startMetricsCollector(ctx context.Context, _store store.Store) {
log.Info().Msg("queue metric collector started")

for {
stats := server.Config.Services.Queue.Info(ctx)
stats := server.Config.Services.Scheduler.Info(ctx)
pendingSteps.Set(float64(stats.Stats.Pending))
waitingSteps.Set(float64(stats.Stats.WaitingOnDeps))
runningSteps.Set(float64(stats.Stats.Running))
Expand Down
6 changes: 4 additions & 2 deletions cmd/server/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory"
"go.woodpecker-ci.org/woodpecker/v3/server/queue"
"go.woodpecker-ci.org/woodpecker/v3/server/scheduler"
"go.woodpecker-ci.org/woodpecker/v3/server/services"
service_log "go.woodpecker-ci.org/woodpecker/v3/server/services/log"
"go.woodpecker-ci.org/woodpecker/v3/server/services/log/addon"
Expand Down Expand Up @@ -159,12 +160,13 @@ func setupJWTSecret(_store store.Store) (string, error) {
func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err error) {
// services
server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = memory.New()
server.Config.Services.Membership = setupMembershipService(ctx, s)
server.Config.Services.Queue, err = setupQueue(ctx, s)
pubsub := memory.New()
queue, err := setupQueue(ctx, s)
if err != nil {
return fmt.Errorf("could not setup queue: %w", err)
}
server.Config.Services.Scheduler = scheduler.NewScheduler(queue, pubsub)
server.Config.Services.Manager, err = services.NewManager(c, s, setup.Forge)
if err != nil {
return fmt.Errorf("could not setup service manager: %w", err)
Expand Down
14 changes: 7 additions & 7 deletions server/api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func GetAgentTasks(c *gin.Context) {
}

var tasks []*model.Task
info := server.Config.Services.Queue.Info(c)
info := server.Config.Services.Scheduler.Info(c)
for _, task := range info.Running {
if task.AgentID == agent.ID {
tasks = append(tasks, task)
Expand Down Expand Up @@ -142,7 +142,7 @@ func PatchAgent(c *gin.Context) {
agent.Name = in.Name
agent.NoSchedule = in.NoSchedule
if agent.NoSchedule {
server.Config.Services.Queue.KickAgentWorkers(agent.ID)
server.Config.Services.Scheduler.KickAgentWorkers(agent.ID)
}

err = _store.AgentUpdate(agent)
Expand Down Expand Up @@ -213,7 +213,7 @@ func DeleteAgent(c *gin.Context) {
}

// prevent deletion of agents with running tasks
info := server.Config.Services.Queue.Info(c)
info := server.Config.Services.Scheduler.Info(c)
for _, task := range info.Running {
if task.AgentID == agent.ID {
c.String(http.StatusConflict, "Agent has running tasks")
Expand All @@ -222,7 +222,7 @@ func DeleteAgent(c *gin.Context) {
}

// kick workers to remove the agent from the queue
server.Config.Services.Queue.KickAgentWorkers(agent.ID)
server.Config.Services.Scheduler.KickAgentWorkers(agent.ID)

if err = _store.AgentDelete(agent); err != nil {
c.String(http.StatusInternalServerError, "Error deleting user. %s", err)
Expand Down Expand Up @@ -345,7 +345,7 @@ func PatchOrgAgent(c *gin.Context) {
agent.Name = in.Name
agent.NoSchedule = in.NoSchedule
if agent.NoSchedule {
server.Config.Services.Queue.KickAgentWorkers(agent.ID)
server.Config.Services.Scheduler.KickAgentWorkers(agent.ID)
}

if err := _store.AgentUpdate(agent); err != nil {
Expand Down Expand Up @@ -388,7 +388,7 @@ func DeleteOrgAgent(c *gin.Context) {
}

// Check if the agent has any running tasks
info := server.Config.Services.Queue.Info(c)
info := server.Config.Services.Scheduler.Info(c)
for _, task := range info.Running {
if task.AgentID == agent.ID {
c.String(http.StatusConflict, "Agent has running tasks")
Expand All @@ -397,7 +397,7 @@ func DeleteOrgAgent(c *gin.Context) {
}

// Kick workers to remove the agent from the queue
server.Config.Services.Queue.KickAgentWorkers(agent.ID)
server.Config.Services.Scheduler.KickAgentWorkers(agent.ID)

if err := _store.AgentDelete(agent); err != nil {
c.String(http.StatusInternalServerError, "Error deleting agent. %s", err)
Expand Down
6 changes: 4 additions & 2 deletions server/api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (

"go.woodpecker-ci.org/woodpecker/v3/server"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory"
"go.woodpecker-ci.org/woodpecker/v3/server/queue"
queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks"
"go.woodpecker-ci.org/woodpecker/v3/server/scheduler"
manager_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/mocks"
store_mocks "go.woodpecker-ci.org/woodpecker/v3/server/store/mocks"
"go.woodpecker-ci.org/woodpecker/v3/server/store/types"
Expand Down Expand Up @@ -196,7 +198,7 @@ func TestDeleteAgent(t *testing.T) {
mockQueue := queue_mocks.NewMockQueue(t)
mockQueue.On("Info", mock.Anything).Return(queue.InfoT{})
mockQueue.On("KickAgentWorkers", int64(1)).Return()
server.Config.Services.Queue = mockQueue
server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New())

w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
Expand All @@ -223,7 +225,7 @@ func TestDeleteAgent(t *testing.T) {
mockQueue.On("Info", mock.Anything).Return(queue.InfoT{
Running: []*model.Task{{AgentID: 1}},
})
server.Config.Services.Queue = mockQueue
server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New())

w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
Expand Down
11 changes: 5 additions & 6 deletions server/api/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory"
queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks"
"go.woodpecker-ci.org/woodpecker/v3/server/scheduler"
config_service_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/config/mocks"
manager_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/mocks"
registry_service_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/registry/mocks"
Expand Down Expand Up @@ -264,7 +265,7 @@ func TestCancelPipeline(t *testing.T) {
mockManager := manager_mocks.NewMockManager(t)
mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil)
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = memory.New()
server.Config.Services.Scheduler = scheduler.NewScheduler(nil, memory.New())

w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
Expand Down Expand Up @@ -315,11 +316,10 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("EnvironmentService").Return(nil).Maybe()
server.Config.Services.Manager = mockManager

server.Config.Services.Pubsub = memory.New()
mockQueue := queue_mocks.NewMockQueue(t)
mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe()
mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe()
server.Config.Services.Queue = mockQueue
server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New())

// mimic the valid config data
configData := []*forge_types.FileMeta{
Expand Down Expand Up @@ -388,11 +388,10 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("EnvironmentService").Return(nil).Maybe()
server.Config.Services.Manager = mockManager

server.Config.Services.Pubsub = memory.New()
mockQueue := queue_mocks.NewMockQueue(t)
mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe()
mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe()
server.Config.Services.Queue = mockQueue
server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New())

// mimic the old config data
oldConfigData := []*forge_types.FileMeta{
Expand Down Expand Up @@ -444,7 +443,7 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil)
mockManager.On("ConfigServiceFromRepo", fakeRepo).Return(mockConfigService)
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = memory.New()
server.Config.Services.Scheduler = scheduler.NewScheduler(nil, memory.New())

// return nil config with error
mockConfigService.On("Fetch", mock.Anything, mockForge, fakeUser, fakeRepo, mock.Anything, mock.Anything, false).Return(nil, http.ErrHandlerTimeout)
Expand Down
8 changes: 4 additions & 4 deletions server/api/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// @Tags Pipeline queues
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
func GetQueueInfo(c *gin.Context) {
info := server.Config.Services.Queue.Info(c)
info := server.Config.Services.Scheduler.Info(c)
_store := store.FromContext(c)

// Create a map to store agent names by ID
Expand Down Expand Up @@ -91,7 +91,7 @@ func GetQueueInfo(c *gin.Context) {
// @Tags Pipeline queues
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
func PauseQueue(c *gin.Context) {
server.Config.Services.Queue.Pause()
server.Config.Services.Scheduler.Pause()
c.Status(http.StatusNoContent)
}

Expand All @@ -104,7 +104,7 @@ func PauseQueue(c *gin.Context) {
// @Tags Pipeline queues
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
func ResumeQueue(c *gin.Context) {
server.Config.Services.Queue.Resume()
server.Config.Services.Scheduler.Resume()
c.Status(http.StatusNoContent)
}

Expand All @@ -118,7 +118,7 @@ func ResumeQueue(c *gin.Context) {
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
func BlockTilQueueHasRunningItem(c *gin.Context) {
for {
info := server.Config.Services.Queue.Info(c)
info := server.Config.Services.Scheduler.Info(c)
if info.Stats.Running == 0 {
break
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func EventStreamSSE(c *gin.Context) {
}()

go func() {
err := server.Config.Services.Pubsub.Subscribe(ctx, subTopics,
err := server.Config.Services.Scheduler.Subscribe(ctx, subTopics,
func(m pubsub.Message) {
select {
case <-ctx.Done():
Expand Down
6 changes: 2 additions & 4 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/server/cache"
"go.woodpecker-ci.org/woodpecker/v3/server/logging"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v3/server/queue"
"go.woodpecker-ci.org/woodpecker/v3/server/scheduler"
"go.woodpecker-ci.org/woodpecker/v3/server/services"
"go.woodpecker-ci.org/woodpecker/v3/server/services/log"
"go.woodpecker-ci.org/woodpecker/v3/server/services/permissions"
)

var Config = struct {
Services struct {
Pubsub pubsub.PubSub
Queue queue.Queue
Scheduler scheduler.Scheduler
Logs logging.Log
Membership cache.MembershipService
Manager services.Manager
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
}

if len(workflowsToCancel) != 0 {
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil {
if err := server.Config.Services.Scheduler.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel)
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func queuePipeline(ctx context.Context, repo *model.Repo, pipelineItems []*step_

tasks = append(tasks, task)
}
return server.Config.Services.Queue.PushAtOnce(ctx, tasks)
return server.Config.Services.Scheduler.PushAtOnce(ctx, tasks)
}

func getTaskDependencies(dependsOn []string, items []*step_builder.Item) (taskIDs []string) {
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Rep
// publish to repo specific topic
subTopics[pubsub.GetRepoTopic(repo)] = struct{}{}

return server.Config.Services.Pubsub.Publish(c, subTopics, message)
return server.Config.Services.Scheduler.Publish(c, subTopics, message)
}
20 changes: 10 additions & 10 deletions server/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/server/pipeline"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v3/server/queue"
"go.woodpecker-ci.org/woodpecker/v3/server/scheduler"
"go.woodpecker-ci.org/woodpecker/v3/server/store"
)

// updateAgentLastWorkDelay the delay before the LastWork info should be updated.
const updateAgentLastWorkDelay = time.Minute

type RPC struct {
queue queue.Queue
pubsub pubsub.PubSub
scheduler scheduler.Scheduler
logger logging.Log
store store.Store
pipelineTime *prometheus.GaugeVec
Expand Down Expand Up @@ -84,7 +84,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er

for {
// poll blocks until a task is available or the context is canceled / worker is kicked
task, err := s.queue.Poll(c, agent.ID, filterFn)
task, err := s.scheduler.Poll(c, agent.ID, filterFn)
if err != nil || task == nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err err
return false, err
}

if err := s.queue.Wait(c, workflowID); err != nil {
if err := s.scheduler.Wait(c, workflowID); err != nil {
if errors.Is(err, queue.ErrCancel) {
// we explicit send a cancel signal
log.Debug().Str("workflowID", workflowID).Msg("while waiting the queue reported the workflow as canceled")
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s *RPC) Extend(c context.Context, workflowID string) error {
return err
}

return s.queue.Extend(c, agent.ID, workflowID)
return s.scheduler.Extend(c, agent.ID, workflowID)
}

// Update let agent updates the step state at the server.
Expand Down Expand Up @@ -351,15 +351,15 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
var queueErr error
if !state.Canceled {
if workflow.Failing() {
queueErr = s.queue.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error))
queueErr = s.scheduler.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error))
} else {
queueErr = s.queue.Done(c, strWorkflowID, workflow.State)
queueErr = s.scheduler.Done(c, strWorkflowID, workflow.State)
}
} else {
if workflow.Started > 0 {
queueErr = s.queue.Done(c, strWorkflowID, model.StatusKilled)
queueErr = s.scheduler.Done(c, strWorkflowID, model.StatusKilled)
} else {
queueErr = s.queue.Done(c, strWorkflowID, model.StatusCanceled)
queueErr = s.scheduler.Done(c, strWorkflowID, model.StatusCanceled)
}
}
if queueErr != nil {
Expand Down Expand Up @@ -585,7 +585,7 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeli
// publish to repo specific topic
subTopics[pubsub.GetRepoTopic(repo)] = struct{}{}

return s.pubsub.Publish(c, subTopics, message)
return s.scheduler.Publish(c, subTopics, message)
}

func (s *RPC) getAgentFromContext(ctx context.Context) (*model.Agent, error) {
Expand Down
Loading