From 2d9b8f89bca5dc16aa08123ca07b62acf53f36db Mon Sep 17 00:00:00 2001
From: meredith
Date: Tue, 24 May 2022 15:28:57 -0700
Subject: [PATCH 1/2] ref(subscriptions): Executor created within strategy

---
 snuba/cli/subscriptions_executor.py           |  6 +-----
 snuba/subscriptions/executor_consumer.py      |  9 ++-------
 tests/subscriptions/test_executor_consumer.py | 18 ++----------------
 3 files changed, 5 insertions(+), 28 deletions(-)

diff --git a/snuba/cli/subscriptions_executor.py b/snuba/cli/subscriptions_executor.py
index 864e7c9df0..7cd56eb27a 100644
--- a/snuba/cli/subscriptions_executor.py
+++ b/snuba/cli/subscriptions_executor.py
@@ -1,6 +1,5 @@
 import logging
 import signal
-from concurrent.futures import ThreadPoolExecutor
 from contextlib import contextmanager
 from typing import Any, Iterator, Optional, Sequence
 
@@ -114,8 +113,6 @@ def subscriptions_executor(
         )
     )
 
-    executor = ThreadPoolExecutor(max_concurrent_queries)
-
     # TODO: Consider removing and always passing via CLI.
     # If a value provided via config, it overrides the one provided via CLI.
     # This is so we can quickly change this in an emergency.
@@ -132,7 +129,6 @@ def subscriptions_executor(
         auto_offset_reset,
         not no_strict_offset_reset,
         metrics,
-        executor,
         stale_threshold_seconds,
         cooperative_rebalancing,
     )
@@ -147,7 +143,7 @@ def handler(signum: int, frame: Any) -> None:
     signal.signal(signal.SIGINT, handler)
     signal.signal(signal.SIGTERM, handler)
 
-    with executor, closing(producer), flush_querylog():
+    with closing(producer), flush_querylog():
         processor.run()
 
 
diff --git a/snuba/subscriptions/executor_consumer.py b/snuba/subscriptions/executor_consumer.py
index 7a85403aa9..b81bf64044 100644
--- a/snuba/subscriptions/executor_consumer.py
+++ b/snuba/subscriptions/executor_consumer.py
@@ -55,7 +55,6 @@ def build_executor_consumer(
     auto_offset_reset: str,
     strict_offset_reset: Optional[bool],
     metrics: MetricsBackend,
-    executor: ThreadPoolExecutor,
     stale_threshold_seconds: Optional[int],
     cooperative_rebalancing: bool = False,
 ) -> StreamProcessor[KafkaPayload]:
@@ -134,7 +133,6 @@ def stats_callback(stats_json: str) -> None:
         KafkaConsumer(consumer_configuration),
         Topic(scheduled_topic_spec.topic_name),
         SubscriptionExecutorProcessingFactory(
-            executor,
             max_concurrent_queries,
             dataset,
             entity_names,
@@ -149,7 +147,6 @@
 class SubscriptionExecutorProcessingFactory(ProcessingStrategyFactory[KafkaPayload]):
     def __init__(
         self,
-        executor: ThreadPoolExecutor,
         max_concurrent_queries: int,
         dataset: Dataset,
         entity_names: Sequence[str],
@@ -158,7 +155,6 @@ def __init__(
         stale_threshold_seconds: Optional[int],
         result_topic: str,
     ) -> None:
-        self.__executor = executor
         self.__max_concurrent_queries = max_concurrent_queries
         self.__dataset = dataset
         self.__entity_names = entity_names
@@ -173,7 +169,6 @@ def create(
         return ExecuteQuery(
             self.__dataset,
             self.__entity_names,
-            self.__executor,
             self.__max_concurrent_queries,
             self.__stale_threshold_seconds,
             self.__metrics,
@@ -192,7 +187,6 @@ def __init__(
         self,
         dataset: Dataset,
         entity_names: Sequence[str],
-        executor: ThreadPoolExecutor,
         max_concurrent_queries: int,
         stale_threshold_seconds: Optional[int],
         metrics: MetricsBackend,
@@ -204,8 +198,8 @@ def __init__(
     ) -> None:
         self.__dataset = dataset
         self.__entity_names = set(entity_names)
-        self.__executor = executor
         self.__max_concurrent_queries = max_concurrent_queries
+        self.__executor = ThreadPoolExecutor(self.__max_concurrent_queries)
         self.__stale_threshold_seconds = stale_threshold_seconds
         self.__metrics = metrics
         self.__next_step = next_step
@@ -383,6 +377,7 @@ def join(self, timeout: Optional[float] = None) -> None:
             )
             remaining = timeout - (time.time() - start) if timeout is not None else None
 
+        self.__executor.shutdown()
         self.__next_step.close()
         self.__next_step.join(remaining)
 
diff --git a/tests/subscriptions/test_executor_consumer.py b/tests/subscriptions/test_executor_consumer.py
index e22f3dd737..85e8e7a203 100644
--- a/tests/subscriptions/test_executor_consumer.py
+++ b/tests/subscriptions/test_executor_consumer.py
@@ -1,7 +1,6 @@
 import json
 import time
 import uuid
-from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timedelta
 from typing import Iterator, Mapping, Optional
 from unittest import mock
@@ -115,7 +114,6 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
         auto_offset_reset,
         strict_offset_reset,
         TestingMetricsBackend(),
-        ThreadPoolExecutor(2),
         None,
     )
     for i in range(1, 5):
@@ -222,7 +220,6 @@ def test_execute_query_strategy() -> None:
     dataset = get_dataset("events")
     entity_names = ["events"]
     max_concurrent_queries = 2
-    executor = ThreadPoolExecutor(max_concurrent_queries)
     metrics = TestingMetricsBackend()
     next_step = mock.Mock()
     commit = mock.Mock()
@@ -230,7 +227,6 @@ def test_execute_query_strategy() -> None:
     strategy = ExecuteQuery(
         dataset,
         entity_names,
-        executor,
         max_concurrent_queries,
         None,
         metrics,
@@ -264,14 +260,11 @@ def test_too_many_concurrent_queries() -> None:
     state.set_config("executor_queue_size_factor", 1)
     dataset = get_dataset("events")
     entity_names = ["events"]
-    executor = ThreadPoolExecutor(2)
     metrics = TestingMetricsBackend()
     next_step = mock.Mock()
     commit = mock.Mock()
 
-    strategy = ExecuteQuery(
-        dataset, entity_names, executor, 4, None, metrics, next_step, commit
-    )
+    strategy = ExecuteQuery(dataset, entity_names, 4, None, metrics, next_step, commit)
 
     make_message = generate_message(EntityKey.EVENTS)
 
@@ -292,14 +285,11 @@ def test_skip_execution_for_entity() -> None:
     # Skips execution if the entity name is not on the list
     dataset = get_dataset("metrics")
     entity_names = ["metrics_sets"]
-    executor = ThreadPoolExecutor()
     metrics = TestingMetricsBackend()
     next_step = mock.Mock()
     commit = mock.Mock()
 
-    strategy = ExecuteQuery(
-        dataset, entity_names, executor, 4, None, metrics, next_step, commit
-    )
+    strategy = ExecuteQuery(dataset, entity_names, 4, None, metrics, next_step, commit)
 
     metrics_sets_message = next(generate_message(EntityKey.METRICS_SETS))
     strategy.submit(metrics_sets_message)
@@ -390,7 +380,6 @@ def test_execute_and_produce_result() -> None:
     state.set_config("subscription_mode_events", "new")
     dataset = get_dataset("events")
     entity_names = ["events"]
-    executor = ThreadPoolExecutor()
     max_concurrent_queries = 2
     metrics = TestingMetricsBackend()
 
@@ -408,7 +397,6 @@ def test_execute_and_produce_result() -> None:
     strategy = ExecuteQuery(
         dataset,
         entity_names,
-        executor,
         max_concurrent_queries,
         None,
         metrics,
@@ -438,7 +426,6 @@
 def test_skip_stale_message() -> None:
     dataset = get_dataset("events")
     entity_names = ["events"]
-    executor = ThreadPoolExecutor()
     max_concurrent_queries = 2
     metrics = TestingMetricsBackend()
 
@@ -458,7 +445,6 @@ def test_skip_stale_message() -> None:
     strategy = ExecuteQuery(
         dataset,
         entity_names,
-        executor,
         max_concurrent_queries,
         stale_threshold_seconds,
         metrics,
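The pattern patch 1 lands on is worth seeing in isolation: the strategy builds its `ThreadPoolExecutor` from `max_concurrent_queries` in `__init__` and disposes of it in `join()`, so callers no longer construct a pool and thread it through the CLI, `build_executor_consumer`, and the processing factory. The sketch below is a simplified illustration of that lifecycle, not Snuba's actual classes; `QueryStrategy` and its methods are hypothetical stand-ins for `ExecuteQuery`.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional


class QueryStrategy:
    """Sketch of a strategy that owns its executor's lifecycle."""

    def __init__(self, max_concurrent_queries: int) -> None:
        # Created here, not injected: the strategy alone decides when
        # the pool exists and when it is torn down.
        self.__executor = ThreadPoolExecutor(max_concurrent_queries)

    def submit(self, work: Callable[[], Any]) -> Future:
        # Hand work to the owned pool, as ExecuteQuery does for queries.
        return self.__executor.submit(work)

    def join(self, timeout: Optional[float] = None) -> None:
        # Dispose of the pool when the strategy is joined; callers no
        # longer need a `with executor:` block around the consumer.
        self.__executor.shutdown()


strategy = QueryStrategy(max_concurrent_queries=2)
result = strategy.submit(lambda: 21 * 2)
print(result.result())  # 84
strategy.join()
```

Owning the pool in one place is also what lets the CLI drop `executor` from its `with` statement: the strategy's `join()` now guarantees the same cleanup on shutdown.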
From 0541473773ace49f436a7f316191bf5fb84f7d49 Mon Sep 17 00:00:00 2001
From: meredith
Date: Thu, 2 Jun 2022 10:54:26 -0700
Subject: [PATCH 2/2] update combined scheduler executor as well

---
 snuba/cli/subscriptions_scheduler_executor.py           | 6 +-----
 snuba/subscriptions/combined_scheduler_executor.py      | 5 -----
 tests/subscriptions/test_combined_scheduler_executor.py | 3 ---
 3 files changed, 1 insertion(+), 13 deletions(-)

diff --git a/snuba/cli/subscriptions_scheduler_executor.py b/snuba/cli/subscriptions_scheduler_executor.py
index c61e03f02f..bd21faa54c 100644
--- a/snuba/cli/subscriptions_scheduler_executor.py
+++ b/snuba/cli/subscriptions_scheduler_executor.py
@@ -1,5 +1,4 @@
 import signal
-from concurrent.futures import ThreadPoolExecutor
 from contextlib import contextmanager
 from typing import Any, Iterator, Optional, Sequence
 
@@ -115,8 +114,6 @@ def subscriptions_scheduler_executor(
         )
     )
 
-    executor = ThreadPoolExecutor(max_concurrent_queries)
-
     processor = build_scheduler_executor_consumer(
         dataset_name,
         entity_names,
@@ -129,7 +126,6 @@ def subscriptions_scheduler_executor(
         delay_seconds,
         stale_threshold_seconds,
         max_concurrent_queries,
-        executor,
         metrics,
     )
 
@@ -139,7 +135,7 @@ def handler(signum: int, frame: Any) -> None:
     signal.signal(signal.SIGINT, handler)
     signal.signal(signal.SIGTERM, handler)
 
-    with executor, closing(producer), flush_querylog():
+    with closing(producer), flush_querylog():
         processor.run()
 
 
diff --git a/snuba/subscriptions/combined_scheduler_executor.py b/snuba/subscriptions/combined_scheduler_executor.py
index c5642eb4db..493f1b9b89 100644
--- a/snuba/subscriptions/combined_scheduler_executor.py
+++ b/snuba/subscriptions/combined_scheduler_executor.py
@@ -1,4 +1,3 @@
-from concurrent.futures import ThreadPoolExecutor
 from dataclasses import replace
 from datetime import timedelta
 from typing import Callable, Mapping, NamedTuple, Optional, Sequence, cast
@@ -48,7 +47,6 @@ def build_scheduler_executor_consumer(
     delay_seconds: Optional[int],
     stale_threshold_seconds: Optional[int],
     max_concurrent_queries: int,
-    executor: ThreadPoolExecutor,
     metrics: MetricsBackend,
 ) -> StreamProcessor[Tick]:
     dataset = get_dataset(dataset_name)
@@ -97,7 +95,6 @@ def get_topic_configuration_for_entity(
     factory = CombinedSchedulerExecutorFactory(
         dataset,
         entity_names,
-        executor,
         partitions,
         max_concurrent_queries,
         producer,
@@ -132,7 +129,6 @@ def __init__(
         self,
         dataset: Dataset,
         entity_names: Sequence[str],
-        executor: ThreadPoolExecutor,
         partitions: int,
         max_concurrent_queries: int,
         producer: Producer[KafkaPayload],
@@ -176,7 +172,6 @@ def __init__(
         )
 
         self.__executor_factory = SubscriptionExecutorProcessingFactory(
-            executor,
             max_concurrent_queries,
             dataset,
             entity_names,
diff --git a/tests/subscriptions/test_combined_scheduler_executor.py b/tests/subscriptions/test_combined_scheduler_executor.py
index 5f50dd1645..ae7f1ad5c7 100644
--- a/tests/subscriptions/test_combined_scheduler_executor.py
+++ b/tests/subscriptions/test_combined_scheduler_executor.py
@@ -1,6 +1,5 @@
 import time
 import uuid
-from concurrent.futures import ThreadPoolExecutor
 from contextlib import closing
 from datetime import datetime, timedelta
 from unittest import mock
@@ -48,7 +47,6 @@ def test_combined_scheduler_and_executor() -> None:
     entity_names = ["events"]
     num_partitions = 2
     max_concurrent_queries = 2
-    executor = ThreadPoolExecutor(max_concurrent_queries)
     metrics = TestingMetricsBackend()
 
     commit = mock.Mock()
@@ -67,7 +65,6 @@ def test_combined_scheduler_and_executor() -> None:
     factory = CombinedSchedulerExecutorFactory(
         dataset,
         entity_names,
-        executor,
         num_partitions,
         max_concurrent_queries,
         producer,
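One detail of the ordering introduced in `ExecuteQuery.join()`: the patch calls `self.__executor.shutdown()` after draining pending futures and before closing the next step. The standard library's `ThreadPoolExecutor.shutdown()` defaults to `wait=True`, so any work still queued in the pool completes before the pool is torn down. A minimal stdlib-only demonstration of that semantic (not Snuba code):

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(2)
pending = [pool.submit(time.sleep, 0.1) for _ in range(4)]

# shutdown() blocks until queued tasks finish (wait=True is the default),
# which is why calling it inside join() does not abandon in-flight work.
pool.shutdown()
assert all(f.done() for f in pending)
```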