49 changes: 19 additions & 30 deletions server/src/event.rs
@@ -17,6 +17,7 @@
  *
  */
 use datafusion::arrow;
+use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -41,10 +42,6 @@ pub struct Event {
 }
 
 // Events holds the schema related to a each event for a single log stream
-pub struct Schema {
-    pub arrow_schema: arrow::datatypes::Schema,
-    pub string_schema: String,
-}
 
 impl Event {
     fn data_file_path(&self) -> String {
@@ -62,31 +59,29 @@ impl Event {
         &self,
         storage: &impl ObjectStorage,
     ) -> Result<response::EventResponse, Error> {
-        let Schema {
-            arrow_schema,
-            string_schema,
-        } = self.infer_schema().map_err(|e| {
+        let inferred_schema = self.infer_schema().map_err(|e| {
             error!("Failed to infer schema for event. {:?}", e);
             e
         })?;
 
-        let event = self.get_reader(arrow_schema);
+        let event = self.get_reader(inferred_schema.clone());
         let size = self.body_size();
 
         let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-        let is_first_event = stream_schema.is_empty();
-        // if stream schema is empty then it is first event.
-        let compressed_size = if is_first_event {
-            // process first event and store schema in obect store
-            self.process_first_event(event, string_schema.clone(), storage)
-                .await?
-        } else {
+        let is_first_event = stream_schema.is_none();
+
+        let compressed_size = if let Some(existing_schema) = stream_schema {
             // validate schema before processing the event
-            if stream_schema != string_schema {
+            if existing_schema != inferred_schema {
                 return Err(Error::SchemaMismatch(self.stream_name.clone()));
             } else {
                 self.process_event(event)?
             }
+        } else {
+            // if stream schema is none then it is first event,
+            // process first event and store schema in obect store
+            self.process_first_event(event, inferred_schema, storage)
                .await?
        };
 
        if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
@@ -116,7 +111,7 @@ impl Event {
     async fn process_first_event<R: std::io::Read>(
         &self,
         mut event: json::Reader<R>,
-        string_schema: String,
+        schema: Schema,
         storage: &impl ObjectStorage,
     ) -> Result<u64, Error> {
         let rb = event.next()?.ok_or(Error::MissingRecord)?;
@@ -126,8 +121,9 @@ impl Event {
 
         // Put the inferred schema to object store
         let stream_name = &self.stream_name;
+
         storage
-            .put_schema(stream_name.clone(), string_schema.clone())
+            .put_schema(stream_name.clone(), &schema)
             .await
             .map_err(|e| response::EventError {
                 msg: format!(
@@ -138,7 +134,7 @@ impl Event {
 
         // set the schema in memory for this stream
         metadata::STREAM_INFO
-            .set_schema(self.stream_name.clone(), string_schema)
+            .set_schema(&self.stream_name, schema)
            .map_err(|e| response::EventError {
                msg: format!(
                    "Failed to set schema for log stream {} due to err: {}",
@@ -180,12 +176,8 @@ impl Event {
         let reader = self.body.as_bytes();
         let mut buf_reader = BufReader::new(reader);
         let inferred_schema = infer_json_schema(&mut buf_reader, None)?;
-        let str_inferred_schema = serde_json::to_string(&inferred_schema)?;
 
-        Ok(Schema {
-            arrow_schema: inferred_schema,
-            string_schema: str_inferred_schema,
-        })
+        Ok(inferred_schema)
     }
 
     fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> {
@@ -206,11 +198,8 @@ impl Event {
         let parquet_path = self.data_file_path();
         let parquet_file = fs::File::create(&parquet_path)?;
         let props = WriterProperties::builder().build();
-        let mut writer = ArrowWriter::try_new(
-            parquet_file,
-            Arc::new(self.infer_schema()?.arrow_schema),
-            Some(props),
-        )?;
+        let mut writer =
+            ArrowWriter::try_new(parquet_file, Arc::new(self.infer_schema()?), Some(props))?;
         writer.write(&rb)?;
         writer.close()?;
 
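Note on the change above: the string round-trip is gone, and `infer_schema` now hands back arrow's `Schema` directly, so `process` compares it against the stream's stored `Option<Schema>` instead of comparing serialized JSON strings. A minimal sketch of that branching in isolation, where `classify` and its return values are hypothetical stand-ins for `process_event`/`process_first_event`, assuming the arrow version re-exported by datafusion here exposes `infer_json_schema` with this signature:

use std::io::BufReader;

use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::json::reader::infer_json_schema;

// Decide how an incoming event body should be handled, given the schema
// currently stored for the stream (None until the first event arrives).
fn classify(body: &str, stored: Option<Schema>) -> Result<&'static str, ArrowError> {
    let mut buf_reader = BufReader::new(body.as_bytes());
    let inferred = infer_json_schema(&mut buf_reader, None)?;

    Ok(match stored {
        // A schema already exists: later events must infer to exactly the same one.
        Some(existing) if existing != inferred => "reject: schema mismatch",
        Some(_) => "process_event",
        // No schema yet: first event, so the inferred schema must be persisted too.
        None => "process_first_event",
    })
}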
29 changes: 13 additions & 16 deletions server/src/handlers/logstream.rs
@@ -92,25 +92,24 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
 
     match metadata::STREAM_INFO.schema(&stream_name) {
         Ok(schema) => response::ServerResponse {
-            msg: schema,
+            msg: schema
+                .map(|schema| schema.to_json().to_string())
+                .unwrap_or_default(),
             code: StatusCode::OK,
         }
         .to_http(),
         Err(_) => match S3::new().get_schema(&stream_name).await {
-            Ok(schema) if schema.is_empty() => response::ServerResponse {
+            Ok(None) => response::ServerResponse {
                 msg: "log stream is not initialized, please post an event before fetching schema"
                     .to_string(),
                 code: StatusCode::BAD_REQUEST,
            }
             .to_http(),
-            Ok(schema) => {
-                let buf = schema.as_ref();
-                response::ServerResponse {
-                    msg: String::from_utf8(buf.to_vec()).unwrap(),
-                    code: StatusCode::OK,
-                }
-                .to_http()
+            Ok(Some(schema)) => response::ServerResponse {
+                msg: serde_json::from_value(schema.to_json()).unwrap(),
+                code: StatusCode::OK,
             }
+            .to_http(),
             Err(_) => response::ServerResponse {
                 msg: "failed to get log stream schema, because log stream doesn't exist"
                     .to_string(),
@@ -124,7 +123,7 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
 pub async fn get_alert(req: HttpRequest) -> HttpResponse {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    match metadata::STREAM_INFO.alert(stream_name.clone()) {
+    match metadata::STREAM_INFO.alert(&stream_name) {
         Ok(alerts) => response::ServerResponse {
             msg: serde_json::to_string(&alerts).unwrap(),
             code: StatusCode::OK,
@@ -166,11 +165,9 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
 
     // Proceed to create log stream if it doesn't exist
     if s3.get_schema(&stream_name).await.is_err() {
-        if let Err(e) = metadata::STREAM_INFO.add_stream(
-            stream_name.to_string(),
-            "".to_string(),
-            Default::default(),
-        ) {
+        if let Err(e) =
+            metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Default::default())
+        {
             return response::ServerResponse {
                 msg: format!(
                     "failed to create log stream {} due to error: {}",
@@ -249,7 +246,7 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
         .to_http();
     }
 
-    if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) {
+    if let Err(e) = metadata::STREAM_INFO.set_alert(&stream_name, alerts) {
         return response::ServerResponse {
             msg: format!(
                 "failed to set alert configuration for log stream {} due to err: {}",
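With the schema kept in memory as an arrow `Schema`, the handlers serialize it at the HTTP edge instead of storing pre-serialized JSON. A sketch of the response-body mapping used in the first match arm, grounded in the `Schema::to_json()` call the diff itself relies on (the helper name is illustrative):

use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Empty body while the stream has no schema; JSON text once it does.
fn schema_response_body(schema: Option<Schema>) -> String {
    schema
        .map(|schema| schema.to_json().to_string())
        .unwrap_or_default()
}

fn main() {
    let schema = Schema::new(vec![Field::new("level", DataType::Utf8, false)]);
    assert_eq!(schema_response_body(None), "");
    println!("{}", schema_response_body(Some(schema)));
}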
87 changes: 37 additions & 50 deletions server/src/metadata.rs
@@ -16,7 +16,7 @@
  *
  */
 
-use bytes::Bytes;
+use datafusion::arrow::datatypes::Schema;
 use lazy_static::lazy_static;
 use log::error;
 use serde::{Deserialize, Serialize};
@@ -30,7 +30,7 @@ use crate::storage::ObjectStorage;
 
 #[derive(Debug, Default, Clone, PartialEq, Eq)]
 pub struct LogStreamMetadata {
-    pub schema: String,
+    pub schema: Option<Schema>,
     pub alerts: Alerts,
     pub stats: Stats,
 }
@@ -85,38 +85,42 @@ impl STREAM_INFO {
         Ok(())
     }
 
-    pub fn set_schema(&self, stream_name: String, schema: String) -> Result<(), Error> {
-        let alerts = self.alert(stream_name.clone())?;
-        self.add_stream(stream_name, schema, alerts)
+    pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), Error> {
+        let mut map = self.write().unwrap();
+        map.get_mut(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.schema.replace(schema);
+            })
     }
 
-    pub fn schema(&self, stream_name: &str) -> Result<String, Error> {
+    pub fn schema(&self, stream_name: &str) -> Result<Option<Schema>, Error> {
         let map = self.read().unwrap();
-        let meta = map
-            .get(stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))?;
-
-        Ok(meta.schema.clone())
+        map.get(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.schema.to_owned())
     }
 
-    pub fn set_alert(&self, stream_name: String, alerts: Alerts) -> Result<(), Error> {
-        let schema = self.schema(&stream_name)?;
-        self.add_stream(stream_name, schema, alerts)
+    pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), Error> {
+        let mut map = self.write().unwrap();
+        map.get_mut(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.alerts = alerts;
+            })
     }
 
-    pub fn alert(&self, stream_name: String) -> Result<Alerts, Error> {
+    pub fn alert(&self, stream_name: &str) -> Result<Alerts, Error> {
         let map = self.read().unwrap();
-        let meta = map
-            .get(&stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?;
-
-        Ok(meta.alerts.clone())
+        map.get(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))
+            .map(|metadata| metadata.alerts.to_owned())
     }
 
     pub fn add_stream(
         &self,
         stream_name: String,
-        schema: String,
+        schema: Option<Schema>,
         alerts: Alerts,
     ) -> Result<(), Error> {
         let mut map = self.write().unwrap();
@@ -153,8 +157,6 @@ impl STREAM_INFO {
         let schema = storage
             .get_schema(&stream.name)
             .await
-            .map_err(|e| e.into())
-            .and_then(parse_string)
            .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?;
 
        let metadata = LogStreamMetadata {
@@ -191,17 +193,21 @@ impl STREAM_INFO {
     }
 }
 
-fn parse_string(bytes: Bytes) -> Result<String, Error> {
-    String::from_utf8(bytes.to_vec()).map_err(|e| e.into())
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
+    use datafusion::arrow::datatypes::{DataType, Field};
     use maplit::hashmap;
     use rstest::*;
     use serial_test::serial;
 
+    #[fixture]
+    fn schema() -> Schema {
+        let field_a = Field::new("a", DataType::Int64, false);
+        let field_b = Field::new("b", DataType::Boolean, false);
+        Schema::new(vec![field_a, field_b])
+    }
+
     #[rstest]
     #[case::zero(0, 0, 0)]
     #[case::some(1024, 512, 2048)]
@@ -229,25 +235,10 @@ mod tests {
     }
 
     #[rstest]
-    #[case::nonempty_string("Hello world")]
-    #[case::empty_string("")]
-    fn test_parse_string(#[case] string: String) {
-        let bytes = Bytes::from(string);
-        assert!(parse_string(bytes).is_ok())
-    }
-
-    #[test]
-    fn test_bad_parse_string() {
-        let bad: Vec<u8> = vec![195, 40];
-        let bytes = Bytes::from(bad);
-        assert!(parse_string(bytes).is_err());
-    }
-
-    #[rstest]
-    #[case::stream_schema_alert("teststream", "schema")]
-    #[case::stream_only("teststream", "")]
+    #[case::stream_schema_alert("teststream", Some(schema()))]
+    #[case::stream_only("teststream", None)]
     #[serial]
-    fn test_add_stream(#[case] stream_name: String, #[case] schema: String) {
+    fn test_add_stream(#[case] stream_name: String, #[case] schema: Option<Schema>) {
         let alerts = Alerts { alerts: vec![] };
         clear_map();
         STREAM_INFO
@@ -271,11 +262,7 @@ mod tests {
     fn test_delete_stream(#[case] stream_name: String) {
         clear_map();
         STREAM_INFO
-            .add_stream(
-                stream_name.clone(),
-                "".to_string(),
-                Alerts { alerts: vec![] },
-            )
+            .add_stream(stream_name.clone(), None, Alerts { alerts: vec![] })
             .unwrap();
 
         STREAM_INFO.delete_stream(&stream_name).unwrap();
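`set_schema` and `set_alert` previously rebuilt the whole entry through `add_stream` (and `set_schema` even re-read the alerts first); they now take the write lock once and mutate the existing `LogStreamMetadata` in place. The pattern in isolation, using stand-in map and metadata types rather than the real `STREAM_INFO`:

use std::collections::HashMap;
use std::sync::RwLock;

// Stand-ins for STREAM_INFO's inner map and LogStreamMetadata.
#[derive(Default)]
struct Meta {
    schema: Option<String>,
}

fn set_schema(
    map: &RwLock<HashMap<String, Meta>>,
    stream_name: &str,
    schema: String,
) -> Result<(), String> {
    let mut map = map.write().unwrap();
    map.get_mut(stream_name)
        .ok_or_else(|| format!("unknown stream {}", stream_name))
        .map(|metadata| {
            // Option::replace stores the new schema, dropping any old one.
            metadata.schema.replace(schema);
        })
}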
12 changes: 5 additions & 7 deletions server/src/query.rs
@@ -17,7 +17,6 @@
  */
 
 use chrono::{DateTime, Utc};
-use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
@@ -94,13 +93,12 @@ impl Query {
             target_partitions: 1,
         };
 
-        let schema = &STREAM_INFO.schema(&self.stream_name)?;
+        let schema = STREAM_INFO.schema(&self.stream_name)?;
 
-        if schema.is_empty() {
-            return Ok(());
-        }
-
-        let schema: Arc<Schema> = Arc::new(serde_json::from_str(schema)?);
+        let schema = match schema {
+            Some(schema) => Arc::new(schema),
+            None => return Ok(()),
+        };
 
         ctx.register_listing_table(
             &self.stream_name,
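Since `STREAM_INFO.schema` now returns `Option<Schema>`, the query path no longer deserializes a JSON string; it either wraps the stored schema in an `Arc` or returns early for a stream that has seen no events. The shape of that early return, sketched with illustrative names (`register_if_ready` and its callee are hypothetical):

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;

// None means the stream has ingested no events yet, so there is nothing
// to register or query; the caller treats that as a successful no-op.
fn register_if_ready(schema: Option<Schema>) -> Result<(), String> {
    let schema = match schema {
        Some(schema) => Arc::new(schema),
        None => return Ok(()),
    };

    register_listing_table(schema) // stand-in for ctx.register_listing_table
}

fn register_listing_table(_schema: Arc<Schema>) -> Result<(), String> {
    Ok(())
}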