-
Notifications
You must be signed in to change notification settings - Fork 51
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add grpc api for dataplane (#26)
feat: add grpc api for dataplane This also includes a new xtask for manually inserting and removing routing data from the dataplane, e.g.: cargo xtask grpc-client --vip-ip 172.17.0.2 --port 9875 NOTE: This change was made in lieu of adding a pull mechanism for dataplane configuration (which is actually what I added before in a previous iteration). The reason for this choice was purely to make development easy in the coming weeks, allowing a quick and easy way to insert routing data into the dataplane for debugging and testing. I would not be surprised if later once things are futher along we went the other way again.
- Loading branch information
Showing
16 changed files
with
640 additions
and
152 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
[workspace] | ||
members = ["controllers/udproute", "loader", "common", "xtask"] | ||
members = ["api-server", "loader", "common", "xtask"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
[package] | ||
name = "api-server" | ||
version = "0.1.0" | ||
edition = "2021" | ||
publish = false | ||
|
||
[dependencies] | ||
prost = "0.11.2" | ||
tonic = "0.8.2" | ||
anyhow = "1" | ||
log = "0.4" | ||
aya = { version = ">=0.11", features=["async_tokio"] } | ||
tokio = { version = "1.18", features = ["macros", "rt", "rt-multi-thread", "net", "signal"] } | ||
common = { path = "../common", features=["user"] } | ||
|
||
[build-dependencies] | ||
tonic-build = "0.8.2" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
fn main() { | ||
let proto_file = "./proto/backends.proto"; | ||
|
||
println!("building proto {}", proto_file); | ||
|
||
tonic_build::configure() | ||
.build_server(true) | ||
.out_dir("./src") | ||
.compile(&[proto_file], &["."]) | ||
.unwrap_or_else(|e| panic!("protobuf compile error: {}", e)); | ||
|
||
println!("cargo:rerun-if-changed={}", proto_file); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
syntax = "proto3"; | ||
|
||
package backends; | ||
|
||
message Vip { | ||
uint32 ip = 1; | ||
uint32 port = 2; | ||
} | ||
|
||
message Target { | ||
uint32 daddr = 1; | ||
uint32 dport = 2; | ||
uint32 ifindex = 3; | ||
} | ||
|
||
message Targets { | ||
Vip vip = 1; | ||
Target target = 2; | ||
} | ||
|
||
message Confirmation { | ||
string confirmation = 1; | ||
} | ||
|
||
service backends { | ||
rpc Update(Targets) returns (Confirmation); | ||
rpc Delete(Vip) returns (Confirmation); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,319 @@ | ||
#[derive(Clone, PartialEq, ::prost::Message)] | ||
pub struct Vip { | ||
#[prost(uint32, tag = "1")] | ||
pub ip: u32, | ||
#[prost(uint32, tag = "2")] | ||
pub port: u32, | ||
} | ||
#[derive(Clone, PartialEq, ::prost::Message)] | ||
pub struct Target { | ||
#[prost(uint32, tag = "1")] | ||
pub daddr: u32, | ||
#[prost(uint32, tag = "2")] | ||
pub dport: u32, | ||
#[prost(uint32, tag = "3")] | ||
pub ifindex: u32, | ||
} | ||
#[derive(Clone, PartialEq, ::prost::Message)] | ||
pub struct Targets { | ||
#[prost(message, optional, tag = "1")] | ||
pub vip: ::core::option::Option<Vip>, | ||
#[prost(message, optional, tag = "2")] | ||
pub target: ::core::option::Option<Target>, | ||
} | ||
#[derive(Clone, PartialEq, ::prost::Message)] | ||
pub struct Confirmation { | ||
#[prost(string, tag = "1")] | ||
pub confirmation: ::prost::alloc::string::String, | ||
} | ||
/// Generated client implementations. | ||
pub mod backends_client { | ||
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] | ||
use tonic::codegen::*; | ||
use tonic::codegen::http::Uri; | ||
#[derive(Debug, Clone)] | ||
pub struct BackendsClient<T> { | ||
inner: tonic::client::Grpc<T>, | ||
} | ||
impl BackendsClient<tonic::transport::Channel> { | ||
/// Attempt to create a new client by connecting to a given endpoint. | ||
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error> | ||
where | ||
D: std::convert::TryInto<tonic::transport::Endpoint>, | ||
D::Error: Into<StdError>, | ||
{ | ||
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; | ||
Ok(Self::new(conn)) | ||
} | ||
} | ||
impl<T> BackendsClient<T> | ||
where | ||
T: tonic::client::GrpcService<tonic::body::BoxBody>, | ||
T::Error: Into<StdError>, | ||
T::ResponseBody: Body<Data = Bytes> + Send + 'static, | ||
<T::ResponseBody as Body>::Error: Into<StdError> + Send, | ||
{ | ||
pub fn new(inner: T) -> Self { | ||
let inner = tonic::client::Grpc::new(inner); | ||
Self { inner } | ||
} | ||
pub fn with_origin(inner: T, origin: Uri) -> Self { | ||
let inner = tonic::client::Grpc::with_origin(inner, origin); | ||
Self { inner } | ||
} | ||
pub fn with_interceptor<F>( | ||
inner: T, | ||
interceptor: F, | ||
) -> BackendsClient<InterceptedService<T, F>> | ||
where | ||
F: tonic::service::Interceptor, | ||
T::ResponseBody: Default, | ||
T: tonic::codegen::Service< | ||
http::Request<tonic::body::BoxBody>, | ||
Response = http::Response< | ||
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody, | ||
>, | ||
>, | ||
<T as tonic::codegen::Service< | ||
http::Request<tonic::body::BoxBody>, | ||
>>::Error: Into<StdError> + Send + Sync, | ||
{ | ||
BackendsClient::new(InterceptedService::new(inner, interceptor)) | ||
} | ||
/// Compress requests with the given encoding. | ||
/// | ||
/// This requires the server to support it otherwise it might respond with an | ||
/// error. | ||
#[must_use] | ||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { | ||
self.inner = self.inner.send_compressed(encoding); | ||
self | ||
} | ||
/// Enable decompressing responses. | ||
#[must_use] | ||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { | ||
self.inner = self.inner.accept_compressed(encoding); | ||
self | ||
} | ||
pub async fn update( | ||
&mut self, | ||
request: impl tonic::IntoRequest<super::Targets>, | ||
) -> Result<tonic::Response<super::Confirmation>, tonic::Status> { | ||
self.inner | ||
.ready() | ||
.await | ||
.map_err(|e| { | ||
tonic::Status::new( | ||
tonic::Code::Unknown, | ||
format!("Service was not ready: {}", e.into()), | ||
) | ||
})?; | ||
let codec = tonic::codec::ProstCodec::default(); | ||
let path = http::uri::PathAndQuery::from_static("/backends.backends/Update"); | ||
self.inner.unary(request.into_request(), path, codec).await | ||
} | ||
pub async fn delete( | ||
&mut self, | ||
request: impl tonic::IntoRequest<super::Vip>, | ||
) -> Result<tonic::Response<super::Confirmation>, tonic::Status> { | ||
self.inner | ||
.ready() | ||
.await | ||
.map_err(|e| { | ||
tonic::Status::new( | ||
tonic::Code::Unknown, | ||
format!("Service was not ready: {}", e.into()), | ||
) | ||
})?; | ||
let codec = tonic::codec::ProstCodec::default(); | ||
let path = http::uri::PathAndQuery::from_static("/backends.backends/Delete"); | ||
self.inner.unary(request.into_request(), path, codec).await | ||
} | ||
} | ||
} | ||
/// Generated server implementations. | ||
pub mod backends_server { | ||
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] | ||
use tonic::codegen::*; | ||
///Generated trait containing gRPC methods that should be implemented for use with BackendsServer. | ||
#[async_trait] | ||
pub trait Backends: Send + Sync + 'static { | ||
async fn update( | ||
&self, | ||
request: tonic::Request<super::Targets>, | ||
) -> Result<tonic::Response<super::Confirmation>, tonic::Status>; | ||
async fn delete( | ||
&self, | ||
request: tonic::Request<super::Vip>, | ||
) -> Result<tonic::Response<super::Confirmation>, tonic::Status>; | ||
} | ||
#[derive(Debug)] | ||
pub struct BackendsServer<T: Backends> { | ||
inner: _Inner<T>, | ||
accept_compression_encodings: EnabledCompressionEncodings, | ||
send_compression_encodings: EnabledCompressionEncodings, | ||
} | ||
struct _Inner<T>(Arc<T>); | ||
impl<T: Backends> BackendsServer<T> { | ||
pub fn new(inner: T) -> Self { | ||
Self::from_arc(Arc::new(inner)) | ||
} | ||
pub fn from_arc(inner: Arc<T>) -> Self { | ||
let inner = _Inner(inner); | ||
Self { | ||
inner, | ||
accept_compression_encodings: Default::default(), | ||
send_compression_encodings: Default::default(), | ||
} | ||
} | ||
pub fn with_interceptor<F>( | ||
inner: T, | ||
interceptor: F, | ||
) -> InterceptedService<Self, F> | ||
where | ||
F: tonic::service::Interceptor, | ||
{ | ||
InterceptedService::new(Self::new(inner), interceptor) | ||
} | ||
/// Enable decompressing requests with the given encoding. | ||
#[must_use] | ||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { | ||
self.accept_compression_encodings.enable(encoding); | ||
self | ||
} | ||
/// Compress responses with the given encoding, if the client supports it. | ||
#[must_use] | ||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { | ||
self.send_compression_encodings.enable(encoding); | ||
self | ||
} | ||
} | ||
impl<T, B> tonic::codegen::Service<http::Request<B>> for BackendsServer<T> | ||
where | ||
T: Backends, | ||
B: Body + Send + 'static, | ||
B::Error: Into<StdError> + Send + 'static, | ||
{ | ||
type Response = http::Response<tonic::body::BoxBody>; | ||
type Error = std::convert::Infallible; | ||
type Future = BoxFuture<Self::Response, Self::Error>; | ||
fn poll_ready( | ||
&mut self, | ||
_cx: &mut Context<'_>, | ||
) -> Poll<Result<(), Self::Error>> { | ||
Poll::Ready(Ok(())) | ||
} | ||
fn call(&mut self, req: http::Request<B>) -> Self::Future { | ||
let inner = self.inner.clone(); | ||
match req.uri().path() { | ||
"/backends.backends/Update" => { | ||
#[allow(non_camel_case_types)] | ||
struct UpdateSvc<T: Backends>(pub Arc<T>); | ||
impl<T: Backends> tonic::server::UnaryService<super::Targets> | ||
for UpdateSvc<T> { | ||
type Response = super::Confirmation; | ||
type Future = BoxFuture< | ||
tonic::Response<Self::Response>, | ||
tonic::Status, | ||
>; | ||
fn call( | ||
&mut self, | ||
request: tonic::Request<super::Targets>, | ||
) -> Self::Future { | ||
let inner = self.0.clone(); | ||
let fut = async move { (*inner).update(request).await }; | ||
Box::pin(fut) | ||
} | ||
} | ||
let accept_compression_encodings = self.accept_compression_encodings; | ||
let send_compression_encodings = self.send_compression_encodings; | ||
let inner = self.inner.clone(); | ||
let fut = async move { | ||
let inner = inner.0; | ||
let method = UpdateSvc(inner); | ||
let codec = tonic::codec::ProstCodec::default(); | ||
let mut grpc = tonic::server::Grpc::new(codec) | ||
.apply_compression_config( | ||
accept_compression_encodings, | ||
send_compression_encodings, | ||
); | ||
let res = grpc.unary(method, req).await; | ||
Ok(res) | ||
}; | ||
Box::pin(fut) | ||
} | ||
"/backends.backends/Delete" => { | ||
#[allow(non_camel_case_types)] | ||
struct DeleteSvc<T: Backends>(pub Arc<T>); | ||
impl<T: Backends> tonic::server::UnaryService<super::Vip> | ||
for DeleteSvc<T> { | ||
type Response = super::Confirmation; | ||
type Future = BoxFuture< | ||
tonic::Response<Self::Response>, | ||
tonic::Status, | ||
>; | ||
fn call( | ||
&mut self, | ||
request: tonic::Request<super::Vip>, | ||
) -> Self::Future { | ||
let inner = self.0.clone(); | ||
let fut = async move { (*inner).delete(request).await }; | ||
Box::pin(fut) | ||
} | ||
} | ||
let accept_compression_encodings = self.accept_compression_encodings; | ||
let send_compression_encodings = self.send_compression_encodings; | ||
let inner = self.inner.clone(); | ||
let fut = async move { | ||
let inner = inner.0; | ||
let method = DeleteSvc(inner); | ||
let codec = tonic::codec::ProstCodec::default(); | ||
let mut grpc = tonic::server::Grpc::new(codec) | ||
.apply_compression_config( | ||
accept_compression_encodings, | ||
send_compression_encodings, | ||
); | ||
let res = grpc.unary(method, req).await; | ||
Ok(res) | ||
}; | ||
Box::pin(fut) | ||
} | ||
_ => { | ||
Box::pin(async move { | ||
Ok( | ||
http::Response::builder() | ||
.status(200) | ||
.header("grpc-status", "12") | ||
.header("content-type", "application/grpc") | ||
.body(empty_body()) | ||
.unwrap(), | ||
) | ||
}) | ||
} | ||
} | ||
} | ||
} | ||
impl<T: Backends> Clone for BackendsServer<T> { | ||
fn clone(&self) -> Self { | ||
let inner = self.inner.clone(); | ||
Self { | ||
inner, | ||
accept_compression_encodings: self.accept_compression_encodings, | ||
send_compression_encodings: self.send_compression_encodings, | ||
} | ||
} | ||
} | ||
impl<T: Backends> Clone for _Inner<T> { | ||
fn clone(&self) -> Self { | ||
Self(self.0.clone()) | ||
} | ||
} | ||
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "{:?}", self.0) | ||
} | ||
} | ||
impl<T: Backends> tonic::server::NamedService for BackendsServer<T> { | ||
const NAME: &'static str = "backends.backends"; | ||
} | ||
} |
Oops, something went wrong.