Skip to content

Commit 6be4730

Browse files
authored
feat: make parameters of API plugin extendable (#369)
1 parent aa38400 commit 6be4730

File tree

17 files changed

+120
-102
lines changed

17 files changed

+120
-102
lines changed

.github/workflows/github-actions.yml

+14-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
# If you want to matrix build , you can append the following list.
1616
matrix:
1717
go_version:
18-
- 1.15
18+
- 1.14.13
1919
os:
2020
- ubuntu-latest
2121

@@ -69,6 +69,17 @@ jobs:
6969
- name: Post Coverage
7070
run: bash <(curl -s https://codecov.io/bash)
7171

72-
- name: integrate
73-
run: make wasm-integrate
72+
integrate:
73+
name: integrate
74+
runs-on: ubuntu-latest
75+
steps:
76+
- name: Set up Go
77+
uses: actions/setup-go@v2
78+
with:
79+
go-version: 1.14.13
7480

81+
- name: Check out code
82+
uses: actions/checkout@v2
83+
84+
- name: Run Integrate tests.
85+
run: make wasm-integrate

pkg/integrate/api/helloworld/grpc_api.go renamed to cmd/layotto_multiple_api/helloworld/grpc_api.go

+2-21
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,14 @@ package helloworld
1818

1919
import (
2020
"context"
21-
"github.com/dapr/components-contrib/bindings"
22-
"github.com/dapr/components-contrib/pubsub"
23-
"github.com/dapr/components-contrib/state"
2421
rawGRPC "google.golang.org/grpc"
2522
pb "google.golang.org/grpc/examples/helloworld/helloworld"
26-
"mosn.io/layotto/components/configstores"
27-
"mosn.io/layotto/components/file"
28-
"mosn.io/layotto/components/hello"
29-
"mosn.io/layotto/components/lock"
30-
"mosn.io/layotto/components/rpc"
31-
"mosn.io/layotto/components/sequencer"
3223
"mosn.io/layotto/pkg/grpc"
24+
grpc_api "mosn.io/layotto/pkg/grpc"
3325
mgrpc "mosn.io/mosn/pkg/filter/network/grpc"
3426
)
3527

36-
func NewHelloWorldAPI(
37-
appId string,
38-
hellos map[string]hello.HelloService,
39-
configStores map[string]configstores.Store,
40-
rpcs map[string]rpc.Invoker,
41-
pubSubs map[string]pubsub.PubSub,
42-
stateStores map[string]state.Store,
43-
files map[string]file.File,
44-
lockStores map[string]lock.LockStore,
45-
sequencers map[string]sequencer.Store,
46-
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error),
47-
) grpc.GrpcAPI {
28+
func NewHelloWorldAPI(ac *grpc_api.ApplicationContext) grpc.GrpcAPI {
4829
return &server{}
4930
}
5031

cmd/layotto_multiple_api/main.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@ package main
1919
import (
2020
"encoding/json"
2121
"fmt"
22-
"os"
23-
"strconv"
24-
"time"
25-
2622
"mosn.io/api"
27-
"mosn.io/layotto/diagnostics"
23+
helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld"
2824
"mosn.io/layotto/pkg/grpc/dapr"
2925
"mosn.io/layotto/pkg/grpc/default_api"
30-
helloworld_api "mosn.io/layotto/pkg/integrate/api/helloworld"
26+
"os"
27+
"strconv"
28+
"time"
3129

3230
mock_state "mosn.io/layotto/pkg/mock/components/state"
3331
_ "mosn.io/layotto/pkg/wasm"
@@ -121,6 +119,7 @@ import (
121119

122120
"github.com/urfave/cli"
123121
"google.golang.org/grpc"
122+
"mosn.io/layotto/diagnostics"
124123
_ "mosn.io/layotto/pkg/filter/network/tcpcopy"
125124
"mosn.io/layotto/pkg/runtime"
126125
"mosn.io/mosn/pkg/featuregate"

codecov.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ coverage:
1212

1313
ignore:
1414
# Configure what to ignore.
15-
- "pkg/mock" # - Testing mock.
15+
- "pkg/mock/**/*" # - Testing mock.
1616
# Ignore non-deterministic methods and tests
1717
# codecov might report unexpected changes even if no code has been changed.
1818
# see https://community.codecov.com/t/reported-change-in-coverage-when-no-code-changes-made/1346
@@ -24,4 +24,5 @@ ignore:
2424
- "pkg/grpc/dapr/proto/runtime/v1"
2525
- "pkg/grpc/dapr/dapr_api_unimplement.go"
2626
- "pkg/wasm/watcher.go"
27+
- "cmd/**/*"
2728
- "components/lock/consul/consul_lock_task.go"

components/rpc/callback/callback.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ var (
5757
// to storage BeforeFactory
5858
beforeInvokeRegistry = map[string]BeforeFactory{}
5959
// to storage AfterFactory
60-
afterInvokeRegistry = map[string]AfterFactory{}
60+
afterInvokeRegistry = map[string]AfterFactory{}
6161
)
6262

6363
// NewCallback is created Callback
@@ -84,7 +84,7 @@ func (c *callback) AddBeforeInvoke(conf rpc.CallbackFunc) {
8484
c.beforeInvoke = append(c.beforeInvoke, f.Create())
8585
}
8686

87-
// AddAfterInvoke is add beforeInvoke into callback.afterInvoke
87+
// AddAfterInvoke is used to add beforeInvoke into callback.afterInvoke
8888
func (c *callback) AddAfterInvoke(conf rpc.CallbackFunc) {
8989
f, ok := afterInvokeRegistry[conf.Name]
9090
if !ok {
@@ -98,7 +98,7 @@ func (c *callback) AddAfterInvoke(conf rpc.CallbackFunc) {
9898
c.afterInvoke = append(c.afterInvoke, f.Create())
9999
}
100100

101-
// BeforeInvoke is get RPCRequest in callback.beforeInvoke
101+
// BeforeInvoke is used to invoke beforeInvoke callbacks
102102
func (c *callback) BeforeInvoke(request *rpc.RPCRequest) (*rpc.RPCRequest, error) {
103103
var err error
104104
for _, cb := range c.beforeInvoke {
@@ -109,7 +109,7 @@ func (c *callback) BeforeInvoke(request *rpc.RPCRequest) (*rpc.RPCRequest, error
109109
return request, err
110110
}
111111

112-
// AfterInvoke is get RPCResponse in callback.afterInvoke
112+
// AfterInvoke is used to invoke afterInvoke callbacks
113113
func (c *callback) AfterInvoke(response *rpc.RPCResponse) (*rpc.RPCResponse, error) {
114114
var err error
115115
for _, cb := range c.afterInvoke {

components/rpc/invoker/mosn/channel/connpool.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func newConnPool(
8888
return p
8989
}
9090

91-
9291
// connPool is connected pool
9392
type connPool struct {
9493
maxActive int
@@ -131,6 +130,7 @@ func (p *connPool) Get(ctx context.Context) (*wrapConn, error) {
131130
if p.stateFunc != nil {
132131
wc.state = p.stateFunc()
133132
}
133+
// start a readloop gorountine to read and handle data
134134
if p.onDataFunc != nil {
135135
utils.GoWithRecover(func() {
136136
p.readloop(wc)
@@ -171,6 +171,7 @@ func (p *connPool) readloop(c *wrapConn) {
171171

172172
c.buf = buffer.NewIoBuffer(defaultBufSize)
173173
for {
174+
// read data from connection
174175
n, readErr := c.buf.ReadOnce(c)
175176
if readErr != nil {
176177
err = readErr
@@ -182,6 +183,8 @@ func (p *connPool) readloop(c *wrapConn) {
182183
}
183184

184185
if n > 0 {
186+
// handle data.
187+
// it will delegate to hstate if it's constructed by httpchannel
185188
if onDataErr := p.onDataFunc(c); onDataErr != nil {
186189
err = onDataErr
187190
log.DefaultLogger.Errorf("[runtime][rpc]connpool onData err: %s", onDataErr.Error())

components/rpc/invoker/mosn/channel/httpchannel.go

+25-5
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@ func init() {
3636
RegistChannel("http", newHttpChannel)
3737
}
3838

39+
// hstate is a pipe for readloop goroutine to communicate with request goroutine
3940
type hstate struct {
41+
// request goroutine will read data from it
4042
reader net.Conn
43+
// readloop goroutine will write data to it
4144
writer net.Conn
4245
}
4346

@@ -60,20 +63,30 @@ type httpChannel struct {
6063
pool *connPool
6164
}
6265

63-
// newHttpChannel is create rpc.Channel by ChannelConfig
66+
// newHttpChannel is used to create rpc.Channel according to ChannelConfig
6467
func newHttpChannel(config ChannelConfig) (rpc.Channel, error) {
6568
hc := &httpChannel{}
6669
hc.pool = newConnPool(
6770
config.Size,
71+
// dialFunc
6872
func() (net.Conn, error) {
6973
local, remote := net.Pipe()
7074
localTcpConn := &fakeTcpConn{c: local}
7175
remoteTcpConn := &fakeTcpConn{c: remote}
7276
if err := acceptFunc(remoteTcpConn, config.Listener); err != nil {
7377
return nil, err
7478
}
79+
// the goroutine model is:
80+
// request goroutine ---> localTcpConn ---> mosn
81+
// ^ |
82+
// | |
83+
// | |
84+
// hstate <-- readloop goroutine <------
7585
return localTcpConn, nil
76-
}, func() interface{} {
86+
},
87+
// stateFunc
88+
func() interface{} {
89+
// hstate is a pipe for readloop goroutine to communicate with request goroutine
7790
s := &hstate{}
7891
s.reader, s.writer = net.Pipe()
7992
return s
@@ -84,25 +97,29 @@ func newHttpChannel(config ChannelConfig) (rpc.Channel, error) {
8497
return hc, nil
8598
}
8699

87-
// Do is handle RPCRequest to RPCResponse
100+
// Do is used to handle RPCRequest and return RPCResponse
88101
func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
102+
// 1. context.WithTimeout
89103
timeout := time.Duration(req.Timeout) * time.Millisecond
90104
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
91105
defer cancel()
92106

107+
// 2. get a fake connection with mosn
108+
// The pool will start a readloop gorountine,
109+
// which aims to read data from mosn and then write data to the hstate.writer
93110
conn, err := h.pool.Get(ctx)
94111
if err != nil {
95112
return nil, err
96113
}
97-
114+
// 3. set deadline before write data to this connection
98115
hstate := conn.state.(*hstate)
99116
deadline, _ := ctx.Deadline()
100117
if err = conn.SetWriteDeadline(deadline); err != nil {
101118
hstate.close()
102119
h.pool.Put(conn, true)
103120
return nil, common.Error(common.UnavailebleCode, err.Error())
104121
}
105-
122+
// 4. write data to this fake connection
106123
httpReq := h.constructReq(req)
107124
defer fasthttp.ReleaseRequest(httpReq)
108125

@@ -112,6 +129,7 @@ func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
112129
return nil, common.Error(common.UnavailebleCode, err.Error())
113130
}
114131

132+
// 5. read response data and parse it into fasthttp.Response
115133
httpResp := &fasthttp.Response{}
116134
hstate.reader.SetReadDeadline(deadline)
117135

@@ -121,6 +139,8 @@ func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
121139
return nil, common.Error(common.UnavailebleCode, err.Error())
122140
}
123141
h.pool.Put(conn, false)
142+
143+
// 6. convert result to rpc.RPCResponse,which is the response of rpc invoker
124144
body := httpResp.Body()
125145
if httpResp.StatusCode() != http.StatusOK {
126146
return nil, common.Errorf(common.UnavailebleCode, "http response code %d, body: %s", httpResp.StatusCode(), string(body))

components/rpc/invoker/mosn/channel/xchannel.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) {
5151
m := &xChannel{proto: proto}
5252
m.pool = newConnPool(
5353
config.Size,
54+
// dialFunc
5455
func() (net.Conn, error) {
5556
local, remote := net.Pipe()
5657
localTcpConn := &fakeTcpConn{c: local}
@@ -60,6 +61,7 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) {
6061
}
6162
return localTcpConn, nil
6263
},
64+
// stateFunc
6365
func() interface{} {
6466
return &xstate{calls: map[uint32]chan call{}}
6567
},
@@ -90,18 +92,20 @@ type xChannel struct {
9092

9193
// Do is handle RPCRequest to RPCResponse
9294
func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
95+
// 1. context.WithTimeout
9396
timeout := time.Duration(req.Timeout) * time.Millisecond
9497
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
9598
defer cancel()
9699

100+
// 2. get fake connection with mosn
97101
conn, err := m.pool.Get(ctx)
98102
if err != nil {
99103
return nil, err
100104
}
101105

102106
xstate := conn.state.(*xstate)
103107

104-
// encode request
108+
// 3. encode request
105109
frame := m.proto.ToFrame(req)
106110
id := atomic.AddUint32(&xstate.reqid, 1)
107111
frame.SetRequestId(uint64(id))
@@ -131,6 +135,7 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
131135
}
132136
m.pool.Put(conn, false)
133137

138+
// read response and decode it
134139
select {
135140
case res := <-callChan:
136141
if res.err != nil {

components/rpc/invoker/mosn/mosninvoker.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -89,24 +89,26 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp
8989
}
9090
}()
9191

92+
// 1. validate request
9293
if req.Timeout == 0 {
9394
req.Timeout = 3000
9495
}
9596
req.Ctx = ctx
9697
log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req)
98+
// 2. beforeInvoke callback
9799
req, err = m.cb.BeforeInvoke(req)
98100
if err != nil {
99101
log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error())
100102
return nil, err
101103
}
102-
104+
// 3. do invocation
103105
resp, err = m.channel.Do(req)
104106
if err != nil {
105107
log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error())
106108
return nil, err
107109
}
108-
109110
resp.Ctx = req.Ctx
111+
// 4. afterInvoke callback
110112
resp, err = m.cb.AfterInvoke(resp)
111113
if err != nil {
112114
log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error())

components/rpc/types.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (r RPCHeader) Get(key string) string {
5353
// RPCRequest is request info
5454
type RPCRequest struct {
5555
// context
56-
Ctx context.Context
56+
Ctx context.Context
5757
// request id
5858
Id string
5959
Timeout int32
@@ -88,9 +88,9 @@ type Callback interface {
8888
// AddAfterInvoke is add AfterInvoke func
8989
AddAfterInvoke(CallbackFunc)
9090

91-
// BeforeInvoke is get BeforeInvoke by RPCRequest
91+
// BeforeInvoke is used to invoke beforeInvoke callbacks
9292
BeforeInvoke(*RPCRequest) (*RPCRequest, error)
93-
// AfterInvoke is get AfterInvoke by RPCRequest
93+
// AfterInvoke is used to invoke afterInvoke callbacks
9494
AfterInvoke(*RPCResponse) (*RPCResponse, error)
9595
}
9696

pkg/actuator/info/endpoint.go

+3
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,8 @@ func AddInfoContributor(name string, c Contributor) {
6161

6262
// AddInfoContributorFunc register info.Contributor.It's not concurrent-safe,so please invoke it ONLY in init method.
6363
func AddInfoContributorFunc(name string, f func() (interface{}, error)) {
64+
if f == nil {
65+
return
66+
}
6467
AddInfoContributor(name, ContributorAdapter(f))
6568
}

pkg/actuator/info/endpoint_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ func TestEndpoint_Handle(t *testing.T) {
3838
assert.True(t, err == nil)
3939
assert.True(t, len(handle) == 0)
4040

41+
AddInfoContributorFunc("test", nil)
42+
handle, err = ep.Handle(context.Background(), nil)
43+
assert.True(t, err == nil)
44+
assert.True(t, len(handle) == 0)
45+
4146
AddInfoContributor("test", nil)
4247
handle, err = ep.Handle(context.Background(), nil)
4348
assert.True(t, err == nil)

0 commit comments

Comments
 (0)