Skip to content
2 changes: 1 addition & 1 deletion rust/frontend/src/executor/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl ClientManager {
Ok(_) => {}
Err(e) => {
// There is no one to return the error to, so just log it
tracing::info!(
tracing::debug!(
"Failed to remove ip from client manager: {:?}",
e.to_string()
);
Expand Down
13 changes: 2 additions & 11 deletions rust/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ mod types;
use chroma_config::{registry::Registry, Configurable};
use chroma_error::ChromaError;
use chroma_system::System;
use chroma_tracing::{
init_global_filter_layer, init_otel_layer, init_panic_tracing_hook, init_stdout_layer,
init_tracing,
};
use chroma_tracing::init_otel_tracing;
use config::FrontendServerConfig;
use get_collection_with_segments_provider::*;
use mdac::{Pattern, Rule};
Expand Down Expand Up @@ -64,13 +61,7 @@ pub async fn frontend_service_entrypoint_with_config_system_registry(
config: &FrontendServerConfig,
) {
if let Some(config) = &config.open_telemetry {
let tracing_layers = vec![
init_global_filter_layer(),
init_otel_layer(&config.service_name, &config.endpoint),
init_stdout_layer(),
];
init_tracing(tracing_layers);
init_panic_tracing_hook();
init_otel_tracing(&config.service_name, &config.endpoint);
} else {
eprintln!("OpenTelemetry is not enabled because it is missing from the config.");
}
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/tower_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl<B> MakeSpan<B> for RequestTracing {
let name = format!("{} {}", request.method(), http_route);

tracing::span!(
tracing::Level::DEBUG,
tracing::Level::INFO,
"HTTP request",
http.method = %request.method(),
http.uri = %request.uri(),
Expand Down
59 changes: 14 additions & 45 deletions rust/tracing/src/init_tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
// load/src/opentelemetry_config.rs file
// Keep them in-sync manually.

use std::borrow::Cow;

use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, InstrumentationScope};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use std::borrow::Cow;
use tracing_subscriber::fmt;
use tracing_subscriber::Registry;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};

// This is the filter that will be applied to all subsequent layers.
// It filters to modules we author / care about / errors.
pub fn init_global_filter_layer() -> Box<dyn Layer<Registry> + Send + Sync> {
EnvFilter::new(std::env::var("RUST_LOG").unwrap_or_else(|_| {
"error,opentelemetry_sdk=info,".to_string()
Expand All @@ -32,21 +33,22 @@ pub fn init_global_filter_layer() -> Box<dyn Layer<Registry> + Send + Sync> {
"compaction_service",
"distance_metrics",
"full_text",
"hosted-frontend",
"hosted_frontend",
"metadata_filtering",
"query_service",
"wal3",
"worker",
"garbage_collector",
]
.into_iter()
.map(|s| s.to_string() + "=trace")
.map(|s| s.to_string() + "=info")
.collect::<Vec<String>>()
.join(",")
}))
.boxed()
}

// This is the layer that will be used to send traces to the OpenTelemetry
pub fn init_otel_layer(
service_name: &String,
otel_endpoint: &String,
Expand Down Expand Up @@ -107,53 +109,20 @@ pub fn init_otel_layer(
.with_resource(resource.clone())
.build();
global::set_meter_provider(meter_provider);
// Layer for adding our configured tracer.
// Export everything at this layer. The backend i.e. honeycomb or jaeger will filter at its end.
tracing_opentelemetry::OpenTelemetryLayer::new(tracer)
.with_filter(tracing_subscriber::filter::LevelFilter::TRACE)
.boxed()
tracing_opentelemetry::OpenTelemetryLayer::new(tracer).boxed()
}

pub fn init_stdout_layer() -> Box<dyn Layer<Registry> + Send + Sync> {
fmt::layer()
.pretty()
.with_target(false)
.with_filter(tracing_subscriber::filter::FilterFn::new(|metadata| {
// NOTE(rescrv): This is a hack, too. Not an uppercase hack, just a hack. This
// one's localized to the cache module. There's not much to do to unify it with
// the otel filter because these are different output layers from the tracing.

// This filter ensures that we don't cache calls for get/insert on stdout, but will
// still see the clear call.
!(metadata
.module_path()
.unwrap_or("")
.starts_with("chroma_cache")
&& metadata.name() != "clear")
}))
.with_filter(tracing_subscriber::filter::FilterFn::new(|metadata| {
metadata.module_path().unwrap_or("").starts_with("chroma")
|| metadata.module_path().unwrap_or("").starts_with("wal3")
|| metadata.module_path().unwrap_or("").starts_with("worker")
|| metadata
.module_path()
.unwrap_or("")
.starts_with("garbage_collector")
|| metadata
.module_path()
.unwrap_or("")
.starts_with("opentelemetry_sdk")
|| metadata
.module_path()
.unwrap_or("")
.starts_with("hosted-frontend")
}))
.with_filter(tracing_subscriber::filter::LevelFilter::INFO)
.boxed()
fmt::layer().pretty().with_target(false).boxed()
}

pub fn init_tracing(layers: Vec<Box<dyn Layer<Registry> + Send + Sync>>) {
global::set_text_map_propagator(TraceContextPropagator::new());
// and_then is used to chain layers together
let layers = layers
.into_iter()
.reduce(|a, b| Box::new(a.and_then(b)))
.expect("Should be able to create tracing layers");
let subscriber = tracing_subscriber::registry().with(layers);
tracing::subscriber::set_global_default(subscriber)
.expect("Should be able to set global tracing subscriber");
Expand Down Expand Up @@ -184,7 +153,7 @@ pub fn init_panic_tracing_hook() {

pub fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
let layers = vec![
init_global_filter_layer(),
// init_global_filter_layer(),
init_otel_layer(service_name, otel_endpoint),
init_stdout_layer(),
];
Expand Down
12 changes: 6 additions & 6 deletions rust/worker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use chroma_types::{
use futures::{stream, StreamExt, TryStreamExt};
use tokio::signal::unix::{signal, SignalKind};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{trace_span, Instrument};
use tracing::{info_span, Instrument};

use crate::{
config::QueryServiceConfig,
Expand Down Expand Up @@ -376,7 +376,7 @@ impl QueryExecutor for WorkerServer {
async fn count(&self, count: Request<CountPlan>) -> Result<Response<CountResult>, Status> {
// Note: We cannot write a middleware that instruments every service rpc
// with a span because of https://github.com/hyperium/tonic/pull/1202.
let count_span = trace_span!("CountPlan",);
let count_span = info_span!("CountPlan",);
let instrumented_span = wrap_span_with_parent_context(count_span, count.metadata());
self.orchestrate_count(count)
.instrument(instrumented_span)
Expand All @@ -386,7 +386,7 @@ impl QueryExecutor for WorkerServer {
async fn get(&self, get: Request<GetPlan>) -> Result<Response<GetResult>, Status> {
// Note: We cannot write a middleware that instruments every service rpc
// with a span because of https://github.com/hyperium/tonic/pull/1202.
let get_span = trace_span!("GetPlan",);
let get_span = info_span!("GetPlan",);
let instrumented_span = wrap_span_with_parent_context(get_span, get.metadata());
self.orchestrate_get(get)
.instrument(instrumented_span)
Expand All @@ -396,7 +396,7 @@ impl QueryExecutor for WorkerServer {
async fn knn(&self, knn: Request<KnnPlan>) -> Result<Response<KnnBatchResult>, Status> {
// Note: We cannot write a middleware that instruments every service rpc
// with a span because of https://github.com/hyperium/tonic/pull/1202.
let knn_span = trace_span!("KnnPlan",);
let knn_span = info_span!("KnnPlan",);
let instrumented_span = wrap_span_with_parent_context(knn_span, knn.metadata());
self.orchestrate_knn(knn)
.instrument(instrumented_span)
Expand All @@ -413,7 +413,7 @@ impl chroma_types::chroma_proto::debug_server::Debug for WorkerServer {
) -> Result<Response<chroma_types::chroma_proto::GetInfoResponse>, Status> {
// Note: We cannot write a middleware that instruments every service rpc
// with a span because of https://github.com/hyperium/tonic/pull/1202.
let request_span = trace_span!("Get info");
let request_span = info_span!("Get info");

wrap_span_with_parent_context(request_span, request.metadata()).in_scope(|| {
let response = chroma_types::chroma_proto::GetInfoResponse {
Expand All @@ -428,7 +428,7 @@ impl chroma_types::chroma_proto::debug_server::Debug for WorkerServer {
async fn trigger_panic(&self, request: Request<()>) -> Result<Response<()>, Status> {
// Note: We cannot write a middleware that instruments every service rpc
// with a span because of https://github.com/hyperium/tonic/pull/1202.
let request_span = trace_span!("Trigger panic");
let request_span = info_span!("Trigger panic");

wrap_span_with_parent_context(request_span, request.metadata()).in_scope(|| {
panic!("Intentional panic triggered");
Expand Down
Loading