diff --git a/crates/cluster_agent/src/authorizer.rs b/crates/cluster_agent/src/authorizer.rs new file mode 100644 index 000000000..ebc8eb385 --- /dev/null +++ b/crates/cluster_agent/src/authorizer.rs @@ -0,0 +1,107 @@ +use k8s_openapi::api::authorization::v1::{ + ResourceAttributes, SelfSubjectAccessReview, SelfSubjectAccessReviewSpec, +}; +use kube::{Api, Client, Config, api::PostParams, config::AuthInfo}; +use tonic::{Status, metadata::MetadataMap}; + +pub struct Authorizer { + k8s_config: Config, +} + +/// Checks that the the k8s doing the request has proper rights to access the log files. +#[cfg(not(test))] +impl Authorizer { + /// Creates a new Authorizer, using the k8s authorization token to construct the proper + /// client set during authorization. + pub async fn new(request_metadata: &MetadataMap) -> Result { + let token = request_metadata + .get("authorization") + .and_then(|token| token.to_str().ok()) + .ok_or_else(|| { + Status::new( + tonic::Code::Unauthenticated, + "authentication token not found", + ) + })? + .to_owned(); + + let mut k8s_config = Config::infer().await.map_err(|error| { + Status::new( + tonic::Code::Unknown, + format!("unable to infer k8s config {error}"), + ) + })?; + + k8s_config.auth_info = AuthInfo { + token: Some(token.into()), + ..Default::default() + }; + + Ok(Self { k8s_config }) + } + + /// Checks if the request is authorized by calling the k8s API. + pub async fn is_authorized( + &self, + mut namespaces: &Vec, + verb: &str, + ) -> Result<(), Status> { + let client = Client::try_from(self.k8s_config.clone()) + .map_err(|error| Status::new(tonic::Code::Unauthenticated, error.to_string()))?; + + // Default to all namespaces if no namespace is provided. + let empty_namespace = vec![String::new()]; + if namespaces.is_empty() { + namespaces = &empty_namespace; + } + + let access_reviews: Api = Api::all(client); + for namespace in namespaces { + let access_review = SelfSubjectAccessReview { + spec: SelfSubjectAccessReviewSpec { + resource_attributes: Some(ResourceAttributes { + namespace: Some(namespace.to_owned()), + group: None, + verb: Some(verb.to_owned()), + resource: Some("pods/log".to_owned()), + ..ResourceAttributes::default() + }), + non_resource_attributes: None, + }, + ..SelfSubjectAccessReview::default() + }; + + let response = access_reviews + .create(&PostParams::default(), &access_review) + .await + .map_err(|error| { + Status::new( + tonic::Code::Unknown, + format!("failed to authenticate {error}"), + ) + })?; + + if response.status.is_none() || !response.status.unwrap().allowed { + return Err(Status::new( + tonic::Code::Unauthenticated, + format!("permission denied: `{verb} pods/log` in namespace `{namespace}`"), + )); + } + } + + Ok(()) + } +} + +#[cfg(test)] +impl Authorizer { + pub async fn new(_request_metadata: &MetadataMap) -> Result { + Ok(Self { + k8s_config: Config::infer().await.unwrap(), + }) + } + + pub async fn is_authorized(self, _namespaces: &Vec, _verb: &str) -> Result<(), Status> { + Ok(()) + } +} diff --git a/crates/cluster_agent/src/log_metadata.rs b/crates/cluster_agent/src/log_metadata.rs index 8cf313ec6..c37adf726 100644 --- a/crates/cluster_agent/src/log_metadata.rs +++ b/crates/cluster_agent/src/log_metadata.rs @@ -19,6 +19,7 @@ use types::cluster_agent::{ LogMetadataWatchEvent, LogMetadataWatchRequest, }; +use crate::authorizer::Authorizer; use crate::log_metadata::log_metadata_watcher::LogMetadataWatcher; mod log_metadata_watcher; @@ -99,6 +100,7 @@ impl LogMetadataService for LogMetadataImpl { &self, request: Request, ) -> Result, Status> { + let authorizer = Authorizer::new(request.metadata()).await?; let request = request.into_inner(); if !self.logs_dir.is_dir() { @@ -117,6 +119,8 @@ impl LogMetadataService for LogMetadataImpl { .filter(|namespace| !namespace.is_empty()) .collect(); + authorizer.is_authorized(&namespaces, "list").await?; + let mut files = ReadDirStream::new(read_dir(&self.logs_dir).await?); let mut metadata_items = Vec::new(); @@ -173,6 +177,7 @@ impl LogMetadataService for LogMetadataImpl { &self, request: Request, ) -> Result, Status> { + let authorizer = Authorizer::new(request.metadata()).await?; let request = request.into_inner(); let term_tx = self.term_tx.clone(); @@ -182,6 +187,8 @@ impl LogMetadataService for LogMetadataImpl { .filter(|namespace| !namespace.is_empty()) .collect(); + authorizer.is_authorized(&namespaces, "watch").await?; + let (log_metadata_watcher, log_metadata_rx) = LogMetadataWatcher::new( Path::new(&self.logs_dir).to_path_buf(), namespaces, diff --git a/crates/cluster_agent/src/log_metadata/log_metadata_watcher.rs b/crates/cluster_agent/src/log_metadata/log_metadata_watcher.rs index 41f6b868b..c4552d326 100644 --- a/crates/cluster_agent/src/log_metadata/log_metadata_watcher.rs +++ b/crates/cluster_agent/src/log_metadata/log_metadata_watcher.rs @@ -596,7 +596,13 @@ mod test { if let Some(file_size) = file_size { assert_eq!(event_file_info.as_ref().unwrap().size, file_size as i64); } else { - assert_eq!(event_file_info, None); + assert_eq!( + event_file_info, + Some(LogMetadataFileInfo { + size: 0, + last_modified_at: None, + }) + ); } } } diff --git a/crates/cluster_agent/src/log_records.rs b/crates/cluster_agent/src/log_records.rs index a585e14b3..bf5089a5e 100644 --- a/crates/cluster_agent/src/log_records.rs +++ b/crates/cluster_agent/src/log_records.rs @@ -13,6 +13,8 @@ use rgkl::{stream_backward, stream_forward}; use tonic::{Request, Response, Status}; +use crate::authorizer::Authorizer; + #[derive(Debug)] pub struct LogRecordsImpl { logs_dir: PathBuf, @@ -62,11 +64,15 @@ impl LogRecordsService for LogRecordsImpl { &self, request: Request, ) -> Result, Status> { + let authorizer = Authorizer::new(request.metadata()).await?; let request = request.into_inner(); let file_path = self.get_log_filename(&request).map_err(|status| *status)?; let (tx, rx) = mpsc::channel(100); let term_tx = self.term_tx.clone(); + let namespaces = vec![request.namespace.clone()]; + authorizer.is_authorized(&namespaces, "list").await?; + self.task_tracker.spawn(async move { stream_backward::stream_backward( &file_path, @@ -91,9 +97,13 @@ impl LogRecordsService for LogRecordsImpl { &self, request: Request, ) -> Result, Status> { + let authorizer = Authorizer::new(request.metadata()).await?; let request = request.into_inner(); let file_path = self.get_log_filename(&request).map_err(|status| *status)?; + let namespaces = vec![request.namespace.clone()]; + authorizer.is_authorized(&namespaces, "list").await?; + let (tx, rx) = mpsc::channel(100); let term_tx = self.term_tx.clone(); diff --git a/crates/cluster_agent/src/main.rs b/crates/cluster_agent/src/main.rs index 07a9b9176..40223054c 100644 --- a/crates/cluster_agent/src/main.rs +++ b/crates/cluster_agent/src/main.rs @@ -14,6 +14,7 @@ use types::cluster_agent::FILE_DESCRIPTOR_SET; use types::cluster_agent::log_metadata_service_server::LogMetadataServiceServer; use types::cluster_agent::log_records_service_server::LogRecordsServiceServer; +mod authorizer; mod config; mod log_metadata; mod log_records;