Skip to content

Commit

Permalink
Merge pull request #15 from QIUZHILEI/main
Browse files Browse the repository at this point in the history
Fix the failure to find the Sh script execution failure
  • Loading branch information
genedna authored Jun 22, 2023
2 parents edbcd6f + 2360368 commit b38ebcb
Show file tree
Hide file tree
Showing 23 changed files with 76 additions and 72 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ dagrs provides users with two basic task execution methods:
name: "Task 8"
run:
type: sh
script: tests/config/sh_script.sh
script: echo h
```
These yaml-defined task items form a complex dependency graph. In the yaml configuration file:
Expand Down Expand Up @@ -250,7 +250,7 @@ Executing Task[name: Task 1]
- If the result of the predecessor task can be obtained, check the continuation status`can_continue`, if it is true, continue to execute the defined logic, if it is false, trigger`handle_error`, and cancel the execution of the subsequent task.
- After all tasks are executed, set the continuation status to false, which means that the tasks of the dag cannot be scheduled for execution again.

![image-20230621223120581](assets/execute logic.png)
![image-20230621223120581](assets/execute_logic.png)

## The examples

Expand Down
File renamed without changes
4 changes: 2 additions & 2 deletions examples/custom_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{fs, sync::Arc};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};

use dagrs::{Action, Dag, ID_ALLOCATOR, log,LogLevel, JavaScript, Parser, ParserError, ShScript, Task};
use dagrs::{Action, Dag, log,LogLevel, JavaScript, Parser, ParserError, ShScript, Task};

struct MyTask {
tid: (String, usize),
Expand All @@ -36,7 +36,7 @@ impl MyTask {
action: impl Action + Send + Sync + 'static,
) -> Self {
Self {
tid: (txt_id.to_owned(), ID_ALLOCATOR.alloc()),
tid: (txt_id.to_owned(), dagrs::alloc_id()),
name,
precursors,
precursors_id: Vec::new(),
Expand Down
4 changes: 2 additions & 2 deletions examples/custom_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::sync::Arc;

use dagrs::{log, Action, Dag, EnvVar, Input, LogLevel, Output, RunningError, Task, ID_ALLOCATOR};
use dagrs::{log, Action, Dag, EnvVar, Input, LogLevel, Output, RunningError, Task,alloc_id};

struct MyTask {
id: usize,
Expand All @@ -15,7 +15,7 @@ struct MyTask {
impl MyTask {
pub fn new(action: impl Action + 'static + Send + Sync, name: &str) -> Self {
MyTask {
id: ID_ALLOCATOR.alloc(),
id: alloc_id(),
action: Arc::new(action),
name: name.to_owned(),
predecessor_tasks: Vec::new(),
Expand Down
2 changes: 1 addition & 1 deletion src/engine/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{

use super::{error::DagError, graph::Graph};

/// dagrs's function is wrapped in Dag struct.
/// A Dag represents a set of tasks. Use it to build a multitasking Dag.
#[derive(Debug)]
pub struct Dag {
/// Store all tasks' infos.
Expand Down
20 changes: 10 additions & 10 deletions src/engine/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use bimap::BiMap;

#[derive(Debug)]
/// Graph Struct
pub struct Graph {
pub(crate) struct Graph {
size: usize,
/// Record node id and it's index <id,index>
nodes: BiMap<usize, usize>,
Expand All @@ -37,7 +37,7 @@ pub struct Graph {

impl Graph {
/// Allocate an empty graph
pub fn new() -> Graph {
pub(crate) fn new() -> Graph {
Graph {
size: 0,
nodes: BiMap::new(),
Expand All @@ -47,7 +47,7 @@ impl Graph {
}

/// Set graph size, size is the number of tasks
pub fn set_graph_size(&mut self, size: usize) {
pub(crate) fn set_graph_size(&mut self, size: usize) {
self.size = size;
self.adj.resize(size, Vec::new());
self.in_degree.resize(size, 0)
Expand All @@ -57,26 +57,26 @@ impl Graph {
/// This operation will create a mapping between ID and its index.
/// **Note:** `id` won't get repeated in dagrs,
/// since yaml parser will overwrite its info if a task's ID repeats.
pub fn add_node(&mut self, id: usize) {
pub(crate) fn add_node(&mut self, id: usize) {
let index = self.nodes.len();
self.nodes.insert(id, index);
}

/// Add an edge into the graph.
/// Above operation adds a arrow from node 0 to node 1,
/// which means task 0 shall be executed before task 1.
pub fn add_edge(&mut self, v: usize, w: usize) {
pub(crate) fn add_edge(&mut self, v: usize, w: usize) {
self.adj[v].push(w);
self.in_degree[w] += 1;
}

/// Find a task's index by its ID
pub fn find_index_by_id(&self, id: &usize) -> Option<usize> {
pub(crate) fn find_index_by_id(&self, id: &usize) -> Option<usize> {
self.nodes.get_by_left(id).map(|i| i.to_owned())
}

/// Find a task's ID by its index
pub fn find_id_by_index(&self, index: usize) -> Option<usize> {
pub(crate) fn find_id_by_index(&self, index: usize) -> Option<usize> {
self.nodes.get_by_right(&index).map(|n| n.to_owned())
}

Expand All @@ -98,7 +98,7 @@ impl Graph {
///
/// 4. Just repeat step 2, 3 until no more zero degree nodes can be generated.
/// If all tasks have been executed, then it's a DAG, or there must be a loop in the graph.
pub fn topo_sort(&self) -> Option<Vec<usize>> {
pub(crate) fn topo_sort(&self) -> Option<Vec<usize>> {
let mut queue = Vec::new();
let mut in_degree = self.in_degree.clone();
let mut count = 0;
Expand Down Expand Up @@ -139,7 +139,7 @@ impl Graph {
}

/// Get the out degree of a node.
pub fn get_node_out_degree(&self, id: &usize) -> usize {
pub(crate) fn get_node_out_degree(&self, id: &usize) -> usize {
match self.nodes.get_by_left(id) {
Some(index) => self.adj[*index].len(),
None => 0,
Expand All @@ -156,4 +156,4 @@ impl Default for Graph {
in_degree: Vec::new(),
}
}
}
}
1 change: 0 additions & 1 deletion src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
pub use dag::Dag;
pub use error::DagError;
pub use graph::Graph;

mod dag;
mod error;
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ extern crate clap;
extern crate deno_core;
extern crate yaml_rust;

pub use engine::{Dag, DagError, Engine, Graph};
pub use engine::{Dag, DagError, Engine};
pub use parser::*;
pub use task::{Action, Content, DefaultTask, ID_ALLOCATOR, IDAllocator, Input, JavaScript, Output, RunningError, ShScript, Task, YamlTask};
pub use task::{Action, DefaultTask, alloc_id, Input, JavaScript, Output, RunningError, ShScript, Task, YamlTask};
pub use utils::{EnvVar, gen_macro,LogLevel,Logger,log};

mod engine;
Expand Down
1 change: 1 addition & 0 deletions src/parser/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use thiserror::Error;

/// Errors that may occur while parsing task configuration files.
#[derive(Debug, Error)]
pub enum ParserError {
/// Configuration file not found.
Expand Down
2 changes: 1 addition & 1 deletion src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
//! name: "Task 8"
//! run:
//! type: sh
//! script: sh_script.sh
//! script: echo h
//! ```
//!
//! Currently yaml configuration files support two types of tasks, sh and javascript.
Expand Down
1 change: 1 addition & 0 deletions src/parser/yaml_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::{
Parser,
};

/// An implementation of [`Parser`]. It is the default yaml configuration file parser.
pub struct YamlParser;

impl YamlParser {
Expand Down
8 changes: 3 additions & 5 deletions src/task/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ pub struct RunningError {
#[derive(Error, Debug)]
pub struct ShExecuteError {
msg: String,
#[source]
err: std::io::Error,
}

/// Javascript script produces incorrect behavior when run.
Expand Down Expand Up @@ -49,14 +47,14 @@ impl Display for RunningError {
}

impl ShExecuteError {
pub fn new(msg: String, err: std::io::Error) -> Self {
Self { msg, err }
pub fn new(msg: String) -> Self {
Self { msg }
}
}

impl Display for ShExecuteError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "sh script execution error: {}\n,{}", self.msg, self.err)
write!(f, "sh script execution error: {}", self.msg)
}
}

Expand Down
38 changes: 22 additions & 16 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@
//! These two task types both implement the [`Task`] trait, that is to say, users can also
//! customize tasks and assign more functions and attributes to tasks. However, a task must
//! have four fixed properties (the four standard properties contained in DefaultTask):
//! - id: use [`ID_ALLOCATOR`] to get a global task unique identifier, the type must be usize
//! - name: the task name specified by the user, the type must be String
//! - predecessor_tasks: the predecessor task of this task, the type must be Vec<usize>
//! - action: the specific behavior to be performed by the task, the type must be Arc<dyn Action + Send + Sync>
//! - id: use [`ID_ALLOCATOR`] to get a global task unique identifier, the type must be `usize`
//! - name: the task name specified by the user, the type must be `String`
//! - predecessor_tasks: the predecessor task of this task, the type must be `Vec<usize>`
//! - action: the specific behavior to be performed by the task, the type must be `Arc<dyn Action + Send + Sync>`
//!
//! If users want to customize Task, they can refer to the implementation of these two specific [`Task`].
Expand All @@ -116,10 +116,11 @@ use std::sync::atomic::AtomicUsize;

use crate::utils::EnvVar;

pub use self::error::*;
pub use self::script::*;
pub use self::specific_task::*;
pub use self::state::*;
pub use self::error::{RunningError,JavaScriptExecuteError,ShExecuteError};
pub use self::script::{JavaScript,ShScript};
pub use self::specific_task::{YamlTask};
pub use self::state::{Output,Input};
pub(crate) use self::state::ExecState;

mod error;
mod script;
Expand All @@ -140,7 +141,6 @@ pub trait Action {
///
/// A task must provide methods to obtain precursors and required attributes, just as
/// the methods defined below, users who want to customize tasks must implement these methods.
pub trait Task {
/// Get a reference to an executable action.
fn action(&self) -> Arc<dyn Action + Send + Sync>;
Expand All @@ -158,7 +158,7 @@ impl Debug for dyn Task {
}
}

/// Default implementation of abstract tasks.
/// A default implementation of the Task trait. In general, use it to define the tasks of dagrs.
pub struct DefaultTask {
/// id is the unique identifier of each task, it will be assigned by the global [`IDAllocator`]
/// when creating a new task, you can find this task through this identifier.
Expand All @@ -172,7 +172,7 @@ pub struct DefaultTask {
}

impl DefaultTask {
/// Allocate a new [`DefaultTask`], the specific task behavior is a structure that implements [`SimpleRunner`].
/// Allocate a new [`DefaultTask`], the specific task behavior is a structure that implements [`Action`].
///
/// # Example
///
Expand All @@ -191,10 +191,10 @@ impl DefaultTask {
/// let task = DefaultTask::new(action, "Increment action");
/// ```
///
/// `Action` is a struct that impl [`SimpleAction`]. Since task will be
/// `SimpleAction` is a struct that impl [`Action`]. Since task will be
/// executed in separated threads, [`Send`] and [`Sync`] is needed.
///
/// **Note:** This method will take the ownership of struct that impl [`SimpleAction`].
/// **Note:** This method will take the ownership of struct that impl [`Action`].
pub fn new(action: impl Action + 'static + Send + Sync, name: &str) -> Self {
DefaultTask {
id: ID_ALLOCATOR.alloc(),
Expand Down Expand Up @@ -251,12 +251,12 @@ impl Task for DefaultTask {
}

/// IDAllocator for DefaultTask
pub struct IDAllocator {
struct IDAllocator {
id: AtomicUsize,
}

impl IDAllocator {
pub fn alloc(&self) -> usize {
fn alloc(&self) -> usize {
let origin = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if origin > self.id.load(std::sync::atomic::Ordering::Relaxed) {
panic!("Too many tasks.")
Expand All @@ -266,6 +266,12 @@ impl IDAllocator {
}
}

pub static ID_ALLOCATOR: IDAllocator = IDAllocator {
/// The global task uniquely identifies an instance of the allocator.
static ID_ALLOCATOR: IDAllocator = IDAllocator {
id: AtomicUsize::new(1),
};

/// public function to assign task's id.
pub fn alloc_id()->usize{
ID_ALLOCATOR.alloc()
}
43 changes: 21 additions & 22 deletions src/task/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
use std::{process::Command, sync::Arc};

use deno_core::{JsRuntime, RuntimeOptions, serde_json, serde_v8, v8};
use deno_core::{serde_json, serde_v8, v8, JsRuntime, RuntimeOptions};

use crate::utils::EnvVar;
use crate::{log, utils::EnvVar};

use super::{Action, Input, JavaScriptExecuteError, Output, RunningError, ShExecuteError};

/// Can be used to run a script cmd or file.
/// Can be used to run a sh script.
pub struct ShScript {
script: String,
}

/// Can be used to execute javascript scripts.
pub struct JavaScript {
script: String,
}
Expand All @@ -32,26 +33,24 @@ impl ShScript {

impl Action for ShScript {
fn run(&self, input: Input, _env: Arc<EnvVar>) -> Result<Output, RunningError> {
let mut cmd = format!("{} ", self.script);
input
let args: Vec<String> = input
.get_iter()
.for_each(|input| {
if let Some(arg) = input.get::<String>() {
cmd.push_str(arg)
}
});
match Command::new("sh")
.map(|input| input.get::<String>())
.filter(|input| input.is_some())
.map(|input| input.unwrap().clone())
.collect();
let out = Command::new("sh")
.arg("-c")
.arg(&cmd)
.arg(&self.script)
.args(args)
.output()
.map(|output| String::from_utf8(output.stdout).unwrap())
{
Ok(res) => Ok(Output::new(res)),
Err(err) => {
let e = ShExecuteError::new(err.to_string(), err);
// error!("sh task execution failed! {}", e);
Err(e.into())
}
.unwrap();
if !out.stderr.is_empty() {
let err_msg = String::from_utf8(out.stderr).unwrap();
log::error(err_msg.clone());
Err(ShExecuteError::new(err_msg).into())
} else {
Ok(Output::new(String::from_utf8(out.stdout).unwrap()))
}
}
}
Expand All @@ -78,14 +77,14 @@ impl Action for JavaScript {
Ok(value) => Ok(Output::new(value.to_string())),
Err(err) => {
let e = JavaScriptExecuteError::SerializeError(err);
//error!("JavaScript script task execution failed! {}", e);
log::error(format!("JavaScript script task execution failed! {}", e));
Err(e.into())
}
}
}
Err(err) => {
let e = JavaScriptExecuteError::AnyHowError(err);
// error!("JavaScript script task parsing failed! {}", e);
log::error(format!("JavaScript script task parsing failed! {}", e));
Err(e.into())
}
}
Expand Down
Loading

0 comments on commit b38ebcb

Please sign in to comment.