Skip to content

Commit

Permalink
Merge pull request #33 from QIUZHILEI/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
genedna authored Oct 10, 2023
2 parents 96c08ec + b99e7be commit d267443
Show file tree
Hide file tree
Showing 41 changed files with 957 additions and 318 deletions.
55 changes: 44 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,27 +1,60 @@
[package]
name = "dagrs"
authors = ["Quanyi Ma <[email protected]>", "Zhilei Qiu <[email protected]>"]
version = "0.2.0"
version = "0.3.0"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "The DAG engine, named dagrs, is designed to execute multiple tasks with graph-like dependencies. It offers high performance and asynchronous execution, providing a convenient programming interface for Rust developers."
readme = "README.md"
repository = "https://github.com/open-rust-initiative/dagrs"
keywords = ["DAG", "task", "async", "parallel", "concurrent"]

[dependencies]
yaml-rust = "0.4.5"
bimap = "0.6.1"
clap = { version = "4.2.2", features = ["derive"] }
anymap2 = "0.13.0"
thiserror = "1.0.30"
tokio = { version = "1.28", features = ["rt", "sync","rt-multi-thread"] }

[dev-dependencies]
log = "0.4"
simplelog = "0.12"

[workspace]
members = ["dagrs_derive","dagrs_core"]


[dependencies]
dagrs_core = {path = "dagrs_core" , version = "0.3.0"}
dagrs_derive ={ path = "dagrs_derive", version = "0.3.0"}

[features]
default = ["logger"]
logger = []
yaml = []
default = ["dagrs_core/logger"]
yaml = ["dagrs_core/yaml"]
derive = ["dagrs_derive/derive"]

[[example]]
name = "custom_log"
required-features = ["yaml"]

[[example]]
name = "custom_parser"
required-features = ["yaml"]

[[example]]
name = "engine"
required-features = ["yaml"]

[[example]]
name = "derive_task"
required-features = ["derive"]

[[example]]
name = "dependencies"
required-features = ["derive"]

[[example]]
name = "yaml_dag"
required-features = ["yaml"]

[[test]]
name = "dag_job_test"
required-features = ["yaml"]

[[test]]
name = "yaml_parser_test"
required-features = ["yaml"]
3 changes: 0 additions & 3 deletions build.rs

This file was deleted.

24 changes: 24 additions & 0 deletions dagrs_core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "dagrs_core"
version = "0.3.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
yaml-rust = "0.4.5"
bimap = "0.6.1"
clap = { version = "4.2.2", features = ["derive"] }
anymap2 = "0.13.0"
thiserror = "1.0.30"
tokio = { version = "1.28", features = ["rt", "sync","rt-multi-thread"] }

[features]
default = ["logger"]
logger = []
yaml = []


[[bin]]
name = "dagrs"
required-features = ["yaml"]
42 changes: 42 additions & 0 deletions dagrs_core/src/bin/dagrs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::collections::HashMap;

use clap::Parser;
use dagrs_core::{log, Dag, LogLevel};

#[derive(Parser, Debug)]
#[command(name = "dagrs", version = "0.2.0")]
struct Args {
/// Log output file, the default is to print to the terminal.
#[arg(long)]
log_path: Option<String>,
/// yaml configuration file path.
#[arg(long)]
yaml: String,
/// Log level, the default is Info.
#[arg(long)]
log_level: Option<String>,
}

fn main() {
let args = Args::parse();
let log_level = args
.log_level
.map_or(LogLevel::Info, |level| match level.as_str() {
"debug" => LogLevel::Debug,
"info" => LogLevel::Info,
"warn" => LogLevel::Warn,
"error" => LogLevel::Error,
"off" => LogLevel::Off,
_ => {
println!("The logging level can only be [debug,info,warn,error,off]");
std::process::abort();
}
});
let _initialized = match args.log_path {
None => log::init_logger(log_level, None),
Some(path) => log::init_logger(log_level, Some(std::fs::File::create(path).unwrap())),
};
let yaml_path = args.yaml;
let mut dag = Dag::with_yaml(yaml_path.as_str(), HashMap::new()).unwrap();
assert!(dag.start().unwrap());
}
54 changes: 31 additions & 23 deletions src/engine/dag.rs → dagrs_core/src/engine/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ use std::{
},
};

use anymap2::any::CloneAnySendSync;
use tokio::task::JoinHandle;
use crate::{
parser::Parser,
task::{ExecState, Input, Task},
utils::{log, EnvVar}, Action,
utils::{log, EnvVar},
Action,
};
use anymap2::any::CloneAnySendSync;
use tokio::task::JoinHandle;

use super::{error::DagError, graph::Graph};

Expand Down Expand Up @@ -100,26 +101,33 @@ impl Dag {

/// Given a yaml configuration file parsing task to generate a dag.
#[cfg(feature = "yaml")]
pub fn with_yaml(file: &str,specific_actions: HashMap<String,Arc<dyn Action+Send+Sync+'static>>) -> Result<Dag, DagError> {
pub fn with_yaml(
file: &str,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
) -> Result<Dag, DagError> {
use crate::YamlParser;
let parser=Box::new(YamlParser);
Dag::read_tasks(file, parser,specific_actions)
let parser = Box::new(YamlParser);
Dag::read_tasks(file, parser, specific_actions)
}

/// Generates a dag with the user given path to a custom parser and task config file.
pub fn with_config_file_and_parser(
file: &str,
parser: Box<dyn Parser>,
specific_actions: HashMap<String,Arc<dyn Action+Send+Sync+'static>>
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
) -> Result<Dag, DagError> {
Dag::read_tasks(file, parser,specific_actions)
Dag::read_tasks(file, parser, specific_actions)
}

/// Parse the content of the configuration file into a series of tasks and generate a dag.
fn read_tasks(file: &str, parser: Box<dyn Parser>,specific_actions: HashMap<String,Arc<dyn Action+Send+Sync+'static>>) -> Result<Dag, DagError> {
fn read_tasks(
file: &str,
parser: Box<dyn Parser>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
) -> Result<Dag, DagError> {
let mut dag = Dag::new();
let tasks = parser.parse_tasks(file,specific_actions)?;
let tasks = parser.parse_tasks(file, specific_actions)?;
tasks.into_iter().for_each(|task| {
dag.tasks.insert(task.id(), Arc::new(task));
});
Expand All @@ -143,7 +151,7 @@ impl Dag {
for (&id, task) in self.tasks.iter() {
let index = self.rely_graph.find_index_by_id(&id).unwrap();

for rely_task_id in task.predecessors() {
for rely_task_id in task.precursors() {
// Rely task existence check
let rely_index = self
.rely_graph
Expand Down Expand Up @@ -247,7 +255,7 @@ impl Dag {
let execute_state = self.execute_states[&task_id].clone();
let task_out_degree = self.rely_graph.get_node_out_degree(&task_id);
let wait_for_input: Vec<Arc<ExecState>> = task
.predecessors()
.precursors()
.iter()
.map(|id| self.execute_states[id].clone())
.collect();
Expand Down Expand Up @@ -277,7 +285,7 @@ impl Dag {
// Store execution results
execute_state.set_output(out);
execute_state.semaphore().add_permits(task_out_degree);
log::info(format!("Task executed successfully. [name: {}]",task_name));
log::info(format!("Task executed successfully. [name: {}]", task_name));
true
}
Err(err) => {
Expand Down Expand Up @@ -328,16 +336,16 @@ impl Dag {
}
}

pub fn get_results<T: CloneAnySendSync + Send + Sync>(&self) -> HashMap<usize,Option<T>> {
let mut hm = HashMap::new();
for (id,state) in &self.execute_states {
let output = match state.get_output() {
Some(ref content) => content.clone().remove(),
None => None,
};
hm.insert(*id,output);
}
hm
pub fn get_results<T: CloneAnySendSync + Send + Sync>(&self) -> HashMap<usize, Option<T>> {
let mut hm = HashMap::new();
for (id, state) in &self.execute_states {
let output = match state.get_output() {
Some(ref content) => content.clone().remove(),
None => None,
};
hm.insert(*id, output);
}
hm
}

/// Before the dag starts executing, set the dag's global environment variable.
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/engine/graph.rs → dagrs_core/src/engine/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Graph {
})
.count();

while let Some(v)=queue.pop() {
while let Some(v) = queue.pop() {
sequence.push(v);
count += 1;

Expand Down
6 changes: 3 additions & 3 deletions src/engine/mod.rs → dagrs_core/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ impl Engine {

/// Execute all the Dags in the Engine in sequence according to the order numbers of the Dags in
/// the sequence from small to large. The return value is the execution status of all tasks.
pub fn run_sequential(&mut self) ->Vec<bool>{
let mut res=Vec::new();
pub fn run_sequential(&mut self) -> Vec<bool> {
let mut res = Vec::new();
for seq in 1..self.sequence.len() + 1 {
let name = self.sequence.get(&seq).unwrap().clone();
res.push(self.run_dag(name.as_str()));
Expand All @@ -78,7 +78,7 @@ impl Engine {
}

/// Given the name of the Dag, get the execution result of the specified Dag.
pub fn get_dag_result<T: CloneAnySendSync + Send + Sync>(&self,name:&str)->Option<T>{
pub fn get_dag_result<T: CloneAnySendSync + Send + Sync>(&self, name: &str) -> Option<T> {
if self.dags.contains_key(name) {
self.dags.get(name).unwrap().get_result()
} else {
Expand Down
18 changes: 18 additions & 0 deletions dagrs_core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
extern crate anymap2;
extern crate bimap;
extern crate clap;
extern crate proc_macro;
extern crate tokio;
#[cfg(feature = "yaml")]
extern crate yaml_rust;

pub use engine::{Dag, DagError, Engine};
pub use parser::*;
pub use task::{alloc_id, Action, DefaultTask, Input, NopAction, Output, RunningError, Task};
#[cfg(feature = "yaml")]
pub use task::{CommandAction, YamlTask};
pub use utils::{gen_macro, log, EnvVar, LogLevel, Logger};
mod engine;
mod parser;
mod task;
mod utils;
1 change: 0 additions & 1 deletion src/parser/error.rs → dagrs_core/src/parser/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,3 @@ impl From<YamlTaskError> for ParserError {
ParserError::YamlTaskError(value)
}
}

7 changes: 5 additions & 2 deletions src/parser/mod.rs → dagrs_core/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,15 @@ use crate::{task::Task, Action};

mod error;


/// Generic parser traits. If users want to customize the configuration file parser, they must implement this trait.
/// [`YamlParser`] is an example of [`Parser`]
pub trait Parser {
/// Parses the contents of a configuration file into a series of tasks with dependencies.
/// If the user customizes the script execution logic, it is necessary to provide specific
/// types that implement the [`Action`] trait for certain tasks in the form of key-value pairs.
fn parse_tasks(&self, file: &str,specific_actions:HashMap<String,Arc<dyn Action+Send+Sync+'static>>) -> Result<Vec<Box<dyn Task>>, ParserError>;
fn parse_tasks(
&self,
file: &str,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
) -> Result<Vec<Box<dyn Task>>, ParserError>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ impl YamlParser {
/// An item refers to:
///
/// ```yaml
/// name: "Task 1"
/// name: "Task 1"
/// after: [b, c]
/// run:
/// type: sh
/// script: echo a
/// cmd: echo a
/// ```
fn parse_one(
&self,
Expand Down
11 changes: 7 additions & 4 deletions src/task/cmd.rs → dagrs_core/src/task/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct CommandAction {
}

impl CommandAction {
#[allow(unused)]
pub fn new(cmd: &str) -> Self {
Self {
command: cmd.to_owned(),
Expand All @@ -43,13 +44,15 @@ impl Action for CommandAction {
});
let out = match cmd.args(args).output() {
Ok(o) => o,
Err(e) => {
return Err(CmdExecuteError::new(e.to_string()).into())
}
Err(e) => return Err(CmdExecuteError::new(e.to_string()).into()),
};
let code = out.status.code().unwrap_or(-1);
if code == 0 {
Ok(Output::new(String::from_utf8(out.stdout).unwrap()))
let mut out = String::from_utf8(out.stdout).unwrap();
if cfg!(target_os = "windows") {
out = out.replace("\r\n", " ");
}
Ok(Output::new(out))
} else {
let err_msg = String::from_utf8(out.stderr).unwrap();
log::error(err_msg.clone());
Expand Down
Loading

0 comments on commit d267443

Please sign in to comment.