Skip to content
This repository has been archived by the owner on Aug 31, 2023. It is now read-only.

Commit

Permalink
add additional documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
leops committed Sep 7, 2022
1 parent 9a9f7dc commit a1f491d
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 82 deletions.
200 changes: 118 additions & 82 deletions crates/rome_cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio::{
},
runtime::Runtime,
sync::{
mpsc::{channel, Sender},
mpsc::{channel, Receiver, Sender},
oneshot,
},
time::sleep,
Expand Down Expand Up @@ -68,109 +68,69 @@ type JsonRpcResult = Result<Box<RawValue>, TransportError>;
/// Implementation of [WorkspaceTransport] for types implementing [AsyncRead]
/// and [AsyncWrite]
///
/// This implementation makes use of two "background tasks":
/// - the "write task" pulls outgoing messages from the "write channel" and
/// This structs holds an instance of the `tokio` runtime, as well as the
/// following fields:
/// - `write_send` is a sender handle to the "write channel", an MPSC channel
/// that's used to queue up requests to be sent to the server (for simplicity
/// the requests are pushed to the channel as serialized byte buffers)
/// - `pending_requests` is handle to a shared hashmap where the keys are `u64`
/// corresponding to request IDs, and the values are sender handles to oneshot
/// channel instances that can be consumed to fullfill the associated request
///
/// Creating a new `SocketTransport` instance requires providing a `tokio`
/// runtime instance as well as the "read half" and "write half" of the socket
/// object to be used by this transport instance. These two objects implement
/// [AsyncRead] and [AsyncWrite] respectively, and should generally map to the
/// same underlying I/O object but are represented as separate so they can be
/// used concurrently
///
/// This concurrent handling of I/O is implemented useing two "background tasks":
/// - the [write_task] pulls outgoing messages from the "write channel" and
/// writes them to the "write half" of the socket
/// - the "read task" reads incoming messages from the "read half" of the
/// - the [read_task] reads incoming messages from the "read half" of the
/// socket, then looks up a request with an ID corresponding to the received
/// message in the "pending requests" map. If a pending request is found, it's
/// fullfilled with the content of the message that was just received
///
/// In addition to these, a new "foreground task" is created for each request.
/// These tasks create a "oneshot channel" and store it in the pending requests
/// map using the request ID as a key, then serialize the content of the
/// request and send it over the write channel. Finally, the task blocks the
/// current thread until a response is received over the oneshot channel from
/// the read task, or the request times out
/// Each foreground task creates a oneshot channel and stores it in the pending
/// requests map using the request ID as a key, then serialize the content of
/// the request and send it over the write channel. Finally, the task blocks
/// the current thread until a response is received over the oneshot channel
/// from the read task, or the request times out
pub struct SocketTransport {
runtime: Runtime,
write_send: Sender<Vec<u8>>,
pending_requests: Arc<DashMap<u64, oneshot::Sender<JsonRpcResult>>>,
pending_requests: PendingRequests,
}

type PendingRequests = Arc<DashMap<u64, oneshot::Sender<JsonRpcResult>>>;

impl SocketTransport {
pub fn open<R, W>(runtime: Runtime, socket_read: R, socket_write: W) -> Self
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let (write_send, mut write_recv) = channel(512);
/// Capacity of the "write channel", once this many requests have been
/// queued up calls to `write_send.send` will block the sending task
/// until enough capacity is available again
///
/// Note that this does not limit how many requests can be in flight at
/// a given time, it only serves as a loose rate-limit on how many new
/// requests can be sent to the server within a given time frame
const WRITE_CHANNEL_CAPACITY: usize = 16;

let (write_send, write_recv) = channel(WRITE_CHANNEL_CAPACITY);

let pending_requests: Arc<DashMap<u64, oneshot::Sender<JsonRpcResult>>> = Arc::default();
let pending_requests_2 = Arc::clone(&pending_requests);

let mut socket_read = BufReader::new(socket_read);
let mut socket_write = BufWriter::new(socket_write);

runtime.spawn(async move {
while let Some(message) = write_recv.recv().await {
if let Err(err) = write_message(&mut socket_write, message).await {
eprintln!(
"{:?}",
err.context("remote connection read task exited with an error")
);
break;
}
}
});

runtime.spawn(async move {
loop {
let message = read_message(&mut socket_read).await;
let message = match message {
Ok(message) => {
let response = from_slice(&message).with_context(|| {
if let Ok(message) = from_utf8(&message) {
format!("failed to deserialize JSON-RPC response from {message:?}")
} else {
format!("failed to deserialize JSON-RPC response from {message:?}")
}
});

response.map(|response| (message, response))
}
Err(err) => Err(err),
};

let (message, response): (_, JsonRpcResponse) = match message {
Ok(message) => message,
Err(err) => {
eprintln!(
"{:?}",
err.context("remote connection write task exited with an error")
);
break;
}
};

if let Some((_, channel)) = pending_requests.remove(&response.id) {
let response = match (response.result, response.error) {
(Some(result), None) => Ok(result),
(None, Some(err)) => Err(TransportError::RPCError(err.message)),

// Both result and error will be None if the request
// returns a null-ish result, in this case create a
// "null" RawValue as the result
//
// SAFETY: Calling `to_raw_value` with a static "null"
// JSON Value will always succeed
(None, None) => Ok(to_raw_value(&Value::Null).unwrap()),

_ => {
let message = if let Ok(message) = from_utf8(&message) {
format!("invalid response {message:?}")
} else {
format!("invalid response {message:?}")
};

Err(TransportError::SerdeError(message))
}
};
let socket_read = BufReader::new(socket_read);
let socket_write = BufWriter::new(socket_write);

channel.send(response).ok();
}
}
});
runtime.spawn(write_task(write_recv, socket_write));
runtime.spawn(read_task(socket_read, pending_requests));

Self {
runtime,
Expand Down Expand Up @@ -239,6 +199,67 @@ impl WorkspaceTransport for SocketTransport {
}
}

async fn read_task<R>(mut socket_read: BufReader<R>, pending_requests: PendingRequests)
where
R: AsyncRead + Unpin,
{
loop {
let message = read_message(&mut socket_read).await;
let message = match message {
Ok(message) => {
let response = from_slice(&message).with_context(|| {
if let Ok(message) = from_utf8(&message) {
format!("failed to deserialize JSON-RPC response from {message:?}")
} else {
format!("failed to deserialize JSON-RPC response from {message:?}")
}
});

response.map(|response| (message, response))
}
Err(err) => Err(err),
};

let (message, response): (_, JsonRpcResponse) = match message {
Ok(message) => message,
Err(err) => {
eprintln!(
"{:?}",
err.context("remote connection write task exited with an error")
);
break;
}
};

if let Some((_, channel)) = pending_requests.remove(&response.id) {
let response = match (response.result, response.error) {
(Some(result), None) => Ok(result),
(None, Some(err)) => Err(TransportError::RPCError(err.message)),

// Both result and error will be None if the request
// returns a null-ish result, in this case create a
// "null" RawValue as the result
//
// SAFETY: Calling `to_raw_value` with a static "null"
// JSON Value will always succeed
(None, None) => Ok(to_raw_value(&Value::Null).unwrap()),

_ => {
let message = if let Ok(message) = from_utf8(&message) {
format!("invalid response {message:?}")
} else {
format!("invalid response {message:?}")
};

Err(TransportError::SerdeError(message))
}
};

channel.send(response).ok();
}
}
}

async fn read_message<R>(mut socket_read: R) -> Result<Vec<u8>, Error>
where
R: AsyncBufRead + Unpin,
Expand Down Expand Up @@ -298,6 +319,21 @@ where
Ok(result)
}

async fn write_task<W>(mut write_recv: Receiver<Vec<u8>>, mut socket_write: BufWriter<W>)
where
W: AsyncWrite + Unpin,
{
while let Some(message) = write_recv.recv().await {
if let Err(err) = write_message(&mut socket_write, message).await {
eprintln!(
"{:?}",
err.context("remote connection read task exited with an error")
);
break;
}
}
}

async fn write_message<W>(mut socket_write: W, message: Vec<u8>) -> Result<(), Error>
where
W: AsyncWrite + Unpin,
Expand Down
4 changes: 4 additions & 0 deletions crates/rome_lsp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ macro_rules! workspace_method {
});

result.map(move |result| {
// The type of `result` is `Result<Result<R, RomeError>, JoinError>`,
// where the inner result is the return value of `$method` while the
// outer one is added by `spawn_blocking` to catch panics or
// cancellations of the task
match result {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(into_lsp_error(err)),
Expand Down

0 comments on commit a1f491d

Please sign in to comment.