4 changes: 1 addition & 3 deletions server/src/alerts.rs
@@ -19,8 +19,6 @@
use log::{error, info};
use serde::{Deserialize, Serialize};

-use crate::error::Error;
-
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Alerts {
@@ -39,7 +37,7 @@ pub struct Alert
impl Alert {
// TODO: spawn async tasks to call webhooks if alert rules are met
// This is done to ensure that threads aren't blocked by calls to the webhook
-    pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), Error> {
+    pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), ()> {
if self.rule.resolves(event).await {
info!("Alert triggered; name: {}", self.name);
for target in self.targets.clone() {
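The TODO retained in this hunk says webhook calls should eventually run on spawned tasks so that `check_alert` never blocks on delivery. A minimal sketch of that idea under stated assumptions: it presumes a Tokio runtime, and `Target`, `endpoint`, `WebhookError`, and `call_webhook` are hypothetical stand-ins, not types from this PR.

```rust
// Hypothetical alert target; the PR's real `targets` element type is not shown here.
#[derive(Clone)]
struct Target {
    endpoint: String,
}

#[derive(Debug)]
struct WebhookError(String);

impl std::fmt::Display for WebhookError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Target {
    async fn call_webhook(&self) -> Result<(), WebhookError> {
        // A real implementation would POST the alert payload to `self.endpoint`.
        Ok(())
    }
}

// One detached task per target: a slow or failing webhook cannot block the
// event path, which is what the TODO in `check_alert` asks for.
fn notify_targets(targets: Vec<Target>) {
    for target in targets {
        tokio::spawn(async move {
            if let Err(e) = target.call_webhook().await {
                log::error!("webhook delivery to {} failed: {}", target.endpoint, e);
            }
        });
    }
}

#[tokio::main]
async fn main() {
    notify_targets(vec![Target {
        endpoint: "https://example.com/hook".to_string(),
    }]);
}
```

Dropping the `JoinHandle` makes delivery best-effort, which fits the relaxed `Result<(), ()>` return type above: failures are logged, never propagated.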
84 changes: 0 additions & 84 deletions server/src/error.rs

This file was deleted.
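This deletion is the heart of the PR: the single crate-wide `Error` enum is gone, and each module now owns a focused error type built with `thiserror` (see the new `event::error` module below). A minimal, self-contained sketch of the pattern; the names are illustrative, not the crate's.

```rust
use thiserror::Error;

// A module-local error enum. `#[from]` derives the `From` impl that lets
// `?` convert the source error into this enum automatically.
#[derive(Debug, Error)]
enum ParseError {
    #[error("invalid number: {0}")]
    Number(#[from] std::num::ParseIntError),
    #[error("value out of range: {0}")]
    Range(i64),
}

fn parse_small(s: &str) -> Result<i64, ParseError> {
    let n: i64 = s.parse()?; // ParseIntError -> ParseError::Number via #[from]
    if n > 100 {
        return Err(ParseError::Range(n));
    }
    Ok(n)
}

fn main() {
    assert!(parse_small("42").is_ok());
    assert!(parse_small("abc").is_err()); // converted by the derived From impl
}
```

The payoff is visible throughout `event.rs` below: long `map_err` chains that built ad-hoc message strings collapse into a bare `?`.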

111 changes: 53 additions & 58 deletions server/src/event.rs
@@ -18,6 +18,7 @@
*/
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::error::ArrowError;
use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::json;
use datafusion::arrow::json::reader::infer_json_schema;
@@ -34,9 +35,9 @@ use std::sync::RwLock;

use crate::metadata;
use crate::option::CONFIG;
-use crate::response;
use crate::storage::ObjectStorage;
-use crate::Error;

+use self::error::EventError;

type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;
type LocalWriterGuard<'a> = MutexGuard<'a, Option<StreamWriter<std::fs::File>>>;
@@ -51,13 +52,13 @@ impl STREAM_WRITERS {
fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> {
let hashmap_guard = STREAM_WRITERS
.read()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;

match hashmap_guard.get(stream) {
Some(localwriter) => {
let mut writer_guard = localwriter
.lock()
-                    .map_err(|_| StreamWriterError::MutexPoisioned)?;
+                    .map_err(|_| StreamWriterError::MutexPoisoned)?;

// if it's some writer then we write without dropping any lock
// hashmap cannot be borrowed mutably at any point until this finishes
@@ -85,7 +86,7 @@ impl STREAM_WRITERS {
fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> {
let mut hashmap_guard = STREAM_WRITERS
.write()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;

let file = OpenOptions::new()
.append(true)
@@ -109,7 +110,7 @@ impl STREAM_WRITERS {
pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> {
let mut hashmap_guard = STREAM_WRITERS
.write()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;

hashmap_guard.remove(stream);

@@ -143,14 +144,14 @@ impl STREAM_WRITERS {
pub fn unset_entry(stream: &str) -> Result<(), StreamWriterError> {
let guard = STREAM_WRITERS
.read()
-            .map_err(|_| StreamWriterError::RwPoisioned)?;
+            .map_err(|_| StreamWriterError::RwPoisoned)?;
let stream_writer = match guard.get(stream) {
Some(writer) => writer,
None => return Ok(()),
};
stream_writer
.lock()
-            .map_err(|_| StreamWriterError::MutexPoisioned)?
+            .map_err(|_| StreamWriterError::MutexPoisoned)?
.take();

Ok(())
@@ -163,10 +164,10 @@ pub enum StreamWriterError {
Writer(arrow::error::ArrowError),
#[error("Io Error when creating new file: {0}")]
Io(std::io::Error),
#[error("RwLock was poisioned")]
RwPoisioned,
#[error("Mutex was poisioned")]
MutexPoisioned,
#[error("RwLock was poisoned")]
RwPoisoned,
#[error("Mutex was poisoned")]
MutexPoisoned,
}

fn data_file_path(stream_name: &str) -> String {
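Every lock access in this file now maps `std::sync`'s `PoisonError` onto the typed `RwPoisoned`/`MutexPoisoned` variants instead of unwrapping. A standalone sketch of that mapping with simplified types (not the crate's actual writer map):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Debug, thiserror::Error)]
enum WriterError {
    #[error("RwLock was poisoned")]
    RwPoisoned,
}

// Surface a poisoned lock as a typed, recoverable error instead of panicking.
// The `PoisonError` payload is discarded because its guard is not needed here.
fn stream_exists(map: &RwLock<HashMap<String, u64>>, stream: &str) -> Result<bool, WriterError> {
    let guard = map.read().map_err(|_| WriterError::RwPoisoned)?;
    Ok(guard.contains_key(stream))
}

fn main() {
    let map = RwLock::new(HashMap::from([("logs".to_string(), 1_u64)]));
    assert!(stream_exists(&map, "logs").unwrap());
}
```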
@@ -189,24 +190,17 @@ pub struct Event {
// Events holds the schema related to each event for a single log stream

impl Event {
-    pub async fn process(
-        &self,
-        storage: &impl ObjectStorage,
-    ) -> Result<response::EventResponse, Error> {
-        let inferred_schema = self.infer_schema().map_err(|e| {
-            error!("Failed to infer schema for event. {:?}", e);
-            e
-        })?;
+    pub async fn process(&self, storage: &impl ObjectStorage) -> Result<(), EventError> {
+        let inferred_schema = self.infer_schema()?;

let event = self.get_reader(inferred_schema.clone());

let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
let is_first_event = stream_schema.is_none();

if let Some(existing_schema) = stream_schema {
// validate schema before processing the event
if existing_schema != inferred_schema {
-                return Err(Error::SchemaMismatch(self.stream_name.clone()));
+                return Err(EventError::SchemaMismatch(self.stream_name.clone()));
} else {
self.process_event(event)?
}
@@ -221,16 +215,7 @@ impl Event {
error!("Error checking for alerts. {:?}", e);
}

-        let msg = if is_first_event {
-            format!(
-                "Intial Event recieved for log stream {}, schema uploaded successfully",
-                &self.stream_name,
-            )
-        } else {
-            format!("Event recieved for log stream {}", &self.stream_name)
-        };
-
-        Ok(response::EventResponse { msg })
+        Ok(())
}

// This is called when the first event of a log stream is received. The first event is
@@ -241,56 +226,42 @@
mut event: json::Reader<R>,
schema: Schema,
storage: &impl ObjectStorage,
-    ) -> Result<u64, Error> {
-        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+    ) -> Result<u64, EventError> {
+        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
let stream_name = &self.stream_name;

// Store record batch on local cache
STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();

// Put the inferred schema to object store
-        storage
-            .put_schema(stream_name.clone(), &schema)
-            .await
-            .map_err(|e| response::EventError {
-                msg: format!(
-                    "Failed to upload schema for log stream {} due to err: {}",
-                    stream_name, e
-                ),
-            })?;
+        storage.put_schema(stream_name.clone(), &schema).await?;

// set the schema in memory for this stream
-        metadata::STREAM_INFO
-            .set_schema(stream_name, schema)
-            .map_err(|e| response::EventError {
-                msg: format!(
-                    "Failed to set schema for log stream {} due to err: {}",
-                    stream_name, e
-                ),
-            })?;
+        metadata::STREAM_INFO.set_schema(stream_name, schema)?;

Ok(0)
}

// event process all events after the 1st event. Concatenates record batches
// and puts them in memory store for each event.
-    fn process_event<R: std::io::Read>(&self, mut event: json::Reader<R>) -> Result<u64, Error> {
-        let rb = event.next()?.ok_or(Error::MissingRecord)?;
+    fn process_event<R: std::io::Read>(
+        &self,
+        mut event: json::Reader<R>,
+    ) -> Result<u64, EventError> {
+        let rb = event.next()?.ok_or(EventError::MissingRecord)?;
let stream_name = &self.stream_name;

-        STREAM_WRITERS::append_to_local(stream_name, &rb).unwrap();
+        STREAM_WRITERS::append_to_local(stream_name, &rb)?;

Ok(0)
}

// inferSchema is a constructor to Schema
// returns raw arrow schema type and arrow schema to string type.
-    fn infer_schema(&self) -> Result<Schema, Error> {
+    fn infer_schema(&self) -> Result<Schema, ArrowError> {
let reader = self.body.as_bytes();
let mut buf_reader = BufReader::new(reader);
-        let inferred_schema = infer_json_schema(&mut buf_reader, None)?;
-
-        Ok(inferred_schema)
+        infer_json_schema(&mut buf_reader, None)
}

fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> {
@@ -301,3 +272,27 @@ impl Event {
)
}
}

+pub mod error {
+    use crate::metadata::error::stream_info::MetadataError;
+    use crate::storage::ObjectStorageError;
+    use datafusion::arrow::error::ArrowError;
+
+    use super::StreamWriterError;
+
+    #[derive(Debug, thiserror::Error)]
+    pub enum EventError {
+        #[error("Missing Record from event body")]
+        MissingRecord,
+        #[error("Stream Writer Failed: {0}")]
+        StreamWriter(#[from] StreamWriterError),
+        #[error("Metadata Error: {0}")]
+        Metadata(#[from] MetadataError),
+        #[error("Arrow Error: {0}")]
+        Arrow(#[from] ArrowError),
+        #[error("Schema Mismatch: {0}")]
+        SchemaMismatch(String),
+        #[error("ObjectStorage Error: {0}")]
+        ObjectStorage(#[from] ObjectStorageError),
+    }
+}
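Because the writer, metadata, Arrow, and storage variants all carry `#[from]`, `Event::process` propagates those failures with a bare `?`, and callers receive a typed value to branch on instead of a preformatted message string. A hedged sketch of call-site handling; the HTTP status mapping and the `status_for` helper are illustrative, not part of this PR.

```rust
use crate::event::error::EventError;

// Hypothetical handler-side mapping: client mistakes vs. server-side failures.
fn status_for(err: &EventError) -> u16 {
    match err {
        // The event body was empty or did not match the stream's schema.
        EventError::MissingRecord | EventError::SchemaMismatch(_) => 400,
        // Writer, metadata, Arrow, or object-storage trouble is server-side.
        _ => 500,
    }
}
```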