Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManager;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class GracefulShutdownHandler
private final ScheduledExecutorService shutdownHandler = newSingleThreadScheduledExecutor(threadsNamed("shutdown-handler-%s"));
private final ExecutorService lifeCycleStopper = newSingleThreadExecutor(threadsNamed("lifecycle-stopper-%s"));
private final LifeCycleManager lifeCycleManager;
private final QueryManager queryManager;
private final TaskManager sqlTaskManager;
private final boolean isCoordinator;
private final ShutdownAction shutdownAction;
Expand All @@ -61,23 +63,21 @@ public GracefulShutdownHandler(
TaskManager sqlTaskManager,
ServerConfig serverConfig,
ShutdownAction shutdownAction,
LifeCycleManager lifeCycleManager)
LifeCycleManager lifeCycleManager,
QueryManager queryManager)
{
this.sqlTaskManager = requireNonNull(sqlTaskManager, "sqlTaskManager is null");
this.shutdownAction = requireNonNull(shutdownAction, "shutdownAction is null");
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.isCoordinator = requireNonNull(serverConfig, "serverConfig is null").isCoordinator();
this.gracePeriod = serverConfig.getGracePeriod();
this.queryManager = requireNonNull(queryManager, "queryManager is null");
}

public synchronized void requestShutdown()
{
log.info("Shutdown requested");

if (isCoordinator) {
throw new UnsupportedOperationException("Cannot shutdown coordinator");
}

if (isShutdownRequested()) {
return;
}
Expand All @@ -86,33 +86,14 @@ public synchronized void requestShutdown()

//wait for a grace period to start the shutdown sequence
shutdownHandler.schedule(() -> {
List<TaskInfo> activeTasks = getActiveTasks();
while (activeTasks.size() > 0) {
CountDownLatch countDownLatch = new CountDownLatch(activeTasks.size());

for (TaskInfo taskInfo : activeTasks) {
sqlTaskManager.addStateChangeListener(taskInfo.getTaskId(), newState -> {
if (newState.isDone()) {
countDownLatch.countDown();
}
});
}

log.info("Waiting for all tasks to finish");

try {
countDownLatch.await();
}
catch (InterruptedException e) {
log.warn("Interrupted while waiting for all tasks to finish");
currentThread().interrupt();
}

activeTasks = getActiveTasks();
if (isCoordinator) {
waitForQueriesToComplete();
}
else {
waitForTasksToComplete();
// wait for another grace period for all task states to be observed by the coordinator
sleepUninterruptibly(gracePeriod.toMillis(), MILLISECONDS);
}

// wait for another grace period for all task states to be observed by the coordinator
sleepUninterruptibly(gracePeriod.toMillis(), MILLISECONDS);

Future<?> shutdownFuture = lifeCycleStopper.submit(() -> {
lifeCycleManager.stop();
Expand All @@ -138,6 +119,34 @@ public synchronized void requestShutdown()
}, gracePeriod.toMillis(), MILLISECONDS);
}

/**
 * Blocks until this worker has no remaining active tasks.
 *
 * <p>For each snapshot of active tasks, registers a done-state listener per task and
 * waits on a latch. The wait is bounded: a task that completed between the
 * {@code getActiveTasks()} snapshot and listener registration would never fire its
 * listener (state-change listeners only observe subsequent transitions), so an
 * unbounded {@code await()} could hang shutdown forever. On timeout the outer loop
 * re-snapshots the live task list and exits once it is empty.
 */
private void waitForTasksToComplete()
{
    List<TaskInfo> activeTasks = getActiveTasks();
    while (!activeTasks.isEmpty()) {
        CountDownLatch countDownLatch = new CountDownLatch(activeTasks.size());

        for (TaskInfo taskInfo : activeTasks) {
            sqlTaskManager.addStateChangeListener(taskInfo.getTaskId(), newState -> {
                if (newState.isDone()) {
                    countDownLatch.countDown();
                }
            });
        }

        log.info("Waiting for all tasks to finish");

        try {
            // Bounded wait: guards against the missed-notification race described above.
            // A timeout simply causes a fresh snapshot; we never proceed while tasks remain.
            countDownLatch.await(1000, MILLISECONDS);
        }
        catch (InterruptedException e) {
            log.warn("Interrupted while waiting for all tasks to finish");
            currentThread().interrupt();
        }

        activeTasks = getActiveTasks();
    }
}

private List<TaskInfo> getActiveTasks()
{
return sqlTaskManager.getAllTaskInfo()
Expand All @@ -146,6 +155,38 @@ private List<TaskInfo> getActiveTasks()
.collect(toImmutableList());
}

/**
 * Blocks until the coordinator has no remaining active (non-terminal) queries.
 *
 * <p>Mirrors the worker-side task wait: per snapshot, a done-state listener is added
 * for each active query and a latch is awaited. The await is bounded because a query
 * that reached a terminal state between the {@code getActiveQueryInfo()} snapshot and
 * listener registration never fires its listener (listeners only observe subsequent
 * state transitions); an unbounded wait could therefore hang shutdown indefinitely.
 * On timeout the loop re-snapshots and exits once no active queries remain.
 */
private void waitForQueriesToComplete()
{
    List<BasicQueryInfo> activeQueries = getActiveQueryInfo();
    while (!activeQueries.isEmpty()) {
        CountDownLatch countDownLatch = new CountDownLatch(activeQueries.size());
        for (BasicQueryInfo queryInfo : activeQueries) {
            queryManager.addStateChangeListener(queryInfo.getQueryId(), newState -> {
                if (newState.isDone()) {
                    countDownLatch.countDown();
                }
            });
        }
        log.info("Waiting for all queries to finish");
        try {
            // Bounded wait to tolerate listeners registered after the query already
            // finished; the outer loop re-checks the live query list on timeout.
            countDownLatch.await(1000, MILLISECONDS);
        }
        catch (InterruptedException e) {
            log.warn("Interrupted while waiting for all queries to finish");
            currentThread().interrupt();
        }
        activeQueries = getActiveQueryInfo();
    }
}

/**
 * Returns a snapshot of all queries that have not yet reached a terminal state.
 */
private List<BasicQueryInfo> getActiveQueryInfo()
{
    return queryManager.getQueries().stream()
            .filter(query -> !query.getState().isDone())
            .collect(toImmutableList());
}

private synchronized void setShutdownRequested(boolean shutdownRequested)
{
this.shutdownRequested = shutdownRequested;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ public static class TestShutdownAction
private final CountDownLatch shutdownCalled = new CountDownLatch(1);

@GuardedBy("this")
private boolean isWorkerShutdown;
private boolean isShutdown;

@Override
public synchronized void onShutdown()
{
isWorkerShutdown = true;
isShutdown = true;
shutdownCalled.countDown();
}

Expand All @@ -184,9 +184,9 @@ public void waitForShutdownComplete(long millis)
shutdownCalled.await(millis, MILLISECONDS);
}

public synchronized boolean isWorkerShutdown()
public synchronized boolean isShutdown()
{
return isWorkerShutdown;
return isShutdown;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.facebook.drift.transport.netty.server.DriftNettyServerModule;
import com.facebook.drift.transport.netty.server.DriftNettyServerTransport;
import com.facebook.presto.Session;
import com.facebook.presto.dispatcher.NoOpQueryManager;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
Expand Down Expand Up @@ -132,7 +134,8 @@ public static class TestingThriftServerInfoModule
public void configure(Binder binder)
{
configBinder(binder).bindConfig(ServerConfig.class);

// Bind a no-op QueryManager, mirroring the TaskManager binding used here
binder.bind(QueryManager.class).to(NoOpQueryManager.class).in(Scopes.SINGLETON);
binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON);
binder.bind(ShutdownAction.class).to(TestingPrestoServer.TestShutdownAction.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
Expand All @@ -47,6 +48,8 @@ public class TestGracefulShutdown
.setCatalog("tpch")
.setSchema("tiny")
.build();
private static final String COORDINATOR = "coordinator";
private static final String WORKER = "worker";

private ListeningExecutorService executor;

Expand All @@ -62,35 +65,47 @@ public void shutdown()
executor.shutdownNow();
}

@Test(timeOut = SHUTDOWN_TIMEOUT_MILLIS)
public void testShutdown()
throws Exception
@DataProvider(name = "testServerInfo")
public static Object[][] testServerInfo()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("node-scheduler.include-coordinator", "false")
.put("shutdown.grace-period", "10s")
.build();
return new Object[][] {
{WORKER, ImmutableMap.<String, String>builder()
.put("node-scheduler.include-coordinator", "false")
.put("shutdown.grace-period", "10s")
.build()},
{COORDINATOR, ImmutableMap.<String, String>builder()
.put("node-scheduler.include-coordinator", "false")
.put("shutdown.grace-period", "10s")
.build()},
{COORDINATOR, ImmutableMap.<String, String>builder()
.put("node-scheduler.include-coordinator", "true")
.put("shutdown.grace-period", "10s")
.build()}
};
}

@Test(timeOut = SHUTDOWN_TIMEOUT_MILLIS, dataProvider = "testServerInfo")
public void testShutdown(String serverInstanceType, Map<String, String> properties)
throws Exception
{
try (DistributedQueryRunner queryRunner = createQueryRunner(TINY_SESSION, properties)) {
List<ListenableFuture<?>> queryFutures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
queryFutures.add(executor.submit(() -> queryRunner.execute("SELECT COUNT(*), clerk FROM orders GROUP BY clerk")));
}

TestingPrestoServer worker = queryRunner.getServers()
boolean isCoordinatorInstance = serverInstanceType.equals(COORDINATOR);
TestingPrestoServer testServer = queryRunner.getServers()
.stream()
.filter(server -> !server.isCoordinator())
.filter(server -> server.isCoordinator() == isCoordinatorInstance)
.findFirst()
.get();

TaskManager taskManager = worker.getTaskManager();

// wait until tasks show up on the worker
while (taskManager.getAllTaskInfo().isEmpty()) {
MILLISECONDS.sleep(500);
if (!isCoordinatorInstance) {
TaskManager taskManager = testServer.getTaskManager();
while (taskManager.getAllTaskInfo().isEmpty()) {
MILLISECONDS.sleep(500);
}
}

worker.getGracefulShutdownHandler().requestShutdown();
testServer.getGracefulShutdownHandler().requestShutdown();

Futures.allAsList(queryFutures).get();

Expand All @@ -99,13 +114,13 @@ public void testShutdown()
assertEquals(info.getState(), FINISHED);
}

TestShutdownAction shutdownAction = (TestShutdownAction) worker.getShutdownAction();
TestShutdownAction shutdownAction = (TestShutdownAction) testServer.getShutdownAction();
shutdownAction.waitForShutdownComplete(SHUTDOWN_TIMEOUT_MILLIS);
assertTrue(shutdownAction.isWorkerShutdown());
assertTrue(shutdownAction.isShutdown());
}
}

@Test(expectedExceptions = UnsupportedOperationException.class)
@Test(timeOut = SHUTDOWN_TIMEOUT_MILLIS)
public void testCoordinatorShutdown()
throws Exception
{
Expand All @@ -117,6 +132,9 @@ public void testCoordinatorShutdown()
.get();

coordinator.getGracefulShutdownHandler().requestShutdown();
TestShutdownAction shutdownAction = (TestShutdownAction) coordinator.getShutdownAction();
shutdownAction.waitForShutdownComplete(SHUTDOWN_TIMEOUT_MILLIS);
assertTrue(shutdownAction.isShutdown());
}
}
}