Skip to content

Commit

Permalink
feat(spooler): Implement backpressure in spooler via bounded queues (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Sep 23, 2024
1 parent d7ac14d commit 59861f1
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 91 deletions.
14 changes: 14 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,11 @@ fn spool_disk_usage_refresh_frequency_ms() -> u64 {
100
}

/// Default bounded buffer size for handling backpressure.
fn spool_max_backpressure_envelopes() -> usize {
500
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
Expand Down Expand Up @@ -955,6 +960,9 @@ pub struct EnvelopeSpool {
/// internal page stats.
#[serde(default = "spool_disk_usage_refresh_frequency_ms")]
disk_usage_refresh_frequency_ms: u64,
/// The amount of envelopes that the envelope buffer can push to its output queue.
#[serde(default = "spool_max_backpressure_envelopes")]
max_backpressure_envelopes: usize,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
Expand Down Expand Up @@ -991,6 +999,7 @@ impl Default for EnvelopeSpool {
max_batches: spool_envelopes_stack_max_batches(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
version: EnvelopeSpoolVersion::default(),
}
}
Expand Down Expand Up @@ -2212,6 +2221,11 @@ impl Config {
Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
}

/// Returns the maximum number of envelopes that can be put in the bounded buffer.
pub fn spool_max_backpressure_envelopes(&self) -> usize {
self.values.spool.envelopes.max_backpressure_envelopes
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
Expand Down
10 changes: 7 additions & 3 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
Expand All @@ -15,7 +12,10 @@ use relay_config::{Config, RedisConnection, RedisPoolConfigs};
use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
Expand All @@ -25,6 +25,7 @@ use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
use crate::services::project_cache::{ProjectCache, ProjectCacheService, Services};
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::StoreService;
use crate::services::test_store::{TestStore, TestStoreService};
Expand Down Expand Up @@ -241,11 +242,13 @@ impl ServiceState {
)
.spawn_handler(processor_rx);

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
let envelope_buffer = EnvelopeBufferService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
global_config_rx.clone(),
buffer::Services {
envelopes_tx,
project_cache: project_cache.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
Expand All @@ -269,6 +272,7 @@ impl ServiceState {
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_services,
global_config_rx,
envelopes_rx,
redis_pools
.as_ref()
.map(|pools| pools.project_configs.clone()),
Expand Down
Loading

0 comments on commit 59861f1

Please sign in to comment.