From 5058fca209664e9f2ccab680e6a920b1503f4dfa Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Fri, 17 Dec 2021 16:31:11 +0800 Subject: [PATCH 1/8] parallel tests --- .github/workflows/github-actions.yml | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 0c311740bf..352d6b29e1 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -69,6 +69,17 @@ jobs: - name: Post Coverage run: bash <(curl -s https://codecov.io/bash) - - name: integrate - run: make wasm-integrate + integrate: + name: integrate + runs-on: ubuntu-latest + steps: + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.15 + - name: Check out code + uses: actions/checkout@v2 + + - name: Run Integrate tests. + run: make wasm-integrate From c8c1c4554b1d2c68669ec51dfc9e1f0b9eb7bfdf Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Sat, 18 Dec 2021 00:02:49 +0800 Subject: [PATCH 2/8] modify parameter --- pkg/grpc/dapr/dapr_api.go | 22 ++++++-------------- pkg/grpc/dapr/dapr_api_test.go | 7 +++++-- pkg/grpc/default_api/api.go | 17 ++++------------ pkg/grpc/grpc_api.go | 24 +++++++++++++--------- pkg/integrate/api/helloworld/grpc_api.go | 23 ++------------------- pkg/runtime/runtime.go | 26 +++++++++++++----------- 6 files changed, 45 insertions(+), 74 deletions(-) diff --git a/pkg/grpc/dapr/dapr_api.go b/pkg/grpc/dapr/dapr_api.go index aba19121dc..f89cbb30a3 100644 --- a/pkg/grpc/dapr/dapr_api.go +++ b/pkg/grpc/dapr/dapr_api.go @@ -158,28 +158,18 @@ func (d *daprGrpcAPI) InvokeBinding(ctx context.Context, in *runtime.InvokeBindi // NewDaprAPI_Alpha construct a grpc_api.GrpcAPI which implements DaprServer. // Currently it only support Dapr's InvokeService and InvokeBinding API. // Note: this feature is still in Alpha state and we don't recommend that you use it in your production environment. -func NewDaprAPI_Alpha( - appId string, - hellos map[string]hello.HelloService, - configStores map[string]configstores.Store, - rpcs map[string]rpc.Invoker, - pubSubs map[string]pubsub.PubSub, - stateStores map[string]state.Store, - files map[string]file.File, - lockStores map[string]lock.LockStore, - sequencers map[string]sequencer.Store, - sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), -) grpc_api.GrpcAPI { +func NewDaprAPI_Alpha(ac *grpc_api.ApplicationContext) grpc_api.GrpcAPI { // filter out transactionalStateStores transactionalStateStores := map[string]state.TransactionalStore{} - for key, store := range stateStores { + for key, store := range ac.StateStores { if state.FeatureTransactional.IsPresent(store.Features()) { transactionalStateStores[key] = store.(state.TransactionalStore) } } - return NewDaprServer(appId, hellos, configStores, rpcs, pubSubs, - stateStores, transactionalStateStores, - files, lockStores, sequencers, sendToOutputBindingFn) + return NewDaprServer(ac.AppId, + ac.Hellos, ac.ConfigStores, ac.Rpcs, ac.PubSubs, ac.StateStores, transactionalStateStores, + ac.Files, ac.LockStores, ac.Sequencers, + ac.SendToOutputBindingFn) } func NewDaprServer( diff --git a/pkg/grpc/dapr/dapr_api_test.go b/pkg/grpc/dapr/dapr_api_test.go index e747eb4dfe..35524cf292 100644 --- a/pkg/grpc/dapr/dapr_api_test.go +++ b/pkg/grpc/dapr/dapr_api_test.go @@ -24,6 +24,7 @@ import ( "github.com/golang/mock/gomock" "github.com/phayes/freeport" "github.com/stretchr/testify/assert" + grpc_api "mosn.io/layotto/pkg/grpc" mock_state "mosn.io/layotto/pkg/mock/components/state" "net" "testing" @@ -63,13 +64,15 @@ func TestNewDaprAPI_Alpha(t *testing.T) { mockTxStore, } // construct API - grpcAPI := NewDaprAPI_Alpha("", nil, nil, nil, nil, map[string]state.Store{"mock": store}, nil, nil, nil, + grpcAPI := NewDaprAPI_Alpha(&grpc_api.ApplicationContext{ + "", nil, nil, nil, nil, + map[string]state.Store{"mock": store}, nil, nil, nil, func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { if name == "error-binding" { return nil, errors.New("error when invoke binding") } return &bindings.InvokeResponse{Data: []byte("ok")}, nil - }) + }}) err := grpcAPI.Init(nil) if err != nil { t.Errorf("grpcAPI.Init error") diff --git a/pkg/grpc/default_api/api.go b/pkg/grpc/default_api/api.go index 3eb132db99..88c1e7c450 100644 --- a/pkg/grpc/default_api/api.go +++ b/pkg/grpc/default_api/api.go @@ -159,19 +159,10 @@ func (a *api) Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) return registeredServer, nil } -func NewGrpcAPI( - appId string, - hellos map[string]hello.HelloService, - configStores map[string]configstores.Store, - rpcs map[string]rpc.Invoker, - pubSubs map[string]pubsub.PubSub, - stateStores map[string]state.Store, - files map[string]file.File, - lockStores map[string]lock.LockStore, - sequencers map[string]sequencer.Store, - sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), -) grpc_api.GrpcAPI { - return NewAPI(appId, hellos, configStores, rpcs, pubSubs, stateStores, files, lockStores, sequencers, sendToOutputBindingFn) +func NewGrpcAPI(ac *grpc_api.ApplicationContext) grpc_api.GrpcAPI { + return NewAPI(ac.AppId, + ac.Hellos, ac.ConfigStores, ac.Rpcs, ac.PubSubs, ac.StateStores, ac.Files, ac.LockStores, ac.Sequencers, + ac.SendToOutputBindingFn) } func NewAPI( diff --git a/pkg/grpc/grpc_api.go b/pkg/grpc/grpc_api.go index 4975a51d11..09f31596fd 100644 --- a/pkg/grpc/grpc_api.go +++ b/pkg/grpc/grpc_api.go @@ -40,14 +40,18 @@ type GrpcAPI interface { // NewGrpcAPI is the constructor of GrpcAPI type NewGrpcAPI func( - appId string, - hellos map[string]hello.HelloService, - configStores map[string]configstores.Store, - rpcs map[string]rpc.Invoker, - pubSubs map[string]pubsub.PubSub, - stateStores map[string]state.Store, - files map[string]file.File, - lockStores map[string]lock.LockStore, - sequencers map[string]sequencer.Store, - sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), + applicationContext *ApplicationContext, ) GrpcAPI + +type ApplicationContext struct { + AppId string + Hellos map[string]hello.HelloService + ConfigStores map[string]configstores.Store + Rpcs map[string]rpc.Invoker + PubSubs map[string]pubsub.PubSub + StateStores map[string]state.Store + Files map[string]file.File + LockStores map[string]lock.LockStore + Sequencers map[string]sequencer.Store + SendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) +} diff --git a/pkg/integrate/api/helloworld/grpc_api.go b/pkg/integrate/api/helloworld/grpc_api.go index 52e16f7285..f12bb1a65d 100644 --- a/pkg/integrate/api/helloworld/grpc_api.go +++ b/pkg/integrate/api/helloworld/grpc_api.go @@ -18,33 +18,14 @@ package helloworld import ( "context" - "github.com/dapr/components-contrib/bindings" - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/components-contrib/state" rawGRPC "google.golang.org/grpc" pb "google.golang.org/grpc/examples/helloworld/helloworld" - "mosn.io/layotto/components/configstores" - "mosn.io/layotto/components/file" - "mosn.io/layotto/components/hello" - "mosn.io/layotto/components/lock" - "mosn.io/layotto/components/rpc" - "mosn.io/layotto/components/sequencer" "mosn.io/layotto/pkg/grpc" + grpc_api "mosn.io/layotto/pkg/grpc" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" ) -func NewHelloWorldAPI( - appId string, - hellos map[string]hello.HelloService, - configStores map[string]configstores.Store, - rpcs map[string]rpc.Invoker, - pubSubs map[string]pubsub.PubSub, - stateStores map[string]state.Store, - files map[string]file.File, - lockStores map[string]lock.LockStore, - sequencers map[string]sequencer.Store, - sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), -) grpc.GrpcAPI { +func NewHelloWorldAPI(ac *grpc_api.ApplicationContext) grpc.GrpcAPI { return &server{} } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 9ecb44a31c..5226ea8067 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -156,19 +156,21 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { } // create GrpcAPIs var apis []grpc.GrpcAPI + ac := &grpc.ApplicationContext{ + m.runtimeConfig.AppManagement.AppId, + m.hellos, + m.configStores, + m.rpcs, + m.pubSubs, + m.states, + m.files, + m.locks, + m.sequencers, + m.sendToOutputBinding, + } + for _, apiFactory := range o.apiFactorys { - api := apiFactory( - m.runtimeConfig.AppManagement.AppId, - m.hellos, - m.configStores, - m.rpcs, - m.pubSubs, - m.states, - m.files, - m.locks, - m.sequencers, - m.sendToOutputBinding, - ) + api := apiFactory(ac) // init the GrpcAPI if err := api.Init(m.AppCallbackConn); err != nil { return nil, err From a9508314f4b155e08f648b096912a25a9bf6c902 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Sat, 18 Dec 2021 13:35:36 +0800 Subject: [PATCH 3/8] move helloworld to a different package --- .../api => cmd/layotto_multiple_api}/helloworld/grpc_api.go | 0 cmd/layotto_multiple_api/main.go | 2 +- pkg/grpc/grpc_api.go | 4 +--- 3 files changed, 2 insertions(+), 4 deletions(-) rename {pkg/integrate/api => cmd/layotto_multiple_api}/helloworld/grpc_api.go (100%) diff --git a/pkg/integrate/api/helloworld/grpc_api.go b/cmd/layotto_multiple_api/helloworld/grpc_api.go similarity index 100% rename from pkg/integrate/api/helloworld/grpc_api.go rename to cmd/layotto_multiple_api/helloworld/grpc_api.go diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index bdc07e9b3a..d6a31d28c9 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -19,9 +19,9 @@ package main import ( "encoding/json" "fmt" + helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld" "mosn.io/layotto/pkg/grpc/dapr" "mosn.io/layotto/pkg/grpc/default_api" - helloworld_api "mosn.io/layotto/pkg/integrate/api/helloworld" "os" "strconv" "time" diff --git a/pkg/grpc/grpc_api.go b/pkg/grpc/grpc_api.go index 09f31596fd..08aa1cb611 100644 --- a/pkg/grpc/grpc_api.go +++ b/pkg/grpc/grpc_api.go @@ -39,9 +39,7 @@ type GrpcAPI interface { } // NewGrpcAPI is the constructor of GrpcAPI -type NewGrpcAPI func( - applicationContext *ApplicationContext, -) GrpcAPI +type NewGrpcAPI func(applicationContext *ApplicationContext) GrpcAPI type ApplicationContext struct { AppId string From 002807d039190c350aa773db91cc459d32c72587 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Sat, 18 Dec 2021 13:47:11 +0800 Subject: [PATCH 4/8] improve ut --- pkg/actuator/info/endpoint.go | 3 +++ pkg/actuator/info/endpoint_test.go | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/pkg/actuator/info/endpoint.go b/pkg/actuator/info/endpoint.go index 65d6f6424e..987c74524b 100644 --- a/pkg/actuator/info/endpoint.go +++ b/pkg/actuator/info/endpoint.go @@ -61,5 +61,8 @@ func AddInfoContributor(name string, c Contributor) { // AddInfoContributorFunc register info.Contributor.It's not concurrent-safe,so please invoke it ONLY in init method. func AddInfoContributorFunc(name string, f func() (interface{}, error)) { + if f == nil { + return + } AddInfoContributor(name, ContributorAdapter(f)) } diff --git a/pkg/actuator/info/endpoint_test.go b/pkg/actuator/info/endpoint_test.go index d04ed4ef78..c49dbd5861 100644 --- a/pkg/actuator/info/endpoint_test.go +++ b/pkg/actuator/info/endpoint_test.go @@ -38,6 +38,11 @@ func TestEndpoint_Handle(t *testing.T) { assert.True(t, err == nil) assert.True(t, len(handle) == 0) + AddInfoContributorFunc("test", nil) + handle, err = ep.Handle(context.Background(), nil) + assert.True(t, err == nil) + assert.True(t, len(handle) == 0) + AddInfoContributor("test", nil) handle, err = ep.Handle(context.Background(), nil) assert.True(t, err == nil) From 371ffa69a06376a45ef0a5e92ef0556b8de8a677 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Sat, 18 Dec 2021 13:55:11 +0800 Subject: [PATCH 5/8] modify codecov.yml --- codecov.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/codecov.yml b/codecov.yml index 899a28af85..67592ca474 100644 --- a/codecov.yml +++ b/codecov.yml @@ -12,7 +12,7 @@ coverage: ignore: # Configure what to ignore. - - "pkg/mock" # - Testing mock. + - "pkg/mock/**/*" # - Testing mock. # Ignore non-deterministic methods and tests # codecov might report unexpected changes even if no code has been changed. # see https://community.codecov.com/t/reported-change-in-coverage-when-no-code-changes-made/1346 @@ -24,3 +24,4 @@ ignore: - "pkg/grpc/dapr/proto/runtime/v1" - "pkg/grpc/dapr/dapr_api_unimplement.go" - "pkg/wasm/watcher.go" + - "cmd/**/*" From fbfdb5c6156dab1c6cf60ed30b2c650778d7b6a5 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Tue, 21 Dec 2021 14:20:00 +0800 Subject: [PATCH 6/8] fix imports --- cmd/layotto_multiple_api/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index dffa9af00e..c857d9dc3e 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -19,6 +19,7 @@ package main import ( "encoding/json" "fmt" + "mosn.io/api" helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld" "mosn.io/layotto/pkg/grpc/dapr" "mosn.io/layotto/pkg/grpc/default_api" @@ -118,6 +119,7 @@ import ( "github.com/urfave/cli" "google.golang.org/grpc" + "mosn.io/layotto/diagnostics" _ "mosn.io/layotto/pkg/filter/network/tcpcopy" "mosn.io/layotto/pkg/runtime" "mosn.io/mosn/pkg/featuregate" From dcb2a229fecb3e62d1a33434fe5f044fba10d0c1 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Tue, 21 Dec 2021 14:41:53 +0800 Subject: [PATCH 7/8] downgrade go version in workflow configuration --- .github/workflows/github-actions.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 352d6b29e1..06ac6a554c 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -15,7 +15,7 @@ jobs: # If you want to matrix build , you can append the following list. matrix: go_version: - - 1.15 + - 1.14.13 os: - ubuntu-latest @@ -76,7 +76,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.15 + go-version: 1.14.13 - name: Check out code uses: actions/checkout@v2 From 599230261588006acf3ed86e9ee9f2d605f2b559 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Tue, 21 Dec 2021 18:10:12 +0800 Subject: [PATCH 8/8] add comments --- components/rpc/callback/callback.go | 8 ++--- .../rpc/invoker/mosn/channel/connpool.go | 5 +++- .../rpc/invoker/mosn/channel/httpchannel.go | 30 +++++++++++++++---- .../rpc/invoker/mosn/channel/xchannel.go | 7 ++++- components/rpc/invoker/mosn/mosninvoker.go | 6 ++-- components/rpc/types.go | 6 ++-- 6 files changed, 46 insertions(+), 16 deletions(-) diff --git a/components/rpc/callback/callback.go b/components/rpc/callback/callback.go index 4e97d85bf9..863eb52251 100644 --- a/components/rpc/callback/callback.go +++ b/components/rpc/callback/callback.go @@ -57,7 +57,7 @@ var ( // to storage BeforeFactory beforeInvokeRegistry = map[string]BeforeFactory{} // to storage AfterFactory - afterInvokeRegistry = map[string]AfterFactory{} + afterInvokeRegistry = map[string]AfterFactory{} ) // NewCallback is created Callback @@ -84,7 +84,7 @@ func (c *callback) AddBeforeInvoke(conf rpc.CallbackFunc) { c.beforeInvoke = append(c.beforeInvoke, f.Create()) } -// AddAfterInvoke is add beforeInvoke into callback.afterInvoke +// AddAfterInvoke is used to add beforeInvoke into callback.afterInvoke func (c *callback) AddAfterInvoke(conf rpc.CallbackFunc) { f, ok := afterInvokeRegistry[conf.Name] if !ok { @@ -98,7 +98,7 @@ func (c *callback) AddAfterInvoke(conf rpc.CallbackFunc) { c.afterInvoke = append(c.afterInvoke, f.Create()) } -// BeforeInvoke is get RPCRequest in callback.beforeInvoke +// BeforeInvoke is used to invoke beforeInvoke callbacks func (c *callback) BeforeInvoke(request *rpc.RPCRequest) (*rpc.RPCRequest, error) { var err error for _, cb := range c.beforeInvoke { @@ -109,7 +109,7 @@ func (c *callback) BeforeInvoke(request *rpc.RPCRequest) (*rpc.RPCRequest, error return request, err } -// AfterInvoke is get RPCResponse in callback.afterInvoke +// AfterInvoke is used to invoke afterInvoke callbacks func (c *callback) AfterInvoke(response *rpc.RPCResponse) (*rpc.RPCResponse, error) { var err error for _, cb := range c.afterInvoke { diff --git a/components/rpc/invoker/mosn/channel/connpool.go b/components/rpc/invoker/mosn/channel/connpool.go index dcf163ea0a..3d4de6bde4 100644 --- a/components/rpc/invoker/mosn/channel/connpool.go +++ b/components/rpc/invoker/mosn/channel/connpool.go @@ -88,7 +88,6 @@ func newConnPool( return p } - // connPool is connected pool type connPool struct { maxActive int @@ -131,6 +130,7 @@ func (p *connPool) Get(ctx context.Context) (*wrapConn, error) { if p.stateFunc != nil { wc.state = p.stateFunc() } + // start a readloop gorountine to read and handle data if p.onDataFunc != nil { utils.GoWithRecover(func() { p.readloop(wc) @@ -171,6 +171,7 @@ func (p *connPool) readloop(c *wrapConn) { c.buf = buffer.NewIoBuffer(defaultBufSize) for { + // read data from connection n, readErr := c.buf.ReadOnce(c) if readErr != nil { err = readErr @@ -182,6 +183,8 @@ func (p *connPool) readloop(c *wrapConn) { } if n > 0 { + // handle data. + // it will delegate to hstate if it's constructed by httpchannel if onDataErr := p.onDataFunc(c); onDataErr != nil { err = onDataErr log.DefaultLogger.Errorf("[runtime][rpc]connpool onData err: %s", onDataErr.Error()) diff --git a/components/rpc/invoker/mosn/channel/httpchannel.go b/components/rpc/invoker/mosn/channel/httpchannel.go index c3310688bf..c2745fe543 100644 --- a/components/rpc/invoker/mosn/channel/httpchannel.go +++ b/components/rpc/invoker/mosn/channel/httpchannel.go @@ -36,8 +36,11 @@ func init() { RegistChannel("http", newHttpChannel) } +// hstate is a pipe for readloop goroutine to communicate with request goroutine type hstate struct { + // request goroutine will read data from it reader net.Conn + // readloop goroutine will write data to it writer net.Conn } @@ -60,11 +63,12 @@ type httpChannel struct { pool *connPool } -// newHttpChannel is create rpc.Channel by ChannelConfig +// newHttpChannel is used to create rpc.Channel according to ChannelConfig func newHttpChannel(config ChannelConfig) (rpc.Channel, error) { hc := &httpChannel{} hc.pool = newConnPool( config.Size, + // dialFunc func() (net.Conn, error) { local, remote := net.Pipe() localTcpConn := &fakeTcpConn{c: local} @@ -72,8 +76,17 @@ func newHttpChannel(config ChannelConfig) (rpc.Channel, error) { if err := acceptFunc(remoteTcpConn, config.Listener); err != nil { return nil, err } + // the goroutine model is: + // request goroutine ---> localTcpConn ---> mosn + // ^ | + // | | + // | | + // hstate <-- readloop goroutine <------ return localTcpConn, nil - }, func() interface{} { + }, + // stateFunc + func() interface{} { + // hstate is a pipe for readloop goroutine to communicate with request goroutine s := &hstate{} s.reader, s.writer = net.Pipe() return s @@ -84,17 +97,21 @@ func newHttpChannel(config ChannelConfig) (rpc.Channel, error) { return hc, nil } -// Do is handle RPCRequest to RPCResponse +// Do is used to handle RPCRequest and return RPCResponse func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { + // 1. context.WithTimeout timeout := time.Duration(req.Timeout) * time.Millisecond ctx, cancel := context.WithTimeout(req.Ctx, timeout) defer cancel() + // 2. get a fake connection with mosn + // The pool will start a readloop gorountine, + // which aims to read data from mosn and then write data to the hstate.writer conn, err := h.pool.Get(ctx) if err != nil { return nil, err } - + // 3. set deadline before write data to this connection hstate := conn.state.(*hstate) deadline, _ := ctx.Deadline() if err = conn.SetWriteDeadline(deadline); err != nil { @@ -102,7 +119,7 @@ func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { h.pool.Put(conn, true) return nil, common.Error(common.UnavailebleCode, err.Error()) } - + // 4. write data to this fake connection httpReq := h.constructReq(req) defer fasthttp.ReleaseRequest(httpReq) @@ -112,6 +129,7 @@ func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { return nil, common.Error(common.UnavailebleCode, err.Error()) } + // 5. read response data and parse it into fasthttp.Response httpResp := &fasthttp.Response{} hstate.reader.SetReadDeadline(deadline) @@ -121,6 +139,8 @@ func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { return nil, common.Error(common.UnavailebleCode, err.Error()) } h.pool.Put(conn, false) + + // 6. convert result to rpc.RPCResponse,which is the response of rpc invoker body := httpResp.Body() if httpResp.StatusCode() != http.StatusOK { return nil, common.Errorf(common.UnavailebleCode, "http response code %d, body: %s", httpResp.StatusCode(), string(body)) diff --git a/components/rpc/invoker/mosn/channel/xchannel.go b/components/rpc/invoker/mosn/channel/xchannel.go index 36c17f9ba5..839628d51e 100644 --- a/components/rpc/invoker/mosn/channel/xchannel.go +++ b/components/rpc/invoker/mosn/channel/xchannel.go @@ -51,6 +51,7 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) { m := &xChannel{proto: proto} m.pool = newConnPool( config.Size, + // dialFunc func() (net.Conn, error) { local, remote := net.Pipe() localTcpConn := &fakeTcpConn{c: local} @@ -60,6 +61,7 @@ func newXChannel(config ChannelConfig) (rpc.Channel, error) { } return localTcpConn, nil }, + // stateFunc func() interface{} { return &xstate{calls: map[uint32]chan call{}} }, @@ -90,10 +92,12 @@ type xChannel struct { // Do is handle RPCRequest to RPCResponse func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { + // 1. context.WithTimeout timeout := time.Duration(req.Timeout) * time.Millisecond ctx, cancel := context.WithTimeout(req.Ctx, timeout) defer cancel() + // 2. get fake connection with mosn conn, err := m.pool.Get(ctx) if err != nil { return nil, err @@ -101,7 +105,7 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { xstate := conn.state.(*xstate) - // encode request + // 3. encode request frame := m.proto.ToFrame(req) id := atomic.AddUint32(&xstate.reqid, 1) frame.SetRequestId(uint64(id)) @@ -131,6 +135,7 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) { } m.pool.Put(conn, false) + // read response and decode it select { case res := <-callChan: if res.err != nil { diff --git a/components/rpc/invoker/mosn/mosninvoker.go b/components/rpc/invoker/mosn/mosninvoker.go index 18ac934459..6a1c3b1d23 100644 --- a/components/rpc/invoker/mosn/mosninvoker.go +++ b/components/rpc/invoker/mosn/mosninvoker.go @@ -89,24 +89,26 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp } }() + // 1. validate request if req.Timeout == 0 { req.Timeout = 3000 } req.Ctx = ctx log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req) + // 2. beforeInvoke callback req, err = m.cb.BeforeInvoke(req) if err != nil { log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error()) return nil, err } - + // 3. do invocation resp, err = m.channel.Do(req) if err != nil { log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error()) return nil, err } - resp.Ctx = req.Ctx + // 4. afterInvoke callback resp, err = m.cb.AfterInvoke(resp) if err != nil { log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error()) diff --git a/components/rpc/types.go b/components/rpc/types.go index abb8644b75..150b28fb23 100644 --- a/components/rpc/types.go +++ b/components/rpc/types.go @@ -53,7 +53,7 @@ func (r RPCHeader) Get(key string) string { // RPCRequest is request info type RPCRequest struct { // context - Ctx context.Context + Ctx context.Context // request id Id string Timeout int32 @@ -88,9 +88,9 @@ type Callback interface { // AddAfterInvoke is add AfterInvoke func AddAfterInvoke(CallbackFunc) - // BeforeInvoke is get BeforeInvoke by RPCRequest + // BeforeInvoke is used to invoke beforeInvoke callbacks BeforeInvoke(*RPCRequest) (*RPCRequest, error) - // AfterInvoke is get AfterInvoke by RPCRequest + // AfterInvoke is used to invoke afterInvoke callbacks AfterInvoke(*RPCResponse) (*RPCResponse, error) }