Skip to content

Commit

Permalink
[refactor] Indexing waiting operations by token_id
Browse files Browse the repository at this point in the history
Signed-off-by: dd di cesare <[email protected]>
  • Loading branch information
didierofrivia committed Sep 13, 2024
1 parent c8d6c4d commit 0712164
Showing 1 changed file with 58 additions and 17 deletions.
75 changes: 58 additions & 17 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,30 @@ impl Operation {
pub fn new(extension: Rc<Extension>, procedure: Procedure) -> Self {
Self {
state: State::Pending,
result: Err(Status::Empty),
result: Ok(0), // Heuristics: zero represents that it's not been triggered, following `hostcalls` example
extension,
procedure,
grpc_call_fn,
get_map_values_bytes_fn,
}
}

fn trigger(&mut self) {
if let State::Done = self.state {
} else {
self.result = self.procedure.0.send(
self.get_map_values_bytes_fn,
self.grpc_call_fn,
self.procedure.1.clone(),
);
self.state.next();
fn trigger(&mut self) -> Result<u32, Status> {
match self.state {
State::Pending => {
self.result = self.procedure.0.send(
self.get_map_values_bytes_fn,
self.grpc_call_fn,
self.procedure.1.clone(),
);
self.state.next();
self.result
}
State::Waiting => {
self.state.next();
self.result
}
State::Done => self.result,
}
}

Expand All @@ -88,6 +95,7 @@ impl Operation {
#[allow(dead_code)]
pub struct OperationDispatcher {
operations: RefCell<Vec<Operation>>,
waiting_operations: RefCell<HashMap<u32, Operation>>, // TODO(didierofrivia): Maybe keep references or Rc
service_handlers: HashMap<String, Rc<GrpcServiceHandler>>,
}

Expand All @@ -96,13 +104,15 @@ impl OperationDispatcher {
pub fn default() -> Self {
OperationDispatcher {
operations: RefCell::new(vec![]),
waiting_operations: RefCell::new(HashMap::default()),
service_handlers: HashMap::default(),
}
}
pub fn new(service_handlers: HashMap<String, Rc<GrpcServiceHandler>>) -> Self {
Self {
service_handlers,
operations: RefCell::new(vec![]),
waiting_operations: RefCell::new(HashMap::new()),
}
}

Expand Down Expand Up @@ -148,10 +158,18 @@ impl OperationDispatcher {
let mut operations = self.operations.borrow_mut();
if let Some((i, operation)) = operations.iter_mut().enumerate().next() {
if let State::Done = operation.get_state() {
if let Ok(token_id) = operation.result {
self.waiting_operations.borrow_mut().remove(&token_id);
} // If result was Err, means the operation wasn't indexed

operations.remove(i);
operations.get(i).cloned() // The next op is now at `i`
} else {
operation.trigger();
if let Ok(token_id) = operation.trigger() {
self.waiting_operations
.borrow_mut()
.insert(token_id, operation.clone());
} // TODO(didierofrivia): Decide on indexing the failed operations.
Some(operation.clone())
}
} else {
Expand Down Expand Up @@ -223,7 +241,7 @@ mod tests {
fn build_operation() -> Operation {
Operation {
state: State::Pending,
result: Ok(1),
result: Ok(0),
extension: Rc::new(Extension::default()),
procedure: (
Rc::new(build_grpc_service_handler()),
Expand All @@ -241,16 +259,19 @@ mod tests {
assert_eq!(operation.get_state(), State::Pending);
assert_eq!(operation.get_extension_type(), ExtensionType::RateLimit);
assert_eq!(operation.get_failure_mode(), FailureMode::Deny);
assert_eq!(operation.get_result(), Ok(1));
assert_eq!(operation.get_result(), Ok(0));
}

#[test]
fn operation_transition() {
let mut operation = build_operation();
assert_eq!(operation.result, Ok(0));
assert_eq!(operation.get_state(), State::Pending);
operation.trigger();
let mut res = operation.trigger();
assert_eq!(res, Ok(200));
assert_eq!(operation.get_state(), State::Waiting);
operation.trigger();
res = operation.trigger();
assert_eq!(res, Ok(200));
assert_eq!(operation.result, Ok(200));
assert_eq!(operation.get_state(), State::Done);
}
Expand Down Expand Up @@ -280,27 +301,43 @@ mod tests {
let operation_dispatcher = OperationDispatcher::default();
operation_dispatcher.push_operations(vec![build_operation(), build_operation()]);

assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(1));
assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(0));
assert_eq!(
operation_dispatcher.get_current_operation_state(),
Some(State::Pending)
);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
0
);

let mut op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
1
);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
assert_eq!(op.unwrap().get_state(), State::Done);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(1));
assert_eq!(op.clone().unwrap().get_result(), Ok(0));
assert_eq!(op.unwrap().get_state(), State::Pending);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
0
);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
1
);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
Expand All @@ -309,5 +346,9 @@ mod tests {
op = operation_dispatcher.next();
assert!(op.is_none());
assert!(operation_dispatcher.get_current_operation_state().is_none());
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
0
);
}
}

0 comments on commit 0712164

Please sign in to comment.