Skip to content
99 changes: 66 additions & 33 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use relative_path::RelativePathBuf;

use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError},
};
Expand Down Expand Up @@ -69,25 +70,46 @@ impl ManifestFile for manifest::File {
}
}

fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.max)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
/// Returns the `(min, max)` time bounds recorded in `file`'s column
/// statistics for `partition_column`.
///
/// The default timestamp column (`DEFAULT_TIMESTAMP_KEY`) carries integer
/// statistics interpreted as epoch milliseconds; any other (user-supplied)
/// time-partition column carries string statistics parsed as
/// `DateTime<Utc>`.
///
/// # Panics
/// Panics if the column is missing from the manifest, has no statistics,
/// or the statistics variant does not match the expectation above.
fn get_file_bounds(
    file: &manifest::File,
    partition_column: String,
) -> (DateTime<Utc>, DateTime<Utc>) {
    // Both branches need the same column statistics — look them up once
    // instead of duplicating the find/unwrap chain per branch.
    let stats = file
        .columns()
        .iter()
        .find(|col| col.name == partition_column)
        .expect("partition column is present in manifest")
        .stats
        .as_ref()
        .expect("partition column carries statistics");

    if partition_column == DEFAULT_TIMESTAMP_KEY {
        match stats {
            column::TypedStatistics::Int(stats) => (
                NaiveDateTime::from_timestamp_millis(stats.min)
                    .unwrap()
                    .and_utc(),
                NaiveDateTime::from_timestamp_millis(stats.max)
                    .unwrap()
                    .and_utc(),
            ),
            _ => unreachable!(),
        }
    } else {
        match stats {
            column::TypedStatistics::String(stats) => (
                stats.min.parse::<DateTime<Utc>>().unwrap(),
                stats.max.parse::<DateTime<Utc>>().unwrap(),
            ),
            _ => unreachable!(),
        }
    }
}

Expand All @@ -97,10 +119,19 @@ pub async fn update_snapshot(
change: manifest::File,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

let (lower_bound, _) = get_file_bounds(&change);
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = meta.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition);
lower_bound
}
None => {
let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});
Expand All @@ -109,16 +140,18 @@ pub async fn update_snapshot(
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
let Some(mut manifest) = storage.get_manifest(&path).await? else {
let manifest_path =
partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);

let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
storage.put_manifest(&manifest_path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
Expand Down Expand Up @@ -148,7 +181,7 @@ pub async fn update_snapshot(
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
storage.put_snapshot(stream_name, meta).await?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

Ok(())
Expand All @@ -160,13 +193,13 @@ pub async fn remove_manifest_from_snapshot(
dates: Vec<String>,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;

// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));

storage.put_snapshot(stream_name, meta).await?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
Ok(())
}

Expand All @@ -175,8 +208,8 @@ pub async fn get_first_event(
stream_name: &str,
) -> Result<Option<String>, ObjectStorageError> {
// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;

if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
Expand All @@ -199,7 +232,7 @@ pub async fn get_first_event(
};

if let Some(first_event) = manifest.files.first() {
let (lower_bound, _) = get_file_bounds(first_event);
let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
}
Expand Down
3 changes: 1 addition & 2 deletions server/src/catalog/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ pub fn create_from_parquet_file(
let columns = column_statistics(row_groups);
manifest_file.columns = columns.into_values().collect();
let mut sort_orders = sort_order(row_groups);

if let Some(last_sort_order) = sort_orders.pop() {
if sort_orders
.into_iter()
Expand Down Expand Up @@ -155,7 +154,7 @@ fn sort_order(
})
.collect_vec();

sort_orders.push(sort_order)
sort_orders.push(sort_order);
}
sort_orders
}
Expand Down
15 changes: 11 additions & 4 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use itertools::Itertools;

use std::sync::Arc;

use crate::metadata;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use chrono::NaiveDateTime;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
Expand All @@ -42,6 +42,7 @@ pub struct Event {
pub origin_format: &'static str,
pub origin_size: u64,
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
}

// Events holds the schema related to a each event for a single log stream
Expand All @@ -54,7 +55,12 @@ impl Event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

Self::process_event(&self.stream_name, &key, self.rb.clone())?;
Self::process_event(
&self.stream_name,
&key,
self.rb.clone(),
self.parsed_timestamp,
)?;

metadata::STREAM_INFO.update_stats(
&self.stream_name,
Expand All @@ -81,8 +87,9 @@ impl Event {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/src/event/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub trait EventFormat: Sized {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
};

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
));
};

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
Expand Down
29 changes: 18 additions & 11 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::utils;
use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
use arrow_array::{RecordBatch, TimestampMillisecondArray};
use arrow_schema::Schema;
use chrono::Utc;
use chrono::{NaiveDateTime, Utc};
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;

Expand All @@ -48,6 +48,7 @@ impl Writer {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
Expand All @@ -56,7 +57,8 @@ impl Writer {
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

self.disk.push(stream_name, schema_key, &rb)?;
self.disk
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
self.mem.push(schema_key, rb);
Ok(())
}
Expand All @@ -72,29 +74,34 @@ impl WriterTable {
stream_name: &str,
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

match hashmap_guard.get(stream_name) {
Some(stream_writer) => {
stream_writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
}
None => {
drop(hashmap_guard);
let mut map = self.write().unwrap();
// check for race condition
// if map contains entry then just
if let Some(writer) = map.get(stream_name) {
writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
} else {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record)?;
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
Expand Down
43 changes: 16 additions & 27 deletions server/src/event/writer/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
*
*/

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::path::PathBuf;

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use chrono::NaiveDateTime;
use derive_more::{Deref, DerefMut};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::path::PathBuf;

use crate::storage::staging::StorageDir;

Expand All @@ -44,27 +44,17 @@ impl FileWriter {
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
writer
.writer
.write(record)
.map_err(StreamWriterError::Writer)?;
}
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);
}
};
let (path, writer) =
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);

Ok(())
}
Expand All @@ -80,10 +70,10 @@ fn init_new_stream_writer_file(
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key);

let path = dir.path_by_current_time(schema_key, parsed_timestamp);
std::fs::create_dir_all(dir.data_path)?;

let file = OpenOptions::new().create(true).append(true).open(&path)?;
Expand All @@ -94,6 +84,5 @@ fn init_new_stream_writer_file(
stream_writer
.write(record)
.map_err(StreamWriterError::Writer)?;

Ok((path, stream_writer))
}
Loading