Merge pull request #17 from tradewelltech/fix-kafka-test-coverage
Fix kafka test coverage
0x26res authored Jul 5, 2023
2 parents cb624c7 + c3e0b8b commit efb6c2a
Showing 4 changed files with 243 additions and 19 deletions.
6 changes: 2 additions & 4 deletions README.md
@@ -11,11 +11,9 @@
[![Code style: black][codestyle-image]][codestyle-url]

# Beavers
-_____

**Beavers** is a python library for stream processing, optimized for analytics.


It is used at [Tradewell Technologies](https://www.tradewelltech.co/),
to calculate analytics and serve model predictions,
in both realtime and batch jobs.
@@ -27,8 +25,8 @@ in both realtime and batch jobs.
* Optimized for analytics, it uses micro-batching (instead of processing records one by one)
* Similar to [incremental](https://github.com/janestreet/incremental), it updates nodes in a dag incrementally
* Taking inspiration from [kafka streams](https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/), there are two types of nodes in the dag:
-  * Stream: ephemeral micro-batches of events (cleared after every cycle)
-  * State: durable state derived from streams
+  * **Stream:** ephemeral micro-batches of events (cleared after every cycle)
+  * **State:** durable state derived from streams
* Clear separation between the business logic and the IO.
So the same dag can be used in real time mode, replay mode or can be easily tested.
* functional interface (no inheritance required)
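
The Stream/State split described above maps directly onto the dag API. Below is a minimal sketch of a stream node feeding a sink, using only calls that appear in this commit's tests (Dag, source_stream, sink, set_stream, execute); it is an illustration under those assumptions, not the library's documented quick-start.

from beavers.engine import Dag

dag = Dag()
node = dag.source_stream(empty={})  # Stream node: holds one micro-batch of events per cycle
sink = dag.sink("words", node)      # sink node: exposes the stream's batch to the IO layer

node.set_stream({"hello": "world"})  # feed one micro-batch
dag.execute()                        # run one cycle; stream nodes are cleared afterwards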
26 changes: 12 additions & 14 deletions beavers/kafka.py
@@ -179,18 +179,19 @@ class ProducerMetrics:


class _ProducerManager:
-    def __init__(
-        self,
-        producer_config: dict[str, Any],
-    ):
-        self._producer = confluent_kafka.Producer(producer_config)
-        self._errors = 0
+    def __init__(self, producer: confluent_kafka.Producer):
+        self._producer: confluent_kafka.Producer = producer
+        self._errors: int = 0
+        self._metrics: ProducerMetrics = ProducerMetrics()

+    @staticmethod
+    def create(producer_config: dict[str, Any]) -> "_ProducerManager":
+        return _ProducerManager(confluent_kafka.Producer(producer_config))

def poll(self):
self._producer.poll(0.0)

-    def produce_one(self, topic: str, key: AnyStr, value: bytes) -> bool:
+    def produce_one(self, topic: str, key: AnyStr, value: AnyStr) -> bool:
try:
self._producer.produce(
topic=topic, key=key, value=value, on_delivery=self.on_delivery
@@ -201,8 +202,9 @@ def produce_one(self, topic: str, key: AnyStr, value: bytes) -> bool:
except Exception as err:
if self._errors == 0:
logger.error("Error producing message on %s", topic, exc_info=err)
-            self._errors += 1
-            return False
+            self._metrics.produced_error_count += 1
+            self._errors += 1
+            return False

def on_delivery(self, err, msg: confluent_kafka.Message):
if err:
@@ -483,7 +485,7 @@ def create(
_RuntimeSinkTopic(dag_sinks[key], value)
for key, value in sink_topics.items()
]
-        producer_manager = _ProducerManager(producer_config)
+        producer_manager = _ProducerManager.create(producer_config)
return KafkaDriver(
dag=dag,
runtime_source_topics=runtime_source_topics,
Expand All @@ -492,10 +494,6 @@ def create(
producer_manager=producer_manager,
)

-    def run(self):
-        while True:
-            self.run_cycle()

def flush_metrics(self) -> ExecutionMetrics:
results = self._metrics
self._metrics = ExecutionMetrics()
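
This refactor is what enables the new producer tests below: _ProducerManager now accepts any object exposing the produce()/poll() subset of the confluent_kafka.Producer interface, while the create() factory preserves the old config-driven construction for production use. A minimal sketch of the pattern, where FakeProducer is hypothetical and for illustration only:

class FakeProducer:
    # Hypothetical stand-in for confluent_kafka.Producer (illustration only).
    def __init__(self):
        self.produced = []

    def produce(self, topic, key, value, on_delivery):
        # Record the message instead of sending it to a broker
        self.produced.append((topic, key, value, on_delivery))

    def poll(self, timeout):
        pass

manager = _ProducerManager(FakeProducer())          # no broker required
manager.produce_one("topic-1", "key-1", "value-1")

# Production code goes through the factory, which builds a real producer:
# manager = _ProducerManager.create({"bootstrap.servers": "localhost:9092"})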
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -18,6 +18,7 @@ theme:
- search.suggest
palette:
scheme: slate
+    accent: green
logo: static/icons/beavers/logo.svg
favicon: static/icons/beavers/icon.png

229 changes: 228 additions & 1 deletion tests/test_kafka.py
@@ -5,7 +5,7 @@
import io
import logging
import queue
-from typing import Optional, Tuple, Union
+from typing import AnyStr, Callable, Optional, Tuple, Union

import confluent_kafka
import mock
@@ -17,15 +17,19 @@
from beavers.engine import UTC_EPOCH, UTC_MAX, Dag
from beavers.kafka import (
KAFKA_EOF_CODE,
+    ConsumerMetrics,
KafkaDriver,
KafkaProducerMessage,
OffsetPolicy,
+    ProducerMetrics,
SourceTopic,
_ConsumerManager,
_get_message_ns,
_get_previous_start_of_day,
_poll_all,
+    _ProducerManager,
_resolve_topic_offsets,
+    _resolve_topics_offsets,
_RuntimeSinkTopic,
_RuntimeSourceTopic,
)
@@ -110,6 +114,28 @@ def mock_offset_for_time(
] = TopicPartition(topic, partition, offset)


class MockProducer:
def __init__(self):
self.polls = []
self.produced = []

def produce(self, topic: str, key: AnyStr, value: AnyStr, on_delivery: Callable):
self.produced.append((topic, key, value, on_delivery))

def poll(self, time: float):
self.polls.append(time)

def ack_all(self):
for topic, key, value, call_back in self.produced:
call_back(None, mock_kafka_message(topic=topic, value=value, key=key))
self.produced.clear()

def fail_all(self):
for topic, key, value, call_back in self.produced:
call_back("BAD!", mock_kafka_message(topic=topic, value=value, key=key))
self.produced.clear()


def test_get_previous_start_of_day_utc():
assert pd.to_datetime(
"2022-09-18 08:00:00", utc=True
@@ -267,6 +293,65 @@ def test_consumer_manager_priming():
assert consumer_manager._get_priming_watermark() is None


def test_consumer_manager_update_partition_info():
topic_partition = TopicPartition("topic-a", 0, offset=100)
partitions = [topic_partition]
cutoff = pd.to_datetime("2022-10-19 01:00:00", utc=True)

mock_consumer = MockConsumer()
consumer_manager = _ConsumerManager(
cutoff=cutoff,
partitions=partitions,
consumer=mock_consumer,
batch_size=100,
max_held_messages=200,
)
assert consumer_manager._low_water_mark_ns == 0

consumer_manager._update_partition_info(
[
mock_kafka_message(
topic=topic_partition.topic,
partition=topic_partition.partition,
value=b"HELLO",
timestamp=cutoff,
)
]
)

assert consumer_manager._low_water_mark_ns == cutoff.value

with mock.patch(
"pandas.Timestamp.utcnow",
return_value=cutoff + pd.to_timedelta("20s"),
):
consumer_manager._update_partition_info(
[
mock_kafka_message(
topic=topic_partition.topic,
partition=topic_partition.partition,
value=b"EOF",
error=KafkaError(KAFKA_EOF_CODE),
)
]
)
assert (
consumer_manager._low_water_mark_ns
== (cutoff + pd.to_timedelta("20s")).value
)

consumer_manager._update_partition_info(
[
mock_kafka_message(
topic=topic_partition.topic,
partition=topic_partition.partition,
value=b"123",
error=KafkaError(123),
)
]
)


def test_consumer_manager_batching():
tp1 = TopicPartition("topic-a", 0, offset=0)
tp2 = TopicPartition("topic-a", 1, offset=0)
@@ -486,6 +571,12 @@ def test_kafka_driver_word_count(log_helper: LogHelper):
assert mock_producer_manager.flush() == []
assert len(log_helper.flush()) == 1

metrics = kafka_driver.flush_metrics()
assert metrics.serialization_ns > 0
assert metrics.serialization_count == 6
assert metrics.execution_ns > 0
assert metrics.execution_count == 3


def _timestamp_to_bytes(timestamp: pd.Timestamp) -> bytes:
return str(timestamp).encode("utf-8")
@@ -782,6 +873,11 @@ def test_all_partitions_eof():
assert [m.value() for m in consumer_manager.poll(0.0)] == [b"EOF", b"EOF"]
assert mock_consumer._paused == [] # Not pausing EOF partitions
assert consumer_manager._get_priming_watermark() is None
assert consumer_manager.flush_metrics() == ConsumerMetrics(
consumed_message_size=6,
consumed_message_count=2,
released_message_count=2,
)


def test_get_message_ns():
@@ -942,6 +1038,36 @@ def test_from_start_of_day():
)


def test_resolve_topics_offsets():
consumer = MockConsumer()

consumer._topics["topic-1"] = topic_metadata("topic-1", [partition_metadata(0)])

start_of_day = pd.to_datetime("2021-12-31T00:15:00Z")
consumer.mock_offset_for_time("topic-1", 0, start_of_day, 123)

results = _resolve_topics_offsets(
consumer,
[
SourceTopic.from_start_of_day(
"topic-1",
PassThroughKafkaMessageDeserializer(),
pd.Timedelta("00:15:00"),
"UTC",
)
],
pd.to_datetime("2022-01-01", utc=True),
)
assert len(results) == 1
assert results == [
TopicPartition(
topic="topic-1",
partition=0,
offset=123,
)
]


def test_from_relative_time():
consumer = MockConsumer()
now = pd.to_datetime("2022-01-01", utc=True)
@@ -1097,3 +1223,104 @@ def test_no_policy():

with pytest.raises(ValueError, match="OffsetPolicy BAD! not supported for topic-1"):
_resolve_topic_offsets(consumer, source_topic, now)


def test_producer_manager_ok():
mock_producer = MockProducer()
producer_manager = _ProducerManager(mock_producer)
producer_manager.poll()
assert mock_producer.polls == [0.0]

producer_manager.produce_one("topic-1", "key-1", "value-1")
assert mock_producer.produced == [
("topic-1", "key-1", "value-1", producer_manager.on_delivery)
]

assert producer_manager.flush_metrics() == ProducerMetrics(
produced_count=1,
produced_size=7,
)

mock_producer.ack_all()
assert producer_manager.flush_metrics() == ProducerMetrics(confirmed_count=1)


def test_producer_manager_produce_error():
mock_producer = MockProducer()
producer_manager = _ProducerManager(mock_producer)
mock_producer.produced = None

producer_manager.produce_one("topic-1", "key-1", "value-1")
assert producer_manager.flush_metrics() == ProducerMetrics(produced_error_count=1)
assert producer_manager._errors == 1

producer_manager.produce_one("topic-1", "key-1", "value-1")
assert producer_manager.flush_metrics() == ProducerMetrics(produced_error_count=1)
assert producer_manager._errors == 2


def test_producer_manager_not_delivered():
mock_producer = MockProducer()
producer_manager = _ProducerManager(mock_producer)
producer_manager.produce_one("topic-1", "key-1", "value-1")
producer_manager.flush_metrics()
mock_producer.fail_all()
assert producer_manager.flush_metrics() == ProducerMetrics(delivery_error_count=1)
assert producer_manager._errors == 1

producer_manager.produce_one("topic-1", "key-1", "value-1")
producer_manager.flush_metrics()
mock_producer.fail_all()
assert producer_manager.flush_metrics() == ProducerMetrics(delivery_error_count=1)
assert producer_manager._errors == 2


def test_producer_manager_create():
with mock.patch("confluent_kafka.Producer", autospec=True):
producer_manager = _ProducerManager.create({})
assert producer_manager._producer is not None


def test_consumer_manager_create():
with mock.patch("confluent_kafka.Consumer", autospec=True):
consumer_manager = _ConsumerManager.create(
{"enable.partition.eof": True}, [], 500, timeout=None
)
assert consumer_manager._consumer is not None


def test_consumer_manager_create_bad():
with mock.patch("confluent_kafka.Consumer", autospec=True):
with pytest.raises(
ValueError, match=r"'enable.partition.eof' should be set to true"
):
_ConsumerManager.create({}, [], 500, timeout=None)


def test_kafka_driver_create():
with mock.patch("confluent_kafka.Consumer", autospec=True):
KafkaDriver.create(Dag(), {}, {"enable.partition.eof": True}, {}, {})


def test_runtime_sink_topic():
dag = Dag()
node = dag.source_stream(empty={})
sink = dag.sink("sink", node)
runtime_sink_topic = _RuntimeSinkTopic([sink], WorldCountSerializer("topic-1"))

producer_manager = MockProducerManager()
dag.execute()
runtime_sink_topic.flush(dag.get_cycle_id(), producer_manager)
assert producer_manager.messages == []

node.set_stream({"foo": "bar"})
dag.execute()
runtime_sink_topic.flush(dag.get_cycle_id(), producer_manager)
assert producer_manager.messages == [
KafkaProducerMessage(topic="topic-1", key=b"foo", value=b"bar")
]
producer_manager.messages.clear()

dag.execute()
runtime_sink_topic.flush(dag.get_cycle_id(), producer_manager)
assert producer_manager.messages == []
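
Taken together, the new producer tests drive _ProducerManager entirely through the injected mock: MockProducer.ack_all() fires each stored on_delivery callback with err=None (counted as confirmed), while fail_all() fires it with an error value (counted as a delivery error). A condensed sketch of that flow, reusing the MockProducer defined earlier in this file:

producer = MockProducer()
manager = _ProducerManager(producer)

manager.produce_one("topic-1", "key-1", "value-1")
producer.ack_all()   # on_delivery(None, msg): counted in ProducerMetrics.confirmed_count
manager.produce_one("topic-1", "key-1", "value-1")
producer.fail_all()  # on_delivery("BAD!", msg): counted in ProducerMetrics.delivery_error_count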
