Improve documentation, code and comments #37

Merged
merged 3 commits into from
Nov 17, 2023
3 changes: 0 additions & 3 deletions .vscode/settings.json

This file was deleted.

31 changes: 17 additions & 14 deletions Cargo.toml
@@ -9,30 +9,33 @@ readme = "README.md"
repository = "https://github.com/open-rust-initiative/dagrs"
keywords = ["DAG", "task", "async", "parallel", "concurrent"]

[workspace]
members = ["derive","."]

[dependencies]
yaml-rust = "0.4.5"
bimap = "0.6.1"
clap = { version = "4.2.2", features = ["derive"] }
anymap2 = "0.13.0"
tokio = { version = "1.28", features = ["rt", "sync","rt-multi-thread"] }
derive ={ path = "derive", version = "0.3.0"}

[dev-dependencies]
log = "0.4"
simplelog = "0.12"

[workspace]
members = ["dagrs_derive","dagrs_core"]


[dependencies]
dagrs_core = {path = "dagrs_core" , version = "0.3.0"}
dagrs_derive ={ path = "dagrs_derive", version = "0.3.0"}

[features]
default = ["dagrs_core/logger"]
yaml = ["dagrs_core/yaml"]
derive = ["dagrs_derive/derive"]
default = ["logger"]
logger = []
yaml = []
derive = ["derive/derive"]

[[example]]
name = "custom_log"
[[bin]]
name = "dagrs"
required-features = ["yaml"]

[[example]]
name = "custom_parser"
name = "custom_log"
required-features = ["yaml"]

[[example]]
198 changes: 138 additions & 60 deletions README.md
@@ -48,97 +48,166 @@ Among them, each task may produce output, and may also require the output of som

### Programmatically implement task definition

Users need to program to implement the `Action` trait to define the specific logic of the task, and then build a series of `DefaultTask`. The example: `examples/compute_dag.rs`. `DefaultTask` is the default implementation of the Task trait, and it has several mandatory attributes:
Users implement the `Action` trait to define the specific logic of a task, and then build a series of `DefaultTask`s.

- `id`: uniquely identifies the task assigned by the global ID assigner
- `name`: the name of the task
- `predecessor_tasks`: the predecessor tasks of this task
- `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task
First, users need to define some specific task logic. There are two ways to define task logic:

- Create a closure whose type is `Simple`, which is suitable for simple scenarios.
- Create a type and implement the `Complex` trait, which is suitable for more complex situations. For example, if the logic of the task is to execute a system command, the command string needs to be recorded in some way. You can create a `Command` structure with a string attribute inside to store the command string.

For an example, see `examples/actions.rs`.
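The difference between the two styles can be sketched without depending on `dagrs` itself. In the sketch below, the `TaskLogic` trait, the `Closure` wrapper, the `Command` struct, and the `demo` function are hypothetical stand-ins, not the crate's `Simple` or `Complex` types. It only illustrates why a closure suits stateless logic, while a struct suits logic that has to carry data such as a command string.

```rust
// Hypothetical stand-in for a task-logic trait; this is NOT the dagrs `Complex` trait.
trait TaskLogic {
    fn run(&self, input: usize) -> String;
}

// Struct-based logic: the struct stores the state the task needs (a command string).
struct Command {
    command: String,
}

impl TaskLogic for Command {
    fn run(&self, input: usize) -> String {
        // A real task would execute the command; this sketch only formats it.
        format!("{} {}", self.command, input)
    }
}

// Closure-based logic: a thin wrapper turns any suitable closure into task logic.
struct Closure<F: Fn(usize) -> String>(F);

impl<F: Fn(usize) -> String> TaskLogic for Closure<F> {
    fn run(&self, input: usize) -> String {
        (self.0)(input)
    }
}

// Runs both kinds of logic through the same trait object interface.
fn demo() -> Vec<String> {
    let tasks: Vec<Box<dyn TaskLogic>> = vec![
        Box::new(Closure(|n: usize| format!("closure got {}", n))),
        Box::new(Command { command: String::from("echo") }),
    ];
    tasks.iter().map(|t| t.run(3)).collect()
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

Both variants end up behind the same trait object, which is why a scheduler can treat them uniformly once the task has been created.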

In the second step, use the defined task logic to create specific tasks. The `DefaultTask` type provides several ways to create a `Task`: it lets you attach task logic to a task and give the task a name. Refer to the documentation for the individual functions.

In the third step, you need to specify dependencies for the defined series of tasks. Here you need to use the `set_predecessors` function of `DefaultTask`. This function requires you to specify a series of predecessor tasks for the current task.

The fourth step is to create a `Dag` and put all the defined tasks into the `Dag` scheduler.

Optional step: You can specify an environment variable for `Dag`. This environment variable is available in all tasks. In some specific tasks, this behavior can be useful.

Finally, don’t forget to initialize the logger, and then you can call the `start` function of `Dag` to start executing all tasks.

You can refer to an example for the above complete steps: `examples/compute_dag.rs`


Here is the `examples/impl_action.rs` example:

```rust
//! Implement the Action trait to define the task logic.
//! Only use Dag, execute a job. The graph is as follows:
//!
//! ↱----------↴
//! B -→ E --→ G
//! ↗ ↗ ↗
//! A --→ C /
//! ↘ ↘ /
//! D -→ F
//!
//! The final execution result is 272.

extern crate dagrs;

use std::sync::Arc;
use dagrs::{log, Action, Dag, DefaultTask, EnvVar, Input, LogLevel, Output, RunningError};
use dagrs::{log, Complex, Dag, DefaultTask, EnvVar, Input, LogLevel, Output};

struct SimpleAction(usize);
struct Compute(usize);

/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function.
/// The logic here is simply to get the output value (usize) of all predecessor tasks and then accumulate.
impl Action for SimpleAction {
fn run(&self, input: Input, env: Arc<EnvVar>) -> Result<Output, RunningError> {
impl Complex for Compute {
fn run(&self, input: Input, env: Arc<EnvVar>) -> Output {
let base = env.get::<usize>("base").unwrap();
let mut sum = self.0;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Ok(Output::new(sum))
Output::new(sum)
}
}

fn main() {
// Initialize the global logger
// initialization log.
let _initialized = log::init_logger(LogLevel::Info, None);
// Generate four tasks.
let a = DefaultTask::new(SimpleAction(10), "Task a");
let mut b = DefaultTask::new(SimpleAction(20), "Task b");
let mut c = DefaultTask::new(SimpleAction(30), "Task c");
let mut d = DefaultTask::new(SimpleAction(40), "Task d");
// Set the precursor for each task.
// generate some tasks.
let a = DefaultTask::with_action("Compute A", Compute(1));

let mut b = DefaultTask::with_action("Compute B", Compute(2));

let mut c = DefaultTask::new("Compute C");
c.set_action(Compute(4));

let mut d = DefaultTask::new("Compute D");
d.set_action(Compute(8));

let mut e = DefaultTask::with_closure("Compute E", |input, env| {
let base = env.get::<usize>("base").unwrap();
let mut sum = 16;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Output::new(sum)
});
let mut f = DefaultTask::with_closure("Compute F", |input, env| {
let base = env.get::<usize>("base").unwrap();
let mut sum = 32;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Output::new(sum)
});

let mut g = DefaultTask::new("Compute G");
g.set_closure(|input, env| {
let base = env.get::<usize>("base").unwrap();
let mut sum = 64;
input
.get_iter()
.for_each(|i| sum += i.get::<usize>().unwrap() * base);
Output::new(sum)
});

// Set up task dependencies.
b.set_predecessors(&[&a]);
c.set_predecessors(&[&a]);
d.set_predecessors(&[&b, &c]);
// Take these four tasks as a Dag.
let mut dag = Dag::with_tasks(vec![a, b, c, d]);
d.set_predecessors(&[&a]);
e.set_predecessors(&[&b, &c]);
f.set_predecessors(&[&c, &d]);
g.set_predecessors(&[&b, &e, &f]);
// Create a new Dag.
let mut dag = Dag::with_tasks(vec![a, b, c, d, e, f, g]);
// Set a global environment variable for this dag.
let mut env = EnvVar::new();
env.set("base", 2usize);
dag.set_env(env);
// Begin execution.
// Start executing this dag
assert!(dag.start().unwrap());
// Get execution result
assert_eq!(dag.get_result::<usize>().unwrap(), 220);
// Get execution result.
let res = dag.get_result::<usize>().unwrap();
println!("The result is {}.", res);
}
```

**explain:**

First, we define `SimpleAction` and implement the `Action` trait for this structure. In the rewritten run function, we simply get the output value of the predecessor task and multiply it by the environment variable `base`. Then accumulate the multiplied result to itself self.0.

After defining the specific task logic, start creating the prerequisites for executing `Dag`:
Initialize the global logger first. Here we set the log level to Info, and do not give the log output file, let the log output to the console by default.
First, we initialize the logger, declare the `Compute` type, and implement the `Complex` trait for it. In the `run` function, we take the output value of each predecessor task, multiply it by the environment variable `base`, and add the product to the task's own value, `self.0`.

Create a `DefaultTask` with `SimpleAction` and give the task a name. Then set the dependencies between tasks.
Next, we define seven tasks (`a` through `g`), showing several of the constructor and setter functions of the `DefaultTask` type, and set the predecessor tasks for each task.

Then create a Dag and assign it a global environment variable.
Then, create a `Dag`, set a base environment variable for it, and use the start method to start executing all tasks.

Finally we call the `start` function of `Dag` to execute all tasks. After the task is executed, call the `get_result` function to obtain the final execution result of the task.
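The final value of 272 can be checked by hand. The sketch below recomputes it with plain arithmetic and does not use `dagrs`; `expected_result` is a hypothetical helper name. Each task adds its own constant to `base` times the sum of its predecessors' outputs, following the `set_predecessors` calls in the example.

```rust
/// Recomputes the example by hand: every task contributes its own constant
/// plus `base` times the sum of the outputs of its predecessors.
fn expected_result(base: usize) -> usize {
    let a = 1; // no predecessors
    let b = 2 + base * a; // predecessor: a
    let c = 4 + base * a; // predecessor: a
    let d = 8 + base * a; // predecessor: a
    let e = 16 + base * (b + c); // predecessors: b, c
    let f = 32 + base * (c + d); // predecessors: c, d
    64 + base * (b + e + f) // g, predecessors: b, e, f
}

fn main() {
    // With base = 2: a = 1, b = 4, c = 6, d = 10, e = 36, f = 64, g = 272.
    println!("The result is {}.", expected_result(2));
}
```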

The graph formed by the task is shown below:

```mermaid
flowchart LR;
A((Task a))-->B; A-->C; B((Task b))-->D; C((Task c))-->D((Task d));
flowchart LR
A-->B
A-->C
A-->D
B-->E
B-->G
C-->E
C-->F
D-->F
E-->G
F-->G
```

The execution order is a->c->b->d.

```bash
$cargo run
[Start] -> Task a -> Task c -> Task b -> Task d -> [End]
Executing Task[name: Task a]
Task executed successfully. [name: Task a]
Executing Task[name: Task b]
Executing Task[name: Task c]
Task executed successfully. [name: Task b]
Task executed successfully. [name: Task c]
Executing Task[name: Task d]
Task executed successfully. [name: Task d]

Process finished with exit code 0
$ cargo run --example compute_dag
[Start] -> Compute A -> Compute B -> Compute D -> Compute C -> Compute F -> Compute E -> Compute G -> [End]
Executing task [name: Compute A, id: 1]
Execution succeed [name: Compute A, id: 1]
Executing task [name: Compute C, id: 3]
Executing task [name: Compute B, id: 2]
Executing task [name: Compute D, id: 4]
Execution succeed [name: Compute C, id: 3]
Execution succeed [name: Compute B, id: 2]
Execution succeed [name: Compute D, id: 4]
Executing task [name: Compute F, id: 6]
Executing task [name: Compute E, id: 5]
Execution succeed [name: Compute F, id: 6]
Execution succeed [name: Compute E, id: 5]
Executing task [name: Compute G, id: 7]
Execution succeed [name: Compute G, id: 7]
The result is 272.
```
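The first line of the log lists the tasks in an order in which every task appears after all of its predecessors. As a rough illustration of how such an order can be derived, here is a sketch of Kahn's algorithm on the same seven-task graph. The `topo_order` function is a hypothetical helper, and nothing here is taken from the `dagrs` scheduler, which may break ties differently and therefore print a different (equally valid) order.

```rust
use std::collections::VecDeque;

/// Kahn's algorithm: returns a topological order, or `None` if the graph has a cycle.
/// `preds[i]` lists the predecessors of task `i`.
fn topo_order(preds: &[Vec<usize>]) -> Option<Vec<usize>> {
    let n = preds.len();
    // The in-degree of a task is the number of predecessors it still waits for.
    let mut in_degree: Vec<usize> = preds.iter().map(|p| p.len()).collect();
    // Invert the predecessor lists into successor lists.
    let mut succs: Vec<Vec<usize>> = vec![Vec::new(); n];
    for (task, ps) in preds.iter().enumerate() {
        for &p in ps {
            succs[p].push(task);
        }
    }
    // Tasks without predecessors are ready immediately.
    let mut ready: VecDeque<usize> = (0..n).filter(|&t| in_degree[t] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(t) = ready.pop_front() {
        order.push(t);
        for &s in &succs[t] {
            in_degree[s] -= 1;
            if in_degree[s] == 0 {
                ready.push_back(s);
            }
        }
    }
    // If some task never became ready, the graph contains a cycle.
    if order.len() == n {
        Some(order)
    } else {
        None
    }
}

fn main() {
    // Indices 0..=6 stand for tasks A..=G from the example above.
    let preds: Vec<Vec<usize>> = vec![
        vec![],        // A
        vec![0],       // B <- A
        vec![0],       // C <- A
        vec![0],       // D <- A
        vec![1, 2],    // E <- B, C
        vec![2, 3],    // F <- C, D
        vec![1, 4, 5], // G <- B, E, F
    ];
    let order = topo_order(&preds).expect("the example graph is acyclic");
    let names: Vec<char> = order.iter().map(|&i| (b'A' + i as u8) as char).collect();
    println!("{:?}", names);
}
```

Returning `None` for a cyclic graph mirrors the requirement that the task graph must be a DAG before anything can be executed.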

### `Yaml` configuration file
@@ -192,8 +261,8 @@ These yaml-defined task items form a complex dependency graph. In the yaml confi
To parse a yaml configuration file, compile this project first; this requires Rust version >= 1.70:

```bash
$cargo build --release
$ .\target\release\dagrs.exe --help
$ cargo build --release --features=yaml
$ ./target/release/dagrs.exe --help
Usage: dagrs.exe [OPTIONS] --yaml <YAML>

Options:
@@ -213,7 +282,7 @@ Options:
We can try an already defined file at `tests/config/correct.yaml`

```bash
$./target/release/dagrs --yaml=./tests/config/correct.yaml --log-path=./dagrs.log --log-level=info
$ ./target/release/dagrs --yaml=./tests/config/correct.yaml --log-path=./dagrs.log --log-level=info
[Start] -> Task 8 -> Task 5 -> Task 7 -> Task 6 -> Task 3 -> Task 2 -> Task 1 -> Task 4 -> [End]
Executing Task[name: Task 8]
Executing Task[name: Task 5]
@@ -227,10 +296,19 @@ Executing Task[name: Task 1]

You can see an example in `examples/yaml_dag.rs`. You can also read the yaml configuration file programmatically to generate tasks: simply use the `with_yaml` function provided by `Dag` to parse the configuration file.

--------------------------------------

**In addition to these two methods, `dagrs` also supports advanced task custom configuration.**

- `DefaultTask` is a default implementation of the `Task` trait. Users can also customize tasks and add more functions and attributes to tasks, but they still need to have the four necessary attributes in `DefaultTask`. `YamlTask` is another example of `Task` concrete implementation, its source code is available for reference, or refer to `example/custom_task.rs`.
- In addition to yaml-type configuration files, users can also provide other types of configuration files, but in order to allow other types of configuration files to be parsed as tasks, users need to implement the `Parser` trait. `YamlParser` source code is available for reference, or refer to `examples/custom_parser.rs`
- `DefaultTask` is a default implementation of the `Task` trait. Users can also customize tasks and add more functions and attributes to tasks, but they still need to have the four necessary attributes in `DefaultTask`. `YamlTask` is another concrete implementation of `Task`; its source code is available for reference. However you customize the task type, it must have the following attributes:
- `id`: uniquely identifies the task assigned by the global ID assigner
- `name`: the name of the task
- `predecessor_tasks`: the predecessor tasks of this task
- `action`: is a dynamic type that implements the Action trait in user programming, and it is the specific logic to be executed by the task

- In addition to yaml-type configuration files, users can also provide other types of configuration files, but in order to allow other types of configuration files to be parsed as tasks, users need to implement the `Parser` trait. `YamlParser` source code is available for reference.

`examples/custom_parser_and_task.rs` is an example of a custom task type and a custom configuration file parser.
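To make the four required attributes concrete, here is a minimal sketch of a custom task type that carries `id`, `name`, `predecessor_tasks` and `action`, plus one extra attribute. It deliberately does not implement the real `Task` trait: `MyTask`, `Logic`, `alloc_id` and the `priority` field are hypothetical names chosen for illustration, and the ID counter merely imitates the idea of a global ID assigner.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical global ID allocator; dagrs ships its own assigner.
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

fn alloc_id() -> usize {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}

// Hypothetical stand-in for task logic: maps predecessor outputs to one output.
type Logic = Arc<dyn Fn(&[usize]) -> usize + Send + Sync>;

struct MyTask {
    id: usize,
    name: String,
    predecessor_tasks: Vec<usize>,
    action: Logic,
    // An extra attribute that a custom task type might add.
    priority: u8,
}

impl MyTask {
    fn new(name: &str, priority: u8, action: Logic) -> Self {
        MyTask {
            id: alloc_id(),
            name: name.to_owned(),
            predecessor_tasks: Vec::new(),
            action,
            priority,
        }
    }

    // Records the IDs of the tasks that must finish before this one.
    fn set_predecessors(&mut self, predecessors: &[&MyTask]) {
        self.predecessor_tasks = predecessors.iter().map(|t| t.id).collect();
    }
}

fn main() {
    let a = MyTask::new("Task a", 0, Arc::new(|_: &[usize]| 1));
    let mut b = MyTask::new(
        "Task b",
        5,
        Arc::new(|inputs: &[usize]| inputs.iter().sum::<usize>() + 1),
    );
    b.set_predecessors(&[&a]);
    // Run the two tasks by hand in dependency order.
    let out_a = (a.action)(&[]);
    let out_b = (b.action)(&[out_a]);
    println!(
        "{} (id {}, priority {}, predecessors {:?}) -> {}",
        b.name, b.id, b.priority, b.predecessor_tasks, out_b
    );
}
```

Storing predecessor IDs rather than references keeps the task type free of lifetimes, which is one plausible reason for requiring a unique `id` on every task.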

## Analyze the logic of task execution

@@ -323,27 +401,27 @@ gantt

### Basic function usage

`examples/compute_dag.rs`: Use a custom macro to generate multiple simple tasks.
`examples/compute_dag.rs`: A complete usage example of dagrs.

`examples/impl_action.rs`: Define a simple Action to build multiple tasks with the same logic.
`examples/action.rs`: Two ways to define the specific logic of a task.

`examples/yaml_dag.rs`: Spawn multiple tasks with a given yaml configuration file.

`examples/use_macro.rs`: Use the `gen_task` macro provided by `dagrs` to generate multiple simple tasks.
`examples/yaml_dag.rs`: Example of reading a yaml configuration file (requires the `yaml` feature).

`examples/engine.rs`: Using `Engine` to manage multiple dags with different task types.

### Advanced Features

`examples/custom_task.rs`: Implement the `Task` trait and define your own task type.

`examples/custom_parser.rs`: Implement the `Parser` trait to define your own task configuration file parser.
`examples/custom_parser_and_task.rs`: Custom task types and configuration file parsers.

`examples/custom_log.rs`: Implement the `Logger` trait to define your own global logger.

`examples/derive_task.rs`: Use the `CustomTask` derive macro to help customize task types.

`examples/dependencies.rs`: Use the `dependencies!` macro to specify dependencies in an intuitive way and define a series of tasks.

## Contribution

The dagrs project relies on community contributions and aims to simplify getting started. To develop `dagrs`, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review.
The `dagrs` project relies on community contributions and aims to simplify getting started. To develop `dagrs`, clone the repository, then install all dependencies, run the test suite and try it out locally. Pick an issue, make changes, and submit a pull request for community review.

### What's the contribution

25 changes: 0 additions & 25 deletions dagrs_core/Cargo.toml

This file was deleted.
