From e09374f8b308a4e90af921aaf4a17b7453fc9765 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Fri, 13 Dec 2024 13:55:18 -0300 Subject: [PATCH] feat: add tracing for proxy and worker --- CHANGELOG.md | 1 + Cargo.lock | 223 +++++++++++++++++++++++++++ bin/tx-prover/Cargo.toml | 7 + bin/tx-prover/README.md | 10 ++ bin/tx-prover/src/api/mod.rs | 12 +- bin/tx-prover/src/commands/mod.rs | 9 +- bin/tx-prover/src/commands/proxy.rs | 6 +- bin/tx-prover/src/commands/worker.rs | 5 +- bin/tx-prover/src/main.rs | 5 +- bin/tx-prover/src/proxy/mod.rs | 153 +++++++++++++++++- bin/tx-prover/src/utils.rs | 67 +++++++- 11 files changed, 479 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b5d8cd4e3..842ef37f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Changes +- Added tracing to the `miden-tx-prover` CLI (#1014). - Added health check endpoints to the prover service (#1006). - Implemented serialization for `AccountHeader` (#996). - Updated Pingora crates to 0.4 and added polling time to the configuration file (#997). diff --git a/Cargo.lock b/Cargo.lock index 5b3ec503f..ae5a66c47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1122,6 +1122,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "hermit-abi" version = "0.4.0" @@ -1474,6 +1480,12 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "ipnet" version = "2.10.1" @@ -2002,6 +2014,7 @@ version = "0.7.0" dependencies = [ "async-trait", "axum", + "bytes", "clap 4.5.21", "figment", "getrandom", @@ -2010,6 +2023,11 @@ dependencies = [ "miden-tx", "miette", "once_cell", + "opentelemetry 0.27.1", + "opentelemetry-jaeger", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions 0.27.0", + "opentelemetry_sdk 0.27.1", "pingora", "pingora-core", "pingora-limits", @@ -2029,6 +2047,7 @@ dependencies = [ "tonic-web", "tonic-web-wasm-client", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "uuid", "winter-maybe-async", @@ -2239,6 +2258,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.9", + "libc", +] + [[package]] name = "object" version = "0.36.5" @@ -2304,6 +2333,150 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror 1.0.69", +] + +[[package]] +name = "opentelemetry" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 1.0.69", + "tracing", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501b471b67b746d9a07d4c29f8be00f952d1a2eca356922ede0098cbaddff19f" +dependencies = [ + "async-trait", + "futures-core", + "futures-util", + "opentelemetry 0.23.0", + "opentelemetry-semantic-conventions 0.15.0", + "opentelemetry_sdk 0.23.0", + "thrift", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry 0.27.1", + "opentelemetry-proto", + "opentelemetry_sdk 0.27.1", + "prost", + "thiserror 1.0.69", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" +dependencies = [ + "opentelemetry 0.27.1", + "opentelemetry_sdk 0.27.1", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1869fb4bb9b35c5ba8a1e40c9b128a7b4c010d07091e864a29da19e4fe2ca4d7" + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52" + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "lazy_static", + "once_cell", + "opentelemetry 0.23.0", + "ordered-float 4.5.0", + "percent-encoding", + "rand", + "thiserror 1.0.69", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry 0.27.1", + "percent-encoding", + "rand", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + +[[package]] +name = "ordered-float" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c65ee1f9701bf938026630b455d5315f490640234259037edb259798b3bcf85e" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -3716,6 +3889,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float 2.10.1", + "threadpool", +] + [[package]] name = "thrift_codec" version = "0.1.1" @@ -4075,6 +4270,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.27.1", + "opentelemetry_sdk 0.27.1", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" @@ -4400,6 +4613,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/bin/tx-prover/Cargo.toml b/bin/tx-prover/Cargo.toml index 36b2b6182..07ad95c12 100644 --- a/bin/tx-prover/Cargo.toml +++ b/bin/tx-prover/Cargo.toml @@ -32,12 +32,19 @@ tonic = { version = "0.12", default-features = false, features = ["prost", "code getrandom = { version = "0.2", features = ["js"], optional = true } [target.'cfg(not(all(target_arch = "wasm32", target_os = "unknown")))'.dependencies] +bytes = "1.0" tonic = { version = "0.12", default-features = false, features = ["prost", "codegen", "transport"] } once_cell = "1.19.0" pingora = { version = "0.4", features = [ "lb" ] } pingora-core = "0.4" pingora-proxy = "0.4" pingora-limits = "0.4" +opentelemetry = { version = "0.27", features = ["metrics", "trace"] } +opentelemetry-otlp = { version = "0.27", features = ["grpc-tonic"] } +opentelemetry_sdk = { version = "0.27", features = ["metrics", "rt-tokio"] } +opentelemetry-semantic-conventions = "0.27.0" +opentelemetry-jaeger = "0.22.0" +tracing-opentelemetry = "0.28" [dependencies] async-trait = "0.1" diff --git a/bin/tx-prover/README.md b/bin/tx-prover/README.md index ac3282484..a7e3ed575 100644 --- a/bin/tx-prover/README.md +++ b/bin/tx-prover/README.md @@ -114,6 +114,16 @@ The proxy service uses this health check to determine if a worker is available t Both the worker and the proxy will use the `info` log level by default, but it can be changed by setting the `RUST_LOG` environment variable. +## Traces + +The service uses the `tracing` crate for structured logging and tracing. Traces are enabled by default, and uses opentelemetry to export traces to a Jaeger instance. The traces can be visualized using the Jaeger UI, which can be used by running: + +```bash +docker run -d -p4317:4317 -p16686:16686 jaegertracing/all-in-one:latest +``` + +Then, you can access the Jaeger UI by opening `http://localhost:16686/` in your browser. + ## Features Description of this crate's feature: diff --git a/bin/tx-prover/src/api/mod.rs b/bin/tx-prover/src/api/mod.rs index c71fb344d..33c8cf2e5 100644 --- a/bin/tx-prover/src/api/mod.rs +++ b/bin/tx-prover/src/api/mod.rs @@ -9,7 +9,9 @@ use miden_tx_prover::generated::{ }; use tokio::{net::TcpListener, sync::Mutex}; use tonic::{Request, Response, Status}; -use tracing::info; +use tracing::{debug, info, instrument}; + +use crate::utils::TRACING_TARGET_NAME; pub struct RpcListener { pub api_service: ApiServer, @@ -30,10 +32,18 @@ pub struct ProverRpcApi { #[async_trait::async_trait] impl ProverApi for ProverRpcApi { + #[instrument( + target = TRACING_TARGET_NAME, + name = "prover:prove_transaction", + skip_all, + ret(level = "info"), + err + )] async fn prove_transaction( &self, request: Request, ) -> Result, tonic::Status> { + debug!(request = ?request, "Processing reply"); info!("Received request to prove transaction"); // Try to acquire a permit without waiting diff --git a/bin/tx-prover/src/commands/mod.rs b/bin/tx-prover/src/commands/mod.rs index 746dd6c96..057340ca8 100644 --- a/bin/tx-prover/src/commands/mod.rs +++ b/bin/tx-prover/src/commands/mod.rs @@ -9,10 +9,12 @@ use init::Init; use miden_tx_prover::PROVER_SERVICE_CONFIG_FILE_NAME; use proxy::StartProxy; use serde::{Deserialize, Serialize}; -use tracing::debug; +use tracing::{debug, instrument}; use update_workers::{AddWorkers, RemoveWorkers, UpdateWorkers}; use worker::StartWorker; +use crate::utils::TRACING_TARGET_NAME; + pub mod init; pub mod proxy; pub mod update_workers; @@ -22,7 +24,7 @@ pub mod worker; /// /// It is stored in a TOML file, which will be created by the `init` command. /// It allows manual modification of the configuration file. -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ProxyConfig { /// List of workers used by the proxy. pub workers: Vec, @@ -116,7 +118,7 @@ impl ProxyConfig { } /// Configuration for a worker -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct WorkerConfig { pub host: String, pub port: u16, @@ -168,6 +170,7 @@ pub enum Command { /// CLI entry point impl Cli { + #[instrument(target = TRACING_TARGET_NAME, name = "cli:execute", skip_all, ret(level = "info"), err)] pub async fn execute(&self) -> Result<(), String> { match &self.action { // For the `StartWorker` command, we need to create a new runtime and run the worker diff --git a/bin/tx-prover/src/commands/proxy.rs b/bin/tx-prover/src/commands/proxy.rs index f18c5d998..ed3b22dad 100644 --- a/bin/tx-prover/src/commands/proxy.rs +++ b/bin/tx-prover/src/commands/proxy.rs @@ -7,7 +7,10 @@ use pingora::{ }; use pingora_proxy::http_proxy_service; -use crate::proxy::{LoadBalancer, LoadBalancerState}; +use crate::{ + proxy::{LoadBalancer, LoadBalancerState}, + utils::TRACING_TARGET_NAME, +}; /// Starts the proxy defined in the config file. #[derive(Debug, Parser)] @@ -18,6 +21,7 @@ impl StartProxy { /// /// This method will first read the config file to get the list of workers to start. It will /// then start a proxy with each worker as a backend. + #[tracing::instrument(target = TRACING_TARGET_NAME, name = "proxy:execute")] pub async fn execute(&self) -> Result<(), String> { let mut server = Server::new(Some(Opt::default())).map_err(|err| err.to_string())?; server.bootstrap(); diff --git a/bin/tx-prover/src/commands/worker.rs b/bin/tx-prover/src/commands/worker.rs index eeb965d8b..e5dfae8c5 100644 --- a/bin/tx-prover/src/commands/worker.rs +++ b/bin/tx-prover/src/commands/worker.rs @@ -3,9 +3,9 @@ use miden_tx_prover::generated::api_server::ApiServer; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; use tonic_health::server::health_reporter; -use tracing::info; +use tracing::{info, instrument}; -use crate::api::RpcListener; +use crate::{api::RpcListener, utils::TRACING_TARGET_NAME}; /// Starts a worker. #[derive(Debug, Parser)] @@ -28,6 +28,7 @@ impl StartWorker { /// The worker includes a health reporter that will mark the service as serving, following the /// [gRPC health checking protocol]( /// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto). + #[instrument(target = TRACING_TARGET_NAME, name = "worker:execute")] pub async fn execute(&self) -> Result<(), String> { let worker_addr = format!("{}:{}", self.host, self.port); let rpc = diff --git a/bin/tx-prover/src/main.rs b/bin/tx-prover/src/main.rs index 278ae2570..fecaadc46 100644 --- a/bin/tx-prover/src/main.rs +++ b/bin/tx-prover/src/main.rs @@ -3,13 +3,14 @@ pub mod commands; mod proxy; mod utils; use commands::Cli; -use utils::setup_tracing; +use utils::{init_tracer_provider, setup_tracing}; #[tokio::main] async fn main() -> Result<(), String> { use clap::Parser; + let provider = init_tracer_provider(); - setup_tracing(); + setup_tracing(provider)?; // read command-line args let cli = Cli::parse(); diff --git a/bin/tx-prover/src/proxy/mod.rs b/bin/tx-prover/src/proxy/mod.rs index fd18188d5..f055a2292 100644 --- a/bin/tx-prover/src/proxy/mod.rs +++ b/bin/tx-prover/src/proxy/mod.rs @@ -1,10 +1,13 @@ use std::{collections::VecDeque, future::Future, pin::Pin, sync::Arc, time::Duration}; use async_trait::async_trait; +use bytes::Bytes; use once_cell::sync::Lazy; use pingora::{ + http::ResponseHeader, lb::Backend, prelude::*, + protocols::Digest, server::ShutdownWatch, services::background::BackgroundService, upstreams::peer::{Peer, ALPN}, @@ -13,7 +16,7 @@ use pingora_core::{upstreams::peer::HttpPeer, Result}; use pingora_limits::rate::Rate; use pingora_proxy::{ProxyHttp, Session}; use tokio::{sync::RwLock, time::sleep}; -use tracing::{error, info, warn}; +use tracing::{debug_span, error, info, info_span, warn, Span}; use uuid::Uuid; use worker::Worker; @@ -37,6 +40,7 @@ const LOCALHOST_ADDR: &str = "127.0.0.1"; // ================================================================================================ /// Load balancer that uses a round robin strategy +#[derive(Debug)] pub struct LoadBalancerState { workers: Arc>>, timeout_secs: Duration, @@ -50,6 +54,7 @@ pub struct LoadBalancerState { impl LoadBalancerState { /// Create a new load balancer + #[tracing::instrument(name = "proxy:new_load_balancer", skip(initial_workers))] pub async fn new( initial_workers: Vec, config: &ProxyConfig, @@ -300,6 +305,7 @@ static QUEUE: Lazy = Lazy::new(RequestQueue::new); /// Custom context for the request/response lifecycle /// We use this context to keep track of the number of tries for a request, the unique ID for the /// request, and the worker that will process the request. +#[derive(Debug)] pub struct RequestContext { /// Number of tries for the request tries: usize, @@ -307,15 +313,19 @@ pub struct RequestContext { request_id: Uuid, /// Worker that will process the request worker: Option, + /// Parent span for the request + parent_span: Span, } impl RequestContext { /// Create a new request context fn new() -> Self { + let request_id = Uuid::new_v4(); Self { tries: 0, - request_id: Uuid::new_v4(), + request_id, worker: None, + parent_span: info_span!("proxy:new_request", request_id = request_id.to_string()), } } @@ -333,6 +343,7 @@ impl RequestContext { /// This wrapper is used to implement the ProxyHttp trait for Arc. /// This is necessary because we want to share the load balancer between the proxy server and the /// health check background service. +#[derive(Debug)] pub struct LoadBalancer(pub Arc); /// Implements load-balancing of incoming requests across a pool of workers. @@ -367,6 +378,7 @@ impl ProxyHttp for LoadBalancer { /// Here we apply IP-based rate-limiting to the request. We also check if the queue is full. /// /// If the request is rate-limited, we return a 429 response. Otherwise, we return false. + #[tracing::instrument(name = "proxy:request_filter", parent = &ctx.parent_span, skip(session))] async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result where Self::CTX: Send + Sync, @@ -423,6 +435,7 @@ impl ProxyHttp for LoadBalancer { /// /// Note that the request will be assigned a worker here, and the worker will be removed from /// the list of available workers once it reaches the [Self::logging] method. + #[tracing::instrument(name = "proxy:upstream_peer", parent = &ctx.parent_span, skip(_session))] async fn upstream_peer( &self, _session: &mut Session, @@ -482,6 +495,7 @@ impl ProxyHttp for LoadBalancer { /// /// This method is called right after [Self::upstream_peer()] returns a [HttpPeer] and a /// connection is established with the worker. + #[tracing::instrument(name = "proxy:upstream_request_filter", parent = &_ctx.parent_span, skip(_session))] async fn upstream_request_filter( &self, _session: &mut Session, @@ -503,6 +517,7 @@ impl ProxyHttp for LoadBalancer { } /// Retry the request if the connection fails. + #[tracing::instrument(name = "proxy:fail_to_connect", parent = &ctx.parent_span, skip(_session))] fn fail_to_connect( &self, _session: &mut Session, @@ -522,6 +537,7 @@ impl ProxyHttp for LoadBalancer { /// /// This method is the last one in the request lifecycle, no matter if the request was /// processed or not. + #[tracing::instrument(name = "proxy:logging", parent = &ctx.parent_span, skip(_session))] async fn logging(&self, _session: &mut Session, e: Option<&Error>, ctx: &mut Self::CTX) where Self::CTX: Send + Sync, @@ -535,6 +551,135 @@ impl ProxyHttp for LoadBalancer { self.0.add_available_worker(worker).await; } } + + // The following methods are a copy of the default implementation defined in the trait, but + // with tracing instrumentation. + // ============================================================================================ + #[tracing::instrument(name = "proxy:early_request_filter", parent = &ctx.parent_span, skip(_session))] + async fn early_request_filter( + &self, + _session: &mut Session, + ctx: &mut Self::CTX, + ) -> Result<()> { + Ok(()) + } + + #[tracing::instrument(name = "proxy:connected_to_upstream", parent = &ctx.parent_span, skip(_session, _sock, _fd))] + async fn connected_to_upstream( + &self, + _session: &mut Session, + _reused: bool, + _peer: &HttpPeer, + #[cfg(unix)] _fd: std::os::unix::io::RawFd, + #[cfg(windows)] _sock: std::os::windows::io::RawSocket, + _digest: Option<&Digest>, + ctx: &mut Self::CTX, + ) -> Result<()> { + Ok(()) + } + + #[tracing::instrument(name = "proxy:request_body_filter", parent = &ctx.parent_span, skip(_session, _body))] + async fn request_body_filter( + &self, + _session: &mut Session, + _body: &mut Option, + _end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> Result<()> { + Ok(()) + } + + #[tracing::instrument(name = "proxy:upstream_response_filter", parent = &ctx.parent_span, skip(_session, _upstream_response))] + fn upstream_response_filter( + &self, + _session: &mut Session, + _upstream_response: &mut ResponseHeader, + ctx: &mut Self::CTX, + ) { + } + + #[tracing::instrument(name = "proxy:response_filter", parent = &ctx.parent_span, skip(_session, _upstream_response))] + async fn response_filter( + &self, + _session: &mut Session, + _upstream_response: &mut ResponseHeader, + ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + Ok(()) + } + + #[tracing::instrument(name = "proxy:upstream_response_body_filter", parent = &ctx.parent_span, skip(_session, _body))] + fn upstream_response_body_filter( + &self, + _session: &mut Session, + _body: &mut Option, + _end_of_stream: bool, + ctx: &mut Self::CTX, + ) { + } + + #[tracing::instrument(name = "proxy:response_body_filter", parent = &ctx.parent_span, skip(_session, _body))] + fn response_body_filter( + &self, + _session: &mut Session, + _body: &mut Option, + _end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> Result> + where + Self::CTX: Send + Sync, + { + Ok(None) + } + + #[tracing::instrument(name = "proxy:fail_to_proxy", parent = &ctx.parent_span, skip(session))] + async fn fail_to_proxy(&self, session: &mut Session, e: &Error, ctx: &mut Self::CTX) -> u16 + where + Self::CTX: Send + Sync, + { + let server_session = session.as_mut(); + let code = match e.etype() { + HTTPStatus(code) => *code, + _ => { + match e.esource() { + ErrorSource::Upstream => 502, + ErrorSource::Downstream => { + match e.etype() { + WriteError | ReadError | ConnectionClosed => { + /* conn already dead */ + 0 + }, + _ => 400, + } + }, + ErrorSource::Internal | ErrorSource::Unset => 500, + } + }, + }; + if code > 0 { + server_session.respond_error(code).await + } + code + } + + #[tracing::instrument(name = "proxy:error_while_proxy", parent = &ctx.parent_span, skip(session))] + fn error_while_proxy( + &self, + peer: &HttpPeer, + session: &mut Session, + e: Box, + ctx: &mut Self::CTX, + client_reused: bool, + ) -> Box { + let mut e = e.more_context(format!("Peer: {}", peer)); + // only reused client connections where retry buffer is not truncated + e.retry + .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated()); + e + } } /// Implement the BackgroundService trait for the LoadBalancer @@ -567,6 +712,10 @@ impl BackgroundService for LoadBalancerState { { Box::pin(async move { loop { + // Create a new spawn to perform the health check + let span = debug_span!("proxy:health_check"); + let _guard = span.enter(); + let mut workers = self.workers.write().await; // Perform health checks on workers and retain healthy ones diff --git a/bin/tx-prover/src/utils.rs b/bin/tx-prover/src/utils.rs index 6e310d6d6..9ea4093dd 100644 --- a/bin/tx-prover/src/utils.rs +++ b/bin/tx-prover/src/utils.rs @@ -1,20 +1,71 @@ use std::time::Duration; +use opentelemetry::{trace::TracerProvider as _, KeyValue}; +use opentelemetry_sdk::{ + runtime, + trace::{RandomIdGenerator, Sampler, TracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::{ + resource::{SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; use pingora::{http::ResponseHeader, Error, ErrorType}; use pingora_proxy::Session; use tonic::transport::Channel; use tonic_health::pb::health_client::HealthClient; +use tracing::Level; +use tracing_subscriber::{layer::SubscriberExt, Registry}; + +pub const TRACING_TARGET_NAME: &str = "miden-tx-prover"; const RESOURCE_EXHAUSTED_CODE: u16 = 8; -pub(crate) fn setup_tracing() { - // Set a default log level if `RUST_LOG` is not set - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "info"); // Default to 'info' level - } - tracing_subscriber::fmt::init(); +// Construct TracerProvider for OpenTelemetryLayer +pub(crate) fn init_tracer_provider() -> TracerProvider { + let exporter = create_span_exporter(); + + TracerProvider::builder() + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(1.0)))) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(create_resource()) + .with_batch_exporter(exporter, runtime::Tokio) + .build() +} + +// Create a SpanExporter +fn create_span_exporter() -> opentelemetry_otlp::SpanExporter { + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build() + .expect("Failed to build SpanExporter") +} + +// Create a Resource that captures information about the entity for which telemetry is recorded. +fn create_resource() -> Resource { + Resource::from_schema_url( + [ + KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")), + KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + ], + SCHEMA_URL, + ) } +// Setup tracing subscriber +pub(crate) fn setup_tracing(provider: TracerProvider) -> Result<(), String> { + let tracer = provider.tracer(TRACING_TARGET_NAME); + + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let subscriber = Registry::default() + .with(telemetry) + .with(tracing_subscriber::filter::LevelFilter::from_level(Level::INFO)) + .with(tracing_subscriber::fmt::layer()); + + tracing::subscriber::set_global_default(subscriber) + .map_err(|e| format!("Failed to set subscriber: {:?}", e)) +} /// Create a 503 response for a full queue pub(crate) async fn create_queue_full_response( session: &mut Session, @@ -88,11 +139,11 @@ pub async fn create_health_check_client( total_timeout: Duration, ) -> Result, String> { Channel::from_shared(format!("http://{}", address)) - .map_err(|err| err.to_string())? + .map_err(|err| format!("Invalid format for worker URI: {}", err))? .connect_timeout(connection_timeout) .timeout(total_timeout) .connect() .await .map(HealthClient::new) - .map_err(|err| err.to_string()) + .map_err(|err| format!("Failed to create health check client for worker: {}", err)) }