
Crons: Run monitor tasks check_missing + check_timeout via topic boundaries OR clock pulses #53661

Closed
11 of 12 tasks
evanpurkhiser opened this issue Jul 26, 2023 · 5 comments
Comments

@evanpurkhiser
Member

evanpurkhiser commented Jul 26, 2023

The problem

Currently we have two tasks which must run once per minute. These tasks are responsible for creating missed check-ins for monitors which did not receive a check-in on time, and for marking check-ins as timed out once they pass their timeout_at window.

There are two problems with these tasks:

  1. If our consumer queue backlogs, monitors may be wrongly marked as missed and in-progress check-ins wrongly marked as timed out: the tasks still run on time, but the check-ins that should have satisfied them are sitting unprocessed in the backlog. (We should be able to completely solve this.)

  2. It is possible that we are late to mark missed or timed-out check-ins because periodic tasks are skipped during deploys. This is an artifact of how the celery beat runner is stopped and restarted during deploys. (We can improve this, but not necessarily solve it.)

    (This will not be completely solved by this issue)

Solution

Instead of running our tasks via Celery's Periodic Tasks, we can have them triggered by our consumer, in time with our kafka topic. This keeps the tasks in step with the topic even when there is a backlog.

I propose two modes for this, due to the differing constraints of SaaS, single tenant, and on-premise.

(The mode would be configured with a settings flag)

  1. Mode one: "High volume mode" -- when we are guaranteed a high volume of monitor check-ins from our producer. In this scenario we compute the timestamp of the message with the seconds removed (floored to the minute) and compare it with the previously stored value, kept in Redis. Any time we roll over the minute we take a lock and schedule the two tasks (check_missing / check_timeout) for immediate execution via celery task runners. Importantly, the tasks will receive a timestamp from the messages as their reference time and will use it to determine which monitors should be marked as missed and which check-ins marked as timed out. (A rough sketch of both modes follows this list.)

    Why this mode?

    Due to the high volume we're able to accurately trigger our tasks on minute markers purely based on our kafka topic. With high volume, this is the most accurate way to schedule our two monitor tasks for execution.

  2. Mode two: "Low volume mode" -- when we do not have a high volume of check-ins, such as single tenant or on-premise. In this scenario another periodic celery task produces a clock pulse message once per minute into the ingest-monitors topic. These messages include the timestamp of when the pulse was generated. Ideally the pulses are generated exactly on the minute in wall-clock time so they are put into the queue in order.

    Our monitor_consumer arroyo processor would then discriminate on clock pulse messages and schedule the two tasks (check_missing / check_timeout) for immediate execution via celery task runners. Importantly, the tasks will receive a timestamp from the messages as their reference time and will use it to determine which monitors should be marked as missed and which check-ins marked as timed out.

    Why this mode?

    When the volume of check-ins is not high, we cannot simply use our messages as a clock. Instead we need an external clock pulse that is guaranteed to trigger, so that even in a backlog situation there are messages in the kafka queue in time with when the tasks should run.
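To make the two modes concrete, here is a rough sketch of the consumer-side minute rollover (mode one) and the clock pulse producer (mode two). This is illustrative only: the Redis key name, the task keyword arguments, and the produce helper are assumptions rather than the actual implementation, and an atomic GETSET (suggested later in this thread) stands in for the lock.

# Sketch only -- key name, task signatures, and the produce helper are hypothetical.
from datetime import datetime, timezone

MONITOR_CLOCK_KEY = "sentry.monitors.task-trigger-clock"

def try_trigger_monitor_tasks(message_ts: float, redis_client) -> None:
    # Mode one: floor the message timestamp to the minute and use it as the
    # reference time. GETSET atomically stores the new minute and returns the
    # previous one, so only one consumer rolls the clock over.
    minute = int(message_ts // 60) * 60
    previous = redis_client.getset(MONITOR_CLOCK_KEY, minute)
    if previous is None or int(previous) < minute:
        reference = datetime.fromtimestamp(minute, timezone.utc)
        check_missing.delay(current_datetime=reference)   # assumed task signature
        check_timeout.delay(current_datetime=reference)   # assumed task signature

def clock_pulse() -> None:
    # Mode two: a celery beat task runs this once per minute and produces a pulse
    # into the ingest-monitors topic so the clock keeps ticking even with no real
    # check-ins. `produce` is a hypothetical kafka producer helper.
    now = datetime.now(timezone.utc).replace(second=0, microsecond=0)
    produce("ingest-monitors", {"message_type": "clock_pulse", "ts": now.timestamp()})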

Task execution order

Another important aspect: when we do backlog and push many instances of our two tasks into the queue for celery to process, each set of tasks SHOULD run in order and NOT at the same time. To guarantee this we need a dedicated worker queue for each task with a concurrency of 1, so each task runs sequentially.
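As a sketch of what that could look like (the queue names are made up, and a worker per queue is just one way to get single-task concurrency):

# Sketch only -- queue names are hypothetical.
from kombu import Queue

# One dedicated queue per task type so each runs one at a time, in order.
CELERY_QUEUES = [
    Queue("monitors.check_missing", routing_key="monitors.check_missing"),
    Queue("monitors.check_timeout", routing_key="monitors.check_timeout"),
]

# A worker per queue with a concurrency of 1 gives sequential execution, e.g.:
#   celery -A <app> worker -Q monitors.check_missing --concurrency=1
#   celery -A <app> worker -Q monitors.check_timeout --concurrency=1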

Notes

  • We should validate that when we DO have a backlog we never generate the next_checkin value from the current time; it MUST use the reference time, so that future messages in the backlog are processed correctly. (A sketch follows below.)
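A hedged sketch of the distinction; get_next_expected_checkin is a hypothetical helper used for illustration only:

# Sketch only -- helper and fields are illustrative.
def mark_missed(monitor_environment, reference_ts):
    # WRONG during a backlog: wall-clock time runs ahead of the messages still
    # sitting in the topic.
    #   next_checkin = get_next_expected_checkin(monitor_environment, datetime.utcnow())

    # RIGHT: derive next_checkin from the reference time carried by the message
    # (or clock pulse) that triggered the task, so later backlogged messages
    # still line up with the expected schedule.
    next_checkin = get_next_expected_checkin(monitor_environment, reference_ts)
    monitor_environment.update(next_checkin=next_checkin)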

Tasks

  (Task checklist: 11 of 12 completed; visible items all scoped Backend.)
@evanpurkhiser evanpurkhiser changed the title Crons: Run monitor tasks check_missing + check_timeout via kafka clock pulses Crons: Run monitor tasks check_missing + check_timeout via topic boundaries OR clock pulses Jul 26, 2023
evanpurkhiser added a commit that referenced this issue Aug 4, 2023
…tch (#54204)

This is a partial implementation of GH-53661.

This implements the "High Volume" mode, where check-ins from the
consumer are essentially the 'clock pulses' that are used to dispatch
the monitor tasks each minute.

This change does NOT actually dispatch the tasks, but it does send some
telemetry each time the tasks would be dispatched, as well as capture
error conditions when the clock skips.
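Not the actual telemetry, just a sketch of what detecting a clock skip could look like (the metric and log names are made up):

# Sketch only -- metric and log names are hypothetical.
def detect_clock_skip(previous_minute, new_minute, metrics, logger):
    # Both values are unix timestamps floored to the minute. A gap larger than
    # one minute means the clock skipped: the consumer was down, or nothing was
    # driving the clock for a while.
    if previous_minute is not None and new_minute > previous_minute + 60:
        metrics.incr("monitors.tasks.clock_skip")
        logger.error(
            "monitors.consumer.clock_skip",
            extra={"gap_seconds": new_minute - previous_minute},
        )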
@fpacifici
Contributor

Regarding the two modes:
Can we just have the pulse message running in both modes and treat everything as high volume mode?
Having two separate modes that have to be manually selected and are not adaptive to the actual load makes the system more fragile.

  • It adds moving parts, some of which are not exercised in the development environment. That creates blind spots: we will likely develop against only one mode and break production on the one that is not tested.
  • More failure modes to deal with.
  • The system does not adapt to the load itself but requires manual intervention to switch from one mode to another if the load profile changes. And it will.

If you add the pulse in all environments you are guaranteed to have at least one message per minute everywhere (unless the consumer is backlogging), and you can just run in high volume mode.

@fpacifici
Contributor

Regarding using Kafka as a clock.
How are we dealing with cases where one partition is slower than the others? It happens during backlogs: a few partitions backlog first and some clear before others.

If one partition is further back than another and the more recent one is triggering the task to fill in missing check-ins, you may be filling in check-ins that are still stuck in the slower partition and not actually missing.

There are two ways to address this, as we do for subscriptions:

  • Use the subscriptions scheduler, which instead of using one partition as a clock uses all of them together. Basically it keeps track of a watermark across all partitions and only executes a task (evaluating subscriptions, in that case) when all partitions have reached minute = X. This would require some sort of commit log, as one consumer needs visibility into all partitions. (A rough sketch follows below.)
  • Semantically partition your data and make each replica take care only of the tasks that concern the partitions it is consuming. The way to break down your topic semantically depends on the content of the topic. For events we used the project, which is horrible as individual projects are too big. But maybe we can find a dimension that is sensible here. This way you would also not need Redis.
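A rough sketch of the first option, assuming the consumer can see every partition's progress (all names here are illustrative, maybe_dispatch_tasks being whatever rollover check is used):

# Sketch only -- data shapes and maybe_dispatch_tasks are hypothetical.
partition_watermarks = {}  # partition index -> last observed minute (unix ts)

def on_message(partition, message_ts, assigned_partitions):
    # Track the furthest minute each partition has reached.
    partition_watermarks[partition] = int(message_ts // 60) * 60

    # Only tick the shared clock once every assigned partition has reported; the
    # slowest partition defines the clock, so a lagging partition can never cause
    # check-ins still stuck in it to be marked as missed.
    if all(p in partition_watermarks for p in assigned_partitions):
        global_minute = min(partition_watermarks[p] for p in assigned_partitions)
        maybe_dispatch_tasks(global_minute)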

@fpacifici
Contributor

re: locks.
You do not need an explicit lock on Redis.
Redis has atomic SET operations that return the previous value of the key you are setting.
Your consumer can work this way:

previous_value_set = None  # the minute this consumer last wrote to redis

def on_message(message):
    global previous_value_set
    minute = int(message.timestamp // 60) * 60   # timestamp floored to the minute
    if minute != previous_value_set:
        # Atomic GETSET: store the new minute and get back the previous value
        previous_redis_value = redis.getset(key, minute)
        if previous_redis_value is None or int(previous_redis_value) != minute:
            # We changed the value in redis, so nobody rolled into this minute before us
            schedule_task()
        # else: somebody else already scheduled the task for this minute
        previous_value_set = minute
        # For the rest of this minute this specific consumer will not bother redis,
        # since we already know what the result would be

@evanpurkhiser
Member Author

Thanks for the feedback @fpacifici

> You do not need an explicit lock on Redis.

Thanks for this feedback, I've addressed this here: #54341

> Can we just have the pulse message running in both modes and treat everything as high volume mode?

This is good, yes. I will follow up and implement this.

> How are we dealing with cases where one partition is slower than the others? It happens during backlogs: a few partitions backlog first and some clear before others.

Also a good catch, we hadn't taken a partial partition backlog into consideration. I will follow up with @wedamija about this tomorrow and we'll come up with a plan.

evanpurkhiser added a commit that referenced this issue Aug 8, 2023
Instead of taking a lock when ticking the monitor tasks trigger clock,
we can use redis' `GETSET` command to retrieve the current value and set
a new value atomically.

This addresses @fpacifici's feedback here
#53661 (comment)
evanpurkhiser added a commit that referenced this issue Aug 8, 2023
This is a follow up to GH-54204 as suggested by @fpacifici

#53661 (comment)

> Can we just have the pulse message running in both modes and treat
> everything as high volume mode?

Instead of having two modes, we can simply always use the same logic for
dispatching the monitor tasks on the minute roll-over, using the
consumer as a clock.

Previously the worry was that in low-volume check-in situations nothing would drive the clock, so we would need an external clock with a different way to dispatch the tasks. But there is no need for a different dispatch path: we can have an external clock that pulses messages into the topic and simply reuse the logic already implemented to use the topic messages as a clock.

This change removes the concept of "high volume" / "low volume" and adds
the concept of a "clock_pulse" message to the consumer.

In a follow up PR we will introduce the celery beat task which produces
the clock_pulse messages.
evanpurkhiser added a commit that referenced this issue Aug 15, 2023
This is the last piece of GH-53661 to ensure tasks are triggered in
scenarios where there is not enough volume.
@evanpurkhiser
Member Author

This is now completed!

@github-actions github-actions bot locked and limited conversation to collaborators Aug 31, 2023