Skip to content

Commit

Permalink
Remove the custom queue implementation. (#93)
Browse files Browse the repository at this point in the history
Use mlink.Queue instead. It allocates a tiny bit more (because no freelist) but
not enough to worry about.
  • Loading branch information
creachadair authored Mar 14, 2023
1 parent 15caece commit 03c871e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 81 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
golang.org/x/sync v0.1.0
)

require github.com/creachadair/mds v0.0.0-20230313153906-9788b6f60568

go 1.19

// A bug in handler.New could panic a wrapped handler on pointer arguments.
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/creachadair/mds v0.0.0-20230313153906-9788b6f60568 h1:LzMDPh2FLXEHnQNTjGJT+vInPSDVOUTuzqE4GskHTNg=
github.com/creachadair/mds v0.0.0-20230313153906-9788b6f60568/go.mod h1:caBACU+n1Q/rZ252FTzfnG0/H+ZUi+UnIQtEOraMv/g=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
Expand Down
64 changes: 0 additions & 64 deletions queue.go

This file was deleted.

36 changes: 19 additions & 17 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/creachadair/jrpc2/channel"
"github.com/creachadair/jrpc2/code"
"github.com/creachadair/mds/mlink"
"golang.org/x/sync/semaphore"
)

Expand Down Expand Up @@ -67,11 +68,11 @@ type Server struct {

mu *sync.Mutex // protects the fields below

nbar sync.WaitGroup // notification barrier (see the dispatch method)
err error // error from a previous operation
work chan struct{} // for signaling message availability
inq *queue // inbound requests awaiting processing
ch channel.Channel // the channel to the client
nbar sync.WaitGroup // notification barrier (see the dispatch method)
err error // error from a previous operation
work chan struct{} // for signaling message availability
inq *mlink.Queue[jmessages] // inbound requests awaiting processing
ch channel.Channel // the channel to the client

// For each request ID currently in-flight, this map carries a cancel
// function attached to the context that was sent to the handler.
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewServer(mux Assigner, opts *ServerOptions) *Server {
mu: new(sync.Mutex),
start: opts.startTime(),
builtin: opts.allowBuiltin(),
inq: newQueue(),
inq: mlink.NewQueue[jmessages](),
used: make(map[string]context.CancelFunc),
call: make(map[string]*Response),
callID: 1,
Expand Down Expand Up @@ -196,18 +197,18 @@ func (s *Server) signal() {
func (s *Server) nextRequest() (func() error, error) {
s.mu.Lock()
defer s.mu.Unlock()
for s.ch != nil && s.inq.isEmpty() {
for s.ch != nil && s.inq.IsEmpty() {
s.mu.Unlock()
<-s.work
s.mu.Lock()
}
if s.ch == nil && s.inq.isEmpty() {
if s.ch == nil && s.inq.IsEmpty() {
return nil, s.err
}
ch := s.ch // capture

next := s.inq.pop()
s.log("Dequeued request batch of length %d (qlen=%d)", len(next), s.inq.size())
next, _ := s.inq.Pop()
s.log("Dequeued request batch of length %d (qlen=%d)", len(next), s.inq.Len())

// Construct a dispatcher to run the handlers outside the lock.
return s.dispatch(next, ch), nil
Expand Down Expand Up @@ -553,7 +554,7 @@ func (s ServerStatus) Success() bool { return s.Err == nil }
func (s *Server) WaitStatus() ServerStatus {
s.wg.Wait()
// Postcondition check.
if !s.inq.isEmpty() {
if !s.inq.IsEmpty() {
panic("s.inq is not empty at shutdown")
}
stat := ServerStatus{Err: s.err}
Expand Down Expand Up @@ -586,7 +587,7 @@ func (s *Server) stop(err error) {
//
// TODO(@creachadair): We need better tests for this behaviour.
var keep jmessages
s.inq.each(func(cur jmessages) {
s.inq.Each(func(cur jmessages) bool {
for _, req := range cur {
if req.isNotification() {
keep = append(keep, req)
Expand All @@ -595,10 +596,11 @@ func (s *Server) stop(err error) {
s.cancel(string(req.ID))
}
}
return true
})
s.inq.reset()
s.inq.Clear()
for _, elt := range keep {
s.inq.push(jmessages{elt})
s.inq.Add(jmessages{elt})
}
close(s.work)

Expand Down Expand Up @@ -653,9 +655,9 @@ func (s *Server) read(ch receiver) {
// was responses, so re-check the length after doing this.
keep := s.filterBatch(in)
if len(keep) != 0 {
s.log("Received request batch of size %d (qlen=%d)", len(keep), s.inq.size())
s.inq.push(keep)
if s.inq.size() == 1 { // the queue was empty
s.log("Received request batch of size %d (qlen=%d)", len(keep), s.inq.Len())
s.inq.Add(keep)
if s.inq.Len() == 1 { // the queue was empty
s.signal()
}
}
Expand Down

0 comments on commit 03c871e

Please sign in to comment.