Skip to content

Commit

Permalink
feat: send 503 error when queue is full (#967)
Browse files Browse the repository at this point in the history
* feat: send 503 error when queue is full

* docs: update changelog

* fix: use new method to create an error
  • Loading branch information
SantiagoPittella authored Nov 14, 2024
1 parent ffc0fd9 commit 6e0e29f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## TBD

### Changes

- [BREAKING] Better error display when queues are full in the prover service (#967).

## 0.6.1 (2024-11-08)

### Features

- [BREAKING] Added CLI for the transaction prover services both the workers and the proxy (#955).
Expand Down
28 changes: 26 additions & 2 deletions bin/tx-prover/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use tracing::error;

use crate::commands::ProxyConfig;

const RESOURCE_EXHAUSTED_CODE: u16 = 8;

/// Load balancer that uses a round robin strategy
pub struct LoadBalancer {
lb: Arc<PingoraLoadBalancer<RoundRobin>>,
Expand Down Expand Up @@ -53,6 +55,25 @@ impl LoadBalancer {
Ok(true)
}

/// Create a 503 response for a full queue
pub async fn create_queue_full_response(session: &mut Session) -> Result<bool> {
// Set grpc-message header to "Too many requests in the queue"
// This is meant to be used by a Tonic interceptor to return a gRPC error
let mut header = ResponseHeader::build(503, None)?;
header.insert_header("grpc-message", "Too many requests in the queue".to_string())?;
header.insert_header("grpc-status", RESOURCE_EXHAUSTED_CODE)?;
session.set_keepalive(None);
session.write_response_header(Box::new(header.clone()), true).await?;

let mut error = Error::new(ErrorType::HTTPStatus(503))
.more_context("Too many requests in the queue")
.into_in();
error.set_cause("Too many requests in the queue");

session.write_response_header(Box::new(header), false).await?;
Err(error)
}

/// Remove the request ID from the corresponding worker queue
pub async fn remove_request_from_queue(request_id: &str) {
let mut ctx_guard = QUEUES.write().await;
Expand Down Expand Up @@ -167,7 +188,10 @@ impl ProxyHttp for LoadBalancer {
let worker = self.lb.select(b"", 256).ok_or(Error::new_str("Worker not found"))?;

// Read request ID from headers
let request_id = Self::get_request_id(session)?;
let request_id = {
let id = Self::get_request_id(session)?;
id.to_string()
};

// Enqueue the request ID in the worker queue
// We use a new scope to release the lock after the operation
Expand All @@ -177,7 +201,7 @@ impl ProxyHttp for LoadBalancer {

// Limit queue length
if worker_queue.len() >= self.max_queue_items {
return Err(Error::new_str("Too many requests in the queue"));
Self::create_queue_full_response(session).await?;
}

worker_queue.push(request_id.to_string());
Expand Down

0 comments on commit 6e0e29f

Please sign in to comment.