Skip to content

Commit 5992302

Browse files
committed
add comments
1 parent dcb2a22 commit 5992302

File tree

6 files changed

+46
-16
lines changed

6 files changed

+46
-16
lines changed

components/rpc/callback/callback.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ var (
5757
// to storage BeforeFactory
5858
beforeInvokeRegistry = map[string]BeforeFactory{}
5959
// to storage AfterFactory
60-
afterInvokeRegistry = map[string]AfterFactory{}
60+
afterInvokeRegistry = map[string]AfterFactory{}
6161
)
6262

6363
// NewCallback is created Callback
@@ -84,7 +84,7 @@ func (c *callback) AddBeforeInvoke(conf rpc.CallbackFunc) {
8484
c.beforeInvoke = append(c.beforeInvoke, f.Create())
8585
}
8686

87-
// AddAfterInvoke is add beforeInvoke into callback.afterInvoke
87+
// AddAfterInvoke is used to add beforeInvoke into callback.afterInvoke
8888
func (c *callback) AddAfterInvoke(conf rpc.CallbackFunc) {
8989
f, ok := afterInvokeRegistry[conf.Name]
9090
if !ok {
@@ -98,7 +98,7 @@ func (c *callback) AddAfterInvoke(conf rpc.CallbackFunc) {
9898
c.afterInvoke = append(c.afterInvoke, f.Create())
9999
}
100100

101-
// BeforeInvoke is get RPCRequest in callback.beforeInvoke
101+
// BeforeInvoke is used to invoke beforeInvoke callbacks
102102
func (c *callback) BeforeInvoke(request *rpc.RPCRequest) (*rpc.RPCRequest, error) {
103103
var err error
104104
for _, cb := range c.beforeInvoke {
@@ -109,7 +109,7 @@ func (c *callback) BeforeInvoke(request *rpc.RPCRequest) (*rpc.RPCRequest, error
109109
return request, err
110110
}
111111

112-
// AfterInvoke is get RPCResponse in callback.afterInvoke
112+
// AfterInvoke is used to invoke afterInvoke callbacks
113113
func (c *callback) AfterInvoke(response *rpc.RPCResponse) (*rpc.RPCResponse, error) {
114114
var err error
115115
for _, cb := range c.afterInvoke {

components/rpc/invoker/mosn/channel/connpool.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func newConnPool(
8888
return p
8989
}
9090

91-
9291
// connPool is connected pool
9392
type connPool struct {
9493
maxActive int
@@ -131,6 +130,7 @@ func (p *connPool) Get(ctx context.Context) (*wrapConn, error) {
131130
if p.stateFunc != nil {
132131
wc.state = p.stateFunc()
133132
}
133+
// start a readloop gorountine to read and handle data
134134
if p.onDataFunc != nil {
135135
utils.GoWithRecover(func() {
136136
p.readloop(wc)
@@ -171,6 +171,7 @@ func (p *connPool) readloop(c *wrapConn) {
171171

172172
c.buf = buffer.NewIoBuffer(defaultBufSize)
173173
for {
174+
// read data from connection
174175
n, readErr := c.buf.ReadOnce(c)
175176
if readErr != nil {
176177
err = readErr
@@ -182,6 +183,8 @@ func (p *connPool) readloop(c *wrapConn) {
182183
}
183184

184185
if n > 0 {
186+
// handle data.
187+
// it will delegate to hstate if it's constructed by httpchannel
185188
if onDataErr := p.onDataFunc(c); onDataErr != nil {
186189
err = onDataErr
187190
log.DefaultLogger.Errorf("[runtime][rpc]connpool onData err: %s", onDataErr.Error())

components/rpc/invoker/mosn/channel/httpchannel.go

+25-5
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@ func init() {
3636
RegistChannel("http", newHttpChannel)
3737
}
3838

39+
// hstate is a pipe for readloop goroutine to communicate with request goroutine
3940
type hstate struct {
41+
// request goroutine will read data from it
4042
reader net.Conn
43+
// readloop goroutine will write data to it
4144
writer net.Conn
4245
}
4346

@@ -60,20 +63,30 @@ type httpChannel struct {
6063
pool *connPool
6164
}
6265

63-
// newHttpChannel is create rpc.Channel by ChannelConfig
66+
// newHttpChannel is used to create rpc.Channel according to ChannelConfig
6467
func newHttpChannel(config ChannelConfig) (rpc.Channel, error) {
6568
hc := &httpChannel{}
6669
hc.pool = newConnPool(
6770
config.Size,
71+
// dialFunc
6872
func() (net.Conn, error) {
6973
local, remote := net.Pipe()
7074
localTcpConn := &fakeTcpConn{c: local}
7175
remoteTcpConn := &fakeTcpConn{c: remote}
7276
if err := acceptFunc(remoteTcpConn, config.Listener); err != nil {
7377
return nil, err
7478
}
79+
// the goroutine model is:
80+
// request goroutine ---> localTcpConn ---> mosn
81+
// ^ |
82+
// | |
83+
// | |
84+
// hstate <-- readloop goroutine <------
7585
return localTcpConn, nil
76-
}, func() interface{} {
86+
},
87+
// stateFunc
88+
func() interface{} {
89+
// hstate is a pipe for readloop goroutine to communicate with request goroutine
7790
s := &hstate{}
7891
s.reader, s.writer = net.Pipe()
7992
return s
@@ -84,25 +97,29 @@ func newHttpChannel(config ChannelConfig) (rpc.Channel, error) {
8497
return hc, nil
8598
}
8699

87-
// Do is handle RPCRequest to RPCResponse
100+
// Do is used to handle RPCRequest and return RPCResponse
88101
func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
102+
// 1. context.WithTimeout
89103
timeout := time.Duration(req.Timeout) * time.Millisecond
90104
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
91105
defer cancel()
92106

107+
// 2. get a fake connection with mosn
108+
// The pool will start a readloop gorountine,
109+
// which aims to read data from mosn and then write data to the hstate.writer
93110
conn, err := h.pool.Get(ctx)
94111
if err != nil {
95112
return nil, err
96113
}
97-
114+
// 3. set deadline before write data to this connection
98115
hstate := conn.state.(*hstate)
99116
deadline, _ := ctx.Deadline()
100117
if err = conn.SetWriteDeadline(deadline); err != nil {
101118
hstate.close()
102119
h.pool.Put(conn, true)
103120
return nil, common.Error(common.UnavailebleCode, err.Error())
104121
}
105-
122+
// 4. write data to this fake connection
106123
httpReq := h.constructReq(req)
107124
defer fasthttp.ReleaseRequest(httpReq)
108125

@@ -112,6 +129,7 @@ func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
112129
return nil, common.Error(common.UnavailebleCode, err.Error())
113130
}
114131

132+
// 5. read response data and parse it into fasthttp.Response
115133
httpResp := &fasthttp.Response{}
116134
hstate.reader.SetReadDeadline(deadline)
117135

@@ -121,6 +139,8 @@ func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
121139
return nil, common.Error(common.UnavailebleCode, err.Error())
122140
}
123141
h.pool.Put(conn, false)
142+
143+
// 6. convert result to rpc.RPCResponse,which is the response of rpc invoker
124144
body := httpResp.Body()
125145
if httpResp.StatusCode() != http.StatusOK {
126146
return nil, common.Errorf(common.UnavailebleCode, "http response code %d, body: %s", httpResp.StatusCode(), string(body))

components/rpc/invoker/mosn/channel/xchannel.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) {
5151
m := &xChannel{proto: proto}
5252
m.pool = newConnPool(
5353
config.Size,
54+
// dialFunc
5455
func() (net.Conn, error) {
5556
local, remote := net.Pipe()
5657
localTcpConn := &fakeTcpConn{c: local}
@@ -60,6 +61,7 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) {
6061
}
6162
return localTcpConn, nil
6263
},
64+
// stateFunc
6365
func() interface{} {
6466
return &xstate{calls: map[uint32]chan call{}}
6567
},
@@ -90,18 +92,20 @@ type xChannel struct {
9092

9193
// Do is handle RPCRequest to RPCResponse
9294
func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
95+
// 1. context.WithTimeout
9396
timeout := time.Duration(req.Timeout) * time.Millisecond
9497
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
9598
defer cancel()
9699

100+
// 2. get fake connection with mosn
97101
conn, err := m.pool.Get(ctx)
98102
if err != nil {
99103
return nil, err
100104
}
101105

102106
xstate := conn.state.(*xstate)
103107

104-
// encode request
108+
// 3. encode request
105109
frame := m.proto.ToFrame(req)
106110
id := atomic.AddUint32(&xstate.reqid, 1)
107111
frame.SetRequestId(uint64(id))
@@ -131,6 +135,7 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
131135
}
132136
m.pool.Put(conn, false)
133137

138+
// read response and decode it
134139
select {
135140
case res := <-callChan:
136141
if res.err != nil {

components/rpc/invoker/mosn/mosninvoker.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -89,24 +89,26 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp
8989
}
9090
}()
9191

92+
// 1. validate request
9293
if req.Timeout == 0 {
9394
req.Timeout = 3000
9495
}
9596
req.Ctx = ctx
9697
log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req)
98+
// 2. beforeInvoke callback
9799
req, err = m.cb.BeforeInvoke(req)
98100
if err != nil {
99101
log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error())
100102
return nil, err
101103
}
102-
104+
// 3. do invocation
103105
resp, err = m.channel.Do(req)
104106
if err != nil {
105107
log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error())
106108
return nil, err
107109
}
108-
109110
resp.Ctx = req.Ctx
111+
// 4. afterInvoke callback
110112
resp, err = m.cb.AfterInvoke(resp)
111113
if err != nil {
112114
log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error())

components/rpc/types.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (r RPCHeader) Get(key string) string {
5353
// RPCRequest is request info
5454
type RPCRequest struct {
5555
// context
56-
Ctx context.Context
56+
Ctx context.Context
5757
// request id
5858
Id string
5959
Timeout int32
@@ -88,9 +88,9 @@ type Callback interface {
8888
// AddAfterInvoke is add AfterInvoke func
8989
AddAfterInvoke(CallbackFunc)
9090

91-
// BeforeInvoke is get BeforeInvoke by RPCRequest
91+
// BeforeInvoke is used to invoke beforeInvoke callbacks
9292
BeforeInvoke(*RPCRequest) (*RPCRequest, error)
93-
// AfterInvoke is get AfterInvoke by RPCRequest
93+
// AfterInvoke is used to invoke afterInvoke callbacks
9494
AfterInvoke(*RPCResponse) (*RPCResponse, error)
9595
}
9696

0 commit comments

Comments
 (0)