Skip to content
This repository was archived by the owner on Sep 12, 2023. It is now read-only.

Commit bbc5ec3

Browse files
committed
Export traces via OLTP
This exports traces and logs to Jaeger or OTEL via the opentelemetry OLTP exporter.
1 parent 1635a1c commit bbc5ec3

File tree

15 files changed

+326
-34
lines changed

15 files changed

+326
-34
lines changed

Cargo.lock

+234-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ resolver = "2"
2525
xtra = { git = "https://github.com/Restioson/xtra", rev = "afff02dd0fc8b92ae264db8a5457773f8af95487" } # Unreleased
2626
maia = { git = "https://github.com/comit-network/maia", rev = "fc6b78b98407b10b55f8cfd152062ad77f98cd9f" } # Unreleased
2727
maia-core = { git = "https://github.com/comit-network/maia", tag = "0.1.1", package = "maia-core" } # Pinned to support maia 0.1 and 0.2
28-
xtra_productivity = { git = "https://github.com/comit-network/xtra-productivity", rev = "bebe45425ae44980186df7b96b41f70cad58a4bb" } # Unreleased
28+
xtra_productivity = { git = "https://github.com/Restioson/xtra-productivity", rev = "dcf76b6622c89dc6b5630605d8395d8265968406" } # Unreleased
2929
electrum-client = { git = "https://github.com/comit-network/rust-electrum-client/", branch = "do-not-ignore-empty-lines" }
3030

3131
[profile.dev.package.sqlx-macros]

daemon-tests/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"]
2222
tokio-tasks = { path = "../tokio-tasks", features = ["xtra"] }
2323
tracing = { version = "0.1" }
2424
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi", "env-filter", "local-time", "tracing-log", "json"] }
25-
xtra = { version = "0.6" }
25+
xtra = { version = "0.6", features = ["instrumentation"] }
2626
xtra-bitmex-price-feed = { path = "../xtra-bitmex-price-feed" }
2727
xtra-libp2p = { path = "../xtra-libp2p" }
28-
xtra_productivity = { version = "0.1" }
28+
xtra_productivity = { version = "0.1", features = ["tracing"] }

daemon/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ xtra-bitmex-price-feed = { path = "../xtra-bitmex-price-feed" }
5555
xtra-libp2p = { path = "../xtra-libp2p" }
5656
xtra-libp2p-offer = { path = "../xtra-libp2p-offer" }
5757
xtra-libp2p-ping = { path = "../xtra-libp2p-ping" }
58-
xtra_productivity = { version = "0.1.0" }
58+
xtra_productivity = { version = "0.1.0", features = ["tracing"] }
5959
xtras = { path = "../xtras" }
6060

6161
[dev-dependencies]

jaeger.sh

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Run jaeger for viewing traces when debugging locally
2+
docker run \
3+
-p 5778:5778 \
4+
-p 16686:16686 \
5+
-p 13133:13133 \
6+
-p 4317:55680 \
7+
jaegertracing/opentelemetry-all-in-one:latest

maker/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,5 @@ xtra-bitmex-price-feed = { path = "../xtra-bitmex-price-feed" }
4040
xtra-libp2p = { path = "../xtra-libp2p" }
4141
xtra-libp2p-offer = { path = "../xtra-libp2p-offer" }
4242
xtra-libp2p-ping = { path = "../xtra-libp2p-ping" }
43-
xtra_productivity = { version = "0.1.0" }
43+
xtra_productivity = { version = "0.1.0", features = ["tracing"] }
4444
xtras = { path = "../xtras" }

maker/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use xtras::supervisor::always_restart;
3131
async fn main() -> Result<()> {
3232
let opts = Opts::parse();
3333

34-
logger::init(opts.log_level, opts.json).context("initialize logger")?;
34+
logger::init(opts.log_level, opts.json, "maker").context("initialize logger")?;
3535
tracing::info!("Running version: {}", daemon::version::version());
3636
let settlement_interval_hours = SETTLEMENT_INTERVAL.whole_hours();
3737

shared-bin/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ atty = "0.2"
1313
daemon = { path = "../daemon" }
1414
http-api-problem = { version = "0.53.0", features = ["rocket"] }
1515
model = { path = "../model" }
16+
opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
17+
opentelemetry-otlp = { version = "0.10.0" }
1618
rocket = { version = "0.5.0-rc.1", features = ["json"] }
1719
rocket-basicauth = { path = "../rocket-basicauth" }
1820
serde = { version = "1", features = ["derive"] }
1921
time = "0.3.11"
2022
tracing = { version = "0.1" }
23+
tracing-opentelemetry = "0.17.3"
2124
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi", "env-filter", "local-time", "tracing-log", "json"] }

shared-bin/src/logger.rs

+44-9
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
1-
use anyhow::anyhow;
1+
use anyhow::Context;
22
use anyhow::Result;
3+
use opentelemetry::sdk::propagation::TraceContextPropagator;
4+
use opentelemetry::sdk::trace;
5+
use opentelemetry::sdk::Resource;
6+
use opentelemetry::KeyValue;
37
use time::macros::format_description;
48
use tracing_subscriber::fmt::time::UtcTime;
59
use tracing_subscriber::EnvFilter;
10+
use tracing_subscriber::Layer;
11+
use tracing_subscriber::Registry;
612

713
pub use tracing_subscriber::filter::LevelFilter;
14+
use tracing_subscriber::layer::SubscriberExt;
15+
use tracing_subscriber::util::SubscriberInitExt;
816

917
const RUST_LOG_ENV: &str = "RUST_LOG";
1018

1119
#[allow(clippy::print_stdout)] // because the logger is only initialized at the end of this function but we want to print a warning
12-
pub fn init(level: LevelFilter, json_format: bool) -> Result<()> {
20+
pub fn init(
21+
level: LevelFilter,
22+
json_format: bool,
23+
instrumentation: bool,
24+
service_name: &'static str,
25+
) -> Result<()> {
1326
if level == LevelFilter::OFF {
1427
return Ok(());
1528
}
@@ -31,23 +44,45 @@ pub fn init(level: LevelFilter, json_format: bool) -> Result<()> {
3144
};
3245
let filter = filter.add_directive(format!("{level}").parse()?);
3346

34-
let builder = tracing_subscriber::fmt()
35-
.with_env_filter(filter)
47+
let fmt_layer = tracing_subscriber::fmt::layer()
3648
.with_writer(std::io::stderr)
3749
.with_ansi(is_terminal);
3850

39-
let result = if json_format {
40-
builder.json().with_timer(UtcTime::rfc_3339()).try_init()
51+
let fmt_layer = if json_format {
52+
fmt_layer.json().with_timer(UtcTime::rfc_3339()).boxed()
4153
} else {
42-
builder
54+
fmt_layer
4355
.compact()
4456
.with_timer(UtcTime::new(format_description!(
4557
"[year]-[month]-[day] [hour]:[minute]:[second]"
4658
)))
47-
.try_init()
59+
.boxed()
4860
};
4961

50-
result.map_err(|e| anyhow!("Failed to init logger: {e}"))?;
62+
let fmt_layer = fmt_layer.with_filter(filter);
63+
64+
let telemetry = if instrumentation {
65+
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
66+
let cfg = trace::Config::default()
67+
.with_resource(Resource::new([KeyValue::new("service.name", service_name)]));
68+
69+
let tracer = opentelemetry_otlp::new_pipeline()
70+
.tracing()
71+
.with_trace_config(cfg)
72+
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
73+
.install_batch(opentelemetry::runtime::Tokio)
74+
.context("Failed to initialise OTLP exporter")?;
75+
76+
Some(tracing_opentelemetry::layer().with_tracer(tracer))
77+
} else {
78+
None
79+
};
80+
81+
Registry::default()
82+
.with(telemetry)
83+
.with(fmt_layer)
84+
.try_init()
85+
.context("Failed to init logger")?;
5186

5287
tracing::info!("Initialized logger");
5388

taker/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ tracing = { version = "0.1" }
3030
uuid = "0.8"
3131
webbrowser = "0.7.1"
3232
x25519-dalek = "1.1"
33-
xtra = { version = "0.6" }
33+
xtra = { version = "0.6", features = ["instrumentation"] }
3434
xtra-bitmex-price-feed = { path = "../xtra-bitmex-price-feed" }
3535
xtra-libp2p = { path = "../xtra-libp2p" }
3636

taker/src/main.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ struct Opts {
8585
#[clap(short, long)]
8686
json: bool,
8787

88+
/// If enabled, traces will be exported to the OTEL collector
89+
#[clap(short, long)]
90+
instrumentation: bool,
91+
8892
/// Configure the log level, e.g.: one of Error, Warn, Info, Debug, Trace
8993
#[clap(short, long, default_value = "Debug")]
9094
log_level: LevelFilter,
@@ -284,7 +288,8 @@ async fn main() -> Result<()> {
284288
let network = opts.network();
285289
let (maker_url, maker_id, maker_peer_id) = opts.maker()?;
286290

287-
logger::init(opts.log_level, opts.json).context("initialize logger")?;
291+
logger::init(opts.log_level, opts.json, opts.instrumentation, "taker")
292+
.context("initialize logger")?;
288293
tracing::info!("Running version: {}", daemon::version::version());
289294
let settlement_interval_hours = SETTLEMENT_INTERVAL.whole_hours();
290295

taker/src/routes.rs

+11
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use std::borrow::Cow;
3636
use std::path::PathBuf;
3737
use tokio::select;
3838
use tokio::sync::watch;
39+
use tracing::instrument;
3940

4041
type Taker = TakerActorSystem<
4142
oracle::Actor,
@@ -54,6 +55,7 @@ pub struct IdentityInfo {
5455
}
5556

5657
#[rocket::get("/feed")]
58+
#[instrument(name = "GET /feed", skip(rx))]
5759
pub async fn feed(
5860
rx: &State<Feeds>,
5961
rx_wallet: &State<watch::Receiver<Option<WalletInfo>>>,
@@ -148,6 +150,7 @@ pub struct CfdOrderRequest {
148150
}
149151

150152
#[rocket::post("/cfd/order", data = "<cfd_order_request>")]
153+
#[instrument(name = "POST /cfd/order", skip(taker))]
151154
pub async fn post_order_request(
152155
cfd_order_request: Json<CfdOrderRequest>,
153156
taker: &State<Taker>,
@@ -170,6 +173,7 @@ pub async fn post_order_request(
170173
}
171174

172175
#[rocket::post("/cfd/<id>/<action>")]
176+
#[instrument(name = "POST /cfd/<id>/<action>", skip(taker))]
173177
pub async fn post_cfd_action(
174178
id: Uuid,
175179
action: String,
@@ -205,6 +209,7 @@ pub async fn post_cfd_action(
205209
}
206210

207211
#[rocket::get("/alive")]
212+
#[instrument(name = "GET /alive")]
208213
pub fn get_health_check() {}
209214

210215
#[derive(Debug, Clone, Copy, Deserialize)]
@@ -233,12 +238,14 @@ pub struct MarginResponse {
233238
struct Asset;
234239

235240
#[rocket::get("/assets/<file..>")]
241+
#[instrument(name = "GET /assets/<file>")]
236242
pub fn dist<'r>(file: PathBuf, _auth: Authenticated) -> impl Responder<'r, 'static> {
237243
let filename = format!("assets/{}", file.display());
238244
Asset::get(&filename).into_response(file)
239245
}
240246

241247
#[rocket::get("/<_paths..>", format = "text/html")]
248+
#[instrument(name = "GET /<_paths>")]
242249
pub fn index<'r>(_paths: PathBuf, _auth: Authenticated) -> impl Responder<'r, 'static> {
243250
let asset = Asset::get("index.html").ok_or(Status::NotFound)?;
244251
Ok::<(ContentType, Cow<[u8]>), Status>((ContentType::HTML, asset.data))
@@ -253,6 +260,7 @@ pub struct WithdrawRequest {
253260
}
254261

255262
#[rocket::post("/withdraw", data = "<withdraw_request>")]
263+
#[instrument(name = "POST /withdraw", skip(taker))]
256264
pub async fn post_withdraw_request(
257265
withdraw_request: Json<WithdrawRequest>,
258266
taker: &State<Taker>,
@@ -279,6 +287,7 @@ pub async fn post_withdraw_request(
279287
}
280288

281289
#[rocket::get("/metrics")]
290+
#[instrument(name = "GET /metrics")]
282291
pub async fn get_metrics<'r>(_auth: Authenticated) -> Result<String, HttpApiProblem> {
283292
let metrics = prometheus::TextEncoder::new()
284293
.encode_to_string(&prometheus::gather())
@@ -292,6 +301,7 @@ pub async fn get_metrics<'r>(_auth: Authenticated) -> Result<String, HttpApiProb
292301
}
293302

294303
#[rocket::put("/sync")]
304+
#[instrument(name = "PUT /sync", skip(taker))]
295305
pub async fn put_sync_wallet(
296306
taker: &State<Taker>,
297307
_auth: Authenticated,
@@ -311,6 +321,7 @@ pub struct HealthCheck {
311321
}
312322

313323
#[rocket::get("/version")]
324+
#[instrument(name = "GET /version")]
314325
pub async fn get_version() -> Json<HealthCheck> {
315326
Json(HealthCheck {
316327
daemon_version: daemon::version::version().to_string(),

xtra-bitmex-price-feed/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ tokio = "1"
1919
tokio-tasks = { path = "../tokio-tasks" }
2020
tracing = "0.1"
2121
xtra = "0.6"
22-
xtra_productivity = { version = "0.1.0" }
22+
xtra_productivity = { version = "0.1.0", features = ["tracing"] }
2323

2424
[dev-dependencies]
2525
rust_decimal_macros = "1"

xtra-libp2p/src/dialer.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use async_trait::async_trait;
99
use libp2p_core::Multiaddr;
1010
use libp2p_core::PeerId;
1111
use std::time::Duration;
12+
use tracing::instrument;
13+
use tracing::Instrument;
1214
use xtra::Address;
1315
use xtra_productivity::xtra_productivity;
1416
use xtras::SendAsyncSafe;
@@ -21,6 +23,7 @@ pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
2123
/// Polls Endpoint at startup to check whether connection got established correctly, and
2224
/// then listens for ConnectionDropped message to stop itself.
2325
/// Should be used in conjunction with supervisor maintaining resilient connection.
26+
#[derive(Debug)]
2427
pub struct Actor {
2528
endpoint: Address<Endpoint>,
2629
connect_address: Multiaddr,
@@ -38,6 +41,7 @@ impl Actor {
3841
}
3942
}
4043

44+
#[instrument(err)]
4145
async fn connect(&self) -> Result<(), Error> {
4246
self.endpoint
4347
.send(Connect(self.connect_address.clone()))
@@ -86,6 +90,7 @@ impl Actor {
8690
.expect("to always have peer id if successfully started")
8791
}
8892

93+
#[instrument(ret, err)]
8994
async fn is_connection_established(&self) -> Result<bool> {
9095
Ok(self
9196
.endpoint
@@ -95,6 +100,7 @@ impl Actor {
95100
.contains(&self.peer_id()))
96101
}
97102

103+
#[instrument(err)]
98104
async fn dial(&self) -> Result<()> {
99105
if self.is_connection_established().await? {
100106
tracing::info!("Connection is already established, no need to connect");
@@ -106,7 +112,12 @@ impl Actor {
106112
}
107113

108114
// Only check the connection again after it had enough time to be established
109-
tokio::time::sleep(CONNECTION_TIMEOUT).await;
115+
tokio::time::sleep(CONNECTION_TIMEOUT)
116+
.instrument(tracing::debug_span!(
117+
"Wait connection timeout",
118+
timeout_secs = CONNECTION_TIMEOUT.as_secs()
119+
))
120+
.await;
110121

111122
anyhow::ensure!(
112123
self.is_connection_established().await?,

xtras/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ tokio-tasks = { path = "../tokio-tasks" }
1414
tracing = { version = "0.1" }
1515
uuid = { version = "1.1", features = ["v4"] }
1616
xtra = { version = "0.6", features = ["instrumentation"] }
17-
xtra_productivity = { version = "0.1.0" }
17+
xtra_productivity = { version = "0.1.0", features = ["tracing"] }
1818

1919
[dev-dependencies]
2020
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }

0 commit comments

Comments
 (0)