Skip to content

fix: multiple otlp loggers conflict #14050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/common/tracing/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use minitrace::prelude::*;
use serde_json::Map;

use crate::loggers::new_file_log_writer;
use crate::loggers::new_otlp_log_writer;
use crate::loggers::MinitraceLogger;
use crate::loggers::OpenTelemetryLogger;
use crate::Config;

const HEADER_TRACE_PARENT: &str = "traceparent";
Expand Down Expand Up @@ -176,7 +176,7 @@ pub fn init_logging(
let mut labels = labels.clone();
labels.insert("category".to_string(), "system".to_string());
labels.extend(cfg.otlp.labels.clone());
let logger = new_otlp_log_writer(&cfg.otlp.endpoint, labels);
let logger = OpenTelemetryLogger::new(log_name, &cfg.otlp.endpoint, labels);
let dispatch = fern::Dispatch::new()
.level(cfg.otlp.level.parse().unwrap_or(LevelFilter::Info))
.format(formatter("json"))
Expand Down Expand Up @@ -211,7 +211,7 @@ pub fn init_logging(
let mut labels = labels.clone();
labels.insert("category".to_string(), "query".to_string());
labels.extend(cfg.query.labels.clone());
let logger = new_otlp_log_writer(&cfg.query.otlp_endpoint, labels);
let logger = OpenTelemetryLogger::new(log_name, &cfg.query.otlp_endpoint, labels);
query_logger = query_logger.chain(Box::new(logger) as Box<dyn Log>);
}
}
Expand All @@ -229,7 +229,7 @@ pub fn init_logging(
let mut labels = labels.clone();
labels.insert("category".to_string(), "profile".to_string());
labels.extend(cfg.profile.labels.clone());
let logger = new_otlp_log_writer(&cfg.profile.otlp_endpoint, labels);
let logger = OpenTelemetryLogger::new(log_name, &cfg.profile.otlp_endpoint, labels);
profile_logger = profile_logger.chain(Box::new(logger) as Box<dyn Log>);
}
}
Expand Down
86 changes: 44 additions & 42 deletions src/common/tracing/src/loggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::time::Duration;
use std::time::SystemTime;

use opentelemetry::logs::AnyValue;
use opentelemetry::logs::Logger as _;
use opentelemetry::logs::Logger;
use opentelemetry::logs::LoggerProvider;
use opentelemetry::logs::Severity;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::Logger;
use tracing_appender::non_blocking::NonBlocking;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::RollingFileAppender;
Expand Down Expand Up @@ -96,39 +96,48 @@ impl log::Log for MinitraceLogger {
fn flush(&self) {}
}

pub(crate) struct OpenTelemetryOTLPLogWriter {
logger: Logger,
pub(crate) struct OpenTelemetryLogger {
logger: opentelemetry_sdk::logs::Logger,
// keep provider alive
provider: opentelemetry_sdk::logs::LoggerProvider,
}

pub(crate) fn new_otlp_log_writer(
endpoint: &str,
labels: BTreeMap<String, String>,
) -> OpenTelemetryOTLPLogWriter {
let kvs = labels
.into_iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k, v))
.collect::<Vec<_>>();
let log_config = opentelemetry_sdk::logs::Config {
resource: Cow::Owned(opentelemetry::sdk::Resource::new(kvs)),
};
let export_config = opentelemetry_otlp::ExportConfig {
endpoint: endpoint.to_string(),
protocol: opentelemetry_otlp::Protocol::Grpc,
timeout: Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT),
};
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_export_config(export_config);
let logger = opentelemetry_otlp::new_pipeline()
.logging()
.with_exporter(exporter)
.with_log_config(log_config)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("install query log otlp pipeline");
OpenTelemetryOTLPLogWriter { logger }
impl OpenTelemetryLogger {
pub(crate) fn new(
name: impl ToString,
endpoint: &str,
labels: BTreeMap<String, String>,
) -> Self {
let kvs = labels
.into_iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k, v))
.collect::<Vec<_>>();
let export_config = opentelemetry_otlp::ExportConfig {
endpoint: endpoint.to_string(),
protocol: opentelemetry_otlp::Protocol::Grpc,
timeout: Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT),
};
let exporter_builder: opentelemetry_otlp::LogExporterBuilder =
opentelemetry_otlp::new_exporter()
.tonic()
.with_export_config(export_config)
.into();
let exporter = exporter_builder
.build_log_exporter()
.expect("build log exporter");
let provider = opentelemetry_sdk::logs::LoggerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(
opentelemetry_sdk::logs::Config::default()
.with_resource(opentelemetry_sdk::Resource::new(kvs)),
)
.build();
let logger = provider.versioned_logger(name.to_string(), None, None, None);
Self { logger, provider }
}
}

impl log::Log for OpenTelemetryOTLPLogWriter {
impl log::Log for OpenTelemetryLogger {
fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
// we handle level and target filter with fern
true
Expand All @@ -144,17 +153,10 @@ impl log::Log for OpenTelemetryOTLPLogWriter {
}

fn flush(&self) {
match self.logger.provider() {
Some(provider) => {
let result = provider.force_flush();
for r in result {
if let Err(e) = r {
eprintln!("flush log failed: {}", e);
}
}
}
None => {
eprintln!("flush log failed: logger provider is None");
let result = self.provider.force_flush();
for r in result {
if let Err(e) = r {
eprintln!("flush log failed: {}", e);
}
}
}
Expand Down