Skip to content

Commit

Permalink
refactor: Revisit some docs, restructure some code
Browse files Browse the repository at this point in the history
  • Loading branch information
Nukesor committed Sep 23, 2024
1 parent e73d2a5 commit cee1cc9
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 169 deletions.
6 changes: 3 additions & 3 deletions pueue/src/daemon/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use pueue_lib::{
use super::state_helper::LockedState;

/// Users can specify a callback that's fired whenever a task finishes.
/// Execute the callback by spawning a new subprocess.
/// The callback is performed by spawning a new subprocess.
pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) {
// Return early, if there's no callback specified
let Some(template_string) = &settings.daemon.callback else {
Expand Down Expand Up @@ -110,8 +110,8 @@ pub fn build_callback_command(
handlebars.render_template(template_string, &parameters)
}

/// Look at all running callbacks and log any errors.
/// If everything went smoothly, simply remove them from the list.
/// Look at all running callbacks and check if they're still running.
/// Handle finished callbacks and log their outcome.
pub fn check_callbacks(state: &mut LockedState) {
let mut finished = Vec::new();
for (id, child) in state.callbacks.iter_mut().enumerate() {
Expand Down
143 changes: 0 additions & 143 deletions pueue/src/daemon/network/follow_log.rs

This file was deleted.

2 changes: 1 addition & 1 deletion pueue/src/daemon/network/message_handler/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) -
// If one is found, we expand the command, otherwise we just take the original command.
// Anyhow, we save this separately and keep the original command in a separate field.
//
// This allows us to have a debug experience and the user can opt to either show the
// This gives us better debugging capabilities and the user can opt to either show the
// original command or the expanded command in their `status` view.
task.command = insert_alias(settings, task.original_command.clone());

Expand Down
140 changes: 140 additions & 0 deletions pueue/src/daemon/network/message_handler/log.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use std::collections::BTreeMap;
use std::io::Read;
use std::path::Path;
use std::time::Duration;

use anyhow::Result;

use pueue_lib::failure_msg;
use pueue_lib::log::read_and_compress_log_file;
use pueue_lib::log::*;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::{send_message, GenericStream};
use pueue_lib::settings::Settings;
use pueue_lib::state::SharedState;

Expand Down Expand Up @@ -49,3 +56,136 @@ pub fn get_log(settings: &Settings, state: &SharedState, message: LogRequestMess
}
Message::LogResponse(tasks)
}

/// Handle the continuous stream of a some log output.
///
/// It's not actually a stream in the sense of a low-level network stream, but rather a series of
/// `Message::Stream` messages, that each send a portion of new log output.
///
/// It's basically our own chunked stream implementation on top of the protocol we established.
pub async fn follow_log(
pueue_directory: &Path,
stream: &mut GenericStream,
state: &SharedState,
message: StreamRequestMessage,
) -> Result<Message> {
// The user can specify the id of the task they want to follow
// If the id isn't specified and there's only a single running task, this task will be used.
// However, if there are multiple running tasks, the user will have to specify an id.
let task_id = if let Some(task_id) = message.task_id {
task_id
} else {
// Get all ids of running tasks
let state = state.lock().unwrap();
let running_ids: Vec<_> = state
.tasks
.iter()
.filter_map(|(&id, t)| if t.is_running() { Some(id) } else { None })
.collect();

// Return a message on "no" or multiple running tasks.
match running_ids.len() {
0 => {
return Ok(create_failure_message("There are no running tasks."));
}
1 => running_ids[0],
_ => {
let running_ids = running_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");
return Ok(create_failure_message(format!(
"Multiple tasks are running, please select one of the following: {running_ids}"
)));
}
}
};

// It might be that the task is not yet running.
// Ensure that it exists and is started.
loop {
{
let state = state.lock().unwrap();
let Some(task) = state.tasks.get(&task_id) else {
return Ok(create_failure_message(
"Pueue: The task to be followed doesn't exist.",
));
};
// The task is running or finished, we can start to follow.
if task.is_running() || task.is_done() {
break;
}
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}

let mut handle = match get_log_file_handle(task_id, pueue_directory) {
Err(_) => {
return Ok(create_failure_message(
"Couldn't find output files for task. Maybe it finished? Try `log`",
))
}
Ok(handle) => handle,
};

// Get the output path.
// We need to check continuously, whether the file still exists,
// since the file can go away (e.g. due to finishing a task).
let path = get_log_path(task_id, pueue_directory);

// If `lines` is passed as an option, we only want to show the last `X` lines.
// To achieve this, we seek the file handle to the start of the `Xth` line
// from the end of the file.
// The loop following this section will then only copy those last lines to stdout.
if let Some(lines) = message.lines {
if let Err(err) = seek_to_last_lines(&mut handle, lines) {
println!("Error seeking to last lines from log: {err}");
}
}

loop {
// Check whether the file still exists. Exit if it doesn't.
if !path.exists() {
return Ok(create_success_message(
"Pueue: Log file has gone away. Has the task been removed?",
));
}
// Read the next chunk of text from the last position.
let mut buffer = Vec::new();

if let Err(err) = handle.read_to_end(&mut buffer) {
return Ok(create_failure_message(format!("Pueue Error: {err}")));
};
let text = String::from_utf8_lossy(&buffer).to_string();

// Only send a message, if there's actual new content.
if !text.is_empty() {
// Send the next chunk.
let response = Message::Stream(text);
send_message(response, stream).await?;
}

// Check if the task in question does:
// 1. Still exist
// 2. Is still running
//
// In case it's not, close the stream.
{
let state = state.lock().unwrap();
let Some(task) = state.tasks.get(&task_id) else {
return Ok(create_failure_message(
"Pueue: The followed task has been removed.",
));
};

// The task is done, just close the stream.
if !task.is_running() {
return Ok(Message::Close);
}
}

// Wait for 1 second before sending the next chunk.
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
2 changes: 2 additions & 0 deletions pueue/src/daemon/network/message_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ mod start;
mod stash;
mod switch;

pub use log::follow_log;

pub fn handle_message(message: Message, state: &SharedState, settings: &Settings) -> Message {
match message {
Message::Add(message) => add::add_task(settings, state, message),
Expand Down
1 change: 0 additions & 1 deletion pueue/src/daemon/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod follow_log;
pub mod message_handler;
pub mod response_helper;
pub mod socket;
18 changes: 14 additions & 4 deletions pueue/src/daemon/network/response_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ pub fn ensure_group_exists<'state>(
)))
}

/// Compile a response for actions that affect several given tasks.
/// These actions can sometimes only succeed for a part of the given tasks.
/// Compile a response for an action that affect several given tasks.
/// That action can sometimes only succeed for a portion of the given tasks.
/// E.g. only running tasks can be killed.
///
/// That's why this helper exists, which determines for which tasks the action succeeded
/// and which tasks failed, based on a given `filter` criterion.
/// That's why this helper exists, which determines for which tasks an action succeeds
/// and which tasks fail, based on a given `filter` criterion.
/// ```rs
/// task_ids = vec![1, 2, 4];
/// task_action_response_helper(
/// "Tasks are being killed",
/// task_ids.clone(),
/// |task| task.is_running(),
/// &state,
/// ),
/// ```
pub fn task_action_response_helper<F>(
message: &str,
task_ids: Vec<usize>,
Expand Down
Loading

0 comments on commit cee1cc9

Please sign in to comment.