Skip to content

Commit 2ed0e33

Browse files
authored
balancer: make json parsing errors include the path to the key causing the error (#1345)
1 parent 0a4d569 commit 2ed0e33

File tree

9 files changed

+79
-32
lines changed

9 files changed

+79
-32
lines changed

Cargo.lock

+33-21
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ rand = "0.8.5"
2626
reqwest = { version = "0.11.17", features = ["json", "stream", "rustls-tls"] }
2727
serde = { version = "1", features = ["derive", "rc"] }
2828
serde_json = { version = "1", features = ["raw_value"] }
29+
serde_path_to_error = "0.1.15"
2930
test-context = "0.1.4"
3031
tokio = { version = "1", features = ["full", "tracing"] }
3132
tokio-tungstenite = "0.21.0"

crates/harness/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ reqwest.workspace = true
1919
rand.workspace = true
2020
serde.workspace = true
2121
serde_json.workspace = true
22+
serde_path_to_error.workspace = true
2223
test-context.workspace = true
2324
tracing.workspace = true
2425
tracing-subscriber.workspace = true

crates/harness/src/monolith.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ impl Monolith {
270270
.iter()
271271
.filter_map(|msg| match msg {
272272
Message::Text(msg) => {
273-
let msg: MsgB2M = serde_json::from_str(msg).unwrap();
273+
let jd = &mut serde_json::Deserializer::from_str(msg);
274+
let msg: MsgB2M = serde_path_to_error::deserialize(jd).unwrap();
274275
Some(msg)
275276
}
276277
_ => None,

crates/ott-balancer/Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ name = "ott-balancer"
33
version = "0.10.0"
44
edition = "2021"
55

6-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7-
86
[dependencies]
97
anyhow.workspace = true
108
async-trait.workspace = true
@@ -20,6 +18,7 @@ rand.workspace = true
2018
reqwest.workspace = true
2119
serde.workspace = true
2220
serde_json.workspace = true
21+
serde_path_to_error.workspace = true
2322
tracing.workspace = true
2423
tracing-subscriber.workspace = true
2524
tokio.workspace = true

crates/ott-balancer/src/client.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use tokio::sync::broadcast::error::RecvError;
55
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
66
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
77
use tokio_tungstenite::tungstenite::Message;
8-
use tracing::{debug, error, info, trace};
8+
use tracing::{debug, error, info, trace, warn};
99
use uuid::Uuid;
1010

1111
use crate::balancer::BalancerLink;
@@ -147,7 +147,21 @@ pub async fn client_entry<'r>(
147147
let mut client_link;
148148
match message {
149149
Message::Text(text) => {
150-
let message: ClientMessage = serde_json::from_str(&text).unwrap();
150+
let jd = &mut serde_json::Deserializer::from_str(&text);
151+
let message: ClientMessage = match serde_path_to_error::deserialize(jd) {
152+
Ok(msg) => msg,
153+
Err(err) => {
154+
warn!("failed to deserialize client message: {:?}", err);
155+
stream
156+
.close(Some(CloseFrame {
157+
code: CloseCode::Protocol,
158+
reason: "failed to deserialize message".into(),
159+
}))
160+
.await?;
161+
return Ok(());
162+
}
163+
};
164+
151165
match message {
152166
ClientMessage::Auth(message) => {
153167
debug!("client authenticated, handing off to balancer");

crates/ott-balancer/src/connection.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,21 @@ async fn connect_and_maintain(
126126
let monolith_id;
127127
match message {
128128
Message::Text(text) => {
129-
let message: MsgM2B = serde_json::from_str(&text).unwrap();
129+
let jd = &mut serde_json::Deserializer::from_str(&text);
130+
let message: MsgM2B = match serde_path_to_error::deserialize(jd) {
131+
Ok(msg) => msg,
132+
Err(err) => {
133+
warn!("failed to deserialize monolith message: {:?}", err);
134+
let _ = stream
135+
.close(Some(CloseFrame {
136+
code: CloseCode::Protocol,
137+
reason: "failed to deserialize message".into(),
138+
}))
139+
.await;
140+
return;
141+
}
142+
};
143+
130144
match message {
131145
MsgM2B::Init(init) => {
132146
monolith_id = init.id;
@@ -154,7 +168,7 @@ async fn connect_and_maintain(
154168
_ => {
155169
let _ = stream
156170
.close(Some(CloseFrame {
157-
code: CloseCode::Library(4000),
171+
code: CloseCode::Protocol,
158172
reason: "did not send init".into(),
159173
}))
160174
.await;

crates/ott-balancer/src/discovery/harness.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,13 @@ async fn do_harness_discovery(
8080
error!("expected text message from harness, got something else");
8181
continue;
8282
};
83-
let Ok(msg) = serde_json::from_str(text.as_str()) else {
84-
error!("failed to deserialize message from harness");
85-
continue;
83+
let jd = &mut serde_json::Deserializer::from_str(&text);
84+
let msg = match serde_path_to_error::deserialize(jd) {
85+
Ok(msg) => msg,
86+
Err(err) => {
87+
error!("failed to deserialize message from harness: {:?}", err);
88+
continue;
89+
}
8690
};
8791
let mut monoliths = monoliths.lock().await;
8892
*monoliths = msg;

crates/ott-balancer/src/messages.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ impl<'de> SocketMessage {
1313
match self {
1414
SocketMessage::Message(msg) => {
1515
let text = msg.to_text()?;
16-
let obj = serde_json::from_str(text)?;
16+
let jd = &mut serde_json::Deserializer::from_str(text);
17+
let obj = serde_path_to_error::deserialize(jd)?;
1718
Ok(obj)
1819
}
1920
#[allow(deprecated)]

0 commit comments

Comments
 (0)