-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathring_conn.go
181 lines (160 loc) · 3.62 KB
/
ring_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// +build linux
package iouring
import (
"context"
"net"
"runtime"
"sync"
"syscall"
"time"
"unsafe"
"github.com/pkg/errors"
)
// ringConn is a net.Conn implementation whose reads and writes are
// submitted through a Ring.
type ringConn struct {
	// fd is the connection's file descriptor. It is placed in every
	// submission entry built by this type and is closed by Close.
	fd int

	// laddr and raddr are the addresses reported by LocalAddr and
	// RemoteAddr respectively.
	laddr, raddr *addr

	// r is the ring that requests are submitted to and that completions
	// are looked up on.
	r *Ring

	// offset is not referenced by any method in this part of the file.
	// NOTE(review): presumably an I/O offset shared with the ring; confirm.
	offset *int64

	// stop is sent to by getCqe and Close and received from by run.
	stop chan struct{}

	// poll and pollReady are not referenced by any method in this part of
	// the file.
	// NOTE(review): presumably poll-readiness bookkeeping; confirm.
	poll      chan uint64
	pollReady *int32

	// deadMu guards the deadline fields below, which are written by the
	// Set*Deadline methods.
	deadMu                                sync.RWMutex
	deadline, readDeadline, writeDeadline time.Time
}
// getCqe waits for the completion entry whose user data equals reqID and
// returns its result.
//
// It enters the ring with EnterGetEvents, signals c.stop, and then polls the
// completion queue until the entry for reqID is found. While the entry is
// not yet present (ErrEntryNotFound) the lookup is retried; any other lookup
// error is returned to the caller. A negative completion result is returned
// as the corresponding syscall.Errno. If ctx is done before the entry is
// found, syscall.ETIMEDOUT is returned.
func (c *ringConn) getCqe(ctx context.Context, reqID uint64) (int, error) {
	// TODO: Where should this repoll go?
	if _, err := c.r.Enter(uint(1024), uint(1), EnterGetEvents, nil); err != nil {
		return 0, err
	}

	// NOTE(review): unless c.stop is buffered this send blocks until a
	// receiver is ready; run is the only receiver visible in this file and
	// it returns after a single receive. Confirm the intended lifecycle.
	c.stop <- struct{}{}

	for {
		// Give up as soon as the caller's context is done.
		select {
		case <-ctx.Done():
			return 0, syscall.ETIMEDOUT
		default:
		}

		cqe, err := c.r.cq.EntryBy(reqID)
		if err == ErrEntryNotFound {
			// The completion has not been posted yet: yield and look
			// again.
			// TODO: How many tries should looking for the cqe be
			// tried?
			runtime.Gosched()
			continue
		}
		if err != nil {
			// Any other failure will not go away by retrying.
			return 0, err
		}

		// A negative result carries the negated errno of the operation.
		res := int(cqe.Res)
		if res < 0 {
			return 0, syscall.Errno(-res)
		}
		return res, nil
	}
}
// rePoll re-enables polling on the connection: it submits a PollAdd request
// for POLLIN on the connection's file descriptor and then enters the ring.
//
// If the ring has no submission entry available the call does nothing.
func (c *ringConn) rePoll() {
	id := c.r.ID()
	sqe, commit := c.r.SubmitEntry()
	if sqe == nil {
		// SubmitEntry can return a nil entry (Read and Write check for
		// it); without an entry there is nothing to fill in or commit.
		return
	}
	sqe.Opcode = PollAdd
	sqe.Fd = int32(c.fd)
	sqe.UFlags = int32(POLLIN)
	sqe.UserData = id
	commit()

	// rePoll has no error return, so the outcome of Enter is deliberately
	// discarded here.
	_, _ = c.r.Enter(uint(1024), uint(1), EnterGetEvents, nil)
}
// run blocks until a value is received on c.stop, then submits a PollRemove
// request for the connection's file descriptor, waits for its completion and
// returns.
//
// If the ring has no submission entry available, run returns without
// submitting anything.
func (c *ringConn) run() {
	<-c.stop

	id := c.r.ID()
	sqe, commit := c.r.SubmitEntry()
	if sqe == nil {
		// SubmitEntry can return a nil entry (Read and Write check for
		// it); without an entry there is nothing to submit.
		return
	}
	// NOTE(review): the io_uring documentation describes POLL_REMOVE as
	// locating its target by the user_data of the original poll request;
	// only Fd is set here. Confirm this cancels the intended poll.
	sqe.Opcode = PollRemove
	sqe.Fd = int32(c.fd)
	sqe.UserData = id
	commit()

	// run has no error return; the result of the removal is best-effort and
	// deliberately discarded.
	_, _ = c.getCqe(context.Background(), id)
}
// Read implements the net.Conn interface.
//
// It re-arms polling on the connection, submits a ReadFixed request that
// targets b, and waits for the matching completion. It returns the number of
// bytes reported by the completion, or the error produced while waiting for
// it. A zero-length b returns (0, nil) without touching the ring. If no
// submission entry is available, a "ring unavailable" error is returned.
//
// NOTE(review): the wait uses context.Background(), so the deadlines stored
// by the Set*Deadline methods are not applied here.
func (c *ringConn) Read(b []byte) (int, error) {
	if len(b) == 0 {
		// Taking &b[0] below would panic on an empty buffer.
		return 0, nil
	}

	c.rePoll()

	sqe, commit := c.r.SubmitEntry()
	if sqe == nil {
		return 0, errors.New("ring unavailable")
	}
	sqe.Opcode = ReadFixed
	sqe.Fd = int32(c.fd)
	sqe.Len = uint32(len(b))
	sqe.Flags = 0
	sqe.Addr = (uint64)(uintptr(unsafe.Pointer(&b[0])))

	// Use reqID as user data so the request can be matched against its
	// entry in the completion queue.
	reqID := c.r.ID()
	sqe.UserData = reqID
	commit()

	n, err := c.getCqe(context.Background(), reqID)
	// The address of b was handed over as a bare integer; keep b reachable
	// until the completion has been collected.
	runtime.KeepAlive(b)
	return n, err
}
// Write implements the net.Conn interface.
//
// It submits a WriteFixed request that sources its data from b and waits for
// the matching completion. It returns the number of bytes reported by the
// completion, or the error produced while waiting for it. A zero-length b
// returns (0, nil) without touching the ring. If no submission entry is
// available, a "ring unavailable" error is returned.
//
// NOTE(review): the wait uses context.Background(), so the deadlines stored
// by the Set*Deadline methods are not applied here.
func (c *ringConn) Write(b []byte) (n int, err error) {
	if len(b) == 0 {
		// Taking &b[0] below would panic on an empty buffer.
		return 0, nil
	}

	sqe, commit := c.r.SubmitEntry()
	if sqe == nil {
		return 0, errors.New("ring unavailable")
	}
	sqe.Opcode = WriteFixed
	sqe.Fd = int32(c.fd)
	sqe.Len = uint32(len(b))
	sqe.Addr = (uint64)(uintptr(unsafe.Pointer(&b[0])))

	// Use reqID as user data so the request can be matched against its
	// entry in the completion queue.
	reqID := c.r.ID()
	sqe.UserData = reqID
	commit()

	n, err = c.getCqe(context.Background(), reqID)
	// The address of b was handed over as a bare integer; keep b reachable
	// until the completion has been collected.
	runtime.KeepAlive(b)
	return n, err
}
// Close implements the net.Conn interface. It sends a value on the stop
// channel and then closes the connection's file descriptor, returning
// whatever error the close reports.
func (c *ringConn) Close() error {
	var signal struct{}
	c.stop <- signal

	err := syscall.Close(c.fd)
	return err
}
// LocalAddr implements the net.Conn interface. It returns the connection's
// local address, or nil when none has been set.
func (c *ringConn) LocalAddr() net.Addr {
	if c.laddr == nil {
		// Returning the nil *addr directly would yield a non-nil net.Addr
		// wrapping a nil pointer, which defeats callers' nil checks.
		return nil
	}
	return c.laddr
}
// RemoteAddr implements the net.Conn interface. It returns the connection's
// remote address, or nil when none has been set.
func (c *ringConn) RemoteAddr() net.Addr {
	if c.raddr == nil {
		// Returning the nil *addr directly would yield a non-nil net.Addr
		// wrapping a nil pointer, which defeats callers' nil checks.
		return nil
	}
	return c.raddr
}
// SetDeadline implements the net.Conn interface. It records t as the
// connection's combined deadline and, as net.Conn specifies, as both the
// read and the write deadline. It always returns nil.
func (c *ringConn) SetDeadline(t time.Time) error {
	c.deadMu.Lock()
	defer c.deadMu.Unlock()

	c.deadline = t
	// net.Conn defines SetDeadline as equivalent to calling both
	// SetReadDeadline and SetWriteDeadline.
	c.readDeadline = t
	c.writeDeadline = t
	return nil
}
// SetReadDeadline implements the net.Conn interface. It stores t as the
// read deadline while holding the deadline mutex and always returns nil.
func (c *ringConn) SetReadDeadline(t time.Time) error {
	c.deadMu.Lock()
	defer c.deadMu.Unlock()

	c.readDeadline = t
	return nil
}
// SetWriteDeadline implements the net.Conn interface. It stores t as the
// write deadline while holding the deadline mutex and always returns nil.
func (c *ringConn) SetWriteDeadline(t time.Time) error {
	c.deadMu.Lock()
	defer c.deadMu.Unlock()

	c.writeDeadline = t
	return nil
}