diff --git a/src/hostcalls.rs b/src/hostcalls.rs index 947d0559..3df832c4 100644 --- a/src/hostcalls.rs +++ b/src/hostcalls.rs @@ -177,7 +177,7 @@ pub fn get_map(map_type: MapType) -> Result, Status> { } } -pub fn get_map_bytes(map_type: MapType) -> Result)>, Status> { +pub fn get_map_bytes(map_type: MapType) -> Result, Status> { unsafe { let mut return_data: *mut u8 = null_mut(); let mut return_size: usize = 0; @@ -253,6 +253,33 @@ pub fn get_map_value(map_type: MapType, key: &str) -> Result, Sta } } +pub fn get_map_value_bytes(map_type: MapType, key: &str) -> Result, Status> { + let mut return_data: *mut u8 = null_mut(); + let mut return_size: usize = 0; + unsafe { + match proxy_get_header_map_value( + map_type, + key.as_ptr(), + key.len(), + &mut return_data, + &mut return_size, + ) { + Status::Ok => { + if !return_data.is_null() { + Ok(Some(Vec::from_raw_parts( + return_data, + return_size, + return_size, + ))) + } else { + Ok(None) + } + } + status => panic!("unexpected status: {}", status as u32), + } + } +} + extern "C" { fn proxy_replace_header_map_value( map_type: MapType, @@ -810,6 +837,16 @@ pub fn cancel_grpc_call(token_id: u32) -> Result<(), Status> { } } +pub fn cancel_grpc_stream(token_id: u32) -> Result<(), Status> { + unsafe { + match proxy_grpc_cancel(token_id) { + Status::Ok => Ok(()), + Status::NotFound => Err(Status::NotFound), + status => panic!("unexpected status: {}", status as u32), + } + } +} + extern "C" { fn proxy_grpc_close(token_id: u32) -> Status; } @@ -824,6 +861,42 @@ pub fn close_grpc_stream(token_id: u32) -> Result<(), Status> { } } +extern "C" { + fn proxy_get_status( + return_code: *mut u32, + return_message_data: *mut *mut u8, + return_message_size: *mut usize, + ) -> Status; +} + +pub fn get_grpc_status() -> Result<(u32, Option), Status> { + let mut return_code: u32 = 0; + let mut return_data: *mut u8 = null_mut(); + let mut return_size: usize = 0; + unsafe { + match proxy_get_status(&mut return_code, &mut return_data, &mut return_size) { + Status::Ok => { + if !return_data.is_null() { + Ok(( + return_code, + Some( + String::from_utf8(Vec::from_raw_parts( + return_data, + return_size, + return_size, + )) + .unwrap(), + ), + )) + } else { + Ok((return_code, None)) + } + } + status => panic!("unexpected status: {}", status as u32), + } + } +} + extern "C" { fn proxy_set_effective_context(context_id: u32) -> Status; } @@ -1000,7 +1073,7 @@ mod utils { map } - pub(super) fn deserialize_bytes_map(bytes: &[u8]) -> Vec<(String, Vec)> { + pub(super) fn deserialize_bytes_map(bytes: &[u8]) -> Vec<(String, Bytes)> { let mut map = Vec::new(); if bytes.is_empty() { return map; diff --git a/src/traits.rs b/src/traits.rs index b6125bd9..4553a0aa 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -82,6 +82,10 @@ pub trait Context { hostcalls::get_map(MapType::HttpCallResponseHeaders).unwrap() } + fn get_http_call_response_header(&self, name: &str) -> Option { + hostcalls::get_map_value(MapType::HttpCallResponseHeaders, &name).unwrap() + } + fn get_http_call_response_body(&self, start: usize, max_size: usize) -> Option { hostcalls::get_buffer(BufferType::HttpCallResponseBody, start, max_size).unwrap() } @@ -90,6 +94,10 @@ pub trait Context { hostcalls::get_map(MapType::HttpCallResponseTrailers).unwrap() } + fn get_http_call_response_trailer(&self, name: &str) -> Option { + hostcalls::get_map_value(MapType::HttpCallResponseTrailers, &name).unwrap() + } + fn dispatch_grpc_call( &self, upstream_name: &str, @@ -115,8 +123,8 @@ pub trait Context { hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, start, max_size).unwrap() } - fn cancel_grpc_call(&self, token_id: u32) -> Result<(), Status> { - hostcalls::cancel_grpc_call(token_id) + fn cancel_grpc_call(&self, token_id: u32) { + hostcalls::cancel_grpc_call(token_id).unwrap() } fn open_grpc_stream( @@ -131,17 +139,16 @@ pub trait Context { fn on_grpc_stream_initial_metadata(&mut self, _token_id: u32, _num_elements: u32) {} - fn get_grpc_stream_initial_metadata(&self) -> Vec<(String, Vec)> { + fn get_grpc_stream_initial_metadata(&self) -> Vec<(String, Bytes)> { hostcalls::get_map_bytes(MapType::GrpcReceiveInitialMetadata).unwrap() } - fn send_grpc_stream_message( - &self, - token_id: u32, - message: Option<&[u8]>, - end_stream: bool, - ) -> Result<(), Status> { - hostcalls::send_grpc_stream_message(token_id, message, end_stream) + fn get_grpc_stream_initial_metadata_value(&self, name: &str) -> Option { + hostcalls::get_map_value_bytes(MapType::GrpcReceiveInitialMetadata, &name).unwrap() + } + + fn send_grpc_stream_message(&self, token_id: u32, message: Option<&[u8]>, end_stream: bool) { + hostcalls::send_grpc_stream_message(token_id, message, end_stream).unwrap() } fn on_grpc_stream_message(&mut self, _token_id: u32, _message_size: usize) {} @@ -152,16 +159,28 @@ pub trait Context { fn on_grpc_stream_trailing_metadata(&mut self, _token_id: u32, _num_elements: u32) {} - fn get_grpc_stream_trailing_metadata(&self) -> Vec<(String, Vec)> { + fn get_grpc_stream_trailing_metadata(&self) -> Vec<(String, Bytes)> { hostcalls::get_map_bytes(MapType::GrpcReceiveTrailingMetadata).unwrap() } - fn close_grpc_stream(&self, token_id: u32) -> Result<(), Status> { - hostcalls::close_grpc_stream(token_id) + fn get_grpc_stream_trailing_metadata_value(&self, name: &str) -> Option { + hostcalls::get_map_value_bytes(MapType::GrpcReceiveTrailingMetadata, &name).unwrap() + } + + fn cancel_grpc_stream(&self, token_id: u32) { + hostcalls::cancel_grpc_stream(token_id).unwrap() + } + + fn close_grpc_stream(&self, token_id: u32) { + hostcalls::close_grpc_stream(token_id).unwrap() } fn on_grpc_stream_close(&mut self, _token_id: u32, _status_code: u32) {} + fn get_grpc_status(&self) -> (u32, Option) { + hostcalls::get_grpc_status().unwrap() + } + fn on_done(&mut self) -> bool { true }