diff --git a/assets/go-licenses.json b/assets/go-licenses.json index 61bf76702c3a6..af10f0d8e0470 100644 --- a/assets/go-licenses.json +++ b/assets/go-licenses.json @@ -389,6 +389,11 @@ "path": "github.com/cloudflare/circl/LICENSE", "licenseText": "Copyright (c) 2019 Cloudflare. All rights reserved.\n\nRedistribution and use in source and binary forms, with or without\nmodification, are permitted provided that the following conditions are\nmet:\n\n * Redistributions of source code must retain the above copyright\nnotice, this list of conditions and the following disclaimer.\n * Redistributions in binary form must reproduce the above\ncopyright notice, this list of conditions and the following disclaimer\nin the documentation and/or other materials provided with the\ndistribution.\n * Neither the name of Cloudflare nor the names of its\ncontributors may be used to endorse or promote products derived from\nthis software without specific prior written permission.\n\nTHIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\nLIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\nA PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\nOWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\nSPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\nLIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\nDATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\nTHEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\nOF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n\n========================================================================\n\nCopyright (c) 2009 The Go Authors. All rights reserved.\n\nRedistribution and use in source and binary forms, with or without\nmodification, are permitted provided that the following conditions are\nmet:\n\n * Redistributions of source code must retain the above copyright\nnotice, this list of conditions and the following disclaimer.\n * Redistributions in binary form must reproduce the above\ncopyright notice, this list of conditions and the following disclaimer\nin the documentation and/or other materials provided with the\ndistribution.\n * Neither the name of Google Inc. nor the names of its\ncontributors may be used to endorse or promote products derived from\nthis software without specific prior written permission.\n\nTHIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\nLIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\nA PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\nOWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\nSPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\nLIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\nDATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\nTHEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\nOF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n" }, + { + "name": "github.com/coder/websocket", + "path": "github.com/coder/websocket/LICENSE.txt", + "licenseText": "Copyright (c) 2025 Coder\n\nPermission to use, copy, modify, and distribute this software for any\npurpose with or without fee is hereby granted, provided that the above\ncopyright notice and this permission notice appear in all copies.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\" AND THE AUTHOR DISCLAIMS ALL WARRANTIES\nWITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF\nMERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR\nANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES\nWHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN\nACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF\nOR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.\n" + }, { "name": "github.com/couchbase/go-couchbase", "path": "github.com/couchbase/go-couchbase/LICENSE", diff --git a/go.mod b/go.mod index 22de5ba1ba9d4..81979e95e56bb 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/caddyserver/certmagic v0.25.2 github.com/charmbracelet/git-lfs-transfer v0.1.1-0.20251013092601-6327009efd21 github.com/chi-middleware/proxy v1.1.1 + github.com/coder/websocket v1.8.14 github.com/dimiro1/reply v0.0.0-20200315094148-d0136a4c9e21 github.com/dlclark/regexp2 v1.11.5 github.com/dsnet/compress v0.0.2-0.20230904184137-39efe44ab707 diff --git a/go.sum b/go.sum index eb1c07d179465..139c1ad11c78d 100644 --- a/go.sum +++ b/go.sum @@ -211,6 +211,8 @@ github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJ github.com/clipperhouse/uax29/v2 v2.7.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM= github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8= github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4= +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/modules/eventsource/event.go b/modules/eventsource/event.go deleted file mode 100644 index ebcca5090344c..0000000000000 --- a/modules/eventsource/event.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "bytes" - "fmt" - "io" - "strings" - "time" - - "code.gitea.io/gitea/modules/json" -) - -func wrapNewlines(w io.Writer, prefix, value []byte) (sum int64, err error) { - if len(value) == 0 { - return 0, nil - } - var n int - last := 0 - for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') { - n, err = w.Write(prefix) - sum += int64(n) - if err != nil { - return sum, err - } - n, err = w.Write(value[last : last+j+1]) - sum += int64(n) - if err != nil { - return sum, err - } - last += j + 1 - } - n, err = w.Write(prefix) - sum += int64(n) - if err != nil { - return sum, err - } - n, err = w.Write(value[last:]) - sum += int64(n) - if err != nil { - return sum, err - } - n, err = w.Write([]byte("\n")) - sum += int64(n) - return sum, err -} - -// Event is an eventsource event, not all fields need to be set -type Event struct { - // Name represents the value of the event: tag in the stream - Name string - // Data is either JSONified []byte or any that can be JSONd - Data any - // ID represents the ID of an event - ID string - // Retry tells the receiver only to attempt to reconnect to the source after this time - Retry time.Duration -} - -// WriteTo writes data to w until there's no more data to write or when an error occurs. -// The return value n is the number of bytes written. Any error encountered during the write is also returned. -func (e *Event) WriteTo(w io.Writer) (int64, error) { - sum := int64(0) - var nint int - n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name)) - sum += n - if err != nil { - return sum, err - } - - if e.Data != nil { - var data []byte - switch v := e.Data.(type) { - case []byte: - data = v - case string: - data = []byte(v) - default: - var err error - data, err = json.Marshal(e.Data) - if err != nil { - return sum, err - } - } - n, err := wrapNewlines(w, []byte("data: "), data) - sum += n - if err != nil { - return sum, err - } - } - - n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID)) - sum += n - if err != nil { - return sum, err - } - - if e.Retry != 0 { - nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond)) - sum += int64(nint) - if err != nil { - return sum, err - } - } - - nint, err = w.Write([]byte("\n")) - sum += int64(nint) - - return sum, err -} - -func (e *Event) String() string { - buf := new(strings.Builder) - _, _ = e.WriteTo(buf) - return buf.String() -} diff --git a/modules/eventsource/event_test.go b/modules/eventsource/event_test.go deleted file mode 100644 index a1c3e5c7a8ae1..0000000000000 --- a/modules/eventsource/event_test.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "bytes" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_wrapNewlines(t *testing.T) { - tests := []struct { - name string - prefix string - value string - output string - }{ - { - "check no new lines", - "prefix: ", - "value", - "prefix: value\n", - }, - { - "check simple newline", - "prefix: ", - "value1\nvalue2", - "prefix: value1\nprefix: value2\n", - }, - { - "check pathological newlines", - "p: ", - "\n1\n\n2\n3\n", - "p: \np: 1\np: \np: 2\np: 3\np: \n", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - w := &bytes.Buffer{} - gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value)) - require.NoError(t, err) - - assert.EqualValues(t, len(tt.output), gotSum) - assert.Equal(t, tt.output, w.String()) - }) - } -} diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go deleted file mode 100644 index 7ed2a829038f3..0000000000000 --- a/modules/eventsource/manager.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "sync" -) - -// Manager manages the eventsource Messengers -type Manager struct { - mutex sync.Mutex - - messengers map[int64]*Messenger - connection chan struct{} -} - -var manager *Manager - -func init() { - manager = &Manager{ - messengers: make(map[int64]*Messenger), - connection: make(chan struct{}, 1), - } -} - -// GetManager returns a Manager and initializes one as singleton if there's none yet -func GetManager() *Manager { - return manager -} - -// Register message channel -func (m *Manager) Register(uid int64) <-chan *Event { - m.mutex.Lock() - messenger, ok := m.messengers[uid] - if !ok { - messenger = NewMessenger(uid) - m.messengers[uid] = messenger - } - select { - case m.connection <- struct{}{}: - default: - } - m.mutex.Unlock() - return messenger.Register() -} - -// Unregister message channel -func (m *Manager) Unregister(uid int64, channel <-chan *Event) { - m.mutex.Lock() - defer m.mutex.Unlock() - messenger, ok := m.messengers[uid] - if !ok { - return - } - if messenger.Unregister(channel) { - delete(m.messengers, uid) - } -} - -// UnregisterAll message channels -func (m *Manager) UnregisterAll() { - m.mutex.Lock() - defer m.mutex.Unlock() - for _, messenger := range m.messengers { - messenger.UnregisterAll() - } - m.messengers = map[int64]*Messenger{} -} - -// SendMessage sends a message to a particular user -func (m *Manager) SendMessage(uid int64, message *Event) { - m.mutex.Lock() - messenger, ok := m.messengers[uid] - m.mutex.Unlock() - if ok { - messenger.SendMessage(message) - } -} - -// SendMessageBlocking sends a message to a particular user -func (m *Manager) SendMessageBlocking(uid int64, message *Event) { - m.mutex.Lock() - messenger, ok := m.messengers[uid] - m.mutex.Unlock() - if ok { - messenger.SendMessageBlocking(message) - } -} diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go deleted file mode 100644 index 4a42224ddac8a..0000000000000 --- a/modules/eventsource/manager_run.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "context" - "time" - - activities_model "code.gitea.io/gitea/models/activities" - issues_model "code.gitea.io/gitea/models/issues" - user_model "code.gitea.io/gitea/models/user" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/json" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/process" - "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/timeutil" - "code.gitea.io/gitea/services/convert" -) - -// Init starts this eventsource -func (m *Manager) Init() { - if setting.UI.Notification.EventSourceUpdateTime <= 0 { - return - } - go graceful.GetManager().RunWithShutdownContext(m.Run) -} - -// Run runs the manager within a provided context -func (m *Manager) Run(ctx context.Context) { - ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true) - defer finished() - - then := timeutil.TimeStampNow().Add(-2) - timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) -loop: - for { - select { - case <-ctx.Done(): - timer.Stop() - break loop - case <-timer.C: - m.mutex.Lock() - connectionCount := len(m.messengers) - if connectionCount == 0 { - log.Trace("Event source has no listeners") - // empty the connection channel - select { - case <-m.connection: - default: - } - } - m.mutex.Unlock() - if connectionCount == 0 { - // No listeners so the source can be paused - log.Trace("Pausing the eventsource") - select { - case <-ctx.Done(): - break loop - case <-m.connection: - log.Trace("Connection detected - restarting the eventsource") - // OK we're back so lets reset the timer and start again - // We won't change the "then" time because there could be concurrency issues - select { - case <-timer.C: - default: - } - continue - } - } - - now := timeutil.TimeStampNow().Add(-2) - - uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now) - if err != nil { - log.Error("Unable to get UIDcounts: %v", err) - } - for _, uidCount := range uidCounts { - m.SendMessage(uidCount.UserID, &Event{ - Name: "notification-count", - Data: uidCount, - }) - } - then = now - - if setting.Service.EnableTimetracking { - usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx) - if err != nil { - log.Error("Unable to get GetUIDsAndStopwatch: %v", err) - return - } - - for _, userStopwatches := range usersStopwatches { - u, err := user_model.GetUserByID(ctx, userStopwatches.UserID) - if err != nil { - log.Error("Unable to get user %d: %v", userStopwatches.UserID, err) - continue - } - - apiSWs, err := convert.ToStopWatches(ctx, u, userStopwatches.StopWatches) - if err != nil { - if !issues_model.IsErrIssueNotExist(err) { - log.Error("Unable to APIFormat stopwatches: %v", err) - } - continue - } - dataBs, err := json.Marshal(apiSWs) - if err != nil { - log.Error("Unable to marshal stopwatches: %v", err) - continue - } - m.SendMessage(userStopwatches.UserID, &Event{ - Name: "stopwatches", - Data: string(dataBs), - }) - } - } - } - } - m.UnregisterAll() -} diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go deleted file mode 100644 index 6df26716be661..0000000000000 --- a/modules/eventsource/messenger.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import "sync" - -// Messenger is a per uid message store -type Messenger struct { - mutex sync.Mutex - uid int64 - channels []chan *Event -} - -// NewMessenger creates a messenger for a particular uid -func NewMessenger(uid int64) *Messenger { - return &Messenger{ - uid: uid, - channels: [](chan *Event){}, - } -} - -// Register returns a new chan []byte -func (m *Messenger) Register() <-chan *Event { - m.mutex.Lock() - // TODO: Limit the number of messengers per uid - channel := make(chan *Event, 1) - m.channels = append(m.channels, channel) - m.mutex.Unlock() - return channel -} - -// Unregister removes the provider chan []byte -func (m *Messenger) Unregister(channel <-chan *Event) bool { - m.mutex.Lock() - defer m.mutex.Unlock() - for i, toRemove := range m.channels { - if channel == toRemove { - m.channels = append(m.channels[:i], m.channels[i+1:]...) - close(toRemove) - break - } - } - return len(m.channels) == 0 -} - -// UnregisterAll removes all chan []byte -func (m *Messenger) UnregisterAll() { - m.mutex.Lock() - defer m.mutex.Unlock() - for _, channel := range m.channels { - close(channel) - } - m.channels = nil -} - -// SendMessage sends the message to all registered channels -func (m *Messenger) SendMessage(message *Event) { - m.mutex.Lock() - defer m.mutex.Unlock() - for i := range m.channels { - channel := m.channels[i] - select { - case channel <- message: - default: - } - } -} - -// SendMessageBlocking sends the message to all registered channels and ensures it gets sent -func (m *Messenger) SendMessageBlocking(message *Event) { - m.mutex.Lock() - defer m.mutex.Unlock() - for i := range m.channels { - m.channels[i] <- message - } -} diff --git a/routers/common/blockexpensive.go b/routers/common/blockexpensive.go index fec364351ca5e..18a56de72d2a8 100644 --- a/routers/common/blockexpensive.go +++ b/routers/common/blockexpensive.go @@ -72,7 +72,7 @@ func isRoutePathExpensive(routePattern string) bool { } func isRoutePathForLongPolling(routePattern string) bool { - return routePattern == "/user/events" + return routePattern == "/-/ws" } func determineRequestPriority(reqCtx reqctx.RequestContext) (ret struct { diff --git a/routers/common/blockexpensive_test.go b/routers/common/blockexpensive_test.go index db5c0db7ddaa6..eca7d881184b7 100644 --- a/routers/common/blockexpensive_test.go +++ b/routers/common/blockexpensive_test.go @@ -26,5 +26,5 @@ func TestBlockExpensive(t *testing.T) { assert.Equal(t, c.expensive, isRoutePathExpensive(c.routePath), "routePath: %s", c.routePath) } - assert.True(t, isRoutePathForLongPolling("/user/events")) + assert.True(t, isRoutePathForLongPolling("/-/ws")) } diff --git a/routers/common/qos.go b/routers/common/qos.go index 96f23b64fe691..8124999a16de7 100644 --- a/routers/common/qos.go +++ b/routers/common/qos.go @@ -79,9 +79,9 @@ func QoS() func(next http.Handler) http.Handler { return } - // Release long-polling immediately, so they don't always + // Release long-lived connections immediately, so they don't always // take up an in-flight request - if strings.Contains(req.URL.Path, "/user/events") { + if strings.Contains(req.URL.Path, "/-/ws") { c.Release() } else { defer c.Release() diff --git a/routers/init.go b/routers/init.go index 2ed7a57e5c8fb..3545471615fab 100644 --- a/routers/init.go +++ b/routers/init.go @@ -12,7 +12,6 @@ import ( "code.gitea.io/gitea/models" authmodel "code.gitea.io/gitea/models/auth" "code.gitea.io/gitea/modules/cache" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/git/gitcmd" "code.gitea.io/gitea/modules/log" @@ -54,6 +53,7 @@ import ( "code.gitea.io/gitea/services/task" "code.gitea.io/gitea/services/uinotification" "code.gitea.io/gitea/services/webhook" + websocket_service "code.gitea.io/gitea/services/websocket" ) func mustInit(fn func() error) { @@ -159,7 +159,7 @@ func InitWebInstalled(ctx context.Context) { mustInit(automerge.Init) mustInit(task.Init) mustInit(repo_migrations.Init) - eventsource.GetManager().Init() + mustInit(websocket_service.Init) mustInitCtx(ctx, mailer_incoming.Init) mustInitCtx(ctx, syncAppConfForGit) diff --git a/routers/web/auth/auth.go b/routers/web/auth/auth.go index e5e30667f07ef..6e38c816ad43b 100644 --- a/routers/web/auth/auth.go +++ b/routers/web/auth/auth.go @@ -16,7 +16,6 @@ import ( "code.gitea.io/gitea/models/db" user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/auth/password" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/modules/httplib" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/optional" @@ -34,6 +33,7 @@ import ( "code.gitea.io/gitea/services/forms" "code.gitea.io/gitea/services/mailer" user_service "code.gitea.io/gitea/services/user" + websocket_service "code.gitea.io/gitea/services/websocket" "github.com/markbates/goth" ) @@ -480,10 +480,7 @@ func HandleSignOut(ctx *context.Context) { // SignOut sign out from login status func SignOut(ctx *context.Context) { if ctx.Doer != nil { - eventsource.GetManager().SendMessageBlocking(ctx.Doer.ID, &eventsource.Event{ - Name: "logout", - Data: ctx.Session.ID(), - }) + websocket_service.PublishLogout(ctx.Doer.ID, ctx.Session.ID()) } // prepare the sign-out URL before destroying the session diff --git a/routers/web/events/events.go b/routers/web/events/events.go deleted file mode 100644 index 52f20e07dc3e2..0000000000000 --- a/routers/web/events/events.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package events - -import ( - "net/http" - "time" - - "code.gitea.io/gitea/modules/eventsource" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/routers/web/auth" - "code.gitea.io/gitea/services/context" -) - -// Events listens for events -func Events(ctx *context.Context) { - // FIXME: Need to check if resp is actually a http.Flusher! - how though? - - // Set the headers related to event streaming. - ctx.Resp.Header().Set("Content-Type", "text/event-stream") - ctx.Resp.Header().Set("Cache-Control", "no-cache") - ctx.Resp.Header().Set("Connection", "keep-alive") - ctx.Resp.Header().Set("X-Accel-Buffering", "no") - ctx.Resp.WriteHeader(http.StatusOK) - - if !ctx.IsSigned { - // Return unauthorized status event - event := &eventsource.Event{ - Name: "close", - Data: "unauthorized", - } - _, _ = event.WriteTo(ctx) - ctx.Resp.Flush() - return - } - - // Listen to connection close and un-register messageChan - notify := ctx.Done() - ctx.Resp.Flush() - - shutdownCtx := graceful.GetManager().ShutdownContext() - - uid := ctx.Doer.ID - - messageChan := eventsource.GetManager().Register(uid) - - unregister := func() { - eventsource.GetManager().Unregister(uid, messageChan) - // ensure the messageChan is closed - for { - _, ok := <-messageChan - if !ok { - break - } - } - } - - if _, err := ctx.Resp.Write([]byte("\n")); err != nil { - log.Error("Unable to write to EventStream: %v", err) - unregister() - return - } - - timer := time.NewTicker(30 * time.Second) - -loop: - for { - select { - case <-timer.C: - event := &eventsource.Event{ - Name: "ping", - } - _, err := event.WriteTo(ctx.Resp) - if err != nil { - log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err) - go unregister() - break loop - } - ctx.Resp.Flush() - case <-notify: - go unregister() - break loop - case <-shutdownCtx.Done(): - go unregister() - break loop - case event, ok := <-messageChan: - if !ok { - break loop - } - - // Handle logout - if event.Name == "logout" { - if ctx.Session.ID() == event.Data { - _, _ = (&eventsource.Event{ - Name: "logout", - Data: "here", - }).WriteTo(ctx.Resp) - ctx.Resp.Flush() - go unregister() - auth.HandleSignOut(ctx) - break loop - } - // Replace the event - we don't want to expose the session ID to the user - event = &eventsource.Event{ - Name: "logout", - Data: "elsewhere", - } - } - - _, err := event.WriteTo(ctx.Resp) - if err != nil { - log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err) - go unregister() - break loop - } - ctx.Resp.Flush() - } - } - timer.Stop() -} diff --git a/routers/web/repo/issue_stopwatch.go b/routers/web/repo/issue_stopwatch.go index 2de3a7cfecd6f..ac75982a14235 100644 --- a/routers/web/repo/issue_stopwatch.go +++ b/routers/web/repo/issue_stopwatch.go @@ -4,10 +4,9 @@ package repo import ( - "code.gitea.io/gitea/models/db" issues_model "code.gitea.io/gitea/models/issues" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/services/context" + websocket_service "code.gitea.io/gitea/services/websocket" ) // IssueStartStopwatch creates a stopwatch for the given issue. @@ -29,6 +28,7 @@ func IssueStartStopwatch(c *context.Context) { c.Flash.Warning(c.Tr("repo.issues.stopwatch_already_created")) } else { c.Flash.Success(c.Tr("repo.issues.tracker_auto_close")) + websocket_service.PublishStopwatchesForUser(c, c.Doer) } c.JSONRedirect("") } @@ -50,6 +50,8 @@ func IssueStopStopwatch(c *context.Context) { return } else if !ok { c.Flash.Warning(c.Tr("repo.issues.stopwatch_already_stopped")) + } else { + websocket_service.PublishStopwatchesForUser(c, c.Doer) } c.JSONRedirect("") } @@ -69,18 +71,6 @@ func CancelStopwatch(c *context.Context) { c.ServerError("CancelStopwatch", err) return } - - stopwatches, err := issues_model.GetUserStopwatches(c, c.Doer.ID, db.ListOptions{}) - if err != nil { - c.ServerError("GetUserStopwatches", err) - return - } - if len(stopwatches) == 0 { - eventsource.GetManager().SendMessage(c.Doer.ID, &eventsource.Event{ - Name: "stopwatches", - Data: "{}", - }) - } - + websocket_service.PublishStopwatchesForUser(c, c.Doer) c.JSONRedirect("") } diff --git a/routers/web/web.go b/routers/web/web.go index e3dcf27cc4afe..79cc88a6adef1 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -27,7 +27,6 @@ import ( "code.gitea.io/gitea/routers/web/admin" "code.gitea.io/gitea/routers/web/auth" "code.gitea.io/gitea/routers/web/devtest" - "code.gitea.io/gitea/routers/web/events" "code.gitea.io/gitea/routers/web/explore" "code.gitea.io/gitea/routers/web/feed" "code.gitea.io/gitea/routers/web/healthcheck" @@ -41,6 +40,7 @@ import ( "code.gitea.io/gitea/routers/web/user" user_setting "code.gitea.io/gitea/routers/web/user/setting" "code.gitea.io/gitea/routers/web/user/setting/security" + gitea_websocket "code.gitea.io/gitea/routers/web/websocket" auth_service "code.gitea.io/gitea/services/auth" "code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/forms" @@ -587,7 +587,7 @@ func registerWebRoutes(m *web.Router, webAuth *AuthMiddleware) { }) }, reqSignOut) - m.Any("/user/events", routing.MarkLongPolling, events.Events) + m.Get("/-/ws", gitea_websocket.Serve) m.Group("/login/oauth", func() { m.Group("", func() { diff --git a/routers/web/websocket/websocket.go b/routers/web/websocket/websocket.go new file mode 100644 index 0000000000000..aec8accc08835 --- /dev/null +++ b/routers/web/websocket/websocket.go @@ -0,0 +1,93 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package websocket + +import ( + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/web/routing" + "code.gitea.io/gitea/services/context" + "code.gitea.io/gitea/services/pubsub" + + gitea_ws "github.com/coder/websocket" +) + +// logoutBrokerMsg is the internal broker message published by PublishLogout. +type logoutBrokerMsg struct { + Type string `json:"type"` + SessionID string `json:"sessionID,omitempty"` +} + +// logoutClientMsg is sent to the WebSocket client so the browser can tell +// whether the logout originated from this tab ("here") or another ("elsewhere"). +type logoutClientMsg struct { + Type string `json:"type"` + Data string `json:"data"` +} + +// rewriteLogout intercepts a broker logout message and rewrites it to the +// client format using "here"/"elsewhere" instead of the raw session ID. +// If sessionID is empty the logout applies to all sessions ("here" for all). +func rewriteLogout(msg []byte, connSessionID string) []byte { + var lm logoutBrokerMsg + if err := json.Unmarshal(msg, &lm); err != nil || lm.Type != "logout" { + return msg + } + where := "elsewhere" + if lm.SessionID == "" || lm.SessionID == connSessionID { + where = "here" + } + out, err := json.Marshal(logoutClientMsg{Type: "logout", Data: where}) + if err != nil { + return msg + } + return out +} + +// Serve handles WebSocket upgrade and real-time event delivery. +// Anonymous connections are accepted and kept open; user-specific events +// (notification count, stopwatch, logout) are only delivered to signed-in +// users. This allows future public event types to reuse the same endpoint +// without requiring authentication. +func Serve(ctx *context.Context) { + routing.MarkLongPolling(ctx.Resp, ctx.Req) + + conn, err := gitea_ws.Accept(ctx.Resp, ctx.Req, &gitea_ws.AcceptOptions{ + InsecureSkipVerify: false, + }) + if err != nil { + log.Error("websocket: accept failed: %v", err) + return + } + defer conn.CloseNow() //nolint:errcheck // CloseNow is best-effort; error is intentionally ignored + + // Subscribe to user-specific events only for signed-in users. + // ch is nil for anonymous users: the receive case below never fires, + // keeping the connection open for future public event types. + var ch <-chan []byte + var sessionID string + if ctx.IsSigned { + sessionID = ctx.Session.ID() + var cancel func() + ch, cancel = pubsub.DefaultBroker.Subscribe(pubsub.UserTopic(ctx.Doer.ID)) + defer cancel() + } + + wsCtx := ctx.Req.Context() + for { + select { + case <-wsCtx.Done(): + return + case msg, ok := <-ch: + if !ok { + return + } + msg = rewriteLogout(msg, sessionID) + if err := conn.Write(wsCtx, gitea_ws.MessageText, msg); err != nil { + log.Trace("websocket: write failed: %v", err) + return + } + } + } +} diff --git a/services/notify/notifier.go b/services/notify/notifier.go index 875a70e5644a7..8eeee19c2a2cb 100644 --- a/services/notify/notifier.go +++ b/services/notify/notifier.go @@ -82,4 +82,10 @@ type Notifier interface { WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) + + // NotificationCountChange is called when the unread notification count for a + // specific user may have changed (e.g. after a new issue notification is created). + // Implementations can use this to push an immediate update to connected clients + // instead of waiting for the next polling tick. + NotificationCountChange(ctx context.Context, userID int64) } diff --git a/services/notify/notify.go b/services/notify/notify.go index 2416cbd2e0830..4a8c3b936bdb1 100644 --- a/services/notify/notify.go +++ b/services/notify/notify.go @@ -410,3 +410,11 @@ func WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, s notifier.WorkflowJobStatusUpdate(ctx, repo, sender, job, task) } } + +// NotificationCountChange notifies that the unread notification count for a +// specific user may have changed. +func NotificationCountChange(ctx context.Context, userID int64) { + for _, notifier := range notifiers { + notifier.NotificationCountChange(ctx, userID) + } +} diff --git a/services/notify/null.go b/services/notify/null.go index c3085d7c9eb0a..69c798b1cc18c 100644 --- a/services/notify/null.go +++ b/services/notify/null.go @@ -219,3 +219,6 @@ func (*NullNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_mod func (*NullNotifier) WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) { } + +func (*NullNotifier) NotificationCountChange(_ context.Context, _ int64) { +} diff --git a/services/pubsub/broker.go b/services/pubsub/broker.go new file mode 100644 index 0000000000000..1a8bef5321b63 --- /dev/null +++ b/services/pubsub/broker.go @@ -0,0 +1,85 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package pubsub + +import ( + "fmt" + "sync" +) + +// Broker is a simple in-memory pub/sub broker. +// It supports fan-out: one Publish call delivers the message to all active subscribers. +type Broker struct { + mu sync.RWMutex + subs map[string][]chan []byte +} + +// DefaultBroker is the global singleton used by both routers and notifiers. +var DefaultBroker = NewBroker() + +// NewBroker creates a new in-memory Broker. +func NewBroker() *Broker { + return &Broker{ + subs: make(map[string][]chan []byte), + } +} + +// Subscribe returns a channel that receives messages published to topic. +// Call the returned cancel function to unsubscribe. +func (b *Broker) Subscribe(topic string) (<-chan []byte, func()) { + ch := make(chan []byte, 8) + + b.mu.Lock() + b.subs[topic] = append(b.subs[topic], ch) + b.mu.Unlock() + + cancel := func() { + b.mu.Lock() + defer b.mu.Unlock() + subs := b.subs[topic] + for i, sub := range subs { + if sub == ch { + b.subs[topic] = append(subs[:i], subs[i+1:]...) + break + } + } + close(ch) + } + return ch, cancel +} + +// UserTopic returns the pub/sub topic name for a given user ID. +// Centralised here so the notifier and the WebSocket handler always agree on the format. +func UserTopic(userID int64) string { + return fmt.Sprintf("user-%d", userID) +} + +// HasSubscribers reports whether the broker has at least one active subscriber across all topics. +func (b *Broker) HasSubscribers() bool { + b.mu.RLock() + defer b.mu.RUnlock() + for _, subs := range b.subs { + if len(subs) > 0 { + return true + } + } + return false +} + +// Publish sends msg to all subscribers of topic. +// Non-blocking: slow subscribers are skipped. +// The RLock is held for the entire fan-out to prevent a race where cancel() +// closes a channel between the slice read and the send. +func (b *Broker) Publish(topic string, msg []byte) { + b.mu.RLock() + defer b.mu.RUnlock() + + for _, ch := range b.subs[topic] { + select { + case ch <- msg: + default: + // subscriber too slow — skip + } + } +} diff --git a/services/uinotification/notify.go b/services/uinotification/notify.go index dd3f1557c641e..998b15540cc65 100644 --- a/services/uinotification/notify.go +++ b/services/uinotification/notify.go @@ -51,9 +51,17 @@ func NewNotifier() notify_service.Notifier { } func handler(items ...issueNotificationOpts) []issueNotificationOpts { + ctx := graceful.GetManager().ShutdownContext() for _, opts := range items { - if err := activities_model.CreateOrUpdateIssueNotifications(graceful.GetManager().ShutdownContext(), opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil { + if err := activities_model.CreateOrUpdateIssueNotifications(ctx, opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil { log.Error("Was unable to create issue notification: %v", err) + continue + } + // Push an immediate notification-count update to the affected user's + // WebSocket clients. Only possible when ReceiverID is known; the + // periodic poller handles the ReceiverID==0 (all-watchers) case. + if opts.ReceiverID != 0 { + notify_service.NotificationCountChange(ctx, opts.ReceiverID) } } return nil diff --git a/services/user/user.go b/services/user/user.go index 9b8bcf83c0bc7..b540bc5909908 100644 --- a/services/user/user.go +++ b/services/user/user.go @@ -16,7 +16,6 @@ import ( repo_model "code.gitea.io/gitea/models/repo" system_model "code.gitea.io/gitea/models/system" user_model "code.gitea.io/gitea/models/user" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" @@ -28,6 +27,7 @@ import ( "code.gitea.io/gitea/services/packages" container_service "code.gitea.io/gitea/services/packages/container" repo_service "code.gitea.io/gitea/services/repository" + websocket_service "code.gitea.io/gitea/services/websocket" ) // RenameUser renames a user @@ -147,9 +147,7 @@ func DeleteUser(ctx context.Context, u *user_model.User, purge bool) error { // Force any logged in sessions to log out // FIXME: We also need to tell the session manager to log them out too. - eventsource.GetManager().SendMessage(u.ID, &eventsource.Event{ - Name: "logout", - }) + websocket_service.PublishLogout(u.ID, "") // Delete all repos belonging to this user // Now this is not within a transaction because there are internal transactions within the DeleteRepository diff --git a/services/websocket/logout_publisher.go b/services/websocket/logout_publisher.go new file mode 100644 index 0000000000000..91492fcc66e9c --- /dev/null +++ b/services/websocket/logout_publisher.go @@ -0,0 +1,30 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package websocket + +import ( + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/services/pubsub" +) + +type logoutEvent struct { + Type string `json:"type"` + SessionID string `json:"sessionID,omitempty"` +} + +// PublishLogout publishes a logout event to all WebSocket clients connected as +// the given user. sessionID identifies which session is signing out so the +// client can distinguish "this tab" from "another tab". +func PublishLogout(userID int64, sessionID string) { + msg, err := json.Marshal(logoutEvent{ + Type: "logout", + SessionID: sessionID, + }) + if err != nil { + log.Error("websocket: marshal logout event: %v", err) + return + } + pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg) +} diff --git a/services/websocket/notification_notifier.go b/services/websocket/notification_notifier.go new file mode 100644 index 0000000000000..97eb7d96f2e6a --- /dev/null +++ b/services/websocket/notification_notifier.go @@ -0,0 +1,34 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package websocket + +import ( + "context" + + activities_model "code.gitea.io/gitea/models/activities" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" + notify_service "code.gitea.io/gitea/services/notify" +) + +type wsNotifier struct { + notify_service.NullNotifier +} + +var _ notify_service.Notifier = &wsNotifier{} + +// NotificationCountChange queries the current unread count for the user and +// pushes it immediately to all connected WebSocket clients, bypassing the +// periodic polling loop for this specific user. +func (n *wsNotifier) NotificationCountChange(ctx context.Context, userID int64) { + count, err := db.Count[activities_model.Notification](ctx, activities_model.FindNotificationOptions{ + UserID: userID, + Status: []activities_model.NotificationStatus{activities_model.NotificationStatusUnread}, + }) + if err != nil { + log.Error("websocket: NotificationCountChange count %d: %v", userID, err) + return + } + publishNotificationCount(userID, count) +} diff --git a/services/websocket/notifier.go b/services/websocket/notifier.go new file mode 100644 index 0000000000000..18cba22c3a0f4 --- /dev/null +++ b/services/websocket/notifier.go @@ -0,0 +1,93 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package websocket + +import ( + "context" + "time" + + activities_model "code.gitea.io/gitea/models/activities" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + notify_service "code.gitea.io/gitea/services/notify" + "code.gitea.io/gitea/services/pubsub" +) + +// nowTS returns the current time as a TimeStamp using the real wall clock, +// avoiding data races with timeutil.MockUnset during tests. +func nowTS() timeutil.TimeStamp { + return timeutil.TimeStamp(time.Now().Unix()) +} + +type notificationCountEvent struct { + Type string `json:"type"` + Count int64 `json:"count"` +} + +// Init starts the background goroutines that push real-time updates to +// connected WebSocket clients: notification counts and (when time-tracking +// is enabled) active stopwatches. It also registers the websocket notifier +// so that targeted pushes fire immediately when notification counts change. +func Init() error { + notify_service.RegisterNotifier(&wsNotifier{}) + go graceful.GetManager().RunWithShutdownContext(run) + if setting.Service.EnableTimetracking { + go graceful.GetManager().RunWithShutdownContext(runStopwatch) + } + return nil +} + +func publishNotificationCount(userID, count int64) { + msg, err := json.Marshal(notificationCountEvent{ + Type: "notification-count", + Count: count, + }) + if err != nil { + return + } + pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg) +} + +func run(ctx context.Context) { + ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket", process.SystemProcessType, true) + defer finished() + + if setting.UI.Notification.EventSourceUpdateTime <= 0 { + return + } + + then := nowTS().Add(-2) + timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + if !pubsub.DefaultBroker.HasSubscribers() { + then = nowTS().Add(-2) + continue + } + + now := nowTS().Add(-2) + + uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now) + if err != nil { + log.Error("websocket: GetUIDsAndNotificationCounts: %v", err) + continue + } + + for _, uidCount := range uidCounts { + publishNotificationCount(uidCount.UserID, uidCount.Count) + } + + then = now + } + } +} diff --git a/services/websocket/stopwatch_notifier.go b/services/websocket/stopwatch_notifier.go new file mode 100644 index 0000000000000..c1ffff60dfe98 --- /dev/null +++ b/services/websocket/stopwatch_notifier.go @@ -0,0 +1,97 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package websocket + +import ( + "context" + "time" + + "code.gitea.io/gitea/models/db" + issues_model "code.gitea.io/gitea/models/issues" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/services/convert" + "code.gitea.io/gitea/services/pubsub" +) + +type stopwatchesEvent struct { + Type string `json:"type"` + Data any `json:"data"` +} + +func publishStopwatchesForUser(ctx context.Context, user *user_model.User, sws []*issues_model.Stopwatch) { + var data any + if len(sws) == 0 { + data = []any{} + } else { + apiSWs, err := convert.ToStopWatches(ctx, user, sws) + if err != nil { + if !issues_model.IsErrIssueNotExist(err) { + log.Error("websocket: ToStopWatches: %v", err) + } + return + } + data = apiSWs + } + + msg, err := json.Marshal(stopwatchesEvent{Type: "stopwatches", Data: data}) + if err != nil { + return + } + pubsub.DefaultBroker.Publish(pubsub.UserTopic(user.ID), msg) +} + +// PublishStopwatchesForUser fetches the user's current stopwatches and pushes +// them immediately to all connected WebSocket clients, bypassing the periodic +// polling loop. Call this after any stopwatch start, stop, or cancel so that +// all open tabs update without waiting for the next tick. +func PublishStopwatchesForUser(ctx context.Context, user *user_model.User) { + sws, err := issues_model.GetUserStopwatches(ctx, user.ID, db.ListOptions{}) + if err != nil { + log.Error("websocket: GetUserStopwatches %d: %v", user.ID, err) + return + } + publishStopwatchesForUser(ctx, user, sws) +} + +func runStopwatch(ctx context.Context) { + ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket Stopwatch", process.SystemProcessType, true) + defer finished() + + if setting.UI.Notification.EventSourceUpdateTime <= 0 { + return + } + + timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + if !pubsub.DefaultBroker.HasSubscribers() { + continue + } + + userStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx) + if err != nil { + log.Error("websocket: GetUIDsAndStopwatch: %v", err) + continue + } + + for _, us := range userStopwatches { + u, err := user_model.GetUserByID(ctx, us.UserID) + if err != nil { + log.Error("websocket: GetUserByID %d: %v", us.UserID, err) + continue + } + publishStopwatchesForUser(ctx, u, us.StopWatches) + } + } + } +} diff --git a/templates/base/head_navbar.tmpl b/templates/base/head_navbar.tmpl index cc7e4e6775b70..a2fc21b2612ae 100644 --- a/templates/base/head_navbar.tmpl +++ b/templates/base/head_navbar.tmpl @@ -152,31 +152,29 @@ {{$activeStopwatch := and .PageGlobalData (call .PageGlobalData.GetActiveStopwatch)}} - {{if $activeStopwatch}} -