diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index cac4374595..6c6f35a45b 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -153,6 +153,45 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: ], ), ] + if settings.ENABLE_METRICS_SUBSCRIPTIONS: + daemons += [ + ( + "subscriptions-consumer-metrics-counters", + [ + "snuba", + "subscriptions", + "--auto-offset-reset=latest", + "--log-level=debug", + "--max-batch-size=1", + "--consumer-group=snuba-metrics-subscriptions-consumers", + "--dataset=metrics", + "--entity=metrics_counters", + "--commit-log-topic=snuba-metrics-commit-log", + "--commit-log-group=metrics_group", + "--delay-seconds=1", + "--schedule-ttl=10", + "--max-query-workers=1", + ], + ), + ( + "subscriptions-consumer-metrics-sets", + [ + "snuba", + "subscriptions", + "--auto-offset-reset=latest", + "--log-level=debug", + "--max-batch-size=1", + "--consumer-group=snuba-metrics-subscriptions-consumers", + "--dataset=metrics", + "--entity=metrics_sets", + "--commit-log-topic=snuba-metrics-commit-log", + "--commit-log-group=metrics_group", + "--delay-seconds=1", + "--schedule-ttl=10", + "--max-query-workers=1", + ], + ), + ] if settings.ENABLE_SESSIONS_SUBSCRIPTIONS: daemons += [ diff --git a/snuba/datasets/storages/metrics.py b/snuba/datasets/storages/metrics.py index e0bb101f88..d74224c861 100644 --- a/snuba/datasets/storages/metrics.py +++ b/snuba/datasets/storages/metrics.py @@ -24,6 +24,7 @@ from snuba.query.processors.arrayjoin_keyvalue_optimizer import ( ArrayJoinKeyValueOptimizer, ) +from snuba.subscriptions.utils import SchedulingWatermarkMode from snuba.utils.streams.topics import Topic PRE_VALUE_COLUMNS: Sequence[Column[SchemaModifiers]] = [ @@ -58,11 +59,15 @@ ), query_processors=[], stream_loader=build_kafka_stream_loader_from_settings( - processor=SetsMetricsProcessor(), default_topic=Topic.METRICS, + processor=SetsMetricsProcessor(), + default_topic=Topic.METRICS, + commit_log_topic=Topic.METRICS_COMMIT_LOG, + subscription_scheduler_mode=SchedulingWatermarkMode.GLOBAL, + subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_METRICS, + subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_METRICS, ), ) - counters_buckets = WritableTableStorage( storage_key=StorageKey.METRICS_COUNTERS_BUCKETS, storage_set_key=StorageSetKey.METRICS, @@ -76,7 +81,12 @@ ), query_processors=[], stream_loader=build_kafka_stream_loader_from_settings( - processor=CounterMetricsProcessor(), default_topic=Topic.METRICS, + processor=CounterMetricsProcessor(), + default_topic=Topic.METRICS, + commit_log_topic=Topic.METRICS_COMMIT_LOG, + subscription_scheduler_mode=SchedulingWatermarkMode.GLOBAL, + subscription_scheduled_topic=Topic.SUBSCRIPTION_SCHEDULED_METRICS, + subscription_result_topic=Topic.SUBSCRIPTION_RESULTS_METRICS, ), ) diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index 1d6afd5173..e78440335c 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -161,6 +161,7 @@ # Metric Alerts Subscription Options ENABLE_SESSIONS_SUBSCRIPTIONS = os.environ.get("ENABLE_SESSIONS_SUBSCRIPTIONS", False) +ENABLE_METRICS_SUBSCRIPTIONS = os.environ.get("ENABLE_METRICS_SUBSCRIPTIONS", False) # Subscriptions scheduler buffer size SUBSCRIPTIONS_DEFAULT_BUFFER_SIZE = 10000