diff --git a/beavers/kafka.py b/beavers/kafka.py
index f9dcf1e..101f6c4 100644
--- a/beavers/kafka.py
+++ b/beavers/kafka.py
@@ -275,6 +275,15 @@ def create(
         consumer = confluent_kafka.Consumer(consumer_config)
         cutoff = pd.Timestamp.utcnow()
         offsets = _resolve_topics_offsets(consumer, source_topics, cutoff, timeout)
+        for tp, (start, end) in offsets.items():
+            logger.debug(
+                "Replay offsets: %s:%s %d -> %d = %d",
+                tp.topic,
+                tp.partition,
+                start,
+                end,
+                max(0, end - start),
+            )
         consumer.assign(
             [
                 confluent_kafka.TopicPartition(
@@ -610,7 +619,6 @@ def _resolve_topic_offsets(
         )
         for p in topic_meta_data.partitions.values()
     }
-
     if source_topic.offset_policy == OffsetPolicy.LATEST:
         return {tp: (end, end - 1) for tp, (start, end) in watermarks.items()}
     elif source_topic.offset_policy == OffsetPolicy.EARLIEST:
diff --git a/scripts/README.md b/scripts/README.md
index 4df62d8..6dff812 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -18,6 +18,7 @@ kafka-console-producer --topic right --bootstrap-server=localhost:9092
 kafka-console-consumer \
   --topic=both \
   --bootstrap-server=localhost:9092 \
-  --property print.key=true
+  --property print.key=true \
+  --from-beginning
 python -m scripts.kafka_test_bench --batch-size=2
 ```
diff --git a/tests/test_kafka.py b/tests/test_kafka.py
index c6e0a5d..3d6b886 100644
--- a/tests/test_kafka.py
+++ b/tests/test_kafka.py
@@ -1,11 +1,12 @@
 """
 Unit tests for tradewell.beavers.kafka
 """
+
 import dataclasses
 import io
 import logging
 import queue
-from typing import AnyStr, Callable, Optional, Tuple, Union
+from typing import AnyStr, Callable, Optional, Sequence, Tuple, Union
 
 import confluent_kafka
 import mock
@@ -25,6 +26,7 @@
     _ConsumerManager,
     _get_message_ns,
     _get_previous_start_of_day,
+    _PartitionInfo,
     _poll_all,
     _ProducerManager,
     _resolve_offset_for_time,
@@ -940,14 +942,10 @@ def test_poll_all():
 class PassThroughKafkaMessageDeserializer:
     messages: list[confluent_kafka.Message] = dataclasses.field(default_factory=list)
 
-    def append_message(self, message: confluent_kafka.Message):
-        self.messages.append(message)
-
-    def flush(self) -> list[confluent_kafka.Message]:
-        """Convert queued messages to data"""
-        results = self.messages.copy()
-        self.messages.clear()
-        return results
+    def __call__(
+        self, messages: Sequence[confluent_kafka.Message]
+    ) -> Sequence[confluent_kafka.Message]:
+        return messages
 
 
 def test_from_xxx():
@@ -1336,10 +1334,31 @@ def test_producer_manager_create():
     assert producer_manager._producer is not None
 
 
-def test_consumer_manager_create():
-    with mock.patch("confluent_kafka.Consumer", autospec=True):
-        consumer_manager = _ConsumerManager.create({}, [], 500, timeout=None)
+def test_consumer_manager_create_with_topics():
+    with mock.patch(
+        "confluent_kafka.Consumer",
+        new=lambda *_: MockConsumer(
+            {"topic-1": topic_metadata("topic-1", [partition_metadata(0)])}
+        ),
+    ):
+        consumer_manager = _ConsumerManager.create(
+            {},
+            [
+                SourceTopic(
+                    "topic-1",
+                    PassThroughKafkaMessageDeserializer(),
+                    OffsetPolicy.LATEST,
+                )
+            ],
+            500,
+            timeout=None,
+        )
     assert consumer_manager._consumer is not None
+    assert consumer_manager._partition_info == {
+        TopicPartition("topic-1", 0): _PartitionInfo(
+            current_offset=0, live_offset=-1, timestamp_ns=0, paused=False
+        )
+    }
 
 
 def test_consumer_manager_create_partition_eof():
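
A note on the new replay-offset diagnostics in `_ConsumerManager.create`: they are emitted at DEBUG level, so they stay invisible under the default logging configuration. A minimal sketch for surfacing them, assuming the module logger follows the usual module-path naming ("beavers.kafka"):

```python
import logging

# Route log records somewhere visible, then lower the threshold for the
# kafka module only (logger name assumed to match the module path).
logging.basicConfig(level=logging.INFO)
logging.getLogger("beavers.kafka").setLevel(logging.DEBUG)

# Each assigned partition then logs one line of the form:
#   Replay offsets: <topic>:<partition> <start> -> <end> = <replay count>
```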
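For reference, the test refactor implies that a deserializer is now a plain callable over a batch of messages rather than an object with `append_message`/`flush`. A minimal sketch of a real deserializer under that contract (`JsonDeserializer` is hypothetical, not part of beavers):

```python
import json
from typing import Sequence

import confluent_kafka


class JsonDeserializer:
    """Hypothetical batch deserializer matching the callable contract
    used by the updated PassThroughKafkaMessageDeserializer."""

    def __call__(
        self, messages: Sequence[confluent_kafka.Message]
    ) -> list[dict]:
        # Message.value() returns the raw payload bytes; skip tombstones (None).
        return [json.loads(m.value()) for m in messages if m.value() is not None]
```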