diff --git a/server/api/stream.go b/server/api/stream.go index 3e740bb81ce..f61b35783d6 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "strconv" + "sync" "time" "github.com/gin-gonic/gin" @@ -40,6 +41,9 @@ const ( // How many batches of logs to keep for each client before starting to // drop them if the client is not consuming them faster than they arrive. maxQueuedBatchesPerClient int = 30 + + // Is the time till we send a ping to keep the connection alive. + idlePingTime = time.Second * 30 ) // EventStreamSSE @@ -90,7 +94,6 @@ func EventStreamSSE(c *gin.Context) { defer func() { cancel(nil) - close(eventChan) log.Debug().Msg("user feed: connection closed") }() @@ -99,9 +102,7 @@ func EventStreamSSE(c *gin.Context) { func(m pubsub.Message) { select { case <-ctx.Done(): - return - default: - eventChan <- m.Data + case eventChan <- m.Data: } }) cancel(err) @@ -113,7 +114,7 @@ func EventStreamSSE(c *gin.Context) { return case <-ctx.Done(): return - case <-time.After(time.Second * 30): + case <-time.After(idlePingTime): logWriteStringErr(io.WriteString(rw, ": ping\n\n")) flusher.Flush() case buf, ok := <-eventChan: @@ -207,7 +208,6 @@ func LogStreamSSE(c *gin.Context) { defer func() { cancel(nil) - close(logChan) log.Debug().Msg("log stream: connection closed") }() @@ -221,24 +221,20 @@ func LogStreamSSE(c *gin.Context) { go func() { batches := make(logging.LogChan, maxQueuedBatchesPerClient) + var innerDone sync.WaitGroup + innerDone.Add(1) go func() { - defer func() { - if r := recover(); r != nil { - log.Error().Msgf("error sending log message: %v", r) - } - }() - + defer innerDone.Done() for entries := range batches { for _, entry := range entries { - select { - case <-ctx.Done(): - return - default: - if ee, err := json.Marshal(entry); err == nil { - logChan <- ee - } else { - log.Error().Err(err).Msg("unable to serialize log entry") + if ee, err := json.Marshal(entry); err == nil { + select { + case <-ctx.Done(): + return + case logChan <- ee: } + } else { + log.Error().Err(err).Msg("unable to serialize log entry") } } } @@ -249,6 +245,8 @@ func LogStreamSSE(c *gin.Context) { log.Error().Err(err).Msg("tail of logs failed") } + close(batches) + innerDone.Wait() cancel(err) }() @@ -262,11 +260,6 @@ func LogStreamSSE(c *gin.Context) { for { select { - // after 1 hour of idle (no response) end the stream. - // this is more of a safety mechanism than anything, - // and can be removed once the code is more mature. - case <-time.After(time.Hour): - return case <-ctx.Done(): // Monitor if the "tail" context is canceled. if err := context.Cause(ctx); errors.Is(err, context.Canceled) { log.Debug().Msg("log stream: eof") @@ -277,7 +270,7 @@ func LogStreamSSE(c *gin.Context) { case <-requestCtx.Done(): // Monitor the request context for cancellation when the client has gone away. log.Debug().Msg("log stream: closed, client has gone away") return - case <-time.After(time.Second * 30): + case <-time.After(idlePingTime): logWriteStringErr(io.WriteString(rw, ": ping\n\n")) flusher.Flush() case buf, ok := <-logChan: diff --git a/server/api/stream_test.go b/server/api/stream_test.go new file mode 100644 index 00000000000..8b4ee8933a0 --- /dev/null +++ b/server/api/stream_test.go @@ -0,0 +1,160 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/mock" + + "go.woodpecker-ci.org/woodpecker/v3/server" + "go.woodpecker-ci.org/woodpecker/v3/server/logging" + "go.woodpecker-ci.org/woodpecker/v3/server/model" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub" + "go.woodpecker-ci.org/woodpecker/v3/server/pubsub/memory" + "go.woodpecker-ci.org/woodpecker/v3/server/scheduler" + store_mocks "go.woodpecker-ci.org/woodpecker/v3/server/store/mocks" +) + +func TestEventStreamSSEConcurrentDisconnect(t *testing.T) { + gin.SetMode(gin.TestMode) + broker := memory.New() + server.Config.Services.Scheduler = scheduler.NewScheduler(nil, broker) + t.Cleanup(func() { server.Config.Services.Scheduler = nil }) + + for i := range 50 { + t.Run(fmt.Sprint(i), func(t *testing.T) { + t.Parallel() + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + ctx, cancel := context.WithCancelCause(t.Context()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/stream/events", nil) + c.Request = req + + topic := map[string]struct{}{pubsub.PublicTopic: {}} + + done := make(chan struct{}) + go func() { + defer close(done) + EventStreamSSE(c) + }() + + // Let the event handler subscribe + time.Sleep(20 * time.Millisecond) + + // Fire concurrent publishes while canceling the request. + var wg sync.WaitGroup + for range 20 { + wg.Add(1) + go func() { + defer wg.Done() + _ = broker.Publish(ctx, topic, pubsub.Message{ + Data: []byte(`{"pipeline":1}`), + }) + }() + } + + // Simulate client disconnect mid-publish. + cancel(nil) + wg.Wait() + <-done + }) + } +} + +func setupLogStreamContext(t *testing.T) (*httptest.ResponseRecorder, *gin.Context, context.CancelCauseFunc) { + t.Helper() + + const stepID int64 = 42 + const pipelineID int64 = 10 + + mockStore := store_mocks.NewMockStore(t) + mockStore.On("GetPipelineNumber", mock.Anything, mock.Anything). + Return(&model.Pipeline{ID: pipelineID}, nil) + mockStore.On("StepLoad", mock.Anything). + Return(&model.Step{ + ID: stepID, + PipelineID: pipelineID, + State: model.StatusRunning, + }, nil) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + ctx, cancel := context.WithCancelCause(t.Context()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "/stream/logs/1/1/42", nil) + c.Request = req + c.Params = gin.Params{ + {Key: "repo_id", Value: "1"}, + {Key: "pipeline", Value: "1"}, + {Key: "step_id", Value: "42"}, + } + c.Set("repo", &model.Repo{ID: 1, FullName: "owner/repo"}) + c.Set("store", mockStore) + + return w, c, cancel +} + +func TestLogStreamSSEConcurrentDisconnect(t *testing.T) { + gin.SetMode(gin.TestMode) + + logService := logging.New() + server.Config.Services.Logs = logService + t.Cleanup(func() { server.Config.Services.Logs = nil }) + + const stepID int64 = 42 + + for i := range 50 { + t.Run(fmt.Sprint(i), func(t *testing.T) { + t.Parallel() + done := make(chan struct{}) + + _, c, cancel := setupLogStreamContext(t) + + go func() { + defer close(done) + LogStreamSSE(c) + }() + + // Let LogStreamSSE open the stream and start tailing. + time.Sleep(20 * time.Millisecond) + + // Fire concurrent log writes while canceling the request. + var wg sync.WaitGroup + for i := range 20 { + wg.Add(1) + go func() { + defer wg.Done() + _ = logService.Write(t.Context(), stepID, []*model.LogEntry{ + {Line: i, Data: []byte("log line")}, + }) + }() + } + + // Simulate client disconnect mid-write. + cancel(nil) + wg.Wait() + <-done + }) + } +} diff --git a/server/pubsub/memory/pub_test.go b/server/pubsub/memory/pub_test.go index 8ab1993c35f..53d403c7fc0 100644 --- a/server/pubsub/memory/pub_test.go +++ b/server/pubsub/memory/pub_test.go @@ -59,3 +59,40 @@ func TestPubsub(t *testing.T) { wg.Wait() cancel(nil) } + +func TestPubsubConcurrentCancel(t *testing.T) { + testTopic := map[string]struct{}{"test": {}} + broker := New() + + for range 100 { + ctx, cancel := context.WithCancelCause(t.Context()) + ch := make(chan []byte) // Unbuffered to force blocking sends + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = broker.Subscribe(ctx, testTopic, func(m pubsub.Message) { + select { + case <-ctx.Done(): + case ch <- m.Data: + } + }) + }() + + // Start publishing many messages to increase chance of blocking send + var pubWg sync.WaitGroup + for range 100 { + pubWg.Add(1) + go func() { + defer pubWg.Done() + _ = broker.Publish(ctx, testTopic, pubsub.Message{Data: []byte("x")}) + }() + } + + // Cancel while publishes are in flight to race with pending sends + cancel(nil) + pubWg.Wait() + wg.Wait() + } +}