forked from creachadair/jrpc2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
regression_test.go
243 lines (211 loc) · 6.84 KB
/
regression_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// Copyright (C) 2017 Michael J. Fromberger. All Rights Reserved.
package jrpc2_test
import (
"context"
"strings"
"sync"
"testing"
"time"
"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/channel"
"github.com/creachadair/jrpc2/handler"
"github.com/creachadair/jrpc2/server"
"github.com/fortytw2/leaktest"
"github.com/google/go-cmp/cmp"
)
// Verify that a notification handler will not deadlock with the dispatcher on
// holding the server lock. See: https://github.com/creachadair/jrpc2/issues/27
func TestLockRaceRegression(t *testing.T) {
defer leaktest.Check(t)()
hdone := make(chan struct{})
local := server.NewLocal(handler.Map{
// Do some busy-work and then try to get the server lock, in this case
// via the CancelRequest helper.
"Kill": handler.New(func(ctx context.Context, req *jrpc2.Request) error {
defer close(hdone) // signal we passed the deadlock point
var id string
if err := req.UnmarshalParams(&handler.Args{&id}); err != nil {
return err
}
jrpc2.ServerFromContext(ctx).CancelRequest(id)
return nil
}),
// Block indefinitely, just to give the dispatcher something to do.
"Stall": handler.New(func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}),
}, nil)
defer local.Close()
ctx := context.Background()
local.Client.Notify(ctx, "Kill", handler.Args{"1"})
go local.Client.Call(ctx, "Stall", nil)
select {
case <-time.After(10 * time.Second):
t.Fatal("Notification handler is probably deadlocked")
case <-hdone:
t.Log("Notification handler completed successfully")
}
}
// Verify that if a callback handler panics, the client will report an error
// back to the server. See https://github.com/creachadair/jrpc2/issues/41.
func TestOnCallbackPanicRegression(t *testing.T) {
defer leaktest.Check(t)()
const panicString = "the devil you say"
loc := server.NewLocal(handler.Map{
"Test": handler.New(func(ctx context.Context) error {
rsp, err := jrpc2.ServerFromContext(ctx).Callback(ctx, "Poke", nil)
if err == nil {
t.Errorf("Callback unexpectedly succeeded: %#q", rsp.ResultString())
} else if !strings.HasSuffix(err.Error(), panicString) {
t.Errorf("Callback reported unexpected error: %v", err)
} else {
t.Logf("Callback reported expected error: %v", err)
}
return nil
}),
}, &server.LocalOptions{
Server: &jrpc2.ServerOptions{
AllowPush: true,
},
Client: &jrpc2.ClientOptions{
OnCallback: func(ctx context.Context, req *jrpc2.Request) (interface{}, error) {
t.Log("Entering callback handler; about to panic")
panic(panicString)
},
},
})
defer loc.Close()
if _, err := loc.Client.Call(context.Background(), "Test", nil); err != nil {
t.Errorf("Call unexpectedly failed: %v", err)
}
}
// Verify that a duplicate request ID that arrives while a task is in flight
// does not cause the existing task to be cancelled.
func TestDuplicateIDCancellation(t *testing.T) {
defer leaktest.Check(t)()
tctx, cancel := context.WithCancel(context.Background())
defer cancel()
// This channel is closed when the test method is running.
ready := make(chan struct{})
cch, sch := channel.Direct()
srv := jrpc2.NewServer(handler.Map{
"Test": handler.New(func(ctx context.Context, req *jrpc2.Request) error {
t.Logf("Test method running, request ID %q", req.ID())
close(ready)
select {
case <-tctx.Done():
t.Log("Request ending normally (test signal)")
case <-ctx.Done():
t.Error("Request was unexpected cancelled by the server")
}
return nil
}),
}, nil).Start(sch)
send := func(s string) {
if err := cch.Send([]byte(s)); err != nil {
t.Errorf("Send %#q failed: %v", s, err)
}
}
expect := func(s string) {
bits, err := cch.Recv()
if err != nil {
t.Errorf("Recv failed: %v", err)
} else if got := string(bits); got != s {
t.Errorf("Recv: got %#q, want %#q", got, s)
}
}
const duplicateReq = `{"jsonrpc":"2.0", "id":1, "method":"Test"}`
// Send the first request and wait for the handler to start.
send(duplicateReq)
<-ready
// Send the duplicate, which should report an error.
send(duplicateReq)
expect(`{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"duplicate request ID","data":"1"}}`)
// Unblock the handler, which should now complete. If the duplicate request
// caused the handler to cancel, it will have logged an error to fail the test.
cancel()
expect(`{"jsonrpc":"2.0","id":1,"result":null}`)
cch.Close()
srv.Wait()
}
func TestCheckBatchDuplicateID(t *testing.T) {
defer leaktest.Check(t)()
srv, cli := channel.Direct()
s := jrpc2.NewServer(handler.Map{
"Test": testOK,
}, nil).Start(srv)
defer func() {
cli.Close()
if err := s.Wait(); err != nil {
t.Errorf("Server wait: unexpected error: %v", err)
}
}()
// A batch of requests containing two calls with the same ID.
const input = `[
{"jsonrpc": "2.0", "id": 1, "method": "Test"},
{"jsonrpc": "2.0", "id": 1, "method": "Test"},
{"jsonrpc": "2.0", "id": 2, "method": "Test"}
]
`
const errorReply = `{` +
`"jsonrpc":"2.0",` +
`"id":1,` +
`"error":{"code":-32600,"message":"duplicate request ID","data":"1"}` +
`}`
const want = `[` + errorReply + `,` + errorReply + `,` + `{"jsonrpc":"2.0","id":2,"result":"OK"}]`
if err := cli.Send([]byte(input)); err != nil {
t.Fatalf("Send %d bytes failed: %v", len(input), err)
}
rsp, err := cli.Recv()
if err != nil {
t.Fatalf("Recv failed: %v", err)
}
if diff := cmp.Diff(want, string(rsp)); diff != "" {
t.Errorf("Server response: (-want, +got)\n%s", diff)
}
}
// Verify that callbacks from notification handlers cannot deadlock on delivery
// of their own replies. Reported in #78, test case courtesy of @radeksimko.
func TestServer_NotificationCallbackDeadlock(t *testing.T) {
defer leaktest.Check(t)()
var wg sync.WaitGroup
loc := server.NewLocal(handler.Map{
"NotifyMe": handler.New(func(ctx context.Context) error {
defer wg.Done()
if _, err := jrpc2.ServerFromContext(ctx).Callback(ctx, "succeed", nil); err != nil {
t.Errorf("Callback failed: %v", err)
}
return nil
}),
}, &server.LocalOptions{
Server: &jrpc2.ServerOptions{AllowPush: true},
Client: &jrpc2.ClientOptions{
OnCallback: func(ctx context.Context, req *jrpc2.Request) (interface{}, error) {
switch req.Method() {
case "succeed":
return true, nil
}
panic("broken test: you should not see this")
},
},
})
defer loc.Close()
ctx := context.Background()
// Call the notification method that posts a callback.
wg.Add(2)
if err := loc.Client.Notify(ctx, "NotifyMe", nil); err != nil {
t.Fatalf("Notify: unexpected error: %v", err)
}
if err := loc.Client.Notify(ctx, "NotifyMe", nil); err != nil {
t.Fatalf("Notify: unexpected error: %v", err)
}
done := make(chan struct{})
go func() { wg.Wait(); close(done) }()
select {
case <-done:
// all is well
case <-time.After(5 * time.Second):
t.Fatal("Timed out waiting for callbacks to return")
}
}