Skip to content

Commit c71aa62

Browse files
authored
example: interceptor (#2541)
fix #2483
1 parent 36f3126 commit c71aa62

File tree

3 files changed

+456
-0
lines changed

3 files changed

+456
-0
lines changed

Diff for: examples/features/interceptor/README.md

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Interceptor
2+
3+
gRPC provides simple APIs to implement and install interceptors on a per
4+
ClientConn/Server basis. Interceptor intercepts the execution of each RPC call.
5+
Users can use interceptors to do logging, authentication/authorization, metrics
6+
collection, and many other functionality that can be shared across RPCs.
7+
8+
## Try it
9+
10+
```
11+
go run server/main.go
12+
```
13+
14+
```
15+
go run client/main.go
16+
```
17+
18+
## Explanation
19+
20+
In gRPC, interceptors can be categorized into two kinds in terms of the type of
21+
RPC calls they intercept. The first one is the **unary interceptor**, which
22+
intercepts unary RPC calls. And the other is the **stream interceptor** which
23+
deals with streaming RPC calls. See
24+
[here](https://grpc.io/docs/guides/concepts.html#rpc-life-cycle) for explanation
25+
about unary RPCs and streaming RPCs. Each of client and server has their own
26+
types of unary and stream interceptors. Thus, there are in total four different
27+
types of interceptors in gRPC.
28+
29+
### Client-side
30+
31+
#### Unary Interceptor
32+
33+
[`UnaryClientInterceptor`](https://godoc.org/google.golang.org/grpc#UnaryClientInterceptor)
34+
is the type for client-side unary interceptor. It is essentially a function type
35+
with signature: `func(ctx context.Context, method string, req, reply
36+
interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error`.
37+
An implementation of a unary interceptor can usually be divided into three
38+
parts: pre-processing, invoking RPC method, and post-processing.
39+
40+
For pre-processing, users can get info about the current RPC call by examining
41+
the args passed in, such as RPC context, method string, request to be sent, and
42+
CallOptions configured. With the info, users can even modify the RPC call. For
43+
instance, in the example, we examine the list of CallOptions and see if call
44+
credential has been configured. If not, configure it to use oauth2 with token
45+
"some-secrete-token" as fallback. In our example, we intentionally omit
46+
configuring the per RPC credential to resort to fallback.
47+
48+
After pre-processing is done, use can invoke the RPC call by calling the
49+
`invoker`.
50+
51+
Once the invoker returns the reply and error, user can do post-processing of the
52+
RPC call. Usually, it's about dealing with the returned reply and error. In the
53+
example, we log the RPC timing and error info.
54+
55+
To install a unary interceptor on a ClientConn, configure `Dial` with
56+
`DialOption`
57+
[`WithUnaryInterceptor`](https://godoc.org/google.golang.org/grpc#WithUnaryInterceptor).
58+
59+
#### Stream Interceptor
60+
61+
[`StreamClientInterceptor`](https://godoc.org/google.golang.org/grpc#StreamClientInterceptor)
62+
is the type for client-side stream interceptor. It is a function type with
63+
signature: `func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method
64+
string, streamer Streamer, opts ...CallOption) (ClientStream, error)`. An
65+
implementation of a stream interceptor usually include pre-processing, and
66+
stream operation interception.
67+
68+
For pre-processing, it's similar to unary interceptor.
69+
70+
However, rather than doing the RPC method invocation and post-processing
71+
afterwards, stream interceptor intercepts the users' operation on the stream.
72+
First, the interceptor calls the passed-in `streamer` to get a `ClientStream`,
73+
and then wraps around the `ClientStream` and overloading its methods with
74+
intercepting logic. Finally, interceptors returns the wrapped `ClientStream` to
75+
user to operate on.
76+
77+
In the example, we define a new struct `wrappedStream`, which is embedded with a
78+
`ClientStream`. Then, we implement (overload) the `SendMsg` and `RecvMsg`
79+
methods on `wrappedStream` to intercepts these two operations on the embedded
80+
`ClientStream`. In the example, we log the message type info and time info for
81+
interception purpose.
82+
83+
To install the stream interceptor for a ClientConn, configure `Dial` with
84+
`DialOption`
85+
[`WithStreamInterceptor`](https://godoc.org/google.golang.org/grpc#WithStreamInterceptor).
86+
87+
### Server-side
88+
89+
Server side interceptor is similar to client side, though with slightly
90+
different provided info.
91+
92+
#### Unary Interceptor
93+
94+
[`UnaryServerInterceptor`](https://godoc.org/google.golang.org/grpc#UnaryServerInterceptor)
95+
is the type for server-side unary interceptor. It is a function type with
96+
signature: `func(ctx context.Context, req interface{}, info *UnaryServerInfo,
97+
handler UnaryHandler) (resp interface{}, err error)`.
98+
99+
Refer to client-side unary interceptor section for detailed implementation
100+
explanation.
101+
102+
To install the unary interceptor for a Server, configure `NewServer` with
103+
`ServerOption`
104+
[`UnaryInterceptor`](https://godoc.org/google.golang.org/grpc#UnaryInterceptor).
105+
106+
#### Stream Interceptor
107+
108+
[`StreamServerInterceptor`](https://godoc.org/google.golang.org/grpc#StreamServerInterceptor)
109+
is the type for server-side stream interceptor. It is a function type with
110+
signature: `func(srv interface{}, ss ServerStream, info *StreamServerInfo,
111+
handler StreamHandler) error`.
112+
113+
Refer to client-side stream interceptor section for detailed implementation
114+
explanation.
115+
116+
To install the unary interceptor for a Server, configure `NewServer` with
117+
`ServerOption`
118+
[`StreamInterceptor`](https://godoc.org/google.golang.org/grpc#StreamInterceptor).

Diff for: examples/features/interceptor/client/main.go

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
*
3+
* Copyright 2018 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
// Binary client is an example client.
20+
package main
21+
22+
import (
23+
"context"
24+
"flag"
25+
"fmt"
26+
"io"
27+
"log"
28+
"time"
29+
30+
"golang.org/x/oauth2"
31+
"google.golang.org/grpc"
32+
"google.golang.org/grpc/credentials"
33+
"google.golang.org/grpc/credentials/oauth"
34+
ecpb "google.golang.org/grpc/examples/features/proto/echo"
35+
"google.golang.org/grpc/testdata"
36+
)
37+
38+
var addr = flag.String("addr", "localhost:50051", "the address to connect to")
39+
40+
const fallbackToken = "some-secret-token"
41+
42+
// logger is to mock a sophisticated logging system. To simplify the example, we just print out the content.
43+
func logger(format string, a ...interface{}) {
44+
fmt.Printf("LOG:\t"+format+"\n", a...)
45+
}
46+
47+
// unaryInterceptor is an example unary interceptor.
48+
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
49+
var credsConfigured bool
50+
for _, o := range opts {
51+
_, ok := o.(grpc.PerRPCCredsCallOption)
52+
if ok {
53+
credsConfigured = true
54+
break
55+
}
56+
}
57+
if !credsConfigured {
58+
opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
59+
AccessToken: fallbackToken,
60+
})))
61+
}
62+
start := time.Now()
63+
err := invoker(ctx, method, req, reply, cc, opts...)
64+
end := time.Now()
65+
logger("RPC: %s, start time: %s, end time: %s, err: %v", method, start.Format("Basic"), end.Format(time.RFC3339), err)
66+
return err
67+
}
68+
69+
// wrappedStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
70+
// SendMsg method call.
71+
type wrappedStream struct {
72+
grpc.ClientStream
73+
}
74+
75+
func (w *wrappedStream) RecvMsg(m interface{}) error {
76+
logger("Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
77+
return w.ClientStream.RecvMsg(m)
78+
}
79+
80+
func (w *wrappedStream) SendMsg(m interface{}) error {
81+
logger("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
82+
return w.ClientStream.SendMsg(m)
83+
}
84+
85+
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
86+
return &wrappedStream{s}
87+
}
88+
89+
// streamInterceptor is an example stream interceptor.
90+
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
91+
var credsConfigured bool
92+
for _, o := range opts {
93+
_, ok := o.(*grpc.PerRPCCredsCallOption)
94+
if ok {
95+
credsConfigured = true
96+
}
97+
}
98+
if !credsConfigured {
99+
opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
100+
AccessToken: fallbackToken,
101+
})))
102+
}
103+
s, err := streamer(ctx, desc, cc, method, opts...)
104+
if err != nil {
105+
return nil, err
106+
}
107+
return newWrappedStream(s), nil
108+
}
109+
110+
func callUnaryEcho(client ecpb.EchoClient, message string) {
111+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
112+
defer cancel()
113+
resp, err := client.UnaryEcho(ctx, &ecpb.EchoRequest{Message: message})
114+
if err != nil {
115+
log.Fatalf("client.UnaryEcho(_) = _, %v: ", err)
116+
}
117+
fmt.Println("UnaryEcho: ", resp.Message)
118+
}
119+
120+
func callBidiStreamingEcho(client ecpb.EchoClient) {
121+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
122+
defer cancel()
123+
c, err := client.BidirectionalStreamingEcho(ctx)
124+
if err != nil {
125+
return
126+
}
127+
for i := 0; i < 5; i++ {
128+
if err := c.Send(&ecpb.EchoRequest{Message: fmt.Sprintf("Request %d", i+1)}); err != nil {
129+
log.Fatalf("failed to send request due to error: %v", err)
130+
}
131+
}
132+
c.CloseSend()
133+
for {
134+
resp, err := c.Recv()
135+
if err == io.EOF {
136+
break
137+
}
138+
if err != nil {
139+
log.Fatalf("failed to receive response due to error: %v", err)
140+
}
141+
fmt.Println("BidiStreaming Echo: ", resp.Message)
142+
}
143+
}
144+
145+
func main() {
146+
flag.Parse()
147+
148+
// Create tls based credential.
149+
creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
150+
if err != nil {
151+
log.Fatalf("failed to load credentials: %v", err)
152+
}
153+
154+
// Set up a connection to the server.
155+
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(creds), grpc.WithUnaryInterceptor(unaryInterceptor), grpc.WithStreamInterceptor(streamInterceptor))
156+
if err != nil {
157+
log.Fatalf("did not connect: %v", err)
158+
}
159+
defer conn.Close()
160+
161+
// Make a echo client and send RPCs.
162+
rgc := ecpb.NewEchoClient(conn)
163+
callUnaryEcho(rgc, "hello world")
164+
callBidiStreamingEcho(rgc)
165+
}

0 commit comments

Comments
 (0)