From 00edba895527c495c2e68027e39975fec7d299c8 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 24 Oct 2023 12:13:07 -0400 Subject: [PATCH 1/4] fix(amqp sink): remove unnecessary unwrap & emit event dropped errors --- src/internal_events/amqp.rs | 24 ++++++++++++++++++++++++ src/sinks/amqp/service.rs | 12 +++++------- src/sinks/amqp/sink.rs | 1 + 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/internal_events/amqp.rs b/src/internal_events/amqp.rs index dd5dde7b8e7b8..6087624002771 100644 --- a/src/internal_events/amqp.rs +++ b/src/internal_events/amqp.rs @@ -151,4 +151,28 @@ pub mod sink { }); } } + + #[derive(Debug)] + pub struct AmqpNackError; + + impl InternalEvent for AmqpNackError { + fn emit(self) { + let deliver_reason = "Received Negative Acknowledgement from AMQP broker."; + error!( + message = deliver_reason, + error_type = error_type::ACKNOWLEDGMENT_FAILED, + stage = error_stage::SENDING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::ACKNOWLEDGMENT_FAILED, + "stage" => error_stage::SENDING, + ); + emit!(ComponentEventsDropped:: { + count: 1, + reason: deliver_reason + }); + } + } } diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 42ccf467e5692..606e34306104a 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -1,7 +1,7 @@ //! The main tower service that takes the request created by the request builder //! and sends it to `AMQP`. use crate::{ - internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError}, + internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError, AmqpNackError}, sinks::prelude::*, }; use bytes::Bytes; @@ -92,6 +92,9 @@ pub(super) enum AmqpError { #[snafu(display("Failed AMQP request: {}", error))] AmqpDeliveryFailed { error: lapin::Error }, + + #[snafu(display("Recieved Negative Acknowledgement from AMQP broker."))] + AmqpNack, } impl Service for AmqpService { @@ -109,11 +112,6 @@ impl Service for AmqpService { let channel = Arc::clone(&self.channel); Box::pin(async move { - channel - .confirm_select(lapin::options::ConfirmSelectOptions::default()) - .await - .unwrap(); - let byte_size = req.body.len(); let fut = channel .basic_publish( @@ -128,7 +126,7 @@ impl Service for AmqpService { match fut { Ok(result) => match result.await { Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { - warn!("Received Negative Acknowledgement from AMQP server."); + emit!(AmqpNackError); Ok(AmqpResponse { json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), byte_size, diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index 7039c6217993d..922065a58a7a3 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -43,6 +43,7 @@ impl AmqpSink { .await .map_err(|e| BuildError::AmqpCreateFailed { source: e })?; + // Enable confirmations on the channel. channel .confirm_select(ConfirmSelectOptions::default()) .await From a6ee467b5e951a934e4dade513bffd1c3d5a339c Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 24 Oct 2023 12:16:30 -0400 Subject: [PATCH 2/4] return error too --- src/sinks/amqp/service.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 606e34306104a..3b47c9809aa72 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -127,10 +127,7 @@ impl Service for AmqpService { Ok(result) => match result.await { Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { emit!(AmqpNackError); - Ok(AmqpResponse { - json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), - byte_size, - }) + Err(AmqpError::AmqpNack) } Err(error) => { // TODO: In due course the caller could emit these on error. From 417db1ff9945c89cd4709c94778de362e9341768 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 24 Oct 2023 12:53:04 -0400 Subject: [PATCH 3/4] fix checks --- src/sinks/amqp/service.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 3b47c9809aa72..36e2845afa821 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -88,13 +88,13 @@ pub(super) struct AmqpService { #[derive(Debug, Snafu)] pub(super) enum AmqpError { #[snafu(display("Failed retrieving Acknowledgement: {}", error))] - AmqpAcknowledgementFailed { error: lapin::Error }, + AcknowledgementFailed { error: lapin::Error }, #[snafu(display("Failed AMQP request: {}", error))] - AmqpDeliveryFailed { error: lapin::Error }, + DeliveryFailed { error: lapin::Error }, - #[snafu(display("Recieved Negative Acknowledgement from AMQP broker."))] - AmqpNack, + #[snafu(display("Received Negative Acknowledgement from AMQP broker."))] + Nack, } impl Service for AmqpService { @@ -127,12 +127,12 @@ impl Service for AmqpService { Ok(result) => match result.await { Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { emit!(AmqpNackError); - Err(AmqpError::AmqpNack) + Err(AmqpError::Nack) } Err(error) => { // TODO: In due course the caller could emit these on error. emit!(AmqpAcknowledgementError { error: &error }); - Err(AmqpError::AmqpAcknowledgementFailed { error }) + Err(AmqpError::AcknowledgementFailed { error }) } Ok(_) => Ok(AmqpResponse { json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), @@ -142,7 +142,7 @@ impl Service for AmqpService { Err(error) => { // TODO: In due course the caller could emit these on error. emit!(AmqpDeliveryError { error: &error }); - Err(AmqpError::AmqpDeliveryFailed { error }) + Err(AmqpError::DeliveryFailed { error }) } } }) From af8a31a83a5633bcf1fab8f69610ccdbf1a33c83 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 24 Oct 2023 13:46:00 -0400 Subject: [PATCH 4/4] feedback --- src/internal_events/amqp.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/internal_events/amqp.rs b/src/internal_events/amqp.rs index 6087624002771..a042b86278ed3 100644 --- a/src/internal_events/amqp.rs +++ b/src/internal_events/amqp.rs @@ -105,9 +105,9 @@ pub mod sink { impl InternalEvent for AmqpDeliveryError<'_> { fn emit(self) { - let deliver_reason = "Unable to deliver."; + const DELIVER_REASON: &str = "Unable to deliver."; - error!(message = deliver_reason, + error!(message = DELIVER_REASON, error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, @@ -120,7 +120,7 @@ pub mod sink { ); emit!(ComponentEventsDropped:: { count: 1, - reason: deliver_reason + reason: DELIVER_REASON }); } } @@ -132,9 +132,9 @@ pub mod sink { impl InternalEvent for AmqpAcknowledgementError<'_> { fn emit(self) { - let ack_reason = "Acknowledgement failed."; + const ACK_REASON: &str = "Acknowledgement failed."; - error!(message = ack_reason, + error!(message = ACK_REASON, error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, @@ -147,7 +147,7 @@ pub mod sink { ); emit!(ComponentEventsDropped:: { count: 1, - reason: ack_reason + reason: ACK_REASON }); } } @@ -157,9 +157,9 @@ pub mod sink { impl InternalEvent for AmqpNackError { fn emit(self) { - let deliver_reason = "Received Negative Acknowledgement from AMQP broker."; + const DELIVER_REASON: &str = "Received Negative Acknowledgement from AMQP broker."; error!( - message = deliver_reason, + message = DELIVER_REASON, error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::SENDING, internal_log_rate_limit = true, @@ -171,7 +171,7 @@ pub mod sink { ); emit!(ComponentEventsDropped:: { count: 1, - reason: deliver_reason + reason: DELIVER_REASON }); } }