-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathclient.go
140 lines (122 loc) · 3.86 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package grpc
import (
"context"
"fmt"
"reflect"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/go-kit/kit/endpoint"
)
// Client wraps a gRPC connection and provides a method that implements
// endpoint.Endpoint.
type Client struct {
client *grpc.ClientConn
serviceName string
method string
enc EncodeRequestFunc
dec DecodeResponseFunc
grpcReply reflect.Type
before []ClientRequestFunc
after []ClientResponseFunc
finalizer []ClientFinalizerFunc
}
// NewClient constructs a usable Client for a single remote endpoint.
// Pass an zero-value protobuf message of the RPC response type as
// the grpcReply argument.
func NewClient(
cc *grpc.ClientConn,
serviceName string,
method string,
enc EncodeRequestFunc,
dec DecodeResponseFunc,
grpcReply interface{},
options ...ClientOption,
) *Client {
c := &Client{
client: cc,
method: fmt.Sprintf("/%s/%s", serviceName, method),
enc: enc,
dec: dec,
// We are using reflect.Indirect here to allow both reply structs and
// pointers to these reply structs. New consumers of the client should
// use structs directly, while existing consumers will not break if they
// remain to use pointers to structs.
grpcReply: reflect.TypeOf(
reflect.Indirect(
reflect.ValueOf(grpcReply),
).Interface(),
),
before: []ClientRequestFunc{},
after: []ClientResponseFunc{},
}
for _, option := range options {
option(c)
}
return c
}
// ClientOption sets an optional parameter for clients.
type ClientOption func(*Client)
// ClientBefore sets the RequestFuncs that are applied to the outgoing gRPC
// request before it's invoked.
func ClientBefore(before ...ClientRequestFunc) ClientOption {
return func(c *Client) { c.before = append(c.before, before...) }
}
// ClientAfter sets the ClientResponseFuncs that are applied to the incoming
// gRPC response prior to it being decoded. This is useful for obtaining
// response metadata and adding onto the context prior to decoding.
func ClientAfter(after ...ClientResponseFunc) ClientOption {
return func(c *Client) { c.after = append(c.after, after...) }
}
// ClientFinalizer is executed at the end of every gRPC request.
// By default, no finalizer is registered.
func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
return func(s *Client) { s.finalizer = append(s.finalizer, f...) }
}
// Endpoint returns a usable endpoint that will invoke the gRPC specified by the
// client.
func (c Client) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if c.finalizer != nil {
defer func() {
for _, f := range c.finalizer {
f(ctx, err)
}
}()
}
ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)
req, err := c.enc(ctx, request)
if err != nil {
return nil, err
}
md := &metadata.MD{}
for _, f := range c.before {
ctx = f(ctx, md)
}
ctx = metadata.NewOutgoingContext(ctx, *md)
var header, trailer metadata.MD
grpcReply := reflect.New(c.grpcReply).Interface()
if err = c.client.Invoke(
ctx, c.method, req, grpcReply, grpc.Header(&header),
grpc.Trailer(&trailer),
); err != nil {
return nil, err
}
for _, f := range c.after {
ctx = f(ctx, header, trailer)
}
response, err = c.dec(ctx, grpcReply)
if err != nil {
return nil, err
}
return response, nil
}
}
// ClientFinalizerFunc can be used to perform work at the end of a client gRPC
// request, after the response is returned. The principal
// intended use is for error logging. Additional response parameters are
// provided in the context under keys with the ContextKeyResponse prefix.
// Note: err may be nil. There maybe also no additional response parameters depending on
// when an error occurs.
type ClientFinalizerFunc func(ctx context.Context, err error)