Skip to content

Commit a852236

Browse files
Merge pull request #81 from Kuadrant/on_grpc_response_flow
On grpc response flow
2 parents 40ce002 + 7125754 commit a852236

File tree

7 files changed

+554
-260
lines changed

7 files changed

+554
-260
lines changed

src/envoy/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub use {
3737
AttributeContext_Request,
3838
},
3939
base::Metadata,
40-
external_auth::CheckRequest,
40+
external_auth::{CheckRequest, DeniedHttpResponse, OkHttpResponse},
4141
ratelimit::{RateLimitDescriptor, RateLimitDescriptor_Entry},
4242
rls::{RateLimitRequest, RateLimitResponse, RateLimitResponse_Code},
4343
};

src/filter/http_context.rs

+85-64
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use crate::configuration::{FailureMode, FilterConfig};
1+
use crate::configuration::{ExtensionType, FailureMode, FilterConfig};
22
use crate::envoy::{RateLimitResponse, RateLimitResponse_Code};
33
use crate::operation_dispatcher::OperationDispatcher;
44
use crate::policy::Policy;
5+
use crate::service::grpc_message::GrpcMessageResponse;
56
use log::{debug, warn};
6-
use protobuf::Message;
77
use proxy_wasm::traits::{Context, HttpContext};
88
use proxy_wasm::types::Action;
99
use std::rc::Rc;
@@ -29,8 +29,8 @@ impl Filter {
2929
}
3030
}
3131

32-
fn process_rate_limit_policy(&self, rlp: &Policy) -> Action {
33-
let descriptors = rlp.build_descriptors(self);
32+
fn process_policy(&self, policy: &Policy) -> Action {
33+
let descriptors = policy.build_descriptors(self);
3434
if descriptors.is_empty() {
3535
debug!(
3636
"#{} process_rate_limit_policy: empty descriptors",
@@ -39,7 +39,8 @@ impl Filter {
3939
return Action::Continue;
4040
}
4141

42-
self.operation_dispatcher.build_operations(rlp, descriptors);
42+
self.operation_dispatcher
43+
.build_operations(policy, descriptors);
4344

4445
if let Some(operation) = self.operation_dispatcher.next() {
4546
match operation.get_result() {
@@ -63,22 +64,52 @@ impl Filter {
6364
}
6465
}
6566

66-
fn handle_error_on_grpc_response(&self) {
67-
// todo(adam-cattermole): We need a method of knowing which service is the one currently
68-
// being used (the current action) so that we can get the failure mode
69-
let rls = self
70-
.config
71-
.services
72-
.values()
73-
.next()
74-
.expect("expect a value");
75-
match rls.failure_mode() {
67+
fn handle_error_on_grpc_response(&self, failure_mode: &FailureMode) {
68+
match failure_mode {
7669
FailureMode::Deny => {
7770
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
7871
}
7972
FailureMode::Allow => self.resume_http_request(),
8073
}
8174
}
75+
76+
fn process_ratelimit_grpc_response(
77+
&mut self,
78+
rl_resp: GrpcMessageResponse,
79+
failure_mode: &FailureMode,
80+
) {
81+
match rl_resp {
82+
GrpcMessageResponse::RateLimit(RateLimitResponse {
83+
overall_code: RateLimitResponse_Code::UNKNOWN,
84+
..
85+
}) => {
86+
self.handle_error_on_grpc_response(failure_mode);
87+
}
88+
GrpcMessageResponse::RateLimit(RateLimitResponse {
89+
overall_code: RateLimitResponse_Code::OVER_LIMIT,
90+
response_headers_to_add: rl_headers,
91+
..
92+
}) => {
93+
let mut response_headers = vec![];
94+
for header in &rl_headers {
95+
response_headers.push((header.get_key(), header.get_value()));
96+
}
97+
self.send_http_response(429, response_headers, Some(b"Too Many Requests\n"));
98+
}
99+
GrpcMessageResponse::RateLimit(RateLimitResponse {
100+
overall_code: RateLimitResponse_Code::OK,
101+
response_headers_to_add: additional_headers,
102+
..
103+
}) => {
104+
for header in additional_headers {
105+
self.response_headers_to_add
106+
.push((header.key, header.value));
107+
}
108+
}
109+
_ => {}
110+
}
111+
self.operation_dispatcher.next();
112+
}
82113
}
83114

84115
impl HttpContext for Filter {
@@ -97,9 +128,12 @@ impl HttpContext for Filter {
97128
);
98129
Action::Continue
99130
}
100-
Some(rlp) => {
101-
debug!("#{} ratelimitpolicy selected {}", self.context_id, rlp.name);
102-
self.process_rate_limit_policy(rlp)
131+
Some(policy) => {
132+
debug!(
133+
"#{} ratelimitpolicy selected {}",
134+
self.context_id, policy.name
135+
);
136+
self.process_policy(policy)
103137
}
104138
}
105139
}
@@ -124,55 +158,42 @@ impl Context for Filter {
124158
self.context_id
125159
);
126160

127-
let res_body_bytes = match self.get_grpc_call_response_body(0, resp_size) {
128-
Some(bytes) => bytes,
129-
None => {
130-
warn!("grpc response body is empty!");
131-
self.handle_error_on_grpc_response();
132-
return;
133-
}
134-
};
135-
136-
let rl_resp: RateLimitResponse = match Message::parse_from_bytes(&res_body_bytes) {
137-
Ok(res) => res,
138-
Err(e) => {
139-
warn!("failed to parse grpc response body into RateLimitResponse message: {e}");
140-
self.handle_error_on_grpc_response();
141-
return;
142-
}
143-
};
144-
145-
match rl_resp {
146-
RateLimitResponse {
147-
overall_code: RateLimitResponse_Code::UNKNOWN,
148-
..
149-
} => {
150-
self.handle_error_on_grpc_response();
151-
return;
152-
}
153-
RateLimitResponse {
154-
overall_code: RateLimitResponse_Code::OVER_LIMIT,
155-
response_headers_to_add: rl_headers,
156-
..
157-
} => {
158-
let mut response_headers = vec![];
159-
for header in &rl_headers {
160-
response_headers.push((header.get_key(), header.get_value()));
161+
if let Some(operation) = self.operation_dispatcher.get_operation(token_id) {
162+
let failure_mode = &operation.get_failure_mode();
163+
let res_body_bytes = match self.get_grpc_call_response_body(0, resp_size) {
164+
Some(bytes) => bytes,
165+
None => {
166+
warn!("grpc response body is empty!");
167+
self.handle_error_on_grpc_response(failure_mode);
168+
return;
161169
}
162-
self.send_http_response(429, response_headers, Some(b"Too Many Requests\n"));
163-
return;
164-
}
165-
RateLimitResponse {
166-
overall_code: RateLimitResponse_Code::OK,
167-
response_headers_to_add: additional_headers,
168-
..
169-
} => {
170-
for header in additional_headers {
171-
self.response_headers_to_add
172-
.push((header.key, header.value));
170+
};
171+
let res = match GrpcMessageResponse::new(
172+
operation.get_extension_type(),
173+
&res_body_bytes,
174+
status_code,
175+
) {
176+
Ok(res) => res,
177+
Err(e) => {
178+
warn!(
179+
"failed to parse grpc response body into GrpcMessageResponse message: {e}"
180+
);
181+
self.handle_error_on_grpc_response(failure_mode);
182+
return;
173183
}
184+
};
185+
match operation.get_extension_type() {
186+
ExtensionType::Auth => {} // TODO(didierofrivia): Process auth grpc response.
187+
ExtensionType::RateLimit => self.process_ratelimit_grpc_response(res, failure_mode),
174188
}
189+
190+
if let Some(_op) = self.operation_dispatcher.next() {
191+
} else {
192+
self.resume_http_request()
193+
}
194+
} else {
195+
warn!("No Operation found with token_id: {token_id}");
196+
self.handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode
175197
}
176-
self.resume_http_request();
177198
}
178199
}

0 commit comments

Comments
 (0)