Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;

import com.facebook.presto.server.QueryStateInfo;
import com.facebook.presto.server.ResourceGroupInfo;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupState;
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_QUEUE;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_RUN;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.FULL;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.addExact;
import static java.util.Objects.requireNonNull;

public class AggregatedResourceGroupInfoBuilder
{
private final ResourceGroupId id;
private final SchedulingPolicy schedulingPolicy;
private final int schedulingWeight;
private final Map<ResourceGroupId, ResourceGroupInfo> subGroupsMap;
private final List<QueryStateInfo> runningQueries;
private static final Map<ResourceGroupState, Integer> resourceGroupStatePreference
= ImmutableMap.of(FULL, 1, CAN_QUEUE, 2, CAN_RUN, 3);

private ResourceGroupState state;
private DataSize softMemoryLimit;
private int softConcurrencyLimit;
private int hardConcurrencyLimit;
private int maxQueuedQueries;
private long memoryUsageBytes;
private int numQueuedQueries;
//TODO check if we need to deprecate this
private int numEligibleSubGroups;

public AggregatedResourceGroupInfoBuilder(ResourceGroupInfo resourceGroupInfo)
{
this.id = requireNonNull(resourceGroupInfo.getId(), "id is null");
this.state = requireNonNull(resourceGroupInfo.getState(), "state is null");
this.schedulingPolicy = resourceGroupInfo.getSchedulingPolicy();
this.schedulingWeight = resourceGroupInfo.getSchedulingWeight();
this.softMemoryLimit = resourceGroupInfo.getSoftMemoryLimit();
this.softConcurrencyLimit = resourceGroupInfo.getSoftConcurrencyLimit();
this.hardConcurrencyLimit = resourceGroupInfo.getHardConcurrencyLimit();
this.maxQueuedQueries = resourceGroupInfo.getMaxQueuedQueries();
this.memoryUsageBytes = resourceGroupInfo.getMemoryUsage().toBytes();
this.numQueuedQueries = resourceGroupInfo.getNumQueuedQueries();
this.numEligibleSubGroups = resourceGroupInfo.getNumEligibleSubGroups();
this.subGroupsMap = new HashMap<>();
this.runningQueries = new ArrayList<>();
addRunningQueries(resourceGroupInfo.getRunningQueries());
addSubgroups(resourceGroupInfo.getSubGroups());
}

public AggregatedResourceGroupInfoBuilder add(ResourceGroupInfo resourceGroupInfo)
{
checkState(resourceGroupInfo != null && this.id.equals(resourceGroupInfo.getId()));
this.numQueuedQueries = addExact(this.numQueuedQueries, resourceGroupInfo.getNumQueuedQueries());
if (resourceGroupStatePreference.get(resourceGroupInfo.getState()) < resourceGroupStatePreference.get(this.state)) {
this.state = resourceGroupInfo.getState();
}
this.memoryUsageBytes = addExact(this.memoryUsageBytes, resourceGroupInfo.getMemoryUsage().toBytes());
List<ResourceGroupInfo> subGroups = resourceGroupInfo.getSubGroups();
addSubgroups(subGroups);

List<QueryStateInfo> runningQueries = resourceGroupInfo.getRunningQueries();
addRunningQueries(runningQueries);
return this;
}

private void addSubgroups(List<ResourceGroupInfo> subGroups)
{
if (subGroups != null) {
for (ResourceGroupInfo subgroup : subGroups) {
subGroupsMap.compute(subgroup.getId(), (k, v) -> v == null ? subgroup : new AggregatedResourceGroupInfoBuilder(v).add(subgroup).build());
}
}
}

private void addRunningQueries(List<QueryStateInfo> runningQueries)
{
if (runningQueries != null) {
this.runningQueries.addAll(runningQueries);
}
}

public ResourceGroupInfo build()
{
return new ResourceGroupInfo(
id,
state,
schedulingPolicy,
schedulingWeight,
softMemoryLimit,
softConcurrencyLimit,
hardConcurrencyLimit,
maxQueuedQueries,
DataSize.succinctBytes(memoryUsageBytes),
numQueuedQueries,
runningQueries.size(),
numEligibleSubGroups,
ImmutableList.copyOf(subGroupsMap.values()),
runningQueries);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.QueryStateInfo;
import com.facebook.presto.spi.QueryId;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static com.facebook.presto.server.security.RoleType.ADMIN;
import static com.facebook.presto.server.security.RoleType.USER;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static java.util.Objects.requireNonNull;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;

@Path("/v1/queryState")
@RolesAllowed({USER, ADMIN})
public class DistributedQueryInfoResource
{
private static final Logger log = Logger.get(DistributedQueryInfoResource.class);
private static final JsonCodec<List<QueryStateInfo>> JSON_CODEC = JsonCodec.listJsonCodec(QueryStateInfo.class);
private final ResourceManagerClusterStateProvider clusterStateProvider;
private final InternalNodeManager internalNodeManager;
private ListeningExecutorService executor;
private final ResourceManagerProxy proxyHelper;

@Inject
public DistributedQueryInfoResource(ResourceManagerClusterStateProvider clusterStateProvider, InternalNodeManager internalNodeManager,
@ForResourceManager ListeningExecutorService executor, ResourceManagerProxy proxyHelper)
{
this.clusterStateProvider = requireNonNull(clusterStateProvider, "clusterStateProvider is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.executor = requireNonNull(executor, "executor is null");
this.proxyHelper = requireNonNull(proxyHelper, "proxyHelper is null");
}

@GET
public void getAllQueryInfo(@QueryParam("user") String user,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@Context HttpServletRequest servletRequest,
@Context UriInfo uriInfo,
@Suspended AsyncResponse asyncResponse)
{
try {
Set<InternalNode> coordinators = internalNodeManager.getCoordinators();
List<ListenableFuture<List<QueryStateInfo>>> queryStateInfoFutureList = new ArrayList<>();
for (InternalNode coordinator : coordinators) {
queryStateInfoFutureList.add(getQueryStateFromCoordinator(servletRequest, xForwardedProto, uriInfo, coordinator));
}
Futures.whenAllComplete(queryStateInfoFutureList).call(() -> {
try {
List<QueryStateInfo> queryStateInfoList = new ArrayList<>();
for (Future<List<QueryStateInfo>> queryStateInfoFuture : queryStateInfoFutureList) {
queryStateInfoList.addAll(queryStateInfoFuture.get());
}
return asyncResponse.resume(Response.ok(queryStateInfoList).build());
}
catch (Exception ex) {
log.error(ex, "Error in getting query info from one of the coordinators");
return asyncResponse.resume(Response.serverError().entity(ex.getMessage()).build());
}
}, executor);
}
catch (Exception ex) {
log.error(ex, "Error in getting query info");
asyncResponse.resume(Response.serverError().entity(ex.getMessage()).build());
}
}

@GET
@Path("{queryId}")
@Produces(MediaType.APPLICATION_JSON)
public void getQueryStateInfo(@PathParam("queryId") QueryId queryId,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@Context UriInfo uriInfo,
@Context HttpServletRequest servletRequest,
@Suspended AsyncResponse asyncResponse)
throws WebApplicationException
{
proxyQueryInfoResponse(servletRequest, asyncResponse, uriInfo, queryId);
}

private ListenableFuture<List<QueryStateInfo>> getQueryStateFromCoordinator(HttpServletRequest servletRequest, String xForwardedProto, UriInfo uriInfo, InternalNode coordinatorNode)
throws IOException
{
String scheme = isNullOrEmpty(xForwardedProto) ? uriInfo.getRequestUri().getScheme() : xForwardedProto;
URI uri = uriInfo.getRequestUriBuilder()
.queryParam("includeLocalQueryOnly", true)
.scheme(scheme)
.host(coordinatorNode.getHostAndPort().toInetAddress().getHostName())
.port(coordinatorNode.getInternalUri().getPort())
.build();
return proxyHelper.getResponse(servletRequest, uri, JSON_CODEC);
}

private void proxyQueryInfoResponse(HttpServletRequest servletRequest, AsyncResponse asyncResponse, UriInfo uriInfo, QueryId queryId)
{
Optional<BasicQueryInfo> queryInfo = clusterStateProvider.getClusterQueries().stream()
.filter(query -> query.getQueryId().equals(queryId))
.findFirst();

if (!queryInfo.isPresent()) {
asyncResponse.resume(Response.status(NOT_FOUND).type(MediaType.APPLICATION_JSON).build());
return;
}
proxyHelper.performRequest(servletRequest, asyncResponse, uriBuilderFrom(queryInfo.get().getSelf()).replacePath(uriInfo.getPath()).build());
}
}
Loading