Merged · Changes from 2 commits
21 changes: 19 additions & 2 deletions server/src/event.rs
@@ -23,13 +23,13 @@ mod writer;
use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use itertools::Itertools;

use std::sync::Arc;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use chrono::NaiveDateTime;
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -44,6 +44,7 @@ pub struct Event {
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
pub time_partition: Option<String>,
pub custom_partition_values: HashMap<String, String>,
}

// Event holds the schema related to each event for a single log stream
@@ -55,6 +56,14 @@ impl Event {
key = format!("{key}{parsed_timestamp_to_min}");
}

if !self.custom_partition_values.is_empty() {
let mut custom_partition_key = String::default();
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
custom_partition_key = format!("{custom_partition_key}&{k}={v}");
}
key = format!("{key}{custom_partition_key}");
}

let num_rows = self.rb.num_rows() as u64;
if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
@@ -65,6 +74,7 @@
&key,
self.rb.clone(),
self.parsed_timestamp,
self.custom_partition_values,
)?;

metadata::STREAM_INFO.update_stats(
@@ -93,8 +103,15 @@ impl Event {
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
STREAM_WRITERS.append_to_local(
stream_name,
schema_key,
rb,
parsed_timestamp,
custom_partition_values,
)?;
Ok(())
}
}
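Reviewer note: the partition-key suffix assembled above is easier to see in isolation. A minimal standalone sketch (not part of the diff) assuming only `itertools` and the sample values shown in `main`; the PR appends this suffix to the per-minute schema key, so every distinct combination of custom partition values gets its own writer entry:

```rust
use itertools::Itertools;
use std::collections::HashMap;

// Builds the "&k=v" suffix the same way the loop in event.rs does: keys are
// sorted so the same set of values always produces the same key.
fn partition_suffix(values: &HashMap<String, String>) -> String {
    let mut suffix = String::default();
    for (k, v) in values.iter().sorted_by_key(|v| v.0) {
        suffix = format!("{suffix}&{k}={v}");
    }
    suffix
}

fn main() {
    let values = HashMap::from([
        ("os".to_string(), "linux".to_string()),
        ("region".to_string(), "us-east".to_string()),
    ]);
    assert_eq!(partition_suffix(&values), "&os=linux&region=us-east");
}
```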
2 changes: 1 addition & 1 deletion server/src/event/format/json.rs
@@ -48,7 +48,7 @@ impl EventFormat for Event {
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, None, None, false)?;
let data = flatten_json_body(self.data, None, None, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
21 changes: 18 additions & 3 deletions server/src/event/writer.rs
@@ -49,6 +49,7 @@ impl Writer {
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
@@ -57,8 +58,13 @@
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

self.disk
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
self.disk.push(
stream_name,
schema_key,
&rb,
parsed_timestamp,
custom_partition_values,
)?;
self.mem.push(schema_key, rb);
Ok(())
}
@@ -75,6 +81,7 @@ impl WriterTable {
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

@@ -85,6 +92,7 @@
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
}
None => {
@@ -98,10 +106,17 @@
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
} else {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
writer.push(
stream_name,
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
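Reviewer note: `append_to_local` keeps the existing read-lock / write-lock dance; the new `custom_partition_values` argument is simply threaded through both paths. A simplified standalone sketch of that locking pattern, with `String` payloads standing in for `RecordBatch` and illustrative names:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// Simplified shape of WriterTable::append_to_local: try under the read lock,
// and only take the write lock (re-checking the entry) when the stream is new.
fn append(
    table: &RwLock<HashMap<String, Mutex<Vec<String>>>>,
    stream_name: &str,
    record: String,
) {
    let guard = table.read().unwrap();
    if let Some(writer) = guard.get(stream_name) {
        writer.lock().unwrap().push(record);
    } else {
        drop(guard); // release the read lock before asking for the write lock
        let mut map = table.write().unwrap();
        match map.get(stream_name) {
            // another thread may have created the writer in the meantime
            Some(writer) => writer.lock().unwrap().push(record),
            None => {
                map.insert(stream_name.to_owned(), Mutex::new(vec![record]));
            }
        }
    }
}
```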
13 changes: 10 additions & 3 deletions server/src/event/writer/file_writer.rs
@@ -44,6 +44,7 @@ impl FileWriter {
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
@@ -55,8 +56,13 @@
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) =
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
let (path, writer) = init_new_stream_writer_file(
stream_name,
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
@@ -82,9 +88,10 @@ fn init_new_stream_writer_file(
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values);
std::fs::create_dir_all(dir.data_path)?;

let file = OpenOptions::new().create(true).append(true).open(&path)?;
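Reviewer note: with the extra argument, `path_by_current_time` can fold the custom partition values into the file path; the rest of `init_new_stream_writer_file` is unchanged. Roughly what that helper boils down to, sketched under the assumption that the writer is the `arrow_ipc` `StreamWriter` (path construction elided, since the on-disk layout is Parseable-specific):

```rust
use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use std::fs::OpenOptions;
use std::path::Path;

// Open (or create) the .arrows file in append mode and start an IPC stream
// writer using the first record's schema.
fn open_arrow_writer(
    path: &Path,
    record: &RecordBatch,
) -> std::io::Result<StreamWriter<std::fs::File>> {
    let file = OpenOptions::new().create(true).append(true).open(path)?;
    let mut writer = StreamWriter::try_new(file, &record.schema())
        .expect("record schema is valid for an IPC stream");
    writer.write(record).expect("record matches the writer schema");
    Ok(writer)
}
```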
1 change: 1 addition & 0 deletions server/src/handlers.rs
@@ -25,6 +25,7 @@ const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';
105 changes: 87 additions & 18 deletions server/src/handlers/http/ingest.rs
@@ -106,36 +106,103 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
let time_partition = object_store_format.time_partition;
let time_partition_limit = object_store_format.time_partition_limit;
let static_schema_flag = object_store_format.static_schema_flag;
let custom_partition = object_store_format.custom_partition;
let body_val: Value = serde_json::from_slice(&body)?;
let size: usize = body.len();
let mut parsed_timestamp = Utc::now().naive_utc();
let mut custom_partition_values: HashMap<String, String> = HashMap::new();

if time_partition.is_none() {
let stream = stream_name.clone();
let (rb, is_first_event) = get_stream_schema(
stream.clone(),
req,
body_val,
static_schema_flag,
time_partition.clone(),
)?;
event::Event {
rb,
stream_name: stream,
origin_format: "json",
origin_size: size as u64,
is_first_event,
parsed_timestamp,
time_partition,
if custom_partition.is_none() {
let stream = stream_name.clone();
let (rb, is_first_event) = get_stream_schema(
stream.clone(),
req,
body_val,
static_schema_flag,
time_partition.clone(),
)?;
event::Event {
rb: rb.clone(),
stream_name: stream,
origin_format: "json",
origin_size: size as u64,
is_first_event,
parsed_timestamp,
time_partition,
custom_partition_values,
}
.process()
.await?;
} else {
let data = convert_array_to_object(
body_val.clone(),
time_partition.clone(),
time_partition_limit,
custom_partition.clone(),
)?;
let custom_partition = custom_partition.unwrap();
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();

for value in data {
for custom_partition_field in custom_partition_list.clone() {
let custom_partition_value =
value.get(custom_partition_field.trim()).unwrap().to_owned();
let custom_partition_value = match custom_partition_value.clone() {
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
Value::String(s) => s,
_ => "".to_string(),
};
custom_partition_values.insert(
custom_partition_field.trim().to_string(),
custom_partition_value,
);
}
let (rb, is_first_event) = get_stream_schema(
stream_name.clone(),
req.clone(),
value.clone(),
static_schema_flag.clone(),
time_partition.clone(),
)?;
event::Event {
rb,
stream_name: stream_name.clone(),
origin_format: "json",
origin_size: value.to_string().into_bytes().len() as u64,
is_first_event,
parsed_timestamp,
time_partition: time_partition.clone(),
custom_partition_values: custom_partition_values.clone(),
}
.process()
.await?;
}
}
.process()
.await?;
} else {
let data = convert_array_to_object(
body_val.clone(),
time_partition.clone(),
time_partition_limit,
custom_partition.clone(),
)?;
let custom_partition = custom_partition.unwrap();
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();

for value in data {
for custom_partition_field in custom_partition_list.clone() {
let custom_partition_value =
value.get(custom_partition_field.trim()).unwrap().to_owned();
let custom_partition_value = match custom_partition_value.clone() {
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
Value::String(s) => s,
_ => "".to_string(),
};
custom_partition_values.insert(
custom_partition_field.trim().to_string(),
custom_partition_value,
);
}
let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
parsed_timestamp = body_timestamp
.unwrap()
@@ -161,6 +228,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
is_first_event,
parsed_timestamp,
time_partition: time_partition.clone(),
custom_partition_values: custom_partition_values.clone(),
}
.process()
.await?;
@@ -216,6 +284,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
"",
"",
"",
"",
Arc::new(Schema::empty()),
)
.await?;
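Reviewer note: both branches of `push_logs` now run the same extraction loop over the fields named in the stream's custom partition. A standalone sketch of that loop, with one deliberate difference from the PR code: a missing field falls back to `Value::Null` instead of panicking on `unwrap`:

```rust
use serde_json::{json, Value};
use std::collections::HashMap;

// For every configured field ("os,region", whitespace tolerated), pull the
// value out of the flattened event and store it as a string. Numbers and
// bools are stringified; anything else becomes an empty string.
fn extract_partition_values(event: &Value, custom_partition: &str) -> HashMap<String, String> {
    let mut values = HashMap::new();
    for field in custom_partition.split(',') {
        let field = field.trim();
        let raw = event.get(field).cloned().unwrap_or(Value::Null);
        let as_string = match raw {
            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
            Value::String(s) => s,
            _ => "".to_string(),
        };
        values.insert(field.to_string(), as_string);
    }
    values
}

fn main() {
    let event = json!({"os": "linux", "status": 200, "msg": "ok"});
    let values = extract_partition_values(&event, "os, status");
    assert_eq!(values["os"], "linux");
    assert_eq!(values["status"], "200");
}
```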
31 changes: 28 additions & 3 deletions server/src/handlers/http/logstream.rs
@@ -21,7 +21,9 @@ use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use crate::alerts::Alerts;
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY};
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
@@ -224,6 +226,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
} else {
""
};
let mut custom_partition: &str = "";
if let Some((_, custom_partition_key)) = req
.headers()
.iter()
.find(|&(key, _)| key == CUSTOM_PARTITION_KEY)
{
custom_partition = custom_partition_key.to_str().unwrap();
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
if custom_partition_list.len() > 3 {
return Err(StreamError::Custom {
msg: "maximum 3 custom partition keys are supported".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
}

let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let mut schema = Arc::new(Schema::empty());
@@ -240,8 +257,11 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
if !body.is_empty() && static_schema_flag == "true" {
let static_schema: StaticSchema = serde_json::from_slice(&body)?;

let parsed_schema =
convert_static_schema_to_arrow_schema(static_schema.clone(), time_partition);
let parsed_schema = convert_static_schema_to_arrow_schema(
static_schema.clone(),
time_partition,
custom_partition,
);
if let Ok(parsed_schema) = parsed_schema {
schema = parsed_schema;
} else {
@@ -263,6 +283,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
stream_name,
time_partition,
time_partition_in_days,
custom_partition,
static_schema_flag,
schema,
)
@@ -548,6 +569,7 @@ pub async fn create_stream(
stream_name: String,
time_partition: &str,
time_partition_limit: &str,
custom_partition: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
) -> Result<(), CreateStreamError> {
@@ -561,6 +583,7 @@
&stream_name,
time_partition,
time_partition_limit,
custom_partition,
static_schema_flag,
schema.clone(),
)
@@ -591,6 +614,7 @@ pub async fn create_stream(
created_at,
time_partition.to_string(),
time_partition_limit.to_string(),
custom_partition.to_string(),
static_schema_flag.to_string(),
static_schema,
);
@@ -630,6 +654,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
first_event_at: stream_meta.first_event_at.clone(),
time_partition: stream_meta.time_partition.clone(),
time_partition_limit: stream_meta.time_partition_limit.clone(),
custom_partition: stream_meta.custom_partition.clone(),
cache_enabled: stream_meta.cache_enabled,
static_schema_flag: stream_meta.static_schema_flag.clone(),
};
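Reviewer note: on PUT, the `x-p-custom-partition` header is validated here only for its key count. A minimal sketch of that check, with the error type simplified to `String` (the handler itself returns `StreamError::Custom` with `StatusCode::BAD_REQUEST`):

```rust
// Split the x-p-custom-partition header value and enforce the limit of
// at most three custom partition keys per stream.
fn validate_custom_partition(header_value: &str) -> Result<Vec<String>, String> {
    let keys: Vec<String> = header_value
        .split(',')
        .map(|key| key.trim().to_string())
        .collect();
    if keys.len() > 3 {
        return Err("maximum 3 custom partition keys are supported".to_string());
    }
    Ok(keys)
}

fn main() {
    assert!(validate_custom_partition("os,region").is_ok());
    assert!(validate_custom_partition("a,b,c,d").is_err());
}
```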