From f574bb522b21495fd1a77aa2a8928c94516aedb3 Mon Sep 17 00:00:00 2001 From: clabby Date: Tue, 17 Sep 2024 23:44:25 -0400 Subject: [PATCH 1/4] chore(preimage): Improve error differentiation in preimage servers --- bin/host/src/server.rs | 48 ++++++++++++++++++++++------------- crates/preimage/src/hint.rs | 22 +++++++++------- crates/preimage/src/lib.rs | 2 +- crates/preimage/src/oracle.rs | 19 ++++++++------ crates/preimage/src/traits.rs | 14 ++++++++-- 5 files changed, 67 insertions(+), 38 deletions(-) diff --git a/bin/host/src/server.rs b/bin/host/src/server.rs index 724fd31c68..2093e836e0 100644 --- a/bin/host/src/server.rs +++ b/bin/host/src/server.rs @@ -8,9 +8,12 @@ use crate::{ }, }; use anyhow::{anyhow, Result}; -use kona_preimage::{HintReaderServer, HintRouter, PreimageFetcher, PreimageOracleServer}; +use kona_preimage::{ + HintReaderServer, HintRouter, PreimageFetcher, PreimageOracleServer, PreimageServerError, +}; use std::sync::Arc; use tokio::sync::RwLock; +use tracing::error; /// The [PreimageServer] is responsible for waiting for incoming preimage requests and /// serving them to the client. @@ -67,8 +70,6 @@ where s = server => s.map_err(|e| anyhow!(e))?, h = hint_router => h.map_err(|e| anyhow!(e))?, } - - Ok(()) } /// Starts the oracle server, which waits for incoming preimage requests and serves them to the @@ -77,49 +78,60 @@ where kv_store: Arc>, fetcher: Option>>>, oracle_server: P, - ) { + ) -> Result<()> { #[inline(always)] - async fn do_loop(fetcher: &F, server: &P) + async fn do_loop(fetcher: &F, server: &P) -> Result<()> where F: PreimageFetcher + Send + Sync, P: PreimageOracleServer, { loop { - // Break the loop on any error. An error in this path indicates a closed pipe. - if server.next_preimage_request(fetcher).await.is_err() { - break; + match server.next_preimage_request(fetcher).await { + Ok(_) => (), + Err(PreimageServerError::BrokenPipe(_)) => return Ok(()), + Err(PreimageServerError::Other(e)) => { + error!("Failed to serve preimage request: {e:?}"); + return Err(e); + } } } } if let Some(fetcher) = fetcher.as_ref() { - do_loop(&OnlinePreimageFetcher::new(Arc::clone(fetcher)), &oracle_server).await; + do_loop(&OnlinePreimageFetcher::new(Arc::clone(fetcher)), &oracle_server).await } else { - do_loop(&OfflinePreimageFetcher::new(Arc::clone(&kv_store)), &oracle_server).await; - }; + do_loop(&OfflinePreimageFetcher::new(Arc::clone(&kv_store)), &oracle_server).await + } } /// Starts the hint router, which waits for incoming hints and routes them to the appropriate /// handler. - async fn start_hint_router(hint_reader: H, fetcher: Option>>>) { + async fn start_hint_router( + hint_reader: H, + fetcher: Option>>>, + ) -> Result<()> { #[inline(always)] - async fn do_loop(router: &R, server: &H) + async fn do_loop(router: &R, server: &H) -> Result<()> where R: HintRouter + Send + Sync, H: HintReaderServer, { loop { - // Break the loop on any error. An error in this path indicates a closed pipe. - if server.next_hint(router).await.is_err() { - break; + match server.next_hint(router).await { + Ok(_) => (), + Err(PreimageServerError::BrokenPipe(_)) => return Ok(()), + Err(PreimageServerError::Other(e)) => { + error!("Failed to serve preimage request: {e:?}"); + return Err(e); + } } } } if let Some(fetcher) = fetcher { - do_loop(&OnlineHintRouter::new(Arc::clone(&fetcher)), &hint_reader).await; + do_loop(&OnlineHintRouter::new(Arc::clone(&fetcher)), &hint_reader).await } else { - do_loop(&OfflineHintRouter, &hint_reader).await; + do_loop(&OfflineHintRouter, &hint_reader).await } } } diff --git a/crates/preimage/src/hint.rs b/crates/preimage/src/hint.rs index e996393edf..82641132c5 100644 --- a/crates/preimage/src/hint.rs +++ b/crates/preimage/src/hint.rs @@ -1,5 +1,5 @@ use crate::{ - traits::{HintRouter, HintWriterClient}, + traits::{HintRouter, HintWriterClient, PreimageServerError}, HintReaderServer, PipeHandle, }; use alloc::{boxed::Box, string::String, vec}; @@ -65,34 +65,38 @@ impl HintReader { #[async_trait] impl HintReaderServer for HintReader { - async fn next_hint(&self, hint_router: &R) -> Result<()> + async fn next_hint(&self, hint_router: &R) -> Result<(), PreimageServerError> where R: HintRouter + Send + Sync, { // Read the length of the raw hint payload. let mut len_buf = [0u8; 4]; - self.pipe_handle.read_exact(&mut len_buf).await?; + self.pipe_handle.read_exact(&mut len_buf).await.map_err(PreimageServerError::BrokenPipe)?; let len = u32::from_be_bytes(len_buf); // Read the raw hint payload. let mut raw_payload = vec![0u8; len as usize]; - self.pipe_handle.read_exact(raw_payload.as_mut_slice()).await?; - let payload = String::from_utf8(raw_payload) - .map_err(|e| anyhow::anyhow!("Failed to decode hint payload: {e}"))?; + self.pipe_handle + .read_exact(raw_payload.as_mut_slice()) + .await + .map_err(PreimageServerError::BrokenPipe)?; + let payload = String::from_utf8(raw_payload).map_err(|e| { + PreimageServerError::Other(anyhow::anyhow!("Failed to decode hint payload: {e}")) + })?; trace!(target: "hint_reader", "Successfully read hint: \"{payload}\""); // Route the hint if let Err(e) = hint_router.route_hint(payload).await { // Write back on error to prevent blocking the client. - self.pipe_handle.write(&[0x00]).await?; + self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?; error!("Failed to route hint: {e}"); - anyhow::bail!("Failed to rout hint: {e}"); + return Err(PreimageServerError::Other(e)); } // Write back an acknowledgement to the client to unblock their process. - self.pipe_handle.write(&[0x00]).await?; + self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?; trace!(target: "hint_reader", "Successfully routed and acknowledged hint"); diff --git a/crates/preimage/src/lib.rs b/crates/preimage/src/lib.rs index 482f0b80fd..9fd36390ff 100644 --- a/crates/preimage/src/lib.rs +++ b/crates/preimage/src/lib.rs @@ -22,7 +22,7 @@ pub use pipe::PipeHandle; mod traits; pub use traits::{ CommsClient, HintReaderServer, HintRouter, HintWriterClient, PreimageFetcher, - PreimageOracleClient, PreimageOracleServer, + PreimageOracleClient, PreimageOracleServer, PreimageServerError, }; #[cfg(test)] diff --git a/crates/preimage/src/oracle.rs b/crates/preimage/src/oracle.rs index f8f9d21b76..b52f2e942c 100644 --- a/crates/preimage/src/oracle.rs +++ b/crates/preimage/src/oracle.rs @@ -1,5 +1,6 @@ use crate::{ - traits::PreimageFetcher, PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer, + traits::{PreimageFetcher, PreimageServerError}, + PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer, }; use alloc::{boxed::Box, vec::Vec}; use anyhow::{bail, Result}; @@ -99,19 +100,19 @@ impl OracleServer { #[async_trait::async_trait] impl PreimageOracleServer for OracleServer { - async fn next_preimage_request(&self, fetcher: &F) -> Result<()> + async fn next_preimage_request(&self, fetcher: &F) -> Result<(), PreimageServerError> where F: PreimageFetcher + Send + Sync, { // Read the preimage request from the client, and throw early if there isn't is any. let mut buf = [0u8; 32]; - self.pipe_handle.read_exact(&mut buf).await?; - let preimage_key = PreimageKey::try_from(buf)?; + self.pipe_handle.read_exact(&mut buf).await.map_err(PreimageServerError::BrokenPipe)?; + let preimage_key = PreimageKey::try_from(buf).map_err(PreimageServerError::Other)?; trace!(target: "oracle_server", "Fetching preimage for key {preimage_key}"); // Fetch the preimage value from the preimage getter. - let value = fetcher.get_preimage(preimage_key).await?; + let value = fetcher.get_preimage(preimage_key).await.map_err(PreimageServerError::Other)?; // Write the length as a big-endian u64 followed by the data. let data = [(value.len() as u64).to_be_bytes().as_ref(), value.as_ref()] @@ -119,7 +120,7 @@ impl PreimageOracleServer for OracleServer { .flatten() .copied() .collect::>(); - self.pipe_handle.write(data.as_slice()).await?; + self.pipe_handle.write(data.as_slice()).await.map_err(PreimageServerError::BrokenPipe)?; trace!(target: "oracle_server", "Successfully wrote preimage data for key {preimage_key}"); @@ -188,8 +189,10 @@ mod test { let test_fetcher = TestFetcher { preimages: Arc::clone(&preimages) }; loop { - if oracle_server.next_preimage_request(&test_fetcher).await.is_err() { - break; + match oracle_server.next_preimage_request(&test_fetcher).await { + Err(PreimageServerError::BrokenPipe(_)) => break, + Err(PreimageServerError::Other(e)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {} } } }); diff --git a/crates/preimage/src/traits.rs b/crates/preimage/src/traits.rs index 913615e18e..b110a66648 100644 --- a/crates/preimage/src/traits.rs +++ b/crates/preimage/src/traits.rs @@ -43,6 +43,16 @@ pub trait CommsClient: PreimageOracleClient + Clone + HintWriterClient {} // Implement the super trait for any type that satisfies the bounds impl CommsClient for T {} +/// A [PreimageServerError] is an enum that differentiates pipe-related errors from other errors +/// in the [PreimageOracleServer] and [HintReaderServer] implementations. +#[derive(Debug)] +pub enum PreimageServerError { + /// The pipe has been broken. + BrokenPipe(anyhow::Error), + /// Other errors. + Other(anyhow::Error), +} + /// A [PreimageOracleServer] is a high-level interface to accept read requests from the client and /// write the preimage data to the client pipe. #[async_trait] @@ -52,7 +62,7 @@ pub trait PreimageOracleServer { /// # Returns /// - `Ok(())` if the data was successfully written into the client pipe. /// - `Err(_)` if the data could not be written to the client. - async fn next_preimage_request(&self, get_preimage: &F) -> Result<()> + async fn next_preimage_request(&self, get_preimage: &F) -> Result<(), PreimageServerError> where F: PreimageFetcher + Send + Sync; } @@ -67,7 +77,7 @@ pub trait HintReaderServer { /// - `Ok(())` if the hint was received and the client was notified of the host's /// acknowledgement. /// - `Err(_)` if the hint was not received correctly. - async fn next_hint(&self, route_hint: &R) -> Result<()> + async fn next_hint(&self, route_hint: &R) -> Result<(), PreimageServerError> where R: HintRouter + Send + Sync; } From 37049945f8cb6ec2b50dbb1c167575b918ccb12f Mon Sep 17 00:00:00 2001 From: clabby Date: Tue, 17 Sep 2024 23:53:55 -0400 Subject: [PATCH 2/4] test unblock on failure for hintrouter --- crates/preimage/src/hint.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/crates/preimage/src/hint.rs b/crates/preimage/src/hint.rs index 82641132c5..326b43f547 100644 --- a/crates/preimage/src/hint.rs +++ b/crates/preimage/src/hint.rs @@ -127,6 +127,40 @@ mod test { } } + struct TestFailRouter; + + #[async_trait] + impl HintRouter for TestFailRouter { + async fn route_hint(&self, _hint: String) -> Result<()> { + anyhow::bail!("Failed to route hint") + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_unblock_on_failure() { + const MOCK_DATA: &str = "test-hint 0xfacade"; + + let hint_pipe = bidirectional_pipe().unwrap(); + + let client = tokio::task::spawn(async move { + let hint_writer = HintWriter::new(PipeHandle::new( + FileDescriptor::Wildcard(hint_pipe.client.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(hint_pipe.client.write.as_raw_fd() as usize), + )); + + hint_writer.write(MOCK_DATA).await + }); + let host = tokio::task::spawn(async move { + let hint_reader = HintReader::new(PipeHandle::new( + FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize), + )); + hint_reader.next_hint(&TestFailRouter).await.unwrap(); + }); + + let _ = tokio::join!(client, host); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_hint_client_and_host() { const MOCK_DATA: &str = "test-hint 0xfacade"; From 624295da847cf9996ce5a523cb6000346381ae43 Mon Sep 17 00:00:00 2001 From: clabby Date: Wed, 18 Sep 2024 00:00:07 -0400 Subject: [PATCH 3/4] assert err --- crates/preimage/src/hint.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/crates/preimage/src/hint.rs b/crates/preimage/src/hint.rs index 326b43f547..a1ba7e5f3d 100644 --- a/crates/preimage/src/hint.rs +++ b/crates/preimage/src/hint.rs @@ -110,7 +110,7 @@ mod test { use super::*; use crate::test_utils::bidirectional_pipe; - use alloc::{sync::Arc, vec::Vec}; + use alloc::{string::ToString, sync::Arc, vec::Vec}; use kona_common::FileDescriptor; use std::os::fd::AsRawFd; use tokio::sync::Mutex; @@ -155,10 +155,17 @@ mod test { FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize), FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize), )); - hint_reader.next_hint(&TestFailRouter).await.unwrap(); + hint_reader.next_hint(&TestFailRouter).await }); - let _ = tokio::join!(client, host); + let (c, h) = tokio::join!(client, host); + c.unwrap().unwrap(); + assert!(h.unwrap().is_err_and(|e| { + let PreimageServerError::Other(e) = e else { + return false; + }; + e.to_string() == "Failed to route hint" + })); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] From 22a3c62fd227a6b95ef7eda60f2e6faaf8d10369 Mon Sep 17 00:00:00 2001 From: clabby Date: Wed, 18 Sep 2024 00:21:05 -0400 Subject: [PATCH 4/4] fix bad utf8 unblocking --- crates/preimage/src/hint.rs | 51 ++++++++++++++++++++++++++++++++++--- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/crates/preimage/src/hint.rs b/crates/preimage/src/hint.rs index a1ba7e5f3d..c393922de4 100644 --- a/crates/preimage/src/hint.rs +++ b/crates/preimage/src/hint.rs @@ -80,9 +80,17 @@ impl HintReaderServer for HintReader { .read_exact(raw_payload.as_mut_slice()) .await .map_err(PreimageServerError::BrokenPipe)?; - let payload = String::from_utf8(raw_payload).map_err(|e| { - PreimageServerError::Other(anyhow::anyhow!("Failed to decode hint payload: {e}")) - })?; + let payload = match String::from_utf8(raw_payload) { + Ok(p) => p, + Err(e) => { + // Write back on error to prevent blocking the client. + self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?; + + return Err(PreimageServerError::Other(anyhow::anyhow!( + "Failed to decode hint payload: {e}" + ))); + } + }; trace!(target: "hint_reader", "Successfully read hint: \"{payload}\""); @@ -137,7 +145,42 @@ mod test { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_unblock_on_failure() { + async fn test_unblock_on_bad_utf8() { + let mock_data = [0xf0, 0x90, 0x28, 0xbc]; + + let hint_pipe = bidirectional_pipe().unwrap(); + + let client = tokio::task::spawn(async move { + let hint_writer = HintWriter::new(PipeHandle::new( + FileDescriptor::Wildcard(hint_pipe.client.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(hint_pipe.client.write.as_raw_fd() as usize), + )); + + #[allow(invalid_from_utf8_unchecked)] + hint_writer.write(unsafe { alloc::str::from_utf8_unchecked(&mock_data) }).await + }); + let host = tokio::task::spawn(async move { + let router = TestRouter { incoming_hints: Default::default() }; + + let hint_reader = HintReader::new(PipeHandle::new( + FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize), + FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize), + )); + hint_reader.next_hint(&router).await + }); + + let (c, h) = tokio::join!(client, host); + c.unwrap().unwrap(); + assert!(h.unwrap().is_err_and(|e| { + let PreimageServerError::Other(e) = e else { + return false; + }; + e.to_string().contains("Failed to decode hint payload") + })); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_unblock_on_fetch_failure() { const MOCK_DATA: &str = "test-hint 0xfacade"; let hint_pipe = bidirectional_pipe().unwrap();