Skip to content

Commit

Permalink
Merge pull request #69 from samouwow/halt-running-nodes
Browse files Browse the repository at this point in the history
Add halt for flow, decorator and sync action nodes.
  • Loading branch information
besok committed Jun 28, 2024
2 parents 8b59d5c + f20c573 commit 6fac087
Show file tree
Hide file tree
Showing 19 changed files with 707 additions and 205 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ description = "Workflow framework based on the behavior trees"
authors = ["BorisZhguchev <[email protected]>"]
homepage = "https://github.com/besok/forester"
repository = "https://github.com/besok/forester"
version = "0.3.2"
version = "0.4.0"
edition = "2021"
license-file = "LICENSE"

Expand Down Expand Up @@ -35,4 +35,3 @@ url = "2.4.1"
[dev-dependencies]
wiremock = "0.6.0"
forester-http = "0.1.0"

20 changes: 12 additions & 8 deletions docs/src/falls.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Fallback
# Fallback

A Fallback ticks children sequentially until someone returns a `Success`.
Otherwise, if all children return `Failure`, the node returns `Failure`.
Expand All @@ -11,8 +11,8 @@ impl take_from_others()
root main {
fallback {
any_tasks() // goes farther if the first actions is failure
do_it()
any_tasks() // goes farther if the first actions is failure
do_it()
}
}
```
Expand All @@ -21,13 +21,13 @@ root main {
- When it gets the first `tick` it switches to state `running`
- When a child returns `success` it stops the execution and returns `success`
- If a child returns `running`, the node returns `running` as well
- If a child returns `failure`, the node proceeds to the next child
- If a child returns `failure`, the node proceeds to the next child
- if this is a final child, it returns `failure`
- When a node is restarted, the process starts from the beginning
- When a node is restarted or halted the process starts from the beginning

## Intention
Often, it is used for making conditions.
The script below emulates a simple condition that needs to do before
The script below emulates a simple condition that needs to do before
```f-tree
cond can_take(sub:object)
impl move_to(sub:object)
Expand Down Expand Up @@ -68,10 +68,14 @@ root main {
r_fallback {
needs_to_charge() // returns failure
action() // returns running
fin_and_save()
fin_and_save()
}
}
```

The node `action` returns `running` and the whole sequence returns `running`
but on the next tick it starts from the node `needs_to_charge` again.
but on the next tick it starts from the node `needs_to_charge` again.

`r_fallback` will halt the `running` child to allow a graceful shutdown if a prior child changes from `failure` to `success`. In the above example, if `needs_to_change` returned `success` on the second tick then `action` would be halted before `r_fallback` returned `success` itself.

Halting must be performed as quickly as possible. Note that currently only build-in flow, built-in decorator and sync action nodes are halted, async and remote actions are not.
30 changes: 24 additions & 6 deletions docs/src/r_actions.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,41 @@ There are three types of actions available at that moment:

## Traits

The action trait implements two functions, `tick()` and `halt()`.

The `tick()` function is the main entry point of the action and will be called whenever the node is executed.

The `halt()` function is used to notify a `running` action that a reactive flow node (e.g. `r_sequnce`) has changed the control flow. This means the previously `running` action won't be called again, or won't be called for a while, and so should gracefully clean up. The `halt()` function has a default no-op implementation that can be used if no clean up is necessary.

Actions must halt as quickly as possible, and the call to `halt()` should not block the execution.

### `Impl` for sync actions

Sync actions are the only actions that currently implement the `halt()` function.

```rust
pub trait Impl {
fn tick(&self, args: RtArgs, ctx: TreeContextRef) -> Tick;

fn halt(&self, args: RtArgs, ctx: TreeContextRef) -> RtOk {
// Default halt is a no-op function.
let _ = args;
let _ = ctx;
Ok(())
}
}

```

#### `ImplAsync` for async actions
### `ImplAsync` for async actions
```rust
pub trait ImplAsync: Sync + Send {
fn tick(&self, args: RtArgs, ctx: TreeContextRef) -> Tick;
}
```

Where `args` are the given arguments from the tree definition and invocation and `ctx`
is a reference of the invocation context with `bb` and `tracer`
Where `args` are the given arguments from the tree definition and invocation and `ctx`
is a reference of the invocation context with `bb` and `tracer`.

## Mutability
The actions are intentionally stateless thus they can't mutate.
Expand All @@ -43,7 +61,7 @@ fn simple_delay() {
let mut forester_builder = fb("decorators/simple_delay");

forester_builder.register_sync_action("store", StoreData);

}
```

Expand All @@ -52,7 +70,7 @@ fn simple_delay() {
The async actions are executed in the multithreading environment and return the `running` tick result instantly.
It does not block the execution of the tree and can be used in parallel nodes, etc.

On the other hand, every time when the tree is reloaded, the tick number is increased that can exceed the limit on ticks
On the other hand, every time when the tree is reloaded, the tick number is increased that can exceed the limit on ticks
if the system has it. Therefore, it needs to take into account (when forester runs with the limit of ticks.)


Expand Down Expand Up @@ -107,7 +125,7 @@ How to implement the client side, please see [remote action lib](./rem_action.md

## Default actions

By default, there are several implementations for http and interactions with bb are available in
By default, there are several implementations for http and interactions with bb are available in

```rust
use forester_rs::runtime::action::builtin::*;
Expand Down
24 changes: 14 additions & 10 deletions docs/src/seq.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Sequence

A Sequence node ticks all underlying nodes as long as they return `Success`.
Otherwise, (when a child returns `failure` a sequence aborted)
Otherwise, when a child returns `failure` the sequence is aborted.

In the language, the tree definitions and lambda invocations of this element are marked with the key word `sequence`.

Expand All @@ -10,9 +10,9 @@ impl store(key:string, value:string); // store a string value to a key in blackb
root main {
sequence {
store("a","1") // first tick and proceed if succeded
store("b","2") // sec tick and proceed if succeded
store("c","3") // thrd tick and finish if succeded
store("a","1") // first tick and proceed if succeeded
store("b","2") // sec tick and proceed if succeeded
store("c","3") // thrd tick and finish if succeeded
}
}
```
Expand Down Expand Up @@ -40,7 +40,7 @@ main ",shape=rect,color=black]
- if this is a final child, it returns `success`
- If a child returns `running`, the node returns `running` as well
- If a child returns `failure`, the node returns `failure` as well
- When a node is restarted, the process starts from the beginning
- When a node is restarted or halted, the process starts from the beginning (see memory sequence for an exception)

## Intention
Often, it is used as a straight chain of instructions
Expand All @@ -57,12 +57,12 @@ root main sequence {

# Subtypes

There are 2 subtypes that bring a few subtleties to the common process
There are 2 subtypes that bring a few subtleties to the common process.

## Memory Sequence

This sequence defines in the language with the keyword `m_sequence` and has the following peculiarity:
The sequence memorizes the children that are succeeded and skips them next time:
The sequence memorizes the children that have succeeded and skips them next time.

```f-tree
root main {
Expand All @@ -77,17 +77,17 @@ root main {
The node `perform_action` returns `failure` and the decorator `retry` restarts `sequence`.
The main difference with a sequence is an execution starts from the node `perform_action` skipping the node `store`.

The memory will be reset once the final action has returned `success`.
This memory persists even if the `m_sequence` is halted by a reactive flow node. The memory will only be reset once the final action has returned `success`.
That is, if `finish_and_save` returns `success`, the next iteration will start with `store` again.

## Reactive Sequence

This sequence defines in the language with the keyword `r_sequence` and has the following peculiarity:
The sequence restarts all children if they return either failure or running:
The sequence restarts all children if they return either failure or running.

```f-tree
root main {
m_sequence {
r_sequence {
store("key",1) // returns success
perform_action() // returns running
finish_and_save()
Expand All @@ -97,3 +97,7 @@ root main {

The node `perform_action` returns `running` and the whole sequence returns `running`
but on the next tick it starts from the node `store` again.

`r_sequence` will halt the `running` child to allow a graceful shutdown if a prior child changes from `success` to `failure`. In the above example, if `store` returned `failure` on the second tick then `perform_action` would be halted before `r_sequence` returned `failure` itself.

Halting must be performed as quickly as possible. Note that currently only build-in flow, built-in decorator and sync action nodes are halted, async and remote actions are not.
11 changes: 10 additions & 1 deletion src/runtime/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use crate::runtime::context::{TreeContextRef, TreeRemoteContextRef};
use crate::runtime::{RtResult, RuntimeError, TickResult};
use std::sync::Arc;

pub use crate::runtime::RtOk;

pub type ActionName = String;
pub type Tick = RtResult<TickResult>;

/// Recovers the tick depending on the result.
pub fn recover(tick: Tick) -> Tick {
match tick {
Err(RuntimeError::RecoveryToFailure(r)) => Ok(TickResult::Failure(format!("{:?}",r))),
Err(RuntimeError::RecoveryToFailure(r)) => Ok(TickResult::Failure(format!("{:?}", r))),
Err(RuntimeError::BlackBoardError(r)) => Ok(TickResult::Failure(r)),
other => other,
}
Expand Down Expand Up @@ -121,6 +123,13 @@ impl Action {
/// ```
pub trait Impl: Sync + Send {
fn tick(&self, args: RtArgs, ctx: TreeContextRef) -> Tick;

fn halt(&self, args: RtArgs, ctx: TreeContextRef) -> RtOk {
// Default halt is a no-op function.
let _ = args;
let _ = ctx;
Ok(())
}
}

pub trait ImplAsync: Sync + Send {
Expand Down
35 changes: 28 additions & 7 deletions src/runtime/action/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use crate::runtime::{RtResult, RuntimeError, TickResult};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

use super::RtOk;

/// Just an action map to register and execute the actions.
/// The actions are registered by the `ActionName` and the `Action` impl.
pub struct ActionKeeper {
Expand Down Expand Up @@ -54,8 +56,8 @@ impl ActionKeeper {
// the default action impl for the set = all_actions - impl_actions
default: T,
) -> RtResult<Self>
where
T: Fn() -> ActionImpl,
where
T: Fn() -> ActionImpl,
{
let mut impl_actions = impl_actions;
let mut actions = HashMap::new();
Expand Down Expand Up @@ -114,11 +116,9 @@ impl ActionKeeper {
// just to start it in the separate thread(supposedly)
TaskState::Absent => {
let action = action.to_owned();
let tick_handle = env.runtime.spawn_blocking(move || action.tick(args, ctx));
env.tasks.insert(
name.to_string(),
tick_handle,
);
let tick_handle =
env.runtime.spawn_blocking(move || action.tick(args, ctx));
env.tasks.insert(name.to_string(), tick_handle);
Ok(TickResult::running())
}
TaskState::Started(handle) => {
Expand All @@ -131,6 +131,27 @@ impl ActionKeeper {
}
}
}

pub fn halt(
&mut self,
_env: Arc<Mutex<RtEnv>>,
name: &ActionName,
args: RtArgs,
ctx: TreeContextRef,
_http_serv: &Option<ServInfo>,
) -> RtOk {
match self.get_mut(name)? {
Action::Sync(action) => action.halt(args, ctx),
Action::Remote(..) => {
// Halting is not implemented for remote actions.
Ok(())
}
Action::Async(..) => {
// Halting is not implemented for async actions.
Ok(())
}
}
}
}

fn get_port(http_serv: &Option<ServInfo>) -> Result<u16, RuntimeError> {
Expand Down
24 changes: 21 additions & 3 deletions src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::runtime::action::Tick;
use crate::runtime::args::{RtArgs, RtValue};
use crate::runtime::blackboard::{BBRef, BlackBoard};
use crate::runtime::env::{RtEnvRef};
use crate::runtime::env::RtEnvRef;
use crate::runtime::forester::flow::REASON;
use crate::runtime::rtree::rnode::RNodeId;
use crate::runtime::trimmer::{TrimmingQueue, TrimmingQueueRef};
Expand All @@ -13,6 +13,8 @@ use std::fmt::{Display, Formatter};
use std::sync::Arc;
use std::sync::Mutex;

use super::rtree::rnode::RNode;

pub type Timestamp = usize;
pub type TracerRef = Arc<Mutex<Tracer>>;

Expand Down Expand Up @@ -212,6 +214,17 @@ impl TreeContext {
}
}

pub(crate) fn force_to_halting_state(&mut self, id: RNodeId) -> RtResult<Option<RNodeState>> {
self.ts_map.insert(id, self.curr_ts);
let new_state = RNodeState::Halting(self.state_last_set(&id).args());

// Trace the state change with an extra indent
self.tracer.lock()?.right();
self.trace(NewState(id, new_state.clone()))?;
self.tracer.lock()?.left();

Ok(self.state.insert(id, new_state))
}
pub(crate) fn new_state(
&mut self,
id: RNodeId,
Expand Down Expand Up @@ -245,6 +258,7 @@ pub enum RNodeState {
Running(RtArgs),
Success(RtArgs),
Failure(RtArgs),
Halting(RtArgs),
}

impl Display for RNodeState {
Expand All @@ -262,6 +276,9 @@ impl Display for RNodeState {
RNodeState::Failure(args) => {
f.write_str(format!("Failure({})", args).as_str())?;
}
RNodeState::Halting(args) => {
f.write_str(format!("Halting({})", args).as_str())?;
}
}
Ok(())
}
Expand All @@ -280,7 +297,7 @@ impl RNodeState {
RNodeState::Ready(_) => Err(RuntimeError::uex(
"the ready is the unexpected state for ".to_string(),
)),
RNodeState::Running(_) => Ok(TickResult::running()),
RNodeState::Running(_) | RNodeState::Halting(_) => Ok(TickResult::running()),
RNodeState::Success(_) => Ok(TickResult::success()),
RNodeState::Failure(args) => {
let reason = args
Expand Down Expand Up @@ -308,7 +325,8 @@ impl RNodeState {
RNodeState::Ready(tick_args)
| RNodeState::Running(tick_args)
| RNodeState::Failure(tick_args)
| RNodeState::Success(tick_args) => tick_args.clone(),
| RNodeState::Success(tick_args)
| RNodeState::Halting(tick_args) => tick_args.clone(),
}
}
}
Loading

0 comments on commit 6fac087

Please sign in to comment.