Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;

import com.facebook.presto.server.QueryStateInfo;
import com.facebook.presto.server.ResourceGroupInfo;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupState;
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_QUEUE;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_RUN;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.FULL;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.Math.addExact;
import static java.util.Objects.requireNonNull;

public class AggregatedResourceGroupInfoBuilder
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment: I think this class can be inlined into a method.

{
private ResourceGroupId id;
private SchedulingPolicy schedulingPolicy;
private int schedulingWeight;
private Map<ResourceGroupId, AggregatedResourceGroupInfoBuilder> subGroupsMap;
private ImmutableList.Builder<QueryStateInfo> runningQueriesBuilder;
private static final Map<ResourceGroupState, Integer> resourceGroupStatePreference
= ImmutableMap.of(FULL, 1, CAN_QUEUE, 2, CAN_RUN, 3);
private ResourceGroupState state;
private DataSize softMemoryLimit;
private int softConcurrencyLimit;
private int hardConcurrencyLimit;
private int maxQueuedQueries;
private long memoryUsageBytes;
private int numQueuedQueries;

private void init(ResourceGroupInfo resourceGroupInfo)
{
this.id = requireNonNull(resourceGroupInfo.getId(), "id is null");
this.state = requireNonNull(resourceGroupInfo.getState(), "state is null");
this.schedulingPolicy = resourceGroupInfo.getSchedulingPolicy();
this.schedulingWeight = resourceGroupInfo.getSchedulingWeight();
this.softMemoryLimit = resourceGroupInfo.getSoftMemoryLimit();
this.softConcurrencyLimit = resourceGroupInfo.getSoftConcurrencyLimit();
this.hardConcurrencyLimit = resourceGroupInfo.getHardConcurrencyLimit();
this.maxQueuedQueries = resourceGroupInfo.getMaxQueuedQueries();
this.memoryUsageBytes = resourceGroupInfo.getMemoryUsage().toBytes();
this.numQueuedQueries = resourceGroupInfo.getNumQueuedQueries();
this.subGroupsMap = new HashMap<>();
this.runningQueriesBuilder = ImmutableList.builder();
addRunningQueries(resourceGroupInfo.getRunningQueries());
addSubgroups(resourceGroupInfo.getSubGroups());
}

public AggregatedResourceGroupInfoBuilder add(ResourceGroupInfo resourceGroupInfo)
{
if (this.id == null) {
init(resourceGroupInfo);
return this;
}
checkState(resourceGroupInfo != null && this.id.equals(resourceGroupInfo.getId()));
this.numQueuedQueries = addExact(this.numQueuedQueries, resourceGroupInfo.getNumQueuedQueries());
if (resourceGroupStatePreference.get(resourceGroupInfo.getState()) < resourceGroupStatePreference.get(this.state)) {
this.state = resourceGroupInfo.getState();
}
this.memoryUsageBytes = addExact(this.memoryUsageBytes, resourceGroupInfo.getMemoryUsage().toBytes());
List<ResourceGroupInfo> subGroups = resourceGroupInfo.getSubGroups();
addSubgroups(subGroups);

List<QueryStateInfo> runningQueries = resourceGroupInfo.getRunningQueries();
addRunningQueries(runningQueries);
return this;
}

private void addSubgroups(List<ResourceGroupInfo> subGroups)
{
if (subGroups == null) {
return;
}
for (ResourceGroupInfo subgroup : subGroups) {
subGroupsMap.computeIfAbsent(subgroup.getId(), k -> new AggregatedResourceGroupInfoBuilder()).add(subgroup);
}
}

private void addRunningQueries(List<QueryStateInfo> runningQueries)
{
if (runningQueries == null) {
return;
}
this.runningQueriesBuilder.addAll(runningQueries);
}

public ResourceGroupInfo build()
{
if (this.id == null) {
return null;
}
ImmutableList<QueryStateInfo> runningQueries = runningQueriesBuilder.build();
return new ResourceGroupInfo(
id,
state,
schedulingPolicy,
schedulingWeight,
softMemoryLimit,
softConcurrencyLimit,
hardConcurrencyLimit,
maxQueuedQueries,
DataSize.succinctBytes(memoryUsageBytes),
numQueuedQueries,
runningQueries.size(),
0,
subGroupsMap.values().stream().map(AggregatedResourceGroupInfoBuilder::build).collect(toImmutableList()),
runningQueries);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.UnexpectedResponseException;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.ResourceGroupInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
import static com.facebook.presto.server.security.RoleType.ADMIN;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static java.util.Objects.requireNonNull;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;

@Path("/v1/resourceGroupState")
@RolesAllowed(ADMIN)
public class DistributedResourceGroupInfoResource
{
private static final Logger log = Logger.get(DistributedResourceGroupInfoResource.class);
private final InternalNodeManager internalNodeManager;
private final ListeningExecutorService executor;
private final HttpClient httpClient;
private final JsonCodec<ResourceGroupInfo> jsonCodec;

@Inject
public DistributedResourceGroupInfoResource(InternalNodeManager internalNodeManager,
@ForResourceManager ListeningExecutorService executor, @ForResourceManager HttpClient httpClient, JsonCodec<ResourceGroupInfo> jsonCodec)
{
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.executor = requireNonNull(executor, "executor is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
}

@GET
@Produces(MediaType.APPLICATION_JSON)
@Encoded
@Path("{resourceGroupId: .+}")
public void getResourceGroupInfos(
@PathParam("resourceGroupId") String resourceGroupIdString,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@Context UriInfo uriInfo,
@Context HttpServletRequest servletRequest,
@Suspended AsyncResponse asyncResponse)
{
if (isNullOrEmpty(resourceGroupIdString)) {
asyncResponse.resume(Response.status(NOT_FOUND).build());
}
try {
ImmutableList.Builder<ListenableFuture<ResourceGroupInfo>> resourceGroupInfoFutureBuilder = ImmutableList.builder();
for (InternalNode coordinator : internalNodeManager.getCoordinators()) {
resourceGroupInfoFutureBuilder.add(getResourceGroupInfoFromCoordinator(xForwardedProto, uriInfo, coordinator));
}
List<ListenableFuture<ResourceGroupInfo>> resourceGroupInfoFutureList = resourceGroupInfoFutureBuilder.build();
Futures.whenAllComplete(resourceGroupInfoFutureList).call(() -> {
try {
ResourceGroupInfo aggregatedResourceGroupInfo = aggregateResourceGroupInfo(resourceGroupInfoFutureList);
if (aggregatedResourceGroupInfo == null) {
return asyncResponse.resume(Response.status(NOT_FOUND).build());
}
return asyncResponse.resume(Response.ok(aggregatedResourceGroupInfo).build());
}
catch (Exception ex) {
log.error(ex, "Error in getting resource group info from one of the coordinators");
return asyncResponse.resume(Response.serverError().entity(ex.getMessage()).build());
}
}, executor);
}
catch (IOException ex) {
log.error(ex, "Error in getting resource group info");
asyncResponse.resume(Response.serverError().entity(ex.getMessage()).build());
}
}

private ResourceGroupInfo aggregateResourceGroupInfo(List<ListenableFuture<ResourceGroupInfo>> queryStateInfoFutureList)
throws InterruptedException, ExecutionException

{
Iterator<ListenableFuture<ResourceGroupInfo>> iterator = queryStateInfoFutureList.iterator();
AggregatedResourceGroupInfoBuilder builder = new AggregatedResourceGroupInfoBuilder();
while (iterator.hasNext()) {
try {
builder.add(iterator.next().get());
}
catch (ExecutionException e) {
Throwable exceptionCause = e.getCause();
//airlift JsonResponseHandler throws UnexpectedResponseException for cases where http status code != 2xx
if (!(exceptionCause instanceof UnexpectedResponseException) ||
((UnexpectedResponseException) exceptionCause).getStatusCode() != NOT_FOUND.getStatusCode()) {
throw e;
}
}
}
return builder.build();
}

private ListenableFuture<ResourceGroupInfo> getResourceGroupInfoFromCoordinator(String xForwardedProto, UriInfo uriInfo,
InternalNode coordinatorNode)
throws IOException
{
String scheme = isNullOrEmpty(xForwardedProto) ? uriInfo.getRequestUri().getScheme() : xForwardedProto;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't see us supporting xForwardedProto anywhere else. Do we know if it's been used or not?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its not currently used in the resource group flow

URI uri = uriInfo.getRequestUriBuilder()
.queryParam("includeLocalInfoOnly", true)
.scheme(scheme)
.host(coordinatorNode.getHostAndPort().toInetAddress().getHostName())
.port(coordinatorNode.getInternalUri().getPort())
.build();
Request request = prepareGet().setUri(uri).build();
return httpClient.executeAsync(request, createJsonResponseHandler(jsonCodec));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public int numAggregatedRunningQueries()
return numRunningQueries;
}

/**
* @deprecated This field is not very useful to expose as part of resource endpoint.
* In case of multi coordinator set up, it requires adding additional complexity and
* overhead to existing system to expose this field with accurate value.
*/
@Deprecated
@JsonProperty
public int getNumEligibleSubGroups()
{
Expand Down
Loading