Skip to content

Commit

Permalink
Merge pull request #30 from tradewelltech/make-cycle-time-a-member
Browse files Browse the repository at this point in the history
Make cycle time a member
  • Loading branch information
0x26res authored Aug 24, 2023
2 parents 655738e + 5cc25cb commit 43bcbee
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions beavers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ def __init__(
self._consumer_manager = consumer_manager
self._sink_topics = runtime_sink_topics
self._producer_manager = producer_manager
self._cycle_time = UTC_EPOCH
self._metrics = ExecutionMetrics()

@staticmethod
Expand Down Expand Up @@ -542,18 +543,18 @@ def _run_cycle(self, messages: list[confluent_kafka.Message]) -> bool:
with self._metrics.measure_deserialization_time():
for handler in self._source_topics.values():
has_messages = handler.flush() or has_messages
cycle_time = (
self._cycle_time = (
self._consumer_manager._get_priming_watermark() or pd.Timestamp.utcnow()
)

if has_messages or self._dag.get_next_timer() <= cycle_time:
if has_messages or self._dag.get_next_timer() <= self._cycle_time:
with self._metrics.measure_execution_time():
self._dag.execute(cycle_time)
self._dag.execute(self._cycle_time)
logger.debug(
"Ran cycle cycle_id=%d, messages=%d, time=%s, next_timer=%s",
self._dag.get_cycle_id(),
len(messages),
cycle_time,
self._cycle_time,
self._dag.get_next_timer(),
)
return True
Expand Down

0 comments on commit 43bcbee

Please sign in to comment.