Send periodic query progress events #23195
Conversation
prithvip
left a comment
It might be better (single responsibility principle) if there was a new singleton class that calls DispatchManager::getQueries periodically and sends the heartbeat, and we can wire in any heartbeat-specific configuration there. "Query heartbeat" is overloaded in the DispatchManager context, since DispatchQuery has recordHeartbeat(), which is used for purging abandoned queries, and that's a totally different purpose than this "heartbeat". "sendQueryStateInfos" might be better terminology and would match the QueryStateInfoResource::getQueryStateInfos API.
this.securityConfig = requireNonNull(securityConfig, "securityConfig is null");
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
this.queryHeartbeatExecutor = Executors.newSingleThreadScheduledExecutor(threadsNamed("query-heartbeat-management-%s"));
Cleaner to instantiate this in CoordinatorModule and bind it to an annotated ScheduledExecutorService here - see CoordinatorModule.java for how we do this in other cases.
catch (Throwable e) {
    log.error(e, "Error sending Heartbeat Event");
}
}, 1, 1, TimeUnit.MINUTES);
sendQueryHeartbeats();
}
catch (Throwable e) {
    log.error(e, "Error sending Heartbeat Event");
Issues in sending the heartbeat should be caught and handled by QueryMonitor, DispatchManager shouldn't worry about that or log anything.
private void sendQueryHeartbeats()
{
    List<BasicQueryInfo> basicQueryInfoList = queryTracker.getAllQueries().stream().map(q -> q.getBasicQueryInfo()).collect(Collectors.toList());
DispatchManager::getQueries already does this, let's use that.
Force-pushed f0e0fd6 to 3b20785
@prithvip @tdcmeehan : Gentle reminder for the review! Thanks.
ofEpochMilli(queryInfo.getQueryStats().getCreateTime().getMillis())));
}
catch (Throwable e) {
    log.error(e, "Error sending Query Progress Event");
I don't think we need try-catch here, because any issues should be handled by the specific EventListener implementation which does not throw checked exception. The try-catch should happen there.
Force-pushed 3b20785 to b4e5624
Can you add how progress events can potentially be analyzed?
tdcmeehan
left a comment
There was a problem hiding this comment.
Can you please add some unit tests?
Force-pushed b4e5624 to 48b34c6
Query progress events can be used to figure out the workload distribution by users or workload type, which can be used for doing more realtime quota enforcement. Another use case would be to get insights into the queuing state of the cluster in real time and identify the workloads/users contributing to it. Query progress events allow us to aggregate data on different dimensions as needed. Cluster level events are very coarse and will not expose such detailed information.

@tdcmeehan : Modified existing test to check for progress events and updated the PR. Thanks.
Force-pushed 48b34c6 to d1d0747

Force-pushed d1d0747 to 9460ae2
@PostConstruct
public void start()
{
    queryProgressMonitorExecutor.scheduleWithFixedDelay(
For a heavily contended cluster, I think this would be pretty wasteful, because most queries will publish identical information each time even though they're queued and it won't change until they dequeue. What do you think about making it based off of query state transition? I.e. it publishes once per query state transition, and when it transitions into EXECUTING then we start a periodic timer?
I feel that the decision to process/ignore events for QUEUED queries should be left to the consumer itself. It also depends on the use case for which these events are being utilized. For instance, within Meta itself, these events will be used for two different purposes:
- Resource utilization of in-progress queries to perform quota enforcement and throttling. This consumer simply ignores events for QUEUED queries and only logs events for running queries.
- Getting the current queuing state of the cluster and powering certain alerts/dashboards. This requires getting events for queued queries at a periodic interval in order to generate a recent snapshot. Another problem I see is that if a query fails after queueing and before logging the query completed event (in case of a coordinator crash or cluster restart), then we will not be able to figure out whether a query is in QUEUED state because it is waiting for resources or whether it died for some reason. So, having periodic samples will surface a more accurate snapshot of the cluster state.
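The consumer-side filtering described in the first use case could be sketched as follows. This is an illustrative sketch, not Presto's actual EventListener API: QueryState and ProgressEvent here are simplified stand-ins for the real types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a consumer that ignores progress events for QUEUED
// queries and keeps the rest, e.g. for quota accounting of running work.
class ProgressEventConsumer
{
    public enum QueryState { QUEUED, RUNNING, FINISHED, FAILED }

    public record ProgressEvent(String queryId, QueryState state) {}

    private final List<ProgressEvent> accepted = new ArrayList<>();

    // Drop events for queued queries; the publisher stays state-agnostic.
    public void onProgressEvent(ProgressEvent event)
    {
        if (event.state() == QueryState.QUEUED) {
            return;
        }
        accepted.add(event);
    }

    public List<ProgressEvent> getAccepted()
    {
        return accepted;
    }
}
```

This keeps the publisher simple (it always publishes) while each consumer decides which states matter for its use case.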
import static java.util.Objects.requireNonNull;

public class QueryProgressEvent
I think what makes this different from similar classes in this package is that this is designed to publish multiple events, instead of an event per state transition or at query start/finish. If you imagine that someone might put this into a queue that has at-least-once semantics, then it might be possible for messages to arrive out of order or duplicated. That's not a problem for peer classes in this package, because you know the state transition order, or you know that you only need to look at one query completed event even if there's a duplicate, but for this class, it's impossible to infer correct ordering. This may lead to strange queries or incorrect analysis of the events. I'd recommend you include a monotonically increasing identifier that downstream systems can use to infer ordering and to de-duplicate at the reader side.
@tdcmeehan : Added an increasing event Id field in the QueryProgressEvent class.
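The ordering/de-duplication scheme discussed above could be sketched like this. The class and method names are illustrative, not Presto's actual API: the publisher attaches a monotonically increasing id to each event, and a reader keeps the highest id seen per query, discarding duplicates and stale arrivals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Publisher side: hands out strictly increasing event ids.
class ProgressEventSequencer
{
    private final AtomicLong nextEventId = new AtomicLong();

    public long nextId()
    {
        return nextEventId.incrementAndGet();
    }
}

// Reader side: de-duplicates and drops out-of-order arrivals per query.
class ProgressEventReader
{
    private final Map<String, Long> lastSeenId = new HashMap<>();

    // Returns true if the event is new and in order, false if it is a
    // duplicate or arrived after a later event was already processed.
    public boolean accept(String queryId, long eventId)
    {
        Long last = lastSeenId.get(queryId);
        if (last != null && eventId <= last) {
            return false;
        }
        lastSeenId.put(queryId, eventId);
        return true;
    }
}
```

With at-least-once delivery, the reader can thus always reconstruct the latest snapshot per query even when the transport reorders or duplicates messages.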
Shouldn't higher level quota enforcement work hand-in-hand with local resource management in the resource group? If so, wouldn't it be better to publish these events at the granularity of resource group IDs? If not, how do you ensure that local resources are in line with global quota? Likewise, wouldn't events at the resource group level give you the insights needed to identify the workloads and users contributing to the queueing state?

My understanding might be wrong, but I feel that higher level quota enforcement can be different from local resource management provided by resource groups. AFAIK, RG provides quota management on concurrency and memory share. So, if a resource group is configured with a 50% memory share and a concurrency of 100, it can run 100 queries in parallel which can consume a total of 50% of cluster memory. Global quota management could happen at the team, org, or product level, wherein a team might have quota to use X memory resources per day, and once they have consumed these resources, their workload will get blocked/throttled. This enforcement cannot be achieved at the RG level IMO. Also, an org might have multiple teams using different resource groups for their workload, and we need to aggregate resource usage at the org level to perform such quota enforcement.

Yes, we should be able to collect this data at the RG level. However, even at the RG level, we will be collecting BasicQueryInfos for all running queries and publishing them at a periodic interval. In terms of information published, it will most likely still be the same, just grouped at the RG level.
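The cross-RG aggregation argued for above (rolling query-level events up to team/org dimensions) could be sketched as follows. QuerySnapshot is a simplified stand-in for the per-query detail carried in a progress event; the grouping key could equally be a team or org derived from the user.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of consumer-side aggregation: progress events carry
// query-level detail, and the consumer rolls them up by whatever dimension
// it needs (user, team, org, resource group).
class ProgressAggregator
{
    record QuerySnapshot(String queryId, String user, long memoryBytes) {}

    // Total memory in use per user, computed from one batch of events.
    static Map<String, Long> memoryByUser(List<QuerySnapshot> snapshots)
    {
        return snapshots.stream()
                .collect(Collectors.groupingBy(
                        QuerySnapshot::user,
                        Collectors.summingLong(QuerySnapshot::memoryBytes)));
    }
}
```

Because each event carries query-level detail, the same batch can be re-aggregated along any dimension without changing the publisher.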
Force-pushed 9460ae2 to ec4ad6a
@tdcmeehan : Gentle reminder for the review! Thanks.

@tdcmeehan : Another gentle reminder for the review!! :)

Suggest adding the PR number to the release note entry per the Release Notes Guidelines.

@tdcmeehan : Ping.
private void publishQueryProgressEvent()
{
    for (BasicQueryInfo basicQueryInfo : dispatchManager.getQueries()) {
        if (basicQueryInfo.getState() != FINISHED && basicQueryInfo.getState() != FAILED) {
Suggested change:
- if (basicQueryInfo.getState() != FINISHED && basicQueryInfo.getState() != FAILED) {
+ if (!basicQueryInfo.getState().isDone()) {
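The suggested isDone() check can be illustrated with a simplified state enum. This is a sketch, not Presto's actual QueryState (which has more intermediate states): the point is that a terminal-state predicate on the enum is more robust than enumerating terminal states at every call site.

```java
class QueryStates
{
    // Simplified stand-in for Presto's QueryState; each constant declares
    // whether it is a terminal (done) state.
    enum QueryState
    {
        QUEUED(false), RUNNING(false), FINISHED(true), FAILED(true);

        private final boolean doneState;

        QueryState(boolean doneState)
        {
            this.doneState = doneState;
        }

        public boolean isDone()
        {
            return doneState;
        }
    }

    // Progress events are only published for queries that are not done.
    static boolean shouldPublish(QueryState state)
    {
        return !state.isDone();
    }
}
```

If a new terminal state is ever added, only the enum changes; callers using isDone() stay correct.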
public class QueryMonitorConfig
{
    private DataSize maxOutputStageJsonSize = new DataSize(16, Unit.MEGABYTE);
    private Duration queryProgressPublishInterval = new Duration(1, MINUTES);
Can this be defaulted to 0 (don't publish events)?
@PostConstruct
public void start()
{
    queryProgressMonitorExecutor.scheduleWithFixedDelay(
Can you check if the duration is 0, and if so, make this a no-op? And also make the executor loaded lazily?
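The requested behavior (interval 0 disables publishing, executor created lazily) could be sketched like this. The class name and millisecond interval are illustrative; Presto's actual code uses Airlift's Duration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: treat an interval of 0 as "disabled" and only create
// the scheduler thread when publishing is actually enabled.
class QueryProgressPublisher
{
    private final long publishIntervalMillis;
    private ScheduledExecutorService executor; // created lazily in start()

    QueryProgressPublisher(long publishIntervalMillis)
    {
        this.publishIntervalMillis = publishIntervalMillis;
    }

    public synchronized void start(Runnable publishTask)
    {
        if (publishIntervalMillis == 0) {
            return; // publishing disabled; never allocate a thread
        }
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                publishTask,
                publishIntervalMillis,
                publishIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    public synchronized boolean isRunning()
    {
        return executor != null;
    }

    public synchronized void stop()
    {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
```

With this shape, a cluster that leaves the property at 0 pays no cost at all for the feature.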
From your message earlier, I read two use cases: more realtime quota enforcement, and identifying sources of queueing. For both of these, I'm wondering if you get sufficient information from resource group nodes. Instead of publishing ...

Just to confirm my understanding, are you suggesting that we can publish ...

Drilling down is not possible with the realtime quota enforcement use case, given that the enforcement system will consume these raw events to make quota decisions in realtime. There is no offline processing that will happen later to get more information about these queries using queryIds. So, we need to expose detailed information in the event itself.

I think I mentioned above that aggregation will happen at the level of teams, products, and orgs, which can be mapped to several RGs in a cluster. So, we need to log detailed information at the query level and perform aggregation based on these dimensions. I feel that if we really want to publish events at the RG level, we can still do that and push the logic to publish more detailed information (at the query level perhaps) to the event consumer itself. Given ...
Force-pushed 8acbcd1 to 4eef95e
private ScheduledExecutorService queryProgressMonitorExecutor;

@Inject
public QueryProgressMonitor(
        QueryMonitor queryMonitor,
        DispatchManager dispatchManager,
        QueryMonitorConfig queryMonitorConfig,
        ResourceGroupManager resourceGroupManager)
{
    resourceGroupManager.getRootResourceGroups();
    this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
    this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
    this.queryProgressPublishInterval = requireNonNull(queryMonitorConfig, "queryMonitorConfig is null").getQueryProgressPublishInterval();
}

@PostConstruct
public void start()
Suggested change: annotate the executor field with @GuardedBy("this") and make start() synchronized:

@GuardedBy("this")
private ScheduledExecutorService queryProgressMonitorExecutor;

@Inject
public QueryProgressMonitor(
        QueryMonitor queryMonitor,
        DispatchManager dispatchManager,
        QueryMonitorConfig queryMonitorConfig,
        ResourceGroupManager resourceGroupManager)
{
    resourceGroupManager.getRootResourceGroups();
    this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
    this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
    this.queryProgressPublishInterval = requireNonNull(queryMonitorConfig, "queryMonitorConfig is null").getQueryProgressPublishInterval();
}

@PostConstruct
public synchronized void start()
}

@Config("event.query-progress-publish-interval")
@ConfigDescription("How frequently to publish query progress events")
Suggested change:
- @ConfigDescription("How frequently to publish query progress events")
+ @ConfigDescription("How frequently to publish query progress events. 0ms disables the publication of these events.")
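Assuming the 0-disables-publishing behavior above is adopted, a coordinator config entry might look like this. The property name comes from the @Config annotation in the diff; the value uses Airlift's duration format, and the 1m shown is just an example.

```properties
# Publish query progress events every minute; 0ms disables them entirely.
event.query-progress-publish-interval=1m
```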
This change enables coordinators to push their current state of queued/running queries as progress events, which can be logged to different sinks for analysis purposes. A progress event is sent every minute for each queued/running query.
Force-pushed 4eef95e to 1285c88
Please add documentation for this new configuration property to https://github.com/prestodb/presto/blob/master/presto-docs/src/main/sphinx/admin/properties.rst .
Description
Add support for sending periodic progress events for queued/running queries using the EventListener interface.
Motivation and Context
This change enables coordinators to push their current state of queued/running queries as progress events, which can be logged to different sinks for analysis purposes. A progress event is sent every minute for each queued/running query.
Impact
None
Test Plan
Added a unit test to check for query progress events.
Existing tests
Contributor checklist