Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the custom queue implementation. #93

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
golang.org/x/sync v0.1.0
)

require github.com/creachadair/mds v0.0.0-20230313153906-9788b6f60568

go 1.19

// A bug in handler.New could panic a wrapped handler on pointer arguments.
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/creachadair/mds v0.0.0-20230313153906-9788b6f60568 h1:LzMDPh2FLXEHnQNTjGJT+vInPSDVOUTuzqE4GskHTNg=
github.com/creachadair/mds v0.0.0-20230313153906-9788b6f60568/go.mod h1:caBACU+n1Q/rZ252FTzfnG0/H+ZUi+UnIQtEOraMv/g=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
Expand Down
64 changes: 0 additions & 64 deletions queue.go

This file was deleted.

36 changes: 19 additions & 17 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/creachadair/jrpc2/channel"
"github.com/creachadair/jrpc2/code"
"github.com/creachadair/mds/mlink"
"golang.org/x/sync/semaphore"
)

Expand Down Expand Up @@ -67,11 +68,11 @@ type Server struct {

mu *sync.Mutex // protects the fields below

nbar sync.WaitGroup // notification barrier (see the dispatch method)
err error // error from a previous operation
work chan struct{} // for signaling message availability
inq *queue // inbound requests awaiting processing
ch channel.Channel // the channel to the client
nbar sync.WaitGroup // notification barrier (see the dispatch method)
err error // error from a previous operation
work chan struct{} // for signaling message availability
inq *mlink.Queue[jmessages] // inbound requests awaiting processing
ch channel.Channel // the channel to the client

// For each request ID currently in-flight, this map carries a cancel
// function attached to the context that was sent to the handler.
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewServer(mux Assigner, opts *ServerOptions) *Server {
mu: new(sync.Mutex),
start: opts.startTime(),
builtin: opts.allowBuiltin(),
inq: newQueue(),
inq: mlink.NewQueue[jmessages](),
used: make(map[string]context.CancelFunc),
call: make(map[string]*Response),
callID: 1,
Expand Down Expand Up @@ -196,18 +197,18 @@ func (s *Server) signal() {
func (s *Server) nextRequest() (func() error, error) {
s.mu.Lock()
defer s.mu.Unlock()
for s.ch != nil && s.inq.isEmpty() {
for s.ch != nil && s.inq.IsEmpty() {
s.mu.Unlock()
<-s.work
s.mu.Lock()
}
if s.ch == nil && s.inq.isEmpty() {
if s.ch == nil && s.inq.IsEmpty() {
return nil, s.err
}
ch := s.ch // capture

next := s.inq.pop()
s.log("Dequeued request batch of length %d (qlen=%d)", len(next), s.inq.size())
next, _ := s.inq.Pop()
s.log("Dequeued request batch of length %d (qlen=%d)", len(next), s.inq.Len())

// Construct a dispatcher to run the handlers outside the lock.
return s.dispatch(next, ch), nil
Expand Down Expand Up @@ -553,7 +554,7 @@ func (s ServerStatus) Success() bool { return s.Err == nil }
func (s *Server) WaitStatus() ServerStatus {
s.wg.Wait()
// Postcondition check.
if !s.inq.isEmpty() {
if !s.inq.IsEmpty() {
panic("s.inq is not empty at shutdown")
}
stat := ServerStatus{Err: s.err}
Expand Down Expand Up @@ -586,7 +587,7 @@ func (s *Server) stop(err error) {
//
// TODO(@creachadair): We need better tests for this behaviour.
var keep jmessages
s.inq.each(func(cur jmessages) {
s.inq.Each(func(cur jmessages) bool {
for _, req := range cur {
if req.isNotification() {
keep = append(keep, req)
Expand All @@ -595,10 +596,11 @@ func (s *Server) stop(err error) {
s.cancel(string(req.ID))
}
}
return true
})
s.inq.reset()
s.inq.Clear()
for _, elt := range keep {
s.inq.push(jmessages{elt})
s.inq.Add(jmessages{elt})
}
close(s.work)

Expand Down Expand Up @@ -653,9 +655,9 @@ func (s *Server) read(ch receiver) {
// was responses, so re-check the length after doing this.
keep := s.filterBatch(in)
if len(keep) != 0 {
s.log("Received request batch of size %d (qlen=%d)", len(keep), s.inq.size())
s.inq.push(keep)
if s.inq.size() == 1 { // the queue was empty
s.log("Received request batch of size %d (qlen=%d)", len(keep), s.inq.Len())
s.inq.Add(keep)
if s.inq.Len() == 1 { // the queue was empty
s.signal()
}
}
Expand Down