-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinterceptor.go
165 lines (149 loc) · 6.21 KB
/
interceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
//grpcConst is a package that allow you to communicate defaulting values of your protobuf messages
//and communicate this default and set it before your client side code interacts with the messages.
//example server-side:
// header, err := grpcConst.HeaderSetConstant(
// &proto.Feature{
// Name: "some constant name",
// Location: &proto.Point{Latitude: 10}
// })
// stream.SetHeader(header)
// ... your normal routine but you could
// ... fx send &proto.Feature{Location: &proto.Point{Longitude: 20}}
// ... this will yield - name: "some constant name", location: {10, 20}
// ... while sending less data in the message
//or:
// stream = grpcConst.ServerStreMWrapper(
// &proto.Feature{
// Name: "some constant name",
// Location: &proto.Point{Latitude: 10}
// })
// ... using stream.Send() now removes the default values from your objects; sending less data
//example client-side:
//initiate your client with a grpc.StreamClientInterceptor this way:
// conn, err := grpc.Dial(..., grpc.WithStreamInterceptor(grpcConst.StreamClientInterceptor()))
package grpcConst
import (
"context"
"encoding/base64"
"log"
"reflect"
"github.com/MikkelHJuul/grpcConst/merge"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/metadata"
)
//XgRPCConst is the HTTP header passed between server and client
const XgRPCConst = "x-grpc-const"
//HeaderSetConstant is a convenience method for the server side to add a metadata.MD with the correct content
// given your gRPC struct v, the user is returned the metadata to send.
//that the user can send using `grpc.ServerStream:SendHeader(metadata.MD) or :SetHeader(metadata.MD)`.
// v must be passed by reference.
func HeaderSetConstant(v interface{}) (metadata.MD, error) {
msg, err := marshal(v)
return metadata.Pairs(XgRPCConst, msg), err
}
//ServerStreamWrapper wraps your stream object and returns the decorated stream with a SendMsg method,
//that removes items that are equal a reference object.
//The stream remains untouched if the client did not send an XgRPCConst header
func ServerStreamWrapper(reference interface{}) (stream grpc.ServerStream, err error) {
if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md.Get(XgRPCConst)) > 0 {
md, err = HeaderSetConstant(reference)
if err != nil {
return
}
stream.SetHeader(md)
var reducer merge.Reducer
if _, ok := reference.(Reducer); ok {
reducer = MessageMergerReducer{ConstantMessage: reference}
} else {
reducer = merge.NewReducer(reference)
}
stream = &dataRemovingServerStream{stream, reducer}
}
return
}
//StreamClientInterceptor is an interceptor for the client side (for unidirectional server-side streaming rpc's)
//The client side Stream interceptor intercepts the stream when it is initiated.
//This method decorates the actual ClientStream adding data to each message where applicable
//this variadic function accepts none or one argument. defaulting the method for constructing
//the merge.Merger to use merge.NewMerger.
//for a more safe alternative
func StreamClientInterceptor(mergerCreator ...MergerCreator) grpc.StreamClientInterceptor {
mergeCreator := mergerCreatorDefaulting(mergerCreator...)
return func(
parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx := metadata.AppendToOutgoingContext(parentCtx, XgRPCConst, "")
var stream, err = streamer(ctx, desc, cc, method, opts...)
return &dataAddingClientStream{stream, nil, mergeCreator}, err
}
}
func mergerCreatorDefaulting(creator ...MergerCreator) MergerCreator {
if creator == nil || creator[0] == nil {
return merge.NewMerger
} else {
return creator[0]
}
}
//MergerCreator is a proxy for a function returning a merge.Merger from an interface{}
type MergerCreator func(interface{}) merge.Merger
//marshal implements the server side marshalling of a protobuf message into the specification header value
func marshal(v interface{}) (string, error) {
msg, err := encoding.GetCodec("proto").Marshal(v)
return base64.URLEncoding.EncodeToString(msg), err
}
//unmarshal implements the client side handling/unmarshalling of the specification header
func unmarshal(header string, receiver interface{}) error {
protoMsg, err := base64.URLEncoding.DecodeString(header)
if err != nil {
return err
}
return encoding.GetCodec("proto").Unmarshal(protoMsg, receiver)
}
//dataAddingClientStream is the decorated grpc.ClientStream
//that your code will use via the method RecvMsg
//the intermediary construct fieldToSet is used to remove to need to traverse the entire message
type dataAddingClientStream struct {
grpc.ClientStream
Merger merge.Merger
mergerCreator MergerCreator
}
type dataRemovingServerStream struct {
grpc.ServerStream
Reducer merge.Reducer
}
//RecvMsg is called via your grpc.ClientStream;
//the generated code's method Recv calls this method on it's internal grpc.ClientStream
//This method initiates on first call the fields that should be default to all the messages
//on all calls the underlying grpc.ClientStream:RecvMsg message has this data added
func (dc *dataAddingClientStream) RecvMsg(m interface{}) error {
if dc.Merger == nil {
donor := newEmpty(m)
header, _ := dc.ClientStream.Header()
if head, ok := header[XgRPCConst]; ok && len(head) > 0 {
if err := unmarshal(head[0], donor); err != nil {
log.Printf("ERROR: an %s-header could not be unmarshalled correctly: %v", XgRPCConst, head)
}
}
if _, ok := donor.(Merger); ok {
dc.Merger = MessageMergerReducer{ConstantMessage: donor}
} else {
dc.Merger = dc.mergerCreator(donor)
}
}
if err := dc.ClientStream.RecvMsg(m); err != nil {
return err
}
return dc.Merger.SetFields(m)
}
//newEmpty simply creates a new instance of an interface given an instance of that interface
func newEmpty(t interface{}) interface{} {
return reflect.New(reflect.TypeOf(t).Elem()).Interface()
}
//SendMsg reduces the message using the reference before sending it using the underlying ServerStream
func (ds *dataRemovingServerStream) SendMsg(m interface{}) error {
if err := ds.Reducer.RemoveFields(m); err != nil {
log.Printf("ERROR: could not remove fields from %v", m)
}
return ds.ServerStream.SendMsg(m)
}