Skip to content

Commit 17cc2f0

Browse files
Merge pull request #77 from Kuadrant/action_dispatcher
Operation dispatcher FSM
2 parents 6c9343d + c221036 commit 17cc2f0

10 files changed

+685
-96
lines changed

src/configuration.rs

+97-20
Original file line numberDiff line numberDiff line change
@@ -486,16 +486,7 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
486486
let services = config
487487
.extensions
488488
.into_iter()
489-
.map(|(name, ext)| {
490-
(
491-
name,
492-
Rc::new(GrpcService::new(
493-
ext.extension_type,
494-
ext.endpoint,
495-
ext.failure_mode,
496-
)),
497-
)
498-
})
489+
.map(|(name, ext)| (name, Rc::new(GrpcService::new(Rc::new(ext)))))
499490
.collect();
500491

501492
Ok(Self {
@@ -505,18 +496,19 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
505496
}
506497
}
507498

508-
#[derive(Deserialize, Debug, Clone, Default)]
499+
#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
509500
#[serde(rename_all = "lowercase")]
510501
pub enum FailureMode {
511502
#[default]
512503
Deny,
513504
Allow,
514505
}
515506

516-
#[derive(Deserialize, Debug, Clone)]
507+
#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
517508
#[serde(rename_all = "lowercase")]
518509
pub enum ExtensionType {
519510
Auth,
511+
#[default]
520512
RateLimit,
521513
}
522514

@@ -527,7 +519,7 @@ pub struct PluginConfiguration {
527519
pub policies: Vec<Policy>,
528520
}
529521

530-
#[derive(Deserialize, Debug, Clone)]
522+
#[derive(Deserialize, Debug, Clone, Default)]
531523
#[serde(rename_all = "camelCase")]
532524
pub struct Extension {
533525
#[serde(rename = "type")]
@@ -537,6 +529,14 @@ pub struct Extension {
537529
pub failure_mode: FailureMode,
538530
}
539531

532+
#[derive(Deserialize, Debug, Clone)]
533+
#[serde(rename_all = "camelCase")]
534+
pub struct Action {
535+
pub extension: String,
536+
#[allow(dead_code)]
537+
pub data: DataType,
538+
}
539+
540540
#[cfg(test)]
541541
mod test {
542542
use super::*;
@@ -587,7 +587,18 @@ mod test {
587587
"selector": "auth.metadata.username"
588588
}
589589
}]
590-
}]
590+
}],
591+
"actions": [
592+
{
593+
"extension": "limitador",
594+
"data": {
595+
"static": {
596+
"key": "rlp-ns-A/rlp-name-A",
597+
"value": "1"
598+
}
599+
}
600+
}
601+
]
591602
}]
592603
}"#;
593604

@@ -682,7 +693,18 @@ mod test {
682693
"default": "my_selector_default_value"
683694
}
684695
}]
685-
}]
696+
}],
697+
"actions": [
698+
{
699+
"extension": "limitador",
700+
"data": {
701+
"static": {
702+
"key": "rlp-ns-A/rlp-name-A",
703+
"value": "1"
704+
}
705+
}
706+
}
707+
]
686708
}]
687709
}"#;
688710
let res = serde_json::from_str::<PluginConfiguration>(config);
@@ -759,7 +781,18 @@ mod test {
759781
}]
760782
}],
761783
"data": [ { "selector": { "selector": "my.selector.path" } }]
762-
}]
784+
}],
785+
"actions": [
786+
{
787+
"extension": "limitador",
788+
"data": {
789+
"static": {
790+
"key": "rlp-ns-A/rlp-name-A",
791+
"value": "1"
792+
}
793+
}
794+
}
795+
]
763796
}]
764797
}"#;
765798
let res = serde_json::from_str::<PluginConfiguration>(config);
@@ -825,7 +858,18 @@ mod test {
825858
"selector": "auth.metadata.username"
826859
}
827860
}]
828-
}]
861+
}],
862+
"actions": [
863+
{
864+
"extension": "limitador",
865+
"data": {
866+
"static": {
867+
"key": "rlp-ns-A/rlp-name-A",
868+
"value": "1"
869+
}
870+
}
871+
}
872+
]
829873
}]
830874
}"#;
831875
let res = serde_json::from_str::<PluginConfiguration>(config);
@@ -872,7 +916,18 @@ mod test {
872916
"selector": "auth.metadata.username"
873917
}
874918
}]
875-
}]
919+
}],
920+
"actions": [
921+
{
922+
"extension": "limitador",
923+
"data": {
924+
"static": {
925+
"key": "rlp-ns-A/rlp-name-A",
926+
"value": "1"
927+
}
928+
}
929+
}
930+
]
876931
}]
877932
}"#;
878933
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
@@ -902,7 +957,18 @@ mod test {
902957
"value": "1"
903958
}
904959
}]
905-
}]
960+
}],
961+
"actions": [
962+
{
963+
"extension": "limitador",
964+
"data": {
965+
"static": {
966+
"key": "rlp-ns-A/rlp-name-A",
967+
"value": "1"
968+
}
969+
}
970+
}
971+
]
906972
}]
907973
}"#;
908974
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
@@ -934,7 +1000,18 @@ mod test {
9341000
}]
9351001
}],
9361002
"data": [ { "selector": { "selector": "my.selector.path" } }]
937-
}]
1003+
}],
1004+
"actions": [
1005+
{
1006+
"extension": "limitador",
1007+
"data": {
1008+
"static": {
1009+
"key": "rlp-ns-A/rlp-name-A",
1010+
"value": "1"
1011+
}
1012+
}
1013+
}
1014+
]
9381015
}]
9391016
}"#;
9401017
let res = serde_json::from_str::<PluginConfiguration>(bad_config);

src/filter/http_context.rs

+20-27
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use crate::configuration::{FailureMode, FilterConfig};
22
use crate::envoy::{RateLimitResponse, RateLimitResponse_Code};
3+
use crate::operation_dispatcher::OperationDispatcher;
34
use crate::policy::Policy;
4-
use crate::service::rate_limit::RateLimitService;
5-
use crate::service::{GrpcServiceHandler, HeaderResolver};
65
use log::{debug, warn};
76
use protobuf::Message;
87
use proxy_wasm::traits::{Context, HttpContext};
@@ -13,7 +12,7 @@ pub struct Filter {
1312
pub context_id: u32,
1413
pub config: Rc<FilterConfig>,
1514
pub response_headers_to_add: Vec<(String, String)>,
16-
pub header_resolver: Rc<HeaderResolver>,
15+
pub operation_dispatcher: OperationDispatcher,
1716
}
1817

1918
impl Filter {
@@ -40,33 +39,27 @@ impl Filter {
4039
return Action::Continue;
4140
}
4241

43-
// todo(adam-cattermole): For now we just get the first GrpcService but we expect to have
44-
// an action which links to the service that should be used
45-
let rls = self
46-
.config
47-
.services
48-
.values()
49-
.next()
50-
.expect("expect a value");
51-
52-
let handler = GrpcServiceHandler::new(Rc::clone(rls), Rc::clone(&self.header_resolver));
53-
let message = RateLimitService::message(rlp.domain.clone(), descriptors);
42+
self.operation_dispatcher.build_operations(rlp, descriptors);
5443

55-
match handler.send(message) {
56-
Ok(call_id) => {
57-
debug!(
58-
"#{} initiated gRPC call (id# {}) to Limitador",
59-
self.context_id, call_id
60-
);
61-
Action::Pause
62-
}
63-
Err(e) => {
64-
warn!("gRPC call to Limitador failed! {e:?}");
65-
if let FailureMode::Deny = rls.failure_mode() {
66-
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
44+
if let Some(operation) = self.operation_dispatcher.next() {
45+
match operation.get_result() {
46+
Ok(call_id) => {
47+
debug!(
48+
"#{} initiated gRPC call (id# {}) to Limitador",
49+
self.context_id, call_id
50+
);
51+
Action::Pause
52+
}
53+
Err(e) => {
54+
warn!("gRPC call to Limitador failed! {e:?}");
55+
if let FailureMode::Deny = operation.get_failure_mode() {
56+
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
57+
}
58+
Action::Continue
6759
}
68-
Action::Continue
6960
}
61+
} else {
62+
Action::Continue
7063
}
7164
}
7265

src/filter/root_context.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use crate::configuration::{FilterConfig, PluginConfiguration};
22
use crate::filter::http_context::Filter;
3-
use crate::service::HeaderResolver;
3+
use crate::operation_dispatcher::OperationDispatcher;
4+
use crate::service::{GrpcServiceHandler, HeaderResolver};
45
use const_format::formatcp;
56
use log::{debug, error, info};
67
use proxy_wasm::traits::{Context, HttpContext, RootContext};
78
use proxy_wasm::types::ContextType;
9+
use std::collections::HashMap;
810
use std::rc::Rc;
911

1012
const WASM_SHIM_VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -37,11 +39,23 @@ impl RootContext for FilterRoot {
3739

3840
fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
3941
debug!("#{} create_http_context", context_id);
42+
let mut service_handlers: HashMap<String, Rc<GrpcServiceHandler>> = HashMap::new();
43+
self.config
44+
.services
45+
.iter()
46+
.for_each(|(extension, service)| {
47+
service_handlers
48+
.entry(extension.clone())
49+
.or_insert(Rc::from(GrpcServiceHandler::new(
50+
Rc::clone(service),
51+
Rc::new(HeaderResolver::new()),
52+
)));
53+
});
4054
Some(Box::new(Filter {
4155
context_id,
4256
config: Rc::clone(&self.config),
4357
response_headers_to_add: Vec::default(),
44-
header_resolver: Rc::new(HeaderResolver::new()),
58+
operation_dispatcher: OperationDispatcher::new(service_handlers),
4559
}))
4660
}
4761

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod configuration;
33
mod envoy;
44
mod filter;
55
mod glob;
6+
mod operation_dispatcher;
67
mod policy;
78
mod policy_index;
89
mod service;

0 commit comments

Comments
 (0)