Skip to content

Commit

Permalink
Add dag metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
aandres committed Aug 22, 2023
1 parent d0df4e6 commit c46a4ee
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 3 deletions.
27 changes: 25 additions & 2 deletions beavers/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,15 @@ def get_sink_value(self) -> typing.Any:
def _stain(self):
self._runtime_data.notifications += 1

def _clean(self, cycle_id: int):
def _clean(self, cycle_id: int) -> bool:
if self._should_recalculate():
self._recalculate(cycle_id)
return True
else:
if self._is_stream():
self._runtime_data.value = self._empty
self._runtime_data.notifications = 0
return False

def _is_stream(self) -> bool:
return self._empty is not _STATE_EMPTY
Expand Down Expand Up @@ -326,6 +328,16 @@ def _unchanged_callback():
return _STATE_UNCHANGED


@dataclasses.dataclass
class DagMetrics:
"""Metrics for the execution of a dag."""

notification_count: int = 0
updated_node_count: int = 0
cycle_count: int = 0
node_count: int = 0


class Dag:
"""Main class used for building and executing a dag."""

Expand All @@ -340,6 +352,7 @@ def __init__(self):
self._silent_now_node: Node[pd.Timestamp] = self.silence(self._now_node)
self._timer_manager_nodes: list[Node[TimerManager]] = []
self._cycle_id: int = 0
self._metrics = DagMetrics(node_count=len(self._nodes))

def const(self, value: T) -> Node[T]:
"""
Expand Down Expand Up @@ -551,7 +564,17 @@ def execute(self, timestamp: typing.Optional[pd.Timestamp] = None):
self._flush_timers(timestamp)

for node in self._nodes:
node._clean(self._cycle_id)
self._metrics.notification_count += node._runtime_data.notifications
cleaned = node._clean(self._cycle_id)
if cleaned:
self._metrics.updated_node_count += 1
self._metrics.cycle_count += 1
self._metrics.node_count = len(self._nodes)

def flush_metrics(self) -> DagMetrics:
results = self._metrics
self._metrics = DagMetrics(node_count=len(self._nodes))
return results

def _add_stream(
self, function: typing.Callable[[...], T], empty: T, inputs: _NodeInputs
Expand Down
2 changes: 1 addition & 1 deletion beavers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def append(self, message: confluent_kafka.Message):
def flush(self) -> bool:
results: T = self._deserializer(self._messages)
self._messages = []
if results:
if len(results):
self._node.set_stream(results)
return True
else:
Expand Down
28 changes: 28 additions & 0 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
UTC_EPOCH,
UTC_MAX,
Dag,
DagMetrics,
TimerManager,
_NodeInputs,
_unchanged_callback,
Expand Down Expand Up @@ -593,3 +594,30 @@ def test_can_not_add_node_back():

def test_unchanged_callback():
assert _unchanged_callback() is _STATE_UNCHANGED


def test_metrics():
dag = Dag()
x_source = dag.source_stream([], "x")
x = dag.state(GetLatest(40)).map(x_source)
y_source = dag.source_stream([], "y")
y = dag.state(GetLatest(41)).map(y_source)
z = dag.state(add).map(x, y)

dag.state(lambda left, right: left - right).map(y, z)

dag.execute()
assert dag.flush_metrics() == DagMetrics(13, 8, 1, 8)

dag.execute()
assert dag.flush_metrics() == DagMetrics(0, 0, 1, 8)
assert dag.flush_metrics() == DagMetrics(0, 0, 0, 8)

x_source.set_stream([1, 2, 3])
y_source.set_stream([1, 2, 3])
dag.execute()
assert dag.flush_metrics() == DagMetrics(8, 6, 1, 8)

x_source.set_stream([1, 2, 3])
dag.execute()
assert dag.flush_metrics() == DagMetrics(4, 4, 1, 8)

0 comments on commit c46a4ee

Please sign in to comment.