From 45d4130355712e96b649d2b8ca3f2e47e688108c Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 10 Apr 2026 01:27:39 +0200 Subject: [PATCH] Init server/scheduler package and use it as proxy for queue&pubsub --- cmd/server/grpc_server.go | 3 +- cmd/server/metrics_server.go | 2 +- cmd/server/setup.go | 6 +- server/api/agent.go | 14 ++--- server/api/agent_test.go | 6 +- server/api/pipeline_test.go | 11 ++-- server/api/queue.go | 8 +-- server/api/stream.go | 2 +- server/config.go | 6 +- server/pipeline/cancel.go | 2 +- server/pipeline/queue.go | 2 +- server/pipeline/topic.go | 2 +- server/rpc/rpc.go | 20 +++---- server/rpc/rpc_integration_test.go | 77 +++++++++++++------------- server/rpc/server.go | 8 +-- server/scheduler/proxy.go | 88 ++++++++++++++++++++++++++++++ server/scheduler/scheduler.go | 33 +++++++++++ 17 files changed, 205 insertions(+), 85 deletions(-) create mode 100644 server/scheduler/proxy.go create mode 100644 server/scheduler/scheduler.go diff --git a/cmd/server/grpc_server.go b/cmd/server/grpc_server.go index ac32d5c13c0..a5feae8a0f0 100644 --- a/cmd/server/grpc_server.go +++ b/cmd/server/grpc_server.go @@ -49,9 +49,8 @@ func runGrpcServer(ctx context.Context, c *cli.Command, _store store.Store) erro ) woodpeckerServer := server_rpc.NewWoodpeckerServer( - server.Config.Services.Queue, + server.Config.Services.Scheduler, server.Config.Services.Logs, - server.Config.Services.Pubsub, _store, ) proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer) diff --git a/cmd/server/metrics_server.go b/cmd/server/metrics_server.go index a7034f5d2f9..c746f1372b1 100644 --- a/cmd/server/metrics_server.go +++ b/cmd/server/metrics_server.go @@ -68,7 +68,7 @@ func startMetricsCollector(ctx context.Context, _store store.Store) { log.Info().Msg("queue metric collector started") for { - stats := server.Config.Services.Queue.Info(ctx) + stats := server.Config.Services.Scheduler.Info(ctx) pendingSteps.Set(float64(stats.Stats.Pending)) waitingSteps.Set(float64(stats.Stats.WaitingOnDeps)) runningSteps.Set(float64(stats.Stats.Running)) diff --git a/cmd/server/setup.go b/cmd/server/setup.go index 9e157e12901..47019d52768 100644 --- a/cmd/server/setup.go +++ b/cmd/server/setup.go @@ -36,6 +36,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" "go.woodpecker-ci.org/woodpecker/v3/server/queue" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" "go.woodpecker-ci.org/woodpecker/v3/server/services" service_log "go.woodpecker-ci.org/woodpecker/v3/server/services/log" "go.woodpecker-ci.org/woodpecker/v3/server/services/log/addon" @@ -159,12 +160,13 @@ func setupJWTSecret(_store store.Store) (string, error) { func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err error) { // services server.Config.Services.Logs = logging.New() - server.Config.Services.Pubsub = memory.New() server.Config.Services.Membership = setupMembershipService(ctx, s) - server.Config.Services.Queue, err = setupQueue(ctx, s) + pubsub := memory.New() + queue, err := setupQueue(ctx, s) if err != nil { return fmt.Errorf("could not setup queue: %w", err) } + server.Config.Services.Scheduler = scheduler.NewScheduler(queue, pubsub) server.Config.Services.Manager, err = services.NewManager(c, s, setup.Forge) if err != nil { return fmt.Errorf("could not setup service manager: %w", err) diff --git a/server/api/agent.go b/server/api/agent.go index 116fe5cac4f..22d3af0ddc2 100644 --- a/server/api/agent.go +++ b/server/api/agent.go @@ -96,7 +96,7 @@ func GetAgentTasks(c *gin.Context) { } var tasks []*model.Task - info := server.Config.Services.Queue.Info(c) + info := server.Config.Services.Scheduler.Info(c) for _, task := range info.Running { if task.AgentID == agent.ID { tasks = append(tasks, task) @@ -142,7 +142,7 @@ func PatchAgent(c *gin.Context) { agent.Name = in.Name agent.NoSchedule = in.NoSchedule if agent.NoSchedule { - server.Config.Services.Queue.KickAgentWorkers(agent.ID) + server.Config.Services.Scheduler.KickAgentWorkers(agent.ID) } err = _store.AgentUpdate(agent) @@ -213,7 +213,7 @@ func DeleteAgent(c *gin.Context) { } // prevent deletion of agents with running tasks - info := server.Config.Services.Queue.Info(c) + info := server.Config.Services.Scheduler.Info(c) for _, task := range info.Running { if task.AgentID == agent.ID { c.String(http.StatusConflict, "Agent has running tasks") @@ -222,7 +222,7 @@ func DeleteAgent(c *gin.Context) { } // kick workers to remove the agent from the queue - server.Config.Services.Queue.KickAgentWorkers(agent.ID) + server.Config.Services.Scheduler.KickAgentWorkers(agent.ID) if err = _store.AgentDelete(agent); err != nil { c.String(http.StatusInternalServerError, "Error deleting user. %s", err) @@ -345,7 +345,7 @@ func PatchOrgAgent(c *gin.Context) { agent.Name = in.Name agent.NoSchedule = in.NoSchedule if agent.NoSchedule { - server.Config.Services.Queue.KickAgentWorkers(agent.ID) + server.Config.Services.Scheduler.KickAgentWorkers(agent.ID) } if err := _store.AgentUpdate(agent); err != nil { @@ -388,7 +388,7 @@ func DeleteOrgAgent(c *gin.Context) { } // Check if the agent has any running tasks - info := server.Config.Services.Queue.Info(c) + info := server.Config.Services.Scheduler.Info(c) for _, task := range info.Running { if task.AgentID == agent.ID { c.String(http.StatusConflict, "Agent has running tasks") @@ -397,7 +397,7 @@ func DeleteOrgAgent(c *gin.Context) { } // Kick workers to remove the agent from the queue - server.Config.Services.Queue.KickAgentWorkers(agent.ID) + server.Config.Services.Scheduler.KickAgentWorkers(agent.ID) if err := _store.AgentDelete(agent); err != nil { c.String(http.StatusInternalServerError, "Error deleting agent. %s", err) diff --git a/server/api/agent_test.go b/server/api/agent_test.go index 4573c00483b..b01268bf1e0 100644 --- a/server/api/agent_test.go +++ b/server/api/agent_test.go @@ -27,8 +27,10 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server" "go.woodpecker-ci.org/woodpecker/v3/server/model" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" "go.woodpecker-ci.org/woodpecker/v3/server/queue" queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" manager_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/mocks" store_mocks "go.woodpecker-ci.org/woodpecker/v3/server/store/mocks" "go.woodpecker-ci.org/woodpecker/v3/server/store/types" @@ -196,7 +198,7 @@ func TestDeleteAgent(t *testing.T) { mockQueue := queue_mocks.NewMockQueue(t) mockQueue.On("Info", mock.Anything).Return(queue.InfoT{}) mockQueue.On("KickAgentWorkers", int64(1)).Return() - server.Config.Services.Queue = mockQueue + server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New()) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -223,7 +225,7 @@ func TestDeleteAgent(t *testing.T) { mockQueue.On("Info", mock.Anything).Return(queue.InfoT{ Running: []*model.Task{{AgentID: 1}}, }) - server.Config.Services.Queue = mockQueue + server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New()) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) diff --git a/server/api/pipeline_test.go b/server/api/pipeline_test.go index 6f96cc96caa..807bf639181 100644 --- a/server/api/pipeline_test.go +++ b/server/api/pipeline_test.go @@ -33,6 +33,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" config_service_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/config/mocks" manager_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/mocks" registry_service_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/registry/mocks" @@ -264,7 +265,7 @@ func TestCancelPipeline(t *testing.T) { mockManager := manager_mocks.NewMockManager(t) mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil) server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = memory.New() + server.Config.Services.Scheduler = scheduler.NewScheduler(nil, memory.New()) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -315,11 +316,10 @@ func TestCreatePipeline(t *testing.T) { mockManager.On("EnvironmentService").Return(nil).Maybe() server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = memory.New() mockQueue := queue_mocks.NewMockQueue(t) mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe() mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe() - server.Config.Services.Queue = mockQueue + server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New()) // mimic the valid config data configData := []*forge_types.FileMeta{ @@ -388,11 +388,10 @@ func TestCreatePipeline(t *testing.T) { mockManager.On("EnvironmentService").Return(nil).Maybe() server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = memory.New() mockQueue := queue_mocks.NewMockQueue(t) mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe() mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe() - server.Config.Services.Queue = mockQueue + server.Config.Services.Scheduler = scheduler.NewScheduler(mockQueue, memory.New()) // mimic the old config data oldConfigData := []*forge_types.FileMeta{ @@ -444,7 +443,7 @@ func TestCreatePipeline(t *testing.T) { mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil) mockManager.On("ConfigServiceFromRepo", fakeRepo).Return(mockConfigService) server.Config.Services.Manager = mockManager - server.Config.Services.Pubsub = memory.New() + server.Config.Services.Scheduler = scheduler.NewScheduler(nil, memory.New()) // return nil config with error mockConfigService.On("Fetch", mock.Anything, mockForge, fakeUser, fakeRepo, mock.Anything, mock.Anything, false).Return(nil, http.ErrHandlerTimeout) diff --git a/server/api/queue.go b/server/api/queue.go index 76e451ae3ca..191ab738019 100644 --- a/server/api/queue.go +++ b/server/api/queue.go @@ -35,7 +35,7 @@ import ( // @Tags Pipeline queues // @Param Authorization header string true "Insert your personal access token" default(Bearer ) func GetQueueInfo(c *gin.Context) { - info := server.Config.Services.Queue.Info(c) + info := server.Config.Services.Scheduler.Info(c) _store := store.FromContext(c) // Create a map to store agent names by ID @@ -91,7 +91,7 @@ func GetQueueInfo(c *gin.Context) { // @Tags Pipeline queues // @Param Authorization header string true "Insert your personal access token" default(Bearer ) func PauseQueue(c *gin.Context) { - server.Config.Services.Queue.Pause() + server.Config.Services.Scheduler.Pause() c.Status(http.StatusNoContent) } @@ -104,7 +104,7 @@ func PauseQueue(c *gin.Context) { // @Tags Pipeline queues // @Param Authorization header string true "Insert your personal access token" default(Bearer ) func ResumeQueue(c *gin.Context) { - server.Config.Services.Queue.Resume() + server.Config.Services.Scheduler.Resume() c.Status(http.StatusNoContent) } @@ -118,7 +118,7 @@ func ResumeQueue(c *gin.Context) { // @Param Authorization header string true "Insert your personal access token" default(Bearer ) func BlockTilQueueHasRunningItem(c *gin.Context) { for { - info := server.Config.Services.Queue.Info(c) + info := server.Config.Services.Scheduler.Info(c) if info.Stats.Running == 0 { break } diff --git a/server/api/stream.go b/server/api/stream.go index 7d0a667c22b..3e740bb81ce 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -95,7 +95,7 @@ func EventStreamSSE(c *gin.Context) { }() go func() { - err := server.Config.Services.Pubsub.Subscribe(ctx, subTopics, + err := server.Config.Services.Scheduler.Subscribe(ctx, subTopics, func(m pubsub.Message) { select { case <-ctx.Done(): diff --git a/server/config.go b/server/config.go index b94b98e743e..4d3400d548f 100644 --- a/server/config.go +++ b/server/config.go @@ -21,8 +21,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/cache" "go.woodpecker-ci.org/woodpecker/v3/server/logging" "go.woodpecker-ci.org/woodpecker/v3/server/model" - "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" - "go.woodpecker-ci.org/woodpecker/v3/server/queue" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" "go.woodpecker-ci.org/woodpecker/v3/server/services" "go.woodpecker-ci.org/woodpecker/v3/server/services/log" "go.woodpecker-ci.org/woodpecker/v3/server/services/permissions" @@ -30,8 +29,7 @@ import ( var Config = struct { Services struct { - Pubsub pubsub.PubSub - Queue queue.Queue + Scheduler scheduler.Scheduler Logs logging.Log Membership cache.MembershipService Manager services.Manager diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go index cff9243e1f8..7be328ef8ec 100644 --- a/server/pipeline/cancel.go +++ b/server/pipeline/cancel.go @@ -48,7 +48,7 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo } if len(workflowsToCancel) != 0 { - if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil { + if err := server.Config.Services.Scheduler.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil { log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel) } } diff --git a/server/pipeline/queue.go b/server/pipeline/queue.go index 3cee7df3898..b8003e92396 100644 --- a/server/pipeline/queue.go +++ b/server/pipeline/queue.go @@ -60,7 +60,7 @@ func queuePipeline(ctx context.Context, repo *model.Repo, pipelineItems []*step_ tasks = append(tasks, task) } - return server.Config.Services.Queue.PushAtOnce(ctx, tasks) + return server.Config.Services.Scheduler.PushAtOnce(ctx, tasks) } func getTaskDependencies(dependsOn []string, items []*step_builder.Item) (taskIDs []string) { diff --git a/server/pipeline/topic.go b/server/pipeline/topic.go index 6477fdc704c..498b660bce6 100644 --- a/server/pipeline/topic.go +++ b/server/pipeline/topic.go @@ -45,5 +45,5 @@ func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Rep // publish to repo specific topic subTopics[pubsub.GetRepoTopic(repo)] = struct{}{} - return server.Config.Services.Pubsub.Publish(c, subTopics, message) + return server.Config.Services.Scheduler.Publish(c, subTopics, message) } diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index d3f09082cd1..8a48c5a5c3b 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -38,6 +38,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/pipeline" "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" "go.woodpecker-ci.org/woodpecker/v3/server/queue" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" "go.woodpecker-ci.org/woodpecker/v3/server/store" ) @@ -45,8 +46,7 @@ import ( const updateAgentLastWorkDelay = time.Minute type RPC struct { - queue queue.Queue - pubsub pubsub.PubSub + scheduler scheduler.Scheduler logger logging.Log store store.Store pipelineTime *prometheus.GaugeVec @@ -84,7 +84,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er for { // poll blocks until a task is available or the context is canceled / worker is kicked - task, err := s.queue.Poll(c, agent.ID, filterFn) + task, err := s.scheduler.Poll(c, agent.ID, filterFn) if err != nil || task == nil { return nil, err } @@ -114,7 +114,7 @@ func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err err return false, err } - if err := s.queue.Wait(c, workflowID); err != nil { + if err := s.scheduler.Wait(c, workflowID); err != nil { if errors.Is(err, queue.ErrCancel) { // we explicit send a cancel signal log.Debug().Str("workflowID", workflowID).Msg("while waiting the queue reported the workflow as canceled") @@ -146,7 +146,7 @@ func (s *RPC) Extend(c context.Context, workflowID string) error { return err } - return s.queue.Extend(c, agent.ID, workflowID) + return s.scheduler.Extend(c, agent.ID, workflowID) } // Update let agent updates the step state at the server. @@ -351,15 +351,15 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt var queueErr error if !state.Canceled { if workflow.Failing() { - queueErr = s.queue.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error)) + queueErr = s.scheduler.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error)) } else { - queueErr = s.queue.Done(c, strWorkflowID, workflow.State) + queueErr = s.scheduler.Done(c, strWorkflowID, workflow.State) } } else { if workflow.Started > 0 { - queueErr = s.queue.Done(c, strWorkflowID, model.StatusKilled) + queueErr = s.scheduler.Done(c, strWorkflowID, model.StatusKilled) } else { - queueErr = s.queue.Done(c, strWorkflowID, model.StatusCanceled) + queueErr = s.scheduler.Done(c, strWorkflowID, model.StatusCanceled) } } if queueErr != nil { @@ -585,7 +585,7 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeli // publish to repo specific topic subTopics[pubsub.GetRepoTopic(repo)] = struct{}{} - return s.pubsub.Publish(c, subTopics, message) + return s.scheduler.Publish(c, subTopics, message) } func (s *RPC) getAgentFromContext(ctx context.Context) (*model.Agent, error) { diff --git a/server/rpc/rpc_integration_test.go b/server/rpc/rpc_integration_test.go index 26ec74589f4..4a8d9d9a887 100644 --- a/server/rpc/rpc_integration_test.go +++ b/server/rpc/rpc_integration_test.go @@ -30,13 +30,15 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/logging" "go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" + "go.woodpecker-ci.org/woodpecker/v3/server/queue" queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" log_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/log/mocks" store_mocks "go.woodpecker-ci.org/woodpecker/v3/server/store/mocks" ) // newTestRPC creates an RPC instance with common test infrastructure. -func newTestRPC(t *testing.T, mockStore *store_mocks.MockStore) RPC { +func newTestRPC(t *testing.T, mockStore *store_mocks.MockStore, q queue.Queue) RPC { t.Helper() pipelineTime := prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -50,7 +52,7 @@ func newTestRPC(t *testing.T, mockStore *store_mocks.MockStore) RPC { return RPC{ store: mockStore, - pubsub: memory.New(), + scheduler: scheduler.NewScheduler(q, memory.New()), logger: logging.New(), pipelineTime: pipelineTime, pipelineCount: pipelineCount, @@ -133,7 +135,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("StepUpdate", mock.Anything).Return(nil) mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{ @@ -167,7 +169,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) mockLogStore.On("StepFinished", mock.Anything).Return() - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) // Step reports exit → it will transition to success/failure (terminal) @@ -194,7 +196,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) // Step reports started but not exited → still running (non-terminal) @@ -215,7 +217,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) @@ -239,7 +241,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) @@ -261,7 +263,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) mockStore.On("GetRepo", int64(10)).Return(repo, nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "2")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) @@ -271,7 +273,7 @@ func TestRPCUpdate(t *testing.T) { t.Run("reject invalid workflow ID", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "not-a-number", rpc.StepState{StepUUID: "step-uuid-123"}) @@ -282,7 +284,7 @@ func TestRPCUpdate(t *testing.T) { mockStore := store_mocks.NewMockStore(t) mockStore.On("WorkflowLoad", int64(999)).Return(nil, errors.New("not found")) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "999", rpc.StepState{StepUUID: "step-uuid-123"}) @@ -300,7 +302,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("StepByUUID", "nonexistent").Return(nil, errors.New("not found")) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "nonexistent"}) @@ -315,7 +317,7 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) // no agent_id in metadata ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs()) @@ -347,7 +349,7 @@ func TestRPCInit(t *testing.T) { // updateAgentLastWork -> AgentUpdate mockStore.On("AgentUpdate", mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) @@ -371,7 +373,7 @@ func TestRPCInit(t *testing.T) { mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) mockStore.On("AgentUpdate", mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) @@ -389,7 +391,7 @@ func TestRPCInit(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) @@ -407,7 +409,7 @@ func TestRPCInit(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) @@ -425,7 +427,7 @@ func TestRPCInit(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "2")) err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) @@ -435,7 +437,7 @@ func TestRPCInit(t *testing.T) { t.Run("reject invalid workflow ID", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Init(ctx, "not-a-number", rpc.WorkflowState{}) @@ -470,8 +472,7 @@ func TestRPCDone(t *testing.T) { mockStore.On("AgentUpdate", mock.Anything).Return(nil) mockQueue.On("Done", mock.Anything, mock.Anything, mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) - rpcInst.queue = mockQueue + rpcInst := newTestRPC(t, mockStore, mockQueue) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Started: 100, Finished: 200}) @@ -490,7 +491,7 @@ func TestRPCDone(t *testing.T) { mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) mockStore.On("AgentFind", int64(1)).Return(agent, nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) @@ -509,7 +510,7 @@ func TestRPCDone(t *testing.T) { mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) mockStore.On("AgentFind", int64(1)).Return(agent, nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) @@ -528,7 +529,7 @@ func TestRPCDone(t *testing.T) { mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) mockStore.On("AgentFind", int64(2)).Return(agent, nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "2")) err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) @@ -538,7 +539,7 @@ func TestRPCDone(t *testing.T) { t.Run("reject invalid workflow ID", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Done(ctx, "invalid", rpc.WorkflowState{}) @@ -580,7 +581,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("AgentUpdate", mock.Anything).Return(nil) mockLogStore.On("LogAppend", mock.Anything, mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) entries := []*rpc.LogEntry{ @@ -609,7 +610,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("AgentUpdate", mock.Anything).Return(nil) mockLogStore.On("LogAppend", mock.Anything, mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -636,7 +637,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("AgentUpdate", mock.Anything).Return(nil) mockLogStore.On("LogAppend", mock.Anything, mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -663,7 +664,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("AgentUpdate", mock.Anything).Return(nil) mockLogStore.On("LogAppend", mock.Anything, mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -683,7 +684,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -705,7 +706,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -726,7 +727,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -748,7 +749,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -769,7 +770,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -796,7 +797,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) mockStore.On("AgentUpdate", mock.Anything).Return(nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) // Second entry has a rogue UUID — agent trying to inject into another step. @@ -820,7 +821,7 @@ func TestRPCLog(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "2")) err := rpcInst.Log(ctx, "step-uuid-123", []*rpc.LogEntry{ @@ -834,7 +835,7 @@ func TestRPCLog(t *testing.T) { mockStore := store_mocks.NewMockStore(t) mockStore.On("StepByUUID", "nonexistent").Return(nil, errors.New("not found")) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Log(ctx, "nonexistent", []*rpc.LogEntry{ @@ -859,7 +860,7 @@ func TestRPCExtend(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "2")) err := rpcInst.Extend(ctx, "30") @@ -881,7 +882,7 @@ func TestRPCWait(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - rpcInst := newTestRPC(t, mockStore) + rpcInst := newTestRPC(t, mockStore, nil) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "2")) _, err := rpcInst.Wait(ctx, "30") diff --git a/server/rpc/server.go b/server/rpc/server.go index 7f39e086cf3..8836a1421ff 100644 --- a/server/rpc/server.go +++ b/server/rpc/server.go @@ -25,8 +25,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/rpc" "go.woodpecker-ci.org/woodpecker/v3/rpc/proto" "go.woodpecker-ci.org/woodpecker/v3/server/logging" - "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" - "go.woodpecker-ci.org/woodpecker/v3/server/queue" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" "go.woodpecker-ci.org/woodpecker/v3/server/store" "go.woodpecker-ci.org/woodpecker/v3/version" ) @@ -37,7 +36,7 @@ type WoodpeckerServer struct { peer RPC } -func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub pubsub.PubSub, store store.Store) proto.WoodpeckerServer { +func NewWoodpeckerServer(scheduler scheduler.Scheduler, logger logging.Log, store store.Store) proto.WoodpeckerServer { pipelineTime := promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "woodpecker", Name: "pipeline_time", @@ -50,8 +49,7 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub pubsub.Pu }, []string{"repo", "branch", "status", "pipeline"}) peer := RPC{ store: store, - queue: queue, - pubsub: pubsub, + scheduler: scheduler, logger: logger, pipelineTime: pipelineTime, pipelineCount: pipelineCount, diff --git a/server/scheduler/proxy.go b/server/scheduler/proxy.go new file mode 100644 index 00000000000..4a40a105f82 --- /dev/null +++ b/server/scheduler/proxy.go @@ -0,0 +1,88 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + + "go.woodpecker-ci.org/woodpecker/v3/server/model" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" + "go.woodpecker-ci.org/woodpecker/v3/server/queue" +) + +type proxy struct { + q queue.Queue + ps pubsub.PubSub +} + +// +// Queue. +// + +func (p *proxy) Done(c context.Context, id string, exitStatus model.StatusValue) error { + return p.q.Done(c, id, exitStatus) +} + +func (p *proxy) Error(c context.Context, id string, err error) error { + return p.q.Error(c, id, err) +} + +func (p *proxy) ErrorAtOnce(c context.Context, ids []string, err error) error { + return p.q.ErrorAtOnce(c, ids, err) +} + +func (p *proxy) Extend(c context.Context, agentID int64, workflowID string) error { + return p.q.Extend(c, agentID, workflowID) +} + +func (p *proxy) Info(c context.Context) queue.InfoT { + return p.q.Info(c) +} + +func (p *proxy) KickAgentWorkers(agentID int64) { + p.q.KickAgentWorkers(agentID) +} + +func (p *proxy) Pause() { + p.q.Pause() +} + +func (p *proxy) Poll(c context.Context, agentID int64, f queue.FilterFn) (*model.Task, error) { + return p.q.Poll(c, agentID, f) +} + +func (p *proxy) PushAtOnce(c context.Context, tasks []*model.Task) error { + return p.q.PushAtOnce(c, tasks) +} + +func (p *proxy) Resume() { + p.q.Resume() +} + +func (p *proxy) Wait(c context.Context, id string) error { + return p.q.Wait(c, id) +} + +// +// PubSub. +// + +func (p *proxy) Subscribe(c context.Context, t pubsub.Topics, r pubsub.Receiver) error { + return p.ps.Subscribe(c, t, r) +} + +func (p *proxy) Publish(c context.Context, t pubsub.Topics, m pubsub.Message) error { + return p.ps.Publish(c, t, m) +} diff --git a/server/scheduler/scheduler.go b/server/scheduler/scheduler.go new file mode 100644 index 00000000000..e4f2bced2da --- /dev/null +++ b/server/scheduler/scheduler.go @@ -0,0 +1,33 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" + "go.woodpecker-ci.org/woodpecker/v3/server/queue" +) + +// Scheduler is currently just the combined interface of Queue & PubSub. +type Scheduler interface { + queue.Queue + pubsub.PubSub +} + +func NewScheduler(q queue.Queue, ps pubsub.PubSub) Scheduler { + return &proxy{ + q: q, + ps: ps, + } +}