Skip to content

Commit

Permalink
server: handle duplicate request IDs within a single batch (#77)
Browse files Browse the repository at this point in the history
Previously, if two or more calls arrived in a single batch with the same
request ID, the server would process the first one and report the rest as
having duplicate IDs.

This is subtly wrong, in that the client has no way of knowing which of the
duplicates should get the successful call, and which should get the errors.
Although a client sending such requests is already wrong, this makes the error
harder to debug.

Instead, check for duplicates within the batch during assignment, and fail all
the colliding requests.

* Add a regression test for duplicate requests in a single batch.
* Add a shared error base for NoSuchMethod.
* Add a shared common error for InvalidParams.
  • Loading branch information
creachadair authored Feb 4, 2022
1 parent 8b046b1 commit 1a458a9
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 20 deletions.
4 changes: 2 additions & 2 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ func (r *Request) UnmarshalParams(v interface{}) error {
dec := json.NewDecoder(bytes.NewReader(r.params))
dec.DisallowUnknownFields()
if err := dec.Decode(v); err != nil {
return Errorf(code.InvalidParams, "invalid parameters: %v", err.Error())
return errInvalidParams.WithData(err.Error())
}
return nil
}
if err := json.Unmarshal(r.params, v); err != nil {
return Errorf(code.InvalidParams, "invalid parameters: %v", err.Error())
return errInvalidParams.WithData(err.Error())
}
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,21 @@ var errClientStopped = errors.New("the client has been stopped")
// errEmptyMethod is the error reported for an empty request method name.
var errEmptyMethod = &Error{Code: code.InvalidRequest, Message: "empty method name"}

// errNoSuchMethod is the error reported for an unknown method name.
var errNoSuchMethod = &Error{Code: code.MethodNotFound, Message: "no such method"}

// errDuplicateID is the error reported for a duplicated request ID.
var errDuplicateID = &Error{Code: code.InvalidRequest, Message: "duplicate request ID"}

// errInvalidRequest is the error reported for an invalid request object or batch.
var errInvalidRequest = &Error{Code: code.ParseError, Message: "invalid request value"}

// errEmptyBatch is the error reported for an empty request batch.
var errEmptyBatch = &Error{Code: code.InvalidRequest, Message: "empty request batch"}

// errInvalidParams is the error reported for invalid request parameters.
var errInvalidParams = &Error{Code: code.InvalidParams, Message: "invalid parameters"}

// ErrConnClosed is returned by a server's push-to-client methods if they are
// called after the client connection is closed.
var ErrConnClosed = errors.New("client connection is closed")
Expand Down
4 changes: 2 additions & 2 deletions jrpc2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func TestServer_nonLibraryClient(t *testing.T) {

// The method specified doesn't exist.
{`{"jsonrpc":"2.0", "id": 3, "method": "NoneSuch"}`,
`{"jsonrpc":"2.0","id":3,"error":{"code":-32601,"message":"no such method \"NoneSuch\""}}`},
`{"jsonrpc":"2.0","id":3,"error":{"code":-32601,"message":"no such method","data":"NoneSuch"}}`},

// The parameters are of the wrong form.
{`{"jsonrpc":"2.0", "id": 4, "method": "X", "params": "bogus"}`,
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestServer_nonLibraryClient(t *testing.T) {

// A batch of invalid requests returns a batch of errors.
{`[{"jsonrpc": "2.0", "id": 6, "method":"bogus"}]`,
`[{"jsonrpc":"2.0","id":6,"error":{"code":-32601,"message":"no such method \"bogus\""}}]`},
`[{"jsonrpc":"2.0","id":6,"error":{"code":-32601,"message":"no such method","data":"bogus"}}]`},

// Batch requests return batch responses, even for a singleton.
{`[{"jsonrpc": "2.0", "id": 7, "method": "X"}]`, `[{"jsonrpc":"2.0","id":7,"result":"OK"}]`},
Expand Down
43 changes: 42 additions & 1 deletion regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/creachadair/jrpc2/handler"
"github.com/creachadair/jrpc2/server"
"github.com/fortytw2/leaktest"
"github.com/google/go-cmp/cmp"
)

// Verify that a notification handler will not deadlock with the dispatcher on
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestDuplicateIDCancellation(t *testing.T) {

// Send the duplicate, which should report an error.
send(duplicateReq)
expect(`{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"duplicate request id \"1\""}}`)
expect(`{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"duplicate request ID","data":"1"}}`)

// Unblock the handler, which should now complete. If the duplicate request
// caused the handler to cancel, it will have logged an error to fail the test.
Expand All @@ -151,3 +152,43 @@ func TestDuplicateIDCancellation(t *testing.T) {
cch.Close()
srv.Wait()
}

func TestCheckBatchDuplicateID(t *testing.T) {
defer leaktest.Check(t)()

srv, cli := channel.Direct()
s := jrpc2.NewServer(handler.Map{
"Test": testOK,
}, nil).Start(srv)
defer func() {
cli.Close()
if err := s.Wait(); err != nil {
t.Errorf("Server wait: unexpected error: %v", err)
}
}()

// A batch of requests containing two calls with the same ID.
const input = `[
{"jsonrpc": "2.0", "id": 1, "method": "Test"},
{"jsonrpc": "2.0", "id": 1, "method": "Test"},
{"jsonrpc": "2.0", "id": 2, "method": "Test"}
]
`
const errorReply = `{` +
`"jsonrpc":"2.0",` +
`"id":1,` +
`"error":{"code":-32600,"message":"duplicate request ID","data":"1"}` +
`}`
const want = `[` + errorReply + `,` + errorReply + `,` + `{"jsonrpc":"2.0","id":2,"result":"OK"}]`

if err := cli.Send([]byte(input)); err != nil {
t.Fatalf("Send %d bytes failed: %v", len(input), err)
}
rsp, err := cli.Recv()
if err != nil {
t.Fatalf("Recv failed: %v", err)
}
if diff := cmp.Diff(want, string(rsp)); diff != "" {
t.Errorf("Server response: (-want, +got)\n%s", diff)
}
}
56 changes: 41 additions & 15 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,15 @@ func (s *Server) deliver(rsps jmessages, ch sender, elapsed time.Duration) error
// records errors for them as appropriate. The caller must hold s.mu.
func (s *Server) checkAndAssign(next jmessages) tasks {
var ts tasks
var ids []string
dup := make(map[string]*task) // :: id ⇒ first task in batch with id

// Phase 1: Filter out responses from push calls and check for duplicate
// request ID.s
for _, req := range next {
fid := fixID(req.ID)
t := &task{
hreq: &Request{id: fid, method: req.M, params: req.P},
batch: req.batch,
}
id := string(fid)
if req.err != nil {
t.err = req.err // deferred validation error
} else if !req.isRequestOrNotification() && s.call[id] != nil {
if !req.isRequestOrNotification() && s.call[id] != nil {
// This is a result or error for a pending push-call.
//
// N.B. It is important to check for this before checking for
Expand All @@ -296,24 +295,51 @@ func (s *Server) checkAndAssign(next jmessages) tasks {
delete(s.call, id)
rsp.ch <- req
continue // don't send a reply for this
} else if id != "" && s.used[id] != nil {
t.err = Errorf(code.InvalidRequest, "duplicate request id %q", id)
} else if req.err != nil {
// keep the existing error
} else if !s.versionOK(req.V) {
t.err = ErrInvalidVersion
} else if req.M == "" {
req.err = ErrInvalidVersion
}

t := &task{
hreq: &Request{id: fid, method: req.M, params: req.P},
batch: req.batch,
err: req.err,
}
if old := dup[id]; old != nil {
// A previous task already used this ID, fail both.
old.err = errDuplicateID.WithData(id)
t.err = old.err
} else if id != "" && s.used[id] != nil {
// A task from a previous batch already used this ID, fail this one.
t.err = errDuplicateID.WithData(id)
} else if id != "" {
// This is the first task with this ID in the batch.
dup[id] = t
}
ts = append(ts, t)
ids = append(ids, id)
}

// Phase 2: Assign method handlers and set up contexts.
for i, t := range ts {
id := ids[i]
if t.err != nil {
// deferred validation error
} else if t.hreq.method == "" {
t.err = errEmptyMethod
} else if s.setContext(t, id) {
t.m = s.assign(t.ctx, req.M)
t.m = s.assign(t.ctx, t.hreq.method)
if t.m == nil {
t.err = Errorf(code.MethodNotFound, "no such method %q", req.M)
t.err = errNoSuchMethod.WithData(t.hreq.method)
}
}

if t.err != nil {
s.log("Request check error for %q (params %q): %v", req.M, string(req.P), t.err)
s.log("Request check error for %q (params %q): %v",
t.hreq.method, string(t.hreq.params), t.err)
s.metrics.Count("rpc.errors", 1)
}
ts = append(ts, t)
}
return ts
}
Expand Down

0 comments on commit 1a458a9

Please sign in to comment.