Skip to content

Commit

Permalink
[refactor] grpc_call function delegated to the caller
Browse files Browse the repository at this point in the history
Signed-off-by: dd di cesare <[email protected]>
  • Loading branch information
didierofrivia committed Sep 4, 2024
1 parent ac45141 commit e87c46d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
1 change: 0 additions & 1 deletion src/filter/root_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl RootContext for FilterRoot {
.or_insert(Rc::from(GrpcServiceHandler::new(
Rc::clone(service),
Rc::new(HeaderResolver::new()),
None,
)));
});
Some(Box::new(Filter {
Expand Down
28 changes: 22 additions & 6 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use crate::envoy::RateLimitDescriptor;
use crate::policy::Policy;
use crate::service::{GrpcMessage, GrpcServiceHandler};
use protobuf::RepeatedField;
use proxy_wasm::hostcalls;
use proxy_wasm::types::Status;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::time::Duration;

#[allow(dead_code)]
#[derive(PartialEq, Debug, Clone)]
Expand All @@ -27,6 +29,24 @@ impl State {
}
}

fn grpc_call(
upstream_name: &str,
service_name: &str,
method_name: &str,
initial_metadata: Vec<(&str, &[u8])>,
message: Option<&[u8]>,
timeout: Duration,
) -> Result<u32, Status> {
hostcalls::dispatch_grpc_call(
upstream_name,
service_name,
method_name,
initial_metadata,
message,
timeout,
)
}

type Procedure = (Rc<GrpcServiceHandler>, GrpcMessage);

#[allow(dead_code)]
Expand Down Expand Up @@ -56,7 +76,7 @@ impl Operation {
pub fn trigger(&mut self) {
if let State::Done = self.state {
} else {
self.result = self.procedure.0.send(self.procedure.1.clone());
self.result = self.procedure.0.send(grpc_call, self.procedure.1.clone());
self.state.next();
}
}
Expand Down Expand Up @@ -170,11 +190,7 @@ mod tests {
}

fn build_grpc_service_handler() -> GrpcServiceHandler {
GrpcServiceHandler::new(
Rc::new(Default::default()),
Rc::new(Default::default()),
Some(grpc_call),
)
GrpcServiceHandler::new(Rc::new(Default::default()), Rc::new(Default::default()))
}

fn build_message() -> RateLimitRequest {
Expand Down
13 changes: 3 additions & 10 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use protobuf::{
Clear, CodedInputStream, CodedOutputStream, Message, ProtobufResult, UnknownFields,
};
use proxy_wasm::hostcalls;
use proxy_wasm::hostcalls::dispatch_grpc_call;
use proxy_wasm::types::{Bytes, MapType, Status};
use std::any::Any;
use std::cell::OnceCell;
Expand Down Expand Up @@ -195,23 +194,17 @@ type GrpcCall = fn(
pub struct GrpcServiceHandler {
service: Rc<GrpcService>,
header_resolver: Rc<HeaderResolver>,
grpc_call: GrpcCall,
}

impl GrpcServiceHandler {
pub fn new(
service: Rc<GrpcService>,
header_resolver: Rc<HeaderResolver>,
grpc_call: Option<GrpcCall>,
) -> Self {
pub fn new(service: Rc<GrpcService>, header_resolver: Rc<HeaderResolver>) -> Self {
Self {
service,
header_resolver,
grpc_call: grpc_call.unwrap_or(dispatch_grpc_call),
}
}

pub fn send(&self, message: GrpcMessage) -> Result<u32, Status> {
pub fn send(&self, grpc_call: GrpcCall, message: GrpcMessage) -> Result<u32, Status> {
let msg = Message::write_to_bytes(&message).unwrap();
let metadata = self
.header_resolver
Expand All @@ -220,7 +213,7 @@ impl GrpcServiceHandler {
.map(|(header, value)| (*header, value.as_slice()))
.collect();

(self.grpc_call)(
grpc_call(
self.service.endpoint(),
self.service.name(),
self.service.method(),
Expand Down

0 comments on commit e87c46d

Please sign in to comment.