Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 71 additions & 60 deletions client/tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,11 @@ mod tests {
}
}

type TestSubscriber = tracing_subscriber::layer::Layered<
ProfilingLayer,
tracing_subscriber::fmt::Subscriber
>;

fn setup_subscriber() -> (TestSubscriber, Arc<Mutex<Vec<SpanDatum>>>, Arc<Mutex<Vec<TraceEvent>>>) {
fn setup_subscriber() -> (
impl tracing::Subscriber + Send + Sync,
Arc<Mutex<Vec<SpanDatum>>>,
Arc<Mutex<Vec<TraceEvent>>>
) {
let spans = Arc::new(Mutex::new(Vec::new()));
let events = Arc::new(Mutex::new(Vec::new()));
let handler = TestTraceHandler {
Expand All @@ -456,7 +455,7 @@ mod tests {
Box::new(handler),
"test_target",
);
let subscriber = tracing_subscriber::fmt().finish().with(layer);
let subscriber = tracing_subscriber::fmt().with_writer(std::io::sink).finish().with(layer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me se if I understand this correctly: instead of having our own TestSubscriber, you create one that writes to what is essentially /dev/null (will always succeed) and access the spans/events through the Arc'd (like before).

That seems like a much better solution.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I first thought that this would be one of the problems. In the end this is no problem at all, but should make the output cleaner.

(subscriber, spans, events)
}

Expand Down Expand Up @@ -560,64 +559,76 @@ mod tests {

#[test]
fn test_parent_id_with_threads() {
use std::sync::mpsc;
use std::thread;

let (sub, spans, events) = setup_subscriber();
let _sub_guard = tracing::subscriber::set_global_default(sub);
let span1 = tracing::info_span!(target: "test_target", "test_span1");
let _guard1 = span1.enter();

let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || {
let span2 = tracing::info_span!(target: "test_target", "test_span2");
let _guard2 = span2.enter();
// emit event
tracing::event!(target: "test_target", tracing::Level::INFO, "test_event1");
for msg in rx.recv() {
if msg == false {
break;
use std::{sync::mpsc, thread};

if std::env::var("RUN_TEST_PARENT_ID_WITH_THREADS").is_err() {
let executable = std::env::current_exe().unwrap();
let mut command = std::process::Command::new(executable);

let res = command
.env("RUN_TEST_PARENT_ID_WITH_THREADS", "1")
.args(&["--nocapture", "test_parent_id_with_threads"])
.output()
.unwrap()
.status;
assert!(res.success());
} else {
let (sub, spans, events) = setup_subscriber();
let _sub_guard = tracing::subscriber::set_global_default(sub);
let span1 = tracing::info_span!(target: "test_target", "test_span1");
let _guard1 = span1.enter();

let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || {
let span2 = tracing::info_span!(target: "test_target", "test_span2");
let _guard2 = span2.enter();
// emit event
tracing::event!(target: "test_target", tracing::Level::INFO, "test_event1");
for msg in rx.recv() {
if msg == false {
break;
}
}
}
// gard2 and span2 dropped / exited
});
// gard2 and span2 dropped / exited
});

// wait for Event to be dispatched and stored
while events.lock().is_empty() {
thread::sleep(Duration::from_millis(1));
}
// wait for Event to be dispatched and stored
while events.lock().is_empty() {
thread::sleep(Duration::from_millis(1));
}

// emit new event (will be second item in Vec) while span2 still active in other thread
tracing::event!(target: "test_target", tracing::Level::INFO, "test_event2");
// emit new event (will be second item in Vec) while span2 still active in other thread
tracing::event!(target: "test_target", tracing::Level::INFO, "test_event2");

// stop thread and drop span
let _ = tx.send(false);
let _ = handle.join();
// stop thread and drop span
let _ = tx.send(false);
let _ = handle.join();

// wait for Span to be dispatched and stored
while spans.lock().is_empty() {
thread::sleep(Duration::from_millis(1));
// wait for Span to be dispatched and stored
while spans.lock().is_empty() {
thread::sleep(Duration::from_millis(1));
}
let span2 = spans.lock().remove(0);
let event1 = events.lock().remove(0);
drop(_guard1);
drop(span1);

// emit event with no parent
tracing::event!(target: "test_target", tracing::Level::INFO, "test_event3");

let span1 = spans.lock().remove(0);
let event2 = events.lock().remove(0);

assert_eq!(event1.values.string_values.get("message").unwrap(), "test_event1");
assert_eq!(event2.values.string_values.get("message").unwrap(), "test_event2");
assert!(span1.parent_id.is_none());
assert!(span2.parent_id.is_none());
assert_eq!(span2.id, event1.parent_id.unwrap());
assert_eq!(span1.id, event2.parent_id.unwrap());
assert_ne!(span2.id, span1.id);

let event3 = events.lock().remove(0);
assert!(event3.parent_id.is_none());
}
let span2 = spans.lock().remove(0);
let event1 = events.lock().remove(0);
drop(_guard1);
drop(span1);

// emit event with no parent
tracing::event!(target: "test_target", tracing::Level::INFO, "test_event3");

let span1 = spans.lock().remove(0);
let event2 = events.lock().remove(0);

assert_eq!(event1.values.string_values.get("message").unwrap(), "test_event1");
assert_eq!(event2.values.string_values.get("message").unwrap(), "test_event2");
assert!(span1.parent_id.is_none());
assert!(span2.parent_id.is_none());
assert_eq!(span2.id, event1.parent_id.unwrap());
assert_eq!(span1.id, event2.parent_id.unwrap());
assert_ne!(span2.id, span1.id);

let event3 = events.lock().remove(0);
assert!(event3.parent_id.is_none());
}
}
144 changes: 75 additions & 69 deletions client/tracing/src/logging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tracing_subscriber::{
format, FormatEvent, FormatFields, Formatter, Layer as FmtLayer, MakeWriter,
SubscriberBuilder,
},
layer::{self, SubscriberExt},
layer::{self, SubscriberExt}, filter::LevelFilter,
registry::LookupSpan,
EnvFilter, FmtSubscriber, Layer, Registry,
};
Expand Down Expand Up @@ -73,7 +73,7 @@ macro_rules! enable_log_reloading {
/// Common implementation to get the subscriber.
fn prepare_subscriber<N, E, F, W>(
directives: &str,
max_level: Option<log::LevelFilter>,
profiling_targets: Option<&str>,
force_colors: Option<bool>,
telemetry_buffer_size: Option<usize>,
telemetry_external_transport: Option<ExtTransport>,
Expand Down Expand Up @@ -125,21 +125,27 @@ where
}

if directives != "" {
// We're not sure if log or tracing is available at this moment, so silently ignore the
// parse error.
env_filter = parse_user_directives(env_filter, directives)?;
}

if let Some(profiling_targets) = profiling_targets {
Copy link
Contributor

@andresilva andresilva Feb 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like I am having a déjà vu. Didn't this code use to be pretty much like this? (including the forced sc_tracing=trace directive).

env_filter = parse_user_directives(env_filter, profiling_targets)?;
env_filter = env_filter
.add_directive(
parse_default_directive("sc_tracing=trace").expect("provided directive is valid")
);
}

let max_level_hint = Layer::<FmtSubscriber>::max_level_hint(&env_filter);

let max_level = max_level.unwrap_or_else(|| match max_level_hint {
Some(tracing_subscriber::filter::LevelFilter::INFO) | None => log::LevelFilter::Info,
Some(tracing_subscriber::filter::LevelFilter::TRACE) => log::LevelFilter::Trace,
Some(tracing_subscriber::filter::LevelFilter::WARN) => log::LevelFilter::Warn,
Some(tracing_subscriber::filter::LevelFilter::ERROR) => log::LevelFilter::Error,
Some(tracing_subscriber::filter::LevelFilter::DEBUG) => log::LevelFilter::Debug,
Some(tracing_subscriber::filter::LevelFilter::OFF) => log::LevelFilter::Off,
});
let max_level = match max_level_hint {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use tracing_subscriber::filter::LevelFilter and clean up some line noise here?

Some(LevelFilter::INFO) | None => log::LevelFilter::Info,
Some(LevelFilter::TRACE) => log::LevelFilter::Trace,
Some(LevelFilter::WARN) => log::LevelFilter::Warn,
Some(LevelFilter::ERROR) => log::LevelFilter::Error,
Some(LevelFilter::DEBUG) => log::LevelFilter::Debug,
Some(LevelFilter::OFF) => log::LevelFilter::Off,
};

tracing_log::LogTracer::builder()
.with_max_level(max_level)
Expand Down Expand Up @@ -251,13 +257,10 @@ impl LoggerBuilder {
/// This sets various global logging and tracing instances and thus may only be called once.
pub fn init(self) -> Result<TelemetryWorker> {
if let Some((tracing_receiver, profiling_targets)) = self.profiling {
// If profiling is activated, we require `trace` logging.
let max_level = Some(log::LevelFilter::Trace);

if self.log_reloading {
let (subscriber, telemetry_worker) = prepare_subscriber(
&format!("{},{},sc_tracing=trace", self.directives, profiling_targets),
max_level,
&self.directives,
Some(&profiling_targets),
self.force_colors,
self.telemetry_buffer_size,
self.telemetry_external_transport,
Expand All @@ -270,8 +273,8 @@ impl LoggerBuilder {
Ok(telemetry_worker)
} else {
let (subscriber, telemetry_worker) = prepare_subscriber(
&format!("{},{},sc_tracing=trace", self.directives, profiling_targets),
max_level,
&self.directives,
Some(&profiling_targets),
self.force_colors,
self.telemetry_buffer_size,
self.telemetry_external_transport,
Expand Down Expand Up @@ -329,57 +332,53 @@ mod tests {
let _ = LoggerBuilder::new(directives).init().unwrap();
}

fn run_in_process(test_name: &str) {
if env::var("RUN_IN_PROCESS").is_err() {
let status = Command::new(env::current_exe().unwrap())
.arg(test_name)
.env("RUN_IN_PROCESS", "true")
.status()
.unwrap();
assert!(status.success(), "process did not ended successfully");
std::process::exit(0);
}
}

#[test]
fn test_logger_filters() {
run_in_process("test_logger_filters");

let test_directives = "afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error";
init_logger(&test_directives);

tracing::dispatcher::get_default(|dispatcher| {
let test_filter = |target, level| {
struct DummyCallSite;
impl Callsite for DummyCallSite {
fn set_interest(&self, _: Interest) {}
fn metadata(&self) -> &Metadata<'_> {
unreachable!();
if env::var("RUN_TEST_LOGGER_FILTERS").is_ok() {
let test_directives = "afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error";
init_logger(&test_directives);

tracing::dispatcher::get_default(|dispatcher| {
let test_filter = |target, level| {
struct DummyCallSite;
impl Callsite for DummyCallSite {
fn set_interest(&self, _: Interest) {}
fn metadata(&self) -> &Metadata<'_> {
unreachable!();
}
}
}

let metadata = tracing::metadata!(
name: "",
target: target,
level: level,
fields: &[],
callsite: &DummyCallSite,
kind: Kind::SPAN,
);

dispatcher.enabled(&metadata)
};

assert!(test_filter("afg", Level::INFO));
assert!(test_filter("afg", Level::DEBUG));
assert!(!test_filter("afg", Level::TRACE));

assert!(test_filter("sync", Level::TRACE));
assert!(test_filter("client", Level::WARN));

assert!(test_filter("telemetry", Level::TRACE));
assert!(test_filter("something-with-dash", Level::ERROR));
});

let metadata = tracing::metadata!(
name: "",
target: target,
level: level,
fields: &[],
callsite: &DummyCallSite,
kind: Kind::SPAN,
);

dispatcher.enabled(&metadata)
};

assert!(test_filter("afg", Level::INFO));
assert!(test_filter("afg", Level::DEBUG));
assert!(!test_filter("afg", Level::TRACE));

assert!(test_filter("sync", Level::TRACE));
assert!(test_filter("client", Level::WARN));

assert!(test_filter("telemetry", Level::TRACE));
assert!(test_filter("something-with-dash", Level::ERROR));
});
} else {
let status = Command::new(env::current_exe().unwrap())
.arg("test_logger_filters")
.env("RUN_TEST_LOGGER_FILTERS", "1")
.output()
.unwrap()
.status;
assert!(status.success());
}
}

/// This test ensures that using dash (`-`) in the target name in logs and directives actually
Expand Down Expand Up @@ -500,11 +499,18 @@ mod tests {

let output = command.output().unwrap();

String::from_utf8(output.stderr).unwrap()
dbg!(String::from_utf8(output.stderr)).unwrap()
}

if env::var("PRINT_MAX_LOG_LEVEL").is_ok() {
init_logger(&env::var("TRACING_TARGETS").unwrap_or_default());
let mut builder = LoggerBuilder::new("");

if let Ok(targets) = env::var("TRACING_TARGETS") {
builder.with_profiling(crate::TracingReceiver::Log, targets);
}

builder.init().unwrap();

eprint!("MAX_LOG_LEVEL={:?}", log::max_level());
} else {
assert_eq!("MAX_LOG_LEVEL=Info", run_test(None, None));
Expand Down