
Merge pull request #33 from tradewelltech/32-rely-on-end-offset-instead-of-eof

Change how we identify replay / live mode (stop using EOF marker)
0x26res authored Sep 19, 2023
2 parents 95b0e74 + 6f0e371 commit 1a2527a
Showing 7 changed files with 501 additions and 298 deletions.
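At a high level, the change records each partition's end offset at start-up and treats a partition as live once consumption reaches it, instead of waiting for a partition-EOF event. A minimal sketch of that idea, using confluent-kafka's `list_topics` and `get_watermark_offsets` (the helper names here are illustrative; the actual logic lives in `beavers/kafka.py` below):

```python
import confluent_kafka


def snapshot_live_offsets(
    consumer: confluent_kafka.Consumer, topic: str
) -> dict[confluent_kafka.TopicPartition, int]:
    """For each partition, record the offset of the last message present at start-up."""
    metadata = consumer.list_topics(topic)
    live_offsets = {}
    for partition_id in metadata.topics[topic].partitions:
        tp = confluent_kafka.TopicPartition(topic, partition_id)
        low, high = consumer.get_watermark_offsets(tp)
        live_offsets[tp] = high - 1  # high is the offset the *next* message will get
    return live_offsets


def is_live(current_offset: int, live_offset: int) -> bool:
    """Replay mode lasts until the consumer has caught up with the start-up end offset."""
    return current_offset >= live_offset
```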
2 changes: 2 additions & 0 deletions .gitignore
@@ -165,3 +165,5 @@ cython_debug/
/.ruff_cache
/poetry.lock
/venv
*.csv
coverage.xml
147 changes: 71 additions & 76 deletions beavers/kafka.py
@@ -16,8 +16,6 @@

T = TypeVar("T")

KAFKA_EOF_CODE = -191


class KafkaMessageDeserializer(Protocol[T]):
"""Interface for converting incoming kafka messages to custom data."""
@@ -224,10 +222,12 @@ def flush_metrics(self) -> ProducerMetrics:
@dataclasses.dataclass
class _PartitionInfo:
current_offset: int
live_offset: int
timestamp_ns: int = UTC_EPOCH.value
paused: bool = False
primed: bool = False
eof: bool = False

def is_live(self) -> bool:
return self.current_offset >= self.live_offset


@dataclasses.dataclass
@@ -239,21 +239,23 @@ class ConsumerMetrics:
paused_partitions: int = 0
released_message_count: int = 0
held_message_count: int = 0
error_message_count: int = 0


class _ConsumerManager:
def __init__(
self,
cutoff: pd.Timestamp,
partitions: list[confluent_kafka.TopicPartition],
partitions: dict[confluent_kafka.TopicPartition, tuple[int, int]],
consumer: confluent_kafka.Consumer,
batch_size: int,
max_held_messages: int,
):
self._cutoff_ns: int = cutoff.value
self._consumer: confluent_kafka.Consumer = consumer
self._partition_info: dict[confluent_kafka.TopicPartition, _PartitionInfo] = {
partition: _PartitionInfo(partition.offset) for partition in partitions
tp: _PartitionInfo(current_offset=start, live_offset=end)
for tp, (start, end) in partitions.items()
}
self._held_messages: list[confluent_kafka.Message] = []
self._batch_size: int = batch_size
@@ -269,12 +271,17 @@ def create(
batch_size: int,
timeout: Optional[float],
) -> "_ConsumerManager":
if not consumer_config.get("enable.partition.eof"):
raise ValueError("'enable.partition.eof' should be set to true")
consumer = confluent_kafka.Consumer(consumer_config)
cutoff = pd.Timestamp.utcnow()
offsets = _resolve_topics_offsets(consumer, source_topics, cutoff, timeout)
consumer.assign(offsets)
consumer.assign(
[
confluent_kafka.TopicPartition(
topic=tp.topic, partition=tp.partition, offset=start
)
for tp, (start, _) in offsets.items()
]
)
return _ConsumerManager(cutoff, offsets, consumer, batch_size, batch_size * 5)

def poll(self, timeout: float) -> list[confluent_kafka.Message]:
@@ -285,6 +292,9 @@ def poll(self, timeout: float) -> list[confluent_kafka.Message]:
)
self._metrics.consumed_message_count += len(new_messages)
self._metrics.consumed_message_size += sum(len(m.value()) for m in new_messages)
for message in new_messages:
if message.error():
self._metrics.error_message_count += 1

self._held_messages.extend(new_messages)
self._held_messages.sort(key=_get_message_ns)
@@ -366,20 +376,11 @@ def _update_partition_info(self, new_messages: list[confluent_kafka.Message]):
)
partition_info: _PartitionInfo = self._partition_info[topic_partition]
timestamp_type, timestamp = message.timestamp()
if timestamp_type == confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
if (
message.error() is not None
and message.error().code() == KAFKA_EOF_CODE
):
partition_info.eof = True
else:
if timestamp_type != confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
partition_info.timestamp_ns = timestamp * 1_000_000
partition_info.eof = False
partition_info.current_offset = message.offset()
if partition_info.timestamp_ns >= self._cutoff_ns:
partition_info.primed = True
self._low_water_mark_ns = min(
(v.timestamp_ns for v in self._partition_info.values() if not v.eof),
(v.timestamp_ns for v in self._partition_info.values() if not v.is_live()),
default=pd.Timestamp.utcnow().value,
)

@@ -567,10 +568,10 @@ def _resolve_topics_offsets(
source_topics: list[SourceTopic],
now: pd.Timestamp,
timeout: Optional[float] = None,
) -> list[confluent_kafka.TopicPartition]:
assignments: list[confluent_kafka.TopicPartition] = []
) -> dict[confluent_kafka.TopicPartition, tuple[int, int]]:
assignments = {}
for source_topic in source_topics:
assignments.extend(_resolve_topic_offsets(consumer, source_topic, now, timeout))
assignments.update(_resolve_topic_offsets(consumer, source_topic, now, timeout))
return assignments


@@ -579,7 +580,7 @@ def _resolve_topic_offsets(
source_topic: SourceTopic,
now: pd.Timestamp,
timeout: Optional[float] = None,
) -> list[confluent_kafka.TopicPartition]:
) -> dict[confluent_kafka.TopicPartition, tuple[int, int]]:
cluster_metadata: confluent_kafka.admin.ClusterMetadata = consumer.list_topics(
source_topic.name, timeout
)
@@ -588,71 +589,40 @@ def _resolve_topic_offsets(
]
if len(topic_meta_data.partitions) == 0:
raise ValueError(f"Topic {source_topic.name} does not exist")
watermarks = {
confluent_kafka.TopicPartition(
source_topic.name, p.id
): consumer.get_watermark_offsets(
confluent_kafka.TopicPartition(source_topic.name, p.id)
)
for p in topic_meta_data.partitions.values()
}

if source_topic.offset_policy == OffsetPolicy.LATEST:
return [
confluent_kafka.TopicPartition(
topic=source_topic.name,
partition=p.id,
offset=confluent_kafka.OFFSET_END,
)
for p in topic_meta_data.partitions.values()
]
return {tp: (end, end - 1) for tp, (start, end) in watermarks.items()}
elif source_topic.offset_policy == OffsetPolicy.EARLIEST:
return [
confluent_kafka.TopicPartition(
topic=source_topic.name,
partition=p.id,
offset=confluent_kafka.OFFSET_BEGINNING,
)
for p in topic_meta_data.partitions.values()
]
return {tp: (start, end - 1) for tp, (start, end) in watermarks.items()}
elif source_topic.offset_policy == OffsetPolicy.RELATIVE_TIME:
offset_timestamp = now - source_topic.relative_time
offset_ms = offset_timestamp.value // 1_000_000
return consumer.offsets_for_times(
[
confluent_kafka.TopicPartition(
topic=source_topic.name, partition=p.id, offset=offset_ms
)
for p in topic_meta_data.partitions.values()
],
timeout,
)
return _resolve_offset_for_time(offset_timestamp, consumer, watermarks, timeout)
elif source_topic.offset_policy == OffsetPolicy.START_OF_DAY:
offset_timestamp = _get_previous_start_of_day(
now, source_topic.start_of_day_time, source_topic.start_of_day_timezone
)
offset_ms = offset_timestamp.value // 1_000_000
return consumer.offsets_for_times(
[
confluent_kafka.TopicPartition(
topic=source_topic.name, partition=p.id, offset=offset_ms
)
for p in topic_meta_data.partitions.values()
],
timeout,
)
return _resolve_offset_for_time(offset_timestamp, consumer, watermarks, timeout)
elif source_topic.offset_policy == OffsetPolicy.ABSOLUTE_TIME:
offset_ms = source_topic.absolute_time.value // 1_000_000
return consumer.offsets_for_times(
[
confluent_kafka.TopicPartition(
topic=source_topic.name, partition=p.id, offset=offset_ms
)
for p in topic_meta_data.partitions.values()
],
timeout,
return _resolve_offset_for_time(
source_topic.absolute_time, consumer, watermarks, timeout
)
elif source_topic.offset_policy == OffsetPolicy.COMMITTED:
return [
confluent_kafka.TopicPartition(
topic=source_topic.name,
partition=p.id,
offset=confluent_kafka.OFFSET_STORED,
committed = consumer.committed(list(watermarks.keys()), timeout=timeout)
return {
confluent_kafka.TopicPartition(topic=tp.topic, partition=tp.partition): (
tp.offset,
watermarks[tp][1] - 1,
)
for p in topic_meta_data.partitions.values()
]
for tp in committed
}
else:
raise ValueError(
f"{OffsetPolicy.__name__} {source_topic.offset_policy}"
@@ -694,3 +664,28 @@ def _get_message_ns(message: confluent_kafka.Message) -> int:
return UTC_MAX.value
else:
return timestamp * 1_000_000


def _resolve_offset_for_time(
offset_timestamp: pd.Timestamp,
consumer: confluent_kafka.Consumer,
watermarks: dict[confluent_kafka.TopicPartition, tuple[int, int]],
timeout: float,
) -> dict[confluent_kafka.TopicPartition, tuple[int, int]]:
offset_ms = offset_timestamp.value // 1_000_000
offset_for_time = consumer.offsets_for_times(
[
confluent_kafka.TopicPartition(
topic=tp.topic, partition=tp.partition, offset=offset_ms
)
for tp in watermarks.keys()
],
timeout,
)
return {
confluent_kafka.TopicPartition(topic=tp.topic, partition=tp.partition): (
tp.offset,
watermarks[tp][1] - 1,
)
for tp in offset_for_time
}
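
For reference, `get_watermark_offsets` returns the partition's low and high watermarks, where the high watermark is the offset the next produced message will receive; hence the `end - 1` above for the last message that already exists. A small illustration (topic name and config are placeholders):

```python
import confluent_kafka

# Placeholder config and topic, just to show the watermark call.
consumer = confluent_kafka.Consumer(
    {"group.id": "example", "bootstrap.servers": "localhost:9092"}
)
tp = confluent_kafka.TopicPartition("words", 0)

low, high = consumer.get_watermark_offsets(tp)
start_offset = low      # where the EARLIEST policy would begin
live_offset = high - 1  # last existing message; at or past this, the partition is live
print(f"would replay offsets {start_offset}..{live_offset}")
```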
14 changes: 7 additions & 7 deletions examples/kafka_concepts.py
@@ -3,6 +3,7 @@


import confluent_kafka
import pandas as pd

# --8<-- [start:dag]
from beavers import Dag
@@ -41,7 +42,9 @@ def deserialize_messages(messages: list[confluent_kafka.Message]) -> list[str]:
# --8<-- [start:kafka_source]
from beavers.kafka import SourceTopic, KafkaDriver

source_topic = SourceTopic.from_latest("words", deserialize_messages)
source_topic = SourceTopic.from_start_of_day(
"words", deserialize_messages, pd.to_timedelta("15min"), "UTC"
)
# --8<-- [end:kafka_source]


@@ -67,7 +70,6 @@ def serialize_counts(values: list[tuple[str, int]]) -> list[KafkaProducerMessage
kafka_driver = KafkaDriver.create(
dag=dag,
consumer_config={
"enable.partition.eof": True,
"group.id": "beavers",
"bootstrap.servers": "localhost:9092",
},
@@ -80,10 +82,8 @@ def serialize_counts(values: list[tuple[str, int]]) -> list[KafkaProducerMessage
# --8<-- [end:kafka_driver]


# Note: you can test it with
# Note: you can test it with the following commands
# kafka-topics --create --topic words --bootstrap-server=localhost:9092
# kafka-console-producer --topic words --bootstrap-server=localhost:9092
# And:
# kafka-console-consumer
# --topic=counts \
# --bootstrap-server=localhost:9092 \
# kafka-console-consumer --topic=counts --bootstrap-server=localhost:9092 \
# --property print.key=true
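
The two `SourceTopic` constructors seen in this diff can be compared side by side; a sketch reusing the example's topic and deserializer (the replay semantics are inferred from `_resolve_topic_offsets` above):

```python
import pandas as pd

from beavers.kafka import SourceTopic

# No replay: start from the end offsets, so the topic is live immediately.
latest = SourceTopic.from_latest("words", deserialize_messages)

# Replay from the most recent 00:15 UTC, then switch to live mode once caught up.
start_of_day = SourceTopic.from_start_of_day(
    "words", deserialize_messages, pd.to_timedelta("15min"), "UTC"
)
```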
1 change: 1 addition & 0 deletions pyproject.toml
@@ -36,6 +36,7 @@ pip-tools = "^6.12.1"
pre-commit = ">=2.20.0"
pylint = ">=2.15.0"
pytest = ">=7.2.0"
click = ">=8.1.7"
mkdocs-material = ">=9.0.3"
mkdocstrings = { version = ">=0.21.2", extras = ['python'] }
mock = "*"
23 changes: 23 additions & 0 deletions scripts/README.md
@@ -0,0 +1,23 @@
# Scripts

## `kafka_test_bench`

Tests a simple Kafka application, making sure it replays messages in order.
When replaying, the "timestamp" of the output messages should be in order across topics.


Helpful commands:

```shell
docker run -p 9092:9092 -d bashj79/kafka-kraft
kafka-topics --create --topic left --bootstrap-server=localhost:9092
kafka-topics --create --topic right --bootstrap-server=localhost:9092
kafka-topics --create --topic both --bootstrap-server=localhost:9092
kafka-console-producer --topic left --bootstrap-server=localhost:9092
kafka-console-producer --topic right --bootstrap-server=localhost:9092
kafka-console-consumer \
--topic=both \
--bootstrap-server=localhost:9092 \
--property print.key=true
python -m scripts.kafka_test_bench --batch-size=2
```
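
One way to check the claim about output ordering (hypothetical, not part of the script) is to consume the merged `both` topic and assert that broker timestamps never go backwards:

```python
import confluent_kafka

# Hypothetical ordering check: read the merged "both" topic from the beginning
# and verify message timestamps are monotonically non-decreasing.
consumer = confluent_kafka.Consumer(
    {
        "group.id": "order-check",
        "bootstrap.servers": "localhost:9092",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["both"])

last_timestamp_ms = 0
while True:
    message = consumer.poll(1.0)
    if message is None:
        break  # assume no more messages once the poll times out
    if message.error():
        continue
    _, timestamp_ms = message.timestamp()
    assert timestamp_ms >= last_timestamp_ms, "output replayed out of order"
    last_timestamp_ms = timestamp_ms
```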