Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions src/hostcalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub fn get_map(map_type: MapType) -> Result<Vec<(String, String)>, Status> {
}
}

pub fn get_map_bytes(map_type: MapType) -> Result<Vec<(String, Vec<u8>)>, Status> {
pub fn get_map_bytes(map_type: MapType) -> Result<Vec<(String, Bytes)>, Status> {
unsafe {
let mut return_data: *mut u8 = null_mut();
let mut return_size: usize = 0;
Expand Down Expand Up @@ -253,6 +253,33 @@ pub fn get_map_value(map_type: MapType, key: &str) -> Result<Option<String>, Sta
}
}

pub fn get_map_value_bytes(map_type: MapType, key: &str) -> Result<Option<Bytes>, Status> {
let mut return_data: *mut u8 = null_mut();
let mut return_size: usize = 0;
unsafe {
match proxy_get_header_map_value(
map_type,
key.as_ptr(),
key.len(),
&mut return_data,
&mut return_size,
) {
Status::Ok => {
if !return_data.is_null() {
Ok(Some(Vec::from_raw_parts(
return_data,
return_size,
return_size,
)))
} else {
Ok(None)
}
}
status => panic!("unexpected status: {}", status as u32),
}
}
}

extern "C" {
fn proxy_replace_header_map_value(
map_type: MapType,
Expand Down Expand Up @@ -810,6 +837,16 @@ pub fn cancel_grpc_call(token_id: u32) -> Result<(), Status> {
}
}

pub fn cancel_grpc_stream(token_id: u32) -> Result<(), Status> {
unsafe {
match proxy_grpc_cancel(token_id) {
Status::Ok => Ok(()),
Status::NotFound => Err(Status::NotFound),
status => panic!("unexpected status: {}", status as u32),
}
}
}

extern "C" {
fn proxy_grpc_close(token_id: u32) -> Status;
}
Expand All @@ -824,6 +861,42 @@ pub fn close_grpc_stream(token_id: u32) -> Result<(), Status> {
}
}

extern "C" {
fn proxy_get_status(
return_code: *mut u32,
return_message_data: *mut *mut u8,
return_message_size: *mut usize,
) -> Status;
}

pub fn get_grpc_status() -> Result<(u32, Option<String>), Status> {
let mut return_code: u32 = 0;
let mut return_data: *mut u8 = null_mut();
let mut return_size: usize = 0;
unsafe {
match proxy_get_status(&mut return_code, &mut return_data, &mut return_size) {
Status::Ok => {
if !return_data.is_null() {
Ok((
return_code,
Some(
String::from_utf8(Vec::from_raw_parts(
return_data,
return_size,
return_size,
))
.unwrap(),
),
))
} else {
Ok((return_code, None))
}
}
status => panic!("unexpected status: {}", status as u32),
}
}
}

extern "C" {
fn proxy_set_effective_context(context_id: u32) -> Status;
}
Expand Down Expand Up @@ -1000,7 +1073,7 @@ mod utils {
map
}

pub(super) fn deserialize_bytes_map(bytes: &[u8]) -> Vec<(String, Vec<u8>)> {
pub(super) fn deserialize_bytes_map(bytes: &[u8]) -> Vec<(String, Bytes)> {
let mut map = Vec::new();
if bytes.is_empty() {
return map;
Expand Down
45 changes: 32 additions & 13 deletions src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub trait Context {
hostcalls::get_map(MapType::HttpCallResponseHeaders).unwrap()
}

fn get_http_call_response_header(&self, name: &str) -> Option<String> {
hostcalls::get_map_value(MapType::HttpCallResponseHeaders, &name).unwrap()
}

fn get_http_call_response_body(&self, start: usize, max_size: usize) -> Option<Bytes> {
hostcalls::get_buffer(BufferType::HttpCallResponseBody, start, max_size).unwrap()
}
Expand All @@ -90,6 +94,10 @@ pub trait Context {
hostcalls::get_map(MapType::HttpCallResponseTrailers).unwrap()
}

fn get_http_call_response_trailer(&self, name: &str) -> Option<String> {
hostcalls::get_map_value(MapType::HttpCallResponseTrailers, &name).unwrap()
}

fn dispatch_grpc_call(
&self,
upstream_name: &str,
Expand All @@ -115,8 +123,8 @@ pub trait Context {
hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, start, max_size).unwrap()
}

fn cancel_grpc_call(&self, token_id: u32) -> Result<(), Status> {
hostcalls::cancel_grpc_call(token_id)
fn cancel_grpc_call(&self, token_id: u32) {
hostcalls::cancel_grpc_call(token_id).unwrap()
}

fn open_grpc_stream(
Expand All @@ -131,17 +139,16 @@ pub trait Context {

fn on_grpc_stream_initial_metadata(&mut self, _token_id: u32, _num_elements: u32) {}

fn get_grpc_stream_initial_metadata(&self) -> Vec<(String, Vec<u8>)> {
fn get_grpc_stream_initial_metadata(&self) -> Vec<(String, Bytes)> {
hostcalls::get_map_bytes(MapType::GrpcReceiveInitialMetadata).unwrap()
}

fn send_grpc_stream_message(
&self,
token_id: u32,
message: Option<&[u8]>,
end_stream: bool,
) -> Result<(), Status> {
hostcalls::send_grpc_stream_message(token_id, message, end_stream)
fn get_grpc_stream_initial_metadata_value(&self, name: &str) -> Option<Bytes> {
hostcalls::get_map_value_bytes(MapType::GrpcReceiveInitialMetadata, &name).unwrap()
}

fn send_grpc_stream_message(&self, token_id: u32, message: Option<&[u8]>, end_stream: bool) {
hostcalls::send_grpc_stream_message(token_id, message, end_stream).unwrap()
}

fn on_grpc_stream_message(&mut self, _token_id: u32, _message_size: usize) {}
Expand All @@ -152,16 +159,28 @@ pub trait Context {

fn on_grpc_stream_trailing_metadata(&mut self, _token_id: u32, _num_elements: u32) {}

fn get_grpc_stream_trailing_metadata(&self) -> Vec<(String, Vec<u8>)> {
fn get_grpc_stream_trailing_metadata(&self) -> Vec<(String, Bytes)> {
hostcalls::get_map_bytes(MapType::GrpcReceiveTrailingMetadata).unwrap()
}

fn close_grpc_stream(&self, token_id: u32) -> Result<(), Status> {
hostcalls::close_grpc_stream(token_id)
fn get_grpc_stream_trailing_metadata_value(&self, name: &str) -> Option<Bytes> {
hostcalls::get_map_value_bytes(MapType::GrpcReceiveTrailingMetadata, &name).unwrap()
}

fn cancel_grpc_stream(&self, token_id: u32) {
hostcalls::cancel_grpc_stream(token_id).unwrap()
}

fn close_grpc_stream(&self, token_id: u32) {
hostcalls::close_grpc_stream(token_id).unwrap()
}

fn on_grpc_stream_close(&mut self, _token_id: u32, _status_code: u32) {}

fn get_grpc_status(&self) -> (u32, Option<String>) {
hostcalls::get_grpc_status().unwrap()
}

fn on_done(&mut self) -> bool {
true
}
Expand Down