Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/server/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/server/forge/setup"
"go.woodpecker-ci.org/woodpecker/v3/server/logging"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory"
"go.woodpecker-ci.org/woodpecker/v3/server/queue"
"go.woodpecker-ci.org/woodpecker/v3/server/services"
service_log "go.woodpecker-ci.org/woodpecker/v3/server/services/log"
Expand Down Expand Up @@ -159,7 +159,7 @@ func setupJWTSecret(_store store.Store) (string, error) {
func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err error) {
// services
server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()
server.Config.Services.Membership = setupMembershipService(ctx, s)
server.Config.Services.Queue, err = setupQueue(ctx, s)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions server/api/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
forge_mocks "go.woodpecker-ci.org/woodpecker/v3/server/forge/mocks"
forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory"
queue_mocks "go.woodpecker-ci.org/woodpecker/v3/server/queue/mocks"
config_service_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/config/mocks"
manager_mocks "go.woodpecker-ci.org/woodpecker/v3/server/services/mocks"
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestCancelPipeline(t *testing.T) {
mockManager := manager_mocks.NewMockManager(t)
mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil)
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()

w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("EnvironmentService").Return(nil).Maybe()
server.Config.Services.Manager = mockManager

server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()
mockQueue := queue_mocks.NewMockQueue(t)
mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe()
mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe()
Expand Down Expand Up @@ -388,7 +388,7 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("EnvironmentService").Return(nil).Maybe()
server.Config.Services.Manager = mockManager

server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()
mockQueue := queue_mocks.NewMockQueue(t)
mockQueue.On("Push", mock.Anything, mock.Anything).Return(nil).Maybe()
mockQueue.On("PushAtOnce", mock.Anything, mock.Anything).Return(nil).Maybe()
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestCreatePipeline(t *testing.T) {
mockManager.On("ForgeFromRepo", fakeRepo).Return(mockForge, nil)
mockManager.On("ConfigServiceFromRepo", fakeRepo).Return(mockConfigService)
server.Config.Services.Manager = mockManager
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Pubsub = memory.New()

// return nil config with error
mockConfigService.On("Fetch", mock.Anything, mockForge, fakeUser, fakeRepo, mock.Anything, mock.Anything, false).Return(nil, http.ErrHandlerTimeout)
Expand Down
22 changes: 9 additions & 13 deletions server/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ func EventStreamSSE(c *gin.Context) {
log.Debug().Msg("user feed: connection opened")

user := session.User(c)
repo := map[string]bool{}
subTopics := make(map[string]struct{})
// subscribe to all public state changes
subTopics[pubsub.PublicTopic] = struct{}{}
// subscribe to all private state changes or repos the user owns
if user != nil {
repos, _ := store.FromContext(c).RepoList(user, false, true, nil)
for _, r := range repos {
repo[r.FullName] = true
subTopics[pubsub.GetRepoTopic(r)] = struct{}{}
}
}

Expand All @@ -92,23 +95,16 @@ func EventStreamSSE(c *gin.Context) {
}()

go func() {
server.Config.Services.Pubsub.Subscribe(ctx, func(m pubsub.Message) {
defer func() {
obj := recover() // fix #2480 // TODO: check if it's still needed
log.Trace().Msgf("pubsub subscribe recover return: %v", obj)
}()
name := m.Labels["repo"]
priv := m.Labels["private"]
if repo[name] || priv == "false" {
err := server.Config.Services.Pubsub.Subscribe(ctx, subTopics,
func(m pubsub.Message) {
select {
case <-ctx.Done():
return
default:
eventChan <- m.Data
}
}
})
cancel(nil)
})
cancel(err)
}()

for {
Expand Down
2 changes: 1 addition & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

var Config = struct {
Services struct {
Pubsub *pubsub.Publisher
Pubsub pubsub.PubSub
Queue queue.Queue
Logs logging.Log
Membership cache.MembershipService
Expand Down
5 changes: 4 additions & 1 deletion server/pipeline/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
if killedPipeline.Workflows, err = store.WorkflowGetTree(killedPipeline); err != nil {
return err
}
publishToTopic(killedPipeline, repo)

if err := publishToTopic(ctx, killedPipeline, repo); err != nil {
log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider")
}

return nil
}
Expand Down
4 changes: 3 additions & 1 deletion server/pipeline/decline.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u

updatePipelineStatus(ctx, forge, pipeline, repo, user)

publishToTopic(pipeline, repo)
if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider")
}

return pipeline, nil
}
4 changes: 3 additions & 1 deletion server/pipeline/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func prepareStart(ctx context.Context, forge forge.Forge, store store.Store, act
}

func publishPipeline(ctx context.Context, forge forge.Forge, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User) {
publishToTopic(pipeline, repo)
if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("could not push pipeline status change to pubsub provider")
}
updatePipelineStatus(ctx, forge, pipeline, repo, repoUser)
}
33 changes: 17 additions & 16 deletions server/pipeline/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,35 @@
package pipeline

import (
"context"
"encoding/json"
"strconv"
"fmt"

"github.com/rs/zerolog/log"
"github.com/oklog/ulid/v2"

"go.woodpecker-ci.org/woodpecker/v3/server"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
)

// publishToTopic publishes message to UI clients.
func publishToTopic(pipeline *model.Pipeline, repo *model.Repo) {
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsSCMPrivate),
},
}
pipelineCopy := *pipeline

var err error
func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Repo) (err error) {
message := pubsub.Message{ID: ulid.Make().String()}
message.Data, err = json.Marshal(model.Event{
Repo: *repo,
Pipeline: pipelineCopy,
Pipeline: *pipeline,
})
if err != nil {
log.Error().Err(err).Msg("can't marshal JSON")
return
return fmt.Errorf("can't marshal JSON: %w", err)
}
server.Config.Services.Pubsub.Publish(message)

subTopics := make(map[string]struct{})
// if repo is public, push to public topic
if !repo.IsSCMPrivate {
subTopics[pubsub.PublicTopic] = struct{}{}
}
// publish to repo specific topic
subTopics[pubsub.GetRepoTopic(repo)] = struct{}{}
Comment thread
6543 marked this conversation as resolved.

return server.Config.Services.Pubsub.Publish(c, subTopics, message)
}
29 changes: 0 additions & 29 deletions server/pubsub/LICENSE

This file was deleted.

89 changes: 89 additions & 0 deletions server/pubsub/memory/pub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"context"
"fmt"
"slices"
"sync"

"github.com/rs/zerolog/log"
Comment thread
6543 marked this conversation as resolved.

"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
)

type publisher struct {
sync.RWMutex

subs map[*pubsub.Receiver][]string
}

// New creates an in-memory publisher.
func New() pubsub.PubSub {
return &publisher{
subs: make(map[*pubsub.Receiver][]string),
}
}

func (p *publisher) Publish(_ context.Context, topics pubsub.Topics, message pubsub.Message) error {
if len(topics) == 0 {
return fmt.Errorf("%w: specify at least one", pubsub.ErrNoTopic)
}

p.RLock()
defer p.RUnlock()

for s, tl := range p.subs {
// callback is from outside so just make sure it still exists
if s == nil || *s == nil {
log.Error().Msg("found nil callback func in subscribers!")
continue
}

for t := range topics {
if slices.Contains(tl, t) {
go (*s)(message)
break
}
}
}

return nil
}

func (p *publisher) Subscribe(c context.Context, topics pubsub.Topics, receiver pubsub.Receiver) error {
if len(topics) == 0 {
return fmt.Errorf("%w: subscribe to at least one", pubsub.ErrNoTopic)
}

var tl []string
for k := range topics {
tl = append(tl, k)
}

defer func() {
p.Lock()
delete(p.subs, &receiver)
p.Unlock()
}()

p.Lock()
p.subs[&receiver] = tl
p.Unlock()

<-c.Done()
return nil
}
17 changes: 11 additions & 6 deletions server/pubsub/pub_test.go → server/pubsub/memory/pub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub
package memory

import (
"context"
Expand All @@ -21,34 +21,39 @@ import (
"time"

"github.com/stretchr/testify/assert"

"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
)

func TestPubsub(t *testing.T) {
var (
wg sync.WaitGroup

testMessage = Message{
testTopic = map[string]struct{}{"test": {}}

testMessage = pubsub.Message{
Data: []byte("test"),
}
)

ctx, cancel := context.WithCancelCause(
t.Context(),
)

broker := New()

assert.Error(t, broker.Subscribe(ctx, nil, func(pubsub.Message) {}))
go func() {
broker.Subscribe(ctx, func(message Message) { assert.Equal(t, testMessage, message); wg.Done() })
assert.NoError(t, broker.Subscribe(ctx, testTopic, func(message pubsub.Message) { assert.Equal(t, testMessage, message); wg.Done() }))
}()
go func() {
broker.Subscribe(ctx, func(_ Message) { wg.Done() })
assert.NoError(t, broker.Subscribe(ctx, testTopic, func(pubsub.Message) { wg.Done() }))
}()

<-time.After(500 * time.Millisecond)

wg.Add(2)
go func() {
broker.Publish(testMessage)
assert.NoError(t, broker.Publish(ctx, testTopic, testMessage))
}()

wg.Wait()
Expand Down
Loading