Skip to content

Commit

Permalink
Merge branch 'master' into close-scheduler-producer
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara authored Dec 16, 2021
2 parents da815be + a9c82ff commit 9ec24f6
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions snuba/subscriptions/executor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def get_topics_for_entity(
result_topic_spec,
), "All entities must have same scheduled and result topics"

executor = ThreadPoolExecutor(max_concurrent_queries)

return StreamProcessor(
KafkaConsumer(
build_kafka_consumer_configuration(
Expand All @@ -101,11 +103,19 @@ def get_topics_for_entity(
),
),
Topic(scheduled_topic_spec.topic_name),
SubscriptionExecutorProcessingFactory(),
SubscriptionExecutorProcessingFactory(
executor,
dataset,
# If there are max_concurrent_queries + 10 pending futures in the queue,
# we will start raising MessageRejected to slow down the consumer as
# it means our executor cannot keep up
max_concurrent_queries + 10,
metrics,
),
)


class Noop(ProcessingStrategy[KafkaPayload]):
class Noop(ProcessingStrategy[SubscriptionTaskResult]):
"""
Placeholder.
"""
Expand All @@ -116,7 +126,7 @@ def __init__(self, commit: Callable[[Mapping[Partition, Position]], None]):
def poll(self) -> None:
pass

def submit(self, message: Message[KafkaPayload]) -> None:
def submit(self, message: Message[SubscriptionTaskResult]) -> None:
self.__commit({message.partition: Position(message.offset, message.timestamp)})

def close(self) -> None:
Expand All @@ -130,10 +140,28 @@ def join(self, timeout: Optional[float] = None) -> None:


class SubscriptionExecutorProcessingFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(
self,
executor: ThreadPoolExecutor,
dataset: Dataset,
buffer_size: int,
metrics: MetricsBackend,
) -> None:
self.__executor = executor
self.__dataset = dataset
self.__buffer_size = buffer_size
self.__metrics = metrics

def create(
self, commit: Callable[[Mapping[Partition, Position]], None]
) -> ProcessingStrategy[KafkaPayload]:
return Noop(commit)
return ExecuteQuery(
self.__dataset,
self.__executor,
self.__buffer_size,
self.__metrics,
Noop(commit),
)


class ExecuteQuery(ProcessingStrategy[KafkaPayload]):
Expand All @@ -146,13 +174,13 @@ def __init__(
self,
dataset: Dataset,
executor: ThreadPoolExecutor,
max_concurrent_queries: int,
buffer_size: int,
metrics: MetricsBackend,
next_step: ProcessingStrategy[SubscriptionTaskResult],
) -> None:
self.__dataset = dataset
self.__executor = executor
self.__max_concurrent_queries = max_concurrent_queries
self.__buffer_size = buffer_size
self.__metrics = metrics
self.__next_step = next_step

Expand Down Expand Up @@ -223,7 +251,7 @@ def submit(self, message: Message[KafkaPayload]) -> None:

# Tell the consumer to pause until we have removed some futures from
# the queue
if len(self.__queue) >= self.__max_concurrent_queries:
if len(self.__queue) >= self.__buffer_size:
raise MessageRejected

task = self.__encoder.decode(message.payload)
Expand Down

0 comments on commit 9ec24f6

Please sign in to comment.