Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make parameters of API plugin extendable #369

Merged
merged 12 commits into from
Dec 21, 2021
15 changes: 13 additions & 2 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ jobs:
- name: Post Coverage
run: bash <(curl -s https://codecov.io/bash)

- name: integrate
run: make wasm-integrate
integrate:
name: integrate
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15

- name: Check out code
uses: actions/checkout@v2

- name: Run Integrate tests.
run: make wasm-integrate
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,14 @@ package helloworld

import (
"context"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/components-contrib/state"
rawGRPC "google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"mosn.io/layotto/components/configstores"
"mosn.io/layotto/components/file"
"mosn.io/layotto/components/hello"
"mosn.io/layotto/components/lock"
"mosn.io/layotto/components/rpc"
"mosn.io/layotto/components/sequencer"
"mosn.io/layotto/pkg/grpc"
grpc_api "mosn.io/layotto/pkg/grpc"
mgrpc "mosn.io/mosn/pkg/filter/network/grpc"
)

func NewHelloWorldAPI(
appId string,
hellos map[string]hello.HelloService,
configStores map[string]configstores.Store,
rpcs map[string]rpc.Invoker,
pubSubs map[string]pubsub.PubSub,
stateStores map[string]state.Store,
files map[string]file.File,
lockStores map[string]lock.LockStore,
sequencers map[string]sequencer.Store,
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error),
) grpc.GrpcAPI {
func NewHelloWorldAPI(ac *grpc_api.ApplicationContext) grpc.GrpcAPI {
return &server{}
}

Expand Down
11 changes: 5 additions & 6 deletions cmd/layotto_multiple_api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ package main
import (
"encoding/json"
"fmt"
"os"
"strconv"
"time"

"mosn.io/api"
"mosn.io/layotto/diagnostics"
helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld"
"mosn.io/layotto/pkg/grpc/dapr"
"mosn.io/layotto/pkg/grpc/default_api"
helloworld_api "mosn.io/layotto/pkg/integrate/api/helloworld"
"os"
"strconv"
"time"

mock_state "mosn.io/layotto/pkg/mock/components/state"
_ "mosn.io/layotto/pkg/wasm"
Expand Down Expand Up @@ -121,6 +119,7 @@ import (

"github.com/urfave/cli"
"google.golang.org/grpc"
"mosn.io/layotto/diagnostics"
_ "mosn.io/layotto/pkg/filter/network/tcpcopy"
"mosn.io/layotto/pkg/runtime"
"mosn.io/mosn/pkg/featuregate"
Expand Down
3 changes: 2 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ coverage:

ignore:
# Configure what to ignore.
- "pkg/mock" # - Testing mock.
- "pkg/mock/**/*" # - Testing mock.
# Ignore non-deterministic methods and tests
# codecov might report unexpected changes even if no code has been changed.
# see https://community.codecov.com/t/reported-change-in-coverage-when-no-code-changes-made/1346
Expand All @@ -24,4 +24,5 @@ ignore:
- "pkg/grpc/dapr/proto/runtime/v1"
- "pkg/grpc/dapr/dapr_api_unimplement.go"
- "pkg/wasm/watcher.go"
- "cmd/**/*"
- "components/lock/consul/consul_lock_task.go"
3 changes: 3 additions & 0 deletions pkg/actuator/info/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,8 @@ func AddInfoContributor(name string, c Contributor) {

// AddInfoContributorFunc register info.Contributor.It's not concurrent-safe,so please invoke it ONLY in init method.
func AddInfoContributorFunc(name string, f func() (interface{}, error)) {
if f == nil {
return
}
AddInfoContributor(name, ContributorAdapter(f))
}
5 changes: 5 additions & 0 deletions pkg/actuator/info/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func TestEndpoint_Handle(t *testing.T) {
assert.True(t, err == nil)
assert.True(t, len(handle) == 0)

AddInfoContributorFunc("test", nil)
handle, err = ep.Handle(context.Background(), nil)
assert.True(t, err == nil)
assert.True(t, len(handle) == 0)

AddInfoContributor("test", nil)
handle, err = ep.Handle(context.Background(), nil)
assert.True(t, err == nil)
Expand Down
22 changes: 6 additions & 16 deletions pkg/grpc/dapr/dapr_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,28 +164,18 @@ func (d *daprGrpcAPI) InvokeBinding(ctx context.Context, in *runtime.InvokeBindi
// NewDaprAPI_Alpha construct a grpc_api.GrpcAPI which implements DaprServer.
// Currently it only support Dapr's InvokeService and InvokeBinding API.
// Note: this feature is still in Alpha state and we don't recommend that you use it in your production environment.
func NewDaprAPI_Alpha(
appId string,
hellos map[string]hello.HelloService,
configStores map[string]configstores.Store,
rpcs map[string]rpc.Invoker,
pubSubs map[string]pubsub.PubSub,
stateStores map[string]state.Store,
files map[string]file.File,
lockStores map[string]lock.LockStore,
sequencers map[string]sequencer.Store,
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error),
) grpc_api.GrpcAPI {
func NewDaprAPI_Alpha(ac *grpc_api.ApplicationContext) grpc_api.GrpcAPI {
// filter out transactionalStateStores
transactionalStateStores := map[string]state.TransactionalStore{}
for key, store := range stateStores {
for key, store := range ac.StateStores {
if state.FeatureTransactional.IsPresent(store.Features()) {
transactionalStateStores[key] = store.(state.TransactionalStore)
}
}
return NewDaprServer(appId, hellos, configStores, rpcs, pubSubs,
stateStores, transactionalStateStores,
files, lockStores, sequencers, sendToOutputBindingFn)
return NewDaprServer(ac.AppId,
ac.Hellos, ac.ConfigStores, ac.Rpcs, ac.PubSubs, ac.StateStores, transactionalStateStores,
ac.Files, ac.LockStores, ac.Sequencers,
ac.SendToOutputBindingFn)
}

func NewDaprServer(
Expand Down
7 changes: 5 additions & 2 deletions pkg/grpc/dapr/dapr_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
grpc_api "mosn.io/layotto/pkg/grpc"
mock_state "mosn.io/layotto/pkg/mock/components/state"
"net"
"testing"
Expand Down Expand Up @@ -63,13 +64,15 @@ func TestNewDaprAPI_Alpha(t *testing.T) {
mockTxStore,
}
// construct API
grpcAPI := NewDaprAPI_Alpha("", nil, nil, nil, nil, map[string]state.Store{"mock": store}, nil, nil, nil,
grpcAPI := NewDaprAPI_Alpha(&grpc_api.ApplicationContext{
"", nil, nil, nil, nil,
map[string]state.Store{"mock": store}, nil, nil, nil,
func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
if name == "error-binding" {
return nil, errors.New("error when invoke binding")
}
return &bindings.InvokeResponse{Data: []byte("ok")}, nil
})
}})
err := grpcAPI.Init(nil)
if err != nil {
t.Errorf("grpcAPI.Init error")
Expand Down
17 changes: 4 additions & 13 deletions pkg/grpc/default_api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,10 @@ func (a *api) Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer)
return registeredServer, nil
}

func NewGrpcAPI(
appId string,
hellos map[string]hello.HelloService,
configStores map[string]configstores.Store,
rpcs map[string]rpc.Invoker,
pubSubs map[string]pubsub.PubSub,
stateStores map[string]state.Store,
files map[string]file.File,
lockStores map[string]lock.LockStore,
sequencers map[string]sequencer.Store,
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error),
) grpc_api.GrpcAPI {
return NewAPI(appId, hellos, configStores, rpcs, pubSubs, stateStores, files, lockStores, sequencers, sendToOutputBindingFn)
func NewGrpcAPI(ac *grpc_api.ApplicationContext) grpc_api.GrpcAPI {
return NewAPI(ac.AppId,
ac.Hellos, ac.ConfigStores, ac.Rpcs, ac.PubSubs, ac.StateStores, ac.Files, ac.LockStores, ac.Sequencers,
ac.SendToOutputBindingFn)
}

func NewAPI(
Expand Down
26 changes: 14 additions & 12 deletions pkg/grpc/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ type GrpcAPI interface {
}

// NewGrpcAPI is the constructor of GrpcAPI
type NewGrpcAPI func(
appId string,
hellos map[string]hello.HelloService,
configStores map[string]configstores.Store,
rpcs map[string]rpc.Invoker,
pubSubs map[string]pubsub.PubSub,
stateStores map[string]state.Store,
files map[string]file.File,
lockStores map[string]lock.LockStore,
sequencers map[string]sequencer.Store,
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error),
) GrpcAPI
type NewGrpcAPI func(applicationContext *ApplicationContext) GrpcAPI

type ApplicationContext struct {
AppId string
Hellos map[string]hello.HelloService
ConfigStores map[string]configstores.Store
Rpcs map[string]rpc.Invoker
PubSubs map[string]pubsub.PubSub
StateStores map[string]state.Store
Files map[string]file.File
LockStores map[string]lock.LockStore
Sequencers map[string]sequencer.Store
SendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
}
26 changes: 14 additions & 12 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,21 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) {
}
// create GrpcAPIs
var apis []grpc.GrpcAPI
ac := &grpc.ApplicationContext{
m.runtimeConfig.AppManagement.AppId,
m.hellos,
m.configStores,
m.rpcs,
m.pubSubs,
m.states,
m.files,
m.locks,
m.sequencers,
m.sendToOutputBinding,
}

for _, apiFactory := range o.apiFactorys {
api := apiFactory(
m.runtimeConfig.AppManagement.AppId,
m.hellos,
m.configStores,
m.rpcs,
m.pubSubs,
m.states,
m.files,
m.locks,
m.sequencers,
m.sendToOutputBinding,
)
api := apiFactory(ac)
// init the GrpcAPI
if err := api.Init(m.AppCallbackConn); err != nil {
return nil, err
Expand Down