Skip to content

Commit

Permalink
Merge pull request #36 from aminya/life-times2
Browse files Browse the repository at this point in the history
feat: relax the lifetime constraints on the actions
  • Loading branch information
genedna authored Nov 15, 2023
2 parents d267443 + f9016ad commit 2803988
Show file tree
Hide file tree
Showing 15 changed files with 98 additions and 53 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Cargo.lock

# IDE
.idea
.vscode
/.vscode/*
!/.vscode/settings.json

# Exclude execute log
/*.log
/*.log
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"rust-analyzer.cargo.features": "all"
}
61 changes: 33 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ Users need to program to implement the `Action` trait to define the specific log
- `predecessor_tasks`: the predecessor tasks of this task
- `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task


Here is the `examples/impl_action.rs` example:

```rust
//! Implement the Action trait to define the task logic.

use std::sync::Arc;
use dagrs::{
Action,
Dag, DefaultTask, EnvVar,log, Input, Output, RunningError,LogLevel
};
use dagrs::{log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError};

struct SimpleAction(usize);

/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function.
/// The logic here is simply to get the output value (`usize`) of all predecessor tasks and then accumulate.
impl Action for SimpleAction{
fn run(&self, input: Input,env:Arc<EnvVar>) -> Result<Output,RunningError> {
/// The logic here is simply to get the output value (usize) of all predecessor tasks and then accumulate.
impl Action for SimpleAction {
fn run(&self, input: Input, env: Arc<EnvVar>) -> Result<Output, RunningError> {
let base = env.get::<usize>("base").unwrap();
let mut sum = self.0;
input
Expand All @@ -76,27 +79,29 @@ impl Action for SimpleAction{
}
}

// Initialize the global logger
let _initialized = log::init_logger(LogLevel::Info,None);
// Generate four tasks.
let a= DefaultTask::new(SimpleAction(10),"Task a");
let mut b=DefaultTask::new(SimpleAction(20),"Task b");
let mut c=DefaultTask::new(SimpleAction(30),"Task c");
let mut d=DefaultTask::new(SimpleAction(40),"Task d");
// Set the precursor for each task.
b.set_predecessors(&[&a]);
c.set_predecessors(&[&a]);
d.set_predecessors(&[&b,&c]);
// Take these four tasks as a Dag.
let mut dag=Dag::with_tasks(vec![a,b,c,d]);
// Set a global environment variable for this dag.
let mut env = EnvVar::new();
env.set("base", 2usize);
dag.set_env(env);
// Begin execution.
assert!(dag.start().unwrap());
// Get execution result
assert_eq!(dag.get_result::<usize>().unwrap(),220);
fn main() {
// Initialize the global logger
let _initialized = log::init_logger(LogLevel::Info, None);
// Generate four tasks.
let a = DefaultTask::new(SimpleAction(10), "Task a");
let mut b = DefaultTask::new(SimpleAction(20), "Task b");
let mut c = DefaultTask::new(SimpleAction(30), "Task c");
let mut d = DefaultTask::new(SimpleAction(40), "Task d");
// Set the precursor for each task.
b.set_predecessors(&[&a]);
c.set_predecessors(&[&a]);
d.set_predecessors(&[&b, &c]);
// Take these four tasks as a Dag.
let mut dag = Dag::with_tasks(vec![a, b, c, d]);
// Set a global environment variable for this dag.
let mut env = EnvVar::new();
env.set("base", 2usize);
dag.set_env(env);
// Begin execution.
assert!(dag.start().unwrap());
// Get execution result
assert_eq!(dag.get_result::<usize>().unwrap(), 220);
}
```

**explain:**
Expand Down
3 changes: 2 additions & 1 deletion dagrs_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "dagrs_core"
version = "0.3.0"
edition = "2021"
license = "MIT OR Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -21,4 +22,4 @@ yaml = []

[[bin]]
name = "dagrs"
required-features = ["yaml"]
required-features = ["yaml"]
8 changes: 4 additions & 4 deletions dagrs_core/src/engine/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Dag {
}

/// Create a dag by adding a series of tasks.
pub fn with_tasks(tasks: Vec<impl Task + 'static>) -> Dag {
pub fn with_tasks(tasks: Vec<impl Task + 'static>) -> Self {
let mut dag = Dag::new();
tasks.into_iter().for_each(|task| {
let task = Box::new(task) as Box<dyn Task>;
Expand All @@ -103,7 +103,7 @@ impl Dag {
#[cfg(feature = "yaml")]
pub fn with_yaml(
file: &str,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Dag, DagError> {
use crate::YamlParser;
let parser = Box::new(YamlParser);
Expand All @@ -114,7 +114,7 @@ impl Dag {
pub fn with_config_file_and_parser(
file: &str,
parser: Box<dyn Parser>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Dag, DagError> {
Dag::read_tasks(file, parser, specific_actions)
}
Expand All @@ -124,7 +124,7 @@ impl Dag {
fn read_tasks(
file: &str,
parser: Box<dyn Parser>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Dag, DagError> {
let mut dag = Dag::new();
let tasks = parser.parse_tasks(file, specific_actions)?;
Expand Down
2 changes: 1 addition & 1 deletion dagrs_core/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ pub trait Parser {
fn parse_tasks(
&self,
file: &str,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Vec<Box<dyn Task>>, ParserError>;
}
6 changes: 3 additions & 3 deletions dagrs_core/src/parser/yaml_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl YamlParser {
&self,
id: &str,
item: &Yaml,
specific_action: Option<Arc<dyn Action + Send + Sync + 'static>>,
specific_action: Option<Arc<dyn Action + Send + Sync>>,
) -> Result<YamlTask, YamlTaskError> {
// Get name first
let name = item["name"]
Expand All @@ -63,7 +63,7 @@ impl YamlParser {
id,
precursors,
name,
Arc::new(CommandAction::new(cmd)) as Arc<dyn Action + Send + Sync + 'static>,
Arc::new(CommandAction::new(cmd)) as Arc<dyn Action + Send + Sync >,
))
}
}
Expand All @@ -73,7 +73,7 @@ impl Parser for YamlParser {
fn parse_tasks(
&self,
file: &str,
mut specific_actions: HashMap<String, Arc<dyn Action + Send + Sync + 'static>>,
mut specific_actions: HashMap<String, Arc<dyn Action + Send + Sync>>,
) -> Result<Vec<Box<dyn Task>>, ParserError> {
let content = self.load_file(file)?;
// Parse Yaml
Expand Down
33 changes: 33 additions & 0 deletions dagrs_core/src/task/default_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,39 @@ impl DefaultTask {
}
}

/// Allocate a new [`DefaultTask`] from any action and name.
/// The specific task behavior is a structure that implements [`Action`].
///
/// # Example
///
/// ```rust
/// use dagrs::{DefaultTask, Output,Input, Action,EnvVar,RunningError};
/// use std::sync::Arc;
/// struct SimpleAction(usize);
///
/// impl Action for SimpleAction {
/// fn run(&self, input: Input, env: Arc<EnvVar>) -> Result<Output,RunningError> {
/// Ok(Output::new(self.0 + 10))
/// }
/// }
///
/// let action = Arc::new(SimpleAction(10));
/// let task = DefaultTask::from(action, String::from("Increment action"));
/// ```
///
/// `SimpleAction` is a struct that impl [`Action`]. Since task will be
/// executed in separated threads, [`Send`] and [`Sync`] is needed.
///
/// **Note:** This method will take the ownership of struct that impl [`Action`].
pub fn from(action: Arc<dyn Action + Send + Sync>, name: String) -> Self {
DefaultTask {
id: ID_ALLOCATOR.alloc(),
action,
name,
precursors: Vec::new(),
}
}

/// Tasks that shall be executed before this one.
///
/// # Example
Expand Down
2 changes: 1 addition & 1 deletion dagrs_core/src/task/yaml_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl YamlTask {
yaml_id: &str,
precursors: Vec<String>,
name: String,
action: Arc<dyn Action + Send + Sync + 'static>,
action: Arc<dyn Action + Send + Sync>,
) -> Self {
Self {
yid: yaml_id.to_owned(),
Expand Down
1 change: 1 addition & 0 deletions dagrs_derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "dagrs_derive"
version = "0.3.0"
edition = "2021"
license = "MIT OR Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
12 changes: 6 additions & 6 deletions examples/custom_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ impl MyTask {
txt_id: &str,
precursors: Vec<String>,
name: String,
action: impl Action + Send + Sync + 'static,
action: Arc<dyn Action + Send + Sync>,
) -> Self {
Self {
tid: (txt_id.to_owned(), dagrs::alloc_id()),
name,
precursors,
precursors_id: Vec::new(),
action: Arc::new(action),
action,
}
}

Expand Down Expand Up @@ -92,19 +92,19 @@ impl ConfigParser {
}

fn parse_one(&self, item: String) -> MyTask {
let attr: Vec<&str> = item.split(",").collect();
let attr: Vec<&str> = item.split(',').collect();

let pres_item = *attr.get(2).unwrap();
let pres = if pres_item.eq("") {
Vec::new()
} else {
pres_item.split(" ").map(|pre| pre.to_string()).collect()
pres_item.split(' ').map(|pre| pre.to_string()).collect()
};

let id = *attr.get(0).unwrap();
let id = *attr.first().unwrap();
let name = attr.get(1).unwrap().to_string();
let cmd = *attr.get(3).unwrap();
MyTask::new(id, pres, name, CommandAction::new(cmd))
MyTask::new(id, pres, name, Arc::new(CommandAction::new(cmd)))
}
}

Expand Down
6 changes: 3 additions & 3 deletions examples/custom_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ struct MyTask {
}

impl MyTask {
pub fn new(action: impl Action + 'static + Send + Sync, name: &str) -> Self {
pub fn new(action: Arc<dyn Action + Send + Sync>, name: &str) -> Self {
MyTask {
id: alloc_id(),
action: Arc::new(action),
action,
name: name.to_owned(),
predecessor_tasks: Vec::new(),
}
Expand Down Expand Up @@ -59,7 +59,7 @@ macro_rules! generate_task {
Ok(Output::new(sum))
}
}
MyTask::new($action($val), $name)
MyTask::new(Arc::new($action($val)), $name)
}};
}

Expand Down
2 changes: 1 addition & 1 deletion examples/dependencies.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dagrs_core::{
log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError, Task,
log, DefaultTask, EnvVar, LogLevel,
};
use dagrs_derive::dependencies;

Expand Down
4 changes: 2 additions & 2 deletions examples/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
extern crate dagrs;

use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap};

use dagrs::{
gen_task, log, Action, Dag, DefaultTask, Engine, EnvVar, Input, LogLevel, Output, RunningError,
gen_task, log, Dag, DefaultTask, Engine, LogLevel,
};
fn main() {
// initialization log.
Expand Down
3 changes: 2 additions & 1 deletion tests/dag_job_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Some tests of the dag engine.
use std::{collections::HashMap, sync::Arc};

///! Some tests of the dag engine.
use dagrs::{
log, Action, Dag, DagError, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError,
};
Expand Down

0 comments on commit 2803988

Please sign in to comment.