diff --git a/mistralrs-bench/src/main.rs b/mistralrs-bench/src/main.rs
index e5d7a0e2a8..b5058d87b7 100644
--- a/mistralrs-bench/src/main.rs
+++ b/mistralrs-bench/src/main.rs
@@ -535,7 +535,7 @@ fn main() -> anyhow::Result<()> {
             ),
         }
     };
-    let mistralrs = MistralRsBuilder::new(pipeline, scheduler_config)
+    let mistralrs = MistralRsBuilder::new(pipeline, scheduler_config, false)
         .with_no_prefix_cache(true)
         .with_disable_eos_stop(true)
         .build();
diff --git a/mistralrs-core/src/engine/logger.rs b/mistralrs-core/src/engine/logger.rs
new file mode 100644
index 0000000000..f0024e40bc
--- /dev/null
+++ b/mistralrs-core/src/engine/logger.rs
@@ -0,0 +1,75 @@
+#![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
+
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+use tracing::info;
+
+pub struct IntervalLogger {
+    enable_logging: Arc<AtomicBool>,
+    prefix_cache_hits: Arc<AtomicUsize>,
+    tokens_processed: Arc<AtomicUsize>,
+    total_new_seqs: Arc<AtomicUsize>,
+}
+
+impl IntervalLogger {
+    /// Starts an interval logger. Call `enable_logging` to begin the logging process.
+    pub fn new(interval: Duration) -> Self {
+        let prefix_cache_hits = Arc::new(AtomicUsize::new(0));
+        let tokens_processed = Arc::new(AtomicUsize::new(0));
+        let total_new_seqs = Arc::new(AtomicUsize::new(0));
+        let enable_logging = Arc::new(AtomicBool::new(false));
+
+        let t_prefix_cache_hits = prefix_cache_hits.clone();
+        let t_tokens_processed = tokens_processed.clone();
+        let t_total_new_seqs = total_new_seqs.clone();
+        let t_enable_logging = enable_logging.clone();
+        thread::spawn(move || {
+            // Wait
+            while !t_enable_logging.load(Ordering::Relaxed) {}
+
+            // Start the actual logging
+            loop {
+                thread::sleep(interval);
+
+                let total_new_seqs = t_total_new_seqs.load(Ordering::Relaxed);
+                let prefix_cache_hits = t_prefix_cache_hits.load(Ordering::Relaxed);
+                let tokens_processed = t_tokens_processed.swap(0, Ordering::Relaxed);
+
+                if total_new_seqs != 0 && tokens_processed != 0 {
+                    info!(
+                        "Throughput (T/s) {:.2}, Prefix cache hitrate {:.2}%",
+                        tokens_processed as f64 / interval.as_secs_f64(),
+                        100.
+                            * prefix_cache_hits as f64 / total_new_seqs as f64,
+                    );
+                }
+            }
+        });
+
+        Self {
+            prefix_cache_hits,
+            tokens_processed,
+            total_new_seqs,
+            enable_logging,
+        }
+    }
+
+    pub fn enable_logging(&self) {
+        self.enable_logging.store(true, Ordering::Relaxed);
+    }
+
+    pub fn add_tokens_processed(&self, num_tokens: usize) {
+        self.tokens_processed
+            .fetch_add(num_tokens, Ordering::Relaxed);
+    }
+
+    pub fn add_new_sequence(&self) {
+        self.total_new_seqs.fetch_add(1, Ordering::Relaxed);
+    }
+
+    pub fn add_prefix_cache_hit(&self) {
+        self.prefix_cache_hits.fetch_add(1, Ordering::Relaxed);
+    }
+}
diff --git a/mistralrs-core/src/engine/mod.rs b/mistralrs-core/src/engine/mod.rs
index fff22136a9..939a26ca96 100644
--- a/mistralrs-core/src/engine/mod.rs
+++ b/mistralrs-core/src/engine/mod.rs
@@ -2,6 +2,7 @@ use candle_core::Tensor;
 use either::Either;
 use interprocess::local_socket::{traits::Listener, ListenerOptions};
 use llguidance::toktrie::TokEnv;
+use logger::IntervalLogger;
 use once_cell::sync::Lazy;
 use std::{
     collections::HashMap,
@@ -10,7 +11,7 @@ use std::{
     sync::{
        atomic::{AtomicBool, Ordering},
         Arc,
     },
-    time::{Instant, SystemTime, UNIX_EPOCH},
+    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
 };
 use tokio::sync::{mpsc::Receiver, Mutex};
@@ -43,6 +44,8 @@ use crate::{
     Constraint, StopTokens,
 };
 
+mod logger;
+
 pub enum EngineInstruction {
     Terminate,
 }
@@ -66,6 +69,7 @@ pub struct Engine {
     is_debug: bool,
     disable_eos_stop: bool,
     throughput_logging_enabled: bool,
+    logger: IntervalLogger,
 }
 
 impl Engine {
@@ -98,10 +102,15 @@ impl Engine {
             is_debug: DEBUG.load(Ordering::Relaxed),
             disable_eos_stop,
             throughput_logging_enabled,
+            logger: IntervalLogger::new(Duration::from_secs(5)),
         }
     }
 
     pub async fn run(&mut self) {
+        if self.throughput_logging_enabled {
+            self.logger.enable_logging();
+        }
+
         let rng = Arc::new(std::sync::Mutex::new(Isaac64Rng::seed_from_u64(SEED)));
         let mut last_completion_ids: Vec<usize> = vec![];
         'lp: loop {
@@ -135,10 +144,7 @@ impl Engine {
SchedulerOutput::DefaultScheduler { output: mut scheduled, } => { - let mut prompt_ts = None; - let mut completion_ts = None; if scheduled.completion.len() > 0 { - let throughput_start = Instant::now(); let current_completion_ids: Vec = scheduled.completion.iter().map(|seq| *seq.id()).collect(); let res = { @@ -201,16 +207,12 @@ impl Engine { self.prefix_cacher ); - let throughput_end = Instant::now(); - #[allow(clippy::cast_precision_loss)] - if self.throughput_logging_enabled { - completion_ts = Some( - scheduled.completion.len() as f64 - / throughput_end - .duration_since(throughput_start) - .as_secs_f64(), - ); - } + let total_processed_tokens: usize = scheduled + .completion + .iter() + .map(|seq| seq.get_toks().len()) + .sum(); + self.logger.add_tokens_processed(total_processed_tokens); last_completion_ids = current_completion_ids; } @@ -277,17 +279,12 @@ impl Engine { self.prefix_cacher ); - #[allow(clippy::cast_precision_loss)] - if self.throughput_logging_enabled { - prompt_ts = Some( - scheduled - .prompt - .iter() - .map(|seq| seq.get_toks().len()) - .sum::() as f64 - / prompt_exec_time.as_secs_f64(), - ); - } + let total_processed_tokens: usize = scheduled + .prompt + .iter() + .map(|seq| seq.get_toks().len()) + .sum(); + self.logger.add_tokens_processed(total_processed_tokens); for seq in scheduled.prompt.iter_mut() { match seq.sequence_stepping_type() { @@ -338,21 +335,6 @@ impl Engine { } } - if self.throughput_logging_enabled { - match (prompt_ts, completion_ts) { - (Some(prompt), Some(completion)) => { - info!("Throughput (scheduler V1): Prompt: {prompt} T/s Completion {completion} T/s"); - } - (None, Some(completion)) => { - info!("Throughput (scheduler V1): Completion {completion} T/s"); - } - (Some(prompt), None) => { - info!("Throughput (scheduler V1): Prompt: {prompt} T/s"); - } - (None, None) => (), - } - } - if scheduled.prompt.len() == 0 && scheduled.completion.len() == 0 && self.scheduler.waiting_len() == 0 @@ -369,8 +351,6 @@ impl Engine 
{ } SchedulerOutput::PagedAttention { mut output } => { if !output.scheduled.is_empty() { - let throughput_start = Instant::now(); - let is_prompt = get_mut_arcmutex!(output.scheduled[0]).is_prompt(); let mut guards = output @@ -428,6 +408,10 @@ impl Engine { self.prefix_cacher ); + let total_processed_tokens: usize = + guards.iter().map(|seq| seq.get_toks().len()).sum(); + self.logger.add_tokens_processed(total_processed_tokens); + if self.is_debug { let ms_from_last_run = run_start.elapsed().as_secs_f64(); let total_len = guards.len(); @@ -453,21 +437,6 @@ impl Engine { } } - let throughput_end = Instant::now(); - #[allow(clippy::cast_precision_loss)] - if self.throughput_logging_enabled { - let n_toks = if is_prompt { - guards.iter().map(|seq| seq.get_toks().len()).sum::() - } else { - guards.len() - }; - let ts = n_toks as f64 - / throughput_end - .duration_since(throughput_start) - .as_secs_f64(); - info!("Throughput (scheduler V2): {ts} T/s"); - } - if is_prompt { for mut seq in guards { let now = SystemTime::now() @@ -965,7 +934,10 @@ impl Engine { request.return_raw_logits, eos_toks, ); + self.logger.add_new_sequence(); let seq = if let Some(prefill_cache) = prefill_cache.clone() { + self.logger.add_prefix_cache_hit(); + seq.prefill_v2( prefill_cache.normal, prefill_cache.toks, diff --git a/mistralrs-core/src/lib.rs b/mistralrs-core/src/lib.rs index 05010a08d9..8ec6a6f8c5 100644 --- a/mistralrs-core/src/lib.rs +++ b/mistralrs-core/src/lib.rs @@ -186,11 +186,15 @@ pub struct MistralRsBuilder { no_prefix_cache: Option, prefix_cache_n: Option, disable_eos_stop: Option, - throughput_logging_enabled: Option<()>, + throughput_logging_enabled: bool, } impl MistralRsBuilder { - pub fn new(pipeline: Arc>, method: SchedulerConfig) -> Self { + pub fn new( + pipeline: Arc>, + method: SchedulerConfig, + throughput_logging: bool, + ) -> Self { Self { pipeline, method, @@ -200,7 +204,7 @@ impl MistralRsBuilder { no_prefix_cache: None, prefix_cache_n: None, 
disable_eos_stop: None, - throughput_logging_enabled: None, + throughput_logging_enabled: throughput_logging, } } pub fn with_log(mut self, log: String) -> Self { @@ -231,10 +235,6 @@ impl MistralRsBuilder { self.disable_eos_stop = Some(disable_eos_stop); self } - pub fn with_throughput_logging(mut self) -> Self { - self.throughput_logging_enabled = Some(()); - self - } pub fn build(self) -> Arc { MistralRs::new(self) @@ -272,7 +272,6 @@ impl MistralRs { let no_prefix_cache = no_prefix_cache.unwrap_or(false); let prefix_cache_n = prefix_cache_n.unwrap_or(16); let disable_eos_stop = disable_eos_stop.unwrap_or(false); - let throughput_logging_enabled = throughput_logging_enabled.is_some(); let reboot_state = RebootState { pipeline: pipeline.clone(), @@ -443,7 +442,7 @@ impl MistralRs { .duration_since(UNIX_EPOCH) .expect("Time travel has occurred!") .as_secs(), - next_request_id: Mutex::new(RefCell::new(0)), + next_request_id: Mutex::new(RefCell::new(1)), reboot_state, engine_handler: RwLock::new(engine_handler), category, diff --git a/mistralrs-pyo3/src/lib.rs b/mistralrs-pyo3/src/lib.rs index 43c837a01f..61b752f545 100644 --- a/mistralrs-pyo3/src/lib.rs +++ b/mistralrs-pyo3/src/lib.rs @@ -764,7 +764,7 @@ impl Runner { ), } }; - let mistralrs = MistralRsBuilder::new(pipeline, scheduler_config) + let mistralrs = MistralRsBuilder::new(pipeline, scheduler_config, false) .with_no_kv_cache(no_kv_cache) .with_prefix_cache_n(prefix_cache_n) .build(); diff --git a/mistralrs-server/src/main.rs b/mistralrs-server/src/main.rs index 1a283cc469..3814ea622d 100644 --- a/mistralrs-server/src/main.rs +++ b/mistralrs-server/src/main.rs @@ -479,24 +479,18 @@ async fn main() -> Result<()> { } }; // Throughput logging in the server - let builder = MistralRsBuilder::new(pipeline, scheduler_config) + let mistralrs = MistralRsBuilder::new(pipeline, scheduler_config, !args.interactive_mode) .with_opt_log(args.log) .with_truncate_sequence(args.truncate_sequence) 
.with_no_kv_cache(args.no_kv_cache) - .with_prefix_cache_n(args.prefix_cache_n); + .with_prefix_cache_n(args.prefix_cache_n) + .build(); if args.interactive_mode { - interactive_mode(builder.build(), args.throughput_log).await; + interactive_mode(mistralrs, args.throughput_log).await; return Ok(()); } - let builder = if args.throughput_log { - builder.with_throughput_logging() - } else { - builder - }; - let mistralrs = builder.build(); - // Needs to be after the .build call as that is where the daemon waits. let setting_server = if !args.interactive_mode { let port = args.port.expect("Interactive mode was not specified, so expected port to be specified. Perhaps you forgot `-i` or `--port`?"); diff --git a/mistralrs/src/anymoe.rs b/mistralrs/src/anymoe.rs index 8b7cbe96bc..cdd4dc362b 100644 --- a/mistralrs/src/anymoe.rs +++ b/mistralrs/src/anymoe.rs @@ -111,9 +111,10 @@ impl AnyMoeModelBuilder { }, }; - let mut runner = MistralRsBuilder::new(pipeline, scheduler_method) - .with_no_kv_cache(self.base.no_kv_cache) - .with_no_prefix_cache(self.base.prefix_cache_n.is_none()); + let mut runner = + MistralRsBuilder::new(pipeline, scheduler_method, self.base.throughput_logging) + .with_no_kv_cache(self.base.no_kv_cache) + .with_no_prefix_cache(self.base.prefix_cache_n.is_none()); if let Some(n) = self.base.prefix_cache_n { runner = runner.with_prefix_cache_n(n) diff --git a/mistralrs/src/diffusion_model.rs b/mistralrs/src/diffusion_model.rs index de9379b46a..a645a532dd 100644 --- a/mistralrs/src/diffusion_model.rs +++ b/mistralrs/src/diffusion_model.rs @@ -102,7 +102,7 @@ impl DiffusionModelBuilder { method: DefaultSchedulerMethod::Fixed(self.max_num_seqs.try_into()?), }; - let runner = MistralRsBuilder::new(pipeline, scheduler_method); + let runner = MistralRsBuilder::new(pipeline, scheduler_method, false); Ok(Model::new(runner.build())) } diff --git a/mistralrs/src/gguf.rs b/mistralrs/src/gguf.rs index a927414370..a70bc8f5a0 100644 --- a/mistralrs/src/gguf.rs +++ 
b/mistralrs/src/gguf.rs @@ -20,6 +20,7 @@ pub struct GgufModelBuilder { pub(crate) prompt_chunksize: Option, pub(crate) force_cpu: bool, pub(crate) topology: Option, + pub(crate) throughput_logging: bool, // Other things pub(crate) paged_attn_cfg: Option, @@ -54,9 +55,16 @@ impl GgufModelBuilder { tok_model_id: None, device_mapping: None, jinja_explicit: None, + throughput_logging: false, } } + /// Enable runner throughput logging. + pub fn with_throughput_logging(mut self) -> Self { + self.throughput_logging = true; + self + } + /// Explicit JINJA chat template file (.jinja) to be used. If specified, this overrides all other chat templates. pub fn with_jinja_explicit(mut self, jinja_explicit: String) -> Self { self.jinja_explicit = Some(jinja_explicit); @@ -214,7 +222,7 @@ impl GgufModelBuilder { }, }; - let mut runner = MistralRsBuilder::new(pipeline, scheduler_method) + let mut runner = MistralRsBuilder::new(pipeline, scheduler_method, self.throughput_logging) .with_no_kv_cache(self.no_kv_cache) .with_no_prefix_cache(self.prefix_cache_n.is_none()); diff --git a/mistralrs/src/gguf_lora_model.rs b/mistralrs/src/gguf_lora_model.rs index 2dcbfd0098..7bee5b2706 100644 --- a/mistralrs/src/gguf_lora_model.rs +++ b/mistralrs/src/gguf_lora_model.rs @@ -79,9 +79,13 @@ impl GgufLoraModelBuilder { }, }; - let mut runner = MistralRsBuilder::new(pipeline, scheduler_method) - .with_no_kv_cache(self.gguf_model.no_kv_cache) - .with_no_prefix_cache(self.gguf_model.prefix_cache_n.is_none()); + let mut runner = MistralRsBuilder::new( + pipeline, + scheduler_method, + self.gguf_model.throughput_logging, + ) + .with_no_kv_cache(self.gguf_model.no_kv_cache) + .with_no_prefix_cache(self.gguf_model.prefix_cache_n.is_none()); if let Some(n) = self.gguf_model.prefix_cache_n { runner = runner.with_prefix_cache_n(n) diff --git a/mistralrs/src/gguf_xlora_model.rs b/mistralrs/src/gguf_xlora_model.rs index e294a74d99..d1227a16e3 100644 --- a/mistralrs/src/gguf_xlora_model.rs +++ 
b/mistralrs/src/gguf_xlora_model.rs @@ -91,9 +91,13 @@ impl GgufXLoraModelBuilder { }, }; - let mut runner = MistralRsBuilder::new(pipeline, scheduler_method) - .with_no_kv_cache(self.gguf_model.no_kv_cache) - .with_no_prefix_cache(self.gguf_model.prefix_cache_n.is_none()); + let mut runner = MistralRsBuilder::new( + pipeline, + scheduler_method, + self.gguf_model.throughput_logging, + ) + .with_no_kv_cache(self.gguf_model.no_kv_cache) + .with_no_prefix_cache(self.gguf_model.prefix_cache_n.is_none()); if let Some(n) = self.gguf_model.prefix_cache_n { runner = runner.with_prefix_cache_n(n) diff --git a/mistralrs/src/lora_model.rs b/mistralrs/src/lora_model.rs index 81220a4f18..336139d836 100644 --- a/mistralrs/src/lora_model.rs +++ b/mistralrs/src/lora_model.rs @@ -84,9 +84,13 @@ impl LoraModelBuilder { }, }; - let mut runner = MistralRsBuilder::new(pipeline, scheduler_method) - .with_no_kv_cache(self.text_model.no_kv_cache) - .with_no_prefix_cache(self.text_model.prefix_cache_n.is_none()); + let mut runner = MistralRsBuilder::new( + pipeline, + scheduler_method, + self.text_model.throughput_logging, + ) + .with_no_kv_cache(self.text_model.no_kv_cache) + .with_no_prefix_cache(self.text_model.prefix_cache_n.is_none()); if let Some(n) = self.text_model.prefix_cache_n { runner = runner.with_prefix_cache_n(n) diff --git a/mistralrs/src/speculative.rs b/mistralrs/src/speculative.rs index 82f60b9922..b743467afb 100644 --- a/mistralrs/src/speculative.rs +++ b/mistralrs/src/speculative.rs @@ -94,7 +94,8 @@ impl TextSpeculativeBuilder { self.speculative_config, )?)); - let runner = MistralRsBuilder::new(pipeline, scheduler_method); + let runner = + MistralRsBuilder::new(pipeline, scheduler_method, self.target.throughput_logging); Ok(Model::new(runner.build())) } diff --git a/mistralrs/src/text_model.rs b/mistralrs/src/text_model.rs index fed6a1f0e0..09704a7fc6 100644 --- a/mistralrs/src/text_model.rs +++ b/mistralrs/src/text_model.rs @@ -32,6 +32,7 @@ pub struct 
TextModelBuilder { pub(crate) dtype: ModelDType, pub(crate) force_cpu: bool, pub(crate) isq: Option, + pub(crate) throughput_logging: bool, // Other things pub(crate) paged_attn_cfg: Option, @@ -107,9 +108,16 @@ impl TextModelBuilder { imatrix: None, calibration_file: None, jinja_explicit: None, + throughput_logging: false, } } + /// Enable runner throughput logging. + pub fn with_throughput_logging(mut self) -> Self { + self.throughput_logging = true; + self + } + /// Explicit JINJA chat template file (.jinja) to be used. If specified, this overrides all other chat templates. pub fn with_jinja_explicit(mut self, jinja_explicit: String) -> Self { self.jinja_explicit = Some(jinja_explicit); @@ -320,7 +328,7 @@ impl TextModelBuilder { }, }; - let mut runner = MistralRsBuilder::new(pipeline, scheduler_method) + let mut runner = MistralRsBuilder::new(pipeline, scheduler_method, self.throughput_logging) .with_no_kv_cache(self.no_kv_cache) .with_no_prefix_cache(self.prefix_cache_n.is_none()); diff --git a/mistralrs/src/vision_model.rs b/mistralrs/src/vision_model.rs index 975d6abaa6..3d4990ac12 100644 --- a/mistralrs/src/vision_model.rs +++ b/mistralrs/src/vision_model.rs @@ -32,6 +32,7 @@ pub struct VisionModelBuilder { pub(crate) dtype: ModelDType, pub(crate) force_cpu: bool, pub(crate) isq: Option, + pub(crate) throughput_logging: bool, // Other things pub(crate) max_num_seqs: usize, @@ -66,9 +67,16 @@ impl VisionModelBuilder { calibration_file: None, imatrix: None, jinja_explicit: None, + throughput_logging: false, } } + /// Enable runner throughput logging. + pub fn with_throughput_logging(mut self) -> Self { + self.throughput_logging = true; + self + } + /// Explicit JINJA chat template file (.jinja) to be used. If specified, this overrides all other chat templates. 
pub fn with_jinja_explicit(mut self, jinja_explicit: String) -> Self { self.jinja_explicit = Some(jinja_explicit); @@ -221,7 +229,7 @@ impl VisionModelBuilder { method: DefaultSchedulerMethod::Fixed(self.max_num_seqs.try_into()?), }; - let runner = MistralRsBuilder::new(pipeline, scheduler_method) + let runner = MistralRsBuilder::new(pipeline, scheduler_method, self.throughput_logging) .with_no_kv_cache(false) .with_no_prefix_cache(false); diff --git a/mistralrs/src/xlora_model.rs b/mistralrs/src/xlora_model.rs index e4f2d647ce..77b016cf7e 100644 --- a/mistralrs/src/xlora_model.rs +++ b/mistralrs/src/xlora_model.rs @@ -96,9 +96,13 @@ impl XLoraModelBuilder { }, }; - let mut runner = MistralRsBuilder::new(pipeline, scheduler_method) - .with_no_kv_cache(self.text_model.no_kv_cache) - .with_no_prefix_cache(self.text_model.prefix_cache_n.is_none()); + let mut runner = MistralRsBuilder::new( + pipeline, + scheduler_method, + self.text_model.throughput_logging, + ) + .with_no_kv_cache(self.text_model.no_kv_cache) + .with_no_prefix_cache(self.text_model.prefix_cache_n.is_none()); if let Some(n) = self.text_model.prefix_cache_n { runner = runner.with_prefix_cache_n(n)