Skip to content

Commit 6f6aed3

Browse files
authored
Merge pull request #2089 from input-output-hk/sfa/2086/refactor_event_message_creation
Refactor event message creation
2 parents e7565dd + 1ae9409 commit 6f6aed3

File tree

9 files changed

+107
-123
lines changed

9 files changed

+107
-123
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.5.103"
3+
version = "0.5.104"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/event_store/database/query.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ mod tests {
4848

4949
#[test]
5050
fn provider_sql() {
51-
let message = EventMessage::new("source", "action", serde_json::json!("content"));
51+
let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
5252

5353
let (final_expression, parameters) =
5454
InsertEventQuery::one(message).unwrap().filters().expand();
@@ -62,15 +62,16 @@ mod tests {
6262

6363
#[test]
6464
fn build_a_json_for_content_field_with_content_and_headers() {
65-
let content = serde_json::json!({
66-
"attr1": "content".to_string(),
67-
"attr2": 123,
68-
});
69-
70-
let mut message = EventMessage::new("source", "action", content);
71-
message
72-
.headers
73-
.insert("key".to_string(), "value".to_string());
65+
#[derive(serde::Serialize)]
66+
struct Content {
67+
attr1: String,
68+
attr2: i32,
69+
}
70+
let content = Content {
71+
attr1: "content".to_string(),
72+
attr2: 123,
73+
};
74+
let message = EventMessage::new("source", "action", &content, [("key", "value")].to_vec());
7475

7576
let (_, parameters) = InsertEventQuery::one(message).unwrap().filters().expand();
7677

mithril-aggregator/src/event_store/database/repository.rs

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ mod tests {
4242
let connection = Arc::new(event_store_db_connection().unwrap());
4343

4444
let persister = EventPersister::new(connection);
45-
let message = EventMessage::new("source", "action", serde_json::json!("content"));
45+
let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
4646

4747
let _event = persister.persist(message)?;
4848
Ok(())
@@ -53,7 +53,7 @@ mod tests {
5353
let connection = Arc::new(event_store_db_connection().unwrap());
5454

5555
let persister = EventPersister::new(connection);
56-
let message = EventMessage::new("source", "action", serde_json::json!("content"));
56+
let message = EventMessage::new("source", "action", &"content".to_string(), Vec::new());
5757

5858
let _event = persister.persist(message)?;
5959
Ok(())
@@ -63,9 +63,7 @@ mod tests {
6363
use std::time::Duration;
6464

6565
use crate::{
66-
event_store::{database::test_helper::event_store_db_connection, TransmitterService},
67-
services::UsageReporter,
68-
test_tools::TestLogger,
66+
event_store::database::test_helper::event_store_db_connection, services::UsageReporter,
6967
};
7068
use chrono::DateTime;
7169

@@ -103,18 +101,13 @@ mod tests {
103101
let metric_date =
104102
DateTime::parse_from_str(&format!("{date} +0000"), "%Y-%m-%d %H:%M:%S %z").unwrap();
105103

106-
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
107-
let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
108-
let _result = UsageReporter::send_metric_event(
109-
&transmitter_service,
104+
let message = UsageReporter::create_metrics_event_message(
110105
metric_name.to_string(),
111106
value,
112107
Duration::from_secs(5),
113108
metric_date.into(),
114109
);
115110

116-
let message: EventMessage = rx.try_recv().unwrap();
117-
118111
let _event = persister.persist(message).unwrap();
119112
}
120113

@@ -150,12 +143,9 @@ mod tests {
150143
mod signer_registration_summary {
151144
use std::sync::Arc;
152145

153-
use crate::event_store::{
154-
database::test_helper::event_store_db_connection, TransmitterService,
155-
};
156-
use crate::test_tools::TestLogger;
157-
use mithril_common::entities::Stake;
158-
use mithril_common::{entities::SignerWithStake, test_utils::fake_data, StdResult};
146+
use crate::event_store::database::test_helper::event_store_db_connection;
147+
use mithril_common::entities::{SignerWithStake, Stake};
148+
use mithril_common::{test_utils::fake_data, StdResult};
159149
use sqlite::ConnectionThreadSafe;
160150

161151
use super::{EventMessage, EventPersister};
@@ -168,25 +158,20 @@ mod tests {
168158
stake: Stake,
169159
signer_node_version: &str,
170160
) {
171-
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<EventMessage>();
172-
let transmitter_service = Arc::new(TransmitterService::new(tx, TestLogger::stdout()));
173-
174161
let signers = fake_data::signers_with_stakes(1);
175162
let signer = SignerWithStake {
176163
party_id: party_id.to_string(),
177164
stake,
178165
..signers[0].clone()
179166
};
180167

181-
let _ = transmitter_service.send_signer_registration_event(
168+
let message = EventMessage::signer_registration(
182169
"Test",
183170
&signer,
184171
Some(signer_node_version.to_string()),
185172
epoch,
186173
);
187174

188-
let message: EventMessage = rx.try_recv().unwrap();
189-
190175
let _event = persister.persist(message).unwrap();
191176
}
192177

mithril-aggregator/src/event_store/event.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use chrono::{DateTime, Utc};
2+
use mithril_common::entities::SignerWithStake;
3+
use serde::Serialize;
24

35
use std::collections::HashMap;
46

@@ -24,13 +26,40 @@ pub struct EventMessage {
2426

2527
impl EventMessage {
2628
/// Instantiate a new EventMessage.
27-
pub fn new(source: &str, action: &str, content: serde_json::Value) -> Self {
28-
Self {
29+
pub fn new<T>(source: &str, action: &str, content: &T, headers: Vec<(&str, &str)>) -> Self
30+
where
31+
T: Serialize,
32+
{
33+
let content = serde_json::json!(content);
34+
35+
EventMessage {
2936
source: source.to_string(),
3037
action: action.to_string(),
3138
content,
32-
headers: HashMap::new(),
39+
headers: headers
40+
.into_iter()
41+
.map(|(h, v)| (h.to_string(), v.to_string()))
42+
.collect(),
43+
}
44+
}
45+
46+
/// Create a signer registration event message.
47+
pub fn signer_registration(
48+
source: &str,
49+
signer_with_stake: &SignerWithStake,
50+
signer_node_version: Option<String>,
51+
epoch_str: &str,
52+
) -> Self {
53+
let mut headers: Vec<(&str, &str)> = match signer_node_version.as_ref() {
54+
Some(version) => vec![("signer-node-version", version)],
55+
None => Vec::new(),
56+
};
57+
58+
if !epoch_str.is_empty() {
59+
headers.push(("epoch", epoch_str));
3360
}
61+
62+
Self::new::<SignerWithStake>(source, "register_signer", signer_with_stake, headers)
3463
}
3564
}
3665

Lines changed: 16 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use serde::Serialize;
1+
use anyhow::Context;
22
use slog::{warn, Logger};
33
use std::fmt::Debug;
44
use tokio::sync::mpsc::UnboundedSender;
55

6-
use mithril_common::{entities::SignerWithStake, logging::LoggerExtensions};
6+
use mithril_common::{logging::LoggerExtensions, StdResult};
77

88
use super::EventMessage;
99

@@ -36,62 +36,22 @@ where
3636
}
3737

3838
impl TransmitterService<EventMessage> {
39-
/// Craft and send an [EventMessage] given the serializable data.
40-
/// This method is done in a way to make as simple as possible to send a
41-
/// message and make any error not to cause a business failure. A warning is
42-
/// issued so the resulting error may be discarded.
43-
pub fn send_event_message<T>(
44-
&self,
45-
source: &str,
46-
action: &str,
47-
content: &T,
48-
headers: Vec<(&str, &str)>,
49-
) -> Result<(), String>
50-
where
51-
T: Serialize,
52-
{
53-
let content = serde_json::json!(content);
54-
55-
let message = EventMessage {
56-
source: source.to_string(),
57-
action: action.to_string(),
58-
content,
59-
headers: headers
60-
.into_iter()
61-
.map(|(h, v)| (h.to_string(), v.to_string()))
62-
.collect(),
63-
};
64-
self.get_transmitter().send(message.clone()).map_err(|e| {
65-
let error_msg =
66-
format!("An error occurred when sending message {message:?} to monitoring: '{e}'.");
67-
warn!(self.logger, "Event message error"; "error" => &error_msg);
68-
69-
error_msg
70-
})
39+
/// Send an [EventMessage].
40+
pub fn try_send(&self, message: EventMessage) -> StdResult<()> {
41+
self.get_transmitter()
42+
.send(message.clone())
43+
.with_context(|| {
44+
format!("An error occurred when sending message {message:?} to monitoring.")
45+
})
7146
}
7247

73-
/// Send signer registration event.
74-
pub fn send_signer_registration_event(
75-
&self,
76-
source: &str,
77-
signer_with_stake: &SignerWithStake,
78-
signer_node_version: Option<String>,
79-
epoch_str: &str,
80-
) -> Result<(), String> {
81-
let mut headers: Vec<(&str, &str)> = match signer_node_version.as_ref() {
82-
Some(version) => vec![("signer-node-version", version)],
83-
None => Vec::new(),
48+
/// Send an [EventMessage].
49+
////
50+
/// An error when sending a message has no impact on the business.
51+
/// If there is one, a warning is issued so the resulting error may be safely ignored by the caller.
52+
pub fn send(&self, message: EventMessage) {
53+
if let Err(e) = self.try_send(message.clone()) {
54+
warn!(self.logger, "Event message error"; "error" => ?e);
8455
};
85-
86-
if !epoch_str.is_empty() {
87-
headers.push(("epoch", epoch_str));
88-
}
89-
90-
self.send_event_message::<SignerWithStake>(
91-
source,
92-
"register_signer",
93-
signer_with_stake,
94-
headers,
95-
)
9656
}
9757
}

mithril-aggregator/src/http_server/routes/signer_routes.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,24 +144,25 @@ mod handlers {
144144
.await
145145
{
146146
Ok(signer_with_stake) => {
147-
let _ = event_transmitter.send_signer_registration_event(
147+
event_transmitter.send(EventMessage::signer_registration(
148148
"HTTP::signer_register",
149149
&signer_with_stake,
150150
signer_node_version,
151151
epoch_str.as_str(),
152-
);
152+
));
153153

154154
Ok(reply::empty(StatusCode::CREATED))
155155
}
156156
Err(SignerRegistrationError::ExistingSigner(signer_with_stake)) => {
157157
debug!(logger, "register_signer::already_registered");
158158

159-
let _ = event_transmitter.send_signer_registration_event(
159+
event_transmitter.send(EventMessage::signer_registration(
160160
"HTTP::signer_register",
161161
&signer_with_stake,
162162
signer_node_version,
163163
epoch_str.as_str(),
164-
);
164+
));
165+
165166
Ok(reply::empty(StatusCode::CREATED))
166167
}
167168
Err(SignerRegistrationError::FailedSignerRegistration(err)) => {

mithril-aggregator/src/http_server/routes/statistics_routes.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ fn post_statistics(
1616
warp::path!("statistics" / "snapshot")
1717
.and(warp::post())
1818
.and(warp::body::json())
19+
.and(middlewares::with_logger(router_state))
1920
.and(middlewares::with_event_transmitter(router_state))
2021
.and(middlewares::with_metrics_service(router_state))
2122
.and_then(handlers::post_snapshot_statistics)
2223
}
2324

2425
mod handlers {
26+
use slog::warn;
2527
use std::{convert::Infallible, sync::Arc};
2628
use warp::http::StatusCode;
2729

@@ -33,6 +35,7 @@ mod handlers {
3335

3436
pub async fn post_snapshot_statistics(
3537
snapshot_download_message: SnapshotDownloadMessage,
38+
logger: slog::Logger,
3639
event_transmitter: Arc<TransmitterService<EventMessage>>,
3740
metrics_service: Arc<MetricsService>,
3841
) -> Result<impl warp::Reply, Infallible> {
@@ -42,13 +45,18 @@ mod handlers {
4245

4346
let headers: Vec<(&str, &str)> = Vec::new();
4447

45-
match event_transmitter.send_event_message(
48+
let message = EventMessage::new(
4649
"HTTP::statistics",
4750
"snapshot_downloaded",
4851
&snapshot_download_message,
4952
headers,
50-
) {
51-
Err(e) => Ok(reply::internal_server_error(e)),
53+
);
54+
55+
match event_transmitter.try_send(message.clone()) {
56+
Err(e) => {
57+
warn!(logger, "Event message error"; "error" => ?e);
58+
Ok(reply::internal_server_error(e))
59+
}
5260
Ok(_) => Ok(reply::empty(StatusCode::CREATED)),
5361
}
5462
}

0 commit comments

Comments
 (0)