From caeb40a36b425e17e54c95128d42041d25d82a40 Mon Sep 17 00:00:00 2001 From: Harri Avellan Date: Thu, 16 Apr 2026 13:53:04 +0300 Subject: [PATCH 1/7] fix: fixes send on closed channel panic in SSE stream handlers Instead of explicitly closing channels which leads to concurrency issues with channels being used after close, allow channels to be GC'ed. Remove the recover block that was previously catching&masking the panics in the LogStreamSSE. EventStreamSSE was missing the same fixes: #6454 --- server/api/stream.go | 28 +++++---------- server/api/stream_test.go | 75 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 19 deletions(-) create mode 100644 server/api/stream_test.go diff --git a/server/api/stream.go b/server/api/stream.go index 3e740bb81ce..d09d03d66b2 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -90,7 +90,6 @@ func EventStreamSSE(c *gin.Context) { defer func() { cancel(nil) - close(eventChan) log.Debug().Msg("user feed: connection closed") }() @@ -99,9 +98,7 @@ func EventStreamSSE(c *gin.Context) { func(m pubsub.Message) { select { case <-ctx.Done(): - return - default: - eventChan <- m.Data + case eventChan <- m.Data: } }) cancel(err) @@ -207,7 +204,6 @@ func LogStreamSSE(c *gin.Context) { defer func() { cancel(nil) - close(logChan) log.Debug().Msg("log stream: connection closed") }() @@ -222,23 +218,16 @@ func LogStreamSSE(c *gin.Context) { batches := make(logging.LogChan, maxQueuedBatchesPerClient) go func() { - defer func() { - if r := recover(); r != nil { - log.Error().Msgf("error sending log message: %v", r) - } - }() - for entries := range batches { for _, entry := range entries { - select { - case <-ctx.Done(): - return - default: - if ee, err := json.Marshal(entry); err == nil { - logChan <- ee - } else { - log.Error().Err(err).Msg("unable to serialize log entry") + if ee, err := json.Marshal(entry); err == nil { + select { + case <-ctx.Done(): + return + case logChan <- ee: } + } else { + 
log.Error().Err(err).Msg("unable to serialize log entry") } } } @@ -249,6 +238,7 @@ func LogStreamSSE(c *gin.Context) { log.Error().Err(err).Msg("tail of logs failed") } + close(batches) cancel(err) }() diff --git a/server/api/stream_test.go b/server/api/stream_test.go new file mode 100644 index 00000000000..08bc558a9c4 --- /dev/null +++ b/server/api/stream_test.go @@ -0,0 +1,75 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/gin-gonic/gin" + + "go.woodpecker-ci.org/woodpecker/v3/server" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" +) + +func TestEventStreamSSE_ConcurrentDisconnect(t *testing.T) { + gin.SetMode(gin.TestMode) + + for range 50 { + broker := memory.New() + server.Config.Services.Scheduler = scheduler.NewScheduler(nil, broker) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + ctx, cancel := context.WithCancel(t.Context()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/stream/events", nil) + c.Request = req + + topic := map[string]struct{}{pubsub.PublicTopic: {}} + + done := make(chan struct{}) + go func() { + defer close(done) + EventStreamSSE(c) + }() + + // Let the event handler subscribe + time.Sleep(20 * time.Millisecond) + + 
// Fire concurrent publishes while cancelling the request. + var wg sync.WaitGroup + for range 20 { + wg.Add(1) + go func() { + defer wg.Done() + _ = broker.Publish(ctx, topic, pubsub.Message{ + Data: []byte(`{"pipeline":1}`), + }) + }() + } + + // Simulate client disconnect mid-publish. + cancel() + wg.Wait() + <-done + } +} \ No newline at end of file From d42658416ecb7a629c951a4a0677b8c9cf5be60a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 16 Apr 2026 11:08:41 +0000 Subject: [PATCH 2/7] [pre-commit.ci] auto fixes from pre-commit.com hooks [CI SKIP] for more information, see https://pre-commit.ci --- server/api/stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/api/stream_test.go b/server/api/stream_test.go index 08bc558a9c4..1d603196d35 100644 --- a/server/api/stream_test.go +++ b/server/api/stream_test.go @@ -72,4 +72,4 @@ func TestEventStreamSSE_ConcurrentDisconnect(t *testing.T) { wg.Wait() <-done } -} \ No newline at end of file +} From 690b99d71f60266365dd3216f279e97cff0661c7 Mon Sep 17 00:00:00 2001 From: Harri Avellan Date: Mon, 20 Apr 2026 11:52:58 +0300 Subject: [PATCH 3/7] fixes: log entries race condition in goroutine handling Co-Authored-By: utafrali --- server/api/stream.go | 5 ++ server/api/stream_test.go | 138 +++++++++++++++++++++++++++++-- server/pubsub/memory/pub_test.go | 37 +++++++++ 3 files changed, 175 insertions(+), 5 deletions(-) diff --git a/server/api/stream.go b/server/api/stream.go index d09d03d66b2..1493ffacffb 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "strconv" + "sync" "time" "github.com/gin-gonic/gin" @@ -217,7 +218,10 @@ func LogStreamSSE(c *gin.Context) { go func() { batches := make(logging.LogChan, maxQueuedBatchesPerClient) + var innerDone sync.WaitGroup + innerDone.Add(1) go func() { + defer innerDone.Done() for entries := range batches { for _, entry := 
range entries { if ee, err := json.Marshal(entry); err == nil { @@ -239,6 +243,7 @@ func LogStreamSSE(c *gin.Context) { } close(batches) + innerDone.Wait() cancel(err) }() diff --git a/server/api/stream_test.go b/server/api/stream_test.go index 08bc558a9c4..10cb2c2c188 100644 --- a/server/api/stream_test.go +++ b/server/api/stream_test.go @@ -16,21 +16,27 @@ package api import ( "context" + "fmt" "net/http" "net/http/httptest" + "strings" "sync" "testing" "time" "github.com/gin-gonic/gin" + "github.com/stretchr/testify/mock" "go.woodpecker-ci.org/woodpecker/v3/server" + "go.woodpecker-ci.org/woodpecker/v3/server/logging" + "go.woodpecker-ci.org/woodpecker/v3/server/model" "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" + store_mocks "go.woodpecker-ci.org/woodpecker/v3/server/store/mocks" ) -func TestEventStreamSSE_ConcurrentDisconnect(t *testing.T) { +func TestEventStreamSSEConcurrentDisconnect(t *testing.T) { gin.SetMode(gin.TestMode) for range 50 { @@ -40,7 +46,7 @@ func TestEventStreamSSE_ConcurrentDisconnect(t *testing.T) { w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) - ctx, cancel := context.WithCancel(t.Context()) + ctx, cancel := context.WithCancelCause(t.Context()) req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/stream/events", nil) c.Request = req @@ -55,7 +61,7 @@ func TestEventStreamSSE_ConcurrentDisconnect(t *testing.T) { // Let the event handler subscribe time.Sleep(20 * time.Millisecond) - // Fire concurrent publishes while cancelling the request. + // Fire concurrent publishes while canceling the request. var wg sync.WaitGroup for range 20 { wg.Add(1) @@ -68,8 +74,130 @@ func TestEventStreamSSE_ConcurrentDisconnect(t *testing.T) { } // Simulate client disconnect mid-publish. 
- cancel() + cancel(nil) wg.Wait() <-done } -} \ No newline at end of file +} + +func setupLogStreamContext(t *testing.T, logService logging.Log) (*httptest.ResponseRecorder, *gin.Context, context.CancelCauseFunc) { + t.Helper() + + const stepID int64 = 42 + const pipelineID int64 = 10 + + mockStore := store_mocks.NewMockStore(t) + mockStore.On("GetPipelineNumber", mock.Anything, mock.Anything). + Return(&model.Pipeline{ID: pipelineID}, nil) + mockStore.On("StepLoad", mock.Anything). + Return(&model.Step{ + ID: stepID, + PipelineID: pipelineID, + State: model.StatusRunning, + }, nil) + + server.Config.Services.Logs = logService + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + ctx, cancel := context.WithCancelCause(t.Context()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/stream/logs/1/1/42", nil) + c.Request = req + c.Params = gin.Params{ + {Key: "repo_id", Value: "1"}, + {Key: "pipeline", Value: "1"}, + {Key: "step_id", Value: "42"}, + } + c.Set("repo", &model.Repo{ID: 1, FullName: "owner/repo"}) + c.Set("store", mockStore) + + return w, c, cancel +} + +func TestLogStreamSSEConcurrentDisconnect(t *testing.T) { + gin.SetMode(gin.TestMode) + + const stepID int64 = 42 + + for range 50 { + logService := logging.New() + _, c, cancel := setupLogStreamContext(t, logService) + + done := make(chan struct{}) + go func() { + defer close(done) + LogStreamSSE(c) + }() + + // Let LogStreamSSE open the stream and start tailing. + time.Sleep(20 * time.Millisecond) + + // Fire concurrent log writes while canceling the request. + var wg sync.WaitGroup + for i := range 20 { + wg.Add(1) + go func() { + defer wg.Done() + _ = logService.Write(t.Context(), stepID, []*model.LogEntry{ + {Line: i, Data: []byte("log line")}, + }) + }() + } + + // Simulate client disconnect mid-write. 
+ cancel(nil) + wg.Wait() + <-done + } +} + +func TestLogStreamSSEDrainBeforeEOF(t *testing.T) { + gin.SetMode(gin.TestMode) + + const stepID int64 = 42 + + logService := logging.New() + w, c, cancel := setupLogStreamContext(t, logService) + defer cancel(nil) + + // Buffer log entries before starting LogStreamSSE. Write auto-opens the + // stream, and Tail replays s.list on subscribe, so no timing dependency. + for i := range 5 { + _ = logService.Write(t.Context(), stepID, []*model.LogEntry{ + {Line: i, Data: []byte("line")}, + }) + } + + done := make(chan struct{}) + go func() { + defer close(done) + LogStreamSSE(c) + }() + + // Let LogStreamSSE open the stream and start tailing (replays buffered entries). + time.Sleep(20 * time.Millisecond) + + // Close the stream to signal pipeline finished. + _ = logService.Close(t.Context(), stepID) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("LogStreamSSE did not exit after stream close") + } + + body := w.Body.String() + + // All 5 log entries should appear before the eof event. 
+ for i := 1; i <= 5; i++ { + idLine := fmt.Sprintf("id: %d", i) + if !strings.Contains(body, idLine) { + t.Errorf("missing log entry with id %d in SSE output", i) + } + } + + if !strings.Contains(body, "event: eof") { + t.Error("missing eof event in SSE output") + } +} diff --git a/server/pubsub/memory/pub_test.go b/server/pubsub/memory/pub_test.go index 8ab1993c35f..53d403c7fc0 100644 --- a/server/pubsub/memory/pub_test.go +++ b/server/pubsub/memory/pub_test.go @@ -59,3 +59,40 @@ func TestPubsub(t *testing.T) { wg.Wait() cancel(nil) } + +func TestPubsubConcurrentCancel(t *testing.T) { + testTopic := map[string]struct{}{"test": {}} + broker := New() + + for range 100 { + ctx, cancel := context.WithCancelCause(t.Context()) + ch := make(chan []byte) // Unbuffered to force blocking sends + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = broker.Subscribe(ctx, testTopic, func(m pubsub.Message) { + select { + case <-ctx.Done(): + case ch <- m.Data: + } + }) + }() + + // Start publishing many messages to increase chance of blocking send + var pubWg sync.WaitGroup + for range 100 { + pubWg.Add(1) + go func() { + defer pubWg.Done() + _ = broker.Publish(ctx, testTopic, pubsub.Message{Data: []byte("x")}) + }() + } + + // Cancel while publishes are in flight to race with pending sends + cancel(nil) + pubWg.Wait() + wg.Wait() + } +} From 37d45d2a58e347a303662eadd0c501aa1e1c6860 Mon Sep 17 00:00:00 2001 From: Harri Avellan Date: Mon, 20 Apr 2026 22:46:44 +0300 Subject: [PATCH 4/7] fix: lint --- server/api/stream_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/api/stream_test.go b/server/api/stream_test.go index ebe66664cfe..ad01cf5740f 100644 --- a/server/api/stream_test.go +++ b/server/api/stream_test.go @@ -16,10 +16,8 @@ package api import ( "context" - "fmt" "net/http" "net/http/httptest" - "strings" "sync" "testing" "time" From daff33ee37b9e5f0e7b24657e3646c1875ca384d Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: 
Mon, 20 Apr 2026 23:46:34 +0200 Subject: [PATCH 5/7] more concurrent and speed up test --- server/api/stream_test.go | 127 ++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 59 deletions(-) diff --git a/server/api/stream_test.go b/server/api/stream_test.go index ad01cf5740f..8b4ee8933a0 100644 --- a/server/api/stream_test.go +++ b/server/api/stream_test.go @@ -16,6 +16,7 @@ package api import ( "context" + "fmt" "net/http" "net/http/httptest" "sync" @@ -36,49 +37,52 @@ import ( func TestEventStreamSSEConcurrentDisconnect(t *testing.T) { gin.SetMode(gin.TestMode) + broker := memory.New() + server.Config.Services.Scheduler = scheduler.NewScheduler(nil, broker) + t.Cleanup(func() { server.Config.Services.Scheduler = nil }) - for range 50 { - broker := memory.New() - server.Config.Services.Scheduler = scheduler.NewScheduler(nil, broker) + for i := range 50 { + t.Run(fmt.Sprint(i), func(t *testing.T) { + t.Parallel() + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) - w := httptest.NewRecorder() - c, _ := gin.CreateTestContext(w) + ctx, cancel := context.WithCancelCause(t.Context()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/stream/events", nil) + c.Request = req - ctx, cancel := context.WithCancelCause(t.Context()) - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/stream/events", nil) - c.Request = req + topic := map[string]struct{}{pubsub.PublicTopic: {}} - topic := map[string]struct{}{pubsub.PublicTopic: {}} - - done := make(chan struct{}) - go func() { - defer close(done) - EventStreamSSE(c) - }() - - // Let the event handler subscribe - time.Sleep(20 * time.Millisecond) - - // Fire concurrent publishes while canceling the request. 
- var wg sync.WaitGroup - for range 20 { - wg.Add(1) + done := make(chan struct{}) go func() { - defer wg.Done() - _ = broker.Publish(ctx, topic, pubsub.Message{ - Data: []byte(`{"pipeline":1}`), - }) + defer close(done) + EventStreamSSE(c) }() - } - // Simulate client disconnect mid-publish. - cancel(nil) - wg.Wait() - <-done + // Let the event handler subscribe + time.Sleep(20 * time.Millisecond) + + // Fire concurrent publishes while canceling the request. + var wg sync.WaitGroup + for range 20 { + wg.Add(1) + go func() { + defer wg.Done() + _ = broker.Publish(ctx, topic, pubsub.Message{ + Data: []byte(`{"pipeline":1}`), + }) + }() + } + + // Simulate client disconnect mid-publish. + cancel(nil) + wg.Wait() + <-done + }) } } -func setupLogStreamContext(t *testing.T, logService logging.Log) (*httptest.ResponseRecorder, *gin.Context, context.CancelCauseFunc) { +func setupLogStreamContext(t *testing.T) (*httptest.ResponseRecorder, *gin.Context, context.CancelCauseFunc) { t.Helper() const stepID int64 = 42 @@ -94,8 +98,6 @@ func setupLogStreamContext(t *testing.T, logService logging.Log) (*httptest.Resp State: model.StatusRunning, }, nil) - server.Config.Services.Logs = logService - w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -116,36 +118,43 @@ func setupLogStreamContext(t *testing.T, logService logging.Log) (*httptest.Resp func TestLogStreamSSEConcurrentDisconnect(t *testing.T) { gin.SetMode(gin.TestMode) - const stepID int64 = 42 + logService := logging.New() + server.Config.Services.Logs = logService + t.Cleanup(func() { server.Config.Services.Logs = nil }) - for range 50 { - logService := logging.New() - _, c, cancel := setupLogStreamContext(t, logService) + const stepID int64 = 42 - done := make(chan struct{}) - go func() { - defer close(done) - LogStreamSSE(c) - }() + for i := range 50 { + t.Run(fmt.Sprint(i), func(t *testing.T) { + t.Parallel() + done := make(chan struct{}) - // Let LogStreamSSE open the stream and start tailing. 
- time.Sleep(20 * time.Millisecond) + _, c, cancel := setupLogStreamContext(t) - // Fire concurrent log writes while canceling the request. - var wg sync.WaitGroup - for i := range 20 { - wg.Add(1) go func() { - defer wg.Done() - _ = logService.Write(t.Context(), stepID, []*model.LogEntry{ - {Line: i, Data: []byte("log line")}, - }) + defer close(done) + LogStreamSSE(c) }() - } - // Simulate client disconnect mid-write. - cancel(nil) - wg.Wait() - <-done + // Let LogStreamSSE open the stream and start tailing. + time.Sleep(20 * time.Millisecond) + + // Fire concurrent log writes while canceling the request. + var wg sync.WaitGroup + for i := range 20 { + wg.Add(1) + go func() { + defer wg.Done() + _ = logService.Write(t.Context(), stepID, []*model.LogEntry{ + {Line: i, Data: []byte("log line")}, + }) + }() + } + + // Simulate client disconnect mid-write. + cancel(nil) + wg.Wait() + <-done + }) } } From ad992a6977b4dc1d534b6a15ec862de1a39a77c2 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 21 Apr 2026 00:13:27 +0200 Subject: [PATCH 6/7] refactor nit --- server/api/stream.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/api/stream.go b/server/api/stream.go index 1493ffacffb..e870bf2e615 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -41,6 +41,9 @@ const ( // How many batches of logs to keep for each client before starting to // drop them if the client is not consuming them faster than they arrive. maxQueuedBatchesPerClient int = 30 + + // idlePingTime is the time till we send a ping to keep the connection alive. 
+ idlePingTime = time.Second * 30 ) // EventStreamSSE @@ -111,7 +114,7 @@ func EventStreamSSE(c *gin.Context) { return case <-ctx.Done(): return - case <-time.After(time.Second * 30): + case <-time.After(idlePingTime): logWriteStringErr(io.WriteString(rw, ": ping\n\n")) flusher.Flush() case buf, ok := <-eventChan: @@ -257,11 +260,6 @@ func LogStreamSSE(c *gin.Context) { for { select { - // after 1 hour of idle (no response) end the stream. - // this is more of a safety mechanism than anything, - // and can be removed once the code is more mature. - case <-time.After(time.Hour): - return case <-ctx.Done(): // Monitor if the "tail" context is canceled. if err := context.Cause(ctx); errors.Is(err, context.Canceled) { log.Debug().Msg("log stream: eof") @@ -272,7 +270,7 @@ func LogStreamSSE(c *gin.Context) { case <-requestCtx.Done(): // Monitor the request context for cancellation when the client has gone away. log.Debug().Msg("log stream: closed, client has gone away") return - case <-time.After(time.Second * 30): + case <-time.After(idlePingTime): logWriteStringErr(io.WriteString(rw, ": ping\n\n")) flusher.Flush() case buf, ok := <-logChan: From 0cd7b84d3dfecc01d0c41d97d2628b9b9ce1d978 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 21 Apr 2026 00:23:28 +0200 Subject: [PATCH 7/7] aww lint :/ --- server/api/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/api/stream.go b/server/api/stream.go index e870bf2e615..f61b35783d6 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -42,7 +42,7 @@ const ( // drop them if the client is not consuming them faster than they arrive. maxQueuedBatchesPerClient int = 30 - // idlePingTime is the time till we send a ping to keep the connection alive. + // Is the time till we send a ping to keep the connection alive. idlePingTime = time.Second * 30 )