Skip to content
113 changes: 101 additions & 12 deletions server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::option::{Mode, CONFIG};
use crate::storage;
use crate::{metadata, stats};

use crate::stats::Stats;
use actix_web::{web, HttpRequest, Responder};
use chrono::{DateTime, Utc};
use clokwerk::{AsyncScheduler, Interval};
Expand Down Expand Up @@ -70,6 +71,12 @@ pub struct Report {
total_events_count: u64,
total_json_bytes: u64,
total_parquet_bytes: u64,
current_events_count: u64,
current_json_bytes: u64,
current_parquet_bytes: u64,
deleted_events_count: u64,
deleted_json_bytes: u64,
deleted_parquet_bytes: u64,
metrics: HashMap<String, Value>,
}

Expand Down Expand Up @@ -112,6 +119,12 @@ impl Report {
total_events_count: ingestor_metrics.3,
total_json_bytes: ingestor_metrics.4,
total_parquet_bytes: ingestor_metrics.5,
current_events_count: ingestor_metrics.6,
current_json_bytes: ingestor_metrics.7,
current_parquet_bytes: ingestor_metrics.8,
deleted_events_count: ingestor_metrics.9,
deleted_json_bytes: ingestor_metrics.10,
deleted_parquet_bytes: ingestor_metrics.11,
metrics: build_metrics().await,
})
}
Expand All @@ -132,26 +145,70 @@ fn total_streams() -> usize {
metadata::STREAM_INFO.list_streams().len()
}

fn total_event_stats() -> (u64, u64, u64) {
fn total_event_stats() -> (Stats, Stats, Stats) {
let mut total_events: u64 = 0;
let mut total_parquet_bytes: u64 = 0;
let mut total_json_bytes: u64 = 0;

let mut current_events: u64 = 0;
let mut current_parquet_bytes: u64 = 0;
let mut current_json_bytes: u64 = 0;

let mut deleted_events: u64 = 0;
let mut deleted_parquet_bytes: u64 = 0;
let mut deleted_json_bytes: u64 = 0;

for stream in metadata::STREAM_INFO.list_streams() {
let Some(stats) = stats::get_current_stats(&stream, "json") else {
continue;
};
total_events += stats.events;
total_parquet_bytes += stats.storage;
total_json_bytes += stats.ingestion;
total_events += stats.lifetime_stats.events;
total_parquet_bytes += stats.lifetime_stats.storage;
total_json_bytes += stats.lifetime_stats.ingestion;

current_events += stats.current_stats.events;
current_parquet_bytes += stats.current_stats.storage;
current_json_bytes += stats.current_stats.ingestion;

deleted_events += stats.deleted_stats.events;
deleted_parquet_bytes += stats.deleted_stats.storage;
deleted_json_bytes += stats.deleted_stats.ingestion;
}
(total_events, total_json_bytes, total_parquet_bytes)

(
Stats {
events: total_events,
ingestion: total_json_bytes,
storage: total_parquet_bytes,
},
Stats {
events: current_events,
ingestion: current_json_bytes,
storage: current_parquet_bytes,
},
Stats {
events: deleted_events,
ingestion: deleted_json_bytes,
storage: deleted_parquet_bytes,
},
)
}

// NOTE(review): this span is a pasted diff fragment, not compilable source.
// Both the pre-change signature (6-tuple) and the post-change signature
// (12-tuple) appear below, the pre- and post-change `node_metrics`
// initializations are interleaved, and part of the function body is elided
// behind the "Expand Down Expand Up" hunk marker. Reconstruct from the
// actual file before editing; only the post-change lines should survive.
async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> {
async fn fetch_ingestors_metrics(
) -> anyhow::Result<(u64, u64, usize, u64, u64, u64, u64, u64, u64, u64, u64, u64)> {
// Aggregate this node's own stream stats first; ingestor metrics are
// presumably merged into `node_metrics` in the elided section below —
// TODO confirm against the full file.
let event_stats = total_event_stats();
let mut node_metrics =
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
let mut node_metrics = NodeMetrics::new(
total_streams(),
// lifetime totals (events / JSON bytes / parquet bytes)
event_stats.0.events,
event_stats.0.ingestion,
event_stats.0.storage,
// currently live data
event_stats.1.events,
event_stats.1.ingestion,
event_stats.1.storage,
// deleted data
event_stats.2.events,
event_stats.2.ingestion,
event_stats.2.storage,
);

let mut vec = vec![];
let mut active_ingestors = 0u64;
Expand Down Expand Up @@ -198,6 +255,12 @@ async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64,
// Tail of the returned 12-tuple: the aggregated counters, in the same
// lifetime / current / deleted grouping as the signature.
node_metrics.total_events_count,
node_metrics.total_json_bytes,
node_metrics.total_parquet_bytes,
node_metrics.current_events_count,
node_metrics.current_json_bytes,
node_metrics.current_parquet_bytes,
node_metrics.deleted_events_count,
node_metrics.deleted_json_bytes,
node_metrics.deleted_parquet_bytes,
))
}

Expand Down Expand Up @@ -255,30 +318,56 @@ struct NodeMetrics {
total_events_count: u64,
total_json_bytes: u64,
total_parquet_bytes: u64,
current_events_count: u64,
current_json_bytes: u64,
current_parquet_bytes: u64,
deleted_events_count: u64,
deleted_json_bytes: u64,
deleted_parquet_bytes: u64,
}

impl NodeMetrics {
/// Build this node's own metrics from locally known streams and their
/// aggregated stats.
///
/// Counters are grouped three ways, mirroring `total_event_stats()`:
/// lifetime totals, currently live data, and deleted data.
fn build() -> Self {
    let event_stats = total_event_stats();
    Self {
        stream_count: total_streams(),
        // lifetime totals
        total_events_count: event_stats.0.events,
        total_json_bytes: event_stats.0.ingestion,
        total_parquet_bytes: event_stats.0.storage,
        // currently live data
        current_events_count: event_stats.1.events,
        current_json_bytes: event_stats.1.ingestion,
        current_parquet_bytes: event_stats.1.storage,
        // deleted data
        deleted_events_count: event_stats.2.events,
        deleted_json_bytes: event_stats.2.ingestion,
        deleted_parquet_bytes: event_stats.2.storage,
    }
}

#[allow(clippy::too_many_arguments)]
fn new(
stream_count: usize,
total_events_count: u64,
total_json_bytes: u64,
total_parquet_bytes: u64,
current_events_count: u64,
current_json_bytes: u64,
current_parquet_bytes: u64,
deleted_events_count: u64,
deleted_json_bytes: u64,
deleted_parquet_bytes: u64,
) -> Self {
Self {
stream_count,
total_events_count,
total_json_bytes,
total_parquet_bytes,
current_events_count,
current_json_bytes,
current_parquet_bytes,
deleted_events_count,
deleted_json_bytes,
deleted_parquet_bytes,
}
}

Expand Down
57 changes: 48 additions & 9 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use std::{io::ErrorKind, sync::Arc};

use self::{column::Column, snapshot::ManifestItem};
use crate::handlers::http::base_path_without_preceding_slash;
use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY};
use crate::option::CONFIG;
use crate::stats::{event_labels, storage_size_labels, update_deleted_stats};
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
Expand Down Expand Up @@ -101,13 +103,28 @@ pub async fn update_snapshot(
change: manifest::File,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let event_labels = event_labels(stream_name, "json");
let storage_size_labels = storage_size_labels(stream_name);
let events_ingested = EVENTS_INGESTED_TODAY
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let ingestion_size = EVENTS_INGESTED_SIZE_TODAY
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let storage_size = STORAGE_SIZE_TODAY
.get_metric_with_label_values(&storage_size_labels)
.unwrap()
.get() as u64;

let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = &mut meta.snapshot.manifest_list;
let time_partition: Option<String> = meta_clone.time_partition;
let time_partition = &meta_clone.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition);
let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string());
lower_bound
}
None => {
Expand All @@ -129,12 +146,18 @@ pub async fn update_snapshot(
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);

let mut ch = false;
for m in manifests.iter() {
for m in manifests.iter_mut() {
let p = manifest_path("").to_string();
if m.manifest_path.contains(&p) {
ch = true;
m.events_ingested = events_ingested;
m.ingestion_size = ingestion_size;
m.storage_size = storage_size;
}
}

meta.snapshot.manifest_list = manifests.to_vec();
storage.put_snapshot(stream_name, meta.snapshot).await?;
if ch {
if let Some(mut manifest) = storage.get_manifest(&path).await? {
manifest.apply_change(change);
Expand All @@ -148,7 +171,10 @@ pub async fn update_snapshot(
storage.clone(),
stream_name,
false,
meta,
meta_clone,
events_ingested,
ingestion_size,
storage_size,
)
.await?;
}
Expand All @@ -159,7 +185,10 @@ pub async fn update_snapshot(
storage.clone(),
stream_name,
true,
meta,
meta_clone,
events_ingested,
ingestion_size,
storage_size,
)
.await?;
}
Expand All @@ -170,21 +199,28 @@ pub async fn update_snapshot(
storage.clone(),
stream_name,
true,
meta,
meta_clone,
events_ingested,
ingestion_size,
storage_size,
)
.await?;
}

Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn create_manifest(
lower_bound: DateTime<Utc>,
change: manifest::File,
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
update_snapshot: bool,
mut meta: ObjectStoreFormat,
events_ingested: u64,
ingestion_size: u64,
storage_size: u64,
) -> Result<(), ObjectStorageError> {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
Expand Down Expand Up @@ -216,6 +252,9 @@ async fn create_manifest(
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
events_ingested,
ingestion_size,
storage_size,
};
manifests.push(new_snapshot_entry);
meta.snapshot.manifest_list = manifests;
Expand All @@ -233,6 +272,8 @@ pub async fn remove_manifest_from_snapshot(
if !dates.is_empty() {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_for_stats = meta.clone();
update_deleted_stats(storage.clone(), stream_name, meta_for_stats, dates.clone()).await?;
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
Expand Down Expand Up @@ -308,8 +349,6 @@ pub async fn get_first_event(
);
// Convert dates vector to Bytes object
let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
// delete the stream

let ingestor_first_event_at =
handlers::http::cluster::send_retention_cleanup_request(
&url,
Expand All @@ -333,7 +372,7 @@ pub async fn get_first_event(

/// Partition the path to which this manifest belongs.
/// Useful when uploading the manifest file.
fn partition_path(
pub fn partition_path(
stream: &str,
lower_bound: DateTime<Utc>,
upper_bound: DateTime<Utc>,
Expand Down
5 changes: 4 additions & 1 deletion server/src/catalog/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use chrono::{DateTime, Utc};

use crate::query::PartialTimeFilter;

/// Snapshot format version. Bumped from "v1" to "v2" in the same change
/// that added per-manifest stats fields (`events_ingested`,
/// `ingestion_size`, `storage_size`) to `ManifestItem`.
pub const CURRENT_SNAPSHOT_VERSION: &str = "v2";
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
pub version: String,
Expand Down Expand Up @@ -76,4 +76,7 @@ pub struct ManifestItem {
pub manifest_path: String,
pub time_lower_bound: DateTime<Utc>,
pub time_upper_bound: DateTime<Utc>,
pub events_ingested: u64,
pub ingestion_size: u64,
pub storage_size: u64,
}
Loading