More doc and run examples
aandres committed Jul 11, 2023
1 parent efb6c2a commit 51c0dbc
Showing 5 changed files with 169 additions and 84 deletions.
34 changes: 9 additions & 25 deletions docs/concepts/1_dag.md
@@ -1,17 +1,20 @@

# Dag
# DAG

At its core, `beavers` executes a Directed Acyclic Graph (DAG), where each node is a python function.
This section discusses the different types of nodes in the DAG.

## Stream Source

A stream source is a node whose value can be set externally.

When `Dag.execute` is called, the updated value is propagated in the dag
When `Dag.execute` is called, the updated value is propagated through the DAG.

```python
--8<-- "examples/dag_concepts.py:source_stream"
```

If the dag is executed again, the value of the source stream will be reset to its empty value.
If the DAG is executed again, the value of the source stream will be reset to its empty value.

```python
--8<-- "examples/dag_concepts.py:source_stream_again"
@@ -37,7 +40,7 @@ A stream node uses the output of other nodes to calculate its updated value.
--8<-- "examples/dag_concepts.py:stream_node"
```

If the dag is executed again, the value of the stream node will be reset to its empty value.
If the DAG is executed again, the value of the stream node will be reset to its empty value.

```python
--8<-- "examples/dag_concepts.py:stream_node_again"
@@ -60,7 +63,7 @@ Or a class defining `__call__`:

## State Node

A state node retains its value from one dag execution to the next, even if it didn't update:
A state node retains its value from one DAG execution to the next, even if it didn't update:
```python
--8<-- "examples/dag_concepts.py:state_node"
```
@@ -78,7 +81,7 @@ Nodes are connected by calling the `map` function.
Stream nodes can be connected to state nodes, stream nodes or const nodes, and vice versa.

> :warning: The `map` function doesn't execute the underlying node.
> Instead it adds a node to the dag
> Instead, it adds a node to the DAG.
The map function can use positional arguments:

@@ -90,22 +93,3 @@ Or key word arguments:
```python
--8<-- "examples/dag_concepts.py:map_key_word"
```

## Update Propagation

- Nodes are notified if any of their input node was updated during the current execution cycle
```python
--8<-- "examples/dag_concepts.py:propagate_any"
```
- You can check if a node updated by looking at its `cycle_id`
```python
--8<-- "examples/dag_concepts.py:propagate_cycle_id"
```
- If several inputs of a node get updated during the same cycle, the node will be executed once (and not once per input)
```python
--8<-- "examples/dag_concepts.py:propagate_both"
```
- Stream nodes (and sources) are not considered updated if their output is empty
```python
--8<-- "examples/dag_concepts.py:propagate_empty"
```
43 changes: 41 additions & 2 deletions docs/concepts/2_advanced.md
@@ -1,5 +1,27 @@
# Advanced

This section discusses advanced features that control how updates propagate in the DAG.

## How updates propagate in the DAG

- Nodes are notified if any of their input nodes was updated during the current execution cycle
```python
--8<-- "examples/advanced_concepts.py:propagate_any"
```
- You can check if a node updated by looking at its `cycle_id`
```python
--8<-- "examples/advanced_concepts.py:propagate_cycle_id"
```
- If several inputs of a node get updated during the same cycle, the node will be executed once (and not once per input)
```python
--8<-- "examples/advanced_concepts.py:propagate_both"
```
- Stream nodes (and sources) are not considered updated if their output is empty
```python
--8<-- "examples/advanced_concepts.py:propagate_empty"
```
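
The four rules above can be sketched with a toy model in plain Python. This is only an illustration under stated assumptions, not how `beavers` is implemented: `ToyNode` and `run_cycle` are hypothetical names, and the nodes are assumed to be listed in topological order.

```python
class ToyNode:
    """Hypothetical stand-in for a stream node; this is not beavers code."""

    def __init__(self, func=None, inputs=()):
        self.func = func  # None marks a source node
        self.inputs = list(inputs)
        self.value = []  # stream nodes start from an empty value
        self.cycle_id = 0  # last cycle in which the node updated
        self.calls = 0  # how many times func was executed


def run_cycle(nodes, cycle_id, pending):
    """Run one cycle over nodes given in topological order.

    pending maps source nodes to the values set before this cycle.
    """
    for node in nodes:
        if node.func is None:
            # Source: take the externally set value, or reset to empty.
            node.value = pending.get(node, [])
        elif any(i.cycle_id == cycle_id for i in node.inputs):
            # Notified: executed once, however many inputs updated.
            node.calls += 1
            node.value = node.func(*(i.value for i in node.inputs))
        else:
            # Not notified: reset to empty.
            node.value = []
        if node.value:
            # An empty output does not count as an update.
            node.cycle_id = cycle_id


source_1, source_2 = ToyNode(), ToyNode()
total = ToyNode(lambda x, y: x + y, [source_1, source_2])
nodes = [source_1, source_2, total]

run_cycle(nodes, 1, {source_1: [1, 2, 3], source_2: [4, 5, 6]})
assert total.value == [1, 2, 3, 4, 5, 6]
assert total.calls == 1 and total.cycle_id == 1

run_cycle(nodes, 2, {})
assert total.value == [] and total.cycle_id == 1
```

In this sketch, comparing a node's `cycle_id` with the current cycle plays the same role as comparing `node.get_cycle_id()` with `dag.get_cycle_id()` in the examples.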


## Now node

Beavers can be used in both live and replay mode.
@@ -10,7 +32,7 @@ To access the current time of the replay, you should use the now node:
--8<-- "examples/advanced_concepts.py:now_node"
```

The now node is shared for the whole dag.
The now node is shared for the whole DAG.
Its value gets updated silently.

## TimerManager
@@ -21,11 +43,28 @@ To be notified when time passes, nodes can subscribe to a `TimerManager` node.
--8<-- "examples/advanced_concepts.py:timer_manager"
```

## Silent update
## Silent updates

Some nodes may update too often, or their updates may not be relevant to other nodes.
In this case it's possible to silence them:

```python
--8<-- "examples/advanced_concepts.py:silence"
```

## Value Cutoff

By default, state nodes will update every time they are notified.
The framework doesn't check that their value has changed.

You can add a cutoff, to prevent updates when the value hasn't changed:

```python
--8<-- "examples/advanced_concepts.py:cutoff"
```

You can also provide a custom comparator to allow some tolerance when deciding if a value has changed:

```python
--8<-- "examples/advanced_concepts.py:cutoff_custom"
```
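
The decision a cutoff makes can be sketched in a few lines of plain Python. This is a rough illustration, not the `beavers` API: `ToyCutoff` and `offer` are hypothetical names. It assumes, as the examples suggest, that the comparator returns `True` when two values should be treated as equal, and that new values are compared against the last value the cutoff let through.

```python
class ToyCutoff:
    """Hypothetical helper, not beavers code: remembers the last propagated value."""

    def __init__(self, comparator=lambda old, new: old == new):
        self._comparator = comparator
        self._has_value = False
        self.value = None

    def offer(self, new_value) -> bool:
        """Return True if new_value should propagate downstream."""
        if self._has_value and self._comparator(self.value, new_value):
            return False  # treated as unchanged: keep the old value
        self.value = new_value
        self._has_value = True
        return True


# Default comparator: plain equality.
exact = ToyCutoff()
assert exact.offer(2.0)
assert not exact.offer(2.0)
assert exact.offer(3.0)

# Custom comparator: differences below 0.1 are ignored.
tolerant = ToyCutoff(lambda x, y: abs(x - y) < 0.1)
assert tolerant.offer(4.0)
assert not tolerant.offer(4.05)
assert tolerant.value == 4.0
assert tolerant.offer(4.11)
```

Comparing against the last propagated value, rather than the last value seen, keeps a slow drift of small changes from going unnoticed forever.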
119 changes: 118 additions & 1 deletion examples/advanced_concepts.py
@@ -4,13 +4,70 @@

from beavers import Dag

dag = Dag()

# --8<-- [start:propagate_any]
source_1 = dag.source_stream()
source_2 = dag.source_stream()
node = dag.stream(lambda x, y: x + y).map(source_1, source_2)

source_1.set_stream([1, 2, 3])
dag.execute()
assert node.get_value() == [1, 2, 3] # source_1 updated

source_2.set_stream([4, 5, 6])
dag.execute()
assert node.get_value() == [4, 5, 6] # source_2 updated

dag.execute()
assert node.get_value() == [] # no updates, reset to empty
# --8<-- [end:propagate_any]

# --8<-- [start:propagate_cycle_id]
source_1.set_stream([1, 2, 3])
dag.execute()
assert node.get_value() == [1, 2, 3]
assert node.get_cycle_id() == dag.get_cycle_id()

dag.execute()
assert node.get_value() == []
assert node.get_cycle_id() == dag.get_cycle_id() - 1
# --8<-- [end:propagate_cycle_id]


# --8<-- [start:propagate_both]
source_1.set_stream([1, 2, 3])
source_2.set_stream([4, 5, 6])
dag.execute()
assert node.get_value() == [1, 2, 3, 4, 5, 6]
assert node.get_cycle_id() == dag.get_cycle_id()
# --8<-- [end:propagate_both]


# --8<-- [start:propagate_empty]
def even_only(values: list[int]) -> list[int]:
    return [v for v in values if (v % 2) == 0]


even = dag.stream(even_only).map(source_1)

source_1.set_stream([1, 2, 3])
dag.execute()
assert even.get_value() == [2]
assert even.get_cycle_id() == dag.get_cycle_id()

source_1.set_stream([1, 3])
dag.execute()
assert even.get_value() == []
assert even.get_cycle_id() == dag.get_cycle_id() - 1
# --8<-- [end:propagate_empty]


# --8<-- [start:now_node]
def get_delay(timestamps: list[pd.Timestamp], now: pd.Timestamp) -> list[pd.Timedelta]:
    return [now - timestamp for timestamp in timestamps]


dag = Dag()
timestamp_stream = dag.source_stream()
delay = dag.stream(get_delay).map(timestamp_stream, dag.now())

@@ -80,3 +137,63 @@ def get_year(now: pd.Timestamp, timer_manager: TimerManager):
) # No update because source_1 is silent

# --8<-- [end:silence]


# --8<-- [start:cutoff]
class GetMax:
    def __init__(self):
        self._max = 0.0

    def __call__(self, values: list[float]) -> float:
        self._max = max(self._max, *values)
        return self._max


source = dag.source_stream()
get_max = dag.state(GetMax()).map(source)
get_max_cutoff = dag.cutoff(get_max)

source.set_stream([1.0, 2.0])
dag.execute()
assert get_max.get_value() == 2.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff.get_cycle_id() == dag.get_cycle_id()

source.set_stream([1.0])
dag.execute()
assert get_max.get_value() == 2.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff.get_cycle_id() == dag.get_cycle_id() - 1

source.set_stream([3.0])
dag.execute()
assert get_max.get_value() == 3.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff.get_cycle_id() == dag.get_cycle_id()
# --8<-- [end:cutoff]

# --8<-- [start:cutoff_custom]
get_max_cutoff_custom = dag.cutoff(get_max, lambda x, y: abs(x - y) < 0.1)

source.set_stream([4.0])
dag.execute()
assert get_max.get_value() == 4.0
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff_custom.get_cycle_id() == dag.get_cycle_id()


source.set_stream([4.05])
dag.execute()
assert get_max.get_value() == 4.05
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff_custom.get_value() == 4.0
assert get_max_cutoff_custom.get_cycle_id() == dag.get_cycle_id() - 1


source.set_stream([4.11])
dag.execute()
assert get_max.get_value() == 4.11
assert get_max.get_cycle_id() == dag.get_cycle_id()
assert get_max_cutoff_custom.get_value() == 4.11
assert get_max_cutoff_custom.get_cycle_id() == dag.get_cycle_id()
# --8<-- [end:cutoff_custom]
56 changes: 0 additions & 56 deletions examples/dag_concepts.py
@@ -125,59 +125,3 @@ def __call__(self, values: list[int]) -> int:
# --8<-- [start:map_key_word]
key_word = dag.stream(lambda x, y: x + y).map(x=source_stream, y=to_append)
# --8<-- [end:map_key_word]

# --8<-- [start:propagate_any]
source_1 = dag.source_stream()
source_2 = dag.source_stream()
node = dag.stream(lambda x, y: x + y).map(source_1, source_2)

source_1.set_stream([1, 2, 3])
dag.execute()
assert node.get_value() == [1, 2, 3] # source_1 updated

source_2.set_stream([4, 5, 6])
dag.execute()
assert node.get_value() == [4, 5, 6] # source_2 updated

dag.execute()
assert node.get_value() == [] # no updates, reset to empty
# --8<-- [end:propagate_any]

# --8<-- [start:propagate_cycle_id]
source_1.set_stream([1, 2, 3])
dag.execute()
assert node.get_value() == [1, 2, 3]
assert node.get_cycle_id() == dag.get_cycle_id()

dag.execute()
assert node.get_value() == []
assert node.get_cycle_id() == dag.get_cycle_id() - 1
# --8<-- [end:propagate_cycle_id]


# --8<-- [start:propagate_both]
source_1.set_stream([1, 2, 3])
source_2.set_stream([4, 5, 6])
dag.execute()
assert node.get_value() == [1, 2, 3, 4, 5, 6]
assert node.get_cycle_id() == dag.get_cycle_id()
# --8<-- [end:propagate_both]


# --8<-- [start:propagate_empty]
def even_only(values: list[int]) -> list[int]:
    return [v for v in values if (v % 2) == 0]


even = dag.stream(even_only).map(source_1)

source_1.set_stream([1, 2, 3])
dag.execute()
assert even.get_value() == [2]
assert even.get_cycle_id() == dag.get_cycle_id()

source_1.set_stream([1, 3])
dag.execute()
assert even.get_value() == []
assert even.get_cycle_id() == dag.get_cycle_id() - 1
# --8<-- [end:propagate_empty]
1 change: 1 addition & 0 deletions tox.ini
@@ -14,6 +14,7 @@ deps =
changedir = {envtmpdir}
commands =
coverage run --source=beavers --branch -m pytest {toxinidir}/tests
find {toxinidir}/examples -name "*.py" | xargs -n 1 python
coverage report -m --fail-under 95
coverage xml -o {toxinidir}/coverage.xml
