Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions bin/host/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use crate::{
},
};
use anyhow::{anyhow, Result};
use kona_preimage::{HintReaderServer, HintRouter, PreimageFetcher, PreimageOracleServer};
use kona_preimage::{
HintReaderServer, HintRouter, PreimageFetcher, PreimageOracleServer, PreimageServerError,
};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::error;

/// The [PreimageServer] is responsible for waiting for incoming preimage requests and
/// serving them to the client.
Expand Down Expand Up @@ -67,8 +70,6 @@ where
s = server => s.map_err(|e| anyhow!(e))?,
h = hint_router => h.map_err(|e| anyhow!(e))?,
}

Ok(())
}

/// Starts the oracle server, which waits for incoming preimage requests and serves them to the
Expand All @@ -77,49 +78,60 @@ where
kv_store: Arc<RwLock<KV>>,
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
oracle_server: P,
) {
) -> Result<()> {
#[inline(always)]
async fn do_loop<F, P>(fetcher: &F, server: &P)
async fn do_loop<F, P>(fetcher: &F, server: &P) -> Result<()>
where
F: PreimageFetcher + Send + Sync,
P: PreimageOracleServer,
{
loop {
// Break the loop on any error. An error in this path indicates a closed pipe.
if server.next_preimage_request(fetcher).await.is_err() {
break;
match server.next_preimage_request(fetcher).await {
Ok(_) => (),
Err(PreimageServerError::BrokenPipe(_)) => return Ok(()),
Err(PreimageServerError::Other(e)) => {
error!("Failed to serve preimage request: {e:?}");
return Err(e);
}
}
}
}

if let Some(fetcher) = fetcher.as_ref() {
do_loop(&OnlinePreimageFetcher::new(Arc::clone(fetcher)), &oracle_server).await;
do_loop(&OnlinePreimageFetcher::new(Arc::clone(fetcher)), &oracle_server).await
} else {
do_loop(&OfflinePreimageFetcher::new(Arc::clone(&kv_store)), &oracle_server).await;
};
do_loop(&OfflinePreimageFetcher::new(Arc::clone(&kv_store)), &oracle_server).await
}
}

/// Starts the hint router, which waits for incoming hints and routes them to the appropriate
/// handler.
async fn start_hint_router(hint_reader: H, fetcher: Option<Arc<RwLock<Fetcher<KV>>>>) {
async fn start_hint_router(
hint_reader: H,
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
) -> Result<()> {
#[inline(always)]
async fn do_loop<R, H>(router: &R, server: &H)
async fn do_loop<R, H>(router: &R, server: &H) -> Result<()>
where
R: HintRouter + Send + Sync,
H: HintReaderServer,
{
loop {
// Break the loop on any error. An error in this path indicates a closed pipe.
if server.next_hint(router).await.is_err() {
break;
match server.next_hint(router).await {
Ok(_) => (),
Err(PreimageServerError::BrokenPipe(_)) => return Ok(()),
Err(PreimageServerError::Other(e)) => {
error!("Failed to serve preimage request: {e:?}");
return Err(e);
}
}
}
}

if let Some(fetcher) = fetcher {
do_loop(&OnlineHintRouter::new(Arc::clone(&fetcher)), &hint_reader).await;
do_loop(&OnlineHintRouter::new(Arc::clone(&fetcher)), &hint_reader).await
} else {
do_loop(&OfflineHintRouter, &hint_reader).await;
do_loop(&OfflineHintRouter, &hint_reader).await
}
}
}
108 changes: 98 additions & 10 deletions crates/preimage/src/hint.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
traits::{HintRouter, HintWriterClient},
traits::{HintRouter, HintWriterClient, PreimageServerError},
HintReaderServer, PipeHandle,
};
use alloc::{boxed::Box, string::String, vec};
Expand Down Expand Up @@ -65,34 +65,46 @@ impl HintReader {

#[async_trait]
impl HintReaderServer for HintReader {
async fn next_hint<R>(&self, hint_router: &R) -> Result<()>
async fn next_hint<R>(&self, hint_router: &R) -> Result<(), PreimageServerError>
where
R: HintRouter + Send + Sync,
{
// Read the length of the raw hint payload.
let mut len_buf = [0u8; 4];
self.pipe_handle.read_exact(&mut len_buf).await?;
self.pipe_handle.read_exact(&mut len_buf).await.map_err(PreimageServerError::BrokenPipe)?;
let len = u32::from_be_bytes(len_buf);

// Read the raw hint payload.
let mut raw_payload = vec![0u8; len as usize];
self.pipe_handle.read_exact(raw_payload.as_mut_slice()).await?;
let payload = String::from_utf8(raw_payload)
.map_err(|e| anyhow::anyhow!("Failed to decode hint payload: {e}"))?;
self.pipe_handle
.read_exact(raw_payload.as_mut_slice())
.await
.map_err(PreimageServerError::BrokenPipe)?;
let payload = match String::from_utf8(raw_payload) {
Ok(p) => p,
Err(e) => {
// Write back on error to prevent blocking the client.
self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?;

return Err(PreimageServerError::Other(anyhow::anyhow!(
"Failed to decode hint payload: {e}"
)));
}
};

trace!(target: "hint_reader", "Successfully read hint: \"{payload}\"");

// Route the hint
if let Err(e) = hint_router.route_hint(payload).await {
// Write back on error to prevent blocking the client.
self.pipe_handle.write(&[0x00]).await?;
self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?;

error!("Failed to route hint: {e}");
anyhow::bail!("Failed to rout hint: {e}");
return Err(PreimageServerError::Other(e));
}

// Write back an acknowledgement to the client to unblock their process.
self.pipe_handle.write(&[0x00]).await?;
self.pipe_handle.write(&[0x00]).await.map_err(PreimageServerError::BrokenPipe)?;

trace!(target: "hint_reader", "Successfully routed and acknowledged hint");

Expand All @@ -106,7 +118,7 @@ mod test {

use super::*;
use crate::test_utils::bidirectional_pipe;
use alloc::{sync::Arc, vec::Vec};
use alloc::{string::ToString, sync::Arc, vec::Vec};
use kona_common::FileDescriptor;
use std::os::fd::AsRawFd;
use tokio::sync::Mutex;
Expand All @@ -123,6 +135,82 @@ mod test {
}
}

struct TestFailRouter;

#[async_trait]
impl HintRouter for TestFailRouter {
async fn route_hint(&self, _hint: String) -> Result<()> {
anyhow::bail!("Failed to route hint")
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_unblock_on_bad_utf8() {
let mock_data = [0xf0, 0x90, 0x28, 0xbc];

let hint_pipe = bidirectional_pipe().unwrap();

let client = tokio::task::spawn(async move {
let hint_writer = HintWriter::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.client.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.client.write.as_raw_fd() as usize),
));

#[allow(invalid_from_utf8_unchecked)]
hint_writer.write(unsafe { alloc::str::from_utf8_unchecked(&mock_data) }).await
});
let host = tokio::task::spawn(async move {
let router = TestRouter { incoming_hints: Default::default() };

let hint_reader = HintReader::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize),
));
hint_reader.next_hint(&router).await
});

let (c, h) = tokio::join!(client, host);
c.unwrap().unwrap();
assert!(h.unwrap().is_err_and(|e| {
let PreimageServerError::Other(e) = e else {
return false;
};
e.to_string().contains("Failed to decode hint payload")
}));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_unblock_on_fetch_failure() {
const MOCK_DATA: &str = "test-hint 0xfacade";

let hint_pipe = bidirectional_pipe().unwrap();

let client = tokio::task::spawn(async move {
let hint_writer = HintWriter::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.client.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.client.write.as_raw_fd() as usize),
));

hint_writer.write(MOCK_DATA).await
});
let host = tokio::task::spawn(async move {
let hint_reader = HintReader::new(PipeHandle::new(
FileDescriptor::Wildcard(hint_pipe.host.read.as_raw_fd() as usize),
FileDescriptor::Wildcard(hint_pipe.host.write.as_raw_fd() as usize),
));
hint_reader.next_hint(&TestFailRouter).await
});

let (c, h) = tokio::join!(client, host);
c.unwrap().unwrap();
assert!(h.unwrap().is_err_and(|e| {
let PreimageServerError::Other(e) = e else {
return false;
};
e.to_string() == "Failed to route hint"
}));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_hint_client_and_host() {
const MOCK_DATA: &str = "test-hint 0xfacade";
Expand Down
2 changes: 1 addition & 1 deletion crates/preimage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use pipe::PipeHandle;
mod traits;
pub use traits::{
CommsClient, HintReaderServer, HintRouter, HintWriterClient, PreimageFetcher,
PreimageOracleClient, PreimageOracleServer,
PreimageOracleClient, PreimageOracleServer, PreimageServerError,
};

#[cfg(test)]
Expand Down
19 changes: 11 additions & 8 deletions crates/preimage/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
traits::PreimageFetcher, PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer,
traits::{PreimageFetcher, PreimageServerError},
PipeHandle, PreimageKey, PreimageOracleClient, PreimageOracleServer,
};
use alloc::{boxed::Box, vec::Vec};
use anyhow::{bail, Result};
Expand Down Expand Up @@ -99,27 +100,27 @@ impl OracleServer {

#[async_trait::async_trait]
impl PreimageOracleServer for OracleServer {
async fn next_preimage_request<F>(&self, fetcher: &F) -> Result<()>
async fn next_preimage_request<F>(&self, fetcher: &F) -> Result<(), PreimageServerError>
where
F: PreimageFetcher + Send + Sync,
{
// Read the preimage request from the client, and throw early if there isn't is any.
let mut buf = [0u8; 32];
self.pipe_handle.read_exact(&mut buf).await?;
let preimage_key = PreimageKey::try_from(buf)?;
self.pipe_handle.read_exact(&mut buf).await.map_err(PreimageServerError::BrokenPipe)?;
let preimage_key = PreimageKey::try_from(buf).map_err(PreimageServerError::Other)?;

trace!(target: "oracle_server", "Fetching preimage for key {preimage_key}");

// Fetch the preimage value from the preimage getter.
let value = fetcher.get_preimage(preimage_key).await?;
let value = fetcher.get_preimage(preimage_key).await.map_err(PreimageServerError::Other)?;

// Write the length as a big-endian u64 followed by the data.
let data = [(value.len() as u64).to_be_bytes().as_ref(), value.as_ref()]
.into_iter()
.flatten()
.copied()
.collect::<Vec<_>>();
self.pipe_handle.write(data.as_slice()).await?;
self.pipe_handle.write(data.as_slice()).await.map_err(PreimageServerError::BrokenPipe)?;

trace!(target: "oracle_server", "Successfully wrote preimage data for key {preimage_key}");

Expand Down Expand Up @@ -188,8 +189,10 @@ mod test {
let test_fetcher = TestFetcher { preimages: Arc::clone(&preimages) };

loop {
if oracle_server.next_preimage_request(&test_fetcher).await.is_err() {
break;
match oracle_server.next_preimage_request(&test_fetcher).await {
Err(PreimageServerError::BrokenPipe(_)) => break,
Err(PreimageServerError::Other(e)) => panic!("Unexpected error: {:?}", e),
Ok(_) => {}
}
}
});
Expand Down
14 changes: 12 additions & 2 deletions crates/preimage/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ pub trait CommsClient: PreimageOracleClient + Clone + HintWriterClient {}
// Implement the super trait for any type that satisfies the bounds
impl<T: PreimageOracleClient + Clone + HintWriterClient> CommsClient for T {}

/// A [PreimageServerError] is an enum that differentiates pipe-related errors from other errors
/// in the [PreimageOracleServer] and [HintReaderServer] implementations.
#[derive(Debug)]
pub enum PreimageServerError {
/// The pipe has been broken.
BrokenPipe(anyhow::Error),
/// Other errors.
Other(anyhow::Error),
}

/// A [PreimageOracleServer] is a high-level interface to accept read requests from the client and
/// write the preimage data to the client pipe.
#[async_trait]
Expand All @@ -52,7 +62,7 @@ pub trait PreimageOracleServer {
/// # Returns
/// - `Ok(())` if the data was successfully written into the client pipe.
/// - `Err(_)` if the data could not be written to the client.
async fn next_preimage_request<F>(&self, get_preimage: &F) -> Result<()>
async fn next_preimage_request<F>(&self, get_preimage: &F) -> Result<(), PreimageServerError>
where
F: PreimageFetcher + Send + Sync;
}
Expand All @@ -67,7 +77,7 @@ pub trait HintReaderServer {
/// - `Ok(())` if the hint was received and the client was notified of the host's
/// acknowledgement.
/// - `Err(_)` if the hint was not received correctly.
async fn next_hint<R>(&self, route_hint: &R) -> Result<()>
async fn next_hint<R>(&self, route_hint: &R) -> Result<(), PreimageServerError>
where
R: HintRouter + Send + Sync;
}
Expand Down