diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 73b486199d..179c8e080c 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -912,6 +912,11 @@ fn spool_disk_usage_refresh_frequency_ms() -> u64 { 100 } +/// Default bounded buffer size for handling backpressure. +fn spool_max_backpressure_envelopes() -> usize { + 500 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -955,6 +960,9 @@ pub struct EnvelopeSpool { /// internal page stats. #[serde(default = "spool_disk_usage_refresh_frequency_ms")] disk_usage_refresh_frequency_ms: u64, + /// The amount of envelopes that the envelope buffer can push to its output queue. + #[serde(default = "spool_max_backpressure_envelopes")] + max_backpressure_envelopes: usize, /// Version of the spooler. #[serde(default)] version: EnvelopeSpoolVersion, @@ -991,6 +999,7 @@ impl Default for EnvelopeSpool { max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(), + max_backpressure_envelopes: spool_max_backpressure_envelopes(), version: EnvelopeSpoolVersion::default(), } } @@ -2212,6 +2221,11 @@ impl Config { Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms) } + /// Returns the maximum number of envelopes that can be put in the bounded buffer. + pub fn spool_max_backpressure_envelopes(&self) -> usize { + self.values.spool.envelopes.max_backpressure_envelopes + } + /// Returns the maximum size of an event payload in bytes. pub fn max_event_size(&self) -> usize { self.values.limits.max_event_size.as_bytes() diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 289014f858..28364dde65 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -3,9 +3,6 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use crate::metrics::{MetricOutcomes, MetricStats}; -use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer}; -use crate::services::stats::RelayStats; use anyhow::{Context, Result}; use axum::extract::FromRequestParts; use axum::http::request::Parts; @@ -15,7 +12,10 @@ use relay_config::{Config, RedisConnection, RedisPoolConfigs}; use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; use relay_system::{channel, Addr, Service}; use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use crate::metrics::{MetricOutcomes, MetricStats}; +use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer}; use crate::services::cogs::{CogsService, CogsServiceRecorder}; use crate::services::global_config::{GlobalConfigManager, GlobalConfigService}; use crate::services::health_check::{HealthCheck, HealthCheckService}; @@ -25,6 +25,7 @@ use crate::services::outcome_aggregator::OutcomeAggregator; use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService}; use crate::services::project_cache::{ProjectCache, ProjectCacheService, Services}; use crate::services::relays::{RelayCache, RelayCacheService}; +use crate::services::stats::RelayStats; #[cfg(feature = "processing")] use crate::services::store::StoreService; use crate::services::test_store::{TestStore, TestStoreService}; @@ -241,11 +242,13 @@ impl ServiceState { ) .spawn_handler(processor_rx); + let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes()); let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), global_config_rx.clone(), buffer::Services { + envelopes_tx, project_cache: project_cache.clone(), outcome_aggregator: outcome_aggregator.clone(), test_store: test_store.clone(), @@ -269,6 +272,7 @@ impl ServiceState { MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache_services, global_config_rx, + envelopes_rx, redis_pools .as_ref() .map(|pools| pools.project_configs.clone()), diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index cfdccda21d..1fb92940c7 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -8,10 +8,10 @@ use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; -use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; use relay_system::{Controller, Shutdown}; -use tokio::sync::watch; +use tokio::sync::mpsc::Permit; +use tokio::sync::{mpsc, watch}; use tokio::time::{timeout, Instant}; use crate::envelope::Envelope; @@ -21,11 +21,10 @@ use crate::services::outcome::DiscardReason; use crate::services::outcome::Outcome; use crate::services::outcome::TrackOutcome; use crate::services::processor::ProcessingGroup; -use crate::services::project_cache::DequeuedEnvelope; -use crate::services::project_cache::ProjectCache; -use crate::services::project_cache::UpdateProject; +use crate::services::project_cache::{DequeuedEnvelope, ProjectCache, UpdateProject}; + use crate::services::test_store::TestStore; -use crate::statsd::RelayCounters; +use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::ManagedEnvelope; use crate::utils::MemoryChecker; @@ -95,7 +94,11 @@ impl ObservableEnvelopeBuffer { } /// Services that the buffer service communicates with. +#[derive(Clone)] pub struct Services { + /// Bounded channel used exclusively to handle backpressure when sending envelopes to the + /// project cache. + pub envelopes_tx: mpsc::Sender, pub project_cache: Addr, pub outcome_aggregator: Addr, pub test_store: Addr, @@ -151,8 +154,8 @@ impl EnvelopeBufferService { /// Wait for the configured amount of time and make sure the project cache is ready to receive. async fn ready_to_pop( &mut self, - buffer: &mut PolymorphicEnvelopeBuffer, - ) -> Result<(), SendError> { + buffer: &PolymorphicEnvelopeBuffer, + ) -> Option> { relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checking" @@ -174,12 +177,14 @@ impl EnvelopeBufferService { status = "slept" ); + let permit = self.services.envelopes_tx.reserve().await.ok(); + relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checked" ); - Ok(()) + permit } /// Waits until preconditions for unspooling are met. @@ -203,26 +208,35 @@ impl EnvelopeBufferService { } /// Tries to pop an envelope for a ready project. - async fn try_pop( - &mut self, + async fn try_pop<'a>( + config: &Config, buffer: &mut PolymorphicEnvelopeBuffer, - ) -> Result<(), EnvelopeBufferError> { + services: &Services, + envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, + ) -> Result { relay_log::trace!("EnvelopeBufferService: peeking the buffer"); - match buffer.peek().await? { + + let sleep = match buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService: peek returned empty"); relay_statsd::metric!( counter(RelayCounters::BufferTryPop) += 1, peek_result = "empty" ); - self.sleep = Duration::MAX; // wait for reset by `handle_message`. + + Duration::MAX // wait for reset by `handle_message`. } - Peek::Ready(envelope) | Peek::NotReady(.., envelope) if self.expired(envelope) => { + Peek::Ready(envelope) | Peek::NotReady(.., envelope) + if Self::expired(config, envelope) => + { let envelope = buffer .pop() .await? .expect("Element disappeared despite exclusive excess"); - self.drop_expired(envelope); + + Self::drop_expired(envelope, services); + + Duration::ZERO // try next pop immediately } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService: popping envelope"); @@ -234,8 +248,9 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); - self.services.project_cache.send(DequeuedEnvelope(envelope)); - self.sleep = Duration::ZERO; // try next pop immediately + envelopes_tx_permit.send(DequeuedEnvelope(envelope)); + + Duration::ZERO // try next pop immediately } Peek::NotReady(stack_key, next_project_fetch, envelope) => { relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready"); @@ -249,15 +264,14 @@ impl EnvelopeBufferService { // avoid flooding the project cache with `UpdateProject` messages. if Instant::now() >= next_project_fetch { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); - let project_key = envelope.meta().public_key(); - self.services.project_cache.send(UpdateProject(project_key)); + let own_key = envelope.meta().public_key(); + + services.project_cache.send(UpdateProject(own_key)); match envelope.sampling_key() { None => {} - Some(sampling_key) if sampling_key == project_key => {} // already sent. + Some(sampling_key) if sampling_key == own_key => {} // already sent. Some(sampling_key) => { - self.services - .project_cache - .send(UpdateProject(sampling_key)); + services.project_cache.send(UpdateProject(sampling_key)); } } @@ -266,32 +280,28 @@ impl EnvelopeBufferService { buffer.mark_seen(&stack_key, DEFAULT_SLEEP); } - self.sleep = DEFAULT_SLEEP; + DEFAULT_SLEEP // wait and prioritize handling new messages. } - } + }; - Ok(()) + Ok(sleep) } - fn expired(&self, envelope: &Envelope) -> bool { - envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() + fn expired(config: &Config, envelope: &Envelope) -> bool { + envelope.meta().start_time().elapsed() > config.spool_envelopes_max_age() } - fn drop_expired(&self, envelope: Box) { + fn drop_expired(envelope: Box, services: &Services) { let mut managed_envelope = ManagedEnvelope::new( envelope, - self.services.outcome_aggregator.clone(), - self.services.test_store.clone(), + services.outcome_aggregator.clone(), + services.test_store.clone(), ProcessingGroup::Ungrouped, ); managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); } - async fn handle_message( - &mut self, - buffer: &mut PolymorphicEnvelopeBuffer, - message: EnvelopeBuffer, - ) { + async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) { match message { EnvelopeBuffer::Push(envelope) => { // NOTE: This function assumes that a project state update for the relevant @@ -299,7 +309,7 @@ impl EnvelopeBufferService { // For better separation of concerns, this prefetch should be triggered from here // once buffer V1 has been removed. relay_log::trace!("EnvelopeBufferService: received push message"); - self.push(buffer, envelope).await; + Self::push(buffer, envelope).await; } EnvelopeBuffer::NotReady(project_key, envelope) => { relay_log::trace!( @@ -307,7 +317,7 @@ impl EnvelopeBufferService { &project_key ); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1); - self.push(buffer, envelope).await; + Self::push(buffer, envelope).await; buffer.mark_ready(&project_key, false); } EnvelopeBuffer::Ready(project_key) => { @@ -318,14 +328,9 @@ impl EnvelopeBufferService { buffer.mark_ready(&project_key, true); } }; - self.sleep = Duration::ZERO; } - async fn handle_shutdown( - &mut self, - buffer: &mut PolymorphicEnvelopeBuffer, - message: Shutdown, - ) -> bool { + async fn handle_shutdown(buffer: &mut PolymorphicEnvelopeBuffer, message: Shutdown) -> bool { // We gracefully shut down only if the shutdown has a timeout. if let Some(shutdown_timeout) = message.timeout { relay_log::trace!("EnvelopeBufferService: shutting down gracefully"); @@ -347,7 +352,7 @@ impl EnvelopeBufferService { false } - async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { + async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, @@ -369,6 +374,8 @@ impl Service for EnvelopeBufferService { let config = self.config.clone(); let memory_checker = self.memory_checker.clone(); let mut global_config_rx = self.global_config_rx.clone(); + let services = self.services.clone(); + tokio::spawn(async move { let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; @@ -392,36 +399,51 @@ impl Service for EnvelopeBufferService { iteration += 1; relay_log::trace!("EnvelopeBufferService: loop iteration {iteration}"); + let used_capacity = self.services.envelopes_tx.max_capacity() + - self.services.envelopes_tx.capacity(); + relay_statsd::metric!( + histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = + used_capacity as u64 + ); + + let mut sleep = Duration::MAX; tokio::select! { // NOTE: we do not select a bias here. // On the one hand, we might want to prioritize dequeuing over enqueuing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - Ok(()) = self.ready_to_pop(&mut buffer) => { - if let Err(e) = self.try_pop(&mut buffer).await { - relay_log::error!( - error = &e as &dyn std::error::Error, + Some(permit) = self.ready_to_pop(&buffer) => { + match Self::try_pop(&config, &mut buffer, &services, permit).await { + Ok(new_sleep) => { + sleep = new_sleep; + } + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, "failed to pop envelope" ); + } } } Some(message) = rx.recv() => { - self.handle_message(&mut buffer, message).await; + Self::handle_message(&mut buffer, message).await; + sleep = Duration::ZERO; } shutdown = shutdown.notified() => { // In case the shutdown was handled, we break out of the loop signaling that // there is no need to process anymore envelopes. - if self.handle_shutdown(&mut buffer, shutdown).await { + if Self::handle_shutdown(&mut buffer, shutdown).await { break; } } _ = global_config_rx.changed() => { relay_log::trace!("EnvelopeBufferService: received global config"); - self.sleep = Duration::ZERO; // Try to pop + sleep = Duration::ZERO; } else => break, } + self.sleep = sleep; self.update_observable_state(&mut buffer); } @@ -447,6 +469,7 @@ mod tests { fn buffer_service() -> ( EnvelopeBufferService, watch::Sender, + mpsc::Receiver, mpsc::UnboundedReceiver, mpsc::UnboundedReceiver, ) { @@ -462,6 +485,7 @@ mod tests { ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); + let (envelopes_tx, envelopes_rx) = mpsc::channel(5); let (project_cache, project_cache_rx) = Addr::custom(); let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom(); ( @@ -470,6 +494,7 @@ mod tests { memory_checker, global_rx, Services { + envelopes_tx, project_cache, outcome_aggregator, test_store: Addr::dummy(), @@ -477,6 +502,7 @@ mod tests { ) .unwrap(), global_tx, + envelopes_rx, project_cache_rx, outcome_aggregator_rx, ) @@ -485,7 +511,7 @@ mod tests { #[tokio::test] async fn capacity_is_updated() { tokio::time::pause(); - let (service, _global_rx, _project_cache_tx, _) = buffer_service(); + let (service, _global_tx, _envelopes_rx, _project_cache_tx, _) = buffer_service(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); @@ -506,8 +532,9 @@ mod tests { #[tokio::test] async fn pop_requires_global_config() { + relay_log::init_test!(); tokio::time::pause(); - let (service, global_tx, project_cache_rx, _) = buffer_service(); + let (service, global_tx, envelopes_rx, project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -520,6 +547,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; // Nothing was dequeued, global config not ready: + assert_eq!(envelopes_rx.len(), 0); assert_eq!(project_cache_rx.len(), 0); global_tx.send_replace(global_config::Status::Ready(Arc::new( @@ -529,7 +557,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; // Dequeued, global config ready: - assert_eq!(project_cache_rx.len(), 1); + assert_eq!(envelopes_rx.len(), 1); + assert_eq!(project_cache_rx.len(), 0); } #[tokio::test] @@ -555,12 +584,14 @@ mod tests { GlobalConfig::default(), ))); + let (envelopes_tx, envelopes_rx) = mpsc::channel(5); let (project_cache, project_cache_rx) = Addr::custom(); let service = EnvelopeBufferService::new( config, memory_checker, global_rx, Services { + envelopes_tx, project_cache, outcome_aggregator: Addr::dummy(), test_store: Addr::dummy(), @@ -578,6 +609,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; // Nothing was dequeued, memory not ready: + assert_eq!(envelopes_rx.len(), 0); assert_eq!(project_cache_rx.len(), 0); } @@ -598,6 +630,7 @@ mod tests { ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); + let (envelopes_tx, envelopes_rx) = mpsc::channel(5); let (project_cache, project_cache_rx) = Addr::custom(); let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); let service = EnvelopeBufferService::new( @@ -605,6 +638,7 @@ mod tests { memory_checker, global_rx, Services { + envelopes_tx, project_cache, outcome_aggregator, test_store: Addr::dummy(), @@ -628,7 +662,9 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; - assert!(project_cache_rx.is_empty()); + assert_eq!(envelopes_rx.len(), 0); + assert_eq!(project_cache_rx.len(), 0); + let outcome = outcome_aggregator_rx.try_recv().unwrap(); assert_eq!(outcome.category, DataCategory::TransactionIndexed); assert_eq!(outcome.quantity, 1); @@ -637,7 +673,7 @@ mod tests { #[tokio::test] async fn test_update_project() { tokio::time::pause(); - let (service, global_tx, mut project_cache_rx, _) = buffer_service(); + let (service, global_tx, mut envelopes_rx, mut project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -652,9 +688,8 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; - // We expect the project update request to be sent. - let Some(ProjectCache::HandleDequeuedEnvelope(envelope, _)) = project_cache_rx.recv().await - else { + // We expect the envelope to be forwarded because by default we mark the project as ready. + let Some(DequeuedEnvelope(envelope)) = envelopes_rx.recv().await else { panic!(); }; @@ -662,6 +697,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; + // We expect the project update request to be sent. assert_eq!(project_cache_rx.len(), 1); let message = project_cache_rx.recv().await; assert!(matches!( @@ -671,10 +707,56 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; + // We expect the project update request to be sent. assert_eq!(project_cache_rx.len(), 1); assert!(matches!( message, Some(ProjectCache::UpdateProject(key)) if key == project_key )) } + + #[tokio::test] + async fn output_is_throttled() { + tokio::time::pause(); + let (service, global_tx, mut envelopes_rx, _project_cache_rx, _) = buffer_service(); + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let addr = service.start(); + + // Send 10 messages, with a bounded queue size of 5. + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + for _ in 0..10 { + addr.send(EnvelopeBuffer::Push(envelope.clone())); + } + addr.send(EnvelopeBuffer::Ready(project_key)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut messages = vec![]; + envelopes_rx.recv_many(&mut messages, 100).await; + + assert_eq!( + messages + .iter() + .filter(|message| matches!(message, DequeuedEnvelope(..))) + .count(), + 5 + ); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut messages = vec![]; + envelopes_rx.recv_many(&mut messages, 100).await; + + assert_eq!( + messages + .iter() + .filter(|message| matches!(message, DequeuedEnvelope(..))) + .count(), + 5 + ); + } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index be595f340a..690671a434 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -255,6 +255,7 @@ pub struct SpoolHealth; pub struct RefreshIndexCache(pub HashSet); /// Handle an envelope that was popped from the envelope buffer. +#[derive(Debug)] pub struct DequeuedEnvelope(pub Box); /// A request to update a project, typically sent by the envelope buffer. @@ -295,7 +296,6 @@ pub enum ProjectCache { UpdateSpoolIndex(UpdateSpoolIndex), SpoolHealth(Sender), RefreshIndexCache(RefreshIndexCache), - HandleDequeuedEnvelope(Box, Sender<()>), UpdateProject(ProjectKey), } @@ -314,7 +314,6 @@ impl ProjectCache { Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", Self::SpoolHealth(_) => "SpoolHealth", Self::RefreshIndexCache(_) => "RefreshIndexCache", - Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope", Self::UpdateProject(_) => "UpdateProject", } } @@ -421,15 +420,6 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { - type Response = relay_system::AsyncResponse<()>; - - fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self { - let DequeuedEnvelope(envelope) = message; - Self::HandleDequeuedEnvelope(envelope, sender) - } -} - impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; @@ -1308,27 +1298,26 @@ impl ProjectCacheBroker { ProjectCache::RefreshIndexCache(message) => { self.handle_refresh_index_cache(message) } - ProjectCache::HandleDequeuedEnvelope(message, sender) => { - let envelope_buffer = self - .services - .envelope_buffer - .clone() - .expect("Called HandleDequeuedEnvelope without an envelope buffer"); - - if let Err(e) = self.handle_dequeued_envelope(message, envelope_buffer) { - relay_log::error!( - error = &e as &dyn std::error::Error, - "Failed to handle popped envelope" - ); - } - // Return response to signal readiness for next envelope: - sender.send(()) - } ProjectCache::UpdateProject(project) => self.handle_update_project(project), } } ) } + + fn handle_envelope(&mut self, dequeued_envelope: DequeuedEnvelope) { + let envelope_buffer = self + .services + .envelope_buffer + .clone() + .expect("Called HandleDequeuedEnvelope without an envelope buffer"); + + if let Err(e) = self.handle_dequeued_envelope(dequeued_envelope.0, envelope_buffer) { + relay_log::error!( + error = &e as &dyn std::error::Error, + "Failed to handle popped envelope" + ); + } + } } /// Service implementing the [`ProjectCache`] interface. @@ -1338,6 +1327,8 @@ pub struct ProjectCacheService { memory_checker: MemoryChecker, services: Services, global_config_rx: watch::Receiver, + /// Bounded channel used exclusively to receive envelopes from the envelope buffer. + envelopes_rx: mpsc::Receiver, redis: Option, } @@ -1348,6 +1339,7 @@ impl ProjectCacheService { memory_checker: MemoryChecker, services: Services, global_config_rx: watch::Receiver, + envelopes_rx: mpsc::Receiver, redis: Option, ) -> Self { Self { @@ -1355,6 +1347,7 @@ impl ProjectCacheService { memory_checker, services, global_config_rx, + envelopes_rx, redis, } } @@ -1369,6 +1362,7 @@ impl Service for ProjectCacheService { memory_checker, services, mut global_config_rx, + mut envelopes_rx, redis, } = self; let project_cache = services.project_cache.clone(); @@ -1489,6 +1483,11 @@ impl Service for ProjectCacheService { broker.handle_periodic_unspool() }) } + Some(message) = envelopes_rx.recv() => { + metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_envelope", { + broker.handle_envelope(message) + }) + } Some(message) = rx.recv() => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message", { broker.handle_message(message) diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 103fc03e79..fbae91d779 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -178,6 +178,9 @@ pub enum RelayHistograms { /// This metric is tagged with: /// - `storage_type`: The type of storage used in the envelope buffer. BufferEnvelopesCount, + /// Number of envelopes in the backpressure buffer between the envelope buffer + /// and the project cache. + BufferBackpressureEnvelopesCount, /// The number of batches emitted per partition. BatchesPerPartition, /// The number of buckets in a batch emitted. @@ -303,6 +306,9 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::BufferDiskSize => "buffer.disk_size", RelayHistograms::BufferDequeueAttempts => "buffer.dequeue_attempts", RelayHistograms::BufferEnvelopesCount => "buffer.envelopes_count", + RelayHistograms::BufferBackpressureEnvelopesCount => { + "buffer.backpressure_envelopes_count" + } RelayHistograms::ProjectStatePending => "project_state.pending", RelayHistograms::ProjectStateAttempts => "project_state.attempts", RelayHistograms::ProjectStateRequestBatchSize => "project_state.request.batch_size",