Skip to content

Commit

Permalink
Merge branch 'main' into update_config
Browse files Browse the repository at this point in the history
  • Loading branch information
seeflood authored Aug 23, 2022
2 parents 974a0bb + 318dd76 commit a8e4b14
Show file tree
Hide file tree
Showing 26 changed files with 959 additions and 61 deletions.
112 changes: 110 additions & 2 deletions components/rpc/invoker/mosn/channel/xchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"

"mosn.io/pkg/buffer"
"mosn.io/pkg/log"

"mosn.io/api"

common "mosn.io/layotto/components/pkg/common"
Expand Down Expand Up @@ -93,8 +97,104 @@ type xChannel struct {
pool *connPool
}

// Do is handle RPCRequest to RPCResponse
func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
// InvokeWithTargetAddress send request to specific provider address
func (m *xChannel) InvokeWithTargetAddress(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
// 1. context.WithTimeout
timeout := time.Duration(req.Timeout) * time.Millisecond
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
defer cancel()

// 2. get connection with specific address
conn, err := net.Dial("tcp", req.Header[rpc.TargetAddress][0])
if err != nil {
return nil, err
}
wc := &wrapConn{Conn: conn}

// 3. encode request
frame := m.proto.ToFrame(req)
buf, encErr := m.proto.Encode(req.Ctx, frame)
if encErr != nil {
return nil, common.Error(common.InternalCode, encErr.Error())
}

callChan := make(chan call, 1)
// 4. set timeout
deadline, _ := ctx.Deadline()
if err := conn.SetWriteDeadline(deadline); err != nil {
return nil, common.Error(common.UnavailebleCode, err.Error())
}

// 5. read package
go func() {
var err error
defer func() {
if err != nil {
callChan <- call{err: err}
}
wc.Close()
}()

wc.buf = buffer.NewIoBuffer(defaultBufSize)
for {
// read data from connection
n, readErr := wc.buf.ReadOnce(conn)
if readErr != nil {
err = readErr
if readErr == io.EOF {
log.DefaultLogger.Debugf("[runtime][rpc]direct conn read-loop err: %s", readErr.Error())
} else {
log.DefaultLogger.Errorf("[runtime][rpc]direct conn read-loop err: %s", readErr.Error())
}
}

if n > 0 {
iframe, decodeErr := m.proto.Decode(context.TODO(), wc.buf)
if decodeErr != nil {
err = decodeErr
log.DefaultLogger.Errorf("[runtime][rpc]direct conn decode frame err: %s", err)
break
}
frame, ok := iframe.(api.XRespFrame)
if frame == nil {
continue
}
if !ok {
err = errors.New("[runtime][rpc]xchannel type not XRespFrame")
log.DefaultLogger.Errorf("[runtime][rpc]direct conn decode frame err: %s", err)
break
}
callChan <- call{resp: frame}
return

}
if err != nil {
break
}
if wc.buf != nil && wc.buf.Len() == 0 && wc.buf.Cap() > maxBufSize {
wc.buf.Free()
wc.buf.Alloc(defaultBufSize)
}
}
}()

// 6. write packet
if _, err := conn.Write(buf.Bytes()); err != nil {
return nil, common.Error(common.UnavailebleCode, err.Error())
}

select {
case res := <-callChan:
if res.err != nil {
return nil, common.Error(common.UnavailebleCode, res.err.Error())
}
return m.proto.FromFrame(res.resp)
case <-ctx.Done():
return nil, common.Error(common.TimeoutCode, ErrTimeout.Error())
}
}

func (m *xChannel) Invoke(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
// 1. context.WithTimeout
timeout := time.Duration(req.Timeout) * time.Millisecond
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
Expand Down Expand Up @@ -151,6 +251,14 @@ func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
}
}

// Do is handle RPCRequest to RPCResponse
func (m *xChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
if _, ok := req.Header[rpc.TargetAddress]; ok && len(req.Header[rpc.TargetAddress]) > 0 {
return m.InvokeWithTargetAddress(req)
}
return m.Invoke(req)
}

// removeCall is delete xstate.calls by id
func (m *xChannel) removeCall(xstate *xstate, id uint32) {
xstate.mu.Lock()
Expand Down
9 changes: 8 additions & 1 deletion components/rpc/invoker/mosn/mosninvoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"

// bridge to mosn
_ "mosn.io/mosn/pkg/filter/network/proxy"
Expand Down Expand Up @@ -93,7 +94,13 @@ func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rp

// 1. validate request
if req.Timeout == 0 {
req.Timeout = 3000
req.Timeout = rpc.DefaultRequestTimeoutMs
if ts, ok := req.Header[rpc.RequestTimeoutMs]; ok && len(ts) > 0 {
t, err := strconv.Atoi(ts[0])
if err == nil && t != 0 {
req.Timeout = int32(t)
}
}
}
req.Ctx = ctx
log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req)
Expand Down
17 changes: 17 additions & 0 deletions components/rpc/invoker/mosn/mosninvoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,27 @@ func Test_mosnInvoker_Invoke(t *testing.T) {
Timeout: 100,
Method: "Hello",
Data: []byte("hello"),
Header: map[string][]string{},
}
rsp, err := invoker.Invoke(context.Background(), req)
assert.Nil(t, err)
assert.Equal(t, "hello world!", string(rsp.Data))

req.Header[rpc.RequestTimeoutMs] = []string{"0"}
req.Timeout = 0
rsp, err = invoker.Invoke(context.Background(), req)
assert.Nil(t, err)
assert.Equal(t, "hello world!", string(rsp.Data))

assert.Equal(t, int32(3000), req.Timeout)

req.Header[rpc.RequestTimeoutMs] = []string{"100000"}
req.Timeout = 0
rsp, err = invoker.Invoke(context.Background(), req)
assert.Nil(t, err)
assert.Equal(t, "hello world!", string(rsp.Data))

assert.Equal(t, int32(100000), req.Timeout)
})

t.Run("panic", func(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions components/rpc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ import (
"strings"
)

const (
TargetAddress = "rpc_target_address"
RequestTimeoutMs = "rpc_request_timeout"
)

const (
DefaultRequestTimeoutMs = 3000
)

// RPCHeader is storage header info
type RPCHeader map[string][]string

Expand Down
2 changes: 2 additions & 0 deletions docs/_sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
- Service Invocation
- [Hello World](en/start/rpc/helloworld.md)
- [Dubbo JSON RPC](en/start/rpc/dubbo_json_rpc.md)
- [Use OSS API](en/start/oss/start.md)
- [API plugin: register your own API](en/start/api_plugin/helloworld.md)
<!--quickstart_generator-->
- As the data plane of istio
- [Integrate with istio 1.10.6](en/start/istio/)
- [Integrate with istio 1.5.x](en/start/istio/start.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/api/v1/s3.html
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ <h2 id="oss.proto">oss.proto</h2><a href="#title">Top</a>


<h3 id="spec.proto.extension.v1.s3.ObjectStorageService">[gRPC Service] ObjectStorageService</h3>
<p>ObjectStorageService</p>
<p>ObjectStorageService is an abstraction for blob storage or so called "object storage", such as alibaba cloud OSS, such as AWS S3.</p><p>You invoke ObjectStorageService API to do some CRUD operations on your binary file, e.g. query my file, delete my file, etc.</p>
<table class="enum-table">
<thead>
<tr><td>Method Name</td><td>Request Type</td><td>Response Type</td><td>Description</td></tr>
Expand Down
25 changes: 7 additions & 18 deletions docs/en/api_reference/how_to_generate_api_doc.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# How to generate `.pb.go` code and API reference
# How to generate `.pb.go` code and corresponding documentation
Note: the commands below should be executed under layotto directory

```shell
Expand All @@ -8,7 +8,10 @@ make proto
Then you get:
- `.pb.go` code
- API reference docs
- updated sidebar in the doc site
- updated API reference list
- quickstart document (both chinese and english)
- updated sidebar (The tool will add the generated quickstart doc into the sidebar of https://mosn.io/layotto )
- updated CI (The tool will add the generated quickstart doc into the CI script `etc/script/test-quickstart.sh`)

That's all :)

Expand Down Expand Up @@ -48,22 +51,8 @@ make proto.doc
This command uses docker to run protoc-gen-doc and generate docs.

### **Use docker to run protoc-gen-doc**
`make proto.doc` essentially run commands below:
`make proto.doc` invokes the script `etc/script/generate-doc.sh`, which uses docker to run protoc-gen-doc.

```
docker run --rm \
-v $(pwd)/docs/en/api_reference:/out \
-v $(pwd)/spec/proto/runtime/v1:/protos \
pseudomuto/protoc-gen-doc --doc_opt=/protos/template.tmpl,runtime_v1.md runtime.proto
```

and

```shell
docker run --rm \
-v $(pwd)/docs/en/api_reference:/out \
-v $(pwd)/spec/proto/runtime/v1:/protos \
pseudomuto/protoc-gen-doc --doc_opt=/protos/template.tmpl,appcallback_v1.md appcallback.proto
```
You can check `etc/script/generate-doc.sh` for more details.

<!-- tabs:end -->
2 changes: 1 addition & 1 deletion docs/en/building_blocks/rpc/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Layotto's RPC API is based on [Mosn](https://mosn.io/en/)'s grpc handler, which

The interface of the RPC API are consistent with [Dapr](https://docs.dapr.io/developing-applications/building-blocks/service-invocation/service-invocation-overview/), you could see its details in [invoke.go](https://github.com/mosn/layotto/blob/3802c4591181fdbcfb7dd07cbbdbadeaaada650a/sdk/go-sdk/client/invoke.go).

Using Layotto RPC invocation, your application can reliably and securely communicate with other applications using the standard HTTP or [X-Protocol](https://www.servicemesher.com/blog/x-protocol-common-address-solution/) protocols.
Using Layotto RPC invocation, your application can reliably and securely communicate with other applications using the standard HTTP or [X-Protocol](https://cloudnative.to/blog/x-protocol-common-address-solution/) protocols.

![sidecar](https://mosn.io/en/docs/concept/sidecar-pattern/sidecar-pattern.jpg)

Expand Down
71 changes: 71 additions & 0 deletions docs/en/design/notify/phone_call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# PhoneCall API design

## What would you like to be added
IVR API, or PhoneCall API

Developers can invoke this API to send voice messages to specific people.

## Why is this needed
In the monitoring scenarios, monitor systems need to send alarm messages to people on-call.
The messages might be in different forms, including IM,SMS, Email and phone calls, depending on the level of urgency.

## Product research
| IVR product |Docs|
|---|---|
|Aliyun VMS| https://www.aliyun.com/product/vms |
|AWS Pinpoint | https://aws.amazon.com/cn/pinpoint/ |


## Detailed Design

We need to consider the following factors:
- Portability
For example, a monitor system might be deployed on alibaba cloud(using [VMS](https://www.aliyun.com/product/vms) to send voice message) or AWS (using [AWS Pinpoint](https://aws.amazon.com/cn/pinpoint/) to send voice message). So portability is important here.

```proto
// PhoneCallService is one of Notify APIs. It's used to send voice messages
service PhoneCallService {
// Send voice using the specific template
rpc SendVoiceWithTemplate(SendVoiceWithTemplateRequest) returns (SendVoiceWithTemplateResponse) {}
}
// The request of SendVoiceWithTemplate method
message SendVoiceWithTemplateRequest{
// If your system uses multiple IVR services at the same time,
// you can specify which service to use with this field.
string service_name = 1;
// Required
VoiceTemplate template = 2;
// Required
repeated string to_mobile = 3;
// This field is required by some cloud providers.
string from_mobile = 4;
}
// VoiceTemplate
message VoiceTemplate{
// Required
string template_id = 1;
// Required
map<string, string> template_params = 2;
}
// The response of `SendVoiceWithTemplate` method
message SendVoiceWithTemplateResponse{
// Id of this request.
string request_id = 1;
}
```
Loading

0 comments on commit a8e4b14

Please sign in to comment.