From a99e1c01a5db9aaac0a0b1513a9c4c6f442ca903 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 22 Mar 2025 06:34:43 -0400 Subject: [PATCH] restrict otel ingestion log ingestion is not allowed if stream is already associated with otel metrics or traces metrics ingestion is not allowed if stream is already associated with otel traces or any log formats similarly, traces ingestion is not allowed if stream is already associated with otel metrics or any log formats otel logs can be ingested with other log formats --- src/handlers/http/ingest.rs | 95 ++++++++++++++++++++++++++++++++++--- src/parseable/mod.rs | 7 +-- 2 files changed, 90 insertions(+), 12 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index fa699d7af..485b071a9 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -89,16 +89,32 @@ pub async fn ingest( }; let log_source_entry = LogSourceEntry::new(log_source.clone(), fields); - let p_custom_fields = get_custom_fields_from_header(req); - + PARSEABLE .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + //if stream exists, fetch the stream log source + //return error if the stream log source is otel traces or otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) @@ -159,9 +175,27 @@ pub async fn handle_otel_logs_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + + //if stream exists, fetch the stream log source + //return error if the stream log source is otel traces or otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -188,6 +222,7 @@ pub async fn handle_otel_metrics_ingestion( } let stream_name = stream_name.to_str().unwrap().to_owned(); + let log_source_entry = LogSourceEntry::new( log_source.clone(), OTEL_METRICS_KNOWN_FIELD_LIST @@ -199,10 +234,26 @@ pub async fn handle_otel_metrics_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + //if stream exists, fetch the stream log source + //return error if the stream log source is not otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format == log_source.clone() + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -229,6 +280,7 @@ pub async fn handle_otel_traces_ingestion( return Err(PostError::IncorrectLogSource(LogSource::OtelTraces)); } let stream_name = stream_name.to_str().unwrap().to_owned(); + let log_source_entry = LogSourceEntry::new( log_source.clone(), OTEL_TRACES_KNOWN_FIELD_LIST @@ -241,10 +293,26 @@ pub async fn handle_otel_traces_ingestion( .create_stream_if_not_exists( &stream_name, StreamType::UserDefined, - vec![log_source_entry], + vec![log_source_entry.clone()], ) .await?; + //if stream exists, fetch the stream log source + //return error if the stream log source is not otel traces + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format == log_source.clone() + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } + + PARSEABLE + .add_update_log_source(&stream_name, log_source_entry) + .await?; + let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -304,6 +372,18 @@ pub async fn post_event( _ => {} } + //if stream exists, fetch the stream log source + //return error if the stream log source is otel traces or otel metrics + if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; + } let p_custom_fields = get_custom_fields_from_header(req); flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; @@ -373,6 +453,8 @@ pub enum PostError { MissingTimePartition(String), #[error("{0}")] KnownFormat(#[from] known_schema::Error), + #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")] + IncorrectLogFormat(String), } impl actix_web::ResponseError for PostError { @@ -400,6 +482,7 @@ impl actix_web::ResponseError for PostError { PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST, PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, + PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index b10a34bdd..ed6354eb8 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -427,11 +427,6 @@ impl Parseable { log_source: Vec, ) -> Result { if self.streams.contains(stream_name) { - for stream_log_source in log_source { - self.add_update_log_source(stream_name, stream_log_source) - .await?; - } - return Ok(true); } @@ -443,7 +438,7 @@ impl Parseable { .create_stream_and_schema_from_storage(stream_name) .await? { - return Ok(false); + return Ok(true); } self.create_stream(