Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,25 @@ type EventSchema = Vec<Arc<Field>>;
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum LogSource {
// AWS Kinesis sends logs in the format of a json array
#[serde(rename = "kinesis")]
Kinesis,
// OpenTelemetry sends logs according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
#[serde(rename = "otel-logs")]
OtelLogs,
// OpenTelemetry sends traces according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto
#[serde(rename = "otel-traces")]
OtelMetrics,
// OpenTelemetry sends traces according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
#[serde(rename = "otel-metrics")]
OtelTraces,
// Internal Stream format
#[serde(rename = "pmeta")]
Pmeta,
#[default]
#[serde(rename = "json")]
// Json object or array
Json,
Custom(String),
Expand Down
22 changes: 21 additions & 1 deletion src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ async fn migrate_stream_metadata(
stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value =
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
Expand All @@ -259,6 +262,9 @@ async fn migrate_stream_metadata(
stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value =
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
Expand All @@ -272,24 +278,38 @@ async fn migrate_stream_metadata(
stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value =
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
}
Some("v4") => {
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value =
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
}
Some("v5") => {
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value =
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);
storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
}
_ => {
stream_metadata_value =
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);
storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
}
_ => (),
}

Ok(stream_metadata_value)
Expand Down
37 changes: 36 additions & 1 deletion src/migration/stream_metadata_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use serde_json::{json, Value};

Expand Down Expand Up @@ -201,6 +201,33 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value {
stream_metadata
}

pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value {
let mut format_mapping = HashMap::new();
format_mapping.insert("Kinesis", "kinesis");
format_mapping.insert("OtelLogs", "otel-logs");
format_mapping.insert("OtelTraces", "otel-traces");
format_mapping.insert("OtelMetrics", "otel-metrics");
format_mapping.insert("Pmeta", "pmeta");
format_mapping.insert("Json", "json");

// Transform log_source_format in each log_source entry if it exists
if let Some(log_sources) = stream_metadata
.get_mut("log_source")
.and_then(|v| v.as_array_mut())
{
for source in log_sources.iter_mut() {
if let Some(format_value) = source.get_mut("log_source_format") {
if let Some(format_str) = format_value.as_str() {
if let Some(new_format) = format_mapping.get(format_str) {
*format_value = json!(new_format);
}
}
}
}
}
stream_metadata
}

fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value {
let manifest_list = snapshot.get("manifest_list").unwrap();
let mut new_manifest_list = Vec::new();
Expand Down Expand Up @@ -268,4 +295,12 @@ mod tests {
let updated_stream_metadata = super::v5_v6(stream_metadata.clone());
assert_eq!(updated_stream_metadata, expected);
}

#[test]
fn test_rename_log_source_v6() {
let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"OtelLogs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"OtelTraces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"OtelMetrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]});
let expected = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"otel-traces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"otel-metrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]});
let updated_stream_metadata = super::rename_log_source_v6(stream_metadata.clone());
assert_eq!(updated_stream_metadata, expected);
}
}
Loading