From a86cfac4e0dfd45ec4165c0a275451ed693edf40 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 14 May 2024 14:29:23 +0530 Subject: [PATCH 1/9] enhancement of stats include lifetime stats, current stats, deleted stats --- server/src/analytics.rs | 113 +++++++++-- server/src/catalog.rs | 16 +- server/src/catalog/manifest.rs | 12 +- server/src/handlers/http/cluster/mod.rs | 35 +++- server/src/handlers/http/cluster/utils.rs | 63 +++++- server/src/handlers/http/logstream.rs | 54 ++++-- .../src/handlers/http/modal/ingest_server.rs | 1 + .../src/handlers/http/modal/query_server.rs | 5 +- server/src/handlers/http/modal/server.rs | 6 +- server/src/metadata.rs | 16 +- server/src/metrics/mod.rs | 180 +++++++++++++++++- server/src/migration.rs | 62 +++++- server/src/stats.rs | 145 +++++++++++++- server/src/storage.rs | 6 +- server/src/storage/object_storage.rs | 16 +- server/src/storage/retention.rs | 73 +++---- 16 files changed, 687 insertions(+), 116 deletions(-) diff --git a/server/src/analytics.rs b/server/src/analytics.rs index 95cfd3459..c011b5961 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -24,6 +24,7 @@ use crate::option::{Mode, CONFIG}; use crate::storage; use crate::{metadata, stats}; +use crate::stats::Stats; use actix_web::{web, HttpRequest, Responder}; use chrono::{DateTime, Utc}; use clokwerk::{AsyncScheduler, Interval}; @@ -70,6 +71,12 @@ pub struct Report { total_events_count: u64, total_json_bytes: u64, total_parquet_bytes: u64, + current_events_count: u64, + current_json_bytes: u64, + current_parquet_bytes: u64, + deleted_events_count: u64, + deleted_json_bytes: u64, + deleted_parquet_bytes: u64, metrics: HashMap, } @@ -112,6 +119,12 @@ impl Report { total_events_count: ingestor_metrics.3, total_json_bytes: ingestor_metrics.4, total_parquet_bytes: ingestor_metrics.5, + current_events_count: ingestor_metrics.6, + current_json_bytes: ingestor_metrics.7, + current_parquet_bytes: ingestor_metrics.8, + deleted_events_count: ingestor_metrics.9, + deleted_json_bytes: ingestor_metrics.10, + deleted_parquet_bytes: ingestor_metrics.11, metrics: build_metrics().await, }) } @@ -132,26 +145,70 @@ fn total_streams() -> usize { metadata::STREAM_INFO.list_streams().len() } -fn total_event_stats() -> (u64, u64, u64) { +fn total_event_stats() -> (Stats, Stats, Stats) { let mut total_events: u64 = 0; let mut total_parquet_bytes: u64 = 0; let mut total_json_bytes: u64 = 0; + let mut current_events: u64 = 0; + let mut current_parquet_bytes: u64 = 0; + let mut current_json_bytes: u64 = 0; + + let mut deleted_events: u64 = 0; + let mut deleted_parquet_bytes: u64 = 0; + let mut deleted_json_bytes: u64 = 0; + for stream in metadata::STREAM_INFO.list_streams() { let Some(stats) = stats::get_current_stats(&stream, "json") else { continue; }; - total_events += stats.events; - total_parquet_bytes += stats.storage; - total_json_bytes += stats.ingestion; + total_events += stats.lifetime_stats.events; + total_parquet_bytes += stats.lifetime_stats.storage; + total_json_bytes += stats.lifetime_stats.ingestion; + + current_events += stats.current_stats.events; + current_parquet_bytes += stats.current_stats.storage; + current_json_bytes += stats.current_stats.ingestion; + + deleted_events += stats.deleted_stats.events; + deleted_parquet_bytes += stats.deleted_stats.storage; + deleted_json_bytes += stats.deleted_stats.ingestion; } - (total_events, total_json_bytes, total_parquet_bytes) + + ( + Stats { + events: total_events, + ingestion: total_json_bytes, + storage: total_parquet_bytes, + 
}, + Stats { + events: current_events, + ingestion: current_json_bytes, + storage: current_parquet_bytes, + }, + Stats { + events: deleted_events, + ingestion: deleted_json_bytes, + storage: deleted_parquet_bytes, + }, + ) } -async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> { +async fn fetch_ingestors_metrics( +) -> anyhow::Result<(u64, u64, usize, u64, u64, u64, u64, u64, u64, u64, u64, u64)> { let event_stats = total_event_stats(); - let mut node_metrics = - NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2); + let mut node_metrics = NodeMetrics::new( + total_streams(), + event_stats.0.events, + event_stats.0.ingestion, + event_stats.0.storage, + event_stats.1.events, + event_stats.1.ingestion, + event_stats.1.storage, + event_stats.2.events, + event_stats.2.ingestion, + event_stats.2.storage, + ); let mut vec = vec![]; let mut active_ingestors = 0u64; @@ -198,6 +255,12 @@ async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, node_metrics.total_events_count, node_metrics.total_json_bytes, node_metrics.total_parquet_bytes, + node_metrics.current_events_count, + node_metrics.current_json_bytes, + node_metrics.current_parquet_bytes, + node_metrics.deleted_events_count, + node_metrics.deleted_json_bytes, + node_metrics.deleted_parquet_bytes, )) } @@ -255,6 +318,12 @@ struct NodeMetrics { total_events_count: u64, total_json_bytes: u64, total_parquet_bytes: u64, + current_events_count: u64, + current_json_bytes: u64, + current_parquet_bytes: u64, + deleted_events_count: u64, + deleted_json_bytes: u64, + deleted_parquet_bytes: u64, } impl NodeMetrics { @@ -262,23 +331,43 @@ impl NodeMetrics { let event_stats = total_event_stats(); Self { stream_count: total_streams(), - total_events_count: event_stats.0, - total_json_bytes: event_stats.1, - total_parquet_bytes: event_stats.2, + total_events_count: event_stats.0.events, + total_json_bytes: event_stats.0.ingestion, + total_parquet_bytes: event_stats.0.storage, + + current_events_count: event_stats.1.events, + current_json_bytes: event_stats.1.ingestion, + current_parquet_bytes: event_stats.1.storage, + + deleted_events_count: event_stats.2.events, + deleted_json_bytes: event_stats.2.ingestion, + deleted_parquet_bytes: event_stats.2.storage, } } - + #[allow(clippy::too_many_arguments)] fn new( stream_count: usize, total_events_count: u64, total_json_bytes: u64, total_parquet_bytes: u64, + current_events_count: u64, + current_json_bytes: u64, + current_parquet_bytes: u64, + deleted_events_count: u64, + deleted_json_bytes: u64, + deleted_parquet_bytes: u64, ) -> Self { Self { stream_count, total_events_count, total_json_bytes, total_parquet_bytes, + current_events_count, + current_json_bytes, + current_parquet_bytes, + deleted_events_count, + deleted_json_bytes, + deleted_parquet_bytes, } } diff --git a/server/src/catalog.rs b/server/src/catalog.rs index eb1237727..df336bd8e 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -20,7 +20,9 @@ use std::{io::ErrorKind, sync::Arc}; use self::{column::Column, snapshot::ManifestItem}; use crate::handlers::http::base_path_without_preceding_slash; +use crate::metrics::EVENTS_INGESTED_SIZE; use crate::option::CONFIG; +use crate::stats::{event_labels, update_deleted_stats}; use crate::{ catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, @@ -137,7 +139,7 @@ pub async fn update_snapshot( } if ch { if let Some(mut manifest) = storage.get_manifest(&path).await? 
{ - manifest.apply_change(change); + manifest.apply_change(change, stream_name); storage.put_manifest(&path, manifest).await?; } else { //instead of returning an error, create a new manifest (otherwise local to storage sync fails) @@ -198,9 +200,15 @@ async fn create_manifest( .map_err(ObjectStorageError::IoError)?, ) .and_utc(); + let event_labels = event_labels(stream_name, "json"); + let ingestion_size = EVENTS_INGESTED_SIZE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as i64; let manifest = Manifest { files: vec![change], + ingestion_size, ..Manifest::default() }; @@ -233,6 +241,8 @@ pub async fn remove_manifest_from_snapshot( if !dates.is_empty() { // get current snapshot let mut meta = storage.get_object_store_format(stream_name).await?; + let meta_for_stats = meta.clone(); + update_deleted_stats(storage.clone(), stream_name, meta_for_stats, dates.clone()).await?; let manifests = &mut meta.snapshot.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); @@ -308,8 +318,6 @@ pub async fn get_first_event( ); // Convert dates vector to Bytes object let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap()); - // delete the stream - let ingestor_first_event_at = handlers::http::cluster::send_retention_cleanup_request( &url, @@ -333,7 +341,7 @@ pub async fn get_first_event( /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. -fn partition_path( +pub fn partition_path( stream: &str, lower_bound: DateTime, upper_bound: DateTime, diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index ad5b32422..511e647c5 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -21,6 +21,8 @@ use std::collections::HashMap; use itertools::Itertools; use parquet::{file::reader::FileReader, format::SortingColumn}; +use crate::{metrics::EVENTS_INGESTED_SIZE_TODAY, stats::event_labels}; + use super::column::Column; #[derive( @@ -62,6 +64,7 @@ pub struct File { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Manifest { pub version: String, + pub ingestion_size: i64, pub files: Vec, } @@ -69,13 +72,14 @@ impl Default for Manifest { fn default() -> Self { Self { version: CURRENT_MANIFEST_VERSION.to_string(), + ingestion_size: 0, files: Vec::default(), } } } impl Manifest { - pub fn apply_change(&mut self, change: File) { + pub fn apply_change(&mut self, change: File, stream_name: &str) { if let Some(pos) = self .files .iter() @@ -85,6 +89,12 @@ impl Manifest { } else { self.files.push(change) } + let event_labels = event_labels(stream_name, "json"); + let ingestion_size = EVENTS_INGESTED_SIZE_TODAY + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as i64; + self.ingestion_size = ingestion_size; } } diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 88c563251..5a7ae30aa 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -167,19 +167,44 @@ pub async fn fetch_stats_from_ingestors( let mut ingestion_size = 0u64; let mut storage_size = 0u64; let mut count = 0u64; + let mut lifetime_ingestion_size = 0u64; + let mut lifetime_storage_size = 0u64; + let mut lifetime_count = 0u64; + let mut deleted_ingestion_size = 0u64; + let mut deleted_storage_size = 0u64; + let mut deleted_count = 0u64; for ob in obs { if let Ok(stat) = 
serde_json::from_slice::(&ob) { - count += stat.stats.events; - ingestion_size += stat.stats.ingestion; - storage_size += stat.stats.storage; + count += stat.stats.current_stats.events; + ingestion_size += stat.stats.current_stats.ingestion; + storage_size += stat.stats.current_stats.storage; + lifetime_count += stat.stats.lifetime_stats.events; + lifetime_ingestion_size += stat.stats.lifetime_stats.ingestion; + lifetime_storage_size += stat.stats.lifetime_stats.storage; + deleted_count += stat.stats.deleted_stats.events; + deleted_ingestion_size += stat.stats.deleted_stats.ingestion; + deleted_storage_size += stat.stats.deleted_stats.storage; } } let qs = QueriedStats::new( "", Utc::now(), - IngestionStats::new(count, format!("{} Bytes", ingestion_size), "json"), - StorageStats::new(format!("{} Bytes", storage_size), "parquet"), + IngestionStats::new( + count, + format!("{} Bytes", ingestion_size), + lifetime_count, + format!("{} Bytes", lifetime_ingestion_size), + deleted_count, + format!("{} Bytes", deleted_ingestion_size), + "json", + ), + StorageStats::new( + format!("{} Bytes", storage_size), + format!("{} Bytes", lifetime_storage_size), + format!("{} Bytes", deleted_storage_size), + "parquet", + ), ); Ok(vec![qs]) diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index cea27bb04..6f41755f4 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -86,14 +86,30 @@ pub struct IngestionStats { pub count: u64, pub size: String, pub format: String, + pub lifetime_count: u64, + pub lifetime_size: String, + pub deleted_count: u64, + pub deleted_size: String, } impl IngestionStats { - pub fn new(count: u64, size: String, format: &str) -> Self { + pub fn new( + count: u64, + size: String, + lifetime_count: u64, + lifetime_size: String, + deleted_count: u64, + deleted_size: String, + format: &str, + ) -> Self { Self { count, size, format: format.to_string(), + lifetime_count, + lifetime_size, + deleted_count, + deleted_size, } } } @@ -102,13 +118,17 @@ impl IngestionStats { pub struct StorageStats { pub size: String, pub format: String, + pub lifetime_size: String, + pub deleted_size: String, } impl StorageStats { - pub fn new(size: String, format: &str) -> Self { + pub fn new(size: String, lifetime_size: String, deleted_size: String, format: &str) -> Self { Self { size, format: format.to_string(), + lifetime_size, + deleted_size, } } } @@ -125,6 +145,7 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { .map(|x| &x.ingestion) .fold(IngestionStats::default(), |acc, x| IngestionStats { count: acc.count + x.count, + size: format!( "{} Bytes", acc.size.split(' ').collect_vec()[0] @@ -135,6 +156,26 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { .unwrap_or_default() ), format: x.format.clone(), + lifetime_count: acc.lifetime_count + x.lifetime_count, + lifetime_size: format!( + "{} Bytes", + acc.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), + deleted_count: acc.deleted_count + x.deleted_count, + deleted_size: format!( + "{} Bytes", + acc.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), }); let cumulative_storage = @@ -152,6 +193,24 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { .unwrap_or_default() ), format: x.format.clone(), + 
lifetime_size: format!( + "{} Bytes", + acc.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), + deleted_size: format!( + "{} Bytes", + acc.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), }); QueriedStats::new( diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 5910982f6..da4f00ad3 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -509,24 +509,40 @@ pub async fn get_stats(req: HttpRequest) -> Result let stats = match &stream_meta.first_event_at { Some(_) => { let ingestion_stats = IngestionStats::new( - stats.events, - format!("{} {}", stats.ingestion, "Bytes"), + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), "json", ); - let storage_stats = - StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet"); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) } None => { let ingestion_stats = IngestionStats::new( - stats.events, - format!("{} {}", stats.ingestion, "Bytes"), + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), "json", ); - let storage_stats = - StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet"); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) } @@ -544,6 +560,7 @@ pub async fn get_stats(req: HttpRequest) -> Result } // Check if the first_event_at is empty +#[allow(dead_code)] pub fn first_event_at_empty(stream_name: &str) -> bool { let hash_map = STREAM_INFO.read().unwrap(); if let Some(stream_info) = hash_map.get(stream_name) { @@ -628,19 +645,16 @@ pub async fn get_stream_info(req: HttpRequest) -> Result = Vec::new(); - if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name, dates).await + let store = CONFIG.storage().get_object_store(); + let dates: Vec = Vec::new(); + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name, dates).await { + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) { - if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) - { - log::error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); - } + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); } } diff --git a/server/src/handlers/http/modal/ingest_server.rs 
b/server/src/handlers/http/modal/ingest_server.rs index 2ce245949..6630da786 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -334,6 +334,7 @@ impl IngestServer { } metrics::fetch_stats_from_storage().await; + metrics::reset_daily_metric_from_global(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 607cc5653..3558ec3df 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -173,10 +173,11 @@ impl QueryServer { log::warn!("could not populate local metadata. {:?}", e); } - // track all parquet files already in the data directory - storage::retention::load_retention_from_global(); // load data from stats back to prometheus metrics metrics::fetch_stats_from_storage().await; + metrics::reset_daily_metric_from_global(); + // track all parquet files already in the data directory + storage::retention::load_retention_from_global(); // all internal data structures populated now. // start the analytics scheduler if enabled diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 165581234..ff30baa98 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -116,7 +116,8 @@ impl ParseableServer for Server { banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); - self.initialize().await + self.initialize().await?; + Ok(()) } fn validate(&self) -> anyhow::Result<()> { @@ -404,8 +405,9 @@ impl Server { log::warn!("could not populate local metadata. 
{:?}", err); } - storage::retention::load_retention_from_global(); metrics::fetch_stats_from_storage().await; + metrics::reset_daily_metric_from_global(); + storage::retention::load_retention_from_global(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = diff --git a/server/src/metadata.rs b/server/src/metadata.rs index b3963bbc4..3065526ad 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -26,7 +26,10 @@ use std::sync::{Arc, RwLock}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; use crate::alerts::Alerts; -use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE}; +use crate::metrics::{ + EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, LIFETIME_EVENTS_INGESTED, + LIFETIME_EVENTS_INGESTED_SIZE, +}; use crate::storage::{LogStream, ObjectStorage, StorageDir}; use crate::utils::arrow::MergedRecordReader; use derive_more::{Deref, DerefMut}; @@ -286,10 +289,19 @@ impl StreamInfo { ) -> Result<(), MetadataError> { EVENTS_INGESTED .with_label_values(&[stream_name, origin]) - .inc_by(num_rows); + .add(num_rows as i64); EVENTS_INGESTED_SIZE .with_label_values(&[stream_name, origin]) .add(size as i64); + EVENTS_INGESTED_SIZE_TODAY + .with_label_values(&[stream_name, origin]) + .add(size as i64); + LIFETIME_EVENTS_INGESTED + .with_label_values(&[stream_name, origin]) + .add(num_rows as i64); + LIFETIME_EVENTS_INGESTED_SIZE + .with_label_values(&[stream_name, origin]) + .add(size as i64); Ok(()) } } diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 3e337123d..8bc1f5827 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -18,17 +18,26 @@ pub mod prom_utils; pub mod storage; +use std::sync::Mutex; +use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, metrics, option::CONFIG}; use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder}; +use clokwerk::AsyncScheduler; +use clokwerk::Job; +use clokwerk::TimeUnits; use once_cell::sync::Lazy; use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry}; - -use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, option::CONFIG}; +use std::thread; +use std::time::Duration; pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME"); +type SchedulerHandle = thread::JoinHandle<()>; -pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { - IntCounterVec::new( +static METRIC_SCHEDULER_HANDLER: Lazy>> = + Lazy::new(|| Mutex::new(None)); + +pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { + IntGaugeVec::new( Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE), &["stream", "format"], ) @@ -44,6 +53,18 @@ pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static EVENTS_INGESTED_SIZE_TODAY: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "events_ingested_size_today", + "Events ingested size today in bytes", + ) + .namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + pub static STORAGE_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), @@ -52,6 +73,67 @@ pub static STORAGE_SIZE: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static EVENTS_DELETED: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("events_deleted", "Events deleted").namespace(METRICS_NAMESPACE), + 
&["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static EVENTS_DELETED_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("events_deleted_size", "Events deleted size bytes").namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static DELETED_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "deleted_events_storage_size", + "Deleted events storage size bytes", + ) + .namespace(METRICS_NAMESPACE), + &["type", "stream", "format"], + ) + .expect("metric can be created") +}); + +pub static LIFETIME_EVENTS_INGESTED: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("lifetime_events_ingested", "Lifetime events ingested") + .namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static LIFETIME_EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "lifetime_events_ingested_size", + "Lifetime events ingested size bytes", + ) + .namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static LIFETIME_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "lifetime_events_storage_size", + "Lifetime events storage size bytes", + ) + .namespace(METRICS_NAMESPACE), + &["type", "stream", "format"], + ) + .expect("metric can be created") +}); + pub static STAGING_FILES: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE), @@ -91,9 +173,30 @@ fn custom_metrics(registry: &Registry) { registry .register(Box::new(EVENTS_INGESTED_SIZE.clone())) .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_INGESTED_SIZE_TODAY.clone())) + .expect("metric can be registered"); registry .register(Box::new(STORAGE_SIZE.clone())) .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_DELETED.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_DELETED_SIZE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(DELETED_EVENTS_STORAGE_SIZE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(LIFETIME_EVENTS_INGESTED.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(LIFETIME_EVENTS_INGESTED_SIZE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(LIFETIME_EVENTS_STORAGE_SIZE.clone())) + .expect("metric can be registered"); registry .register(Box::new(STAGING_FILES.clone())) .expect("metric can be registered"); @@ -145,12 +248,75 @@ pub async fn fetch_stats_from_storage() { EVENTS_INGESTED .with_label_values(&[&stream_name, "json"]) - .inc_by(stats.events); + .set(stats.current_stats.events as i64); EVENTS_INGESTED_SIZE .with_label_values(&[&stream_name, "json"]) - .set(stats.ingestion as i64); + .set(stats.current_stats.ingestion as i64); + EVENTS_INGESTED_SIZE_TODAY + .with_label_values(&[&stream_name, "json"]) + .set(stats.current_stats.ingestion as i64); STORAGE_SIZE .with_label_values(&["data", &stream_name, "parquet"]) - .set(stats.storage as i64) + .set(stats.current_stats.storage as i64); + + EVENTS_DELETED + .with_label_values(&[&stream_name, "json"]) + .set(stats.deleted_stats.events as i64); + EVENTS_DELETED_SIZE + .with_label_values(&[&stream_name, "json"]) + .set(stats.deleted_stats.ingestion as i64); + DELETED_EVENTS_STORAGE_SIZE + .with_label_values(&["data", &stream_name, "parquet"]) + 
.set(stats.deleted_stats.storage as i64); + + LIFETIME_EVENTS_INGESTED + .with_label_values(&[&stream_name, "json"]) + .set(stats.lifetime_stats.events as i64); + LIFETIME_EVENTS_INGESTED_SIZE + .with_label_values(&[&stream_name, "json"]) + .set(stats.lifetime_stats.ingestion as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", &stream_name, "parquet"]) + .set(stats.lifetime_stats.storage as i64); } } + +fn async_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .thread_name("reset-metrics-task-thread") + .enable_all() + .build() + .unwrap() +} + +pub fn reset_daily_metric_from_global() { + init_reset_daily_metric_scheduler(); +} + +pub fn init_reset_daily_metric_scheduler() { + log::info!("Setting up schedular"); + let mut scheduler = AsyncScheduler::new(); + let func = move || async { + //get retention every day at 12 am + for stream in STREAM_INFO.list_streams() { + metrics::EVENTS_INGESTED_SIZE_TODAY + .with_label_values(&[&stream, "json"]) + .set(0); + } + }; + + scheduler.every(1.day()).at("00:00").run(func); + + let scheduler_handler = thread::spawn(|| { + let rt = async_runtime(); + rt.block_on(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + scheduler.run_pending().await; + } + }); + }); + + *METRIC_SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler); + log::info!("Scheduler is initialized") +} diff --git a/server/src/migration.rs b/server/src/migration.rs index d9c15fc4c..8b4fb5e3f 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -23,19 +23,19 @@ mod stream_metadata_migration; use std::{fs::OpenOptions, sync::Arc}; -use bytes::Bytes; -use itertools::Itertools; -use relative_path::RelativePathBuf; -use serde::Serialize; - use crate::{ option::Config, + stats::{FullStats, Stats}, storage::{ object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, }, }; +use bytes::Bytes; +use itertools::Itertools; +use relative_path::RelativePathBuf; +use serde::Serialize; /// Migrate the metdata from v1 or v2 to v3 /// This is a one time migration @@ -223,9 +223,21 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> { } run_meta_file_migration(&object_store, old_meta_file_path).await?; - run_stream_files_migration(object_store).await?; - + run_stream_files_migration(&object_store).await?; + run_stream_stats_migration(&object_store).await?; Ok(()) + + // let object_store = config.storage().get_object_store(); + + // let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); + // println!(" old meta file path {:?}", old_meta_file_path); + // // if this errors that means migrations is already done + // if (object_store.get_object(&old_meta_file_path).await).is_ok() { + // run_meta_file_migration(&object_store, old_meta_file_path).await?; + // run_stream_files_migration(&object_store).await?; + // } + // run_stream_stats_migration(&object_store).await?; + // Ok(()) } async fn run_meta_file_migration( @@ -263,7 +275,7 @@ async fn run_meta_file_migration( } async fn run_stream_files_migration( - object_store: Arc, + object_store: &Arc, ) -> anyhow::Result<()> { let streams = object_store .list_old_streams() @@ -297,3 +309,37 @@ async fn run_stream_files_migration( Ok(()) } + +async fn run_stream_stats_migration( + object_store: &Arc, +) -> anyhow::Result<()> { + let streams = object_store + .list_streams() + .await? 
+ .into_iter() + .map(|stream| stream.name) + .collect_vec(); + for stream in streams { + let stream_metadata = object_store.get_object(&stream_json_path(&stream)).await?; + let mut stream_metadata: serde_json::Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + let stats = &stream_metadata["stats"]; + if serde_json::from_value::(stats.clone()).is_err() { + let stats: Stats = serde_json::from_value(stats.clone()).unwrap(); + let full_stats = FullStats { + lifetime_stats: stats, + current_stats: stats, + deleted_stats: Stats::default(), + }; + let full_stats_bytes = to_bytes(&full_stats); + let full_stats_slice: &[u8] = &full_stats_bytes; + stream_metadata["stats"] = + serde_json::from_slice(full_stats_slice).expect("parseable config is valid json"); + object_store + .put_object(&stream_json_path(&stream), to_bytes(&stream_metadata)) + .await?; + } + } + + Ok(()) +} diff --git a/server/src/stats.rs b/server/src/stats.rs index 5b92bfd85..906de9c6f 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -15,8 +15,15 @@ * along with this program. If not, see . * */ +use crate::metrics::{ + DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, + EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, LIFETIME_EVENTS_INGESTED, + LIFETIME_EVENTS_INGESTED_SIZE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, +}; -use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE, STORAGE_SIZE}; +use crate::catalog::partition_path; +use crate::storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}; +use std::sync::Arc; /// Helper struct type created by copying stats values from metadata #[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] @@ -26,7 +33,14 @@ pub struct Stats { pub storage: u64, } -pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option { +#[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] +pub struct FullStats { + pub lifetime_stats: Stats, + pub current_stats: Stats, + pub deleted_stats: Stats, +} + +pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option { let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); @@ -42,17 +56,124 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option, + stream_name: &str, + meta: ObjectStoreFormat, + dates: Vec, +) -> Result<(), ObjectStorageError> { + let mut num_row: u64 = 0; + let mut storage_size: u64 = 0; + let mut ingestion_size: i64 = 0; + + let mut manifests = meta.snapshot.manifest_list; + manifests.retain(|item| dates.iter().any(|date| item.manifest_path.contains(date))); + + if !manifests.is_empty() { + for manifest in manifests { + let path = partition_path( + stream_name, + manifest.time_lower_bound, + manifest.time_upper_bound, + ); + let Some(manifest) = storage.get_manifest(&path).await? 
else { + return Err(ObjectStorageError::UnhandledError( + "Manifest found in snapshot but not in object-storage" + .to_string() + .into(), + )); + }; + manifest.files.iter().for_each(|file| { + num_row += file.num_rows; + storage_size += file.file_size; + }); + ingestion_size += manifest.ingestion_size; + } + } + EVENTS_DELETED + .with_label_values(&[stream_name, "json"]) + .add(num_row as i64); + EVENTS_DELETED_SIZE + .with_label_values(&[stream_name, "json"]) + .add(ingestion_size); + DELETED_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(storage_size as i64); + EVENTS_INGESTED + .with_label_values(&[stream_name, "json"]) + .sub(num_row as i64); + EVENTS_INGESTED_SIZE + .with_label_values(&[stream_name, "json"]) + .sub(ingestion_size); + EVENTS_INGESTED_SIZE_TODAY + .with_label_values(&[stream_name, "json"]) + .sub(ingestion_size); + STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .sub(storage_size as i64); + let stats = get_current_stats(stream_name, "json"); + if let Some(stats) = stats { + if let Err(e) = storage.put_stats(stream_name, &stats).await { + log::warn!("Error updating stats to objectstore due to error [{}]", e); + } + } + + Ok(()) +} + pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> { let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); @@ -60,14 +181,20 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu EVENTS_INGESTED.remove_label_values(&event_labels)?; EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + EVENTS_DELETED.remove_label_values(&event_labels)?; + EVENTS_DELETED_SIZE.remove_label_values(&event_labels)?; + DELETED_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + LIFETIME_EVENTS_INGESTED.remove_label_values(&event_labels)?; + LIFETIME_EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; + LIFETIME_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; Ok(()) } -fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] { +pub fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] { [stream_name, format] } -fn storage_size_labels(stream_name: &str) -> [&str; 3] { +pub fn storage_size_labels(stream_name: &str) -> [&str; 3] { ["data", stream_name, "parquet"] } diff --git a/server/src/storage.rs b/server/src/storage.rs index d6fb2fb7b..b04c84695 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,7 +16,7 @@ * */ -use crate::{catalog::snapshot::Snapshot, stats::Stats}; +use crate::{catalog::snapshot::Snapshot, stats::FullStats}; use chrono::Local; @@ -83,7 +83,7 @@ pub struct ObjectStoreFormat { pub first_event_at: Option, pub owner: Owner, pub permissions: Vec, - pub stats: Stats, + pub stats: FullStats, #[serde(default)] pub snapshot: Snapshot, #[serde(default)] @@ -157,7 +157,7 @@ impl Default for ObjectStoreFormat { first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], - stats: Stats::default(), + stats: FullStats::default(), snapshot: Snapshot::default(), cache_enabled: false, retention: None, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 04a63704a..228181ea9 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -26,6 +26,7 @@ use super::{ }; use 
crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::metrics::LIFETIME_EVENTS_STORAGE_SIZE; use crate::option::Mode; use crate::{ alerts::Alerts, @@ -34,7 +35,7 @@ use crate::{ metadata::STREAM_INFO, metrics::{storage::StorageMetrics, STORAGE_SIZE}, option::CONFIG, - stats::{self, Stats}, + stats::{self, FullStats, Stats}, }; use actix_web_prometheus::PrometheusMetrics; @@ -174,7 +175,11 @@ pub trait ObjectStorage: Sync + 'static { .await } - async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> { + async fn put_stats( + &self, + stream_name: &str, + stats: &FullStats, + ) -> Result<(), ObjectStorageError> { let path = stream_json_path(stream_name); let stream_metadata = self.get_object(&path).await?; let stats = serde_json::to_value(stats).expect("stats are perfectly serializable"); @@ -282,7 +287,7 @@ pub trait ObjectStorage: Sync + 'static { .expect("parseable config is valid json"); if CONFIG.parseable.mode == Mode::Ingest { - config.stats = Stats::default(); + config.stats = FullStats::default(); config.snapshot.manifest_list = vec![]; } @@ -320,7 +325,7 @@ pub trait ObjectStorage: Sync + 'static { Ok(stats) } - async fn get_stats(&self, stream_name: &str) -> Result { + async fn get_stats(&self, stream_name: &str) -> Result { let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; let stream_metadata: Value = serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); @@ -514,6 +519,9 @@ pub trait ObjectStorage: Sync + 'static { STORAGE_SIZE .with_label_values(&["data", stream, "parquet"]) .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream, "parquet"]) + .add(compressed_size as i64); let stats = stats::get_current_stats(stream, "json"); if let Some(stats) = stats { if let Err(e) = self.put_stats(stream, &stats).await { diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index f67a15eb7..eefe16c84 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -52,6 +52,7 @@ pub fn init_scheduler() { log::info!("Setting up schedular"); let mut scheduler = AsyncScheduler::new(); let func = move || async { + //get retention every day at 12 am for stream in STREAM_INFO.list_streams() { let res = CONFIG .storage() @@ -188,59 +189,61 @@ impl From for Vec { } mod action { + use crate::catalog::remove_manifest_from_snapshot; + use crate::{metadata, option::CONFIG}; use chrono::{Days, NaiveDate, Utc}; use futures::{stream::FuturesUnordered, StreamExt}; use itertools::Itertools; use relative_path::RelativePathBuf; - use crate::{catalog::remove_manifest_from_snapshot, metadata, option::CONFIG}; - pub(super) async fn delete(stream_name: String, days: u32) { log::info!("running retention task - delete for stream={stream_name}"); + let store = CONFIG.storage().get_object_store(); + let retain_until = get_retain_until(Utc::now().date_naive(), days as u64); - let Ok(mut dates) = CONFIG - .storage() - .get_object_store() - .list_dates(&stream_name) - .await - else { + + let Ok(mut dates) = store.list_dates(&stream_name).await else { return; }; dates.retain(|date| date.starts_with("date")); let dates_to_delete = dates .into_iter() - .filter(|date| string_to_date(date) < retain_until) + .filter(|date| string_to_date(date) >= retain_until) .collect_vec(); let dates = dates_to_delete.clone(); - let delete_tasks = FuturesUnordered::new(); - for date in dates_to_delete { - let path = 
RelativePathBuf::from_iter([&stream_name, &date]); - delete_tasks.push(async move { - CONFIG - .storage() - .get_object_store() - .delete_prefix(&path) - .await - }); - } + if !dates.is_empty() { + let delete_tasks = FuturesUnordered::new(); + let res_remove_manifest = + remove_manifest_from_snapshot(store.clone(), &stream_name, dates.clone()).await; + + for date in dates_to_delete { + let path = RelativePathBuf::from_iter([&stream_name, &date]); + delete_tasks.push(async move { + CONFIG + .storage() + .get_object_store() + .delete_prefix(&path) + .await + }); + } - let res: Vec<_> = delete_tasks.collect().await; + let res: Vec<_> = delete_tasks.collect().await; - for res in res { - if let Err(err) = res { - log::error!("Failed to run delete task {err:?}") + for res in res { + if let Err(err) = res { + log::error!("Failed to run delete task {err:?}"); + return; + } } - } - - let store = CONFIG.storage().get_object_store(); - let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await; - if let Ok(first_event_at) = res { - if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) - { - log::error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); + if let Ok(first_event_at) = res_remove_manifest { + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } } } } From b246a5635abf775827e5019ab02ccf1405062717 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 14 May 2024 17:47:13 +0530 Subject: [PATCH 2/9] fix for migration of manifest.json from old deployments --- server/src/migration.rs | 43 ++++++++++++++++++--------------- server/src/storage/retention.rs | 2 +- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/server/src/migration.rs b/server/src/migration.rs index 8b4fb5e3f..bc4ef90ed 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -24,10 +24,11 @@ mod stream_metadata_migration; use std::{fs::OpenOptions, sync::Arc}; use crate::{ + catalog::{manifest::Manifest, partition_path}, option::Config, stats::{FullStats, Stats}, storage::{ - object_storage::{parseable_json_path, stream_json_path}, + object_storage::{manifest_path, parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, }, @@ -213,31 +214,18 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> { let object_store = config.storage().get_object_store(); let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); - // if this errors that means migrations is already done if let Err(err) = object_store.get_object(&old_meta_file_path).await { if matches!(err, ObjectStorageError::NoSuchKey(_)) { - return Ok(()); + log::info!("Migration already done"); + } else { + run_meta_file_migration(&object_store, old_meta_file_path).await?; + run_stream_files_migration(&object_store).await?; } - return Err(err.into()); } - run_meta_file_migration(&object_store, old_meta_file_path).await?; - run_stream_files_migration(&object_store).await?; run_stream_stats_migration(&object_store).await?; Ok(()) - - // let object_store = config.storage().get_object_store(); - - // let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); - // println!(" old meta file path {:?}", old_meta_file_path); - // // if 
this errors that means migrations is already done - // if (object_store.get_object(&old_meta_file_path).await).is_ok() { - // run_meta_file_migration(&object_store, old_meta_file_path).await?; - // run_stream_files_migration(&object_store).await?; - // } - // run_stream_stats_migration(&object_store).await?; - // Ok(()) } async fn run_meta_file_migration( @@ -322,7 +310,7 @@ async fn run_stream_stats_migration( for stream in streams { let stream_metadata = object_store.get_object(&stream_json_path(&stream)).await?; let mut stream_metadata: serde_json::Value = - serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); let stats = &stream_metadata["stats"]; if serde_json::from_value::(stats.clone()).is_err() { let stats: Stats = serde_json::from_value(stats.clone()).unwrap(); @@ -339,6 +327,23 @@ async fn run_stream_stats_migration( .put_object(&stream_json_path(&stream), to_bytes(&stream_metadata)) .await?; } + let mut meta = object_store.get_object_store_format(&stream).await?; + let manifests = &mut meta.snapshot.manifest_list; + for manifest in manifests { + let path = partition_path( + &stream, + manifest.time_lower_bound, + manifest.time_upper_bound, + ); + let path = manifest_path(path.as_str()); + let manifest_bytes = object_store.get_object(&path).await?; + let mut manifest: serde_json::Value = + serde_json::from_slice(&manifest_bytes).expect("manifest.json is valid json"); + if serde_json::from_value::(manifest.clone()).is_err() { + manifest["ingestion_size"] = 0.into(); + } + object_store.put_object(&path, to_bytes(&manifest)).await?; + } } Ok(()) diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index eefe16c84..7e697b3ae 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -208,7 +208,7 @@ mod action { dates.retain(|date| date.starts_with("date")); let dates_to_delete = dates .into_iter() - .filter(|date| string_to_date(date) >= retain_until) + .filter(|date| string_to_date(date) < retain_until) .collect_vec(); let dates = dates_to_delete.clone(); if !dates.is_empty() { From c6bed63ad0a395bc617b55483046efb0b194810b Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 17 May 2024 17:18:27 +0530 Subject: [PATCH 3/9] add daily stats to snapshot manifest list add lifetime, deleted, daily stats to metrics reset daily stats everyday at 12 am migration of stream.json to v4 migration of snapshot to v2 --- server/src/catalog.rs | 61 +++++-- server/src/catalog/manifest.rs | 12 +- server/src/catalog/snapshot.rs | 5 +- server/src/handlers/http/cluster/mod.rs | 52 ++++-- server/src/metadata.rs | 7 +- server/src/metrics/mod.rs | 37 ++++- server/src/migration.rs | 80 ++------- server/src/migration/schema_migration.rs | 4 +- .../migration/stream_metadata_migration.rs | 155 ++++++++++++++++-- server/src/query/stream_schema_provider.rs | 9 + server/src/stats.rs | 93 +++++------ server/src/storage.rs | 4 +- server/src/storage/object_storage.rs | 42 ++--- 13 files changed, 365 insertions(+), 196 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index df336bd8e..c88d4c9a5 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -20,9 +20,9 @@ use std::{io::ErrorKind, sync::Arc}; use self::{column::Column, snapshot::ManifestItem}; use crate::handlers::http::base_path_without_preceding_slash; -use crate::metrics::EVENTS_INGESTED_SIZE; +use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, 
EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY}; use crate::option::CONFIG; -use crate::stats::{event_labels, update_deleted_stats}; +use crate::stats::{event_labels, storage_size_labels, update_deleted_stats}; use crate::{ catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, @@ -103,13 +103,28 @@ pub async fn update_snapshot( change: manifest::File, ) -> Result<(), ObjectStorageError> { // get current snapshot + let event_labels = event_labels(stream_name, "json"); + let storage_size_labels = storage_size_labels(stream_name); + let events_ingested = EVENTS_INGESTED_TODAY + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE_TODAY + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let storage_size = STORAGE_SIZE_TODAY + .get_metric_with_label_values(&storage_size_labels) + .unwrap() + .get() as u64; + let mut meta = storage.get_object_store_format(stream_name).await?; let meta_clone = meta.clone(); let manifests = &mut meta.snapshot.manifest_list; - let time_partition: Option = meta_clone.time_partition; + let time_partition = &meta_clone.time_partition; let lower_bound = match time_partition { Some(time_partition) => { - let (lower_bound, _) = get_file_bounds(&change, time_partition); + let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string()); lower_bound } None => { @@ -131,15 +146,21 @@ pub async fn update_snapshot( let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); let mut ch = false; - for m in manifests.iter() { + for m in manifests.iter_mut() { let p = manifest_path("").to_string(); if m.manifest_path.contains(&p) { ch = true; + m.events_ingested = events_ingested; + m.ingestion_size = ingestion_size; + m.storage_size = storage_size; } } + + meta.snapshot.manifest_list = manifests.to_vec(); + storage.put_snapshot(stream_name, meta.snapshot).await?; if ch { if let Some(mut manifest) = storage.get_manifest(&path).await? 
{ - manifest.apply_change(change, stream_name); + manifest.apply_change(change); storage.put_manifest(&path, manifest).await?; } else { //instead of returning an error, create a new manifest (otherwise local to storage sync fails) @@ -150,7 +171,10 @@ pub async fn update_snapshot( storage.clone(), stream_name, false, - meta, + meta_clone, + events_ingested, + ingestion_size, + storage_size, ) .await?; } @@ -161,7 +185,10 @@ pub async fn update_snapshot( storage.clone(), stream_name, true, - meta, + meta_clone, + events_ingested, + ingestion_size, + storage_size, ) .await?; } @@ -172,7 +199,10 @@ pub async fn update_snapshot( storage.clone(), stream_name, true, - meta, + meta_clone, + events_ingested, + ingestion_size, + storage_size, ) .await?; } @@ -180,6 +210,7 @@ pub async fn update_snapshot( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn create_manifest( lower_bound: DateTime, change: manifest::File, @@ -187,6 +218,9 @@ async fn create_manifest( stream_name: &str, update_snapshot: bool, mut meta: ObjectStoreFormat, + events_ingested: u64, + ingestion_size: u64, + storage_size: u64, ) -> Result<(), ObjectStorageError> { let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); let upper_bound = lower_bound @@ -200,15 +234,9 @@ async fn create_manifest( .map_err(ObjectStorageError::IoError)?, ) .and_utc(); - let event_labels = event_labels(stream_name, "json"); - let ingestion_size = EVENTS_INGESTED_SIZE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as i64; let manifest = Manifest { files: vec![change], - ingestion_size, ..Manifest::default() }; @@ -224,6 +252,9 @@ async fn create_manifest( manifest_path: path.to_string(), time_lower_bound: lower_bound, time_upper_bound: upper_bound, + events_ingested, + ingestion_size, + storage_size, }; manifests.push(new_snapshot_entry); meta.snapshot.manifest_list = manifests; diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index 511e647c5..ad5b32422 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -21,8 +21,6 @@ use std::collections::HashMap; use itertools::Itertools; use parquet::{file::reader::FileReader, format::SortingColumn}; -use crate::{metrics::EVENTS_INGESTED_SIZE_TODAY, stats::event_labels}; - use super::column::Column; #[derive( @@ -64,7 +62,6 @@ pub struct File { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Manifest { pub version: String, - pub ingestion_size: i64, pub files: Vec, } @@ -72,14 +69,13 @@ impl Default for Manifest { fn default() -> Self { Self { version: CURRENT_MANIFEST_VERSION.to_string(), - ingestion_size: 0, files: Vec::default(), } } } impl Manifest { - pub fn apply_change(&mut self, change: File, stream_name: &str) { + pub fn apply_change(&mut self, change: File) { if let Some(pos) = self .files .iter() @@ -89,12 +85,6 @@ impl Manifest { } else { self.files.push(change) } - let event_labels = event_labels(stream_name, "json"); - let ingestion_size = EVENTS_INGESTED_SIZE_TODAY - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as i64; - self.ingestion_size = ingestion_size; } } diff --git a/server/src/catalog/snapshot.rs b/server/src/catalog/snapshot.rs index 982e111fd..997f3566a 100644 --- a/server/src/catalog/snapshot.rs +++ b/server/src/catalog/snapshot.rs @@ -22,7 +22,7 @@ use chrono::{DateTime, Utc}; use crate::query::PartialTimeFilter; -pub const CURRENT_SNAPSHOT_VERSION: &str = "v1"; +pub const CURRENT_SNAPSHOT_VERSION: &str = "v2"; #[derive(Debug, 
Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Snapshot { pub version: String, @@ -76,4 +76,7 @@ pub struct ManifestItem { pub manifest_path: String, pub time_lower_bound: DateTime, pub time_upper_bound: DateTime, + pub events_ingested: u64, + pub ingestion_size: u64, + pub storage_size: u64, } diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 5a7ae30aa..e669f0fd9 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -28,8 +28,8 @@ use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; use crate::storage::object_storage::ingestor_metadata_path; +use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; -use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; use actix_web::http::header; use actix_web::{HttpRequest, Responder}; use bytes::Bytes; @@ -161,9 +161,12 @@ pub async fn fetch_stats_from_ingestors( .get_object_store() .get_objects( Some(&path), - Box::new(|file_name| file_name.starts_with(".ingestor")), + Box::new(|file_name| { + file_name.starts_with(".ingestor") && file_name.ends_with("stream.json") + }), ) .await?; + let mut ingestion_size = 0u64; let mut storage_size = 0u64; let mut count = 0u64; @@ -174,16 +177,41 @@ pub async fn fetch_stats_from_ingestors( let mut deleted_storage_size = 0u64; let mut deleted_count = 0u64; for ob in obs { - if let Ok(stat) = serde_json::from_slice::(&ob) { - count += stat.stats.current_stats.events; - ingestion_size += stat.stats.current_stats.ingestion; - storage_size += stat.stats.current_stats.storage; - lifetime_count += stat.stats.lifetime_stats.events; - lifetime_ingestion_size += stat.stats.lifetime_stats.ingestion; - lifetime_storage_size += stat.stats.lifetime_stats.storage; - deleted_count += stat.stats.deleted_stats.events; - deleted_ingestion_size += stat.stats.deleted_stats.ingestion; - deleted_storage_size += stat.stats.deleted_stats.storage; + let stream_metadata: serde_json::Value = + serde_json::from_slice(&ob).expect("stream.json is valid json"); + let version = stream_metadata + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + let stats = stream_metadata.get("stats").unwrap(); + match version { + Some("v4") => { + let current_stats = stats.get("current_stats").unwrap().clone(); + let lifetime_stats = stats.get("lifetime_stats").unwrap().clone(); + let deleted_stats = stats.get("deleted_stats").unwrap().clone(); + + count += current_stats.get("events").unwrap().as_u64().unwrap(); + ingestion_size += current_stats.get("ingestion").unwrap().as_u64().unwrap(); + storage_size += current_stats.get("storage").unwrap().as_u64().unwrap(); + lifetime_count += lifetime_stats.get("events").unwrap().as_u64().unwrap(); + lifetime_ingestion_size += + lifetime_stats.get("ingestion").unwrap().as_u64().unwrap(); + lifetime_storage_size += lifetime_stats.get("storage").unwrap().as_u64().unwrap(); + deleted_count += deleted_stats.get("events").unwrap().as_u64().unwrap(); + deleted_ingestion_size += deleted_stats.get("ingestion").unwrap().as_u64().unwrap(); + deleted_storage_size += deleted_stats.get("storage").unwrap().as_u64().unwrap(); + } + _ => { + count += stats.get("events").unwrap().as_u64().unwrap(); + ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); + storage_size += stats.get("storage").unwrap().as_u64().unwrap(); + lifetime_count += 
stats.get("events").unwrap().as_u64().unwrap(); + lifetime_ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); + lifetime_storage_size += stats.get("storage").unwrap().as_u64().unwrap(); + deleted_count += 0; + deleted_ingestion_size += 0; + deleted_storage_size += 0; + } } } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 3065526ad..76d5dd0da 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -27,8 +27,8 @@ use std::sync::{Arc, RwLock}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; use crate::alerts::Alerts; use crate::metrics::{ - EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, LIFETIME_EVENTS_INGESTED, - LIFETIME_EVENTS_INGESTED_SIZE, + EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, + LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, }; use crate::storage::{LogStream, ObjectStorage, StorageDir}; use crate::utils::arrow::MergedRecordReader; @@ -290,6 +290,9 @@ impl StreamInfo { EVENTS_INGESTED .with_label_values(&[stream_name, origin]) .add(num_rows as i64); + EVENTS_INGESTED_TODAY + .with_label_values(&[stream_name, origin]) + .add(num_rows as i64); EVENTS_INGESTED_SIZE .with_label_values(&[stream_name, origin]) .add(size as i64); diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 8bc1f5827..2407de1ae 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -44,6 +44,14 @@ pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static EVENTS_INGESTED_TODAY: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("events_ingested_today", "Events ingested today").namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new("events_ingested_size", "Events ingested size bytes") @@ -73,6 +81,14 @@ pub static STORAGE_SIZE: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static STORAGE_SIZE_TODAY: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("storage_size_today", "Storage size today in bytes").namespace(METRICS_NAMESPACE), + &["type", "stream", "format"], + ) + .expect("metric can be created") +}); + pub static EVENTS_DELETED: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new("events_deleted", "Events deleted").namespace(METRICS_NAMESPACE), @@ -170,6 +186,9 @@ fn custom_metrics(registry: &Registry) { registry .register(Box::new(EVENTS_INGESTED.clone())) .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_INGESTED_TODAY.clone())) + .expect("metric can be registered"); registry .register(Box::new(EVENTS_INGESTED_SIZE.clone())) .expect("metric can be registered"); @@ -179,6 +198,9 @@ fn custom_metrics(registry: &Registry) { registry .register(Box::new(STORAGE_SIZE.clone())) .expect("metric can be registered"); + registry + .register(Box::new(STORAGE_SIZE_TODAY.clone())) + .expect("metric can be registered"); registry .register(Box::new(EVENTS_DELETED.clone())) .expect("metric can be registered"); @@ -249,16 +271,21 @@ pub async fn fetch_stats_from_storage() { EVENTS_INGESTED .with_label_values(&[&stream_name, "json"]) .set(stats.current_stats.events as i64); + EVENTS_INGESTED_TODAY + .with_label_values(&[&stream_name, "json"]) + .set(stats.current_date_stats.events as i64); EVENTS_INGESTED_SIZE .with_label_values(&[&stream_name, "json"]) .set(stats.current_stats.ingestion as i64); EVENTS_INGESTED_SIZE_TODAY 
.with_label_values(&[&stream_name, "json"]) - .set(stats.current_stats.ingestion as i64); + .set(stats.current_date_stats.ingestion as i64); STORAGE_SIZE .with_label_values(&["data", &stream_name, "parquet"]) .set(stats.current_stats.storage as i64); - + STORAGE_SIZE_TODAY + .with_label_values(&["data", &stream_name, "parquet"]) + .set(stats.current_date_stats.storage as i64); EVENTS_DELETED .with_label_values(&[&stream_name, "json"]) .set(stats.deleted_stats.events as i64); @@ -299,9 +326,15 @@ pub fn init_reset_daily_metric_scheduler() { let func = move || async { //get retention every day at 12 am for stream in STREAM_INFO.list_streams() { + metrics::EVENTS_INGESTED_TODAY + .with_label_values(&[&stream, "json"]) + .set(0); metrics::EVENTS_INGESTED_SIZE_TODAY .with_label_values(&[&stream, "json"]) .set(0); + metrics::STORAGE_SIZE_TODAY + .with_label_values(&["data", &stream, "parquet"]) + .set(0); } }; diff --git a/server/src/migration.rs b/server/src/migration.rs index bc4ef90ed..ae053b9db 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -24,11 +24,9 @@ mod stream_metadata_migration; use std::{fs::OpenOptions, sync::Arc}; use crate::{ - catalog::{manifest::Manifest, partition_path}, option::Config, - stats::{FullStats, Stats}, storage::{ - object_storage::{manifest_path, parseable_json_path, stream_json_path}, + object_storage::{parseable_json_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, }, @@ -119,7 +117,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: match version { Some("v1") => { - let new_stream_metadata = stream_metadata_migration::v1_v3(stream_metadata); + let new_stream_metadata = stream_metadata_migration::v1_v4(stream_metadata); storage .put_object(&path, to_bytes(&new_stream_metadata)) .await?; @@ -128,11 +126,11 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); let schema = storage.get_object(&schema_path).await?; let schema = serde_json::from_slice(&schema).ok(); - let map = schema_migration::v1_v3(schema)?; + let map = schema_migration::v1_v4(schema)?; storage.put_object(&schema_path, to_bytes(&map)).await?; } Some("v2") => { - let new_stream_metadata = stream_metadata_migration::v2_v3(stream_metadata); + let new_stream_metadata = stream_metadata_migration::v2_v4(stream_metadata); storage .put_object(&path, to_bytes(&new_stream_metadata)) .await?; @@ -141,9 +139,15 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); let schema = storage.get_object(&schema_path).await?; let schema = serde_json::from_slice(&schema)?; - let map = schema_migration::v2_v3(schema)?; + let map = schema_migration::v2_v4(schema)?; storage.put_object(&schema_path, to_bytes(&map)).await?; } + Some("v3") => { + let new_stream_metadata = stream_metadata_migration::v3_v4(stream_metadata); + storage + .put_object(&path, to_bytes(&new_stream_metadata)) + .await?; + } _ => (), } @@ -214,17 +218,18 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> { let object_store = config.storage().get_object_store(); let old_meta_file_path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME); + // if this errors that means migrations is already done if let Err(err) = 
object_store.get_object(&old_meta_file_path).await { if matches!(err, ObjectStorageError::NoSuchKey(_)) { - log::info!("Migration already done"); - } else { - run_meta_file_migration(&object_store, old_meta_file_path).await?; - run_stream_files_migration(&object_store).await?; + return Ok(()); } + return Err(err.into()); } - run_stream_stats_migration(&object_store).await?; + run_meta_file_migration(&object_store, old_meta_file_path).await?; + run_stream_files_migration(&object_store).await?; + Ok(()) } @@ -297,54 +302,3 @@ async fn run_stream_files_migration( Ok(()) } - -async fn run_stream_stats_migration( - object_store: &Arc, -) -> anyhow::Result<()> { - let streams = object_store - .list_streams() - .await? - .into_iter() - .map(|stream| stream.name) - .collect_vec(); - for stream in streams { - let stream_metadata = object_store.get_object(&stream_json_path(&stream)).await?; - let mut stream_metadata: serde_json::Value = - serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); - let stats = &stream_metadata["stats"]; - if serde_json::from_value::(stats.clone()).is_err() { - let stats: Stats = serde_json::from_value(stats.clone()).unwrap(); - let full_stats = FullStats { - lifetime_stats: stats, - current_stats: stats, - deleted_stats: Stats::default(), - }; - let full_stats_bytes = to_bytes(&full_stats); - let full_stats_slice: &[u8] = &full_stats_bytes; - stream_metadata["stats"] = - serde_json::from_slice(full_stats_slice).expect("parseable config is valid json"); - object_store - .put_object(&stream_json_path(&stream), to_bytes(&stream_metadata)) - .await?; - } - let mut meta = object_store.get_object_store_format(&stream).await?; - let manifests = &mut meta.snapshot.manifest_list; - for manifest in manifests { - let path = partition_path( - &stream, - manifest.time_lower_bound, - manifest.time_upper_bound, - ); - let path = manifest_path(path.as_str()); - let manifest_bytes = object_store.get_object(&path).await?; - let mut manifest: serde_json::Value = - serde_json::from_slice(&manifest_bytes).expect("manifest.json is valid json"); - if serde_json::from_value::(manifest.clone()).is_err() { - manifest["ingestion_size"] = 0.into(); - } - object_store.put_object(&path, to_bytes(&manifest)).await?; - } - } - - Ok(()) -} diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs index 5abacfdf1..9cca5ac6e 100644 --- a/server/src/migration/schema_migration.rs +++ b/server/src/migration/schema_migration.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use arrow_schema::{DataType, Field, Schema}; use serde_json::Value; -pub(super) fn v1_v3(schema: Option) -> anyhow::Result { +pub(super) fn v1_v4(schema: Option) -> anyhow::Result { if let Some(schema) = schema { value_to_schema(schema) } else { @@ -30,7 +30,7 @@ pub(super) fn v1_v3(schema: Option) -> anyhow::Result { } } -pub(super) fn v2_v3(schemas: HashMap) -> anyhow::Result { +pub(super) fn v2_v4(schemas: HashMap) -> anyhow::Result { let mut derived_schemas = Vec::new(); for value in schemas.into_values() { diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs index dc0a12440..6fab31321 100644 --- a/server/src/migration/stream_metadata_migration.rs +++ b/server/src/migration/stream_metadata_migration.rs @@ -19,16 +19,81 @@ use serde_json::{json, Value}; -use crate::storage; +use crate::{catalog::snapshot::CURRENT_SNAPSHOT_VERSION, storage}; -pub fn v1_v3(mut stream_metadata: Value) -> Value { +pub fn v1_v4(mut 
stream_metadata: Value) -> Value { + let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); + let stats = stream_metadata_map.get("stats").unwrap().clone(); let default_stats = json!({ - "events": 0, - "ingestion": 0, - "storage": 0 + "lifetime_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "current_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "deleted_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + }, + "current_date_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + } }); + stream_metadata_map.insert("stats".to_owned(), default_stats); + stream_metadata_map.insert( + "version".to_owned(), + Value::String(storage::CURRENT_SCHEMA_VERSION.into()), + ); + stream_metadata_map.insert( + "objectstore-format".to_owned(), + Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), + ); + + let snapshot = stream_metadata_map.get("snapshot").unwrap().clone(); + let version = snapshot + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + if let Some("v1") = version { + let updated_snapshot = v1_v2_snapshot_migration(snapshot); + stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); + } + stream_metadata +} + +pub fn v2_v4(mut stream_metadata: Value) -> Value { let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); - stream_metadata_map.entry("stats").or_insert(default_stats); + let stats = stream_metadata_map.get("stats").unwrap().clone(); + let default_stats = json!({ + "lifetime_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "current_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "deleted_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + }, + "current_date_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + } + }); + stream_metadata_map.insert("stats".to_owned(), default_stats); stream_metadata_map.insert( "version".to_owned(), Value::String(storage::CURRENT_SCHEMA_VERSION.into()), @@ -37,17 +102,46 @@ pub fn v1_v3(mut stream_metadata: Value) -> Value { "objectstore-format".to_owned(), Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), ); + + let snapshot = stream_metadata_map.get("snapshot").unwrap().clone(); + let version = snapshot + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + if let Some("v1") = version { + let updated_snapshot = v1_v2_snapshot_migration(snapshot); + stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); + } stream_metadata } -pub fn v2_v3(mut stream_metadata: Value) -> Value { +pub fn v3_v4(mut stream_metadata: Value) -> Value { + let stream_metadata_map: &mut serde_json::Map = + stream_metadata.as_object_mut().unwrap(); + let stats = stream_metadata_map.get("stats").unwrap().clone(); let default_stats = json!({ - "events": 0, - "ingestion": 0, - "storage": 0 + "lifetime_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "current_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + 
}, + "deleted_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + }, + "current_date_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + } }); - let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); - stream_metadata_map.entry("stats").or_insert(default_stats); + stream_metadata_map.insert("stats".to_owned(), default_stats); stream_metadata_map.insert( "version".to_owned(), Value::String(storage::CURRENT_SCHEMA_VERSION.into()), @@ -56,5 +150,42 @@ pub fn v2_v3(mut stream_metadata: Value) -> Value { "objectstore-format".to_owned(), Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), ); + + let snapshot = stream_metadata_map.get("snapshot").unwrap().clone(); + let version = snapshot + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + if let Some("v1") = version { + let updated_snapshot = v1_v2_snapshot_migration(snapshot); + stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); + } + stream_metadata } + +fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { + let manifest_list = snapshot.get("manifest_list").unwrap(); + let mut new_manifest_list = Vec::new(); + for manifest in manifest_list.as_array().unwrap() { + let manifest_map = manifest.as_object().unwrap(); + let time_lower_bound = manifest_map.get("time_lower_bound").unwrap(); + let time_upper_bound = manifest_map.get("time_upper_bound").unwrap(); + let new_manifest = json!({ + "manifest_path": manifest_map.get("manifest_path").unwrap(), + "time_lower_bound": time_lower_bound, + "time_upper_bound": time_upper_bound, + "events_count": 0, + "ingestion_size": 0, + "storage_size": 0 + }); + new_manifest_list.push(new_manifest); + } + let snapshot_map: &mut serde_json::Map = snapshot.as_object_mut().unwrap(); + snapshot_map.insert( + "version".to_owned(), + Value::String(CURRENT_SNAPSHOT_VERSION.into()), + ); + snapshot_map.insert("manifest_list".to_owned(), Value::Array(new_manifest_list)); + snapshot +} diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index ef0eb69ba..dccb3aa84 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -863,16 +863,25 @@ mod tests { manifest_path: "1".to_string(), time_lower_bound: datetime_min(2023, 12, 15), time_upper_bound: datetime_max(2023, 12, 15), + events_ingested: 0, + ingestion_size: 0, + storage_size: 0, }, ManifestItem { manifest_path: "2".to_string(), time_lower_bound: datetime_min(2023, 12, 16), time_upper_bound: datetime_max(2023, 12, 16), + events_ingested: 0, + ingestion_size: 0, + storage_size: 0, }, ManifestItem { manifest_path: "3".to_string(), time_lower_bound: datetime_min(2023, 12, 17), time_upper_bound: datetime_max(2023, 12, 17), + events_ingested: 0, + ingestion_size: 0, + storage_size: 0, }, ] } diff --git a/server/src/stats.rs b/server/src/stats.rs index 906de9c6f..05fc91dfd 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -17,11 +17,10 @@ */ use crate::metrics::{ DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, - EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, LIFETIME_EVENTS_INGESTED, - LIFETIME_EVENTS_INGESTED_SIZE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, + EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, + LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, LIFETIME_EVENTS_STORAGE_SIZE, + STORAGE_SIZE, STORAGE_SIZE_TODAY, }; - -use crate::catalog::partition_path; use 
crate::storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}; use std::sync::Arc; @@ -38,6 +37,7 @@ pub struct FullStats { pub lifetime_stats: Stats, pub current_stats: Stats, pub deleted_stats: Stats, + pub current_date_stats: Stats, } pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option { @@ -47,49 +47,52 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option Option, ) -> Result<(), ObjectStorageError> { - let mut num_row: u64 = 0; - let mut storage_size: u64 = 0; + let mut num_row: i64 = 0; + let mut storage_size: i64 = 0; let mut ingestion_size: i64 = 0; let mut manifests = meta.snapshot.manifest_list; manifests.retain(|item| dates.iter().any(|date| item.manifest_path.contains(date))); - if !manifests.is_empty() { for manifest in manifests { - let path = partition_path( - stream_name, - manifest.time_lower_bound, - manifest.time_upper_bound, - ); - let Some(manifest) = storage.get_manifest(&path).await? else { - return Err(ObjectStorageError::UnhandledError( - "Manifest found in snapshot but not in object-storage" - .to_string() - .into(), - )); - }; - manifest.files.iter().for_each(|file| { - num_row += file.num_rows; - storage_size += file.file_size; - }); - ingestion_size += manifest.ingestion_size; + num_row += manifest.events_ingested as i64; + ingestion_size += manifest.ingestion_size as i64; + storage_size += manifest.storage_size as i64; } } EVENTS_DELETED .with_label_values(&[stream_name, "json"]) - .add(num_row as i64); + .add(num_row); EVENTS_DELETED_SIZE .with_label_values(&[stream_name, "json"]) .add(ingestion_size); DELETED_EVENTS_STORAGE_SIZE .with_label_values(&["data", stream_name, "parquet"]) - .add(storage_size as i64); + .add(storage_size); EVENTS_INGESTED .with_label_values(&[stream_name, "json"]) - .sub(num_row as i64); + .sub(num_row); EVENTS_INGESTED_SIZE .with_label_values(&[stream_name, "json"]) .sub(ingestion_size); - EVENTS_INGESTED_SIZE_TODAY - .with_label_values(&[stream_name, "json"]) - .sub(ingestion_size); STORAGE_SIZE .with_label_values(&["data", stream_name, "parquet"]) - .sub(storage_size as i64); + .sub(storage_size); let stats = get_current_stats(stream_name, "json"); if let Some(stats) = stats { if let Err(e) = storage.put_stats(stream_name, &stats).await { @@ -179,8 +169,11 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu let storage_size_labels = storage_size_labels(stream_name); EVENTS_INGESTED.remove_label_values(&event_labels)?; + EVENTS_INGESTED_TODAY.remove_label_values(&event_labels)?; EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; + EVENTS_INGESTED_SIZE_TODAY.remove_label_values(&event_labels)?; STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + STORAGE_SIZE_TODAY.remove_label_values(&storage_size_labels)?; EVENTS_DELETED.remove_label_values(&event_labels)?; EVENTS_DELETED_SIZE.remove_label_values(&event_labels)?; DELETED_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; diff --git a/server/src/storage.rs b/server/src/storage.rs index b04c84695..cf25ad826 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -66,8 +66,8 @@ const MAX_OBJECT_STORE_REQUESTS: usize = 1000; // const PERMISSIONS_READ_WRITE: &str = "readwrite"; const ACCESS_ALL: &str = "all"; -pub const CURRENT_OBJECT_STORE_VERSION: &str = "v3"; -pub const CURRENT_SCHEMA_VERSION: &str = "v3"; +pub const CURRENT_OBJECT_STORE_VERSION: &str = "v4"; +pub const CURRENT_SCHEMA_VERSION: &str = "v4"; #[derive(Debug, Clone, PartialEq, Eq, 
serde::Serialize, serde::Deserialize)] pub struct ObjectStoreFormat { diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 228181ea9..f5945b86d 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -26,7 +26,7 @@ use super::{ }; use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::metrics::LIFETIME_EVENTS_STORAGE_SIZE; +use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY}; use crate::option::Mode; use crate::{ alerts::Alerts, @@ -187,7 +187,6 @@ pub trait ObjectStorage: Sync + 'static { serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); stream_metadata["stats"] = stats; - self.put_object(&path, to_bytes(&stream_metadata)).await } @@ -437,7 +436,6 @@ pub trait ObjectStorage: Sync + 'static { } let streams = STREAM_INFO.list_streams(); - let mut stream_stats = HashMap::new(); let cache_manager = LocalCacheManager::global(); let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new(); @@ -469,15 +467,20 @@ pub trait ObjectStorage: Sync + 'static { commit_schema_to_storage(stream, schema).await?; } } - + let mut compressed_size: u64 = 0; let parquet_files = dir.parquet_files(); parquet_files.iter().for_each(|file| { - let compressed_size = file.metadata().map_or(0, |meta| meta.len()); - stream_stats - .entry(stream) - .and_modify(|size| *size += compressed_size) - .or_insert_with(|| compressed_size); + compressed_size += file.metadata().map_or(0, |meta| meta.len()); }); + STORAGE_SIZE + .with_label_values(&["data", stream, "parquet"]) + .add(compressed_size as i64); + STORAGE_SIZE_TODAY + .with_label_values(&["data", stream, "parquet"]) + .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream, "parquet"]) + .add(compressed_size as i64); for file in parquet_files { let filename = file @@ -504,6 +507,12 @@ pub trait ObjectStorage: Sync + 'static { let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); catalog::update_snapshot(store, stream, manifest).await?; + let stats = stats::get_current_stats(stream, "json"); + if let Some(stats) = stats { + if let Err(e) = self.put_stats(stream, &stats).await { + log::warn!("Error updating stats to objectstore due to error [{}]", e); + } + } if cache_enabled && cache_manager.is_some() { cache_updates .entry(stream) @@ -515,21 +524,6 @@ pub trait ObjectStorage: Sync + 'static { } } - for (stream, compressed_size) in stream_stats { - STORAGE_SIZE - .with_label_values(&["data", stream, "parquet"]) - .add(compressed_size as i64); - LIFETIME_EVENTS_STORAGE_SIZE - .with_label_values(&["data", stream, "parquet"]) - .add(compressed_size as i64); - let stats = stats::get_current_stats(stream, "json"); - if let Some(stats) = stats { - if let Err(e) = self.put_stats(stream, &stats).await { - log::warn!("Error updating stats to objectstore due to error [{}]", e); - } - } - } - if let Some(manager) = cache_manager { let cache_updates = cache_updates .into_iter() From f57a14285ad3a7fd3e69088a17a76f927b5debad Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 17 May 2024 17:37:46 +0530 Subject: [PATCH 4/9] deep source analysis fix --- server/src/handlers/http/cluster/mod.rs | 52 +++++++++---------- .../migration/stream_metadata_migration.rs | 6 +-- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index e669f0fd9..94516deb3 100644 --- 
a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -184,34 +184,30 @@ pub async fn fetch_stats_from_ingestors( .and_then(|meta| meta.get("version")) .and_then(|version| version.as_str()); let stats = stream_metadata.get("stats").unwrap(); - match version { - Some("v4") => { - let current_stats = stats.get("current_stats").unwrap().clone(); - let lifetime_stats = stats.get("lifetime_stats").unwrap().clone(); - let deleted_stats = stats.get("deleted_stats").unwrap().clone(); - - count += current_stats.get("events").unwrap().as_u64().unwrap(); - ingestion_size += current_stats.get("ingestion").unwrap().as_u64().unwrap(); - storage_size += current_stats.get("storage").unwrap().as_u64().unwrap(); - lifetime_count += lifetime_stats.get("events").unwrap().as_u64().unwrap(); - lifetime_ingestion_size += - lifetime_stats.get("ingestion").unwrap().as_u64().unwrap(); - lifetime_storage_size += lifetime_stats.get("storage").unwrap().as_u64().unwrap(); - deleted_count += deleted_stats.get("events").unwrap().as_u64().unwrap(); - deleted_ingestion_size += deleted_stats.get("ingestion").unwrap().as_u64().unwrap(); - deleted_storage_size += deleted_stats.get("storage").unwrap().as_u64().unwrap(); - } - _ => { - count += stats.get("events").unwrap().as_u64().unwrap(); - ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); - storage_size += stats.get("storage").unwrap().as_u64().unwrap(); - lifetime_count += stats.get("events").unwrap().as_u64().unwrap(); - lifetime_ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); - lifetime_storage_size += stats.get("storage").unwrap().as_u64().unwrap(); - deleted_count += 0; - deleted_ingestion_size += 0; - deleted_storage_size += 0; - } + if let Some("v4") = version { + let current_stats = stats.get("current_stats").unwrap().clone(); + let lifetime_stats = stats.get("lifetime_stats").unwrap().clone(); + let deleted_stats = stats.get("deleted_stats").unwrap().clone(); + + count += current_stats.get("events").unwrap().as_u64().unwrap(); + ingestion_size += current_stats.get("ingestion").unwrap().as_u64().unwrap(); + storage_size += current_stats.get("storage").unwrap().as_u64().unwrap(); + lifetime_count += lifetime_stats.get("events").unwrap().as_u64().unwrap(); + lifetime_ingestion_size += lifetime_stats.get("ingestion").unwrap().as_u64().unwrap(); + lifetime_storage_size += lifetime_stats.get("storage").unwrap().as_u64().unwrap(); + deleted_count += deleted_stats.get("events").unwrap().as_u64().unwrap(); + deleted_ingestion_size += deleted_stats.get("ingestion").unwrap().as_u64().unwrap(); + deleted_storage_size += deleted_stats.get("storage").unwrap().as_u64().unwrap(); + } else { + count += stats.get("events").unwrap().as_u64().unwrap(); + ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); + storage_size += stats.get("storage").unwrap().as_u64().unwrap(); + lifetime_count += stats.get("events").unwrap().as_u64().unwrap(); + lifetime_ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); + lifetime_storage_size += stats.get("storage").unwrap().as_u64().unwrap(); + deleted_count += 0; + deleted_ingestion_size += 0; + deleted_storage_size += 0; } } diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs index 6fab31321..631208a7e 100644 --- a/server/src/migration/stream_metadata_migration.rs +++ b/server/src/migration/stream_metadata_migration.rs @@ -61,7 +61,7 @@ pub fn v1_v4(mut stream_metadata: Value) -> 
Value { .as_object() .and_then(|meta| meta.get("version")) .and_then(|version| version.as_str()); - if let Some("v1") = version { + if matches!(version, Some("v1")) { let updated_snapshot = v1_v2_snapshot_migration(snapshot); stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); } @@ -108,7 +108,7 @@ pub fn v2_v4(mut stream_metadata: Value) -> Value { .as_object() .and_then(|meta| meta.get("version")) .and_then(|version| version.as_str()); - if let Some("v1") = version { + if matches!(version, Some("v1")) { let updated_snapshot = v1_v2_snapshot_migration(snapshot); stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); } @@ -156,7 +156,7 @@ pub fn v3_v4(mut stream_metadata: Value) -> Value { .as_object() .and_then(|meta| meta.get("version")) .and_then(|version| version.as_str()); - if let Some("v1") = version { + if matches!(version, Some("v1")) { let updated_snapshot = v1_v2_snapshot_migration(snapshot); stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); } From 18df8d2ac1a2fa759ffcbd9ebbcfd808009c9254 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 17 May 2024 17:41:00 +0530 Subject: [PATCH 5/9] deep source fix --- server/src/handlers/http/cluster/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 94516deb3..505d555b8 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -184,7 +184,7 @@ pub async fn fetch_stats_from_ingestors( .and_then(|meta| meta.get("version")) .and_then(|version| version.as_str()); let stats = stream_metadata.get("stats").unwrap(); - if let Some("v4") = version { + if matches!(version, Some("v4")) { let current_stats = stats.get("current_stats").unwrap().clone(); let lifetime_stats = stats.get("lifetime_stats").unwrap().clone(); let deleted_stats = stats.get("deleted_stats").unwrap().clone(); From 46b84ae2cf2b60739c8c32a5aa73acba0460b013 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 17 May 2024 19:02:57 +0530 Subject: [PATCH 6/9] corrected events_ingested field for snapshot --- server/src/migration/stream_metadata_migration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs index 631208a7e..68ed7cba3 100644 --- a/server/src/migration/stream_metadata_migration.rs +++ b/server/src/migration/stream_metadata_migration.rs @@ -175,7 +175,7 @@ fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { "manifest_path": manifest_map.get("manifest_path").unwrap(), "time_lower_bound": time_lower_bound, "time_upper_bound": time_upper_bound, - "events_count": 0, + "events_ingested": 0, "ingestion_size": 0, "storage_size": 0 }); From 2c96a5435901d33534664988462e2c73c58c5ba1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 17 May 2024 20:32:46 +0530 Subject: [PATCH 7/9] stream.json migration from v1/v2 to v4 --- .../migration/stream_metadata_migration.rs | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs index 68ed7cba3..24da19bf0 100644 --- a/server/src/migration/stream_metadata_migration.rs +++ b/server/src/migration/stream_metadata_migration.rs @@ -55,16 +55,13 @@ pub fn v1_v4(mut stream_metadata: Value) -> Value { "objectstore-format".to_owned(), 
Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), ); - - let snapshot = stream_metadata_map.get("snapshot").unwrap().clone(); - let version = snapshot - .as_object() - .and_then(|meta| meta.get("version")) - .and_then(|version| version.as_str()); - if matches!(version, Some("v1")) { - let updated_snapshot = v1_v2_snapshot_migration(snapshot); - stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); - } + stream_metadata_map.insert( + "snapshot".to_owned(), + json!({ + "version": CURRENT_SNAPSHOT_VERSION, + "manifest_list": [] + }), + ); stream_metadata } @@ -103,15 +100,13 @@ pub fn v2_v4(mut stream_metadata: Value) -> Value { Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), ); - let snapshot = stream_metadata_map.get("snapshot").unwrap().clone(); - let version = snapshot - .as_object() - .and_then(|meta| meta.get("version")) - .and_then(|version| version.as_str()); - if matches!(version, Some("v1")) { - let updated_snapshot = v1_v2_snapshot_migration(snapshot); - stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); - } + stream_metadata_map.insert( + "snapshot".to_owned(), + json!({ + "version": CURRENT_SNAPSHOT_VERSION, + "manifest_list": [] + }), + ); stream_metadata } From aae37ee33ce69ba29c32714abd947dad726fa539 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 19 May 2024 10:51:21 +0530 Subject: [PATCH 8/9] schema migration from v2 to latest --- server/src/migration.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/migration.rs b/server/src/migration.rs index ae053b9db..dae6f0801 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -147,6 +147,12 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: storage .put_object(&path, to_bytes(&new_stream_metadata)) .await?; + let schema_path = + RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); + let schema = storage.get_object(&schema_path).await?; + let schema = serde_json::from_slice(&schema)?; + let map = schema_migration::v2_v4(schema)?; + storage.put_object(&schema_path, to_bytes(&map)).await?; } _ => (), } From ff80712290da535d7334d47ca5d4a3f007a32bc5 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 20 May 2024 18:51:27 +0530 Subject: [PATCH 9/9] no need of schema migration hence removing the code --- server/src/migration.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/migration.rs b/server/src/migration.rs index dae6f0801..ae053b9db 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -147,12 +147,6 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: storage .put_object(&path, to_bytes(&new_stream_metadata)) .await?; - let schema_path = - RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); - let schema = storage.get_object(&schema_path).await?; - let schema = serde_json::from_slice(&schema)?; - let map = schema_migration::v2_v4(schema)?; - storage.put_object(&schema_path, to_bytes(&map)).await?; } _ => (), }
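
For reference, the v4 "stats" object that the v1_v4/v2_v4/v3_v4 migrations write into stream.json has the shape sketched below. This is a minimal illustration only (the helper name and parameters are hypothetical, assuming serde_json as used elsewhere in the patch); lifetime_stats and current_stats are seeded from the old flat counters, while deleted_stats and current_date_stats start at zero and are maintained at runtime by the new EVENTS_DELETED* and *_TODAY metrics.

    // Sketch of the migrated v4 stats layout, assuming serde_json.
    // Field names mirror the json! blocks in stream_metadata_migration.rs.
    use serde_json::{json, Value};

    fn v4_stats_shape(old_events: u64, old_ingestion: u64, old_storage: u64) -> Value {
        json!({
            // carried over from the pre-v4 flat stats
            "lifetime_stats":     { "events": old_events, "ingestion": old_ingestion, "storage": old_storage },
            "current_stats":      { "events": old_events, "ingestion": old_ingestion, "storage": old_storage },
            // start empty; updated by retention/deletion and the daily reset scheduler
            "deleted_stats":      { "events": 0, "ingestion": 0, "storage": 0 },
            "current_date_stats": { "events": 0, "ingestion": 0, "storage": 0 }
        })
    }

Note that for v1/v2 streams the later patches in this series also replace the snapshot with an empty one ({ "version": CURRENT_SNAPSHOT_VERSION, "manifest_list": [] }), so the per-manifest events_ingested/ingestion_size/storage_size fields are only populated for manifests written after the migration.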