6 changes: 6 additions & 0 deletions lib/engines/mistralrs/src/lib.rs
@@ -407,6 +407,9 @@ impl
id: None,
data: Some(delta),
event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None,
};
yield ann;
@@ -566,6 +569,9 @@ impl AsyncEngine<SingleIn<CompletionRequest>, ManyOut<Annotated<CompletionRespon
id: None,
data: Some(inner),
event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None,
};
yield ann;
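These hunks thread three new token-accounting fields through every `Annotated` the mistralrs engine yields. The struct definition itself is not part of this diff; below is a minimal sketch of what the extended envelope presumably looks like, where the payload types (`usize`, `Vec<String>`) are assumptions, not confirmed by the PR:

```rust
// Hypothetical sketch of the extended Annotated envelope; the real definition
// lives elsewhere in the repo, and the Option payload types are assumptions.
pub struct Annotated<R> {
    pub id: Option<String>,
    pub data: Option<R>,
    pub event: Option<String>,
    // New in this PR: token counts carried alongside each response chunk.
    pub chunk_tokens: Option<usize>,
    pub input_tokens: Option<usize>,
    pub output_tokens: Option<usize>,
    pub comment: Option<Vec<String>>,
}
```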
8 changes: 4 additions & 4 deletions lib/llm/src/engines.rs
@@ -202,15 +202,15 @@ impl
let response = NvCreateChatCompletionStreamResponse {
inner,
};
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None };
id += 1;
}

let inner = deltas.create_choice(0, None, Some(async_openai::types::FinishReason::Stop), None);
let response = NvCreateChatCompletionStreamResponse {
inner,
};
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None };
};

Ok(ResponseStream::new(Box::pin(output), ctx))
@@ -234,11 +234,11 @@ impl AsyncEngine<SingleIn<CompletionRequest>, ManyOut<Annotated<CompletionRespon
for c in chars_string.chars() {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None };
id += 1;
}
let response = deltas.create_choice(0, None, Some("stop".to_string()));
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None };

};

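Every yield site in engines.rs now repeats the same three `None` fields. Assuming the struct matches the sketch above, one hypothetical way to keep call sites short (not part of this PR) would be a convenience constructor:

```rust
impl<R> Annotated<R> {
    // Hypothetical helper: wrap a data payload with all annotation and
    // token-count fields defaulted to None.
    fn with_data(id: Option<String>, data: R) -> Self {
        Annotated {
            id,
            data: Some(data),
            event: None,
            chunk_tokens: None,
            input_tokens: None,
            output_tokens: None,
            comment: None,
        }
    }
}
```

Each `yield Annotated { ... }` above would then shrink to `yield Annotated::with_data(Some(id.to_string()), response);`.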
167 changes: 165 additions & 2 deletions lib/llm/src/http/service/metrics.rs
@@ -3,7 +3,10 @@

use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Router};
use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
use std::{sync::Arc, time::Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};

pub use prometheus::Registry;

@@ -25,6 +28,10 @@ pub struct Metrics {
request_counter: IntCounterVec,
inflight_gauge: IntGaugeVec,
request_duration: HistogramVec,
input_sequence_length: HistogramVec,
output_sequence_length: HistogramVec,
time_to_first_token: HistogramVec,
inter_token_latency: HistogramVec,
}

/// RAII object for inflight gauge and request counters
@@ -68,6 +75,20 @@ pub enum Status {
Error,
}

/// Tracks per-response metrics (TTFT, ITL, ISL, OSL) for a single request
pub struct ResponseMetricCollector {
metrics: Arc<Metrics>,
model: String,
start_time: Instant,
// `is_first_token` distinguishes TTFT from ITL: it starts as true and is
// flipped to false once the first token is returned and TTFT is published.
is_first_token: bool,
// The last response time is tracked so that ITL can be computed for the
// newly returned tokens.
last_response_time: Option<Duration>,
osl: usize,
}

impl Default for Metrics {
fn default() -> Self {
Self::new("nv_llm")
@@ -80,6 +101,10 @@ impl Metrics {
/// - `{prefix}_http_service_requests_total` - IntCounterVec for the total number of requests processed
/// - `{prefix}_http_service_inflight_requests` - IntGaugeVec for the number of inflight requests
/// - `{prefix}_http_service_request_duration_seconds` - HistogramVec for the duration of requests
/// - `{prefix}_http_service_input_sequence_tokens` - HistogramVec for input sequence length in tokens
/// - `{prefix}_http_service_output_sequence_tokens` - HistogramVec for output sequence length in tokens
/// - `{prefix}_http_service_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
/// - `{prefix}_http_service_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
pub fn new(prefix: &str) -> Self {
let request_counter = IntCounterVec::new(
Opts::new(
@@ -111,10 +136,64 @@
)
.unwrap();

let input_sequence_length = HistogramVec::new(
HistogramOpts::new(
format!("{}_http_service_input_sequence_tokens", prefix),
"Input sequence length in tokens",
)
.buckets(vec![
0.0, 50.0, 100.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0, 64000.0,
128000.0,
]),
&["model"],
)
.unwrap();

let output_sequence_length = HistogramVec::new(
HistogramOpts::new(
format!("{}_http_service_output_sequence_tokens", prefix),
"Output sequence length in tokens",
)
.buckets(vec![
0.0, 50.0, 100.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0,
]),
&["model"],
)
.unwrap();

let time_to_first_token = HistogramVec::new(
HistogramOpts::new(
format!("{}_http_service_time_to_first_token_seconds", prefix),
"Time to first token in seconds",
)
.buckets(vec![
0.0, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0,
60.0, 120.0, 240.0, 480.0,
]),
&["model"],
)
.unwrap();

let inter_token_latency = HistogramVec::new(
HistogramOpts::new(
format!("{}_http_service_inter_token_latency_seconds", prefix),
"Inter-token latency in seconds",
)
.buckets(vec![
0.0, 0.001, 0.005, 0.01, 0.015, 0.02, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0,
]),
&["model"],
)
.unwrap();

Metrics {
request_counter,
inflight_gauge,
request_duration,
input_sequence_length,
output_sequence_length,
time_to_first_token,
inter_token_latency,
}
}

@@ -179,6 +258,10 @@ impl Metrics {
registry.register(Box::new(self.request_counter.clone()))?;
registry.register(Box::new(self.inflight_gauge.clone()))?;
registry.register(Box::new(self.request_duration.clone()))?;
registry.register(Box::new(self.input_sequence_length.clone()))?;
registry.register(Box::new(self.output_sequence_length.clone()))?;
registry.register(Box::new(self.time_to_first_token.clone()))?;
registry.register(Box::new(self.inter_token_latency.clone()))?;
Ok(())
}

@@ -199,7 +282,17 @@ impl Metrics {
RequestType::Unary
};

InflightGuard::new(self.clone(), model.to_string(), endpoint, request_type)
InflightGuard::new(
self.clone(),
model.to_string().to_lowercase(),
endpoint,
request_type,
)
}

/// Create a new [`ResponseMetricCollector`] for collecting per-response metrics (e.g., TTFT, ITL)
pub fn create_response_collector(self: Arc<Self>, model: &str) -> ResponseMetricCollector {
ResponseMetricCollector::new(self, model.to_string().to_lowercase())
}
}

@@ -293,6 +386,76 @@ impl Status {
}
}

impl ResponseMetricCollector {
fn new(metrics: Arc<Metrics>, model: String) -> Self {
ResponseMetricCollector {
metrics,
model,
is_first_token: true,
last_response_time: None,
start_time: Instant::now(),
osl: 0,
}
}

/// Observe the current output sequence length
pub fn observe_current_osl(&mut self, osl: usize) {
self.osl = osl;
}

/// Observe a response with input sequence length and number of new tokens
pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
if num_tokens == 0 {
return;
}

if self.is_first_token {
// NOTE: when there are multiple tokens in the first response,
// we use the full response time as TTFT and ignore the ITL
self.is_first_token = false;

// Publish TTFT
let ttft = self.start_time.elapsed().as_secs_f64();
self.metrics
.time_to_first_token
.with_label_values(&[&self.model])
.observe(ttft);

// Publish ISL
// TODO: publish ISL as soon as the tokenization process completes
self.metrics
.input_sequence_length
.with_label_values(&[&self.model])
.observe(isl as f64);
}

let current_duration = self.start_time.elapsed();

if let Some(last_response_time) = self.last_response_time {
let response_duration = current_duration - last_response_time;
let itl = response_duration.as_secs_f64() / num_tokens as f64;
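// e.g., if 4 tokens arrive 80 ms after the previous response, each of the
// 4 tokens is recorded with an ITL of 0.080 s / 4 = 0.020 s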
for _ in 0..num_tokens {
self.metrics
.inter_token_latency
.with_label_values(&[&self.model])
.observe(itl);
}
}

self.last_response_time = Some(current_duration);
}
}

impl Drop for ResponseMetricCollector {
fn drop(&mut self) {
// Publish final OSL when the collector is dropped
self.metrics
.output_sequence_length
.with_label_values(&[&self.model])
.observe(self.osl as f64);
}
}

/// Create a new router with the given path
pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
let registry = Arc::new(registry);
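Taken together, the intended per-request flow looks roughly like the sketch below. The handler wiring is outside this diff, and the per-chunk `(isl, new_tokens, cumulative_osl)` tuple is an assumption about how callers derive token counts; only the three collector methods come from the PR itself:

```rust
use std::sync::Arc;

// Hypothetical handler-side wiring, not code from this PR: one collector per
// streaming request. TTFT and ISL publish on the first non-empty response,
// ITL on subsequent ones, and OSL publishes when the collector is dropped.
fn record_stream_metrics(
    metrics: Arc<Metrics>,
    model: &str,
    chunks: impl Iterator<Item = (usize, usize, usize)>, // (isl, new_tokens, cumulative_osl)
) {
    let mut collector = metrics.create_response_collector(model);
    for (isl, new_tokens, osl) in chunks {
        collector.observe_response(isl, new_tokens);
        collector.observe_current_osl(osl);
    }
    // collector drops here, observing the final OSL histogram sample
}
```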