ref(deletes): bulk delete consumer #6510

Open · wants to merge 12 commits into base: master
1 change: 1 addition & 0 deletions devservices/config.yml
@@ -65,6 +65,7 @@ services:
ENABLE_ISSUE_OCCURRENCE_CONSUMER: ${ENABLE_ISSUE_OCCURRENCE_CONSUMER:-}
ENABLE_AUTORUN_MIGRATION_SEARCH_ISSUES: 1
ENABLE_GROUP_ATTRIBUTES_CONSUMER: ${ENABLE_GROUP_ATTRIBUTES_CONSUMER:-}
ENABLE_LW_DELETIONS_CONSUMER: ${ENABLE_LW_DELETIONS_CONSUMER:-}
platform: linux/amd64
extra_hosts:
host.docker.internal: host-gateway
18 changes: 18 additions & 0 deletions snuba/cli/devserver.py
@@ -481,6 +481,24 @@ def devserver(*, bootstrap: bool, workers: bool) -> None:
            ),
        ]

    if settings.ENABLE_LW_DELETIONS_CONSUMER:
        daemons += [
            (
                "lw-deletions-consumer",
                [
                    "snuba",
                    "lw-deletions-consumer",
                    "--storage=search_issues",
                    "--consumer-group=search_issues_deletes_group",
                    "--max-rows-batch-size=10",
                    "--max-batch-time-ms=1000",
                    "--auto-offset-reset=latest",
                    "--no-strict-offset-reset",
                    "--log-level=debug",
                ],
            ),
        ]

    manager = Manager()
    for name, cmd in daemons:
        manager.add_process(
174 changes: 174 additions & 0 deletions snuba/cli/lw_deletions_consumer.py
@@ -0,0 +1,174 @@
import logging
import signal
from typing import Any, Optional, Sequence

import click
import sentry_sdk
from arroyo import configure_metrics
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing import StreamProcessor

from snuba import environment, settings
from snuba.consumers.consumer_builder import (
    ConsumerBuilder,
    KafkaParameters,
    ProcessingParameters,
)
from snuba.consumers.consumer_config import resolve_consumer_config
from snuba.datasets.deletion_settings import MAX_ROWS_TO_DELETE_DEFAULT
from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.environment import setup_logging, setup_sentry
from snuba.lw_deletions.formatters import STORAGE_FORMATTER
from snuba.lw_deletions.strategy import LWDeletionsConsumerStrategyFactory
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter
from snuba.web.bulk_delete_query import STORAGE_TOPIC

# A longer batch time for deletes is reasonable
# since we want fewer mutations
DEFAULT_DELETIONS_MAX_BATCH_TIME_MS = 60000 * 2

logger = logging.getLogger(__name__)


@click.command()
@click.option(
    "--consumer-group",
    help="Consumer group used for consuming the deletion topic.",
    required=True,
)
@click.option(
    "--bootstrap-server",
    multiple=True,
    help="Kafka bootstrap server to use for consuming.",
)
@click.option("--storage", help="Storage name to consume from", required=True)
@click.option(
    "--max-rows-batch-size",
    default=MAX_ROWS_TO_DELETE_DEFAULT,
    type=int,
    help="Max number of rows to delete at one time.",
)
@click.option(
    "--max-batch-time-ms",
    default=DEFAULT_DELETIONS_MAX_BATCH_TIME_MS,
    type=int,
    help="Max duration to buffer messages in memory for.",
)
@click.option(
    "--auto-offset-reset",
    default="earliest",
    type=click.Choice(["error", "earliest", "latest"]),
    help="Kafka consumer auto offset reset.",
)
@click.option(
    "--no-strict-offset-reset",
    is_flag=True,
    help="Forces the Kafka consumer auto offset reset.",
)
@click.option(
    "--queued-max-messages-kbytes",
    default=settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
    type=int,
    help="Maximum number of kilobytes per topic+partition in the local consumer queue.",
)
@click.option(
    "--queued-min-messages",
    default=settings.DEFAULT_QUEUED_MIN_MESSAGES,
    type=int,
    help="Minimum number of messages per topic+partition the local consumer queue should contain before messages are sent to Kafka.",
)
@click.option("--log-level", help="Logging level to use.")
def lw_deletions_consumer(
    *,
    consumer_group: str,
    bootstrap_server: Sequence[str],
    storage: str,
    max_rows_batch_size: int,
    max_batch_time_ms: int,
    auto_offset_reset: str,
    no_strict_offset_reset: bool,
    queued_max_messages_kbytes: int,
    queued_min_messages: int,
    log_level: str,
) -> None:
    setup_logging(log_level)
    setup_sentry()

    logger.info("Consumer Starting")

    sentry_sdk.set_tag("storage", storage)
    shutdown_requested = False
    consumer: Optional[StreamProcessor[KafkaPayload]] = None

    def handler(signum: int, frame: Any) -> None:
        nonlocal shutdown_requested
        shutdown_requested = True

        if consumer is not None:
            consumer.signal_shutdown()

    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

    topic = STORAGE_TOPIC[storage]

    while not shutdown_requested:
        metrics_tags = {
            "consumer_group": consumer_group,
            "storage": storage,
        }
        metrics = MetricsWrapper(
            environment.metrics, "lw_deletions_consumer", tags=metrics_tags
        )
        configure_metrics(StreamMetricsAdapter(metrics), force=True)
        consumer_config = resolve_consumer_config(
            storage_names=[storage],
            raw_topic=topic.value,
            commit_log_topic=None,
            replacements_topic=None,
            bootstrap_servers=bootstrap_server,
            commit_log_bootstrap_servers=[],
            replacement_bootstrap_servers=[],
            slice_id=None,
            max_batch_size=max_rows_batch_size,
            max_batch_time_ms=max_batch_time_ms,
            group_instance_id=consumer_group,
        )

        consumer_builder = ConsumerBuilder(
            consumer_config=consumer_config,
            kafka_params=KafkaParameters(
                group_id=consumer_group,
                auto_offset_reset=auto_offset_reset,
                strict_offset_reset=not no_strict_offset_reset,
                queued_max_messages_kbytes=queued_max_messages_kbytes,
                queued_min_messages=queued_min_messages,
            ),
            processing_params=ProcessingParameters(None, None, None),
            max_batch_size=max_rows_batch_size,
            max_batch_time_ms=max_batch_time_ms,
            max_insert_batch_size=0,
            max_insert_batch_time_ms=0,
            metrics=metrics,
            slice_id=None,
            join_timeout=None,
            enforce_schema=False,
            metrics_tags=metrics_tags,
        )

        writable_storage = get_writable_storage(StorageKey(storage))
        formatter = STORAGE_FORMATTER[storage]()
        strategy_factory = LWDeletionsConsumerStrategyFactory(
            max_batch_size=max_rows_batch_size,
            max_batch_time_ms=max_batch_time_ms,
            storage=writable_storage,
            formatter=formatter,
            metrics=metrics,
        )

        consumer = consumer_builder.build_lw_deletions_consumer(strategy_factory)

        consumer.run()
        consumer_builder.flush()
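
For reference, a minimal sketch (not part of this diff) of the per-storage lookups the command performs before entering its run loop; it assumes the search_issues storage, matching the devserver wiring above:

from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.lw_deletions.formatters import STORAGE_FORMATTER
from snuba.web.bulk_delete_query import STORAGE_TOPIC

storage = "search_issues"
topic = STORAGE_TOPIC[storage]            # Kafka topic carrying the bulk delete messages
formatter = STORAGE_FORMATTER[storage]()  # storage-specific payload formatter
writable_storage = get_writable_storage(StorageKey(storage))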
9 changes: 9 additions & 0 deletions snuba/consumers/consumer_builder.py
@@ -375,6 +375,15 @@ def build_dlq_consumer(
            dlq_policy,
        )

    def build_lw_deletions_consumer(
        self, strategy_factory: ProcessingStrategyFactory[KafkaPayload]
    ) -> StreamProcessor[KafkaPayload]:
        return self.__build_consumer(
            strategy_factory,
            self.raw_topic,
            self.__build_default_dlq_policy(),
        )

    def __build_default_dlq_policy(self) -> Optional[DlqPolicy[KafkaPayload]]:
        """
        Default DLQ policy applies to the base consumer or the DLQ consumer when
Empty file.
157 changes: 157 additions & 0 deletions snuba/lw_deletions/batching.py
@@ -0,0 +1,157 @@
from __future__ import annotations

import time
from typing import Callable, Generic, MutableSequence, Optional, TypeVar, Union

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.processing.strategies.buffer import Buffer
from arroyo.types import BaseValue, FilteredPayload, Message, TStrategyPayload

ValuesBatch = MutableSequence[BaseValue[TStrategyPayload]]


TPayload = TypeVar("TPayload")
TResult = TypeVar("TResult")


Accumulator = Callable[[TResult, BaseValue[TPayload]], TResult]


class ReduceRowsBuffer(Generic[TPayload, TResult]):
    def __init__(
        self,
        accumulator: Accumulator[TResult, TPayload],
        initial_value: Callable[[], TResult],
        max_batch_size: int,
        max_batch_time: float,
        increment_by: Optional[Callable[[BaseValue[TPayload]], int]] = None,
    ):
        self.accumulator = accumulator
        self.initial_value = initial_value
        self.max_batch_size = max_batch_size
        self.max_batch_time = max_batch_time
        self.increment_by = increment_by

        self._buffer = initial_value()
        self._buffer_size = 0
        self._buffer_until = time.time() + max_batch_time

    @property
    def buffer(self) -> TResult:
        return self._buffer

    @property
    def is_empty(self) -> bool:
        return self._buffer_size == 0

    @property
    def is_ready(self) -> bool:
        return (
            self._buffer_size >= self.max_batch_size
            or time.time() >= self._buffer_until
        )

    def append(self, message: BaseValue[TPayload]) -> None:
        """
        Instead of increasing the buffer size based on the number
        of messages, we use the `rows_to_delete` attribute in the
        message payload so we can batch by the number of rows we
        want to delete.
        """
        self._buffer = self.accumulator(self._buffer, message)
        if self.increment_by:
            buffer_increment = self.increment_by(message)
        else:
            buffer_increment = 1
        self._buffer_size += buffer_increment

    def new(self) -> "ReduceRowsBuffer[TPayload, TResult]":
        return ReduceRowsBuffer(
            accumulator=self.accumulator,
            initial_value=self.initial_value,
            max_batch_size=self.max_batch_size,
            max_batch_time=self.max_batch_time,
            increment_by=self.increment_by,
        )


class ReduceCustom(
    ProcessingStrategy[Union[FilteredPayload, TPayload]], Generic[TPayload, TResult]
):
    def __init__(
        self,
        max_batch_size: int,
        max_batch_time: float,
        accumulator: Accumulator[TResult, TPayload],
        initial_value: Callable[[], TResult],
        next_step: ProcessingStrategy[TResult],
        increment_by: Optional[Callable[[BaseValue[TPayload]], int]] = None,
    ) -> None:
        self.__buffer_step = Buffer(
            buffer=ReduceRowsBuffer(
                max_batch_size=max_batch_size,
                max_batch_time=max_batch_time,
                accumulator=accumulator,
                initial_value=initial_value,
                increment_by=increment_by,
            ),
            next_step=next_step,
        )

    def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None:
        self.__buffer_step.submit(message)

    def poll(self) -> None:
        self.__buffer_step.poll()

    def close(self) -> None:
        self.__buffer_step.close()

    def terminate(self) -> None:
        self.__buffer_step.terminate()

    def join(self, timeout: Optional[float] = None) -> None:
        self.__buffer_step.join(timeout)


class BatchStepCustom(ProcessingStrategy[Union[FilteredPayload, TStrategyPayload]]):
    def __init__(
        self,
        max_batch_size: int,
        max_batch_time: float,
        next_step: ProcessingStrategy[ValuesBatch[TStrategyPayload]],
        increment_by: Optional[Callable[[BaseValue[TStrategyPayload]], int]] = None,
    ) -> None:
        def accumulator(
            result: ValuesBatch[TStrategyPayload], value: BaseValue[TStrategyPayload]
        ) -> ValuesBatch[TStrategyPayload]:
            result.append(value)
            return result

        self.__reduce_step: ReduceCustom[
            TStrategyPayload, ValuesBatch[TStrategyPayload]
        ] = ReduceCustom(
            max_batch_size,
            max_batch_time,
            accumulator,
            lambda: [],
            next_step,
            increment_by,
        )

    def submit(
        self, message: Message[Union[FilteredPayload, TStrategyPayload]]
    ) -> None:
        self.__reduce_step.submit(message)

    def poll(self) -> None:
        self.__reduce_step.poll()

    def close(self) -> None:
        self.__reduce_step.close()

    def terminate(self) -> None:
        self.__reduce_step.terminate()

    def join(self, timeout: Optional[float] = None) -> None:
        self.__reduce_step.join(timeout)

Review thread on ReduceRowsBuffer.append (resolved):

Member: The comment on this function says that we want to use rows_to_delete, yet the implementation does not seem to use that attribute. Instead it uses message to increment. Am I missing something?

Member (Author): increment_by is a function that returns an integer, and that function can do anything based on the message. The function we use is created here: https://github.com/getsentry/snuba/pull/6510/files#diff-9f2e56ee48e901305aa0d40ccb0a705f36a0c7ab07f032fd6360046a965310c5R121 and then passed to the strategy.
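
To make the row-based batching concrete, here is a minimal sketch that drives ReduceRowsBuffer directly. The FakeValue class (a stand-in for arroyo's BaseValue) and the rows_to_delete payload key are illustrative assumptions taken from the docstring and the review thread above, not part of this diff:

from snuba.lw_deletions.batching import ReduceRowsBuffer

# Hypothetical stand-in for arroyo's BaseValue; only .payload is used here.
class FakeValue:
    def __init__(self, payload: dict) -> None:
        self.payload = payload

buffer: ReduceRowsBuffer[dict, list] = ReduceRowsBuffer(
    accumulator=lambda batch, value: batch + [value],
    initial_value=lambda: [],
    max_batch_size=10,   # measured in rows to delete, not messages
    max_batch_time=2.0,  # seconds, per the _buffer_until computation
    increment_by=lambda value: value.payload["rows_to_delete"],
)

buffer.append(FakeValue({"rows_to_delete": 4}))
buffer.append(FakeValue({"rows_to_delete": 7}))
assert buffer.is_ready  # 4 + 7 = 11 rows >= max_batch_size of 10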