Skip to content

Commit

Permalink
refactor: otlp logs insertion (GreptimeTeam#5479)
Browse files Browse the repository at this point in the history
* chore: add test for selector overlapping

* refactor: simplify otlp logs insertion

* fix: use layered extracted value array

* fix: wrong len

* chore: minor renaming and update

* chore: rename

* fix: clippy

* fix: typos

* chore: update test

* chore: address CR comment & update meter-deps version
  • Loading branch information
shuiyisong authored Feb 7, 2025
1 parent 79acc99 commit 88c3d33
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 213 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a3
lazy_static = "1.4"
local-ip-address = "0.6"
loki-api = { git = "https://github.com/shuiyisong/tracing-loki", branch = "chore/prost_version" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.11.4"
moka = "0.12"
nalgebra = "0.33"
Expand Down Expand Up @@ -283,7 +283,7 @@ pprof = { git = "https://github.com/GreptimeTeam/pprof-rs", rev = "1bd1e21" }

[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "a10facb353b41460eeb98578868ebf19c2084fac"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"

[profile.release]
debug = 1
Expand Down
1 change: 1 addition & 0 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
}

/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
/// The key is used to uplift value from the attributes and serve as column name in the table
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,
Expand Down
11 changes: 10 additions & 1 deletion src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub mod coerce;
use std::collections::HashSet;
use std::sync::Arc;

use ahash::HashMap;
use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
Expand Down Expand Up @@ -245,6 +245,15 @@ pub struct SchemaInfo {
pub index: HashMap<String, usize>,
}

impl SchemaInfo {
pub fn with_capacity(capacity: usize) -> Self {
Self {
schema: Vec::with_capacity(capacity),
index: HashMap::with_capacity(capacity),
}
}
}

fn resolve_schema(
index: Option<usize>,
value_data: ValueData,
Expand Down
6 changes: 1 addition & 5 deletions src/servers/src/http/loki.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,7 @@ pub async fn loki_ingest(

// fill Null for missing values
for row in rows.iter_mut() {
if row.len() < schemas.len() {
for _ in row.len()..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
}
row.resize(schemas.len(), GreptimeValue::default());
}

let rows = Rows {
Expand Down
3 changes: 2 additions & 1 deletion src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use snafu::prelude::*;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, PipelineSnafu, Result};
use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName};
use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

Expand Down Expand Up @@ -112,7 +113,7 @@ pub async fn logs(
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
let _timer = METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
Expand Down
Loading

0 comments on commit 88c3d33

Please sign in to comment.