Skip to content

Commit

Permalink
Add support for Last-Event-ID=-1
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed May 18, 2020
1 parent ab2447c commit 007309d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 9 deletions.
5 changes: 4 additions & 1 deletion hub/bolt_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,13 @@ func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) {
}

c := b.Cursor()
afterFromID := false
afterFromID := s.LastEventID == "-1"
previousID := "-1"
for k, v := c.First(); k != nil; k, v = c.Next() {
if !afterFromID {
if string(k[8:]) == s.LastEventID {
afterFromID = true
previousID = ""
}

continue
Expand All @@ -185,6 +187,7 @@ func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) {
log.Error(fmt.Errorf("bolt history: %w", err))
return err
}
update.PreviousID = previousID

if !s.Dispatch(update, true) || (toSeq > 0 && binary.BigEndian.Uint64(k[:8]) >= toSeq) {
return nil
Expand Down
33 changes: 33 additions & 0 deletions hub/bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,39 @@ func TestBoltTransportHistory(t *testing.T) {
}
}

func TestBoltTransportRetrieveAllHistory(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
defer transport.Close()
defer os.Remove("test.db")

topics := []string{"https://example.com/foo"}
for i := 1; i <= 10; i++ {
transport.Dispatch(&Update{
Event: Event{ID: strconv.Itoa(i)},
Topics: topics,
})
}

s := newSubscriber("-1", newTopicSelectorStore())
s.Topics = topics
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)

var count int
for {
u := <-s.Receive()
// the reading loop must read all messages
count++
assert.Equal(t, strconv.Itoa(count), u.ID)
if count == 10 {
return
}
}
}

func TestBoltTransportHistoryAndLive(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
Expand Down
18 changes: 12 additions & 6 deletions hub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
}
timer.Reset(hearthbeatInterval)
case update := <-s.Receive():
if update.PreviousID != "" {
w.Header().Set("Last-Event-ID", update.PreviousID)
}

if !h.write(w, s, newSerializedUpdate(update).event) {
return
}
Expand Down Expand Up @@ -107,7 +111,7 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
log.WithFields(s.LogFields).Error(err)
return nil
}
sendHeaders(w)
sendHeaders(w, s.LastEventID == "")
log.WithFields(s.LogFields).Info("New subscriber")

h.metrics.NewSubscriber(s)
Expand All @@ -116,7 +120,7 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
}

// sendHeaders sends correct HTTP headers to create a keep-alive connection.
func sendHeaders(w http.ResponseWriter) {
func sendHeaders(w http.ResponseWriter, flush bool) {
// Keep alive, useful only for HTTP 1 clients https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive
w.Header().Set("Connection", "keep-alive")

Expand All @@ -131,10 +135,12 @@ func sendHeaders(w http.ResponseWriter) {
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
w.Header().Set("X-Accel-Buffering", "no")

// Write a comment in the body
// Go currently doesn't provide a better way to flush the headers
fmt.Fprint(w, ":\n")
w.(http.Flusher).Flush()
if flush {
// Write a comment in the body
// Go currently doesn't provide a better way to flush the headers
fmt.Fprint(w, ":\n")
w.(http.Flusher).Flush()
}
}

// retrieveLastEventID extracts the Last-Event-ID from the corresponding HTTP header with a fallback on the query parameter.
Expand Down
4 changes: 2 additions & 2 deletions hub/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func TestSendMissedEvents(t *testing.T) {

w := &responseTester{
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
expectedBody: "id: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand All @@ -513,7 +513,7 @@ func TestSendMissedEvents(t *testing.T) {

w := &responseTester{
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
expectedBody: "id: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand Down
4 changes: 4 additions & 0 deletions hub/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type Update struct {
// Private updates can only be dispatched to subscribers authorized to receive them.
Private bool

// PreviousID contains the ID of the previous update
// This value must be sent only if the request Last-Event-ID cannot be found, and only on the first update available
PreviousID string

// The Server-Sent Event to send.
Event
}
Expand Down

0 comments on commit 007309d

Please sign in to comment.