feat(subscriptions): Executor uses the ExecuteQuery strategy (#2280)
Integrates the ExecuteQuery strategy into the subscriptions executor.
Note that the `executor_sample_rate` option is currently not set, so all
scheduled subscriptions are filtered out at the start of the pipeline,
and not even their offsets are committed.

This avoids running duplicate queries on ClickHouse until we are ready
to begin testing.
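For context, the gating described above amounts to a sample-rate check at the head of the pipeline. A minimal sketch of that behavior, with hypothetical names, since the actual option plumbing is outside this diff:

```python
import random
from typing import Optional


def should_execute(executor_sample_rate: Optional[float]) -> bool:
    # With the option unset (None), every scheduled subscription is
    # filtered out: no query runs and no offset is committed, so the
    # executor stays idle alongside the existing subscriptions pipeline.
    if executor_sample_rate is None:
        return False
    return random.random() < executor_sample_rate
```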
lynnagara authored Dec 16, 2021
1 parent 84ae27b commit a9c82ff
Showing 1 changed file with 35 additions and 7 deletions.
snuba/subscriptions/executor_consumer.py (42 changes: 35 additions & 7 deletions)
@@ -92,6 +92,8 @@ def get_topics_for_entity(
         result_topic_spec,
     ), "All entities must have same scheduled and result topics"

+    executor = ThreadPoolExecutor(max_concurrent_queries)
+
     return StreamProcessor(
         KafkaConsumer(
             build_kafka_consumer_configuration(
@@ -101,11 +103,19 @@ def get_topics_for_entity(
             ),
         ),
         Topic(scheduled_topic_spec.topic_name),
-        SubscriptionExecutorProcessingFactory(),
+        SubscriptionExecutorProcessingFactory(
+            executor,
+            dataset,
+            # If there are max_concurrent_queries + 10 pending futures in the queue,
+            # we will start raising MessageRejected to slow down the consumer as
+            # it means our executor cannot keep up
+            max_concurrent_queries + 10,
+            metrics,
+        ),
     )


-class Noop(ProcessingStrategy[KafkaPayload]):
+class Noop(ProcessingStrategy[SubscriptionTaskResult]):
     """
     Placeholder.
     """
@@ -116,7 +126,7 @@ def __init__(self, commit: Callable[[Mapping[Partition, Position]], None]):
     def poll(self) -> None:
         pass

-    def submit(self, message: Message[KafkaPayload]) -> None:
+    def submit(self, message: Message[SubscriptionTaskResult]) -> None:
         self.__commit({message.partition: Position(message.offset, message.timestamp)})

     def close(self) -> None:
@@ -130,10 +140,28 @@ def join(self, timeout: Optional[float] = None) -> None:


 class SubscriptionExecutorProcessingFactory(ProcessingStrategyFactory[KafkaPayload]):
+    def __init__(
+        self,
+        executor: ThreadPoolExecutor,
+        dataset: Dataset,
+        buffer_size: int,
+        metrics: MetricsBackend,
+    ) -> None:
+        self.__executor = executor
+        self.__dataset = dataset
+        self.__buffer_size = buffer_size
+        self.__metrics = metrics
+
     def create(
         self, commit: Callable[[Mapping[Partition, Position]], None]
     ) -> ProcessingStrategy[KafkaPayload]:
-        return Noop(commit)
+        return ExecuteQuery(
+            self.__dataset,
+            self.__executor,
+            self.__buffer_size,
+            self.__metrics,
+            Noop(commit),
+        )


 class ExecuteQuery(ProcessingStrategy[KafkaPayload]):
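The new `create` wires a two-step chain: ExecuteQuery runs the query and hands each `SubscriptionTaskResult` to its `next_step`, and the Noop placeholder simply commits the offset (presumably to be replaced by a result producer later). A toy illustration of that chaining pattern, using stand-in names rather than Snuba's real interfaces:

```python
from typing import Callable, List


class CommitOffsets:
    # Plays the role of Noop above: a terminal step that only records
    # progress once the work for a message is finished.
    def __init__(self) -> None:
        self.committed: List[int] = []

    def submit(self, offset: int) -> None:
        self.committed.append(offset)  # the real Noop commits the Kafka offset


class RunQuery:
    # Plays the role of ExecuteQuery: do the work, then hand the
    # message downstream so the next step can commit it.
    def __init__(self, work: Callable[[int], None], next_step: CommitOffsets) -> None:
        self.__work = work
        self.__next_step = next_step

    def submit(self, offset: int) -> None:
        self.__work(offset)
        self.__next_step.submit(offset)


# Mirrors the factory: the terminal step is constructed first and
# injected into the step that runs before it.
strategy = RunQuery(lambda offset: None, CommitOffsets())
strategy.submit(42)
```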
@@ -146,13 +174,13 @@ def __init__(
         self,
         dataset: Dataset,
         executor: ThreadPoolExecutor,
-        max_concurrent_queries: int,
+        buffer_size: int,
         metrics: MetricsBackend,
         next_step: ProcessingStrategy[SubscriptionTaskResult],
     ) -> None:
         self.__dataset = dataset
         self.__executor = executor
-        self.__max_concurrent_queries = max_concurrent_queries
+        self.__buffer_size = buffer_size
         self.__metrics = metrics
         self.__next_step = next_step

@@ -223,7 +251,7 @@ def submit(self, message: Message[KafkaPayload]) -> None:

         # Tell the consumer to pause until we have removed some futures from
         # the queue
-        if len(self.__queue) >= self.__max_concurrent_queries:
+        if len(self.__queue) >= self.__buffer_size:
             raise MessageRejected

         task = self.__encoder.decode(message.payload)
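The `buffer_size` comparison above is the backpressure mechanism the inline comment describes: ExecuteQuery keeps a queue of in-flight futures, and once `max_concurrent_queries + 10` of them are pending it raises `MessageRejected` so the stream processor pauses the consumer until the executor catches up. A self-contained sketch of that pattern (names are illustrative, not Snuba's actual classes):

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque


class MessageRejected(Exception):
    """Tells the consumer to pause and re-submit the message later."""


class BoundedExecutorStep:
    def __init__(self, max_concurrent_queries: int) -> None:
        self.__executor = ThreadPoolExecutor(max_concurrent_queries)
        # Tolerate a small backlog beyond the worker count before
        # pushing back on the consumer.
        self.__buffer_size = max_concurrent_queries + 10
        self.__queue: Deque[Future[None]] = deque()

    def submit(self, run_query: Callable[[], None]) -> None:
        # Drop futures that have already completed.
        while self.__queue and self.__queue[0].done():
            self.__queue.popleft()
        if len(self.__queue) >= self.__buffer_size:
            raise MessageRejected
        self.__queue.append(self.__executor.submit(run_query))
```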
