Skip to content

Commit dacfcdc

Browse files
committed
[refactor] Fix OperationDispatcher.next() behaviour
* Bonus: Addressed review regarding testing and Fn types Signed-off-by: dd di cesare <[email protected]>
1 parent 3b675f1 commit dacfcdc

File tree

2 files changed

+57
-36
lines changed

2 files changed

+57
-36
lines changed

src/operation_dispatcher.rs

+49-28
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::configuration::{Extension, ExtensionType, FailureMode};
22
use crate::envoy::RateLimitDescriptor;
33
use crate::policy::Policy;
4-
use crate::service::{GetMapValuesBytes, GrpcCall, GrpcMessage, GrpcServiceHandler};
4+
use crate::service::{GetMapValuesBytesFn, GrpcCallFn, GrpcMessage, GrpcServiceHandler};
55
use protobuf::RepeatedField;
66
use proxy_wasm::hostcalls;
77
use proxy_wasm::types::{Bytes, MapType, Status};
@@ -38,8 +38,8 @@ pub(crate) struct Operation {
3838
result: Result<u32, Status>,
3939
extension: Rc<Extension>,
4040
procedure: Procedure,
41-
grpc_call: GrpcCall,
42-
get_map_values_bytes: GetMapValuesBytes,
41+
grpc_call_fn: GrpcCallFn,
42+
get_map_values_bytes_fn: GetMapValuesBytesFn,
4343
}
4444

4545
#[allow(dead_code)]
@@ -50,17 +50,17 @@ impl Operation {
5050
result: Err(Status::Empty),
5151
extension,
5252
procedure,
53-
grpc_call,
54-
get_map_values_bytes,
53+
grpc_call_fn,
54+
get_map_values_bytes_fn,
5555
}
5656
}
5757

5858
fn trigger(&mut self) {
5959
if let State::Done = self.state {
6060
} else {
6161
self.result = self.procedure.0.send(
62-
self.get_map_values_bytes,
63-
self.grpc_call,
62+
self.get_map_values_bytes_fn,
63+
self.grpc_call_fn,
6464
self.procedure.1.clone(),
6565
);
6666
self.state.next();
@@ -147,7 +147,8 @@ impl OperationDispatcher {
147147
let mut operations = self.operations.borrow_mut();
148148
if let Some((i, operation)) = operations.iter_mut().enumerate().next() {
149149
if let State::Done = operation.get_state() {
150-
Some(operations.remove(i))
150+
operations.remove(i);
151+
operations.get(i).cloned() // The next op is now at `i`
151152
} else {
152153
operation.trigger();
153154
Some(operation.clone())
@@ -158,7 +159,7 @@ impl OperationDispatcher {
158159
}
159160
}
160161

161-
fn grpc_call(
162+
fn grpc_call_fn(
162163
upstream_name: &str,
163164
service_name: &str,
164165
method_name: &str,
@@ -176,7 +177,7 @@ fn grpc_call(
176177
)
177178
}
178179

179-
fn get_map_values_bytes(map_type: MapType, key: &str) -> Result<Option<Bytes>, Status> {
180+
fn get_map_values_bytes_fn(map_type: MapType, key: &str) -> Result<Option<Bytes>, Status> {
180181
hostcalls::get_map_value_bytes(map_type, key)
181182
}
182183

@@ -186,7 +187,7 @@ mod tests {
186187
use crate::envoy::RateLimitRequest;
187188
use std::time::Duration;
188189

189-
fn grpc_call(
190+
fn grpc_call_fn_stub(
190191
_upstream_name: &str,
191192
_service_name: &str,
192193
_method_name: &str,
@@ -197,7 +198,10 @@ mod tests {
197198
Ok(200)
198199
}
199200

200-
fn get_map_values_bytes(_map_type: MapType, _key: &str) -> Result<Option<Bytes>, Status> {
201+
fn get_map_values_bytes_fn_stub(
202+
_map_type: MapType,
203+
_key: &str,
204+
) -> Result<Option<Bytes>, Status> {
201205
Ok(Some(Vec::new()))
202206
}
203207

@@ -218,14 +222,14 @@ mod tests {
218222
fn build_operation() -> Operation {
219223
Operation {
220224
state: State::Pending,
221-
result: Ok(200),
225+
result: Ok(1),
222226
extension: Rc::new(Extension::default()),
223227
procedure: (
224228
Rc::new(build_grpc_service_handler()),
225229
GrpcMessage::RateLimit(build_message()),
226230
),
227-
grpc_call,
228-
get_map_values_bytes,
231+
grpc_call_fn: grpc_call_fn_stub,
232+
get_map_values_bytes_fn: get_map_values_bytes_fn_stub,
229233
}
230234
}
231235

@@ -236,7 +240,7 @@ mod tests {
236240
assert_eq!(operation.get_state(), State::Pending);
237241
assert_eq!(operation.get_extension_type(), ExtensionType::RateLimit);
238242
assert_eq!(operation.get_failure_mode(), FailureMode::Deny);
239-
assert_eq!(operation.get_result(), Ok(200));
243+
assert_eq!(operation.get_result(), Ok(1));
240244
}
241245

242246
#[test]
@@ -272,20 +276,37 @@ mod tests {
272276

273277
#[test]
274278
fn operation_dispatcher_next() {
275-
let operation = build_operation();
276279
let operation_dispatcher = OperationDispatcher::default();
277-
operation_dispatcher.push_operations(vec![operation]);
280+
operation_dispatcher.push_operations(vec![build_operation(), build_operation()]);
278281

279-
if let Some(operation) = operation_dispatcher.next() {
280-
assert_eq!(operation.get_result(), Ok(200));
281-
assert_eq!(operation.get_state(), State::Waiting);
282-
}
282+
assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(1));
283+
assert_eq!(
284+
operation_dispatcher.get_current_operation_state(),
285+
Some(State::Pending)
286+
);
283287

284-
if let Some(operation) = operation_dispatcher.next() {
285-
assert_eq!(operation.get_result(), Ok(200));
286-
assert_eq!(operation.get_state(), State::Done);
287-
}
288-
operation_dispatcher.next();
289-
assert_eq!(operation_dispatcher.get_current_operation_state(), None);
288+
let mut op = operation_dispatcher.next();
289+
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
290+
assert_eq!(op.unwrap().get_state(), State::Waiting);
291+
292+
op = operation_dispatcher.next();
293+
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
294+
assert_eq!(op.unwrap().get_state(), State::Done);
295+
296+
op = operation_dispatcher.next();
297+
assert_eq!(op.clone().unwrap().get_result(), Ok(1));
298+
assert_eq!(op.unwrap().get_state(), State::Pending);
299+
300+
op = operation_dispatcher.next();
301+
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
302+
assert_eq!(op.unwrap().get_state(), State::Waiting);
303+
304+
op = operation_dispatcher.next();
305+
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
306+
assert_eq!(op.unwrap().get_state(), State::Done);
307+
308+
op = operation_dispatcher.next();
309+
assert!(op.is_none());
310+
assert!(operation_dispatcher.get_current_operation_state().is_none());
290311
}
291312
}

src/service.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ impl GrpcService {
181181
}
182182
}
183183

184-
pub type GrpcCall = fn(
184+
pub type GrpcCallFn = fn(
185185
upstream_name: &str,
186186
service_name: &str,
187187
method_name: &str,
@@ -190,7 +190,7 @@ pub type GrpcCall = fn(
190190
timeout: Duration,
191191
) -> Result<u32, Status>;
192192

193-
pub type GetMapValuesBytes = fn(map_type: MapType, key: &str) -> Result<Option<Bytes>, Status>;
193+
pub type GetMapValuesBytesFn = fn(map_type: MapType, key: &str) -> Result<Option<Bytes>, Status>;
194194

195195
pub struct GrpcServiceHandler {
196196
service: Rc<GrpcService>,
@@ -207,19 +207,19 @@ impl GrpcServiceHandler {
207207

208208
pub fn send(
209209
&self,
210-
get_map_values_bytes: GetMapValuesBytes,
211-
grpc_call: GrpcCall,
210+
get_map_values_bytes_fn: GetMapValuesBytesFn,
211+
grpc_call_fn: GrpcCallFn,
212212
message: GrpcMessage,
213213
) -> Result<u32, Status> {
214214
let msg = Message::write_to_bytes(&message).unwrap();
215215
let metadata = self
216216
.header_resolver
217-
.get(get_map_values_bytes)
217+
.get(get_map_values_bytes_fn)
218218
.iter()
219219
.map(|(header, value)| (*header, value.as_slice()))
220220
.collect();
221221

222-
grpc_call(
222+
grpc_call_fn(
223223
self.service.endpoint(),
224224
self.service.name(),
225225
self.service.method(),
@@ -255,12 +255,12 @@ impl HeaderResolver {
255255
}
256256
}
257257

258-
pub fn get(&self, get_map_values_bytes: GetMapValuesBytes) -> &Vec<(&'static str, Bytes)> {
258+
pub fn get(&self, get_map_values_bytes_fn: GetMapValuesBytesFn) -> &Vec<(&'static str, Bytes)> {
259259
self.headers.get_or_init(|| {
260260
let mut headers = Vec::new();
261261
for header in TracingHeader::all() {
262262
if let Ok(Some(value)) =
263-
get_map_values_bytes(MapType::HttpRequestHeaders, (*header).as_str())
263+
get_map_values_bytes_fn(MapType::HttpRequestHeaders, (*header).as_str())
264264
{
265265
headers.push(((*header).as_str(), value));
266266
}

0 commit comments

Comments
 (0)