From 33024626b58fd2ed67f67ea698296d352bd60394 Mon Sep 17 00:00:00 2001 From: samuchila Date: Wed, 10 Jan 2024 14:51:14 +0100 Subject: [PATCH] Improved log interface performance and streaming This commit enhances the log interface by reducing lag and improving the rendering speed. It introduces a feature to load the logs for the past day first and logs are now streamed in real-time from backend, ensuring immediate access to new logs. Additionally, a download button that downloads all logs since container was started is now made available at the top of the logs dialog. --- api/Cargo.lock | 1 + api/Cargo.toml | 1 + api/res/openapi.yml | 12 +- api/src/apps/mod.rs | 98 +++++++-- api/src/apps/routes.rs | 195 +++++++++++++++-- api/src/infrastructure/docker.rs | 130 ++++++------ .../infrastructure/dummy_infrastructure.rs | 48 +++-- api/src/infrastructure/infrastructure.rs | 18 +- .../kubernetes/infrastructure.rs | 84 ++++---- frontend/package-lock.json | 8 +- frontend/src/LogsDialog.vue | 198 +++++++++++------- frontend/src/main.js | 3 +- 12 files changed, 541 insertions(+), 255 deletions(-) diff --git a/api/Cargo.lock b/api/Cargo.lock index 2f6751c6..3aecc4b8 100644 --- a/api/Cargo.lock +++ b/api/Cargo.lock @@ -2137,6 +2137,7 @@ name = "prevant" version = "0.9.0" dependencies = [ "assert-json-diff", + "async-stream", "async-trait", "base64 0.21.7", "boa_engine", diff --git a/api/Cargo.toml b/api/Cargo.toml index a1513a36..85bf451e 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs" [dependencies] async-trait = "0.1" +async-stream = "0.3" base64 = "0.21" boa_engine = "0.17" bytesize = { version = "1.3", features = ["serde"] } diff --git a/api/res/openapi.yml b/api/res/openapi.yml index aa488346..1fa66c1d 100644 --- a/api/res/openapi.yml +++ b/api/res/openapi.yml @@ -246,12 +246,20 @@ paths: example: '2019-07-22T08:42:47-00:00' - in: query name: limit - description: The number of log lines to retrieve. If not present, 1000 lines will be retrieved. + description: The number of log lines to retrieve. If not present, all the lines from `since` are retrieved. schema: type: integer + - in: query + name: asAttachment + description: >- + Determines how the response is presented by the browser. When `true`, the response content is provided as a downloadable attachment. + If `false` or not provided, the response is displayed inline. + schema: + type: boolean responses: '200': - description: The available log statements + description: | + The available log statements. MIME type `text/event-stream` supports streaming of logs. headers: Link: schema: diff --git a/api/src/apps/mod.rs b/api/src/apps/mod.rs index f9454578..aeb76d79 100644 --- a/api/src/apps/mod.rs +++ b/api/src/apps/mod.rs @@ -36,6 +36,8 @@ use crate::models::{AppName, AppStatusChangeId, LogChunk, ServiceConfig}; use crate::registry::Registry; use crate::registry::RegistryError; use chrono::{DateTime, FixedOffset}; +use futures::stream::BoxStream; +use futures::StreamExt; use handlebars::RenderError; pub use host_meta_cache::new as host_meta_crawling; pub use host_meta_cache::HostMetaCache; @@ -349,22 +351,38 @@ impl AppsService { } } - pub async fn get_logs( - &self, - app_name: &AppName, - service_name: &str, - since: &Option>, - limit: usize, + pub async fn stream_logs<'a>( + &'a self, + app_name: &'a AppName, + service_name: &'a str, + since: &'a Option>, + limit: &'a Option, + ) -> BoxStream<'a, Result<(DateTime, String), failure::Error>> { + self.infrastructure + .get_logs(app_name, service_name, since, limit, true) + .await + } + + pub async fn get_logs<'a>( + &'a self, + app_name: &'a AppName, + service_name: &'a str, + since: &'a Option>, + limit: &'a Option, ) -> Result, AppsServiceError> { - match self + let mut log_lines = Vec::new(); + let mut log_stream = self .infrastructure - .get_logs(app_name, service_name, since, limit) - .await? - { - None => Ok(None), - Some(ref logs) if logs.is_empty() => Ok(None), - Some(logs) => Ok(Some(LogChunk::from(logs))), + .get_logs(app_name, service_name, since, limit, false) + .await; + + while let Some(result) = log_stream.next().await { + if let Ok(log_line) = result { + log_lines.push(log_line); + } } + + Ok(Some(LogChunk::from(log_lines))) } pub async fn change_status( @@ -683,7 +701,7 @@ mod tests { .await?; let log_chunk = apps - .get_logs(&app_name, &String::from("service-a"), &None, 100) + .get_logs(&app_name, &String::from("service-a"), &None, &Some(100)) .await .unwrap() .unwrap(); @@ -709,6 +727,58 @@ Log msg 3 of service-a of app master Ok(()) } + #[tokio::test] + async fn should_stream_logs_from_infrastructure() -> Result<(), AppsServiceError> { + let config = Config::default(); + let infrastructure = Box::new(Dummy::new()); + let apps = AppsService::new(config, infrastructure)?; + + let app_name = AppName::from_str("master").unwrap(); + let services = vec![sc!("service-a"), sc!("service-b")]; + apps.create_or_update(&app_name, &AppStatusChangeId::new(), None, &services) + .await?; + for service in services { + let mut log_stream = apps + .stream_logs(&app_name, service.service_name(), &None, &None) + .await; + + assert_eq!( + log_stream.next().await.unwrap().unwrap(), + ( + DateTime::parse_from_rfc3339("2019-07-18T07:25:00.000000000Z").unwrap(), + format!( + "Log msg 1 of {} of app {app_name}\n", + service.service_name() + ) + ) + ); + + assert_eq!( + log_stream.next().await.unwrap().unwrap(), + ( + DateTime::parse_from_rfc3339("2019-07-18T07:30:00.000000000Z").unwrap(), + format!( + "Log msg 2 of {} of app {app_name}\n", + service.service_name() + ) + ) + ); + + assert_eq!( + log_stream.next().await.unwrap().unwrap(), + ( + DateTime::parse_from_rfc3339("2019-07-18T07:35:00.000000000Z").unwrap(), + format!( + "Log msg 3 of {} of app {app_name}\n", + service.service_name() + ) + ) + ); + } + + Ok(()) + } + #[tokio::test] async fn should_deploy_companions() -> Result<(), AppsServiceError> { let config = config_from_str!( diff --git a/api/src/apps/routes.rs b/api/src/apps/routes.rs index acfd93ab..952ae549 100644 --- a/api/src/apps/routes.rs +++ b/api/src/apps/routes.rs @@ -33,11 +33,15 @@ use crate::models::ServiceConfig; use crate::models::{AppName, AppNameError, LogChunk}; use crate::models::{AppStatusChangeId, AppStatusChangeIdError}; use chrono::DateTime; +use futures::StreamExt; use http_api_problem::{HttpApiProblem, StatusCode}; use multimap::MultiMap; use regex::Regex; +use rocket::http::hyper::header::CONTENT_DISPOSITION; +use rocket::http::hyper::header::LINK; use rocket::http::{RawStr, Status}; use rocket::request::{FromRequest, Outcome, Request}; +use rocket::response::stream::{Event, EventStream}; use rocket::response::{Responder, Response}; use rocket::serde::json::Json; use rocket::State; @@ -54,7 +58,8 @@ pub fn apps_routes() -> Vec { create_app, logs, change_status, - status_change + status_change, + stream_logs ] } @@ -185,20 +190,16 @@ async fn change_status( Ok(ServiceStatusResponse { service }) } -#[get( - "//logs/?&", - format = "text/plain" -)] +#[get("//logs/?", format = "text/plain")] async fn logs<'r>( app_name: Result, service_name: &'r str, - since: Option, - limit: Option, + log_query: LogQuery, apps: &State>, ) -> HttpResult> { let app_name = app_name?; - let since = match since { + let since = match log_query.since { None => None, Some(since) => match DateTime::parse_from_rfc3339(&since) { Ok(since) => Some(since), @@ -211,17 +212,59 @@ async fn logs<'r>( } }, }; - let limit = limit.unwrap_or(20_000); let log_chunk = apps - .get_logs(&app_name, service_name, &since, limit) + .get_logs(&app_name, service_name, &since, &log_query.limit) .await?; Ok(LogsResponse { log_chunk, app_name, service_name, - limit, + limit: log_query.limit, + as_attachment: log_query.as_attachment, + }) +} + +#[get( + "//logs/?", + format = "text/event-stream", + rank = 2 +)] +async fn stream_logs<'r>( + app_name: Result, + service_name: &'r str, + log_query: LogQuery, + apps: &'r State>, +) -> HttpResult { + let app_name = app_name.unwrap(); + let since = match &log_query.since { + None => None, + Some(since) => match DateTime::parse_from_rfc3339(&since) { + Ok(since) => Some(since), + Err(err) => { + return Err( + HttpApiProblem::with_title(http_api_problem::StatusCode::BAD_REQUEST) + .detail(format!("{}", err)) + .into(), + ); + } + }, + }; + + Ok(EventStream! { + let mut log_chunk = apps + .stream_logs(&app_name, service_name, &since, &log_query.limit) + .await; + + while let Some(result) = log_chunk.as_mut().next().await { + match result { + Ok((_, log_line)) => yield Event::data(log_line), + Err(_e) => { + break; + } + } + } }) } @@ -269,7 +312,16 @@ pub struct LogsResponse<'a> { log_chunk: Option, app_name: AppName, service_name: &'a str, - limit: usize, + limit: Option, + as_attachment: bool, +} + +#[derive(FromForm)] +struct LogQuery { + since: Option, + limit: Option, + #[field(name = "asAttachment")] + as_attachment: bool, } #[derive(FromForm)] @@ -302,17 +354,37 @@ impl<'r> Responder<'r, 'static> for LogsResponse<'r> { let from = *log_chunk.until() + chrono::Duration::milliseconds(1); - let next_logs_url = format!( - "/api/apps/{}/logs/{}/?limit={}&since={}", - self.app_name, - self.service_name, - self.limit, - RawStr::new(&from.to_rfc3339()).percent_encode(), - ); + let next_logs_url = match self.limit { + Some(limit) => format!( + "/api/apps/{}/logs/{}?limit={}&since={}", + self.app_name, + self.service_name, + limit, + RawStr::new(&from.to_rfc3339()).percent_encode(), + ), + None => format!( + "/api/apps/{}/logs/{}?since={}", + self.app_name, + self.service_name, + RawStr::new(&from.to_rfc3339()).percent_encode(), + ), + }; + + let content_dispositon_value = if self.as_attachment { + format!( + "attachment; filename=\"{}_{}_{}.txt\"", + self.app_name, + self.service_name, + log_chunk.until().format("%Y%m%d_%H%M%S") + ) + } else { + String::from("inline") + }; let log_lines = log_chunk.log_lines(); Response::build() - .raw_header("Link", format!("<{}>;rel=next", next_logs_url)) + .raw_header(LINK.as_str(), format!("<{}>;rel=next", next_logs_url)) + .raw_header(CONTENT_DISPOSITION.as_str(), content_dispositon_value) .sized_body(log_lines.len(), Cursor::new(log_lines.clone())) .ok() } @@ -531,9 +603,9 @@ mod tests { use crate::models::{AppName, AppStatusChangeId}; use crate::sc; use assert_json_diff::assert_json_include; - use rocket::http::ContentType; use rocket::http::Header; use rocket::http::Status; + use rocket::http::{Accept, ContentType}; use rocket::local::asynchronous::Client; use serde_json::json; use serde_json::Value; @@ -766,6 +838,87 @@ mod tests { let response = get.dispatch().await; assert_eq!(response.status(), Status::BadRequest); } + + #[tokio::test] + async fn log_weblink_with_no_limit() -> Result<(), crate::apps::AppsServiceError> { + let (host_meta_cache, mut _host_meta_crawler) = crate::host_meta_crawling(); + + let client = + set_up_rocket_with_dummy_infrastructure_and_a_running_app(host_meta_cache).await?; + + let response = client + .get("/api/apps/master/logs/service-a") + .header(Accept::Text) + .dispatch() + .await; + let mut link_header = response.headers().get("Link"); + assert_eq!( + link_header.next(), + Some( + ";rel=next" + ) + ); + Ok(()) + } + + #[tokio::test] + async fn log_weblink_with_some_limit() -> Result<(), crate::apps::AppsServiceError> { + let (host_meta_cache, mut _host_meta_crawler) = crate::host_meta_crawling(); + + let client = + set_up_rocket_with_dummy_infrastructure_and_a_running_app(host_meta_cache).await?; + + let response = client + .get("/api/apps/master/logs/service-a?limit=20000&since=2019-07-22T08:42:47-00:00") + .header(Accept::Text) + .dispatch() + .await; + let mut link_header = response.headers().get("Link"); + assert_eq!( + link_header.next(), + Some(";rel=next") + ); + Ok(()) + } + + #[tokio::test] + async fn log_content_disposition_for_downloading_as_attachment( + ) -> Result<(), crate::apps::AppsServiceError> { + let (host_meta_cache, mut _host_meta_crawler) = crate::host_meta_crawling(); + + let client = + set_up_rocket_with_dummy_infrastructure_and_a_running_app(host_meta_cache).await?; + + let response = client + .get("/api/apps/master/logs/service-a?limit=20000&since=2019-07-22T08:42:47-00:00&asAttachment=true") + .header(Accept::Text) + .dispatch() + .await; + let mut link_header = response.headers().get("Content-Disposition"); + assert_eq!( + link_header.next(), + Some("attachment; filename=\"master_service-a_20190718_073500.txt\"") + ); + Ok(()) + } + + #[tokio::test] + async fn log_content_disposition_for_displaying_as_inline( + ) -> Result<(), crate::apps::AppsServiceError> { + let (host_meta_cache, mut _host_meta_crawler) = crate::host_meta_crawling(); + + let client = + set_up_rocket_with_dummy_infrastructure_and_a_running_app(host_meta_cache).await?; + + let response = client + .get("/api/apps/master/logs/service-a?limit=20000&since=2019-07-22T08:42:47-00:00") + .header(Accept::Text) + .dispatch() + .await; + let mut link_header = response.headers().get("Content-Disposition"); + assert_eq!(link_header.next(), Some("inline")); + Ok(()) + } } mod http_api_error { diff --git a/api/src/infrastructure/docker.rs b/api/src/infrastructure/docker.rs index 84afa058..5c5f5f32 100644 --- a/api/src/infrastructure/docker.rs +++ b/api/src/infrastructure/docker.rs @@ -35,10 +35,12 @@ use crate::models::service::{ContainerType, Service, ServiceError, ServiceStatus use crate::models::{ AppName, Environment, Image, ServiceBuilder, ServiceBuilderError, ServiceConfig, }; +use async_stream::stream; use async_trait::async_trait; use chrono::{DateTime, FixedOffset}; use failure::{format_err, Error}; use futures::future::join_all; +use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use multimap::MultiMap; use regex::Regex; @@ -54,6 +56,7 @@ use std::collections::HashMap; use std::convert::{From, TryFrom}; use std::net::{AddrParseError, IpAddr}; use std::str::FromStr; + static CONTAINER_PORT_LABEL: &str = "traefik.port"; pub struct DockerInfrastructure { @@ -777,75 +780,70 @@ impl Infrastructure for DockerInfrastructure { result } - async fn get_logs( - &self, - app_name: &AppName, - service_name: &str, - from: &Option>, - limit: usize, - ) -> Result, String)>>, failure::Error> { - match self.get_app_container(app_name, service_name).await? { - None => Ok(None), - Some(container) => { - let docker = Docker::new(); + async fn get_logs<'a>( + &'a self, + app_name: &'a AppName, + service_name: &'a str, + from: &'a Option>, + limit: &'a Option, + follow: bool, + ) -> BoxStream<'a, Result<(DateTime, String), failure::Error>> { + stream! { + match self + .get_app_container(&AppName::from_str(app_name).unwrap(), service_name) + .await + { + Ok(None) => {} + Ok(Some(container)) => { + let docker = Docker::new(); + + trace!( + "Acquiring logs of container {} since {:?}", + container.id, + from + ); - trace!( - "Acquiring logs of container {} since {:?}", - container.id, - from - ); - - let log_options = match from { - Some(from) => LogsOptions::builder() - .since(from) - .stdout(true) - .stderr(true) - .timestamps(true) - .build(), - None => LogsOptions::builder() - .stdout(true) - .stderr(true) - .timestamps(true) - .build(), - }; + let mut log_options = LogsOptions::builder(); + log_options.stdout(true).stderr(true).timestamps(true); + + if let Some(since) = from { + log_options.since(since); + } + + log_options.follow(follow); + + let logs = docker + .containers() + .get(&container.id) + .logs(&log_options.build()); + + let mut logs = match limit { + Some(log_limit) => Box::pin(logs.take(*log_limit)) + as BoxStream>, + None => Box::pin(logs) as BoxStream>, + }; - let logs = docker - .containers() - .get(&container.id) - .logs(&log_options) - .collect::>>() - .await; - - let logs = logs.into_iter() - .enumerate() - // Unfortunately, docker API does not support head (cf. https://github.com/moby/moby/issues/13096) - // Until then we have to skip these log messages which is super slow… - .filter(move |(index, _)| index < &limit) - .filter_map(|(_, chunk)| chunk.ok()) - .map(|chunk| { - let line = String::from_utf8_lossy(&chunk.to_vec()).to_string(); - - let mut iter = line.splitn(2, ' '); - let timestamp = iter.next() - .expect("This should never happen: docker should return timestamps, separated by space"); - - let datetime = DateTime::parse_from_rfc3339(timestamp).expect("Expecting a valid timestamp"); - - let log_line : String = iter - .collect::>() - .join(" "); - (datetime, log_line) - }) - .filter(move |(timestamp, _)| { - // Due to the fact that docker's REST API supports only unix time (cf. since), - // it is necessary to filter the timestamps as well. - from.map(|from| timestamp >= &from).unwrap_or(true) - }) - .collect(); - - Ok(Some(logs)) + while let Some(result) = logs.next().await { + match result { + Ok(chunk) => { + let line = String::from_utf8_lossy(&chunk.to_vec()).to_string(); + + let mut iter = line.splitn(2, ' '); + let timestamp = iter.next() + .expect("This should never happen: docker should return timestamps, separated by space"); + + let datetime = DateTime::parse_from_rfc3339(timestamp) + .expect("Expecting a valid timestamp"); + let log_line: String = iter.collect::>().join(" "); + yield Ok((datetime, log_line)) + } + Err(e) => yield Err(e.into()), + } + } + } + Err(e) => yield Err(e.into()), } - } + }.boxed() } async fn change_status( diff --git a/api/src/infrastructure/dummy_infrastructure.rs b/api/src/infrastructure/dummy_infrastructure.rs index 8b6abc55..4ee1cdc1 100644 --- a/api/src/infrastructure/dummy_infrastructure.rs +++ b/api/src/infrastructure/dummy_infrastructure.rs @@ -32,6 +32,7 @@ use crate::models::service::{Service, ServiceStatus}; use crate::models::{AppName, ServiceBuilder, ServiceConfig}; use async_trait::async_trait; use chrono::{DateTime, FixedOffset, Utc}; +use futures::stream::{self, BoxStream}; use multimap::MultiMap; use std::collections::HashSet; use std::str::FromStr; @@ -178,27 +179,32 @@ impl Infrastructure for DummyInfrastructure { } } - async fn get_logs( - &self, - app_name: &AppName, - service_name: &str, - _from: &Option>, - _limit: usize, - ) -> Result, String)>>, failure::Error> { - Ok(Some(vec![ - ( - DateTime::parse_from_rfc3339("2019-07-18T07:25:00.000000000Z").unwrap(), - format!("Log msg 1 of {} of app {}\n", service_name, app_name), - ), - ( - DateTime::parse_from_rfc3339("2019-07-18T07:30:00.000000000Z").unwrap(), - format!("Log msg 2 of {} of app {}\n", service_name, app_name), - ), - ( - DateTime::parse_from_rfc3339("2019-07-18T07:35:00.000000000Z").unwrap(), - format!("Log msg 3 of {} of app {}\n", service_name, app_name), - ), - ])) + async fn get_logs<'a>( + &'a self, + app_name: &'a AppName, + service_name: &'a str, + _from: &'a Option>, + _limit: &'a Option, + _follow: bool, + ) -> BoxStream<'a, Result<(DateTime, String), failure::Error>> { + Box::pin(stream::iter( + vec![ + ( + DateTime::parse_from_rfc3339("2019-07-18T07:25:00.000000000Z").unwrap(), + format!("Log msg 1 of {} of app {}\n", service_name, app_name), + ), + ( + DateTime::parse_from_rfc3339("2019-07-18T07:30:00.000000000Z").unwrap(), + format!("Log msg 2 of {} of app {}\n", service_name, app_name), + ), + ( + DateTime::parse_from_rfc3339("2019-07-18T07:35:00.000000000Z").unwrap(), + format!("Log msg 3 of {} of app {}\n", service_name, app_name), + ), + ] + .into_iter() + .map(|s| Ok(s)), + )) } async fn change_status( diff --git a/api/src/infrastructure/infrastructure.rs b/api/src/infrastructure/infrastructure.rs index 926b1a9f..4170e5ee 100644 --- a/api/src/infrastructure/infrastructure.rs +++ b/api/src/infrastructure/infrastructure.rs @@ -32,6 +32,7 @@ use crate::models::{AppName, ContainerType, ServiceConfig}; use async_trait::async_trait; use chrono::{DateTime, FixedOffset}; use failure::Error; +use futures::stream::BoxStream; use multimap::MultiMap; #[async_trait] @@ -68,14 +69,15 @@ pub trait Infrastructure: Send + Sync { app_name: &AppName, ) -> Result, Error>; - /// Returns the log lines with a the corresponding timestamps in it. - async fn get_logs( - &self, - app_name: &AppName, - service_name: &str, - from: &Option>, - limit: usize, - ) -> Result, String)>>, Error>; + /// Streams the log lines with a the corresponding timestamps in it. + async fn get_logs<'a>( + &'a self, + app_name: &'a AppName, + service_name: &'a str, + from: &'a Option>, + limit: &'a Option, + follow: bool, + ) -> BoxStream<'a, Result<(DateTime, String), failure::Error>>; /// Changes the status of a service, for example, the service might me stopped or started. async fn change_status( diff --git a/api/src/infrastructure/kubernetes/infrastructure.rs b/api/src/infrastructure/kubernetes/infrastructure.rs index 503bacea..78969c8b 100644 --- a/api/src/infrastructure/kubernetes/infrastructure.rs +++ b/api/src/infrastructure/kubernetes/infrastructure.rs @@ -41,11 +41,14 @@ use crate::models::service::{ContainerType, Service, ServiceError, ServiceStatus use crate::models::{ AppName, Environment, Image, ServiceBuilder, ServiceBuilderError, ServiceConfig, }; +use async_stream::stream; use async_trait::async_trait; use chrono::{DateTime, FixedOffset, Utc}; use failure::Error; use futures::stream::FuturesUnordered; +use futures::stream::{self, BoxStream}; use futures::StreamExt; +use futures::{AsyncBufReadExt, TryStreamExt}; use k8s_openapi::api::core::v1::PersistentVolumeClaim; use k8s_openapi::api::storage::v1::StorageClass; use k8s_openapi::api::{ @@ -567,53 +570,43 @@ impl Infrastructure for KubernetesInfrastructure { Ok(services) } - async fn get_logs( - &self, - app_name: &AppName, - service_name: &str, - from: &Option>, - limit: usize, - ) -> Result, String)>>, Error> { - let client = self.client().await?; - let namespace = app_name.to_rfc1123_namespace_id(); - + async fn get_logs<'a>( + &'a self, + app_name: &'a AppName, + service_name: &'a str, + from: &'a Option>, + limit: &'a Option, + follow: bool, + ) -> BoxStream<'a, Result<(DateTime, String), failure::Error>> { let Some((_deployment, Some(pod))) = - self.get_deployment_and_pod(app_name, service_name).await? + (match self.get_deployment_and_pod(app_name, service_name).await { + Ok(result) => result, + Err(_) => return stream::empty().boxed(), + }) else { - return Ok(None); - }; - - let p = LogParams { - timestamps: true, - since_seconds: from - .map(|from| { - from.timestamp() - - pod - .status - .as_ref() - .unwrap() - .start_time - .as_ref() - .unwrap() - .0 - .timestamp() - }) - .filter(|since_seconds| since_seconds > &0), - ..Default::default() + return stream::empty().boxed(); }; - let logs = Api::::namespaced(client, &namespace) - .logs(&pod.metadata.name.unwrap(), &p) - .await?; + stream! { + let p = LogParams { + timestamps: true, + since_time: from.map(|from| from.with_timezone(&Utc)), + follow, + ..Default::default() + }; + let client = self.client().await?; + let namespace = app_name.to_rfc1123_namespace_id(); - let logs = logs - .split('\n') - .enumerate() - // Unfortunately, API does not support head (also like docker, cf. https://github.com/moby/moby/issues/13096) - // Until then we have to skip these log messages which is super slow… - .filter(move |(index, _)| index < &limit) - .filter(|(_, line)| !line.is_empty()) - .map(|(_, line)| { + let logs = Api::::namespaced(client, &namespace) + .log_stream(&pod.metadata.name.unwrap(), &p) + .await?; + let mut logs = match limit { + Some(log_limit) => { + Box::pin(logs.lines().take(*log_limit)) as BoxStream> + } + None => Box::pin(logs.lines()) as BoxStream>, + }; + while let Some(line) = logs.try_next().await? { let mut iter = line.splitn(2, ' '); let timestamp = iter.next().expect( "This should never happen: kubernetes should return timestamps, separated by space", @@ -624,11 +617,10 @@ impl Infrastructure for KubernetesInfrastructure { let mut log_line: String = iter.collect::>().join(" "); log_line.push('\n'); - (datetime, log_line) - }) - .collect(); - Ok(Some(logs)) + yield Ok((datetime, log_line)) + } + }.boxed() } async fn change_status( diff --git a/frontend/package-lock.json b/frontend/package-lock.json index a6adf83d..aa3e7e96 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -19,7 +19,7 @@ "current-script-polyfill": "^1.0.0", "esprima": "^4.0.1", "jquery": "^3.6.0", - "moment": "^2.29.1", + "moment": "^2.30.1", "parse-link-header": "^2.0.0", "popper.js": "^1.16.1", "swagger-ui": "^4.18.2", @@ -8011,9 +8011,9 @@ "dev": true }, "node_modules/moment": { - "version": "2.29.4", - "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.4.tgz", - "integrity": "sha512-5LC9SOxjSc2HF6vO2CyuTDNivEdoz2IvyJJGj6X8DJ0eFyfszE0QiEd+iXmBvUP3WHxSjFH/vIsA0EN00cgr8w==", + "version": "2.30.1", + "resolved": "https://registry.npmjs.org/moment/-/moment-2.30.1.tgz", + "integrity": "sha512-uEmtNhbDOrWPFS+hdjFCBfy9f2YoyzRpwcl+DqpC6taX21FzsTLQVbMV/W7PzNSX6x/bhC1zA3c2UQ5NzH6how==", "engines": { "node": "*" } diff --git a/frontend/src/LogsDialog.vue b/frontend/src/LogsDialog.vue index 2e5bea76..ed9e2481 100644 --- a/frontend/src/LogsDialog.vue +++ b/frontend/src/LogsDialog.vue @@ -24,11 +24,23 @@ * =========================LICENSE_END================================== */