From 3450d728872962dff7101189d20a4e81a48d8e2e Mon Sep 17 00:00:00 2001 From: aandres Date: Wed, 28 Jun 2023 22:01:32 +0100 Subject: [PATCH] Add advanced concept --- beavers/engine.py | 27 ++++-- docs/concepts/advanced.md | 33 +++++++ docs/concepts/dag.md | 106 ++++++++++++++++++++- examples/advanced_concepts.py | 79 ++++++++++++++++ examples/dag_concepts.py | 168 +++++++++++++++++++++++++++++++++- tests/test_engine.py | 16 +++- 6 files changed, 413 insertions(+), 16 deletions(-) create mode 100644 docs/concepts/advanced.md create mode 100644 examples/advanced_concepts.py diff --git a/beavers/engine.py b/beavers/engine.py index e81cf4b..2788438 100644 --- a/beavers/engine.py +++ b/beavers/engine.py @@ -299,8 +299,13 @@ def _process_updated_value(self, updated_value) -> bool: else: return False else: - self._runtime_data.value = updated_value - return len(updated_value) > 0 + if isinstance(updated_value, SilentUpdate): + # TODO: add test + self._runtime_data.value = updated_value.value + return False + else: + self._runtime_data.value = updated_value + return len(updated_value) > 0 def _notify_observers(self): for observer in self._observers: @@ -356,7 +361,9 @@ def const(self, value: T) -> Node[T]: ) ) - def source_stream(self, empty: T, name: typing.Optional[str] = None) -> Node[T]: + def source_stream( + self, empty: typing.Optional[T] = None, name: typing.Optional[str] = None + ) -> Node[T]: """ Add a source stream `Node`. 
@@ -369,7 +376,7 @@ def source_stream(self, empty: T, name: typing.Optional[str] = None) -> Node[T]: The name of the source """ - _check_empty(empty) + empty = _check_empty(empty) existing = self._sources.get(name) if name else None if existing is not None: if existing._empty != empty: @@ -386,7 +393,9 @@ def source_stream(self, empty: T, name: typing.Optional[str] = None) -> Node[T]: self._sources[name] = node return node - def stream(self, function: typing.Callable[P, T], empty: T) -> NodePrototype: + def stream( + self, function: typing.Callable[P, T], empty: typing.Optional[T] = None + ) -> NodePrototype: """ Add a stream `NodePrototype`. @@ -399,7 +408,7 @@ def stream(self, function: typing.Callable[P, T], empty: T) -> NodePrototype: Must implement `__len__` and be empty """ - _check_empty(empty) + empty = _check_empty(empty) _check_function(function) def add_to_dag(inputs: _NodeInputs) -> Node: @@ -549,7 +558,7 @@ def _add_stream( self, function: typing.Callable[[...], T], empty: T, inputs: _NodeInputs ) -> Node[T]: _check_function(function) - _check_empty(empty) + empty = _check_empty(empty) return self._add_node( Node._create(value=empty, function=function, inputs=inputs, empty=empty) ) @@ -584,7 +593,9 @@ def _add_node(self, node: Node) -> Node: def _check_empty(empty: T) -> T: - if not isinstance(empty, collections.abc.Sized): + if empty is None: + return [] + elif not isinstance(empty, collections.abc.Sized): raise TypeError("`empty` should implement `__len__`") elif len(empty) != 0: raise TypeError("`len(empty)` should be 0") diff --git a/docs/concepts/advanced.md b/docs/concepts/advanced.md new file mode 100644 index 0000000..30d76fe --- /dev/null +++ b/docs/concepts/advanced.md @@ -0,0 +1,33 @@ + +# Now node + +Beavers can be used in both live and replay mode. +In replay mode, the wall clock isn't relevant. 
+To access the current time of the replay, you should use the now node: + +```python +--8<-- "examples/advanced_concepts.py:now_node" +``` + +The now node is shared for the whole dag. +Its value gets updated silently. + +# TimerManager + +To be notified when a given time passes, nodes can subscribe to a `TimerManager` node. + +```python +--8<-- "examples/advanced_concepts.py:timer_manager" +``` + +# Silent update + +Some nodes may update too often, or their updates may not be relevant to other nodes. +In this case it's possible to silence them: + +```python +--8<-- "examples/advanced_concepts.py:silence" +``` + +- [ ] cutoff +- [ ] add link to section (eg: TimerManager?) diff --git a/docs/concepts/dag.md b/docs/concepts/dag.md index d27cf29..08ce4c5 100644 --- a/docs/concepts/dag.md +++ b/docs/concepts/dag.md @@ -1,7 +1,7 @@ -# Source Streams +# Stream Source -A source stream is a node whose value can be set externally. +A stream source is a node whose value can be set externally. When `Dag.execute` is called, the updated value is propagated in the dag @@ -9,8 +9,108 @@ When `Dag.execute` is called, the updated value is propagated in the dag --8<-- "examples/dag_concepts.py:source_stream" ``` -If the dag is executed again, the value of the source stream will be reset to it's empty value. +If the dag is executed again, the value of the source stream will be reset to its empty value. ```python --8<-- "examples/dag_concepts.py:source_stream_again" ``` + +The default empty value is set to `[]`, but it can be customized: + +```python +--8<-- "examples/dag_concepts.py:source_stream_empty" +``` + +A source stream can be given a name, so it can be retrieved (and its value set): + +```python +--8<-- "examples/dag_concepts.py:source_stream_name" +``` + +# Stream Node + +A stream node uses the output of other nodes to calculate its updated value. 
+ +```python +--8<-- "examples/dag_concepts.py:stream_node" +``` + +If the dag is executed again, the value of the stream node will be reset to its empty value. + +```python +--8<-- "examples/dag_concepts.py:stream_node_again" +``` + +Again, the default empty value is set to `[]`, but it can be customized: +```python +--8<-- "examples/dag_concepts.py:stream_node_empty" +``` + +The function provided to the node can be any callable, like a lambda: +```python +--8<-- "examples/dag_concepts.py:stream_node_lambda" +``` + +Or a callable object: +```python +--8<-- "examples/dag_concepts.py:stream_node_callable" +``` + +# State Node + +A state node retains its value from one dag execution to the next, even if it didn't update: +```python +--8<-- "examples/dag_concepts.py:state_node" +``` + +# Const Node + +A const node is a node whose value doesn't change. +```python +--8<-- "examples/dag_concepts.py:const_node" +``` + +# Connecting Nodes (aka `map`) + +Nodes are connected by calling the `map` function. +Stream nodes can be connected to state nodes, and vice versa. +The same applies to const nodes. + +> :warning: The `map` function doesn't execute the underlying node. 
+> Instead it adds a node to the dag. + +The map function can use positional arguments: + +```python +--8<-- "examples/dag_concepts.py:map_positional" +``` +Or keyword arguments: + +```python +--8<-- "examples/dag_concepts.py:map_key_word" +``` + +# Update Propagation + +- Nodes are notified if any of their input nodes was updated during the current execution cycle +```python +--8<-- "examples/dag_concepts.py:propagate_any" +``` +- You can check if a node updated by looking at its "cycle_id" +```python +--8<-- "examples/dag_concepts.py:propagate_cycle_id" +``` +- If several inputs of a node get updated during the same cycle, the node will be executed once (and not once per input) +```python +--8<-- "examples/dag_concepts.py:propagate_both" +``` +- Stream nodes (and sources) are not considered updated if their output is empty +```python +--8<-- "examples/dag_concepts.py:propagate_empty" +``` + +- [ ] add emoji plugin +- [ ] get navigation to work +- [ ] add stream vs state +- [ ] add replay section +- [ ] add pyarrow section diff --git a/examples/advanced_concepts.py b/examples/advanced_concepts.py new file mode 100644 index 0000000..21f04f6 --- /dev/null +++ b/examples/advanced_concepts.py @@ -0,0 +1,79 @@ +# --8<-- [start:now_node] +import pandas as pd + +from beavers import Dag + + +def get_delay(timestamps: list[pd.Timestamp], now: pd.Timestamp) -> list[pd.Timedelta]: + return [now - timestamp for timestamp in timestamps] + + +dag = Dag() +timestamp_stream = dag.source_stream() +delay = dag.stream(get_delay).map(timestamp_stream, dag.now()) + +timestamp_stream.set_stream( + [ + pd.to_datetime("2022-01-01", utc=True), + pd.to_datetime("2022-01-02", utc=True), + pd.to_datetime("2022-01-03", utc=True), + ] +) +dag.execute(timestamp=pd.to_datetime("2022-01-04", utc=True)) +assert delay.get_value() == [ + pd.to_timedelta("3d"), + pd.to_timedelta("2d"), + pd.to_timedelta("1d"), +] + +# --8<-- [end:now_node] + + +# --8<-- [start:timer_manager] +from beavers import 
TimerManager + + +def get_year(now: pd.Timestamp, timer_manager: TimerManager): + if not timer_manager.has_next_timer(): + timer_manager.set_next_timer( + pd.Timestamp(year=now.year + 1, day=1, month=1, tzinfo=now.tzinfo) + ) + + return now.year + + +year = dag.state(get_year).map(dag.now(), dag.timer_manager()) + +dag.execute(pd.to_datetime("2022-01-01", utc=True)) +assert year.get_value() == 2022 +assert year.get_cycle_id() == dag.get_cycle_id() + +dag.execute(pd.to_datetime("2022-01-02", utc=True)) +assert year.get_value() == 2022 +assert year.get_cycle_id() == dag.get_cycle_id() - 1 + +dag.execute(pd.to_datetime("2023-01-02", utc=True)) +assert year.get_value() == 2023 +assert year.get_cycle_id() == dag.get_cycle_id() +# --8<-- [end:timer_manager] + + +# --8<-- [start:silence] +source_1 = dag.source_stream() +source_1_silence = dag.silence(source_1) +source_2 = dag.source_stream() + +both = dag.stream(lambda x, y: x + y).map(source_1_silence, source_2) + +source_1.set_stream([1, 2, 3]) +source_2.set_stream([4, 5, 6]) +dag.execute() +assert both.get_value() == [1, 2, 3, 4, 5, 6] +assert both.get_cycle_id() == dag.get_cycle_id() + +source_1.set_stream([1, 2, 3]) +dag.execute() +assert both.get_value() == [] # No update because source_1 is silent +assert both.get_cycle_id() == dag.get_cycle_id() - 1 + +# --8<-- [end:silence] diff --git a/examples/dag_concepts.py b/examples/dag_concepts.py index ddd120f..8e7f99a 100644 --- a/examples/dag_concepts.py +++ b/examples/dag_concepts.py @@ -3,9 +3,9 @@ dag = Dag() -source_stream = dag.source_stream([], "my_source") -source_stream.set_stream([1, 2, 3]) +source_stream = dag.source_stream() +source_stream.set_stream([1, 2, 3]) dag.execute() assert source_stream.get_value() == [1, 2, 3] # --8<-- [end:source_stream] @@ -16,6 +16,166 @@ assert source_stream.get_value() == [] # --8<-- [end:source_stream_again] +# --8<-- [start:source_stream_name] +my_source_stream = dag.source_stream(name="my_source") 
+dag.get_sources()["my_source"].set_stream([4, 5, 6]) +dag.execute() +assert my_source_stream.get_value() == [4, 5, 6] +# --8<-- [end:source_stream_name] + +# --8<-- [start:source_stream_empty] +dict_source_stream = dag.source_stream(empty={}) +dict_source_stream.set_stream({"hello": "world"}) +dag.execute() +assert dict_source_stream.get_value() == {"hello": "world"} +dag.execute() +assert dict_source_stream.get_value() == {} +# --8<-- [end:source_stream_empty] + + +# --8<-- [start:stream_node] +def multiply_by_2(values: list[int]) -> list[int]: + return [v * 2 for v in values] + + +stream_node = dag.stream(multiply_by_2).map(source_stream) + +source_stream.set_stream([1, 2, 3]) +dag.execute() +assert stream_node.get_value() == [2, 4, 6] +# --8<-- [end:stream_node] + + +# --8<-- [start:stream_node_again] +dag.execute() +assert stream_node.get_value() == [] +# --8<-- [end:stream_node_again] + + +# --8<-- [start:stream_node_empty] +set_stream_node = dag.stream(set, empty=set()).map(source_stream) +source_stream.set_stream([1, 2, 3]) +dag.execute() +assert set_stream_node.get_value() == {1, 2, 3} +dag.execute() +assert set_stream_node.get_value() == set() +# --8<-- [end:stream_node_empty] + + +# --8<-- [start:stream_node_lambda] +lambda_stream_node = dag.stream(lambda x: x[:-1]).map(source_stream) +source_stream.set_stream([1, 2, 3]) +dag.execute() +assert lambda_stream_node.get_value() == [1, 2] +# --8<-- [end:stream_node_lambda] + + +# --8<-- [start:stream_node_callable] +class MultiplyBy: + def __init__(self, by: int): + self.by = by + + def __call__(self, values: list[int]) -> list[int]: + return [v * self.by for v in values] + + +callable_stream_node = dag.stream(MultiplyBy(3)).map(source_stream) +source_stream.set_stream([1, 2, 3]) +dag.execute() +assert callable_stream_node.get_value() == [3, 6, 9] +# --8<-- [end:stream_node_callable] + + +# --8<-- [start:state_node] +class Accumulator: + def __init__(self): + self._count = 0 -stream_node = dag.stream(lambda 
x: x, []).map(source_stream) -state_node = dag.state(lambda x: x).map(source_stream) + def __call__(self, values: list[int]) -> int: + self._count += sum(values) + return self._count + + +state_node = dag.state(Accumulator()).map(source_stream) +source_stream.set_stream([1, 2, 3]) +dag.execute() +assert state_node.get_value() == 6 +dag.execute() +assert state_node.get_value() == 6 +# --8<-- [end:state_node] + + +# --8<-- [start:const_node] +const_node = dag.const(2) +assert const_node.get_value() == 2 +# --8<-- [end:const_node] + + +# --8<-- [start:map_positional] +to_append = dag.const([3]) +positional_stream = dag.stream(lambda x, y: x + y).map(source_stream, to_append) +source_stream.set_stream([1, 2]) +dag.execute() +assert positional_stream.get_value() == [1, 2, 3] +# --8<-- [end:map_positional] + + +# --8<-- [start:map_key_word] +key_word = dag.stream(lambda x, y: x + y).map(x=source_stream, y=to_append) +# --8<-- [end:map_key_word] + +# --8<-- [start:propagate_any] +source_1 = dag.source_stream() +source_2 = dag.source_stream() +node = dag.stream(lambda x, y: x + y).map(source_1, source_2) + +source_1.set_stream([1, 2, 3]) +dag.execute() +assert node.get_value() == [1, 2, 3] + +source_2.set_stream([4, 5, 6]) +dag.execute() +assert node.get_value() == [4, 5, 6] + +dag.execute() +assert node.get_value() == [] +# --8<-- [end:propagate_any] + +# --8<-- [start:propagate_cycle_id] +source_1.set_stream([1, 2, 3]) +dag.execute() +assert node.get_value() == [1, 2, 3] +assert node.get_cycle_id() == dag.get_cycle_id() + +dag.execute() +assert node.get_value() == [] +assert node.get_cycle_id() == dag.get_cycle_id() - 1 +# --8<-- [end:propagate_cycle_id] + + +# --8<-- [start:propagate_both] +source_1.set_stream([1, 2, 3]) +source_2.set_stream([4, 5, 6]) +dag.execute() +assert node.get_value() == [1, 2, 3, 4, 5, 6] +assert node.get_cycle_id() == dag.get_cycle_id() +# --8<-- [end:propagate_both] + + +# --8<-- [start:propagate_empty] +def even_only(values: list[int]) 
-> list[int]: + return [v for v in values if (v % 2) == 0] + + +even = dag.stream(even_only).map(source_1) + +source_1.set_stream([1, 2, 3]) +dag.execute() +assert even.get_value() == [2] +assert even.get_cycle_id() == dag.get_cycle_id() + +source_1.set_stream([1, 3]) +dag.execute() +assert even.get_value() == [] +assert even.get_cycle_id() == dag.get_cycle_id() - 1 +# --8<-- [end:propagate_empty] diff --git a/tests/test_engine.py b/tests/test_engine.py index f0669f9..b08874e 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -122,6 +122,17 @@ def test_scalar(): z.set_stream(34) +def test_stream_no_empty(): + dag = Dag() + source1 = dag.source_stream() + assert source1.get_value() == [] + assert source1._empty == [] + + stream = dag.stream(lambda x: x).map(source1) + assert stream.get_value() == [] + assert stream._empty == [] + + def test_stream_to_state(): dag = Dag() @@ -482,7 +493,10 @@ def test_node_with_same_input_mixed(): def test_wrong_usage(): dag = Dag() with pytest.raises(TypeError, match="`empty` should implement `__len__`"): - dag.stream(lambda x: x, None) + dag.stream(lambda x: x, 123) + + with pytest.raises(TypeError, match="`empty` should implement `__len__`"): + dag.stream(lambda x: x, 0) def test_add_existing_node():