
Commit

Add offset policies, fix committed
aandres committed Sep 17, 2023
1 parent c8449ab commit 99c1ad7
Showing 3 changed files with 45 additions and 4 deletions.
2 changes: 1 addition & 1 deletion beavers/kafka.py
@@ -611,7 +611,7 @@ def _resolve_topic_offsets(
             source_topic.absolute_time, consumer, watermarks, timeout
         )
     elif source_topic.offset_policy == OffsetPolicy.COMMITTED:
-        committed = consumer.committed(watermarks.keys())
+        committed = consumer.committed(list(watermarks.keys()), timeout=timeout)
         return {
             confluent_kafka.TopicPartition(topic=tp.topic, partition=tp.partition): (
                 tp.offset,
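For context on the fix: confluent_kafka's `Consumer.committed` takes a list of `TopicPartition` plus an optional timeout in seconds, hence the `list(...)` wrapper around the `dict_keys` view and the forwarded `timeout`. A minimal sketch of the call (the broker address and group id below are placeholder assumptions, not from this change):

```python
# Minimal sketch: Consumer.committed expects a list of TopicPartition and an
# optional timeout; broker address and group id here are placeholders.
import confluent_kafka

consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": "localhost:9092", "group.id": "beavers-test-bench"}
)
partitions = [confluent_kafka.TopicPartition("left", 0)]
committed = consumer.committed(partitions, timeout=10.0)
for tp in committed:
    # tp.offset holds the committed offset, or a negative sentinel if nothing
    # has been committed for that partition yet
    print(tp.topic, tp.partition, tp.offset)
```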
1 change: 1 addition & 0 deletions scripts/README.md
@@ -9,6 +9,7 @@ The "timestamp" of the output messages should be in order across topics when rep
 Helpful commands:
 
 ```shell
+docker run -p 9092:9092 -d bashj79/kafka-kraft
 kafka-topics --create --topic left --bootstrap-server=localhost:9092
 kafka-topics --create --topic right --bootstrap-server=localhost:9092
 kafka-topics --create --topic both --bootstrap-server=localhost:9092
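Once the container is up, the input topics can be fed with the standard console producer, e.g. `kafka-console-producer --topic left --bootstrap-server=localhost:9092` (assuming the same Kafka CLI tools as above are on the path; this command is illustrative and not part of the change).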
46 changes: 43 additions & 3 deletions scripts/kafka_test_bench.py
@@ -2,7 +2,7 @@
 import json
 import logging
 from operator import itemgetter
-from typing import Any, Sequence
+from typing import Any, Callable, Sequence
 
 import click
 import confluent_kafka
@@ -50,9 +50,47 @@ def kafka_message_serializer(
 ]
 
 
+SOURCE_TOPIC_CREATORS: dict[str, Callable[[str], SourceTopic]] = {
+    "latest": functools.partial(
+        SourceTopic.from_latest, message_deserializer=kafka_messages_to_json
+    ),
+    "earliest": functools.partial(
+        SourceTopic.from_earliest, message_deserializer=kafka_messages_to_json
+    ),
+    "15min": functools.partial(
+        SourceTopic.from_relative_time,
+        message_deserializer=kafka_messages_to_json,
+        relative_time=pd.to_timedelta("15min"),
+    ),
+    "start-of-day": functools.partial(
+        SourceTopic.from_start_of_day,
+        message_deserializer=kafka_messages_to_json,
+        start_of_day_time=pd.to_timedelta("00:00:00"),
+        start_of_day_timezone="UTC",
+    ),
+    "absolute-time": functools.partial(
+        SourceTopic.from_absolute_time,
+        message_deserializer=kafka_messages_to_json,
+        absolute_time=pd.Timestamp.utcnow().normalize(),
+    ),
+    "committed": functools.partial(
+        SourceTopic.from_committed,
+        message_deserializer=kafka_messages_to_json,
+    ),
+}
+
+
 @click.command()
 @click.option("--left-topic", type=click.STRING, default="left")
+@click.option(
+    "--left-offset", type=click.Choice(SOURCE_TOPIC_CREATORS.keys()), default="earliest"
+)
 @click.option("--right-topic", type=click.STRING, default="right")
+@click.option(
+    "--right-offset",
+    type=click.Choice(SOURCE_TOPIC_CREATORS.keys()),
+    default="earliest",
+)
 @click.option("--both-topic", type=click.STRING, default="both")
 @click.option(
     "--consumer-config",
@@ -67,7 +105,9 @@ def kafka_message_serializer(
 @click.option("--batch-size", type=click.INT, default="2")
 def kafka_test_bench(
     left_topic: str,
+    left_offset: str,
     right_topic: str,
+    right_offset: str,
     both_topic: str,
     consumer_config: dict,
     producer_config: dict,
@@ -85,8 +125,8 @@ def kafka_test_bench(
         producer_config=producer_config,
         consumer_config=consumer_config,
         source_topics={
-            "left": SourceTopic.from_earliest(left_topic, kafka_messages_to_json),
-            "right": SourceTopic.from_earliest(right_topic, kafka_messages_to_json),
+            "left": SOURCE_TOPIC_CREATORS[left_offset](left_topic),
+            "right": SOURCE_TOPIC_CREATORS[right_offset](right_topic),
         },
         sink_topics={
             "both": functools.partial(kafka_message_serializer, topic=both_topic)
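To illustrate how the new registry is consumed (a sketch only, assuming the `SOURCE_TOPIC_CREATORS` dict and `kafka_messages_to_json` deserializer defined above are in scope):

```python
# Sketch: resolve a CLI choice to a SourceTopic factory and build the source,
# mirroring what kafka_test_bench now does for --left-offset / --right-offset.
offset_choice = "15min"  # one of the SOURCE_TOPIC_CREATORS keys
left_source = SOURCE_TOPIC_CREATORS[offset_choice]("left")
# left_source is a SourceTopic that replays the "left" topic from ~15 minutes ago
```

On the command line this corresponds to something like `python scripts/kafka_test_bench.py --left-offset 15min --right-offset committed` (invocation path assumed).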
