Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public final class InternalResourceGroupManager<C>
private final boolean isResourceManagerEnabled;
private final QueryManagerConfig queryManagerConfig;
private final InternalNodeManager nodeManager;
private boolean isConfigurationManagerLoaded;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is mutable and guaranteed to be accessed from multiple threads, you should ensure that there is proper visibility to each thread. Note that where you set this, in loadConfigurationManager, such care has also been taken in other mutated members, like configurationManager. I'd recommend making it an AtomicBoolean.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will send a followup PR to address this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@Inject
public InternalResourceGroupManager(
Expand All @@ -136,6 +137,7 @@ public InternalResourceGroupManager(
this.isResourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo);
configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory());
this.isConfigurationManagerLoaded = false;
}

@Override
Expand Down Expand Up @@ -202,6 +204,7 @@ public void loadConfigurationManager()
MAX_QUEUED_QUERIES, Integer.toString(queryManagerConfig.getMaxQueuedQueries()));
setConfigurationManager(LegacyResourceGroupConfigurationManager.NAME, legacyProperties);
}
isConfigurationManagerLoaded = true;
}

private void setConfigurationManager(String name, Map<String, String> properties)
Expand Down Expand Up @@ -284,6 +287,12 @@ public List<ResourceGroupRuntimeInfo> getResourceGroupRuntimeInfos()
return resourceGroupRuntimeInfos.build();
}

@Override
public boolean isConfigurationManagerLoaded()
{
return isConfigurationManagerLoaded;
}

private void buildResourceGroupRuntimeInfo(ImmutableList.Builder<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfos, InternalResourceGroup resourceGroup)
{
if (!resourceGroup.subGroups().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public List<ResourceGroupRuntimeInfo> getResourceGroupRuntimeInfos()
{
throw new UnsupportedOperationException();
}

@Override
public boolean isConfigurationManagerLoaded()
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ void loadConfigurationManager()
throws Exception;

List<ResourceGroupRuntimeInfo> getResourceGroupRuntimeInfos();

boolean isConfigurationManagerLoaded();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.client.ServerInfo;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.metadata.StaticCatalogStore;
import com.facebook.presto.spi.NodeState;

Expand Down Expand Up @@ -56,10 +57,11 @@ public class ServerInfoResource
private final GracefulShutdownHandler shutdownHandler;
private final long startTime = System.nanoTime();
private final NodeResourceStatusProvider nodeResourceStatusProvider;
private final ResourceGroupManager resourceGroupManager;
private NodeState nodeState = ACTIVE;

@Inject
public ServerInfoResource(NodeVersion nodeVersion, NodeInfo nodeInfo, ServerConfig serverConfig, StaticCatalogStore catalogStore, GracefulShutdownHandler shutdownHandler, NodeResourceStatusProvider nodeResourceStatusProvider)
public ServerInfoResource(NodeVersion nodeVersion, NodeInfo nodeInfo, ServerConfig serverConfig, StaticCatalogStore catalogStore, GracefulShutdownHandler shutdownHandler, NodeResourceStatusProvider nodeResourceStatusProvider, ResourceGroupManager resourceGroupManager)
{
this.version = requireNonNull(nodeVersion, "nodeVersion is null");
this.environment = requireNonNull(nodeInfo, "nodeInfo is null").getEnvironment();
Expand All @@ -68,6 +70,7 @@ public ServerInfoResource(NodeVersion nodeVersion, NodeInfo nodeInfo, ServerConf
this.catalogStore = requireNonNull(catalogStore, "catalogStore is null");
this.shutdownHandler = requireNonNull(shutdownHandler, "shutdownHandler is null");
this.nodeResourceStatusProvider = requireNonNull(nodeResourceStatusProvider, "nodeResourceStatusProvider is null");
this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
}

@GET
Expand Down Expand Up @@ -118,7 +121,7 @@ public NodeState getServerState()
if (shutdownHandler.isShutdownRequested()) {
return SHUTTING_DOWN;
}
else if (!nodeResourceStatusProvider.hasResources()) {
else if (!nodeResourceStatusProvider.hasResources() || !resourceGroupManager.isConfigurationManagerLoaded()) {
return INACTIVE;
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public TestingPrestoServer(
false,
false,
coordinator,
false,
properties,
environment,
discoveryUri,
Expand All @@ -259,6 +260,7 @@ public TestingPrestoServer(
boolean coordinatorSidecar,
boolean coordinatorSidecarEnabled,
boolean coordinator,
boolean skipLoadingResourceGroupConfigurationManager,
Map<String, String> properties,
String environment,
URI discoveryUri,
Expand Down Expand Up @@ -368,7 +370,9 @@ public TestingPrestoServer(
this.resourceGroupManager = resourceGroupManager instanceof InternalResourceGroupManager
? Optional.of((InternalResourceGroupManager<?>) resourceGroupManager)
: Optional.empty();
resourceGroupManager.loadConfigurationManager();
if (!skipLoadingResourceGroupConfigurationManager) {
resourceGroupManager.loadConfigurationManager();
}
nodePartitioningManager = injector.getInstance(NodePartitioningManager.class);
planOptimizerManager = injector.getInstance(ConnectorPlanOptimizerManager.class);
clusterMemoryManager = injector.getInstance(ClusterMemoryManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public DistributedQueryRunner(Session defaultSession, int nodeCount, Map<String,
false,
false,
false,
false,
defaultSession,
nodeCount,
1,
Expand All @@ -166,6 +167,7 @@ private DistributedQueryRunner(
boolean resourceManagerEnabled,
boolean catalogServerEnabled,
boolean coordinatorSidecarEnabled,
boolean skipLoadingResourceGroupConfigurationManager,
Session defaultSession,
int nodeCount,
int coordinatorCount,
Expand Down Expand Up @@ -232,6 +234,7 @@ private DistributedQueryRunner(
false,
coordinatorSidecarEnabled,
false,
skipLoadingResourceGroupConfigurationManager,
workerProperties,
parserOptions,
environment,
Expand Down Expand Up @@ -261,6 +264,7 @@ private DistributedQueryRunner(
false,
false,
false,
skipLoadingResourceGroupConfigurationManager,
rmProperties,
parserOptions,
environment,
Expand All @@ -281,6 +285,7 @@ private DistributedQueryRunner(
false,
false,
false,
skipLoadingResourceGroupConfigurationManager,
catalogServerProperties,
parserOptions,
environment,
Expand All @@ -299,6 +304,7 @@ private DistributedQueryRunner(
true,
true,
false,
skipLoadingResourceGroupConfigurationManager,
coordinatorSidecarProperties,
parserOptions,
environment,
Expand All @@ -317,6 +323,7 @@ private DistributedQueryRunner(
false,
false,
true,
skipLoadingResourceGroupConfigurationManager,
extraCoordinatorProperties,
parserOptions,
environment,
Expand Down Expand Up @@ -414,6 +421,7 @@ private static TestingPrestoServer createTestingPrestoServer(
boolean coordinatorSidecar,
boolean coordinatorSidecarEnabled,
boolean coordinator,
boolean skipLoadingResourceGroupConfigurationManager,
Map<String, String> extraProperties,
SqlParserOptions parserOptions,
String environment,
Expand Down Expand Up @@ -444,6 +452,7 @@ private static TestingPrestoServer createTestingPrestoServer(
coordinatorSidecar,
coordinatorSidecarEnabled,
coordinator,
skipLoadingResourceGroupConfigurationManager,
properties,
environment,
discoveryUri,
Expand Down Expand Up @@ -943,6 +952,7 @@ public static class Builder
private boolean resourceManagerEnabled;
private boolean catalogServerEnabled;
private boolean coordinatorSidecarEnabled;
private boolean skipLoadingResourceGroupConfigurationManager;
private List<Module> extraModules = ImmutableList.of();
private int resourceManagerCount = 1;

Expand Down Expand Up @@ -1074,13 +1084,20 @@ public Builder setResourceManagerCount(int resourceManagerCount)
return this;
}

public Builder setSkipLoadingResourceGroupConfigurationManager(boolean skipLoadingResourceGroupConfigurationManager)
{
this.skipLoadingResourceGroupConfigurationManager = skipLoadingResourceGroupConfigurationManager;
return this;
}

public DistributedQueryRunner build()
throws Exception
{
return new DistributedQueryRunner(
resourceManagerEnabled,
catalogServerEnabled,
coordinatorSidecarEnabled,
skipLoadingResourceGroupConfigurationManager,
defaultSession,
nodeCount,
coordinatorCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class TestServerInfoResource
{
private HttpClient client;
private DistributedQueryRunner queryRunner;
private DistributedQueryRunner queryRunnerWithNoClusterReadyCheck;
private ThriftCodecManager thriftCodeManager;

@BeforeClass
Expand All @@ -72,7 +73,7 @@ public void teardown()
this.client = null;
}

@AfterGroups(groups = {"createQueryRunner", "getServerStateWithoutRequiredResourceManagers", "getServerStateWithoutRequiredCoordinators"})
@AfterGroups(groups = {"createQueryRunner", "getServerStateWithoutRequiredResourceManagers", "getServerStateWithoutRequiredCoordinators", "getServerStateWithoutRequiredCoordinators", "createQueryRunnerWithNoClusterReadyCheckSkipLoadingResourceGroupConfigurationManager"})
public void serverTearDown()
{
for (TestingPrestoServer server : queryRunner.getServers()) {
Expand Down Expand Up @@ -106,11 +107,11 @@ public void createQueryRunnerWithNoClusterReadyCheckSetup()
throws Exception
{
queryRunner = createQueryRunnerWithNoClusterReadyCheck(
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of("cluster.required-resource-managers-active", "2", "cluster.required-coordinators-active", "1"),
ImmutableMap.of("query.client.timeout", "10s"), 2);
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of("cluster.required-resource-managers-active", "2", "cluster.required-coordinators-active", "1"),
ImmutableMap.of("query.client.timeout", "10s"), 2, false);
}

@Test(timeOut = 30_000, groups = {"getServerStateWithoutRequiredResourceManagers"}, dataProvider = "thriftEncodingToggle")
Expand All @@ -131,7 +132,7 @@ public void getServerStateWithoutRequiredCoordinatorsSetup()
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of("cluster.required-resource-managers-active", "1", "cluster.required-coordinators-active", "3"),
ImmutableMap.of("query.client.timeout", "10s"), 2);
ImmutableMap.of("query.client.timeout", "10s"), 2, false);
}

@Test(timeOut = 30_000, groups = {"getServerStateWithoutRequiredCoordinators"}, dataProvider = "thriftEncodingToggle")
Expand All @@ -144,6 +145,28 @@ public void testGetServerStateWithoutRequiredCoordinators(boolean useThriftEncod
assertEquals(state, NodeState.INACTIVE);
}

@BeforeGroups("createQueryRunnerWithNoClusterReadyCheckSkipLoadingResourceGroupConfigurationManager")
public void createQueryRunnerWithNoClusterReadyCheckSkipLoadingResourceGroupConfigurationManager()
throws Exception
{
queryRunner = createQueryRunnerWithNoClusterReadyCheck(
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of("cluster.required-resource-managers-active", "1", "cluster.required-coordinators-active", "1"),
ImmutableMap.of("query.client.timeout", "10s"), 2, true);
}

@Test(groups = {"createQueryRunnerWithNoClusterReadyCheckSkipLoadingResourceGroupConfigurationManager"})
public void testGetServerStateWhenResourceGroupConfigurationManagerNotLoaded()
throws Exception
{
TestingPrestoServer server = queryRunner.getCoordinator(0);
URI uri = uriBuilderFrom(server.getBaseUrl().resolve("/v1/info/state")).build();
NodeState state = getNodeState(uri, false, null);
assertEquals(state, NodeState.INACTIVE);
}

private NodeState getNodeState(URI uri, boolean useThriftEncoding, Protocol thriftProtocol)
{
Request.Builder requestBuilder = useThriftEncoding ? ThriftRequestUtils.prepareThriftGet(thriftProtocol) : getJsonTransportBuilder(prepareGet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static DistributedQueryRunner createQueryRunner(
int coordinatorCount)
throws Exception
{
return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorSidecarProperties, coordinatorProperties, extraProperties, coordinatorCount, false, 1);
return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorSidecarProperties, coordinatorProperties, extraProperties, coordinatorCount, false, 1, false);
}

public static DistributedQueryRunner createQueryRunnerWithNoClusterReadyCheck(
Expand All @@ -69,19 +69,20 @@ public static DistributedQueryRunner createQueryRunnerWithNoClusterReadyCheck(
Map<String, String> coordinatorSidecarProperties,
Map<String, String> coordinatorProperties,
Map<String, String> extraProperties,
int coordinatorCount)
int coordinatorCount,
boolean skipLoadingResourceGroupConfigurationManager)
throws Exception
{
return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorSidecarProperties, coordinatorProperties, extraProperties, coordinatorCount, true, 1);
return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorSidecarProperties, coordinatorProperties, extraProperties, coordinatorCount, true, 1, skipLoadingResourceGroupConfigurationManager);
}

public static DistributedQueryRunner createQueryRunner(Map<String, String> resourceManagerProperties, Map<String, String> coordinatorProperties, Map<String, String> extraProperties, int coordinatorCount, int resourceManagerCount)
throws Exception
{
return createQueryRunner(resourceManagerProperties, ImmutableMap.of(), ImmutableMap.of(), coordinatorProperties, extraProperties, coordinatorCount, false, resourceManagerCount);
return createQueryRunner(resourceManagerProperties, ImmutableMap.of(), ImmutableMap.of(), coordinatorProperties, extraProperties, coordinatorCount, false, resourceManagerCount, false);
}

public static DistributedQueryRunner createQueryRunner(Map<String, String> resourceManagerProperties, Map<String, String> catalogServerProperties, Map<String, String> coordinatorSidecarProperties, Map<String, String> coordinatorProperties, Map<String, String> extraProperties, int coordinatorCount, boolean skipClusterReadyCheck, int resourceManagerCount)
public static DistributedQueryRunner createQueryRunner(Map<String, String> resourceManagerProperties, Map<String, String> catalogServerProperties, Map<String, String> coordinatorSidecarProperties, Map<String, String> coordinatorProperties, Map<String, String> extraProperties, int coordinatorCount, boolean skipClusterReadyCheck, int resourceManagerCount, boolean skipLoadingResourceGroupConfigurationManager)
throws Exception
{
DistributedQueryRunner queryRunner = TpchQueryRunnerBuilder.builder()
Expand All @@ -93,6 +94,7 @@ public static DistributedQueryRunner createQueryRunner(Map<String, String> resou
.setResourceManagerEnabled(true)
.setCoordinatorCount(coordinatorCount)
.setResourceManagerCount(resourceManagerCount)
.setSkipLoadingResourceGroupConfigurationManager(skipLoadingResourceGroupConfigurationManager)
.build();
if (!skipClusterReadyCheck) {
queryRunner.waitForClusterToGetReady();
Expand Down