From 2ad50317f7ed6954cc7b9da92293028914612b89 Mon Sep 17 00:00:00 2001 From: Amin Jamali Date: Tue, 27 Jul 2021 12:56:54 +0000 Subject: [PATCH] Notify server when client connection are gone After a connection is [hijacked](https://pkg.go.dev/net/http#Hijacker.Hijack) it becomes server's responsibility to close the server and client connections. --- handlers/events_handlers.go | 8 ++++++-- handlers/events_handlers_test.go | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/handlers/events_handlers.go b/handlers/events_handlers.go index fdc31cde..455d1482 100644 --- a/handlers/events_handlers.go +++ b/handlers/events_handlers.go @@ -78,7 +78,11 @@ func streamEventsToResponse(logger lager.Logger, w http.ResponseWriter, eventCha var event models.Event eventID := 0 - closeNotifier := w.(http.CloseNotifier).CloseNotify() + done := make(chan bool, 1) + go func() { + rw.ReadFrom(conn) + done <- true + }() for { select { @@ -86,7 +90,7 @@ func streamEventsToResponse(logger lager.Logger, w http.ResponseWriter, eventCha case err := <-errorChan: logger.Error("failed-to-get-next-event", err) return - case <-closeNotifier: + case <-done: logger.Debug("received-close-notify") return } diff --git a/handlers/events_handlers_test.go b/handlers/events_handlers_test.go index d0428e07..566878ab 100644 --- a/handlers/events_handlers_test.go +++ b/handlers/events_handlers_test.go @@ -18,6 +18,7 @@ import ( "code.cloudfoundry.org/lager/lagertest" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/onsi/gomega/gbytes" "github.com/vito/go-sse/sse" ) @@ -133,6 +134,23 @@ var _ = Describe("Event Handlers", func() { Expect(response.Header.Get("Cache-Control")).To(Equal("no-cache, no-store, must-revalidate")) }) + It("detects closed clients on the server", func() { + reader := sse.NewReadCloser(response.Body) + + hub.Emit(&eventfakes.FakeEvent{Token: "A"}) + encodedPayload := base64.StdEncoding.EncodeToString([]byte("A")) + + Expect(reader.Next()).To(Equal(sse.Event{ + ID: "0", + Name: "fake", + Data: []byte(encodedPayload), + })) + + reader.Close() + + Eventually(logger).Should(gbytes.Say("received-close-notify")) + }) + Context("when the source provides an unmarshalable event", func() { It("closes the event stream to the client", func() { hub.Emit(eventfakes.UnmarshalableEvent{Fn: func() {}})