feat(subscriptions): Integrate ProduceResult step into executor #2299
Conversation
Only execute the scheduled tasks on a topic that match the entity name passed via the CLI, and record a metric counting the number of skipped executions.
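A minimal sketch of that entity filter; the function name, the metrics interface, and the metric name are assumptions for illustration, not the actual Snuba code.

from typing import Mapping, Optional, Sequence


class Metrics:
    # Stand-in for a statsd-style metrics client.
    def increment(self, name: str, tags: Optional[Mapping[str, str]] = None) -> None:
        print(f"increment {name} tags={tags}")


def should_execute(task_entity: str, entity_names: Sequence[str], metrics: Metrics) -> bool:
    # Run only tasks whose entity matches one of the names passed via the CLI;
    # count every execution that gets skipped.
    if task_entity not in entity_names:
        metrics.increment("subscriptions.executor.skipped_execution", tags={"entity": task_entity})
        return False
    return True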
The subscription executor now produces results to the result topic after executing the ClickHouse query and before committing offsets. For now, the result topic comes from either a hardcoded value or the --override-result-topic CLI flag, to avoid any chance of writing to the result topic currently used in production.
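A rough sketch of the ordering described above: produce the encoded result, wait for the broker acknowledgement, and only then commit offsets, so a crash cannot drop a result whose offset was already committed. The class and method names are illustrative, not the actual Snuba strategy.

from typing import Callable


class ProduceResult:
    # Illustrative step: produce each result, then commit offsets only after
    # delivery is confirmed.
    def __init__(self, producer, result_topic: str, commit: Callable[[], None]) -> None:
        self.__producer = producer
        self.__result_topic = result_topic
        self.__commit = commit

    def submit(self, payload: bytes) -> None:
        future = self.__producer.produce(self.__result_topic, payload)
        future.result()   # wait for broker acknowledgement
        self.__commit()   # commit offsets only after the result is durable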
Codecov Report
@@            Coverage Diff             @@
##           master    #2299      +/-   ##
==========================================
+ Coverage   92.73%   92.75%   +0.02%
==========================================
  Files         559      559
  Lines       25704    25736      +32
==========================================
+ Hits        23836    23871      +35
+ Misses       1868     1865       -3
Continue to review full report at Codecov.
I think we can make the executor test better to verify that we receive a result, now that you added the producer there.
producer = KafkaProducer(
    build_kafka_producer_configuration(
        result_topic_spec.topic, override_params={"partitioner": "consistent"},
    )
)
Where does this parameter come from?
Ok, it is from the existing subscriptions consumer, and it is useful to ensure the same subscription always goes to the same partition.
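For context, a minimal illustration of what the "consistent" setting does in librdkafka-based producers: the message key is hashed deterministically, so the same key (here a subscription id) always maps to the same partition. The broker address, topic, and key below are placeholders, not values from this PR.

from confluent_kafka import Producer

producer = Producer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "partitioner": "consistent",            # same key -> same partition
    }
)
producer.produce(
    "subscription-results",        # placeholder result topic
    key=b"subscription-id-123",    # partitioning key
    value=b"encoded result",
)
producer.flush()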
@@ -107,8 +110,7 @@ def test_executor_consumer() -> None:

     encoded_task = encoder.encode(task)

-    fut = producer.produce(topic, payload=encoded_task)
-    fut.result()
+    producer.produce(scheduled_topic, payload=encoded_task).result()
Shouldn't we be validating that the query happens and that we get proper results?
Now we do
With this change I added the test I asked for in the previous review and fixed one bug in the executor consumer.
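A hedged sketch of the kind of check being discussed: after the executor has processed the scheduled task, the result topic should contain a message. It assumes a confluent_kafka-style consumer; the helper name and assertions are illustrative, not the actual test added in this PR.

def assert_result_produced(consumer, result_topic: str) -> None:
    # Poll the (test) result topic and verify the executor produced something.
    consumer.subscribe([result_topic])
    message = consumer.poll(timeout=10.0)
    assert message is not None, "executor produced no result"
    assert message.error() is None, "broker returned an error"
    assert message.value(), "result payload should not be empty"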
@@ -307,7 +295,6 @@ def submit(self, message: Message[KafkaPayload]) -> None:

     def close(self) -> None:
         self.__closed = True
-        self.__next_step.close()
This was wrong. The close method should not close the following steps, because partition revocation calls join, which can still cause messages to be submitted to them.
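To illustrate the fix, here is a simplified strategy skeleton (not the real class): on revocation the consumer calls close() and then join(), and join() may still flush work into the next step, so the next step should only be closed from join().

from typing import Any, Optional


class ForwardingStep:
    # Simplified processing-strategy skeleton showing the close/join split.
    def __init__(self, next_step: Any) -> None:
        self.__next_step = next_step
        self.__closed = False

    def submit(self, message: Any) -> None:
        assert not self.__closed
        self.__next_step.submit(message)

    def close(self) -> None:
        # Stop accepting new messages, but leave the next step open:
        # join() below may still submit buffered work to it.
        self.__closed = True

    def join(self, timeout: Optional[float] = None) -> None:
        # Flush remaining work, then close and join the next step.
        self.__next_step.close()
        self.__next_step.join(timeout)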
LGTM