diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java index 1db9e149a..91594f9f1 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; import com.google.inject.Provides; +import com.google.inject.Scopes; import com.google.inject.Singleton; import io.airlift.http.client.HttpClient; import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor; @@ -82,7 +83,6 @@ public class HaGatewayProviderModule private final LbOAuthManager oauthManager; private final LbFormAuthManager formAuthManager; private final AuthorizationManager authorizationManager; - private final BackendStateManager backendStateConnectionManager; private final ResourceSecurityDynamicFeature resourceSecurityDynamicFeature; private final HaGatewayConfiguration configuration; private final ResourceGroupsManager resourceGroupsManager; @@ -96,6 +96,7 @@ protected void configure() binder().bind(ResourceGroupsManager.class).toInstance(resourceGroupsManager); binder().bind(GatewayBackendManager.class).toInstance(gatewayBackendManager); binder().bind(QueryHistoryManager.class).toInstance(queryHistoryManager); + binder().bind(BackendStateManager.class).in(Scopes.SINGLETON); } public HaGatewayProviderModule(HaGatewayConfiguration configuration) @@ -108,7 +109,6 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration) authorizationManager = new AuthorizationManager(configuration.getAuthorization(), presetUsers); resourceSecurityDynamicFeature = getAuthFilter(configuration); - backendStateConnectionManager = new BackendStateManager(); GatewayCookieConfigurationPropertiesProvider gatewayCookieConfigurationPropertiesProvider = GatewayCookieConfigurationPropertiesProvider.getInstance(); gatewayCookieConfigurationPropertiesProvider.initialize(configuration.getGatewayCookieConfiguration()); @@ -206,13 +206,6 @@ public AuthorizationManager getAuthorizationManager() return this.authorizationManager; } - @Provides - @Singleton - public BackendStateManager getBackendStateConnectionManager() - { - return this.backendStateConnectionManager; - } - @Provides @Singleton public RoutingGroupSelector getRoutingGroupSelector(@ForRouter HttpClient httpClient) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java index 8fa7de46e..f64d95c36 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java @@ -13,19 +13,29 @@ */ package io.trino.gateway.ha.router; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import io.trino.gateway.ha.clustermonitor.ClusterStats; +import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import org.weakref.jmx.MBeanExporter; +import org.weakref.jmx.Managed; import java.util.HashMap; import java.util.Map; +import static java.util.Objects.requireNonNull; + public class BackendStateManager { - private final Map clusterStats; + private final MBeanExporter exporter; + private final Map clusterStats = new HashMap<>(); + private final Map clusterStatsJMXs = new HashMap<>(); - public BackendStateManager() + @Inject + public BackendStateManager(MBeanExporter exporter) { - this.clusterStats = new HashMap<>(); + this.exporter = requireNonNull(exporter, "exporter is null"); } public ClusterStats getBackendState(ProxyBackendConfiguration backend) @@ -36,6 +46,83 @@ public ClusterStats getBackendState(ProxyBackendConfiguration backend) public void updateStates(String clusterId, ClusterStats stats) { + if (!clusterStatsJMXs.containsKey(clusterId)) { + ClusterStatsJMX clusterStatsJMX = new ClusterStatsJMX(stats); + exporter.exportWithGeneratedName( + clusterStatsJMX, + ClusterStatsJMX.class, + ImmutableMap.builder() + .put("name", "ClusterStats") + .put("cluster_id", clusterId) + .build()); + clusterStatsJMXs.put(clusterId, clusterStatsJMX); + } + else { + clusterStatsJMXs.get(clusterId).updateFrom(stats); + } clusterStats.put(clusterId, stats); } + + public static class ClusterStatsJMX + { + private int runningQueryCount; + private int queuedQueryCount; + private int numWorkerNodes; + private TrinoStatus trinoStatus; + + public ClusterStatsJMX(ClusterStats clusterStats) + { + updateFrom(clusterStats); + } + + public void updateFrom(ClusterStats clusterStats) + { + runningQueryCount = clusterStats.runningQueryCount(); + queuedQueryCount = clusterStats.queuedQueryCount(); + numWorkerNodes = clusterStats.numWorkerNodes(); + trinoStatus = clusterStats.trinoStatus(); + } + + @Managed + public int getRunningQueryCount() + { + return runningQueryCount; + } + + @Managed + public int getQueuedQueryCount() + { + return queuedQueryCount; + } + + @Managed + public int getNumWorkerNodes() + { + return numWorkerNodes; + } + + @Managed + public int getTrinoStatusPending() + { + return trinoStatus == TrinoStatus.PENDING ? 1 : 0; + } + + @Managed + public int getTrinoStatusHealthy() + { + return trinoStatus == TrinoStatus.HEALTHY ? 1 : 0; + } + + @Managed + public int getTrinoStatusUnhealthy() + { + return trinoStatus == TrinoStatus.UNHEALTHY ? 1 : 0; + } + + @Managed + public int getTrinoStatusUnknown() + { + return trinoStatus == TrinoStatus.UNKNOWN ? 1 : 0; + } + } } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java index 931bd2fdf..94322438c 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java @@ -387,6 +387,20 @@ void testHealthCheckEndpoints() throw new IllegalStateException("Trino Gateway health check failed"); } + @Test + void testClusterStatsJMX() + throws Exception + { + Request request = new Request.Builder() + .url("http://localhost:" + routerPort + "/metrics") + .get() + .build(); + Response response = httpClient.newCall(request).execute(); + String body = response.body().string(); + assertThat(body).contains("trino1_TrinoStatusHealthy"); + assertThat(body).contains("trino2_TrinoStatusHealthy"); + } + @AfterAll void cleanup() {