-
Notifications
You must be signed in to change notification settings - Fork 6
/
client.go
120 lines (93 loc) · 2.23 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package http2
import (
"container/list"
"sync"
"time"
"github.com/valyala/fasthttp"
)
const DefaultPingInterval = time.Second * 3
// ClientOpts defines the client options for the HTTP/2 connection.
type ClientOpts struct {
// PingInterval defines the interval in which the client will ping the server.
//
// An interval of 0 will make the library to use DefaultPingInterval. Because ping intervals can't be disabled.
PingInterval time.Duration
// OnRTT is assigned to every client after creation, and the handler
// will be called after every RTT measurement (after receiving a PONG message).
OnRTT func(time.Duration)
}
// Ctx represents a context for a stream. Every stream is related to a context.
type Ctx struct {
Request *fasthttp.Request
Response *fasthttp.Response
Err chan error
}
type Client struct {
d *Dialer
// TODO: impl rtt
onRTT func(time.Duration)
lck sync.Mutex
conns list.List
}
func createClient(d *Dialer, opts ClientOpts) *Client {
cl := &Client{
d: d,
onRTT: opts.OnRTT,
}
return cl
}
func (cl *Client) onConnectionDropped(c *Conn) {
cl.lck.Lock()
defer cl.lck.Unlock()
for e := cl.conns.Front(); e != nil; e = e.Next() {
if e.Value.(*Conn) == c {
cl.conns.Remove(e)
_, _, _ = cl.createConn()
break
}
}
}
func (cl *Client) createConn() (*Conn, *list.Element, error) {
c, err := cl.d.Dial(ConnOpts{
PingInterval: cl.d.PingInterval,
OnDisconnect: cl.onConnectionDropped,
})
if err != nil {
return nil, nil, err
}
return c, cl.conns.PushFront(c), nil
}
func (cl *Client) Do(req *fasthttp.Request, res *fasthttp.Response) (err error) {
var c *Conn
cl.lck.Lock()
var next *list.Element
for e := cl.conns.Front(); c == nil; e = next {
if e != nil {
c = e.Value.(*Conn)
} else {
c, e, err = cl.createConn()
if err != nil {
return err
}
}
// if we can't open a stream, then move on to the next one.
if !c.CanOpenStream() {
c = nil
next = e.Next()
}
// if the connection has been closed, then just remove the connection.
if c != nil && c.Closed() {
next = e.Next()
cl.conns.Remove(e)
c = nil
}
}
cl.lck.Unlock()
ch := make(chan error, 1)
c.Write(&Ctx{
Request: req,
Response: res,
Err: ch,
})
return <-ch
}