Skip to content

Commit

Permalink
feat: Commit offsets on subscription executor if sample rate is set t…
Browse files Browse the repository at this point in the history
…o 0 (#2317)

Currently if the executor sample rate is set to 0 we don't send any events
through the processing strategy or commit any offsets. In our dashboards
the lag grows continuously dwarfing all other topics/consumer groups.

This adds a temporary workaround that just always commits offsets for every
message if the sampling rate is 0. We can remove it once we actually start
processing executor messages.
  • Loading branch information
lynnagara authored Dec 21, 2021
1 parent f06185a commit ffad626
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions snuba/subscriptions/executor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def create(
self.__buffer_size,
self.__metrics,
Noop(commit),
commit, # TODO: this is temporary, remove before rollout
)


Expand All @@ -177,12 +178,14 @@ def __init__(
buffer_size: int,
metrics: MetricsBackend,
next_step: ProcessingStrategy[SubscriptionTaskResult],
commit: Optional[Callable[[Mapping[Partition, Position]], None]] = None,
) -> None:
self.__dataset = dataset
self.__executor = executor
self.__buffer_size = buffer_size
self.__metrics = metrics
self.__next_step = next_step
self.__commit = commit

self.__encoder = SubscriptionScheduledTaskEncoder()

Expand Down Expand Up @@ -263,6 +266,17 @@ def submit(self, message: Message[KafkaPayload]) -> None:
executor_sample_rate = cast(
float, state.get_config("executor_sample_rate", 0.0)
)

# HACK: Just commit offsets and return if we haven't started rollout
# yet. This is just a temporary workaround to prevent the lag continuously
# growing and dwarfing others on op's Kafka dashboard
if self.__commit is not None and executor_sample_rate == 0:
self.__commit(
{message.partition: Position(message.offset, message.timestamp)}
)

return

subscription_id = str(task.task.subscription.identifier)
should_execute = (
(crc32(subscription_id.encode("utf-8")) & 0xFFFFFFFF) / 2 ** 32
Expand Down

0 comments on commit ffad626

Please sign in to comment.