diff --git a/src/internal_events/amqp.rs b/src/internal_events/amqp.rs index dd5dde7b8e7b8..a042b86278ed3 100644 --- a/src/internal_events/amqp.rs +++ b/src/internal_events/amqp.rs @@ -105,9 +105,9 @@ pub mod sink { impl InternalEvent for AmqpDeliveryError<'_> { fn emit(self) { - let deliver_reason = "Unable to deliver."; + const DELIVER_REASON: &str = "Unable to deliver."; - error!(message = deliver_reason, + error!(message = DELIVER_REASON, error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, @@ -120,7 +120,7 @@ pub mod sink { ); emit!(ComponentEventsDropped:: { count: 1, - reason: deliver_reason + reason: DELIVER_REASON }); } } @@ -132,9 +132,9 @@ pub mod sink { impl InternalEvent for AmqpAcknowledgementError<'_> { fn emit(self) { - let ack_reason = "Acknowledgement failed."; + const ACK_REASON: &str = "Acknowledgement failed."; - error!(message = ack_reason, + error!(message = ACK_REASON, error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, @@ -147,7 +147,31 @@ pub mod sink { ); emit!(ComponentEventsDropped:: { count: 1, - reason: ack_reason + reason: ACK_REASON + }); + } + } + + #[derive(Debug)] + pub struct AmqpNackError; + + impl InternalEvent for AmqpNackError { + fn emit(self) { + const DELIVER_REASON: &str = "Received Negative Acknowledgement from AMQP broker."; + error!( + message = DELIVER_REASON, + error_type = error_type::ACKNOWLEDGMENT_FAILED, + stage = error_stage::SENDING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::ACKNOWLEDGMENT_FAILED, + "stage" => error_stage::SENDING, + ); + emit!(ComponentEventsDropped:: { + count: 1, + reason: DELIVER_REASON }); } } diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 42ccf467e5692..36e2845afa821 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -1,7 +1,7 @@ //! The main tower service that takes the request created by the request builder //! and sends it to `AMQP`. use crate::{ - internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError}, + internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError, AmqpNackError}, sinks::prelude::*, }; use bytes::Bytes; @@ -88,10 +88,13 @@ pub(super) struct AmqpService { #[derive(Debug, Snafu)] pub(super) enum AmqpError { #[snafu(display("Failed retrieving Acknowledgement: {}", error))] - AmqpAcknowledgementFailed { error: lapin::Error }, + AcknowledgementFailed { error: lapin::Error }, #[snafu(display("Failed AMQP request: {}", error))] - AmqpDeliveryFailed { error: lapin::Error }, + DeliveryFailed { error: lapin::Error }, + + #[snafu(display("Received Negative Acknowledgement from AMQP broker."))] + Nack, } impl Service for AmqpService { @@ -109,11 +112,6 @@ impl Service for AmqpService { let channel = Arc::clone(&self.channel); Box::pin(async move { - channel - .confirm_select(lapin::options::ConfirmSelectOptions::default()) - .await - .unwrap(); - let byte_size = req.body.len(); let fut = channel .basic_publish( @@ -128,16 +126,13 @@ impl Service for AmqpService { match fut { Ok(result) => match result.await { Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { - warn!("Received Negative Acknowledgement from AMQP server."); - Ok(AmqpResponse { - json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), - byte_size, - }) + emit!(AmqpNackError); + Err(AmqpError::Nack) } Err(error) => { // TODO: In due course the caller could emit these on error. emit!(AmqpAcknowledgementError { error: &error }); - Err(AmqpError::AmqpAcknowledgementFailed { error }) + Err(AmqpError::AcknowledgementFailed { error }) } Ok(_) => Ok(AmqpResponse { json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), @@ -147,7 +142,7 @@ impl Service for AmqpService { Err(error) => { // TODO: In due course the caller could emit these on error. emit!(AmqpDeliveryError { error: &error }); - Err(AmqpError::AmqpDeliveryFailed { error }) + Err(AmqpError::DeliveryFailed { error }) } } }) diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index 7039c6217993d..922065a58a7a3 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -43,6 +43,7 @@ impl AmqpSink { .await .map_err(|e| BuildError::AmqpCreateFailed { source: e })?; + // Enable confirmations on the channel. channel .confirm_select(ConfirmSelectOptions::default()) .await