diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/DistributedTaskInfoResource.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/DistributedTaskInfoResource.java
new file mode 100644
index 0000000000000..23bcf0f8b8619
--- /dev/null
+++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/DistributedTaskInfoResource.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.resourcemanager;
+
+import com.facebook.presto.execution.TaskId;
+import com.facebook.presto.server.BasicQueryInfo;
+import com.facebook.presto.spi.QueryId;
+
+import javax.annotation.security.RolesAllowed;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+
+import java.net.URI;
+import java.util.Optional;
+
+import static com.facebook.presto.server.security.RoleType.ADMIN;
+import static com.facebook.presto.server.security.RoleType.USER;
+import static java.util.Objects.requireNonNull;
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+
+/**
+ * Resource manager endpoint for {@code /v1/taskInfo}: forwards each task info request to the
+ * server identified by the self URI of the task's query, as listed in the cluster state.
+ */
+@Path("/v1/taskInfo")
+@RolesAllowed({USER, ADMIN})
+public class DistributedTaskInfoResource
+{
+    private final ResourceManagerClusterStateProvider clusterStateProvider;
+    private final ResourceManagerProxy proxyHelper;
+
+    @Inject
+    public DistributedTaskInfoResource(
+            ResourceManagerClusterStateProvider clusterStateProvider,
+            ResourceManagerProxy proxyHelper)
+    {
+        this.clusterStateProvider = requireNonNull(clusterStateProvider, "clusterStateProvider is null");
+        this.proxyHelper = requireNonNull(proxyHelper, "proxyHelper is null");
+    }
+
+    /**
+     * Handles {@code GET /v1/taskInfo/{taskId}} asynchronously; the response is completed
+     * through {@code asyncResponse} rather than returned.
+     */
+    @GET
+    @Path("{taskId}")
+    public void getTaskInfo(@PathParam("taskId") TaskId taskId,
+            @Context UriInfo uriInfo,
+            @Context HttpServletRequest servletRequest,
+            @Suspended AsyncResponse asyncResponse)
+            throws WebApplicationException
+    {
+        proxyTaskInfoResponse(servletRequest, asyncResponse, uriInfo, taskId);
+    }
+
+    /**
+     * Builds the proxy target: the query's self URI with its path replaced by the incoming request path.
+     */
+    private URI createTaskInfoUri(BasicQueryInfo queryInfo, UriInfo uriInfo)
+    {
+        return UriBuilder.fromUri(queryInfo.getSelf()).replacePath(uriInfo.getPath()).build();
+    }
+
+    /**
+     * Proxies the request for the cluster query whose id matches the task's query id, or resumes
+     * the response with a 404 (no entity) when no such query is known.
+     */
+    private void proxyTaskInfoResponse(HttpServletRequest servletRequest, AsyncResponse asyncResponse, UriInfo uriInfo, TaskId taskId)
+    {
+        QueryId queryId = taskId.getQueryId();
+        Optional<BasicQueryInfo> queryInfo = clusterStateProvider.getClusterQueries().stream()
+                .filter(query -> query.getQueryId().equals(queryId))
+                .findFirst();
+
+        if (queryInfo.isPresent()) {
+            proxyHelper.performRequest(servletRequest, asyncResponse, createTaskInfoUri(queryInfo.get(), uriInfo));
+        }
+        else {
+            asyncResponse.resume(Response.status(NOT_FOUND).type(MediaType.APPLICATION_JSON).build());
+        }
+    }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java b/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java
index 7f22e371b4381..231d4018c082a 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/ResourceManagerModule.java
@@ -26,6
+26,7 @@
 import com.facebook.presto.resourcemanager.DistributedQueryInfoResource;
 import com.facebook.presto.resourcemanager.DistributedQueryResource;
 import com.facebook.presto.resourcemanager.DistributedResourceGroupInfoResource;
+import com.facebook.presto.resourcemanager.DistributedTaskInfoResource;
 import com.facebook.presto.resourcemanager.ForResourceManager;
 import com.facebook.presto.resourcemanager.RaftConfig;
 import com.facebook.presto.resourcemanager.RatisServer;
@@ -111,6 +112,7 @@ protected void setup(Binder binder)
         jaxrsBinder(binder).bind(DistributedQueryResource.class);
         jaxrsBinder(binder).bind(DistributedQueryInfoResource.class);
         jaxrsBinder(binder).bind(DistributedClusterStatsResource.class);
+        jaxrsBinder(binder).bind(DistributedTaskInfoResource.class);
         httpClientBinder(binder).bindHttpClient("resourceManager", ForResourceManager.class);
 
         binder.bind(ResourceManagerProxy.class).in(Scopes.SINGLETON);
diff --git a/presto-main/src/main/java/com/facebook/presto/server/TaskInfoResource.java b/presto-main/src/main/java/com/facebook/presto/server/TaskInfoResource.java
index 2016097cccca6..ff6ca3990979c 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/TaskInfoResource.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/TaskInfoResource.java
@@ -13,39 +13,105 @@
  */
 package com.facebook.presto.server;
 
+import com.facebook.presto.dispatcher.DispatchManager;
 import com.facebook.presto.execution.QueryManager;
 import com.facebook.presto.execution.StageInfo;
 import com.facebook.presto.execution.TaskId;
 import com.facebook.presto.execution.TaskInfo;
+import com.facebook.presto.metadata.InternalNode;
+import com.facebook.presto.metadata.InternalNodeManager;
+import com.facebook.presto.resourcemanager.ResourceManagerProxy;
 import com.facebook.presto.spi.QueryId;
 import com.google.inject.Inject;
 
 import javax.annotation.security.RolesAllowed;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
 
+import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.Optional;
 
+import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
 
+/**
+ * Serves task info under {@code v1/taskInfo}. When the resource manager is enabled and the
+ * task's query is not present in the local dispatch manager, the request is proxied instead.
+ */
 @Path("v1/taskInfo")
 @RolesAllowed("ADMIN")
 public class TaskInfoResource
 {
+    // Query parameter that turns proxying off; set on every request forwarded to a resource manager
+    public static final String INCLUDE_LOCAL_QUERY_ONLY = "includeLocalQueryOnly";
+    private final DispatchManager dispatchManager;
     private final QueryManager queryManager;
+    private final boolean resourceManagerEnabled;
+    private final InternalNodeManager internalNodeManager;
+    private final Optional<ResourceManagerProxy> proxyHelper;
 
     @Inject
-    public TaskInfoResource(QueryManager queryManager)
+    public TaskInfoResource(
+            DispatchManager dispatchManager,
+            QueryManager queryManager,
+            InternalNodeManager internalNodeManager,
+            ServerConfig serverConfig,
+            Optional<ResourceManagerProxy> proxyHelper)
     {
+        this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
         this.queryManager = requireNonNull(queryManager, "queryManager is null");
+        this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
+        this.resourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
+        this.proxyHelper = requireNonNull(proxyHelper, "proxyHelper is null");
     }
 
+    /**
+     * Completes {@code asyncResponse} with the locally resolved task info (404 if not found, 500
+     * on other failures), or proxies the request when {@code requestNeedsToBeProxied} holds.
+     */
     @GET
     @Path("{taskId}")
-    public TaskInfo getTaskInfo(@PathParam("taskId") TaskId taskId)
-            throws NotFoundException
+    @Produces(MediaType.APPLICATION_JSON)
+    public void getTaskInfo(@PathParam("taskId") TaskId taskId,
+            @QueryParam(INCLUDE_LOCAL_QUERY_ONLY) @DefaultValue("false") boolean includeLocalQueryOnly,
+            @Context UriInfo uriInfo,
+            @Context HttpServletRequest servletRequest,
+            @Suspended AsyncResponse asyncResponse)
+            throws WebApplicationException
+    {
+        if (requestNeedsToBeProxied(taskId, includeLocalQueryOnly)) {
+            proxyTaskInfo(servletRequest, asyncResponse, uriInfo);
+        }
+        else {
+            try {
+                asyncResponse.resume(Response.ok(getTaskInfo(taskId)).build());
+            }
+            catch (NotFoundException e) {
+                asyncResponse.resume(Response.status(NOT_FOUND).entity("Could not find the requested taskInfo").build());
+            }
+            catch (Exception e) {
+                asyncResponse.resume(Response.serverError().entity(e.getMessage()).build());
+            }
+        }
+    }
+
+    private TaskInfo getTaskInfo(TaskId taskId)
     {
         QueryId queryId = taskId.getQueryId();
         try {
@@ -69,4 +135,54 @@ public TaskInfo getTaskInfo(@PathParam("taskId") TaskId taskId)
             throw new NotFoundException(e);
         }
     }
+
+    /**
+     * A request is proxied only when the caller did not ask for local-only handling, the resource
+     * manager is enabled, and the dispatch manager does not have the task's query.
+     */
+    private boolean requestNeedsToBeProxied(TaskId taskId, boolean includeLocalQueryOnly)
+    {
+        return !includeLocalQueryOnly
+                && resourceManagerEnabled
+                && !dispatchManager.isQueryPresent(taskId.getQueryId());
+    }
+
+    /**
+     * Re-targets the request URI at the given resource manager node (scheme, host, port) and
+     * adds {@code includeLocalQueryOnly=true} to it.
+     */
+    private URI createTaskInfoUri(UriInfo uriInfo, InternalNode resourceManagerNode)
+            throws UnknownHostException
+    {
+        return UriBuilder.fromUri(uriInfo.getRequestUri())
+                .queryParam(INCLUDE_LOCAL_QUERY_ONLY, true)
+                .scheme(resourceManagerNode.getInternalUri().getScheme())
+                .host(resourceManagerNode.getHostAndPort().toInetAddress().getHostName())
+                .port(resourceManagerNode.getInternalUri().getPort())
+                .build();
+    }
+
+    /**
+     * Forwards the request to any known resource manager; a missing proxy helper, a missing
+     * resource manager, or any exception is reported to the client as a 500 response.
+     */
+    private void proxyTaskInfo(HttpServletRequest servletRequest, AsyncResponse asyncResponse, UriInfo uriInfo)
+    {
+        try {
+            checkState(proxyHelper.isPresent(), "resource manager proxy is not available");
+            Optional<InternalNode> resourceManager = internalNodeManager.getResourceManagers().stream()
+                    .findAny();
+            if (resourceManager.isPresent()) {
+                InternalNode resourceManagerNode = resourceManager.get();
+                URI uri = createTaskInfoUri(uriInfo, resourceManagerNode);
+                proxyHelper.get().performRequest(servletRequest, asyncResponse, uri);
+            }
+            else {
+                asyncResponse.resume(Response.serverError().entity("Could not find the resource manager").build());
+            }
+        }
+        catch (Exception e) {
+            asyncResponse.resume(Response.serverError().entity(e.getMessage()).build());
+        }
+    }
 }
diff --git a/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedTaskInfoResource.java b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedTaskInfoResource.java
new file mode 100644
index 0000000000000..86f515d674cfb
--- /dev/null
+++ b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedTaskInfoResource.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.resourcemanager;
+
+import com.facebook.airlift.http.client.HttpClient;
+import com.facebook.airlift.http.client.UnexpectedResponseException;
+import com.facebook.airlift.http.client.jetty.JettyHttpClient;
+import com.facebook.presto.execution.TaskId;
+import com.facebook.presto.execution.TaskInfo;
+import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo;
+import com.facebook.presto.metadata.AllNodes;
+import com.facebook.presto.resourceGroups.FileResourceGroupConfigurationManagerFactory;
+import com.facebook.presto.server.testing.TestingPrestoServer;
+import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
+import com.facebook.presto.tests.DistributedQueryRunner;
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import static com.facebook.airlift.json.JsonCodec.jsonCodec;
+import static com.facebook.airlift.testing.Closeables.closeQuietly;
+import static com.facebook.presto.tests.tpch.TpchQueryRunner.createQueryRunner;
+import static com.facebook.presto.utils.QueryExecutionClientUtil.getResponseEntity;
+import static com.facebook.presto.utils.QueryExecutionClientUtil.runToCompletion;
+import static com.facebook.presto.utils.QueryExecutionClientUtil.runToFirstResult;
+import static com.facebook.presto.utils.ResourceUtils.getResourceFilePath;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+import static java.lang.Thread.sleep;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
+public class TestDistributedTaskInfoResource
+{
+    private static final int COORDINATOR_COUNT = 2;
+    private HttpClient client;
+    private TestingPrestoServer coordinator1;
+    private TestingPrestoServer coordinator2;
+    private TestingPrestoServer resourceManager;
+
+    @BeforeClass
+    public void setup()
+            throws Exception
+    {
+        client = new JettyHttpClient();
+        DistributedQueryRunner runner = createQueryRunner(ImmutableMap.of("query.client.timeout", "20s"), COORDINATOR_COUNT);
+        coordinator1 = runner.getCoordinator(0);
+        coordinator2 = runner.getCoordinator(1);
+        Optional<TestingPrestoServer> resourceManager = runner.getResourceManager();
+        checkState(resourceManager.isPresent(), "resource manager not present");
+        this.resourceManager = resourceManager.get();
+        coordinator1.getResourceGroupManager().get().addConfigurationManagerFactory(new FileResourceGroupConfigurationManagerFactory());
+        coordinator1.getResourceGroupManager().get()
+                .setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json")));
+        coordinator2.getResourceGroupManager().get().addConfigurationManagerFactory(new FileResourceGroupConfigurationManagerFactory());
+        coordinator2.getResourceGroupManager().get()
+                .setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json")));
+    }
+
+    @Test(timeOut = 220_000)
+    public void testDistributedGetTaskInfo()
+            throws Exception
+    {
+        sleep(SECONDS.toMillis(5)); // NOTE(review): fixed delay before polling the resource manager - presumably lets the cluster settle; confirm it is needed
+        waitUntilCoordinatorsDiscoveredHealthyInRM(SECONDS.toMillis(15));
+        runToCompletion(client, coordinator1, "SELECT 1");
+        runToFirstResult(client, coordinator1, "SELECT * from tpch.sf101.orders");
+
+        Map<ResourceGroupId, ResourceGroupRuntimeInfo> resourceGroupRuntimeInfoSnapshot;
+        int globalRunningQueries = 0;
+        do {
+            MILLISECONDS.sleep(100);
+            resourceGroupRuntimeInfoSnapshot = coordinator2.getResourceGroupManager().get().getResourceGroupRuntimeInfosSnapshot();
+            ResourceGroupRuntimeInfo resourceGroupRuntimeInfo = resourceGroupRuntimeInfoSnapshot.get(new ResourceGroupId("global"));
+            if (resourceGroupRuntimeInfo != null) {
+                globalRunningQueries = resourceGroupRuntimeInfo.getDescendantRunningQueries();
+            }
+        } while (globalRunningQueries != 1); // poll until coordinator2's "global" group reports exactly one running query; bounded only by the test timeOut
+
+        for (TaskInfo actualTaskInfo : coordinator1.getTaskManager().getAllTaskInfo()) { // tasks listed by coordinator1 are requested through coordinator2
+            TaskId actualTaskId = actualTaskInfo.getTaskId();
+            TaskInfo proxiedTaskInfo = getResponseEntity(client, coordinator2, "/v1/taskInfo/" + actualTaskId, jsonCodec(TaskInfo.class));
+            assertNotNull(proxiedTaskInfo);
+            assertEquals(proxiedTaskInfo.getTaskId(), actualTaskId);
+        }
+
+        try {
+            getResponseEntity(client, coordinator2, "/v1/taskInfo/invalidTaskId", jsonCodec(TaskInfo.class));
+            fail("Retrieving TaskInfo for an invalid TaskId should fail with a 404");
+        }
+        catch (UnexpectedResponseException expected) {
+            assertEquals(expected.getStatusCode(), 404);
+        }
+
+        try {
+            getResponseEntity(client, coordinator2, "/v1/taskInfo/20221102_075648_00000_8ybuj.9.0.0", jsonCodec(TaskInfo.class));
+            fail("Retrieving TaskInfo for a non-existent TaskId should fail with a 404");
+        }
+        catch (UnexpectedResponseException expected) {
+            assertEquals(expected.getStatusCode(), 404);
+        }
+    }
+
+    private void waitUntilCoordinatorsDiscoveredHealthyInRM(long timeoutInMillis)
+            throws TimeoutException, InterruptedException
+    {
+        long deadline = System.currentTimeMillis() + timeoutInMillis;
+        while (System.currentTimeMillis() < deadline) {
+            AllNodes allNodes = this.resourceManager.refreshNodes();
+            if (allNodes.getActiveCoordinators().size() == COORDINATOR_COUNT) { // done once the resource manager sees every coordinator as active
+                return;
+            }
+            sleep(100);
+        }
+        throw new TimeoutException(format("one of the nodes is still missing after: %s ms", timeoutInMillis));
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void teardown()
+    {
+        closeQuietly(coordinator1);
+        closeQuietly(coordinator2);
+        closeQuietly(resourceManager);
+        closeQuietly(client);
+        coordinator1 = null;
+        coordinator2 = null;
+        resourceManager = null;
+        client = null;
+    }
+}