Skip to content

Commit

Permalink
fix: multiple otlp loggers conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed Dec 18, 2023
1 parent 45ca985 commit 58ab1c1
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 46 deletions.
8 changes: 4 additions & 4 deletions src/common/tracing/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use minitrace::prelude::*;
use serde_json::Map;

use crate::loggers::new_file_log_writer;
use crate::loggers::new_otlp_log_writer;
use crate::loggers::MinitraceLogger;
use crate::loggers::OpenTelemetryLogger;
use crate::Config;

const HEADER_TRACE_PARENT: &str = "traceparent";
Expand Down Expand Up @@ -176,7 +176,7 @@ pub fn init_logging(
let mut labels = labels.clone();
labels.insert("category".to_string(), "system".to_string());
labels.extend(cfg.otlp.labels.clone());
let logger = new_otlp_log_writer(&cfg.otlp.endpoint, labels);
let logger = OpenTelemetryLogger::new(log_name, &cfg.otlp.endpoint, labels);
let dispatch = fern::Dispatch::new()
.level(cfg.otlp.level.parse().unwrap_or(LevelFilter::Info))
.format(formatter("json"))
Expand Down Expand Up @@ -211,7 +211,7 @@ pub fn init_logging(
let mut labels = labels.clone();
labels.insert("category".to_string(), "query".to_string());
labels.extend(cfg.query.labels.clone());
let logger = new_otlp_log_writer(&cfg.query.otlp_endpoint, labels);
let logger = OpenTelemetryLogger::new(log_name, &cfg.query.otlp_endpoint, labels);
query_logger = query_logger.chain(Box::new(logger) as Box<dyn Log>);
}
}
Expand All @@ -229,7 +229,7 @@ pub fn init_logging(
let mut labels = labels.clone();
labels.insert("category".to_string(), "profile".to_string());
labels.extend(cfg.profile.labels.clone());
let logger = new_otlp_log_writer(&cfg.profile.otlp_endpoint, labels);
let logger = OpenTelemetryLogger::new(log_name, &cfg.profile.otlp_endpoint, labels);
profile_logger = profile_logger.chain(Box::new(logger) as Box<dyn Log>);
}
}
Expand Down
86 changes: 44 additions & 42 deletions src/common/tracing/src/loggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::time::Duration;
use std::time::SystemTime;

use opentelemetry::logs::AnyValue;
use opentelemetry::logs::Logger as _;
use opentelemetry::logs::Logger;
use opentelemetry::logs::LoggerProvider;
use opentelemetry::logs::Severity;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::Logger;
use tracing_appender::non_blocking::NonBlocking;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::RollingFileAppender;
Expand Down Expand Up @@ -96,39 +96,48 @@ impl log::Log for MinitraceLogger {
fn flush(&self) {}
}

pub(crate) struct OpenTelemetryOTLPLogWriter {
logger: Logger,
pub(crate) struct OpenTelemetryLogger {
logger: opentelemetry_sdk::logs::Logger,
// keep provider alive
provider: opentelemetry_sdk::logs::LoggerProvider,
}

pub(crate) fn new_otlp_log_writer(
endpoint: &str,
labels: BTreeMap<String, String>,
) -> OpenTelemetryOTLPLogWriter {
let kvs = labels
.into_iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k, v))
.collect::<Vec<_>>();
let log_config = opentelemetry_sdk::logs::Config {
resource: Cow::Owned(opentelemetry::sdk::Resource::new(kvs)),
};
let export_config = opentelemetry_otlp::ExportConfig {
endpoint: endpoint.to_string(),
protocol: opentelemetry_otlp::Protocol::Grpc,
timeout: Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT),
};
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_export_config(export_config);
let logger = opentelemetry_otlp::new_pipeline()
.logging()
.with_exporter(exporter)
.with_log_config(log_config)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("install query log otlp pipeline");
OpenTelemetryOTLPLogWriter { logger }
impl OpenTelemetryLogger {
pub(crate) fn new(
name: impl ToString,
endpoint: &str,
labels: BTreeMap<String, String>,
) -> Self {
let kvs = labels
.into_iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k, v))
.collect::<Vec<_>>();
let export_config = opentelemetry_otlp::ExportConfig {
endpoint: endpoint.to_string(),
protocol: opentelemetry_otlp::Protocol::Grpc,
timeout: Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT),
};
let exporter_builder: opentelemetry_otlp::LogExporterBuilder =
opentelemetry_otlp::new_exporter()
.tonic()
.with_export_config(export_config)
.into();
let exporter = exporter_builder
.build_log_exporter()
.expect("build log exporter");
let provider = opentelemetry_sdk::logs::LoggerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(
opentelemetry_sdk::logs::Config::default()
.with_resource(opentelemetry_sdk::Resource::new(kvs)),
)
.build();
let logger = provider.versioned_logger(name.to_string(), None, None, None);
Self { logger, provider }
}
}

impl log::Log for OpenTelemetryOTLPLogWriter {
impl log::Log for OpenTelemetryLogger {
fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
// we handle level and target filter with fern
true
Expand All @@ -144,17 +153,10 @@ impl log::Log for OpenTelemetryOTLPLogWriter {
}

fn flush(&self) {
match self.logger.provider() {
Some(provider) => {
let result = provider.force_flush();
for r in result {
if let Err(e) = r {
eprintln!("flush log failed: {}", e);
}
}
}
None => {
eprintln!("flush log failed: logger provider is None");
let result = self.provider.force_flush();
for r in result {
if let Err(e) = r {
eprintln!("flush log failed: {}", e);
}
}
}
Expand Down

0 comments on commit 58ab1c1

Please sign in to comment.