diff --git a/src/operation_dispatcher.rs b/src/operation_dispatcher.rs index fa0b5a86..c6560586 100644 --- a/src/operation_dispatcher.rs +++ b/src/operation_dispatcher.rs @@ -48,7 +48,7 @@ impl Operation { pub fn new(extension: Rc, procedure: Procedure) -> Self { Self { state: State::Pending, - result: Err(Status::Empty), + result: Ok(0), // Heuristics: zero represents that it's not been triggered, following `hostcalls` example extension, procedure, grpc_call_fn, @@ -56,15 +56,22 @@ impl Operation { } } - fn trigger(&mut self) { - if let State::Done = self.state { - } else { - self.result = self.procedure.0.send( - self.get_map_values_bytes_fn, - self.grpc_call_fn, - self.procedure.1.clone(), - ); - self.state.next(); + fn trigger(&mut self) -> Result { + match self.state { + State::Pending => { + self.result = self.procedure.0.send( + self.get_map_values_bytes_fn, + self.grpc_call_fn, + self.procedure.1.clone(), + ); + self.state.next(); + self.result + } + State::Waiting => { + self.state.next(); + self.result + } + State::Done => self.result, } } @@ -88,6 +95,7 @@ impl Operation { #[allow(dead_code)] pub struct OperationDispatcher { operations: RefCell>, + waiting_operations: RefCell>, // TODO(didierofrivia): Maybe keep references or Rc service_handlers: HashMap>, } @@ -96,6 +104,7 @@ impl OperationDispatcher { pub fn default() -> Self { OperationDispatcher { operations: RefCell::new(vec![]), + waiting_operations: RefCell::new(HashMap::default()), service_handlers: HashMap::default(), } } @@ -103,6 +112,7 @@ impl OperationDispatcher { Self { service_handlers, operations: RefCell::new(vec![]), + waiting_operations: RefCell::new(HashMap::new()), } } @@ -148,10 +158,18 @@ impl OperationDispatcher { let mut operations = self.operations.borrow_mut(); if let Some((i, operation)) = operations.iter_mut().enumerate().next() { if let State::Done = operation.get_state() { + if let Ok(token_id) = operation.result { + self.waiting_operations.borrow_mut().remove(&token_id); + } // If result was Err, means the operation wasn't indexed + operations.remove(i); operations.get(i).cloned() // The next op is now at `i` } else { - operation.trigger(); + if let Ok(token_id) = operation.trigger() { + self.waiting_operations + .borrow_mut() + .insert(token_id, operation.clone()); + } // TODO(didierofrivia): Decide on indexing the failed operations. Some(operation.clone()) } } else { @@ -223,7 +241,7 @@ mod tests { fn build_operation() -> Operation { Operation { state: State::Pending, - result: Ok(1), + result: Ok(0), extension: Rc::new(Extension::default()), procedure: ( Rc::new(build_grpc_service_handler()), @@ -241,16 +259,19 @@ mod tests { assert_eq!(operation.get_state(), State::Pending); assert_eq!(operation.get_extension_type(), ExtensionType::RateLimit); assert_eq!(operation.get_failure_mode(), FailureMode::Deny); - assert_eq!(operation.get_result(), Ok(1)); + assert_eq!(operation.get_result(), Ok(0)); } #[test] fn operation_transition() { let mut operation = build_operation(); + assert_eq!(operation.result, Ok(0)); assert_eq!(operation.get_state(), State::Pending); - operation.trigger(); + let mut res = operation.trigger(); + assert_eq!(res, Ok(200)); assert_eq!(operation.get_state(), State::Waiting); - operation.trigger(); + res = operation.trigger(); + assert_eq!(res, Ok(200)); assert_eq!(operation.result, Ok(200)); assert_eq!(operation.get_state(), State::Done); } @@ -280,27 +301,43 @@ mod tests { let operation_dispatcher = OperationDispatcher::default(); operation_dispatcher.push_operations(vec![build_operation(), build_operation()]); - assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(1)); + assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(0)); assert_eq!( operation_dispatcher.get_current_operation_state(), Some(State::Pending) ); + assert_eq!( + operation_dispatcher.waiting_operations.borrow_mut().len(), + 0 + ); let mut op = operation_dispatcher.next(); assert_eq!(op.clone().unwrap().get_result(), Ok(200)); assert_eq!(op.unwrap().get_state(), State::Waiting); + assert_eq!( + operation_dispatcher.waiting_operations.borrow_mut().len(), + 1 + ); op = operation_dispatcher.next(); assert_eq!(op.clone().unwrap().get_result(), Ok(200)); assert_eq!(op.unwrap().get_state(), State::Done); op = operation_dispatcher.next(); - assert_eq!(op.clone().unwrap().get_result(), Ok(1)); + assert_eq!(op.clone().unwrap().get_result(), Ok(0)); assert_eq!(op.unwrap().get_state(), State::Pending); + assert_eq!( + operation_dispatcher.waiting_operations.borrow_mut().len(), + 0 + ); op = operation_dispatcher.next(); assert_eq!(op.clone().unwrap().get_result(), Ok(200)); assert_eq!(op.unwrap().get_state(), State::Waiting); + assert_eq!( + operation_dispatcher.waiting_operations.borrow_mut().len(), + 1 + ); op = operation_dispatcher.next(); assert_eq!(op.clone().unwrap().get_result(), Ok(200)); @@ -309,5 +346,9 @@ mod tests { op = operation_dispatcher.next(); assert!(op.is_none()); assert!(operation_dispatcher.get_current_operation_state().is_none()); + assert_eq!( + operation_dispatcher.waiting_operations.borrow_mut().len(), + 0 + ); } }