From 87ddc450c81c77b60638b2dd2979b895ba99a8cb Mon Sep 17 00:00:00 2001 From: dd di cesare Date: Mon, 12 Aug 2024 13:36:29 +0200 Subject: [PATCH 1/2] [refactor] Extracting logic to Service trait and RateLimitService Signed-off-by: dd di cesare --- src/filter/http_context.rs | 28 +++------- src/lib.rs | 1 + src/service.rs | 8 +++ src/service/rate_limit.rs | 105 +++++++++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 21 deletions(-) create mode 100644 src/service.rs create mode 100644 src/service/rate_limit.rs diff --git a/src/filter/http_context.rs b/src/filter/http_context.rs index 717563ab..aa02bdc4 100644 --- a/src/filter/http_context.rs +++ b/src/filter/http_context.rs @@ -1,16 +1,14 @@ use crate::configuration::{FailureMode, FilterConfig}; -use crate::envoy::{RateLimitRequest, RateLimitResponse, RateLimitResponse_Code}; +use crate::envoy::{RateLimitResponse, RateLimitResponse_Code}; use crate::filter::http_context::TracingHeader::{Baggage, Traceparent, Tracestate}; use crate::policy::Policy; +use crate::service::rate_limit::RateLimitService; +use crate::service::Service; use log::{debug, warn}; use protobuf::Message; use proxy_wasm::traits::{Context, HttpContext}; use proxy_wasm::types::{Action, Bytes}; use std::rc::Rc; -use std::time::Duration; - -const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService"; -const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit"; // tracing headers pub enum TracingHeader { @@ -63,28 +61,16 @@ impl Filter { ); return Action::Continue; } - - let mut rl_req = RateLimitRequest::new(); - rl_req.set_domain(rlp.domain.clone()); - rl_req.set_hits_addend(1); - rl_req.set_descriptors(descriptors); - - let rl_req_serialized = Message::write_to_bytes(&rl_req).unwrap(); // TODO(rahulanand16nov): Error Handling - let rl_tracing_headers = self .tracing_headers .iter() .map(|(header, value)| (header.as_str(), value.as_slice())) .collect(); - match self.dispatch_grpc_call( - rlp.service.as_str(), - RATELIMIT_SERVICE_NAME, - RATELIMIT_METHOD_NAME, - rl_tracing_headers, - Some(&rl_req_serialized), - Duration::from_secs(5), - ) { + let rls = RateLimitService::new(rlp.service.as_str(), rl_tracing_headers); + let message = RateLimitService::message(rlp.domain.clone(), descriptors); + + match rls.send(message) { Ok(call_id) => { debug!( "#{} initiated gRPC call (id# {}) to Limitador", diff --git a/src/lib.rs b/src/lib.rs index 8ee6c311..fb1c60aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ mod filter; mod glob; mod policy; mod policy_index; +mod service; #[cfg(test)] mod tests { diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 00000000..b63bb827 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,8 @@ +pub(crate) mod rate_limit; + +use protobuf::Message; +use proxy_wasm::types::Status; + +pub trait Service { + fn send(&self, message: M) -> Result; +} diff --git a/src/service/rate_limit.rs b/src/service/rate_limit.rs new file mode 100644 index 00000000..170d1bce --- /dev/null +++ b/src/service/rate_limit.rs @@ -0,0 +1,105 @@ +use crate::envoy::{RateLimitDescriptor, RateLimitRequest}; +use crate::service::Service; +use protobuf::{Message, RepeatedField}; +use proxy_wasm::hostcalls::dispatch_grpc_call; +use proxy_wasm::types::Status; +use std::time::Duration; + +const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService"; +const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit"; +pub struct RateLimitService<'a> { + endpoint: String, + metadata: Vec<(&'a str, &'a [u8])>, +} + +impl<'a> RateLimitService<'a> { + pub fn new(endpoint: &str, metadata: Vec<(&'a str, &'a [u8])>) -> RateLimitService<'a> { + Self { + endpoint: String::from(endpoint), + metadata, + } + } + pub fn message( + domain: String, + descriptors: RepeatedField, + ) -> RateLimitRequest { + RateLimitRequest { + domain, + descriptors, + hits_addend: 1, + unknown_fields: Default::default(), + cached_size: Default::default(), + } + } +} + +fn grpc_call( + upstream_name: &str, + initial_metadata: Vec<(&str, &[u8])>, + message: RateLimitRequest, +) -> Result { + let msg = Message::write_to_bytes(&message).unwrap(); // TODO(didierofrivia): Error Handling + dispatch_grpc_call( + upstream_name, + RATELIMIT_SERVICE_NAME, + RATELIMIT_METHOD_NAME, + initial_metadata, + Some(&msg), + Duration::from_secs(5), + ) +} + +impl Service for RateLimitService<'_> { + fn send(&self, message: RateLimitRequest) -> Result { + grpc_call(self.endpoint.as_str(), self.metadata.clone(), message) + } +} + +#[cfg(test)] +mod tests { + use crate::envoy::{RateLimitDescriptor, RateLimitDescriptor_Entry, RateLimitRequest}; + use crate::service::rate_limit::RateLimitService; + //use crate::service::Service; + use protobuf::{CachedSize, RepeatedField, UnknownFields}; + //use proxy_wasm::types::Status; + //use crate::filter::http_context::{Filter}; + + fn build_message() -> RateLimitRequest { + let domain = "rlp1"; + let mut field = RateLimitDescriptor::new(); + let mut entry = RateLimitDescriptor_Entry::new(); + entry.set_key("key1".to_string()); + entry.set_value("value1".to_string()); + field.set_entries(RepeatedField::from_vec(vec![entry])); + let descriptors = RepeatedField::from_vec(vec![field]); + + RateLimitService::message(domain.to_string(), descriptors.clone()) + } + #[test] + fn builds_correct_message() { + let msg = build_message(); + + assert_eq!(msg.hits_addend, 1); + assert_eq!(msg.domain, "rlp1".to_string()); + assert_eq!(msg.descriptors.first().unwrap().entries[0].key, "key1"); + assert_eq!(msg.descriptors.first().unwrap().entries[0].value, "value1"); + assert_eq!(msg.unknown_fields, UnknownFields::default()); + assert_eq!(msg.cached_size, CachedSize::default()); + } + /*#[test] + fn sends_message() { + let msg = build_message(); + let metadata = vec![("header-1", "value-1".as_bytes())]; + let rls = RateLimitService::new("limitador-cluster", metadata); + + // TODO(didierofrivia): When we have a grpc response type, assert the async response + } + + fn grpc_call( + _upstream_name: &str, + _initial_metadata: Vec<(&str, &[u8])>, + _message: RateLimitRequest, + ) -> Result { + Ok(1) + } */ +} From 301188137197379b594c44ff3c0f3b6b3ac02594 Mon Sep 17 00:00:00 2001 From: dd di cesare Date: Tue, 20 Aug 2024 10:48:21 +0200 Subject: [PATCH 2/2] [refactor] Passing `tracing_headers` to `RateLimitService` Signed-off-by: dd di cesare --- src/filter/http_context.rs | 10 +++------- src/service/rate_limit.rs | 24 ++++++++++++++++-------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/filter/http_context.rs b/src/filter/http_context.rs index aa02bdc4..281eab93 100644 --- a/src/filter/http_context.rs +++ b/src/filter/http_context.rs @@ -11,6 +11,7 @@ use proxy_wasm::types::{Action, Bytes}; use std::rc::Rc; // tracing headers +#[derive(Clone)] pub enum TracingHeader { Traceparent, Tracestate, @@ -22,7 +23,7 @@ impl TracingHeader { [Traceparent, Tracestate, Baggage] } - fn as_str(&self) -> &'static str { + pub fn as_str(&self) -> &'static str { match self { Traceparent => "traceparent", Tracestate => "tracestate", @@ -61,13 +62,8 @@ impl Filter { ); return Action::Continue; } - let rl_tracing_headers = self - .tracing_headers - .iter() - .map(|(header, value)| (header.as_str(), value.as_slice())) - .collect(); - let rls = RateLimitService::new(rlp.service.as_str(), rl_tracing_headers); + let rls = RateLimitService::new(rlp.service.as_str(), self.tracing_headers.clone()); let message = RateLimitService::message(rlp.domain.clone(), descriptors); match rls.send(message) { diff --git a/src/service/rate_limit.rs b/src/service/rate_limit.rs index 170d1bce..6c4726c5 100644 --- a/src/service/rate_limit.rs +++ b/src/service/rate_limit.rs @@ -1,22 +1,23 @@ use crate::envoy::{RateLimitDescriptor, RateLimitRequest}; +use crate::filter::http_context::TracingHeader; use crate::service::Service; use protobuf::{Message, RepeatedField}; use proxy_wasm::hostcalls::dispatch_grpc_call; -use proxy_wasm::types::Status; +use proxy_wasm::types::{Bytes, Status}; use std::time::Duration; const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService"; const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit"; -pub struct RateLimitService<'a> { +pub struct RateLimitService { endpoint: String, - metadata: Vec<(&'a str, &'a [u8])>, + tracing_headers: Vec<(TracingHeader, Bytes)>, } -impl<'a> RateLimitService<'a> { - pub fn new(endpoint: &str, metadata: Vec<(&'a str, &'a [u8])>) -> RateLimitService<'a> { +impl RateLimitService { + pub fn new(endpoint: &str, metadata: Vec<(TracingHeader, Bytes)>) -> RateLimitService { Self { endpoint: String::from(endpoint), - metadata, + tracing_headers: metadata, } } pub fn message( @@ -49,9 +50,16 @@ fn grpc_call( ) } -impl Service for RateLimitService<'_> { +impl Service for RateLimitService { fn send(&self, message: RateLimitRequest) -> Result { - grpc_call(self.endpoint.as_str(), self.metadata.clone(), message) + grpc_call( + self.endpoint.as_str(), + self.tracing_headers + .iter() + .map(|(header, value)| (header.as_str(), value.as_slice())) + .collect(), + message, + ) } }