diff --git a/README.md b/README.md
index ffab500..2ac969d 100644
--- a/README.md
+++ b/README.md
@@ -11,11 +11,9 @@
 [![Code style: black][codestyle-image]][codestyle-url]
 
 # Beavers
-_____
 
 **Beavers** is a python library for stream processing, optimized for analytics.
-
 It is used at [Tradewell Technologies](https://www.tradewelltech.co/),
 to calculate analytics and serve model predictions,
 in both realtime and batch jobs.
 
@@ -27,8 +25,8 @@ in both realtime and batch jobs.
 * Optimized for analytics, it uses micro-batching (instead of processing records one by one)
 * Similar to [incremental](https://github.com/janestreet/incremental), it updates nodes in a dag incrementally
 * Taking inspiration from [kafka streams](https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/), there are two types of nodes in the dag:
-  * Stream: ephemeral micro-batches of events (cleared after every cycle)
-  * State: durable state derived from streams
+  * **Stream:** ephemeral micro-batches of events (cleared after every cycle)
+  * **State:** durable state derived from streams
 * Clear separation between the business logic and the IO.
   So the same dag can be used in real time mode, replay mode or can be easily tested.
 * functional interface (no inheritance required)
diff --git a/beavers/kafka.py b/beavers/kafka.py
index 37e7dd1..4dc7f10 100644
--- a/beavers/kafka.py
+++ b/beavers/kafka.py
@@ -179,18 +179,19 @@ class ProducerMetrics:
 
 
 class _ProducerManager:
-    def __init__(
-        self,
-        producer_config: dict[str, Any],
-    ):
-        self._producer = confluent_kafka.Producer(producer_config)
-        self._errors = 0
+    def __init__(self, producer: confluent_kafka.Producer):
+        self._producer: confluent_kafka.Producer = producer
+        self._errors: int = 0
         self._metrics: ProducerMetrics = ProducerMetrics()
 
+    @staticmethod
+    def create(producer_config: dict[str, Any]) -> "_ProducerManager":
+        return _ProducerManager(confluent_kafka.Producer(producer_config))
+
     def poll(self):
         self._producer.poll(0.0)
 
-    def produce_one(self, topic: str, key: AnyStr, value: bytes) -> bool:
+    def produce_one(self, topic: str, key: AnyStr, value: AnyStr) -> bool:
         try:
             self._producer.produce(
                 topic=topic, key=key, value=value, on_delivery=self.on_delivery
@@ -201,8 +202,9 @@ def produce_one(self, topic: str, key: AnyStr, value: bytes) -> bool:
         except Exception as err:
             if self._errors == 0:
                 logger.error("Error producing message on %s", topic, exc_info=err)
-            self._errors += 1
-            return False
+            self._metrics.produced_error_count += 1
+            self._errors += 1
+            return False
 
     def on_delivery(self, err, msg: confluent_kafka.Message):
         if err:
@@ -483,7 +485,7 @@ def create(
             _RuntimeSinkTopic(dag_sinks[key], value)
             for key, value in sink_topics.items()
         ]
-        producer_manager = _ProducerManager(producer_config)
+        producer_manager = _ProducerManager.create(producer_config)
         return KafkaDriver(
             dag=dag,
             runtime_source_topics=runtime_source_topics,
@@ -492,10 +494,6 @@
             producer_manager=producer_manager,
         )
 
-    def run(self):
-        while True:
-            self.run_cycle()
-
     def flush_metrics(self) -> ExecutionMetrics:
         results = self._metrics
         self._metrics = ExecutionMetrics()
diff --git a/mkdocs.yml b/mkdocs.yml
index 2db20a7..465754f 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -18,6 +18,7 @@ theme:
     - search.suggest
   palette:
     scheme: slate
+    accent: green
   logo: static/icons/beavers/logo.svg
   favicon: static/icons/beavers/icon.png
 
diff --git a/tests/test_kafka.py b/tests/test_kafka.py
index 8861319..f0f6942 100644
--- a/tests/test_kafka.py
+++ b/tests/test_kafka.py
@@ -5,7 +5,7 @@
 import io
 import logging
 import queue
-from typing import Optional, Tuple, Union
+from typing import AnyStr, Callable, Optional, Tuple, Union
 
 import confluent_kafka
 import mock
@@ -17,15 +17,19 @@ from beavers.engine import UTC_EPOCH, UTC_MAX, Dag
 
 from beavers.kafka import (
     KAFKA_EOF_CODE,
+    ConsumerMetrics,
     KafkaDriver,
     KafkaProducerMessage,
     OffsetPolicy,
+    ProducerMetrics,
     SourceTopic,
     _ConsumerManager,
     _get_message_ns,
     _get_previous_start_of_day,
     _poll_all,
+    _ProducerManager,
     _resolve_topic_offsets,
+    _resolve_topics_offsets,
     _RuntimeSinkTopic,
     _RuntimeSourceTopic,
 )
@@ -110,6 +114,28 @@ def mock_offset_for_time(
     ] = TopicPartition(topic, partition, offset)
 
 
+class MockProducer:
+    def __init__(self):
+        self.polls = []
+        self.produced = []
+
+    def produce(self, topic: str, key: AnyStr, value: AnyStr, on_delivery: Callable):
+        self.produced.append((topic, key, value, on_delivery))
+
+    def poll(self, time: float):
+        self.polls.append(time)
+
+    def ack_all(self):
+        for topic, key, value, call_back in self.produced:
+            call_back(None, mock_kafka_message(topic=topic, value=value, key=key))
+        self.produced.clear()
+
+    def fail_all(self):
+        for topic, key, value, call_back in self.produced:
+            call_back("BAD!", mock_kafka_message(topic=topic, value=value, key=key))
+        self.produced.clear()
+
+
 def test_get_previous_start_of_day_utc():
     assert pd.to_datetime(
         "2022-09-18 08:00:00", utc=True
@@ -267,6 +293,65 @@ def test_consumer_manager_priming():
     assert consumer_manager._get_priming_watermark() is None
 
 
+def test_consumer_manager_update_partition_info():
+    topic_partition = TopicPartition("topic-a", 0, offset=100)
+    partitions = [topic_partition]
+    cutoff = pd.to_datetime("2022-10-19 01:00:00", utc=True)
+
+    mock_consumer = MockConsumer()
+    consumer_manager = _ConsumerManager(
+        cutoff=cutoff,
+        partitions=partitions,
+        consumer=mock_consumer,
+        batch_size=100,
+        max_held_messages=200,
+    )
+    assert consumer_manager._low_water_mark_ns == 0
+
+    consumer_manager._update_partition_info(
+        [
+            mock_kafka_message(
+                topic=topic_partition.topic,
+                partition=topic_partition.partition,
+                value=b"HELLO",
+                timestamp=cutoff,
+            )
+        ]
+    )
+
+    assert consumer_manager._low_water_mark_ns == cutoff.value
+
+    with mock.patch(
+        "pandas.Timestamp.utcnow",
+        return_value=cutoff + pd.to_timedelta("20s"),
+    ):
+        consumer_manager._update_partition_info(
+            [
+                mock_kafka_message(
+                    topic=topic_partition.topic,
+                    partition=topic_partition.partition,
+                    value=b"EOF",
+                    error=KafkaError(KAFKA_EOF_CODE),
+                )
+            ]
+        )
+    assert (
+        consumer_manager._low_water_mark_ns
+        == (cutoff + pd.to_timedelta("20s")).value
+    )
+
+    consumer_manager._update_partition_info(
+        [
+            mock_kafka_message(
+                topic=topic_partition.topic,
+                partition=topic_partition.partition,
+                value=b"123",
+                error=KafkaError(123),
+            )
+        ]
+    )
+
+
 def test_consumer_manager_batching():
     tp1 = TopicPartition("topic-a", 0, offset=0)
     tp2 = TopicPartition("topic-a", 1, offset=0)
@@ -486,6 +571,12 @@ def test_kafka_driver_word_count(log_helper: LogHelper):
     assert mock_producer_manager.flush() == []
     assert len(log_helper.flush()) == 1
 
+    metrics = kafka_driver.flush_metrics()
+    assert metrics.serialization_ns > 0
+    assert metrics.serialization_count == 6
+    assert metrics.execution_ns > 0
+    assert metrics.execution_count == 3
+
 
 def _timestamp_to_bytes(timestamp: pd.Timestamp) -> bytes:
     return str(timestamp).encode("utf-8")
@@ -782,6 +873,11 @@ def test_all_partitions_eof():
     assert [m.value() for m in consumer_manager.poll(0.0)] == [b"EOF", b"EOF"]
     assert mock_consumer._paused == []  # Not pausing EOF partitions
     assert consumer_manager._get_priming_watermark() is None
+    assert consumer_manager.flush_metrics() == ConsumerMetrics(
+        consumed_message_size=6,
+        consumed_message_count=2,
+        released_message_count=2,
+    )
 
 
 def test_get_message_ns():
@@ -942,6 +1038,36 @@ def test_from_start_of_day():
     )
 
 
+def test_resolve_topics_offsets():
+    consumer = MockConsumer()
+
+    consumer._topics["topic-1"] = topic_metadata("topic-1", [partition_metadata(0)])
+
+    start_of_day = pd.to_datetime("2021-12-31T00:15:00Z")
+    consumer.mock_offset_for_time("topic-1", 0, start_of_day, 123)
+
+    results = _resolve_topics_offsets(
+        consumer,
+        [
+            SourceTopic.from_start_of_day(
+                "topic-1",
+                PassThroughKafkaMessageDeserializer(),
+                pd.Timedelta("00:15:00"),
+                "UTC",
+            )
+        ],
+        pd.to_datetime("2022-01-01", utc=True),
+    )
+    assert len(results) == 1
+    assert results == [
+        TopicPartition(
+            topic="topic-1",
+            partition=0,
+            offset=123,
+        )
+    ]
+
+
 def test_from_relative_time():
     consumer = MockConsumer()
     now = pd.to_datetime("2022-01-01", utc=True)
@@ -1097,3 +1223,104 @@ def test_no_policy():
 
     with pytest.raises(ValueError, match="OffsetPolicy BAD! not supported for topic-1"):
         _resolve_topic_offsets(consumer, source_topic, now)
+
+
+def test_producer_manager_ok():
+    mock_producer = MockProducer()
+    producer_manager = _ProducerManager(mock_producer)
+    producer_manager.poll()
+    assert mock_producer.polls == [0.0]
+
+    producer_manager.produce_one("topic-1", "key-1", "value-1")
+    assert mock_producer.produced == [
+        ("topic-1", "key-1", "value-1", producer_manager.on_delivery)
+    ]
+
+    assert producer_manager.flush_metrics() == ProducerMetrics(
+        produced_count=1,
+        produced_size=7,
+    )
+
+    mock_producer.ack_all()
+    assert producer_manager.flush_metrics() == ProducerMetrics(confirmed_count=1)
+
+
+def test_producer_manager_produce_error():
+    mock_producer = MockProducer()
+    producer_manager = _ProducerManager(mock_producer)
+    mock_producer.produced = None
+
+    producer_manager.produce_one("topic-1", "key-1", "value-1")
+    assert producer_manager.flush_metrics() == ProducerMetrics(produced_error_count=1)
+    assert producer_manager._errors == 1
+
+    producer_manager.produce_one("topic-1", "key-1", "value-1")
+    assert producer_manager.flush_metrics() == ProducerMetrics(produced_error_count=1)
+    assert producer_manager._errors == 2
+
+
+def test_producer_manager_not_delivered():
+    mock_producer = MockProducer()
+    producer_manager = _ProducerManager(mock_producer)
+    producer_manager.produce_one("topic-1", "key-1", "value-1")
+    producer_manager.flush_metrics()
+    mock_producer.fail_all()
+    assert producer_manager.flush_metrics() == ProducerMetrics(delivery_error_count=1)
+    assert producer_manager._errors == 1
+
+    producer_manager.produce_one("topic-1", "key-1", "value-1")
+    producer_manager.flush_metrics()
+    mock_producer.fail_all()
+    assert producer_manager.flush_metrics() == ProducerMetrics(delivery_error_count=1)
+    assert producer_manager._errors == 2
+
+
+def test_producer_manager_create():
+    with mock.patch("confluent_kafka.Producer", autospec=True):
+        producer_manager = _ProducerManager.create({})
+    assert producer_manager._producer is not None
+
+
+def test_consumer_manager_create():
+    with mock.patch("confluent_kafka.Consumer", autospec=True):
+        consumer_manager = _ConsumerManager.create(
+            {"enable.partition.eof": True}, [], 500, timeout=None
+        )
+    assert consumer_manager._consumer is not None
+
+
+def test_consumer_manager_create_bad():
+    with mock.patch("confluent_kafka.Consumer", autospec=True):
+        with pytest.raises(
+            ValueError, match=r"'enable.partition.eof' should be set to true"
+        ):
+            _ConsumerManager.create({}, [], 500, timeout=None)
+
+
+def test_kafka_driver_create():
+    with mock.patch("confluent_kafka.Consumer", autospec=True):
+        KafkaDriver.create(Dag(), {}, {"enable.partition.eof": True}, {}, {})
+
+
+def test_runtime_sink_topic():
+    dag = Dag()
+    node = dag.source_stream(empty={})
+    sink = dag.sink("sink", node)
+    runtime_sink_topic = _RuntimeSinkTopic([sink], WorldCountSerializer("topic-1"))
+
+    producer_manager = MockProducerManager()
+    dag.execute()
+    runtime_sink_topic.flush(dag.get_cycle_id(), producer_manager)
+    assert producer_manager.messages == []
+
+    node.set_stream({"foo": "bar"})
+    dag.execute()
+    runtime_sink_topic.flush(dag.get_cycle_id(), producer_manager)
+    assert producer_manager.messages == [
+        KafkaProducerMessage(topic="topic-1", key=b"foo", value=b"bar")
+    ]
+    producer_manager.messages.clear()
+
+    dag.execute()
+    runtime_sink_topic.flush(dag.get_cycle_id(), producer_manager)
+    assert producer_manager.messages == []
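
The `_ProducerManager` change above is a constructor-injection refactor: `__init__` now takes a ready-made `confluent_kafka.Producer`, while the new `create` factory keeps the config-driven path used by `KafkaDriver.create`. A minimal sketch of that seam, assuming only the names shown in the patch (`StubProducer` is a hypothetical stand-in for the `MockProducer` test double added in this diff):

```python
from beavers.kafka import _ProducerManager


class StubProducer:
    """Hypothetical in-memory double; mirrors the produce()/poll() calls used above."""

    def __init__(self):
        self.produced = []

    def produce(self, topic, key, value, on_delivery):
        # Record the call instead of talking to a broker.
        self.produced.append((topic, key, value, on_delivery))

    def poll(self, timeout):
        pass


# Test wiring: inject the double, no broker required.
manager = _ProducerManager(StubProducer())
manager.produce_one("topic-1", b"key-1", b"value-1")
assert manager._producer.produced[0][0] == "topic-1"

# Production wiring: `create` builds the real confluent_kafka.Producer from config.
# manager = _ProducerManager.create({"bootstrap.servers": "localhost:9092"})
```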
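The README's Stream/State wording change also maps onto engine behaviour these tests exercise. A minimal sketch of the "cleared after every cycle" semantics, using only calls that appear in the diff (`source_stream`, `set_stream`, `execute`); `get_value()` is assumed here as the node accessor and is not part of this patch:

```python
from beavers.engine import Dag

dag = Dag()
stream = dag.source_stream(empty={})  # a Stream node: ephemeral micro-batches

# Cycle 1: the node carries this cycle's micro-batch.
stream.set_stream({"hello": "world"})
dag.execute()
assert stream.get_value() == {"hello": "world"}  # get_value() is an assumed accessor

# Cycle 2: Stream nodes are cleared after every cycle, back to `empty`.
dag.execute()
assert stream.get_value() == {}
```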