-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Resource Management related cleanup #14104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
0fb77dc
Add query id to NoSuchElementException
tdcmeehan 9c42121
Remove system startup minimum worker requirement
tdcmeehan efcdeb2
Add DISPATCHING query states
tdcmeehan 2a35115
Split out queued phase from QueryManager
tdcmeehan 1aedb21
Add peak tasks to BasicQueryStats
tdcmeehan 7510297
Add LocalCoordinatorLocation
tdcmeehan 6329d90
Improve query event stats for immediately failed queries
tdcmeehan d641e7b
Remove Optional from QueryStateMachine resourceGroup
tdcmeehan 1c6a43f
Simplify DispatchInfo construction
tdcmeehan 848f23a
Rename SqlQueryManagerStats to QueryManagerStats
tdcmeehan a4d2f17
Simplify query manager stats tracking
tdcmeehan 1b91f7a
Fix handling of failures during query creation
tdcmeehan eb07101
Cleanup dispatcher executor management
tdcmeehan 73918d1
Change local dispatch to finish immediately after query submission
tdcmeehan 1a4ef68
Catch errors from LocalDispatchQuery querySubmitter
tdcmeehan a855e13
Fix result caching in protocol Query
tdcmeehan 823a0e5
Simplify token management in protocol Query
tdcmeehan 95e5b4c
Add purger to ExecutingStatementResource
tdcmeehan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
23 changes: 23 additions & 0 deletions
23
presto-main/src/main/java/com/facebook/presto/dispatcher/CoordinatorLocation.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.presto.dispatcher; | ||
|
|
||
| import javax.ws.rs.core.UriInfo; | ||
|
|
||
| import java.net.URI; | ||
|
|
||
| public interface CoordinatorLocation | ||
| { | ||
| URI getUri(UriInfo uriInfo, String xForwardedProto); | ||
| } |
120 changes: 120 additions & 0 deletions
120
presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchExecutor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.presto.dispatcher; | ||
|
|
||
| import com.facebook.airlift.concurrent.BoundedExecutor; | ||
| import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean; | ||
| import com.facebook.presto.execution.QueryManagerConfig; | ||
| import com.google.common.io.Closer; | ||
| import com.google.common.util.concurrent.ListeningExecutorService; | ||
| import com.google.common.util.concurrent.ListeningScheduledExecutorService; | ||
| import org.weakref.jmx.Flatten; | ||
| import org.weakref.jmx.Managed; | ||
| import org.weakref.jmx.Nested; | ||
|
|
||
| import javax.annotation.PreDestroy; | ||
| import javax.inject.Inject; | ||
|
|
||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
|
|
||
| import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; | ||
| import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; | ||
| import static java.util.Objects.requireNonNull; | ||
| import static java.util.concurrent.Executors.newCachedThreadPool; | ||
| import static java.util.concurrent.Executors.newScheduledThreadPool; | ||
|
|
||
| public class DispatchExecutor | ||
| { | ||
| private final Closer closer = Closer.create(); | ||
|
|
||
| private final ListeningExecutorService executor; | ||
| private final BoundedExecutor boundedExecutor; | ||
| private final ListeningScheduledExecutorService scheduledExecutor; | ||
|
|
||
| private final DispatchExecutorMBeans mbeans; | ||
|
|
||
| @Inject | ||
| public DispatchExecutor(QueryManagerConfig config) | ||
| { | ||
| ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed("dispatcher-query-%s")); | ||
| closer.register(coreExecutor::shutdownNow); | ||
| executor = listeningDecorator(coreExecutor); | ||
| boundedExecutor = new BoundedExecutor(coreExecutor, config.getQuerySubmissionMaxThreads()); | ||
|
|
||
| ScheduledExecutorService coreScheduledExecutor = newScheduledThreadPool(config.getQueryManagerExecutorPoolSize(), daemonThreadsNamed("dispatch-executor-%s")); | ||
| closer.register(coreScheduledExecutor::shutdownNow); | ||
| scheduledExecutor = listeningDecorator(coreScheduledExecutor); | ||
|
|
||
| mbeans = new DispatchExecutorMBeans(coreExecutor, coreScheduledExecutor); | ||
| } | ||
|
|
||
| public ListeningExecutorService getExecutor() | ||
| { | ||
| return executor; | ||
| } | ||
|
|
||
| public BoundedExecutor getBoundedExecutor() | ||
| { | ||
| return boundedExecutor; | ||
| } | ||
|
|
||
| public ListeningScheduledExecutorService getScheduledExecutor() | ||
| { | ||
| return scheduledExecutor; | ||
| } | ||
|
|
||
| @Managed | ||
| @Flatten | ||
| public DispatchExecutorMBeans getMbeans() | ||
| { | ||
| return mbeans; | ||
| } | ||
|
|
||
| @PreDestroy | ||
| public void shutdown() | ||
| throws Exception | ||
| { | ||
| closer.close(); | ||
| } | ||
|
|
||
| public class DispatchExecutorMBeans | ||
| { | ||
| private final ThreadPoolExecutorMBean executor; | ||
| private final ThreadPoolExecutorMBean scheduledExecutor; | ||
|
|
||
| public DispatchExecutorMBeans(ExecutorService coreExecutor, ScheduledExecutorService coreScheduledExecutor) | ||
| { | ||
| requireNonNull(coreExecutor, "coreExecutor is null"); | ||
| requireNonNull(coreScheduledExecutor, "coreScheduledExecutor is null"); | ||
| executor = new ThreadPoolExecutorMBean((ThreadPoolExecutor) coreExecutor); | ||
| scheduledExecutor = new ThreadPoolExecutorMBean((ThreadPoolExecutor) coreScheduledExecutor); | ||
| } | ||
|
|
||
| @Managed | ||
| @Nested | ||
| public ThreadPoolExecutorMBean getExecutor() | ||
| { | ||
| return executor; | ||
| } | ||
|
|
||
| @Managed | ||
| @Nested | ||
| public ThreadPoolExecutorMBean getScheduledExecutor() | ||
| { | ||
| return scheduledExecutor; | ||
| } | ||
| } | ||
| } | ||
74 changes: 74 additions & 0 deletions
74
presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchInfo.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.presto.dispatcher; | ||
|
|
||
| import com.facebook.presto.execution.ExecutionFailureInfo; | ||
| import io.airlift.units.Duration; | ||
|
|
||
| import java.util.Optional; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class DispatchInfo | ||
| { | ||
| private final Optional<CoordinatorLocation> coordinatorLocation; | ||
| private final Optional<ExecutionFailureInfo> failureInfo; | ||
| private final Duration elapsedTime; | ||
| private final Duration queuedTime; | ||
|
|
||
| public static DispatchInfo queued(Duration elapsedTime, Duration queuedTime) | ||
| { | ||
| return new DispatchInfo(Optional.empty(), Optional.empty(), elapsedTime, queuedTime); | ||
| } | ||
|
|
||
| public static DispatchInfo dispatched(CoordinatorLocation coordinatorLocation, Duration elapsedTime, Duration queuedTime) | ||
| { | ||
| requireNonNull(coordinatorLocation, "coordinatorLocation is null"); | ||
| return new DispatchInfo(Optional.of(coordinatorLocation), Optional.empty(), elapsedTime, queuedTime); | ||
| } | ||
|
|
||
| public static DispatchInfo failed(ExecutionFailureInfo failureInfo, Duration elapsedTime, Duration queuedTime) | ||
| { | ||
| requireNonNull(failureInfo, "coordinatorLocation is null"); | ||
| return new DispatchInfo(Optional.empty(), Optional.of(failureInfo), elapsedTime, queuedTime); | ||
| } | ||
|
|
||
| private DispatchInfo(Optional<CoordinatorLocation> coordinatorLocation, Optional<ExecutionFailureInfo> failureInfo, Duration elapsedTime, Duration queuedTime) | ||
| { | ||
| this.coordinatorLocation = requireNonNull(coordinatorLocation, "coordinatorLocation is null"); | ||
| this.failureInfo = requireNonNull(failureInfo, "failureInfo is null"); | ||
| this.elapsedTime = requireNonNull(elapsedTime, "elapsedTime is null"); | ||
| this.queuedTime = requireNonNull(queuedTime, "queuedTime is null"); | ||
| } | ||
|
|
||
| public Optional<CoordinatorLocation> getCoordinatorLocation() | ||
| { | ||
| return coordinatorLocation; | ||
| } | ||
|
|
||
| public Optional<ExecutionFailureInfo> getFailureInfo() | ||
| { | ||
| return failureInfo; | ||
| } | ||
|
|
||
| public Duration getElapsedTime() | ||
| { | ||
| return elapsedTime; | ||
| } | ||
|
|
||
| public Duration getQueuedTime() | ||
| { | ||
| return queuedTime; | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm. 😃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to also export the number of active threads in
BoundedExecutor? -- this has been an operation pain when we want to monitor whenBoundedExecutoris full.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BoundedExecutordoes not expose the number of active threads, however the underlying executor itself does export itself as an MBean viaThreadPoolExecutorMBean