1
1
use crate :: configuration:: { Extension , ExtensionType , FailureMode } ;
2
2
use crate :: envoy:: RateLimitDescriptor ;
3
3
use crate :: policy:: Policy ;
4
- use crate :: service:: { GrpcMessage , GrpcServiceHandler } ;
4
+ use crate :: service:: { GetMapValuesBytes , GrpcCall , GrpcMessage , GrpcServiceHandler } ;
5
5
use protobuf:: RepeatedField ;
6
6
use proxy_wasm:: hostcalls;
7
- use proxy_wasm:: types:: Status ;
7
+ use proxy_wasm:: types:: { Bytes , MapType , Status } ;
8
8
use std:: cell:: RefCell ;
9
9
use std:: collections:: HashMap ;
10
10
use std:: rc:: Rc ;
@@ -29,24 +29,6 @@ impl State {
29
29
}
30
30
}
31
31
32
- fn grpc_call (
33
- upstream_name : & str ,
34
- service_name : & str ,
35
- method_name : & str ,
36
- initial_metadata : Vec < ( & str , & [ u8 ] ) > ,
37
- message : Option < & [ u8 ] > ,
38
- timeout : Duration ,
39
- ) -> Result < u32 , Status > {
40
- hostcalls:: dispatch_grpc_call (
41
- upstream_name,
42
- service_name,
43
- method_name,
44
- initial_metadata,
45
- message,
46
- timeout,
47
- )
48
- }
49
-
50
32
type Procedure = ( Rc < GrpcServiceHandler > , GrpcMessage ) ;
51
33
52
34
#[ allow( dead_code) ]
@@ -56,6 +38,8 @@ pub(crate) struct Operation {
56
38
result : Result < u32 , Status > ,
57
39
extension : Rc < Extension > ,
58
40
procedure : Procedure ,
41
+ grpc_call : GrpcCall ,
42
+ get_map_values_bytes : GetMapValuesBytes ,
59
43
}
60
44
61
45
#[ allow( dead_code) ]
@@ -66,17 +50,19 @@ impl Operation {
66
50
result : Err ( Status :: Empty ) ,
67
51
extension,
68
52
procedure,
53
+ grpc_call,
54
+ get_map_values_bytes,
69
55
}
70
56
}
71
57
72
- pub fn set_action ( & mut self , procedure : Procedure ) {
73
- self . procedure = procedure;
74
- }
75
-
76
- pub fn trigger ( & mut self ) {
58
+ fn trigger ( & mut self ) {
77
59
if let State :: Done = self . state {
78
60
} else {
79
- self . result = self . procedure . 0 . send ( grpc_call, self . procedure . 1 . clone ( ) ) ;
61
+ self . result = self . procedure . 0 . send (
62
+ self . get_map_values_bytes ,
63
+ self . grpc_call ,
64
+ self . procedure . 1 . clone ( ) ,
65
+ ) ;
80
66
self . state . next ( ) ;
81
67
}
82
68
}
@@ -172,6 +158,28 @@ impl OperationDispatcher {
172
158
}
173
159
}
174
160
161
+ fn grpc_call (
162
+ upstream_name : & str ,
163
+ service_name : & str ,
164
+ method_name : & str ,
165
+ initial_metadata : Vec < ( & str , & [ u8 ] ) > ,
166
+ message : Option < & [ u8 ] > ,
167
+ timeout : Duration ,
168
+ ) -> Result < u32 , Status > {
169
+ hostcalls:: dispatch_grpc_call (
170
+ upstream_name,
171
+ service_name,
172
+ method_name,
173
+ initial_metadata,
174
+ message,
175
+ timeout,
176
+ )
177
+ }
178
+
179
+ fn get_map_values_bytes ( map_type : MapType , key : & str ) -> Result < Option < Bytes > , Status > {
180
+ hostcalls:: get_map_value_bytes ( map_type, key)
181
+ }
182
+
175
183
#[ cfg( test) ]
176
184
mod tests {
177
185
use super :: * ;
@@ -189,6 +197,10 @@ mod tests {
189
197
Ok ( 200 )
190
198
}
191
199
200
+ fn get_map_values_bytes ( _map_type : MapType , _key : & str ) -> Result < Option < Bytes > , Status > {
201
+ Ok ( Some ( Vec :: new ( ) ) )
202
+ }
203
+
192
204
fn build_grpc_service_handler ( ) -> GrpcServiceHandler {
193
205
GrpcServiceHandler :: new ( Rc :: new ( Default :: default ( ) ) , Rc :: new ( Default :: default ( ) ) )
194
206
}
@@ -203,33 +215,33 @@ mod tests {
203
215
}
204
216
}
205
217
206
- # [ test ]
207
- fn operation_getters ( ) {
208
- let extension = Rc :: new ( Extension :: default ( ) ) ;
209
- let operation = Operation :: new (
210
- extension,
211
- (
218
+ fn build_operation ( ) -> Operation {
219
+ Operation {
220
+ state : State :: Pending ,
221
+ result : Ok ( 200 ) ,
222
+ extension : Rc :: new ( Extension :: default ( ) ) ,
223
+ procedure : (
212
224
Rc :: new ( build_grpc_service_handler ( ) ) ,
213
225
GrpcMessage :: RateLimit ( build_message ( ) ) ,
214
226
) ,
215
- ) ;
227
+ grpc_call,
228
+ get_map_values_bytes,
229
+ }
230
+ }
231
+
232
+ #[ test]
233
+ fn operation_getters ( ) {
234
+ let operation = build_operation ( ) ;
216
235
217
236
assert_eq ! ( operation. get_state( ) , State :: Pending ) ;
218
237
assert_eq ! ( operation. get_extension_type( ) , ExtensionType :: RateLimit ) ;
219
238
assert_eq ! ( operation. get_failure_mode( ) , FailureMode :: Deny ) ;
220
- assert_eq ! ( operation. get_result( ) , Result :: Ok ( 1 ) ) ;
239
+ assert_eq ! ( operation. get_result( ) , Ok ( 200 ) ) ;
221
240
}
222
241
223
242
#[ test]
224
243
fn operation_transition ( ) {
225
- let extension = Rc :: new ( Extension :: default ( ) ) ;
226
- let mut operation = Operation :: new (
227
- extension,
228
- (
229
- Rc :: new ( build_grpc_service_handler ( ) ) ,
230
- GrpcMessage :: RateLimit ( build_message ( ) ) ,
231
- ) ,
232
- ) ;
244
+ let mut operation = build_operation ( ) ;
233
245
assert_eq ! ( operation. get_state( ) , State :: Pending ) ;
234
246
operation. trigger ( ) ;
235
247
assert_eq ! ( operation. get_state( ) , State :: Waiting ) ;
@@ -242,23 +254,16 @@ mod tests {
242
254
fn operation_dispatcher_push_actions ( ) {
243
255
let operation_dispatcher = OperationDispatcher :: default ( ) ;
244
256
245
- assert_eq ! ( operation_dispatcher. operations. borrow( ) . len( ) , 1 ) ;
246
- let extension = Rc :: new ( Extension :: default ( ) ) ;
247
- operation_dispatcher. push_operations ( vec ! [ Operation :: new(
248
- extension,
249
- (
250
- Rc :: new( build_grpc_service_handler( ) ) ,
251
- GrpcMessage :: RateLimit ( build_message( ) ) ,
252
- ) ,
253
- ) ] ) ;
257
+ assert_eq ! ( operation_dispatcher. operations. borrow( ) . len( ) , 0 ) ;
258
+ operation_dispatcher. push_operations ( vec ! [ build_operation( ) ] ) ;
254
259
255
- assert_eq ! ( operation_dispatcher. operations. borrow( ) . len( ) , 2 ) ;
260
+ assert_eq ! ( operation_dispatcher. operations. borrow( ) . len( ) , 1 ) ;
256
261
}
257
262
258
263
#[ test]
259
264
fn operation_dispatcher_get_current_action_state ( ) {
260
265
let operation_dispatcher = OperationDispatcher :: default ( ) ;
261
-
266
+ operation_dispatcher . push_operations ( vec ! [ build_operation ( ) ] ) ;
262
267
assert_eq ! (
263
268
operation_dispatcher. get_current_operation_state( ) ,
264
269
Some ( State :: Pending )
@@ -267,14 +272,7 @@ mod tests {
267
272
268
273
#[ test]
269
274
fn operation_dispatcher_next ( ) {
270
- let extension = Rc :: new ( Extension :: default ( ) ) ;
271
- let operation = Operation :: new (
272
- extension,
273
- (
274
- Rc :: new ( build_grpc_service_handler ( ) ) ,
275
- GrpcMessage :: RateLimit ( build_message ( ) ) ,
276
- ) ,
277
- ) ;
275
+ let operation = build_operation ( ) ;
278
276
let operation_dispatcher = OperationDispatcher :: default ( ) ;
279
277
operation_dispatcher. push_operations ( vec ! [ operation] ) ;
280
278
0 commit comments