Skip to content

Commit

Permalink
Remove the non-standard built-in cancellation protocol (#50)
Browse files Browse the repository at this point in the history
Since the addition of the OnCancel hook for the client, and the export of CancelRequest 
on the server, it is no longer necessary to have non-standard built-in logic for propagating
client-side cancellation to the server.

Remove the client-side support for sending rpc.cancel notifications, and also remove the
server-side magic for responding to it. Update tests to exercise calling back up to the server
from a cancellation hook.

This is a BREAKING CHANGE.

Fixes #49.
  • Loading branch information
creachadair authored Jul 29, 2021
1 parent c57ee99 commit b74fa7f
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 107 deletions.
10 changes: 1 addition & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type Client struct {
chook func(*Client, *Response)

allow1 bool // tolerate v1 replies with no version marker
allowC bool // send rpc.cancel when a request context ends

mu sync.Mutex // protects the fields below
ch channel.Channel // channel to the server
Expand All @@ -40,7 +39,6 @@ func NewClient(ch channel.Channel, opts *ClientOptions) *Client {
done: make(chan struct{}),
log: opts.logger(),
allow1: opts.allowV1(),
allowC: opts.allowCancel(),
enctx: opts.encodeContext(),
snote: opts.handleNotification(),
scall: opts.handleCallback(),
Expand Down Expand Up @@ -265,19 +263,13 @@ func (c *Client) waitComplete(pctx context.Context, id string, p *Response) {
E: jerr,
}

// Inform the server, best effort only. N.B. Use a background context here,
// as the original context has ended by the time we get here.
// If there is a cancellation hook, give it a chance to run.
if c.chook != nil {
cleanup = func() {
p.wait() // ensure the response has settled
c.log("Calling OnCancel for id %q", id)
c.chook(c, p)
}
} else if c.allowC {
cleanup = func() {
c.log("Sending rpc.cancel for id %q to the server", id)
c.Notify(context.Background(), rpcCancel, []json.RawMessage{json.RawMessage(id)})
}
}
}

Expand Down
35 changes: 9 additions & 26 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,6 @@ not want to do anything for a notification, it can query the request:
}
Cancellation
The *jrpc2.Client and *jrpc2.Server types support a non-standard cancellation
protocol, consisting of a notification method "rpc.cancel" taking an array of
request IDs to be cancelled. The server cancels the context of each method
handler whose ID is named.
When the context associated with a client request is cancelled, the client
sends an "rpc.cancel" notification to the server for that request's ID. The
"rpc.cancel" method is automatically handled (unless disabled) by the
*jrpc2.Server implementation from this package.
Services with Multiple Methods
The example above shows a server with one method using handler.New. To
Expand Down Expand Up @@ -200,21 +187,17 @@ calls that overlap: If the caller needs to ensure that call A completes before
call B starts, it must wait for A to return before invoking B.
Non-Standard Extension Methods
By default, a *jrpc2.Server exports the following built-in non-standard
extension methods:
The "rpc.serverInfo" method takes no parameters and returns a jrpc2.ServerInfo
value giving server metrics.
Built-in Methods
The "rpc.cancel" mmethod takes an array of request IDs, and instructs the
server to terminate the in-flight requests with those IDs. This method works
only as a notification, and will report an error if invoked as a call. Request
IDs not recognized by the server are silently ignored.
Per the JSON-RPC 2.0 spec, method names beginning with "rpc." are reserved by
the implementation. By default, a server does not dispatch these methods to its
assigner. In this configuration, the server exports a "rpc.serverInfo" method
taking no parameters and returning a jrpc2.ServerInfo value.
These extension methods are enabled by default, but may be disabled by setting
the DisableBuiltin server option to true when constructing the server.
Setting the DisableBuiltin option to true in the ServerOptions removes special
treatment of "rpc." method names, and disables the rpc.serverInfo handler.
When this option is true, method names beginning with "rpc." will be dispatched
to the assigner like any other method.
Server Push
Expand Down
23 changes: 8 additions & 15 deletions internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,17 @@ func (h hmap) Names() []string { return nil }
// will terminate and report failure.
func TestClientCancellation(t *testing.T) {
started := make(chan struct{})
stopped := make(chan bool, 1)
stopped := make(chan struct{})
cpipe, spipe := channel.Direct()
srv := NewServer(hmap{
"Hang": methodFunc(func(ctx context.Context, _ *Request) (interface{}, error) {
close(started) // signal that the method handler is running
defer close(stopped)

t.Log("Waiting for context completion...")
select {
case <-ctx.Done():
t.Logf("Server context cancelled: err=%v", ctx.Err())
stopped <- true
return true, ctx.Err()
case <-stopped:
t.Log("Server unblocked by client completing")
return true, nil
case <-time.After(10 * time.Second):
t.Error("Server timed out before completion")
return false, nil
}
}),
Expand Down Expand Up @@ -195,18 +192,14 @@ func TestClientCancellation(t *testing.T) {
// The call should fail client side, in the usual way for a cancellation.
rsp := rsps[0]
rsp.wait()
close(stopped)
if err := rsp.Error(); err != nil {
if err.Code != code.Cancelled {
t.Errorf("Response error for %q: got %v, want %v", rsp.ID(), err, code.Cancelled)
}
} else {
t.Errorf("Response for %q: unexpectedly succeeded", rsp.ID())
}

// The server handler should have reported a cancellation.
if ok := <-stopped; !ok {
t.Error("Server context was not cancelled")
}
}

func TestSpecialMethods(t *testing.T) {
Expand All @@ -219,7 +212,7 @@ func TestSpecialMethods(t *testing.T) {
}),
}, nil)
ctx := context.Background()
for _, name := range []string{rpcServerInfo, rpcCancel, "donkeybait"} {
for _, name := range []string{rpcServerInfo, "donkeybait"} {
if got := s.assign(ctx, name); got == nil {
t.Errorf("s.assign(%s): no method assigned", name)
}
Expand All @@ -240,7 +233,7 @@ func TestDisableBuiltin(t *testing.T) {
ctx := context.Background()

// With builtins disabled, the default rpc.* methods should not get assigned.
for _, name := range []string{rpcServerInfo, rpcCancel} {
for _, name := range []string{rpcServerInfo} {
if got := s.assign(ctx, name); got != nil {
t.Errorf("s.assign(%s): got %+v, wanted nil", name, got)
}
Expand Down
39 changes: 22 additions & 17 deletions jrpc2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,42 +790,46 @@ func TestDeadServerPush(t *testing.T) {

// Verify that an OnCancel hook is called when expected.
func TestOnCancel(t *testing.T) {
// Set up a plumbing context so the test can unblock the server.
sctx, cancelServer := context.WithCancel(context.Background())
defer cancelServer()
hooked := make(chan struct{}) // closed when hook notification is finished

loc := server.NewLocal(handler.Map{
// Block until explicitly cancelled, via sctx.
"Stall": handler.New(func(_ context.Context) error {
// Block until explicitly cancelled or a long timeout expires.
"Stall": handler.New(func(ctx context.Context) error {
select {
case <-sctx.Done():
t.Logf("Server unblocked; returning err=%v", sctx.Err())
return sctx.Err()
case <-ctx.Done():
t.Logf("Method unblocked; returning err=%v", ctx.Err())
return ctx.Err()
case <-time.After(10 * time.Second): // shouldn't happen
t.Error("Timeout waiting for server cancellation")
}
return nil
}),

// Verify that setting the cancellation hook prevents the client from
// sending the default rpc.cancel notification.
"rpc.cancel": handler.New(func(ctx context.Context, ids json.RawMessage) error {
t.Errorf("Server-side rpc.cancel unexpectedly called: %s", string(ids))
// Cancel the specified request (notification only).
"computerSaysNo": handler.New(func(ctx context.Context, ids []string) error {
defer close(hooked)
if req := jrpc2.InboundRequest(ctx); !req.IsNotification() {
return jrpc2.Errorf(code.MethodNotFound, "no such method %q", req.Method())
}
srv := jrpc2.ServerFromContext(ctx)
for _, id := range ids {
srv.CancelRequest(id)
t.Logf("In cancellation handler, cancelled request id=%v", id)
}
return nil
}),
}, &server.LocalOptions{
// Disable handling of built-in methods on the server.
Server: &jrpc2.ServerOptions{DisableBuiltin: true},
Client: &jrpc2.ClientOptions{
OnCancel: func(cli *jrpc2.Client, rsp *jrpc2.Response) {
t.Logf("OnCancel hook called with id=%q, err=%v", rsp.ID(), rsp.Error())
cancelServer()
cli.Notify(context.Background(), "computerSaysNo", []string{rsp.ID()})
},
},
})

// Call a method on the server that will stall until cancelServer is called.
// Call a method on the server that will stall until its context terminates.
// On the client side, set a deadline to expire the caller's context.
// The cancellation hook will unblock the server.
// The cancellation hook will notify the server to unblock the method.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

Expand All @@ -835,6 +839,7 @@ func TestOnCancel(t *testing.T) {
} else if err != context.DeadlineExceeded {
t.Errorf("Stall: got error %v, want %v", err, context.Canceled)
}
<-hooked
loc.Client.Close()
if err := loc.Server.Wait(); err != nil {
t.Errorf("Server exit status: %v", err)
Expand Down
8 changes: 1 addition & 7 deletions opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ type ClientOptions struct {
// required "jsonrpc" version marker.
AllowV1 bool

// Instructs the client not to send rpc.cancel notifications to the server
// when the context for an in-flight request terminates.
DisableCancel bool

// If set, this function is called with the context, method name, and
// encoded request parameters before the request is sent to the server.
// Its return value replaces the request parameters. This allows the client
Expand All @@ -165,7 +161,6 @@ type ClientOptions struct {
// The function receives the client and the response that was cancelled.
// The hook can obtain the ID and error value from rsp.
//
// Setting this option disables the default rpc.cancel handling (as DisableCancel).
// Note that the hook does not receive the client context, which has already
// ended by the time the hook is called.
OnCancel func(cli *Client, rsp *Response)
Expand All @@ -179,8 +174,7 @@ func (c *ClientOptions) logger() logger {
return func(msg string, args ...interface{}) { logger.Output(2, fmt.Sprintf(msg, args...)) }
}

func (c *ClientOptions) allowV1() bool { return c != nil && c.AllowV1 }
func (c *ClientOptions) allowCancel() bool { return c == nil || !c.DisableCancel }
func (c *ClientOptions) allowV1() bool { return c != nil && c.AllowV1 }

type encoder = func(context.Context, string, json.RawMessage) (json.RawMessage, error)

Expand Down
4 changes: 1 addition & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s *Server) setContext(t *task, id string) bool {
t.ctx = context.WithValue(base, inboundRequestKey{}, t.hreq)

// Store the cancellation for a request that needs a reply, so that we can
// respond to rpc.cancel requests.
// respond to cancellation requests.
if id != "" {
ctx, cancel := context.WithCancel(t.ctx)
s.used[id] = cancel
Expand Down Expand Up @@ -620,8 +620,6 @@ func (s *Server) assign(ctx context.Context, name string) Handler {
switch name {
case rpcServerInfo:
return methodFunc(s.handleRPCServerInfo)
case rpcCancel:
return methodFunc(s.handleRPCCancel)
default:
return nil // reserved
}
Expand Down
35 changes: 5 additions & 30 deletions special.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,20 @@ package jrpc2

import (
"context"
"encoding/json"

"github.com/creachadair/jrpc2/code"
)

const (
rpcServerInfo = "rpc.serverInfo"
rpcCancel = "rpc.cancel"
)

// Handle the special rpc.cancel notification, that requests cancellation of a
// set of pending methods. This only works if issued as a notification.
func (s *Server) handleRPCCancel(ctx context.Context, req *Request) (interface{}, error) {
if !InboundRequest(ctx).IsNotification() {
return nil, code.MethodNotFound.Err()
}
var ids []json.RawMessage
if err := req.UnmarshalParams(&ids); err != nil {
return nil, err
}
s.cancelRequests(ids)
return nil, nil
}

func (s *Server) cancelRequests(ids []json.RawMessage) {
s.mu.Lock()
defer s.mu.Unlock()
for _, raw := range ids {
id := string(raw)
if s.cancel(id) {
s.log("Cancelled request %s by client order", id)
}
}
}

// CancelRequest instructs s to cancel the pending or in-flight request with
// the specified ID. If no request exists with that ID, this is a no-op.
func (s *Server) CancelRequest(id string) {
s.cancelRequests([]json.RawMessage{json.RawMessage(id)})
s.mu.Lock()
defer s.mu.Unlock()
if s.cancel(id) {
s.log("Cancelled request %s by client order", id)
}
}

// methodFunc is a replication of handler.Func redeclared to avert a cycle.
Expand Down

0 comments on commit b74fa7f

Please sign in to comment.