Skip to content
Merged
115 changes: 110 additions & 5 deletions cmd/server/openapi/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,7 @@ const docTemplate = `{
},
"/queue/info": {
"get": {
"description": "TODO: link the InfoT response object - this is blocked, until the ` + "`" + `swaggo/swag` + "`" + ` tool dependency is v1.18.12 or newer",
"description": "Returns pipeline queue information with agent details",
"produces": [
"application/json"
],
Expand All @@ -1799,10 +1799,7 @@ const docTemplate = `{
"200": {
"description": "OK",
"schema": {
"type": "object",
"additionalProperties": {
"type": "string"
}
"$ref": "#/definitions/QueueInfo"
}
}
}
Expand Down Expand Up @@ -5019,6 +5016,49 @@ const docTemplate = `{
}
}
},
"QueueInfo": {
"type": "object",
"properties": {
"paused": {
"type": "boolean"
},
"pending": {
"type": "array",
"items": {
"$ref": "#/definitions/model.QueueTask"
}
},
"running": {
"type": "array",
"items": {
"$ref": "#/definitions/model.QueueTask"
}
},
"stats": {
"type": "object",
"properties": {
"pending_count": {
"type": "integer"
},
"running_count": {
"type": "integer"
},
"waiting_on_deps_count": {
"type": "integer"
},
"worker_count": {
"type": "integer"
}
}
},
"waiting_on_deps": {
"type": "array",
"items": {
"$ref": "#/definitions/model.QueueTask"
}
}
}
},
"Registry": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -5453,6 +5493,18 @@ const docTemplate = `{
"type": "string"
}
},
"name": {
"type": "string"
},
"pid": {
"type": "integer"
},
"pipeline_id": {
"type": "integer"
},
"repo_id": {
"type": "integer"
},
"run_on": {
"type": "array",
"items": {
Expand Down Expand Up @@ -5803,6 +5855,59 @@ const docTemplate = `{
"ForgeTypeAddon"
]
},
"model.QueueTask": {
"type": "object",
"properties": {
"agent_id": {
"type": "integer"
},
"agent_name": {
"type": "string"
},
"dep_status": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/StatusValue"
}
},
"dependencies": {
"type": "array",
"items": {
"type": "string"
}
},
"id": {
"type": "string"
},
"labels": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"name": {
"type": "string"
},
"pid": {
"type": "integer"
},
"pipeline_id": {
"type": "integer"
},
"pipeline_number": {
"type": "integer"
},
"repo_id": {
"type": "integer"
},
"run_on": {
"type": "array",
"items": {
"type": "string"
}
}
}
},
"model.TrustedConfiguration": {
"type": "object",
"properties": {
Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
FROM docker.io/golang:1.25-alpine AS golang_image
FROM docker.io/node:23-alpine

RUN apk add --no-cache --update make gcc binutils-gold musl-dev protoc && \
RUN apk add --no-cache --update make gcc binutils-gold musl-dev && \
apk add --no-cache --repository=http://dl-cdn.alpinelinux.org/alpine/edge/main protoc && \
corepack enable

# Build packages.
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rpc/proto/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ package proto

// Version is the version of the woodpecker.proto file,
// IMPORTANT: increased by 1 each time it get changed.
const Version int32 = 13
const Version int32 = 14
2 changes: 1 addition & 1 deletion pipeline/rpc/proto/woodpecker.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

109 changes: 104 additions & 5 deletions server/api/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,115 @@ import (
// GetQueueInfo
//
// @Summary Get pipeline queue information
// @Description TODO: link the InfoT response object - this is blocked, until the `swaggo/swag` tool dependency is v1.18.12 or newer
// @Description Returns pipeline queue information with agent details
// @Router /queue/info [get]
// @Produce json
// @Success 200 {object} map[string]string
// @Success 200 {object} QueueInfo
// @Tags Pipeline queues
// @Param Authorization header string true "Insert your personal access token" default(Bearer <personal access token>)
func GetQueueInfo(c *gin.Context) {
c.IndentedJSON(http.StatusOK,
server.Config.Services.Queue.Info(c),
)
info := server.Config.Services.Queue.Info(c)
_store := store.FromContext(c)

// Create a map to store agent names by ID
agentNameMap := make(map[int64]string)

// Process tasks and add agent names
pendingWithAgents, err := processQueueTasks(_store, info.Pending, agentNameMap)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

waitingWithAgents, err := processQueueTasks(_store, info.WaitingOnDeps, agentNameMap)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

runningWithAgents, err := processQueueTasks(_store, info.Running, agentNameMap)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

// Create response with agent-enhanced tasks
response := model.QueueInfo{
Pending: pendingWithAgents,
WaitingOnDeps: waitingWithAgents,
Running: runningWithAgents,
Stats: struct {
WorkerCount int `json:"worker_count"`
PendingCount int `json:"pending_count"`
WaitingOnDepsCount int `json:"waiting_on_deps_count"`
RunningCount int `json:"running_count"`
}{
WorkerCount: info.Stats.Workers,
PendingCount: info.Stats.Pending,
WaitingOnDepsCount: info.Stats.WaitingOnDeps,
RunningCount: info.Stats.Running,
},
Paused: info.Paused,
}

c.IndentedJSON(http.StatusOK, response)
}

// getAgentName finds an agent's name, utilizing a map as a cache.
func getAgentName(store store.Store, agentNameMap map[int64]string, agentID int64) (string, bool) {
// 1. Check the cache first.
name, exists := agentNameMap[agentID]
if exists {
return name, true
}

// 2. If not in cache, query the store.
agent, err := store.AgentFind(agentID)
if err != nil || agent == nil {
// Agent not found or an error occurred.
return "", false
Comment thread
xoxys marked this conversation as resolved.
}

// 3. Found the agent, update the cache and return the name.
if agent.Name != "" {
agentNameMap[agentID] = agent.Name
return agent.Name, true
}

return "", false
}

// processQueueTasks converts tasks to QueueTask structs and adds agent names.
func processQueueTasks(store store.Store, tasks []*model.Task, agentNameMap map[int64]string) ([]model.QueueTask, error) {
result := make([]model.QueueTask, 0, len(tasks))

for _, task := range tasks {
taskResponse := model.QueueTask{
Task: *task,
}

if task.AgentID == 0 {
result = append(result, taskResponse)
continue
}

name, ok := getAgentName(store, agentNameMap, task.AgentID)
if !ok {
return nil, fmt.Errorf("agent not found for task %s", task.ID)
}

taskResponse.AgentName = name

p, err := store.GetPipeline(task.PipelineID)
if err != nil {
return nil, err
}

taskResponse.PipelineNumber = p.Number

result = append(result, taskResponse)
}
return result, nil
}

// PauseQueue
Expand Down
10 changes: 7 additions & 3 deletions server/grpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"maps"
"strings"

pipelineConsts "go.woodpecker-ci.org/woodpecker/v3/pipeline"
Expand All @@ -25,15 +26,18 @@ import (

func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn {
return func(task *model.Task) (bool, int) {
// Create a copy of the labels for filtering to avoid modifying the original task
labels := maps.Clone(task.Labels)

// ignore internal labels for filtering
for k := range task.Labels {
for k := range labels {
if strings.HasPrefix(k, pipelineConsts.InternalLabelPrefix) {
delete(task.Labels, k)
delete(labels, k)
}
}

score := 0
for taskLabel, taskLabelValue := range task.Labels {
for taskLabel, taskLabelValue := range labels {
// if a task label is empty it will be ignored
if taskLabelValue == "" {
continue
Expand Down
22 changes: 22 additions & 0 deletions server/model/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package model

// QueueTask represents a task in the queue with additional API-specific fields.
type QueueTask struct {
Task
PipelineNumber int64 `json:"pipeline_number"`
AgentName string `json:"agent_name"`
}

// QueueInfo represents the response structure for queue information API.
type QueueInfo struct {
Pending []QueueTask `json:"pending"`
WaitingOnDeps []QueueTask `json:"waiting_on_deps"`
Running []QueueTask `json:"running"`
Stats struct {
WorkerCount int `json:"worker_count"`
PendingCount int `json:"pending_count"`
WaitingOnDepsCount int `json:"waiting_on_deps_count"`
RunningCount int `json:"running_count"`
} `json:"stats"`
Paused bool `json:"paused"`
} // @name QueueInfo
4 changes: 4 additions & 0 deletions server/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ import (
// Task defines scheduled pipeline Task.
type Task struct {
ID string `json:"id" xorm:"PK UNIQUE 'id'"`
PID int `json:"pid" xorm:"'pid'"`
Name string `json:"name" xorm:"'name'"`
Data []byte `json:"-" xorm:"LONGBLOB 'data'"`
Labels map[string]string `json:"labels" xorm:"json 'labels'"`
Dependencies []string `json:"dependencies" xorm:"json 'dependencies'"`
RunOn []string `json:"run_on" xorm:"json 'run_on'"`
DepStatus map[string]StatusValue `json:"dep_status" xorm:"json 'dependencies_status'"`
AgentID int64 `json:"agent_id" xorm:"'agent_id'"`
PipelineID int64 `json:"pipeline_id" xorm:"'pipeline_id'"`
RepoID int64 `json:"repo_id" xorm:"'repo_id'"`
} // @name Task

// TableName return database table name for xorm.
Expand Down
8 changes: 6 additions & 2 deletions server/pipeline/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ func queuePipeline(ctx context.Context, repo *model.Repo, pipelineItems []*stepb
continue
}
task := &model.Task{
ID: fmt.Sprint(item.Workflow.ID),
Labels: make(map[string]string),
ID: fmt.Sprint(item.Workflow.ID),
PID: item.Workflow.PID,
Name: item.Workflow.Name,
Labels: make(map[string]string),
PipelineID: item.Workflow.PipelineID,
RepoID: repo.ID,
}
maps.Copy(task.Labels, item.Labels)
err := task.ApplyLabelsFromRepo(repo)
Expand Down
Loading