Skip to content

Commit

Permalink
Add test script
Browse files Browse the repository at this point in the history
  • Loading branch information
aandres committed Sep 12, 2023
1 parent b4b9a28 commit 077bfc2
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,5 @@ cython_debug/
/.ruff_cache
/poetry.lock
/venv
*.csv
coverrage.xml
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pip-tools = "^6.12.1"
pre-commit = ">=2.20.0"
pylint = ">=2.15.0"
pytest = ">=7.2.0"
click = ">=8.1.7"
mkdocs-material = ">=9.0.3"
mkdocstrings = { version = ">=0.21.2", extras = ['python'] }
mock = "*"
Expand Down
22 changes: 22 additions & 0 deletions scripts/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Scripts

## `kafka_test_bench`

Tests a simple application with kafka, making sure it replays in order.
The "timestamp" of the output messages should be in order across topics when replaying.


Helpful commands:

```shell
kafka-topics --create --topic left --bootstrap-server=localhost:9092
kafka-topics --create --topic right --bootstrap-server=localhost:9092
kafka-topics --create --topic both --bootstrap-server=localhost:9092
kafka-console-producer --topic left --bootstrap-server=localhost:9092
kafka-console-producer --topic right --bootstrap-server=localhost:9092
kafka-console-consumer \
--topic=both \
--bootstrap-server=localhost:9092 \
--property print.key=true
python -m scripts.kafka_test_bench --batch-size=2
```
95 changes: 95 additions & 0 deletions scripts/kafka_test_bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import functools
import json
from operator import itemgetter
from typing import Any, Sequence

import click
import confluent_kafka
import pandas as pd

from beavers import Dag
from beavers.kafka import KafkaDriver, KafkaProducerMessage, SourceTopic


def create_test_dag() -> "Dag":
dag = Dag()
left_stream = dag.source_stream(name="left")
right_stream = dag.source_stream(name="right")
both_stream = dag.stream(
lambda left, right: sorted(left + right, key=itemgetter("timestamp"))
).map(left_stream, right_stream)
dag.sink("both", both_stream)
return dag


def kafka_messages_to_json(
messages: Sequence[confluent_kafka.Message],
) -> list[dict[str, Any]]:
return [
{
"topic": message.topic(),
"partition": message.partition(),
"offset": message.offset(),
"timestamp": str(
pd.to_datetime(message.timestamp()[1], unit="ms", utc=True)
),
"key": message.key().encode("utf-8") if message.key() else None,
"value": message.value().decode("utf-8"),
}
for message in messages
]


def kafka_message_serializer(
payloads: list[dict[str, Any]], topic: str
) -> list[KafkaProducerMessage]:
return [
KafkaProducerMessage(topic, key=None, value=json.dumps(payload))
for payload in payloads
]


@click.command()
@click.option("--left-topic", type=click.STRING, default="left")
@click.option("--right-topic", type=click.STRING, default="right")
@click.option("--both-topic", type=click.STRING, default="both")
@click.option(
"--consumer-config",
type=json.loads,
default='{"bootstrap.servers": "localhost:9092", "group.id": "beavers"}',
)
@click.option(
"--producer-config",
type=json.loads,
default='{"bootstrap.servers": "localhost:9092"}',
)
@click.option("--batch-size", type=click.INT, default="2")
def kafka_test_bench(
left_topic: str,
right_topic: str,
both_topic: str,
consumer_config: dict,
producer_config: dict,
batch_size: int,
):
dag = create_test_dag()

driver = KafkaDriver.create(
dag=dag,
producer_config=producer_config,
consumer_config=consumer_config,
source_topics={
"left": SourceTopic.from_earliest(left_topic, kafka_messages_to_json),
"right": SourceTopic.from_earliest(right_topic, kafka_messages_to_json),
},
sink_topics={
"both": functools.partial(kafka_message_serializer, topic=both_topic)
},
batch_size=batch_size,
)
while True:
driver.run_cycle()


if __name__ == "__main__":
kafka_test_bench()

0 comments on commit 077bfc2

Please sign in to comment.