Skip to content

Commit

Permalink
feat(subscriptions): Allow scheduling watermark mode to be overridden (
Browse files Browse the repository at this point in the history
…#2791)

This is a temporary change that allows the scheduling watermark mode
to be overridden in the combined scheduler/executor. This is needed for
single tenant as we often have empty partitions there and empty partitions are
incompatible with "global" mode. Once transactions are no longer semantically
partitioned, this will not be necessary and this can be removed.
  • Loading branch information
lynnagara authored and JoshFerge committed Jun 13, 2022
1 parent ab822c6 commit 0b2f6b3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
13 changes: 13 additions & 0 deletions snuba/cli/subscriptions_scheduler_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from snuba.subscriptions.combined_scheduler_executor import (
build_scheduler_executor_consumer,
)
from snuba.subscriptions.utils import SchedulingWatermarkMode
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.streams.configuration_builder import build_kafka_producer_configuration
from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter
Expand Down Expand Up @@ -69,6 +70,11 @@
help="Skip scheduling if timestamp is beyond this threshold compared to the system time",
)
@click.option("--log-level", help="Logging level to use.")
@click.option(
"--scheduling-mode",
type=click.Choice(["partition", "global"]),
help="Overrides the partition scheduling mode associated with the dataset.",
)
def subscriptions_scheduler_executor(
*,
dataset_name: str,
Expand All @@ -82,6 +88,10 @@ def subscriptions_scheduler_executor(
delay_seconds: Optional[int],
stale_threshold_seconds: Optional[int],
log_level: Optional[str],
# TODO: Temporarily overrides the scheduling mode.
# Required for single tenant since some partitions may be empty.
# To be removed once transactions is no longer semantically partitioned.
scheduling_mode: Optional[str],
) -> None:
"""
Combined subscriptions scheduler and executor. Alternative to the separate scheduler and executor processes.
Expand Down Expand Up @@ -127,6 +137,9 @@ def subscriptions_scheduler_executor(
stale_threshold_seconds,
max_concurrent_queries,
metrics,
SchedulingWatermarkMode(scheduling_mode)
if scheduling_mode is not None
else None,
)

def handler(signum: int, frame: Any) -> None:
Expand Down
19 changes: 13 additions & 6 deletions snuba/subscriptions/combined_scheduler_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def build_scheduler_executor_consumer(
stale_threshold_seconds: Optional[int],
max_concurrent_queries: int,
metrics: MetricsBackend,
scheduling_mode: Optional[SchedulingWatermarkMode],
) -> StreamProcessor[Tick]:
dataset = get_dataset(dataset_name)

Expand Down Expand Up @@ -102,6 +103,7 @@ def get_topic_configuration_for_entity(
stale_threshold_seconds,
result_topic.topic_name,
schedule_ttl,
scheduling_mode,
)

return StreamProcessor(
Expand Down Expand Up @@ -136,6 +138,7 @@ def __init__(
stale_threshold_seconds: Optional[int],
result_topic: str,
schedule_ttl: int,
scheduling_mode: Optional[SchedulingWatermarkMode] = None,
) -> None:
# TODO: self.__partitions might not be the same for each entity
self.__partitions = partitions
Expand Down Expand Up @@ -181,15 +184,19 @@ def __init__(
result_topic,
)

modes = {
self._get_entity_watermark_mode(entity_key) for entity_key in entity_keys
}
if scheduling_mode is not None:
self.__mode = scheduling_mode
else:
modes = {
self._get_entity_watermark_mode(entity_key)
for entity_key in entity_keys
}

mode = modes.pop()
mode = modes.pop()

assert len(modes) == 0, "Entities provided do not share the same mode"
assert len(modes) == 0, "Entities provided do not share the same mode"

self.__mode = mode
self.__mode = mode

def _get_entity_watermark_mode(
self, entity_key: EntityKey
Expand Down

0 comments on commit 0b2f6b3

Please sign in to comment.