Skip to content

Commit

Permalink
Add advanced concept
Browse files Browse the repository at this point in the history
  • Loading branch information
aandres committed Jun 28, 2023
1 parent 9c0b9eb commit 3450d72
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 16 deletions.
27 changes: 19 additions & 8 deletions beavers/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,13 @@ def _process_updated_value(self, updated_value) -> bool:
else:
return False
else:
self._runtime_data.value = updated_value
return len(updated_value) > 0
if isinstance(updated_value, SilentUpdate):
# TODO: add test
self._runtime_data.value = updated_value.value
return False
else:
self._runtime_data.value = updated_value
return len(updated_value) > 0

def _notify_observers(self):
for observer in self._observers:
Expand Down Expand Up @@ -356,7 +361,9 @@ def const(self, value: T) -> Node[T]:
)
)

def source_stream(self, empty: T, name: typing.Optional[str] = None) -> Node[T]:
def source_stream(
self, empty: typing.Optional[T] = None, name: typing.Optional[str] = None
) -> Node[T]:
"""
Add a source stream `Node`.
Expand All @@ -369,7 +376,7 @@ def source_stream(self, empty: T, name: typing.Optional[str] = None) -> Node[T]:
The name of the source
"""
_check_empty(empty)
empty = _check_empty(empty)
existing = self._sources.get(name) if name else None
if existing is not None:
if existing._empty != empty:
Expand All @@ -386,7 +393,9 @@ def source_stream(self, empty: T, name: typing.Optional[str] = None) -> Node[T]:
self._sources[name] = node
return node

def stream(self, function: typing.Callable[P, T], empty: T) -> NodePrototype:
def stream(
self, function: typing.Callable[P, T], empty: typing.Optional[T] = None
) -> NodePrototype:
"""
Add a stream `NodePrototype`.
Expand All @@ -399,7 +408,7 @@ def stream(self, function: typing.Callable[P, T], empty: T) -> NodePrototype:
Must implement `__len__` and be empty
"""
_check_empty(empty)
empty = _check_empty(empty)
_check_function(function)

def add_to_dag(inputs: _NodeInputs) -> Node:
Expand Down Expand Up @@ -549,7 +558,7 @@ def _add_stream(
self, function: typing.Callable[[...], T], empty: T, inputs: _NodeInputs
) -> Node[T]:
_check_function(function)
_check_empty(empty)
empty = _check_empty(empty)
return self._add_node(
Node._create(value=empty, function=function, inputs=inputs, empty=empty)
)
Expand Down Expand Up @@ -584,7 +593,9 @@ def _add_node(self, node: Node) -> Node:


def _check_empty(empty: T) -> T:
if not isinstance(empty, collections.abc.Sized):
if empty is None:
return []
elif not isinstance(empty, collections.abc.Sized):
raise TypeError("`empty` should implement `__len__`")
elif len(empty) != 0:
raise TypeError("`len(empty)` should be 0")
Expand Down
33 changes: 33 additions & 0 deletions docs/concepts/advanced.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

# Now node

Beavers can be used in both live and replay mode.
In replay mode, the wall clock isn't relevant.
To access the current time of the replay, you should use the now node:

```python
--8<-- "examples/advanced_concepts.py:now_node"
```

The now node is shared for the whole dag.
Its value gets updated silently.

# TimerManager

To be notified once a given time has passed, nodes can subscribe to a `TimerManager` node.

```python
--8<-- "examples/advanced_concepts.py:timer_manager"
```

# Silent update

Some nodes may update too often, or their updates may not be relevant to other nodes.
In this case it's possible to silence them:

```python
--8<-- "examples/advanced_concepts.py:silence"
```

- [ ] cutoff
- [ ] add link to section (e.g. TimerManager?)
106 changes: 103 additions & 3 deletions docs/concepts/dag.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,116 @@

# Source Streams
# Stream Source

A source stream is a node whose value can be set externally.
A stream source is a node whose value can be set externally.

When `Dag.execute` is called, the updated value is propagated in the dag

```python
--8<-- "examples/dag_concepts.py:source_stream"
```

If the dag is executed again, the value of the source stream will be reset to it's empty value.
If the dag is executed again, the value of the source stream will be reset to its empty value.

```python
--8<-- "examples/dag_concepts.py:source_stream_again"
```

The default empty value is set to `[]`, but it can be customized:

```python
--8<-- "examples/dag_concepts.py:source_stream_empty"
```

Source streams can be given a name, so they can be retrieved (and their value set):

```python
--8<-- "examples/dag_concepts.py:source_stream_name"
```

# Stream Node

A stream node uses the output of other nodes to calculate its updated value.

```python
--8<-- "examples/dag_concepts.py:stream_node"
```

If the dag is executed again, the value of the stream node will be reset to its empty value.

```python
--8<-- "examples/dag_concepts.py:stream_node_again"
```

Again, the default empty value is set to `[]`, but it can be customized:
```python
--8<-- "examples/dag_concepts.py:stream_node_empty"
```

The function provided to the node can be any callable, like a lambda:
```python
--8<-- "examples/dag_concepts.py:stream_node_lambda"
```

Or a callable:
```python
--8<-- "examples/dag_concepts.py:stream_node_callable"
```

# State Node

A state node retains its value from one dag execution to the next, even if it didn't update:
```python
--8<-- "examples/dag_concepts.py:state_node"
```

# Const Node

A const node is a node whose value doesn't change.
```python
--8<-- "examples/dag_concepts.py:const_node"
```

# Connecting Nodes (aka `map`)

Nodes are connected by calling the `map` function.
Stream nodes can be connected to state nodes, and vice versa.
Same thing applies to const nodes.

> :warning: The `map` function doesn't execute the underlying node.
> Instead, it adds a node to the dag.

The map function can use positional arguments:

```python
--8<-- "examples/dag_concepts.py:map_positional"
```
Or keyword arguments:

```python
--8<-- "examples/dag_concepts.py:map_key_word"
```

# Update Propagation

- Nodes are notified if any of their input nodes was updated during the current execution cycle
```python
--8<-- "examples/dag_concepts.py:propagate_any"
```
- You can check if a node updated by looking at its `cycle_id`
```python
--8<-- "examples/dag_concepts.py:propagate_cycle_id"
```
- If several inputs of a node get updated during the same cycle, the node will be executed once (and not once per input)
```python
--8<-- "examples/dag_concepts.py:propagate_both"
```
- Stream nodes (and sources) are not considered updated if their output is empty
```python
--8<-- "examples/dag_concepts.py:propagate_empty"
```

- [ ] add emoji plugin
- [ ] get navigation to work
- [ ] add stream vs state
- [ ] add replay section
- [ ] add pyarrow section
79 changes: 79 additions & 0 deletions examples/advanced_concepts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# --8<-- [start:now_node]
import pandas as pd

from beavers import Dag


def get_delay(timestamps: list[pd.Timestamp], now: pd.Timestamp) -> list[pd.Timedelta]:
    """Return how far each timestamp lies behind `now`.

    The result has one entry per input timestamp, in the same order,
    each computed as `now - timestamp`.
    """
    return [now - past for past in timestamps]


# Build a dag with a single source stream that will carry timestamps.
dag = Dag()
timestamp_stream = dag.source_stream()
# Feed get_delay with both the timestamp stream and the dag's now node.
delay = dag.stream(get_delay).map(timestamp_stream, dag.now())

timestamp_stream.set_stream(
[
pd.to_datetime("2022-01-01", utc=True),
pd.to_datetime("2022-01-02", utc=True),
pd.to_datetime("2022-01-03", utc=True),
]
)
# Execute as of 2022-01-04: every delay is measured against that timestamp.
dag.execute(timestamp=pd.to_datetime("2022-01-04", utc=True))
assert delay.get_value() == [
pd.to_timedelta("3d"),
pd.to_timedelta("2d"),
pd.to_timedelta("1d"),
]

# --8<-- [end:now_node]


# --8<-- [start:timer_manager]
from beavers import TimerManager


def get_year(now: pd.Timestamp, timer_manager: TimerManager):
    """Return the year of `now`, making sure a timer is set for next January 1st.

    When `timer_manager` reports no next timer, one is set for the first day
    of the year after `now`, using the same timezone as `now`.
    """
    if timer_manager.has_next_timer():
        # A timer is already set: nothing to schedule.
        return now.year

    next_new_year = pd.Timestamp(
        year=now.year + 1, day=1, month=1, tzinfo=now.tzinfo
    )
    timer_manager.set_next_timer(next_new_year)
    return now.year


# State node driven by the now node and the dag's timer manager.
year = dag.state(get_year).map(dag.now(), dag.timer_manager())

# First execution: the node updates in this cycle and reports 2022.
dag.execute(pd.to_datetime("2022-01-01", utc=True))
assert year.get_value() == 2022
assert year.get_cycle_id() == dag.get_cycle_id()

# One day later: the value is retained and the node's cycle id is one
# behind the dag's, i.e. it did not update in this cycle.
dag.execute(pd.to_datetime("2022-01-02", utc=True))
assert year.get_value() == 2022
assert year.get_cycle_id() == dag.get_cycle_id() - 1

# Executing after the 2023-01-01 timer set by get_year: the node updates again.
dag.execute(pd.to_datetime("2023-01-02", utc=True))
assert year.get_value() == 2023
assert year.get_cycle_id() == dag.get_cycle_id()
# --8<-- [end:timer_manager]


# --8<-- [start:silence]
# Two sources; source_1 is only consumed through its silenced wrapper.
source_1 = dag.source_stream()
source_1_silence = dag.silence(source_1)
source_2 = dag.source_stream()

# Concatenate the silenced source_1 with source_2.
both = dag.stream(lambda x, y: x + y).map(source_1_silence, source_2)

# Both sources set: `both` updates in this cycle and sees both values.
source_1.set_stream([1, 2, 3])
source_2.set_stream([4, 5, 6])
dag.execute()
assert both.get_value() == [1, 2, 3, 4, 5, 6]
assert both.get_cycle_id() == dag.get_cycle_id()

# Only the silenced source set: `both` is empty and its cycle id lags the dag's.
source_1.set_stream([1, 2, 3])
dag.execute()
assert both.get_value() == [] # No update because source_1 is silent
assert both.get_cycle_id() == dag.get_cycle_id() - 1

# --8<-- [end:silence]
Loading

0 comments on commit 3450d72

Please sign in to comment.