diff --git a/server/src/alerts.rs b/server/src/alerts.rs
index 13ae789a9..cea6bd6cf 100644
--- a/server/src/alerts.rs
+++ b/server/src/alerts.rs
@@ -19,8 +19,6 @@
 use log::{error, info};
 use serde::{Deserialize, Serialize};
 
-use crate::error::Error;
-
 #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Alerts {
@@ -39,7 +37,7 @@ pub struct Alert {
 impl Alert {
     // TODO: spawn async tasks to call webhooks if alert rules are met
     // This is done to ensure that threads aren't blocked by calls to the webhook
-    pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), Error> {
+    pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), ()> {
         if self.rule.resolves(event).await {
             info!("Alert triggered; name: {}", self.name);
             for target in self.targets.clone() {
diff --git a/server/src/error.rs b/server/src/error.rs
deleted file mode 100644
index 4a0e9febb..000000000
--- a/server/src/error.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Parseable Server (C) 2022 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-use crate::{response::EventError, storage::ObjectStorageError};
-use datafusion::{arrow::error::ArrowError, error::DataFusionError, parquet::errors::ParquetError};
-
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
-    #[error("io error: {0}")]
-    Io(#[from] std::io::Error),
-    #[error("serde_json error: {0}")]
-    Serde(#[from] serde_json::Error),
-    #[error("error parsing time: {0}")]
-    TimeParse(#[from] chrono::ParseError),
-    #[error("JSON provided to query api doesn't contain {0}")]
-    JsonQuery(&'static str),
-    #[error("Storage error: {0}")]
-    Storage(ObjectStorageError),
-    #[error("Event error: {0}")]
-    Event(#[from] EventError),
-    #[error("Parquet error: {0}")]
-    Parquet(#[from] ParquetError),
-    #[error("Arrow error: {0}")]
-    Arrow(#[from] ArrowError),
-    #[error("Data Fusion error: {0}")]
-    DataFusion(#[from] DataFusionError),
-    #[error("UTF8 parsing error: {0}")]
-    Utf8(#[from] std::string::FromUtf8Error),
-    #[error("log stream name cannot be empty")]
-    EmptyName,
-    #[error("log stream name cannot contain spaces: {0}")]
-    NameWhiteSpace(String),
-    #[error("log stream name cannot contain special characters: {0}")]
-    NameSpecialChar(String),
-    #[error("log stream name cannot contain uppercase characters: {0}")]
-    NameUpperCase(String),
-    #[error("log stream name cannot be numeric only: {0}")]
-    NameNumericOnly(String),
-    #[error("log stream name cannot start with a number: {0}")]
-    NameCantStartWithNumber(String),
-    #[error("log stream name cannot be a sql keyword: {0}")]
-    SQLKeyword(String),
-    #[error("queries across multiple streams are not supported currently: {0}")]
-    MultipleStreams(String),
-    #[error("start time can not be later than end time")]
-    StartTimeAfterEndTime(),
-    #[error("query '{0}' is incomplete")]
-    IncompleteQuery(String),
-    #[error("query cannot be empty")]
empty")] - EmptyQuery, - #[error("start time cannot be empty in query")] - EmptyStartTime, - #[error("end time cannot be empty in query")] - EmptyEndTime, - #[error("joins are not supported currently: {0}")] - Join(String), - #[error("missing record batch")] - MissingRecord, - #[error("metadata not found for log stream: {0}")] - StreamMetaNotFound(String), - #[error("invalid alert config: {0}")] - InvalidAlert(String), - #[error("this event schema doesn't match with stream schema. please ensure event data is in same format as previous events sent to the stream: {0}")] - SchemaMismatch(String), - #[error("alert for stream not found in storage: {0}")] - AlertNotInStore(String), - #[error("schema for stream not found in storage: {0}")] - SchemaNotInStore(String), -} diff --git a/server/src/event.rs b/server/src/event.rs index 23e13fbdf..0b27c5a89 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -18,6 +18,7 @@ */ use datafusion::arrow; use datafusion::arrow::datatypes::Schema; +use datafusion::arrow::error::ArrowError; use datafusion::arrow::ipc::writer::StreamWriter; use datafusion::arrow::json; use datafusion::arrow::json::reader::infer_json_schema; @@ -34,9 +35,9 @@ use std::sync::RwLock; use crate::metadata; use crate::option::CONFIG; -use crate::response; use crate::storage::ObjectStorage; -use crate::Error; + +use self::error::EventError; type LocalWriter = Mutex>>; type LocalWriterGuard<'a> = MutexGuard<'a, Option>>; @@ -51,13 +52,13 @@ impl STREAM_WRITERS { fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> { let hashmap_guard = STREAM_WRITERS .read() - .map_err(|_| StreamWriterError::RwPoisioned)?; + .map_err(|_| StreamWriterError::RwPoisoned)?; match hashmap_guard.get(stream) { Some(localwriter) => { let mut writer_guard = localwriter .lock() - .map_err(|_| StreamWriterError::MutexPoisioned)?; + .map_err(|_| StreamWriterError::MutexPoisoned)?; // if it's some writer then we write without dropping any lock // hashmap cannot be brought mutably at any point until this finishes @@ -85,7 +86,7 @@ impl STREAM_WRITERS { fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> { let mut hashmap_guard = STREAM_WRITERS .write() - .map_err(|_| StreamWriterError::RwPoisioned)?; + .map_err(|_| StreamWriterError::RwPoisoned)?; let file = OpenOptions::new() .append(true) @@ -109,7 +110,7 @@ impl STREAM_WRITERS { pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> { let mut hashmap_guard = STREAM_WRITERS .write() - .map_err(|_| StreamWriterError::RwPoisioned)?; + .map_err(|_| StreamWriterError::RwPoisoned)?; hashmap_guard.remove(stream); @@ -143,14 +144,14 @@ impl STREAM_WRITERS { pub fn unset_entry(stream: &str) -> Result<(), StreamWriterError> { let guard = STREAM_WRITERS .read() - .map_err(|_| StreamWriterError::RwPoisioned)?; + .map_err(|_| StreamWriterError::RwPoisoned)?; let stream_writer = match guard.get(stream) { Some(writer) => writer, None => return Ok(()), }; stream_writer .lock() - .map_err(|_| StreamWriterError::MutexPoisioned)? + .map_err(|_| StreamWriterError::MutexPoisoned)? 
             .take();
 
         Ok(())
@@ -163,10 +164,10 @@ pub enum StreamWriterError {
     Writer(arrow::error::ArrowError),
     #[error("Io Error when creating new file: {0}")]
     Io(std::io::Error),
-    #[error("RwLock was poisioned")]
-    RwPoisioned,
-    #[error("Mutex was poisioned")]
-    MutexPoisioned,
+    #[error("RwLock was poisoned")]
+    RwPoisoned,
+    #[error("Mutex was poisoned")]
+    MutexPoisoned,
 }
 
 fn data_file_path(stream_name: &str) -> String {
@@ -189,24 +190,17 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub async fn process(
-        &self,
-        storage: &impl ObjectStorage,
-    ) -> Result<response::EventResponse, Error> {
-        let inferred_schema = self.infer_schema().map_err(|e| {
-            error!("Failed to infer schema for event. {:?}", e);
-            e
-        })?;
+    pub async fn process(&self, storage: &impl ObjectStorage) -> Result<(), EventError> {
+        let inferred_schema = self.infer_schema()?;
 
         let event = self.get_reader(inferred_schema.clone());
 
         let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-        let is_first_event = stream_schema.is_none();
 
         if let Some(existing_schema) = stream_schema {
             // validate schema before processing the event
             if existing_schema != inferred_schema {
-                return Err(Error::SchemaMismatch(self.stream_name.clone()));
+                return Err(EventError::SchemaMismatch(self.stream_name.clone()));
             } else {
                 self.process_event(event)?
             }
@@ -221,16 +215,7 @@ impl Event {
             error!("Error checking for alerts. {:?}", e);
         }
 
-        let msg = if is_first_event {
-            format!(
-                "Intial Event recieved for log stream {}, schema uploaded successfully",
-                &self.stream_name,
-            )
-        } else {
-            format!("Event recieved for log stream {}", &self.stream_name)
-        };
-
-        Ok(response::EventResponse { msg })
+        Ok(())
     }
 
     // This is called when the first event of a log stream is received. The first event is
@@ -241,56 +226,42 @@ impl Event {
         mut event: json::Reader<R>,
         schema: Schema,
         storage: &impl ObjectStorage,
-    ) -> Result<u64, Error> {
-        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+    ) -> Result<u64, EventError> {
+        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
         let stream_name = &self.stream_name;
 
         // Store record batch on local cache
         STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();
 
         // Put the inferred schema to object store
-        storage
-            .put_schema(stream_name.clone(), &schema)
-            .await
-            .map_err(|e| response::EventError {
-                msg: format!(
-                    "Failed to upload schema for log stream {} due to err: {}",
-                    stream_name, e
-                ),
-            })?;
+        storage.put_schema(stream_name.clone(), &schema).await?;
 
         // set the schema in memory for this stream
-        metadata::STREAM_INFO
-            .set_schema(stream_name, schema)
-            .map_err(|e| response::EventError {
-                msg: format!(
-                    "Failed to set schema for log stream {} due to err: {}",
-                    stream_name, e
-                ),
-            })?;
+        metadata::STREAM_INFO.set_schema(stream_name, schema)?;
 
         Ok(0)
     }
 
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
-    fn process_event<R: std::io::Read>(&self, mut event: json::Reader<R>) -> Result<u64, Error> {
-        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+    fn process_event<R: std::io::Read>(
+        &self,
+        mut event: json::Reader<R>,
+    ) -> Result<u64, EventError> {
+        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
         let stream_name = &self.stream_name;
 
-        STREAM_WRITERS::append_to_local(stream_name, &rb).unwrap();
+        STREAM_WRITERS::append_to_local(stream_name, &rb)?;
 
         Ok(0)
     }
 
     // inferSchema is a constructor to Schema
     // returns raw arrow schema type and arrow schema to string type.
-    fn infer_schema(&self) -> Result<Schema, Error> {
+    fn infer_schema(&self) -> Result<Schema, ArrowError> {
         let reader = self.body.as_bytes();
         let mut buf_reader = BufReader::new(reader);
-        let inferred_schema = infer_json_schema(&mut buf_reader, None)?;
-
-        Ok(inferred_schema)
+        infer_json_schema(&mut buf_reader, None)
     }
 
     fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> {
@@ -301,3 +272,27 @@ impl Event {
         )
     }
 }
+
+pub mod error {
+    use crate::metadata::error::stream_info::MetadataError;
+    use crate::storage::ObjectStorageError;
+    use datafusion::arrow::error::ArrowError;
+
+    use super::StreamWriterError;
+
+    #[derive(Debug, thiserror::Error)]
+    pub enum EventError {
+        #[error("Missing Record from event body")]
+        MissingRecord,
+        #[error("Stream Writer Failed: {0}")]
+        StreamWriter(#[from] StreamWriterError),
+        #[error("Metadata Error: {0}")]
+        Metadata(#[from] MetadataError),
+        #[error("Arrow Error: {0}")]
+        Arrow(#[from] ArrowError),
+        #[error("Schema Mismatch: {0}")]
+        SchemaMismatch(String),
+        #[error("ObjectStorage Error: {0}")]
+        ObjectStorage(#[from] ObjectStorageError),
+    }
+}
diff --git a/server/src/handlers/event.rs b/server/src/handlers/event.rs
index 0b04e931f..5a3115655 100644
--- a/server/src/handlers/event.rs
+++ b/server/src/handlers/event.rs
@@ -18,143 +18,135 @@
 
 use std::collections::HashMap;
 
-use actix_web::http::StatusCode;
-use actix_web::{web, HttpRequest, HttpResponse, ResponseError};
+use actix_web::{web, HttpRequest, HttpResponse};
 use serde_json::Value;
 
 use crate::event;
-use crate::metadata;
 use crate::query::Query;
-use crate::response::{self, EventResponse};
+use crate::response::QueryResponse;
 use crate::s3::S3;
-use crate::storage::ObjectStorage;
 use crate::utils::header_parsing::collect_labelled_headers;
-use crate::utils::{self, merge};
+use crate::utils::{self, flatten_json_body, merge};
+
+use self::error::{PostError, QueryError};
 
 const PREFIX_TAGS: &str = "x-p-tag-";
 const PREFIX_META: &str = "x-p-meta-";
 const SEPARATOR: char = '^';
 
-pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> HttpResponse {
+pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResponse, QueryError> {
     let json = json.into_inner();
-    let query = match Query::parse(json) {
-        Ok(s) => s,
-        Err(crate::Error::JsonQuery(e)) => {
-            return response::ServerResponse {
-                msg: format!("Bad Request: missing \"{}\" field in query payload", e),
-                code: StatusCode::BAD_REQUEST,
-            }
-            .to_http()
-        }
-        Err(e) => {
-            return response::ServerResponse {
-                msg: format!("Failed to execute query due to err: {}", e),
-                code: StatusCode::BAD_REQUEST,
-            }
-            .to_http()
-        }
-    };
+    let query = Query::parse(json)?;
 
     let storage = S3::new();
 
-    if storage.get_schema(&query.stream_name).await.is_err() {
-        return response::ServerResponse {
-            msg: format!("log stream {} does not exist", query.stream_name),
-            code: StatusCode::BAD_REQUEST,
-        }
-        .to_http();
-    }
+    let query_result = query.execute(&storage).await;
 
-    match query.execute(&storage).await {
-        Ok(results) => response::QueryResponse {
-            body: results,
-            code: StatusCode::OK,
-        }
-        .to_http(),
-        Err(e) => response::ServerResponse {
-            msg: e.to_string(),
-            code: StatusCode::INTERNAL_SERVER_ERROR,
-        }
-        .to_http(),
-    }
+    query_result
+        .map(Into::<QueryResponse>::into)
+        .map(|response| response.to_http())
+        .map_err(|e| e.into())
 }
 
-pub async fn post_event(req: HttpRequest, body: web::Json<Value>) -> HttpResponse {
+pub async fn post_event(
+    req: HttpRequest,
+    body: web::Json<Value>,
+) -> Result<HttpResponse, PostError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    let tags = match collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR) {
-        Ok(tags) => HashMap::from([("p_tags".to_string(), tags)]),
-        Err(e) => return e.error_response(),
-    };
+    let tags = HashMap::from([(
+        "p_tags".to_string(),
+        collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?,
+    )]);
 
-    let metadata = match collect_labelled_headers(&req, PREFIX_META, SEPARATOR) {
-        Ok(metadata) => HashMap::from([("p_metadata".to_string(), metadata)]),
-        Err(e) => return e.error_response(),
-    };
-
-    if let Err(e) = metadata::STREAM_INFO.schema(&stream_name) {
-        // if stream doesn't exist, fail to post data
-        return response::ServerResponse {
-            msg: format!(
-                "Failed to post event. Log stream {} does not exist. Error: {}",
-                stream_name, e
-            ),
-            code: StatusCode::NOT_FOUND,
-        }
-        .to_http();
-    };
+    let metadata = HashMap::from([(
+        "p_metadata".to_string(),
+        collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?,
+    )]);
 
     let s3 = S3::new();
 
     if let Some(array) = body.as_array() {
-        let mut i = 0;
-
         for body in array {
             let body = merge(body.clone(), metadata.clone());
             let body = merge(body, tags.clone());
-            let body = utils::flatten_json_body(web::Json(body)).unwrap();
+            let body = flatten_json_body(web::Json(body)).unwrap();
 
             let e = event::Event {
                 body,
                 stream_name: stream_name.clone(),
             };
 
-            if let Err(e) = e.process(&s3).await {
-                return response::ServerResponse {
-                    msg: format!("failed to process event because {}", e),
-                    code: StatusCode::INTERNAL_SERVER_ERROR,
-                }
-                .to_http();
-            }
-
-            i += 1;
+            e.process(&s3).await?;
         }
+    } else {
+        let body = merge(body.clone(), metadata);
+        let body = merge(body, tags);
 
-        return response::ServerResponse {
-            msg: format!("Successfully posted {} events", i),
-            code: StatusCode::OK,
-        }
-        .to_http();
+        let event = event::Event {
+            body: utils::flatten_json_body(web::Json(body)).unwrap(),
+            stream_name,
+        };
+
+        event.process(&s3).await?;
     }
 
-    let body = merge(body.clone(), metadata);
-    let body = merge(body, tags);
+    Ok(HttpResponse::Ok().finish())
+}
+
+pub mod error {
+    use actix_web::http::header::ContentType;
+    use http::StatusCode;
 
-    let event = event::Event {
-        body: utils::flatten_json_body(web::Json(body)).unwrap(),
-        stream_name,
+    use crate::{
+        event::error::EventError,
+        query::error::{ExecuteError, ParseError},
+        utils::header_parsing::ParseHeaderError,
     };
 
-    match event.process(&s3).await {
-        Ok(EventResponse { msg }) => response::ServerResponse {
-            msg,
-            code: StatusCode::INTERNAL_SERVER_ERROR,
+    #[derive(Debug, thiserror::Error)]
+    pub enum QueryError {
+        #[error("Bad request: {0}")]
+        Parse(#[from] ParseError),
+        #[error("Query execution failed due to {0}")]
+        Execute(#[from] ExecuteError),
+    }
+
+    impl actix_web::ResponseError for QueryError {
+        fn status_code(&self) -> http::StatusCode {
+            match self {
+                QueryError::Parse(_) => StatusCode::BAD_REQUEST,
+                QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            }
         }
-        .to_http(),
-        Err(e) => response::ServerResponse {
-            msg: format!("Failed to process event due to err: {}", e),
-            code: StatusCode::INTERNAL_SERVER_ERROR,
+
+        fn error_response(&self) -> actix_web::HttpResponse {
+            actix_web::HttpResponse::build(self.status_code())
+                .insert_header(ContentType::plaintext())
+                .body(self.to_string())
+        }
+    }
+
+    #[derive(Debug, thiserror::Error)]
+    pub enum PostError {
+        #[error("Header Error: {0}")]
+        Header(#[from] ParseHeaderError),
+        #[error("Event Error: {0}")]
+        Event(#[from] EventError),
+    }
+
+    impl actix_web::ResponseError for PostError {
+        fn status_code(&self) -> http::StatusCode {
+            match self {
+                PostError::Header(_) => StatusCode::BAD_REQUEST,
+                PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            }
+        }
+
+        fn error_response(&self) -> actix_web::HttpResponse {
+            actix_web::HttpResponse::build(self.status_code())
+                .insert_header(ContentType::plaintext())
+                .body(self.to_string())
         }
     }
 }
diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs
index fc084eac7..48bcc8188 100644
--- a/server/src/handlers/logstream.rs
+++ b/server/src/handlers/logstream.rs
@@ -59,13 +59,7 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
         .to_http();
     }
 
-    if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
-        log::warn!(
-            "failed to delete log stream {} from metadata due to err: {}",
-            stream_name,
-            e
-        )
-    }
+    metadata::STREAM_INFO.delete_stream(&stream_name);
 
     if event::STREAM_WRITERS::delete_entry(&stream_name).is_err() {
         log::warn!(
@@ -172,22 +166,11 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
 
     // Proceed to create log stream if it doesn't exist
     if s3.get_schema(&stream_name).await.is_err() {
-        if let Err(e) =
-            metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default())
-        {
-            return response::ServerResponse {
-                msg: format!(
-                    "failed to create log stream {} due to error: {}",
-                    stream_name, e
-                ),
-                code: StatusCode::INTERNAL_SERVER_ERROR,
-            }
-            .to_http();
-        }
+        metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
         // Fail if unable to create log stream on object store backend
         if let Err(e) = s3.create_stream(&stream_name).await {
             // delete the stream from metadata because we couldn't create it on object store backend
-            metadata::STREAM_INFO.delete_stream(&stream_name).unwrap();
+            metadata::STREAM_INFO.delete_stream(&stream_name);
             return response::ServerResponse {
                 msg: format!(
                     "failed to create log stream {} due to err: {}",
@@ -231,7 +214,7 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) -> HttpResponse {
         }
     };
 
-    if let Err(e) = validator::alert(serde_json::to_string(&alerts).unwrap()) {
+    if let Err(e) = validator::alert(&alerts) {
        return response::ServerResponse {
             msg: format!(
                 "failed to set alert configuration for log stream {} due to err: {}",
diff --git a/server/src/main.rs b/server/src/main.rs
index cd5df458d..a5830375c 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -43,7 +43,6 @@ use tokio::sync::oneshot::error::TryRecvError;
 
 mod alerts;
 mod banner;
-mod error;
 mod event;
 mod handlers;
 mod metadata;
@@ -55,7 +54,6 @@ mod storage;
 mod utils;
 mod validator;
 
-use error::Error;
 use option::CONFIG;
 use s3::S3;
 use storage::{ObjectStorage, StorageDir};
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index c7f923529..34e7f06e5 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -18,16 +18,16 @@
 
 use datafusion::arrow::datatypes::Schema;
 use lazy_static::lazy_static;
-use log::error;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::RwLock;
 
 use crate::alerts::Alerts;
-use crate::error::Error;
 use crate::event::Event;
 use crate::storage::ObjectStorage;
 
+use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
+
 #[derive(Debug, Default, Clone, PartialEq, Eq)]
 pub struct LogStreamMetadata {
     pub schema: Option<Schema>,
@@ -60,6 +60,9 @@ lazy_static! {
         RwLock::new(HashMap::new());
 }
 
+// It is very unlikely that a panic will occur when dealing with metadata.
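+// Lock guards below are therefore taken with .expect(LOCK_EXPECT): a poisoned lock
+// panics with this message instead of being surfaced as a recoverable error.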
+pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding a lock";
+
 // STREAM_INFO should be updated
 // 1. During server start up
 // 2. When a new stream is created (make a new entry in the map)
@@ -68,108 +71,102 @@ lazy_static! {
 // 5. When set alert API is called (update the alert)
 #[allow(clippy::all)]
 impl STREAM_INFO {
-    pub async fn check_alerts(&self, event: &Event) -> Result<(), Error> {
-        let mut map = self.write().unwrap();
+    pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> {
+        let event_json: serde_json::Value = serde_json::from_str(&event.body)?;
+
+        let mut map = self.write().expect(LOCK_EXPECT);
         let meta = map
             .get_mut(&event.stream_name)
-            .ok_or(Error::StreamMetaNotFound(event.stream_name.to_owned()))?;
-
-        let event: serde_json::Value = serde_json::from_str(&event.body)?;
+            .ok_or(MetadataError::StreamMetaNotFound(
+                event.stream_name.to_owned(),
+            ))?;
 
         for alert in meta.alerts.alerts.iter_mut() {
-            if let Err(e) = alert.check_alert(&event).await {
-                error!("Error while parsing event against alerts: {}", e);
+            if alert.check_alert(&event_json).await.is_err() {
+                log::error!("Error while parsing event against alerts");
             }
         }
 
         Ok(())
     }
 
-    pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), Error> {
-        let mut map = self.write().unwrap();
+    pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
             .map(|metadata| {
                 metadata.schema.replace(schema);
             })
     }
 
-    pub fn schema(&self, stream_name: &str) -> Result<Option<Schema>, Error> {
-        let map = self.read().unwrap();
+    pub fn schema(&self, stream_name: &str) -> Result<Option<Schema>, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
         map.get(stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
             .map(|metadata| metadata.schema.to_owned())
     }
 
-    pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), Error> {
-        let mut map = self.write().unwrap();
+    pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
             .map(|metadata| {
                 metadata.alerts = alerts;
             })
     }
 
-    pub fn alert(&self, stream_name: &str) -> Result<Alerts, Error> {
-        let map = self.read().unwrap();
+    pub fn alert(&self, stream_name: &str) -> Result<Alerts, MetadataError> {
        let map = self.read().expect(LOCK_EXPECT);
         map.get(stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))
             .map(|metadata| metadata.alerts.to_owned())
     }
 
-    pub fn add_stream(
-        &self,
-        stream_name: String,
-        schema: Option<Schema>,
-        alerts: Alerts,
-    ) -> Result<(), Error> {
-        let mut map = self.write().unwrap();
+    pub fn add_stream(&self, stream_name: String, schema: Option<Schema>, alerts: Alerts) {
+        let mut map = self.write().expect(LOCK_EXPECT);
         let metadata = LogStreamMetadata {
             schema,
             alerts,
             ..Default::default()
         };
         map.insert(stream_name, metadata);
-
-        Ok(())
     }
 
-    pub fn delete_stream(&self, stream_name: &str) -> Result<(), Error> {
-        let mut map = self.write().unwrap();
+    pub fn delete_stream(&self, stream_name: &str) {
+        let mut map = self.write().expect(LOCK_EXPECT);
 
         map.remove(stream_name);
-
-        Ok(())
     }
 
-    pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), Error> {
+    pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), LoadError> {
+        // When loading streams, this function assumes list_streams only returns valid streams.
+        // A valid stream has a .schema file. The .schema file may be empty, in which case
+        // the stream is treated as uninitialized.
+        // Return an error only when object storage itself errors out.
+
         for stream in storage.list_streams().await? {
-            // Ignore S3 errors here, because we are just trying
-            // to load the stream metadata based on whatever is available.
-            let alerts = storage
-                .get_alerts(&stream.name)
-                .await
-                .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()));
-
-            let schema = storage
-                .get_schema(&stream.name)
-                .await
-                .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?;
+            let alerts = storage.get_alerts(&stream.name).await?;
+            let schema = storage.get_schema(&stream.name).await?;
 
             let metadata = LogStreamMetadata {
                 schema,
-                alerts: alerts.unwrap_or_default(),
-                ..Default::default()
+                alerts,
+                ..LogStreamMetadata::default()
             };
 
-            let mut map = self.write().unwrap();
-            map.insert(stream.name.clone(), metadata);
+            let mut map = self.write().expect(LOCK_EXPECT);
+
+            map.insert(stream.name, metadata);
         }
 
         Ok(())
     }
 
     pub fn list_streams(&self) -> Vec<String> {
-        self.read().unwrap().keys().map(String::clone).collect()
+        self.read()
+            .expect(LOCK_EXPECT)
+            .keys()
+            .map(String::clone)
+            .collect()
     }
 
     #[allow(dead_code)]
@@ -178,11 +175,11 @@ impl STREAM_INFO {
         stream_name: &str,
         size: u64,
         compressed_size: u64,
-    ) -> Result<(), Error> {
-        let mut map = self.write().unwrap();
+    ) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
         let stream = map
             .get_mut(stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?;
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
 
         stream.stats.update(size, compressed_size);
 
@@ -190,6 +187,32 @@ impl STREAM_INFO {
     }
 }
 
+pub mod error {
+    pub mod stream_info {
+        use crate::storage::ObjectStorageError;
+
+        #[derive(Debug, thiserror::Error)]
+        pub enum CheckAlertError {
+            #[error("Serde Json Error: {0}")]
+            Serde(#[from] serde_json::Error),
+            #[error("Metadata Error: {0}")]
+            Metadata(#[from] MetadataError),
+        }
+
+        #[derive(Debug, thiserror::Error)]
+        pub enum MetadataError {
+            #[error("Metadata for stream {0} not found. Maybe the stream does not exist")]
+            StreamMetaNotFound(String),
+        }
+
+        #[derive(Debug, thiserror::Error)]
+        pub enum LoadError {
+            #[error("Error while loading from object storage: {0}")]
+            ObjectStorage(#[from] ObjectStorageError),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -238,9 +261,7 @@ mod tests {
     fn test_add_stream(#[case] stream_name: String, #[case] schema: Option<Schema>) {
         let alerts = Alerts { alerts: vec![] };
         clear_map();
-        STREAM_INFO
-            .add_stream(stream_name.clone(), schema.clone(), alerts.clone())
-            .unwrap();
+        STREAM_INFO.add_stream(stream_name.clone(), schema.clone(), alerts.clone());
 
         let left = STREAM_INFO.read().unwrap().clone();
         let right = hashmap! {
@@ -258,11 +279,9 @@ mod tests {
     #[serial]
     fn test_delete_stream(#[case] stream_name: String) {
         clear_map();
-        STREAM_INFO
-            .add_stream(stream_name.clone(), None, Alerts { alerts: vec![] })
-            .unwrap();
+        STREAM_INFO.add_stream(stream_name.clone(), None, Alerts { alerts: vec![] });
 
-        STREAM_INFO.delete_stream(&stream_name).unwrap();
+        STREAM_INFO.delete_stream(&stream_name);
         let map = STREAM_INFO.read().unwrap();
         assert!(!map.contains_key(&stream_name));
     }
diff --git a/server/src/query.rs b/server/src/query.rs
index 318201433..9b21f9b01 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -28,21 +28,18 @@ use datafusion::prelude::*;
 use serde_json::Value;
 use std::sync::Arc;
 
-use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::storage;
 use crate::storage::ObjectStorage;
 use crate::storage::ObjectStorageError;
 use crate::utils::TimePeriod;
 use crate::validator;
-use crate::Error;
-
-fn get_value<'a>(value: &'a Value, key: &'static str) -> Result<&'a str, Error> {
-    value
-        .get(key)
-        .ok_or(Error::JsonQuery(key))?
-        .as_str()
-        .ok_or(Error::JsonQuery(key))
+
+use self::error::{ExecuteError, ParseError};
+
+type Key = &'static str;
+fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
+    value.get(key).and_then(|value| value.as_str()).ok_or(key)
 }
 
 // Query holds all values relevant to a query for a single log stream
@@ -57,13 +54,13 @@ pub struct Query {
 impl Query {
     // parse_query parses the SQL query and returns the log stream name on which
     // this query is supposed to be executed
-    pub fn parse(query_json: Value) -> Result<Query, Error> {
+    pub fn parse(query_json: Value) -> Result<Query, ParseError> {
         // retrieve query, start and end time information from payload.
         let query = get_value(&query_json, "query")?;
         let start_time = get_value(&query_json, "startTime")?;
         let end_time = get_value(&query_json, "endTime")?;
 
-        validator::query(query, start_time, end_time)
+        Ok(validator::query(query, start_time, end_time)?)
     }
 
     /// Return prefixes, each per day/hour/minutes as necessary
@@ -73,8 +70,11 @@ impl Query {
     }
 
     /// Execute query on object storage(and if necessary on cache as well) with given stream information
-    /// TODO: Query local and remote S3 parquet files in a single context
-    pub async fn execute(&self, storage: &impl ObjectStorage) -> Result<Vec<RecordBatch>, Error> {
+    /// TODO: find a way to query all selected parquet files together in a single context.
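+    /// Until then, record batches from object storage and from the locally cached parquet
+    /// files are collected into the same result set.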
+    pub async fn execute(
+        &self,
+        storage: &impl ObjectStorage,
+    ) -> Result<Vec<RecordBatch>, ExecuteError> {
         let mut results = vec![];
         storage.query(self, &mut results).await?;
 
@@ -87,7 +87,7 @@ impl Query {
         Ok(results)
     }
 
-    async fn execute_on_cache(&self, results: &mut Vec<RecordBatch>) -> Result<(), Error> {
+    async fn execute_on_cache(&self, results: &mut Vec<RecordBatch>) -> Result<(), ExecuteError> {
         let ctx = SessionContext::new();
 
         let file_format = ParquetFormat::default().with_enable_pruning(true);
@@ -99,13 +99,6 @@ impl Query {
             target_partitions: 1,
         };
 
-        let schema = STREAM_INFO.schema(&self.stream_name)?;
-
-        let schema = match schema {
-            Some(schema) => Arc::new(schema),
-            None => return Ok(()),
-        };
-
         let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name);
 
         let table_path = match ListingTableUrl::parse(
@@ -120,7 +113,7 @@ impl Query {
 
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
-            .with_schema(schema);
+            .with_schema(Arc::clone(&self.schema));
 
         let table = ListingTable::try_new(config)?;
 
@@ -129,12 +122,42 @@ impl Query {
 
         // execute the query and collect results
         let df = ctx.sql(self.query.as_str()).await?;
-        results.extend(df.collect().await.map_err(Error::DataFusion)?);
+        results.extend(df.collect().await?);
 
         Ok(())
     }
 }
 
+pub mod error {
+    use datafusion::error::DataFusionError;
+
+    use crate::{storage::ObjectStorageError, validator::error::QueryValidationError};
+
+    use super::Key;
+
+    #[derive(Debug, thiserror::Error)]
+    pub enum ParseError {
+        #[error("Key not found: {0}")]
+        Key(String),
+        #[error("Error parsing query: {0}")]
+        Validation(#[from] QueryValidationError),
+    }
+
+    impl From<Key> for ParseError {
+        fn from(key: Key) -> Self {
+            ParseError::Key(key.to_string())
+        }
+    }
+
+    #[derive(Debug, thiserror::Error)]
+    pub enum ExecuteError {
+        #[error("Query Execution failed due to error in object storage: {0}")]
+        ObjectStorage(#[from] ObjectStorageError),
+        #[error("Query Execution failed due to error in datafusion: {0}")]
+        Datafusion(#[from] DataFusionError),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::Query;
@@ -177,9 +200,7 @@ mod tests {
     #[serial_test::serial]
     fn query_parse_prefix_with_some_schema(#[case] prefix: &str, #[case] right: &[&str]) {
         clear_map();
-        STREAM_INFO
-            .add_stream("stream_name".to_string(), Some(schema()), Alerts::default())
-            .unwrap();
+        STREAM_INFO.add_stream("stream_name".to_string(), Some(schema()), Alerts::default());
 
         let query = Value::from_str(prefix).unwrap();
         let query = Query::parse(query).unwrap();
@@ -201,9 +222,7 @@ mod tests {
     #[serial_test::serial]
     fn query_parse_prefix_with_no_schema(#[case] prefix: &str) {
         clear_map();
-        STREAM_INFO
-            .add_stream("stream_name".to_string(), None, Alerts::default())
-            .unwrap();
+        STREAM_INFO.add_stream("stream_name".to_string(), None, Alerts::default());
 
         let query = Value::from_str(prefix).unwrap();
         assert!(Query::parse(query).is_err());
diff --git a/server/src/response.rs b/server/src/response.rs
index 9020bc217..4f727f7cd 100644
--- a/server/src/response.rs
+++ b/server/src/response.rs
@@ -62,8 +62,13 @@ impl QueryResponse {
     }
 }
 
-pub struct EventResponse {
-    pub msg: String,
+impl From<Vec<RecordBatch>> for QueryResponse {
+    fn from(body: Vec<RecordBatch>) -> Self {
+        Self {
+            code: StatusCode::OK,
+            body,
+        }
+    }
 }
 
 #[derive(Debug, Display, Error)]
diff --git a/server/src/s3.rs b/server/src/s3.rs
index d7a7ee0c1..4843614c0 100644
--- a/server/src/s3.rs
+++ b/server/src/s3.rs
@@ -382,10 +382,15 @@ impl ObjectStorage for S3 {
     }
 
     async fn get_alerts(&self, stream_name: &str) -> Result<Alerts, ObjectStorageError> {
-        let body_bytes = self._alert_exists(stream_name).await?;
-        let alerts = serde_json::from_slice(&body_bytes).unwrap_or_default();
-
-        Ok(alerts)
+        let res = self._alert_exists(stream_name).await;
+
+        match res {
+            Ok(bytes) => Ok(serde_json::from_slice(&bytes).unwrap_or_default()),
+            Err(e) => match e {
+                AwsSdkError::NoSuchKey(_) => Ok(Alerts::default()),
+                e => Err(e.into()),
+            },
+        }
     }
 
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
diff --git a/server/src/storage.rs b/server/src/storage.rs
index d86663c65..18ab005eb 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -304,9 +304,3 @@ pub enum ObjectStorageError {
     #[error("Authentication Error: {0}")]
     AuthenticationError(Box<dyn std::error::Error + Send + Sync + 'static>),
 }
-
-impl From<ObjectStorageError> for crate::error::Error {
-    fn from(e: ObjectStorageError) -> Self {
-        crate::error::Error::Storage(e)
-    }
-}
diff --git a/server/src/utils.rs b/server/src/utils.rs
index 021eba995..a0856e127 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -21,9 +21,7 @@ use chrono::{Date, DateTime, Timelike, Utc};
 use serde_json::{json, Value};
 use std::collections::HashMap;
 
-use crate::Error;
-
-pub fn flatten_json_body(body: web::Json<Value>) -> Result<String, Error> {
+pub fn flatten_json_body(body: web::Json<Value>) -> Result<String, serde_json::Error> {
     let mut flat_value: Value = json!({});
     flatten_json::flatten(&body, &mut flat_value, None, true, Some("_")).unwrap();
     let flattened = serde_json::to_string(&flat_value)?;
diff --git a/server/src/validator.rs b/server/src/validator.rs
index 031ace6e1..2209f4574 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -24,83 +24,91 @@ use serde_json::json;
 
 use crate::alerts::Alerts;
 use crate::metadata::STREAM_INFO;
 use crate::query::Query;
-use crate::Error;
+
+use self::error::{AlertValidationError, QueryValidationError, StreamNameValidationError};
 
 // Add more sql keywords here in lower case
 const DENIED_NAMES: &[&str] = &[
     "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and",
 ];
 
-pub fn alert(body: String) -> Result<(), Error> {
-    let alerts: Alerts = serde_json::from_str(body.as_str())?;
-    for alert in alerts.alerts {
+pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {
+    for alert in &alerts.alerts {
         if alert.name.is_empty() {
-            return Err(Error::InvalidAlert(
-                "alert name cannot be empty".to_string(),
-            ));
+            return Err(AlertValidationError::EmptyName);
         }
         if alert.message.is_empty() {
-            return Err(Error::InvalidAlert(
-                "alert message cannot be empty".to_string(),
-            ));
+            return Err(AlertValidationError::EmptyMessage);
        }
         if alert.rule.value == json!(null) {
-            return Err(Error::InvalidAlert(
-                "rule.value cannot be empty".to_string(),
-            ));
+            return Err(AlertValidationError::EmptyRuleValue);
         }
         if alert.rule.field.is_empty() {
-            return Err(Error::InvalidAlert("rule.field must be set".to_string()));
+            return Err(AlertValidationError::EmptyRuleField);
         }
         if alert.rule.within.is_empty() {
-            return Err(Error::InvalidAlert("rule.within must be set".to_string()));
+            return Err(AlertValidationError::EmptyRuleWithin);
         }
         if alert.rule.repeats == 0 {
-            return Err(Error::InvalidAlert(
-                "rule.repeats can't be set to 0".to_string(),
-            ));
+            return Err(AlertValidationError::InvalidRuleRepeat);
        }
         if alert.targets.is_empty() {
-            return Err(Error::InvalidAlert(
-                "alert must have at least one target".to_string(),
-            ));
+            return Err(AlertValidationError::NoTarget);
        }
     }
     Ok(())
 }
 
-pub fn stream_name(str_name: &str) -> Result<(), Error> {
-    if str_name.is_empty() {
-        return Err(Error::EmptyName);
+pub fn stream_name(stream_name: &str) -> Result<(), StreamNameValidationError> {
+    if stream_name.is_empty() {
+        return Err(StreamNameValidationError::EmptyName);
     }
 
-    if str_name.chars().all(char::is_numeric) {
-        return Err(Error::NameNumericOnly(str_name.to_owned()));
+    if stream_name.chars().all(char::is_numeric) {
+        return Err(StreamNameValidationError::NameNumericOnly(
+            stream_name.to_owned(),
+        ));
     }
 
-    if str_name.chars().next().unwrap().is_numeric() {
-        return Err(Error::NameCantStartWithNumber(str_name.to_owned()));
+    if stream_name.chars().next().unwrap().is_numeric() {
+        return Err(StreamNameValidationError::NameCantStartWithNumber(
+            stream_name.to_owned(),
+        ));
     }
 
-    for c in str_name.chars() {
+    for c in stream_name.chars() {
         match c {
-            ' ' => return Err(Error::NameWhiteSpace(str_name.to_owned())),
-            c if !c.is_alphanumeric() => return Err(Error::NameSpecialChar(str_name.to_owned())),
-            c if c.is_ascii_uppercase() => return Err(Error::NameUpperCase(str_name.to_owned())),
+            ' ' => {
+                return Err(StreamNameValidationError::NameWhiteSpace(
+                    stream_name.to_owned(),
+                ))
+            }
+            c if !c.is_alphanumeric() => {
+                return Err(StreamNameValidationError::NameSpecialChar(
+                    stream_name.to_owned(),
+                ))
+            }
+            c if c.is_ascii_uppercase() => {
+                return Err(StreamNameValidationError::NameUpperCase(
+                    stream_name.to_owned(),
+                ))
+            }
             _ => {}
         }
     }
 
-    if DENIED_NAMES.contains(&str_name) {
-        return Err(Error::SQLKeyword(str_name.to_owned()));
+    if DENIED_NAMES.contains(&stream_name) {
+        return Err(StreamNameValidationError::SQLKeyword(
+            stream_name.to_owned(),
+        ));
     }
 
     Ok(())
 }
 
-pub fn query(query: &str, start_time: &str, end_time: &str) -> Result<Query, Error> {
+pub fn query(query: &str, start_time: &str, end_time: &str) -> Result<Query, QueryValidationError> {
     if query.is_empty() {
-        return Err(Error::EmptyQuery);
+        return Err(QueryValidationError::EmptyQuery);
     }
 
     // convert query to lower case for validation only
@@ -110,16 +118,16 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result<Query, Error> {
         .collect::<Vec<&str>>();
     if tokens.contains(&"join") {
-        return Err(Error::Join(query.to_string()));
+        return Err(QueryValidationError::ContainsJoin(query.to_string()));
     }
     if tokens.len() < 4 {
-        return Err(Error::IncompleteQuery(query.to_string()));
+        return Err(QueryValidationError::IncompleteQuery(query.to_string()));
     }
     if start_time.is_empty() {
-        return Err(Error::EmptyStartTime);
+        return Err(QueryValidationError::EmptyStartTime);
     }
     if end_time.is_empty() {
-        return Err(Error::EmptyEndTime);
+        return Err(QueryValidationError::EmptyEndTime);
     }
 
     // log stream name is located after the `from` keyword
@@ -127,20 +135,26 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result<Query, Error> {
     if tokens.len() > stream_name_index + 1 && tokens[stream_name_index + 1] == "and" {
-        return Err(Error::MultipleStreams(query.to_string()));
+        return Err(QueryValidationError::MultipleStreams(query.to_string()));
     }
 
-    let start: DateTime<Utc> = DateTime::parse_from_rfc3339(start_time)?.into();
-    let end: DateTime<Utc> = DateTime::parse_from_rfc3339(end_time)?.into();
+    let start: DateTime<Utc> = DateTime::parse_from_rfc3339(start_time)
+        .map_err(|_| QueryValidationError::StartTimeParse)?
+        .into();
+
+    let end: DateTime<Utc> = DateTime::parse_from_rfc3339(end_time)
+        .map_err(|_| QueryValidationError::EndTimeParse)?
+        .into();
+
     if start.timestamp() > end.timestamp() {
-        return Err(Error::StartTimeAfterEndTime());
+        return Err(QueryValidationError::StartTimeAfterEndTime);
     }
 
     let stream_name = tokens[stream_name_index].to_string();
 
     let schema = match STREAM_INFO.schema(&stream_name)? {
         Some(schema) => Arc::new(schema),
-        None => return Err(Error::MissingRecord),
+        None => return Err(QueryValidationError::UninitializedStream),
     };
 
     Ok(Query {
@@ -151,3 +165,69 @@ pub fn query(query: &str, start_time: &str, end_time: &str) -> Result<Query, Error> {