diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 658a0d67d5ab..3dad6045698e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3504,6 +3504,23 @@
       If the buffer overflows, task reinitialization will be triggered.
     </description>
   </property>
+  <property>
+    <name>ozone.recon.dn.metrics.collection.thread.count</name>
+    <value>10</value>
+    <tag>OZONE, RECON, DN</tag>
+    <description>
+      The number of threads used to collect metrics from the datanode JMX endpoints.
+    </description>
+  </property>
+  <property>
+    <name>ozone.recon.dn.metrics.collection.minimum.api.delay</name>
+    <value>30s</value>
+    <tag>OZONE, RECON, DN</tag>
+    <description>
+      Minimum delay before the API may start a new JMX collection task.
+      It acts as a rate limiter to avoid unnecessary task creation.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.datanode.admin.monitor.interval</name>
     <value>30s</value>
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index f2a9e0447932..81e05a7f2791 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -56,7 +56,7 @@
 OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http
 OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true
 OZONE-SITE.XML_ozone.fs.hsync.enabled=true
-
+OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5s
 OZONE_CONF_DIR=/etc/hadoop
 OZONE_LOG_DIR=/var/log/hadoop
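The two new keys resolve through the standard OzoneConfiguration accessors; a minimal, self-contained sketch (the demo class is hypothetical, while the key names and defaults mirror the ReconServerConfigKeys constants added below):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public final class ReconDnMetricsSettingsDemo {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Thread pool size for the JMX collection executor (default 10).
        int threads = conf.getInt(
            "ozone.recon.dn.metrics.collection.thread.count", 10);
        // Minimum delay between API-triggered collection tasks (default 30s),
        // resolved to milliseconds exactly as DataNodeMetricsService does.
        long delayMs = conf.getTimeDuration(
            "ozone.recon.dn.metrics.collection.minimum.api.delay", "30s",
            TimeUnit.MILLISECONDS);
        System.out.println(threads + " threads, " + delayMs + " ms delay");
      }
    }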
diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java
index c43958fe1549..e8a5dab62eef 100644
--- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java
+++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java
@@ -22,19 +22,22 @@
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.recon.TestReconEndpointUtil.getReconWebAddress;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -46,6 +49,8 @@
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -66,6 +71,9 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
 import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.ozone.test.GenericTestUtils;
@@ -100,6 +108,7 @@ public class TestStorageDistributionEndpoint {
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String STORAGE_DIST_ENDPOINT = "/api/v1/storageDistribution";
+  private static final String PENDING_DELETION_ENDPOINT = "/api/v1/pendingDeletion";
 
   static List<ReplicationConfig> replicationConfigs() {
     return Collections.singletonList(
@@ -110,17 +119,14 @@ static List<ReplicationConfig> replicationConfigs()
   @BeforeAll
   public static void setup() throws Exception {
     conf = new OzoneConfiguration();
-    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
-        TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, 100,
-        TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
-        100, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS);
     conf.setLong(OZONE_SCM_HA_RATIS_SNAPSHOT_GAP, 1L);
-    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 50,
-        TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
-        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 50, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL, 500, TimeUnit.MILLISECONDS);
+    conf.set(ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, "1s");
 
     // Enhanced SCM configuration for faster block deletion processing
     ScmConfig scmConfig = conf.getObject(ScmConfig.class);
@@ -129,18 +135,9 @@ public static void setup() throws Exception {
     conf.set(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0s");
 
     // Enhanced DataNode configuration to move pending deletion from SCM to DN faster
-    DatanodeConfiguration dnConf =
-        conf.getObject(DatanodeConfiguration.class);
-    dnConf.setBlockDeletionInterval(Duration.ofMillis(100));
-    // Increase block delete queue limit to allow more queued commands on DN
-    dnConf.setBlockDeleteQueueLimit(50);
-    // Reduce the interval for delete command worker processing
-    dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100));
-    // Increase blocks deleted per interval to speed up deletion
-    dnConf.setBlockDeletionLimit(5000);
+    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+    dnConf.setBlockDeletionInterval(Duration.ofMillis(30000));
     conf.setFromObject(dnConf);
-    // Increase DN delete threads for faster parallel processing
-    conf.setInt("ozone.datanode.block.delete.threads.max", 10);
     recon = new ReconService(conf);
     cluster = MiniOzoneCluster.newBuilder(conf)
@@ -190,19 +187,110 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig)
       }
     }
     waitForKeysCreated(replicationConfig);
-    Thread.sleep(10000);
-    StringBuilder urlBuilder = new StringBuilder();
-    urlBuilder.append(getReconWebAddress(conf))
-        .append(STORAGE_DIST_ENDPOINT);
-    String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
-    StorageCapacityDistributionResponse storageResponse =
-        MAPPER.readValue(response, StorageCapacityDistributionResponse.class);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        StringBuilder urlBuilder = new StringBuilder();
+        urlBuilder.append(getReconWebAddress(conf))
+            .append(STORAGE_DIST_ENDPOINT);
+        String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+        StorageCapacityDistributionResponse storageResponse =
+            MAPPER.readValue(response, StorageCapacityDistributionResponse.class);
 
-    assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
-    assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
-    assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
-    assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
-    assertEquals(3, storageResponse.getDataNodeUsage().size());
+        assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
+        assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
+        assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
+        assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
+        assertEquals(3, storageResponse.getDataNodeUsage().size());
+
+        return true;
+      } catch (Exception e) {
+        LOG.debug("Waiting for storage distribution assertions to pass", e);
+        return false;
+      }
+    }, 1000, 30000);
+    closeAllContainers();
+    fs.delete(dir1, true);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        syncDataFromOM();
+        StringBuilder urlBuilder = new StringBuilder();
+        urlBuilder.append(getReconWebAddress(conf))
+            .append(PENDING_DELETION_ENDPOINT + "?component=om");
+        String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+        Map<String, Integer> pendingDeletionMap =
+            MAPPER.readValue(response, Map.class);
+        assertEquals(30, pendingDeletionMap.get("totalSize"));
+        assertEquals(30, pendingDeletionMap.get("pendingDirectorySize") + pendingDeletionMap.get("pendingKeySize"));
+        return true;
+      } catch (Throwable e) {
+        LOG.debug("Waiting for OM pending deletion assertions to pass", e);
+        return false;
+      }
+    }, 1000, 30000);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        StringBuilder urlBuilder = new StringBuilder();
+        urlBuilder.append(getReconWebAddress(conf))
+            .append(PENDING_DELETION_ENDPOINT + "?component=scm");
+        String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+        ScmPendingDeletion pendingDeletion =
+            MAPPER.readValue(response, ScmPendingDeletion.class);
+        assertEquals(30, pendingDeletion.getTotalReplicatedBlockSize());
+        assertEquals(10, pendingDeletion.getTotalBlocksize());
+        assertEquals(10, pendingDeletion.getTotalBlocksCount());
+        return true;
+      } catch (Throwable e) {
+        LOG.debug("Waiting for SCM pending deletion assertions to pass", e);
+        return false;
+      }
+    }, 2000, 30000);
+    GenericTestUtils.waitFor(() ->
+        scm.getClientProtocolServer().getDeletedBlockSummary().getTotalBlockCount() == 0,
+        1000, 30000);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        scm.getScmHAManager().asSCMHADBTransactionBuffer().flush();
+        StringBuilder urlBuilder = new StringBuilder();
+        urlBuilder.append(getReconWebAddress(conf))
+            .append(PENDING_DELETION_ENDPOINT + "?component=dn");
+        String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+        DataNodeMetricsServiceResponse pendingDeletion =
+            MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
+        assertNotNull(pendingDeletion);
+        assertEquals(30, pendingDeletion.getTotalPendingDeletion());
+        assertEquals(DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED, pendingDeletion.getStatus());
+        assertEquals(pendingDeletion.getTotalNodesQueried(), pendingDeletion.getPendingDeletionPerDataNode().size());
+        assertEquals(0, pendingDeletion.getTotalNodeQueryFailures());
+        pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> {
+          assertEquals(10, dn.getPendingBlockSize());
+        });
+        return true;
+      } catch (Throwable e) {
+        LOG.debug("Waiting for DN pending deletion assertions to pass", e);
+        return false;
+      }
+    }, 2000, 60000);
+    cluster.getHddsDatanodes().get(0).stop();
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        StringBuilder urlBuilder = new StringBuilder();
+        urlBuilder.append(getReconWebAddress(conf))
+            .append(PENDING_DELETION_ENDPOINT + "?component=dn");
+        String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
+        DataNodeMetricsServiceResponse pendingDeletion =
+            MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
+        assertNotNull(pendingDeletion);
+        assertEquals(1, pendingDeletion.getTotalNodeQueryFailures());
+        assertTrue(pendingDeletion.getPendingDeletionPerDataNode()
+            .stream()
+            .anyMatch(dn -> dn.getPendingBlockSize() == -1));
+        return true;
+      } catch (Throwable e) {
+        return false;
+      }
+    }, 2000, 60000);
   }
 
   private void verifyBlocksCreated(
@@ -266,6 +354,14 @@ private static void performOperationOnKeyContainers(
     }
   }
 
+  private static void closeAllContainers() {
+    for (ContainerInfo container :
+        scm.getContainerManager().getContainers()) {
+      scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER,
+          container.containerID());
+    }
+  }
+
   @AfterEach
   public void cleanup() {
     assertDoesNotThrow(() -> {
diff --git a/hadoop-ozone/recon/pom.xml b/hadoop-ozone/recon/pom.xml
index 9a0936ebf194..cdabc26f3fbd 100644
--- a/hadoop-ozone/recon/pom.xml
+++ b/hadoop-ozone/recon/pom.xml
@@ -126,6 +126,26 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpasyncclient</artifactId>
+      <version>${httpasyncclient.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${httpclient.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>${httpcore.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore-nio</artifactId>
+      <version>${httpcore.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-common</artifactId>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java
index d09c01ea72f9..cd315e220d9e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.recon.ReconConfigKeys;
 import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
+import org.apache.hadoop.ozone.recon.spi.impl.JmxServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +68,14 @@ public MetricsServiceProvider getMetricsServiceProvider() {
     return null;
   }
 
+  /**
+   * Returns the configured MetricsServiceProvider implementation for JMX.
+   * @return MetricsServiceProvider instance for JMX
+   */
+  public MetricsServiceProvider getJmxMetricsServiceProvider() {
+    return new JmxServiceProviderImpl(configuration, reconUtils);
+  }
+
   /**
    * Returns the Prometheus endpoint if configured. Otherwise returns null.
    *
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index d3f684238712..8fa11caa00f2 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -191,6 +191,14 @@ public final class ReconServerConfigKeys {
   public static final int OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;
 
+  public static final String OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT =
+      "ozone.recon.dn.metrics.collection.thread.count";
+  public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT = 10;
+
+  public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY =
+      "ozone.recon.dn.metrics.collection.minimum.api.delay";
+  public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = "30s";
+
   /**
    * Private constructor for utility class.
    */
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index ccc92648f117..293a4d77a069 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -847,4 +847,31 @@ public static String constructObjectPathWithPrefix(long... ids) {
     }
     return pathBuilder.toString();
   }
+
+  public static Map<String, Object> getMetricsData(List<Map<String, Object>> metrics, String beanName) {
+    if (metrics == null || StringUtils.isEmpty(beanName)) {
+      return null;
+    }
+    for (Map<String, Object> item : metrics) {
+      if (item.get("name").equals(beanName)) {
+        return item;
+      }
+    }
+    return null;
+  }
+
+  public static long extractMetricValue(Map<String, Object> metrics, String keyName) {
+    if (metrics == null || StringUtils.isEmpty(keyName)) {
+      return -1;
+    }
+    Object value = metrics.get(keyName);
+    if (value instanceof Long) {
+      return (long) value;
+    }
+    if (value instanceof Integer) {
+      Integer intValue = (Integer) value;
+      return intValue.longValue();
+    }
+    return -1;
+  }
 }
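The two ReconUtils helpers are meant to be chained over the decoded "beans" list; a minimal sketch (the demo class and literal values are illustrative, while the bean and attribute names match DataNodeMetricsService below):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.ozone.recon.ReconUtils;

    public final class ReconUtilsHelpersDemo {
      public static void main(String[] args) {
        // One entry shaped like an element of the /jmx "beans" array.
        Map<String, Object> bean = new HashMap<>();
        bean.put("name", "Hadoop:service=HddsDatanode,name=BlockDeletingService");
        bean.put("TotalPendingBlockBytes", 10);

        List<Map<String, Object>> beans = Collections.singletonList(bean);
        Map<String, Object> match = ReconUtils.getMetricsData(
            beans, "Hadoop:service=HddsDatanode,name=BlockDeletingService");
        // Integer values are widened to long; missing beans or keys yield -1.
        long pending = ReconUtils.extractMetricValue(match, "TotalPendingBlockBytes");
        System.out.println(pending); // 10
      }
    }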
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java
new file mode 100644
index 000000000000..ff7d4ea992bc
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service for collecting and managing DataNode metrics asynchronously.
+ * Uses a non-blocking HTTP client for efficient concurrent metric collection.
+ */
+@Singleton
+public class DataNodeMetricsService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class);
+  private static final String BEAN_NAME = "Hadoop:service=HddsDatanode,name=BlockDeletingService";
+
+  private final ReconNodeManager reconNodeManager;
+  private final boolean httpsEnabled;
+  private final long minimumApiDelayMs;
+  private final MetricsServiceProvider asyncServiceProvider;
+  // Immutable state holder for thread-safe access
+  private final AtomicReference<MetricsState> currentState =
+      new AtomicReference<>(new MetricsState());
+
+  @Inject
+  public DataNodeMetricsService(
+      OzoneStorageContainerManager reconSCM,
+      OzoneConfiguration config,
+      MetricsServiceProviderFactory metricsServiceProviderFactory) {
+
+    this.reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
+    this.httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled();
+    this.minimumApiDelayMs = config.getTimeDuration(
+        OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY,
+        OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.asyncServiceProvider = metricsServiceProviderFactory.getJmxMetricsServiceProvider();
+  }
+
+  /**
+   * Starts an asynchronous metrics collection task if one is not already running.
+   * Returns immediately; use getCollectedMetrics() to check status and results.
+   */
+  public synchronized void startTask() {
+    MetricsState state = currentState.get();
+
+    // Check if already running
+    if (state.status == MetricCollectionStatus.IN_PROGRESS) {
+      LOG.warn("Metrics collection task is already in progress");
+      return;
+    }
+
+    // Check rate limit
+    if (state.lastCollectionEndTime > System.currentTimeMillis() - minimumApiDelayMs) {
+      LOG.info("Skipping metrics collection due to rate limit ({}ms)", minimumApiDelayMs);
+      return;
+    }
+
+    Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet();
+
+    if (nodes.isEmpty()) {
+      LOG.warn("No datanodes found to query");
+      currentState.set(new MetricsState(
+          MetricCollectionStatus.SUCCEEDED,
+          new ArrayList<>(),
+          0L, 0, 0,
+          System.currentTimeMillis()));
+      return;
+    }
+
+    LOG.info("Starting async metrics collection for {} datanodes", nodes.size());
+
+    // Update state to IN_PROGRESS
+    currentState.set(new MetricsState(
+        MetricCollectionStatus.IN_PROGRESS,
+        new ArrayList<>(),
+        0L, 0, 0,
+        state.lastCollectionEndTime));
+
+    collectMetricsAsync(nodes);
+  }
+
+  /**
+   * Collects metrics from all datanodes asynchronously using CompletableFuture.
+   */
+  private void collectMetricsAsync(Set<DatanodeDetails> nodes) {
+    int totalNodes = nodes.size();
+
+    // Create futures for all nodes
+    List<CompletableFuture<DatanodePendingDeletionMetrics>> futures = nodes.stream()
+        .map(this::collectFromSingleDataNode)
+        .collect(Collectors.toList());
+
+    // Combine all futures
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+        .handle((result, throwable) -> {
+          // Collect all completed results (including failures marked as -1)
+          List<DatanodePendingDeletionMetrics> allResults = new ArrayList<>();
+          long totalPendingDeletion = 0L;
+          int failedCount = 0;
+
+          for (CompletableFuture<DatanodePendingDeletionMetrics> future : futures) {
+            try {
+              // getNow returns the result if completed, or the given default if not
+              DatanodePendingDeletionMetrics metrics = future.getNow(null);
+              if (metrics != null) {
+                allResults.add(metrics);
+                if (metrics.getPendingBlockSize() < 0) {
+                  failedCount++;
+                } else {
+                  totalPendingDeletion += metrics.getPendingBlockSize();
+                }
+              }
+            } catch (Exception e) {
+              LOG.debug("Failed to get result from future", e);
+              failedCount++;
+            }
+          }
+
+          // Update state with results
+          currentState.set(new MetricsState(
+              MetricCollectionStatus.SUCCEEDED,
+              allResults,
+              totalPendingDeletion,
+              totalNodes,
+              failedCount,
+              System.currentTimeMillis()));
+
+          LOG.info("Metrics collection completed. Queried: {}, Failed: {}, Total pending: {} bytes",
+              totalNodes, failedCount, totalPendingDeletion);
+
+          return null;
+        });
+  }
+
+  /**
+   * Collects metrics from a single datanode asynchronously.
+   */
+  private CompletableFuture<DatanodePendingDeletionMetrics> collectFromSingleDataNode(
+      DatanodeDetails nodeDetails) {
+
+    String jmxUrl = buildJmxUrl(nodeDetails);
+    String hostname = nodeDetails.getHostName();
+    String uuid = nodeDetails.getUuidString();
+    return asyncServiceProvider.getMetricsAsync(jmxUrl, BEAN_NAME)
+        .thenApply(metrics -> {
+          try {
+            if (metrics == null || metrics.isEmpty()) {
+              LOG.warn("No metrics returned from datanode {}", hostname);
+              return new DatanodePendingDeletionMetrics(hostname, uuid, -1L);
+            }
+            Map<String, Object> deletionMetrics = ReconUtils.getMetricsData(metrics, BEAN_NAME);
+            long pendingBlockSize = ReconUtils.extractMetricValue(
+                deletionMetrics, "TotalPendingBlockBytes");
+            LOG.debug("Successfully collected metrics from {}: {} bytes", hostname, pendingBlockSize);
+            return new DatanodePendingDeletionMetrics(hostname, uuid, pendingBlockSize);
+          } catch (Exception e) {
+            LOG.error("Error parsing metrics from datanode {}", hostname, e);
+            return new DatanodePendingDeletionMetrics(hostname, uuid, -1L);
+          }
+        })
+        .exceptionally(throwable -> {
+          LOG.error("Failed to collect metrics from datanode {}: {}", hostname, throwable.getMessage());
+          return new DatanodePendingDeletionMetrics(hostname, uuid, -1L);
+        });
+  }
+
+  /**
+   * Builds the JMX URL for a datanode.
+   */
+  private String buildJmxUrl(DatanodeDetails nodeDetails) {
+    String protocol = httpsEnabled ? "https" : "http";
+    Name portName = httpsEnabled ?
+        DatanodeDetails.Port.Name.HTTPS : DatanodeDetails.Port.Name.HTTP;
+    return String.format("%s://%s:%d/jmx",
+        protocol,
+        nodeDetails.getHostName(),
+        nodeDetails.getPort(portName).getValue());
+  }
+
+  /**
+   * Returns the current metrics state and results.
+   */
+  public DataNodeMetricsServiceResponse getCollectedMetrics() {
+    MetricsState state = currentState.get();
+    return DataNodeMetricsServiceResponse.newBuilder()
+        .setStatus(state.status)
+        .setPendingDeletion(state.pendingDeletionList)
+        .setTotalPendingDeletion(state.totalPendingDeletion)
+        .setTotalNodesQueried(state.totalNodesQueried)
+        .setTotalNodeQueryFailures(state.totalNodesFailed)
+        .build();
+  }
+
+  /**
+   * Returns the current collection status.
+   */
+  public MetricCollectionStatus getTaskStatus() {
+    return currentState.get().status;
+  }
+
+  @PreDestroy
+  public void shutdown() {
+    LOG.info("Shutting down DataNodeMetricsService...");
+    asyncServiceProvider.shutdown();
+  }
+
+  /**
+   * Immutable state holder for thread-safe state management.
+   */
+  private static class MetricsState {
+    private final MetricCollectionStatus status;
+    private final List<DatanodePendingDeletionMetrics> pendingDeletionList;
+    private final Long totalPendingDeletion;
+    private final int totalNodesQueried;
+    private final int totalNodesFailed;
+    private final long lastCollectionEndTime;
+
+    MetricsState() {
+      this(MetricCollectionStatus.NOT_STARTED, new ArrayList<>(), 0L, 0, 0, 0);
+    }
+
+    MetricsState(MetricCollectionStatus status,
+        List<DatanodePendingDeletionMetrics> pendingDeletionList,
+        Long totalPendingDeletion,
+        int totalNodesQueried,
+        int totalNodesFailed,
+        long lastCollectionEndTime) {
+      this.status = status;
+      this.pendingDeletionList = pendingDeletionList;
+      this.totalPendingDeletion = totalPendingDeletion;
+      this.totalNodesQueried = totalNodesQueried;
+      this.totalNodesFailed = totalNodesFailed;
+      this.lastCollectionEndTime = lastCollectionEndTime;
+    }
+  }
+
+  /**
+   * Status of a metric collection task.
+   */
+  public enum MetricCollectionStatus {
+    SUCCEEDED, IN_PROGRESS, NOT_STARTED
+  }
+}
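The service is deliberately fire-and-forget: startTask() returns immediately and getCollectedMetrics() serves the latest immutable snapshot. A minimal sketch of the intended poll pattern (the helper method and sleep interval are illustrative; service is an injected DataNodeMetricsService):

    // Illustrative caller of the async collection API.
    static long awaitPendingDeletion(DataNodeMetricsService service)
        throws InterruptedException {
      service.startTask(); // no-op if a task is running or rate-limited
      DataNodeMetricsServiceResponse snapshot = service.getCollectedMetrics();
      while (snapshot.getStatus()
          == DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) {
        Thread.sleep(200); // illustrative back-off
        snapshot = service.getCollectedMetrics();
      }
      return snapshot.getTotalPendingDeletion();
    }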
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java
new file mode 100644
index 000000000000..e47dc0554671
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
+import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST API endpoint that provides metrics and information related to
+ * pending deletions. It responds to requests on the "/pendingDeletion" path
+ * and produces application/json responses.
+ */
+@Path("/pendingDeletion")
+@Produces("application/json")
+@AdminOnly
+public class PendingDeletionEndpoint {
+  private static final Logger LOG = LoggerFactory.getLogger(PendingDeletionEndpoint.class);
+  private final ReconGlobalMetricsService reconGlobalMetricsService;
+  private final DataNodeMetricsService dataNodeMetricsService;
+  private final StorageContainerLocationProtocol scmClient;
+
+  @Inject
+  public PendingDeletionEndpoint(ReconGlobalMetricsService reconGlobalMetricsService,
+      DataNodeMetricsService dataNodeMetricsService,
+      StorageContainerLocationProtocol scmClient) {
+    this.reconGlobalMetricsService = reconGlobalMetricsService;
+    this.dataNodeMetricsService = dataNodeMetricsService;
+    this.scmClient = scmClient;
+  }
+
+  @GET
+  public Response getPendingDeletionByComponent(@QueryParam("component") String component) {
+    if (component == null || component.isEmpty()) {
+      return Response.status(Response.Status.BAD_REQUEST)
+          .entity("component query parameter is required").build();
+    }
+    final String normalizedComponent = component.trim().toLowerCase();
+    switch (normalizedComponent) {
+    case "dn":
+      return handleDataNodeMetrics();
+    case "scm":
+      return handleScmPendingDeletion();
+    case "om":
+      return handleOmPendingDeletion();
+    default:
+      return Response.status(Response.Status.BAD_REQUEST)
+          .entity("component query parameter must be one of dn, scm, om").build();
+    }
+  }
+
+  private Response handleDataNodeMetrics() {
+    DataNodeMetricsService.MetricCollectionStatus status = dataNodeMetricsService.getTaskStatus();
+    if (status != DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) {
+      CompletableFuture.runAsync(dataNodeMetricsService::startTask);
+    }
+    DataNodeMetricsServiceResponse response = dataNodeMetricsService.getCollectedMetrics();
+    if (response.getStatus() == DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) {
+      return Response.accepted(response).build();
+    } else {
+      return Response.ok(response).build();
+    }
+  }
+
+  private Response handleScmPendingDeletion() {
+    try {
+      HddsProtos.DeletedBlocksTransactionSummary summary = scmClient.getDeletedBlockSummary();
+      if (summary == null) {
+        return Response.noContent().build();
+      }
+      ScmPendingDeletion pendingDeletion = new ScmPendingDeletion(
+          summary.getTotalBlockSize(),
+          summary.getTotalBlockReplicatedSize(),
+          summary.getTotalBlockCount());
+      return Response.ok(pendingDeletion).build();
+    } catch (Exception e) {
+      LOG.error("Error getting SCM pending deletion", e);
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity("Error retrieving SCM pending deletion: " + e.getMessage())
+          .build();
+    }
+  }
+
+  private Response handleOmPendingDeletion() {
+    Map<String, Long> pendingDeletion = reconGlobalMetricsService.calculatePendingSizes();
+    return Response.ok(pendingDeletion).build();
+  }
+}
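A caller sees the same dispatch the integration test exercises; a minimal sketch using the test helpers from this patch (assumes a running Recon and an OzoneConfiguration conf):

    StringBuilder url = new StringBuilder()
        .append(TestReconEndpointUtil.getReconWebAddress(conf))
        .append("/api/v1/pendingDeletion?component=dn");
    String body = TestReconEndpointUtil.makeHttpCall(conf, url);
    // A 202 (Accepted) body carries status=IN_PROGRESS; re-poll until SUCCEEDED.
    DataNodeMetricsServiceResponse dn =
        new ObjectMapper().readValue(body, DataNodeMetricsServiceResponse.class);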
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java
new file mode 100644
index 000000000000..6ec9cda0a59e
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService;
+
+/**
+ * Represents a response from the DataNodeMetricsService.
+ * This class encapsulates the result of a metrics collection task,
+ * including the collection status, the total pending deletion across all
+ * data nodes, and the per-datanode pending deletion details.
+ *
+ * Instances of this class are created using the {@link Builder} class.
+ */
+public class DataNodeMetricsServiceResponse {
+  @JsonProperty("status")
+  private DataNodeMetricsService.MetricCollectionStatus status;
+  @JsonProperty("totalPendingDeletion")
+  private Long totalPendingDeletion;
+  @JsonProperty("pendingDeletionPerDataNode")
+  private List<DatanodePendingDeletionMetrics> pendingDeletionPerDataNode;
+  @JsonProperty("totalNodesQueried")
+  private int totalNodesQueried;
+  @JsonProperty("totalNodeQueriesFailed")
+  private long totalNodeQueryFailures;
+
+  public DataNodeMetricsServiceResponse(Builder builder) {
+    this.status = builder.status;
+    this.totalPendingDeletion = builder.totalPendingDeletion;
+    this.pendingDeletionPerDataNode = builder.pendingDeletion;
+    this.totalNodesQueried = builder.totalNodesQueried;
+    this.totalNodeQueryFailures = builder.totalNodeQueryFailures;
+  }
+
+  public DataNodeMetricsServiceResponse() {
+    this.status = DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED;
+    this.totalPendingDeletion = 0L;
+    this.pendingDeletionPerDataNode = null;
+    this.totalNodesQueried = 0;
+    this.totalNodeQueryFailures = 0;
+  }
+
+  public DataNodeMetricsService.MetricCollectionStatus getStatus() {
+    return status;
+  }
+
+  public Long getTotalPendingDeletion() {
+    return totalPendingDeletion;
+  }
+
+  public List<DatanodePendingDeletionMetrics> getPendingDeletionPerDataNode() {
+    return pendingDeletionPerDataNode;
+  }
+
+  public int getTotalNodesQueried() {
+    return totalNodesQueried;
+  }
+
+  public long getTotalNodeQueryFailures() {
+    return totalNodeQueryFailures;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for {@link DataNodeMetricsServiceResponse}.
+   * Provides a fluent interface for setting the collection status, the total
+   * pending deletion across all data nodes, the per-datanode metrics, and the
+   * query counters before creating a new immutable instance.
+   */
+  public static final class Builder {
+    private DataNodeMetricsService.MetricCollectionStatus status;
+    private Long totalPendingDeletion;
+    private List<DatanodePendingDeletionMetrics> pendingDeletion;
+    private int totalNodesQueried;
+    private long totalNodeQueryFailures;
+
+    public Builder setStatus(DataNodeMetricsService.MetricCollectionStatus status) {
+      this.status = status;
+      return this;
+    }
+
+    public Builder setTotalPendingDeletion(Long totalPendingDeletion) {
+      this.totalPendingDeletion = totalPendingDeletion;
+      return this;
+    }
+
+    public Builder setPendingDeletion(List<DatanodePendingDeletionMetrics> pendingDeletion) {
+      this.pendingDeletion = pendingDeletion;
+      return this;
+    }
+
+    public Builder setTotalNodesQueried(int totalNodesQueried) {
+      this.totalNodesQueried = totalNodesQueried;
+      return this;
+    }
+
+    public Builder setTotalNodeQueryFailures(long totalNodeQueryFailures) {
+      this.totalNodeQueryFailures = totalNodeQueryFailures;
+      return this;
+    }
+
+    public DataNodeMetricsServiceResponse build() {
+      return new DataNodeMetricsServiceResponse(this);
+    }
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java
new file mode 100644
index 000000000000..964add573096
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents pending deletion metrics for a datanode.
+ * Encapsulates information about blocks pending deletion on a specific datanode.
+ */
+public class DatanodePendingDeletionMetrics {
+
+  @JsonProperty("hostName")
+  private final String hostName;
+
+  @JsonProperty("datanodeUuid")
+  private final String datanodeUuid;
+
+  @JsonProperty("pendingBlockSize")
+  private final long pendingBlockSize;
+
+  @JsonCreator
+  public DatanodePendingDeletionMetrics(
+      @JsonProperty("hostName") String hostName,
+      @JsonProperty("datanodeUuid") String datanodeUuid,
+      @JsonProperty("pendingBlockSize") long pendingBlockSize) {
+    this.hostName = hostName;
+    this.datanodeUuid = datanodeUuid;
+    this.pendingBlockSize = pendingBlockSize;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public long getPendingBlockSize() {
+    return pendingBlockSize;
+  }
+
+  public String getDatanodeUuid() {
+    return datanodeUuid;
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java
new file mode 100644
index 000000000000..edab570e20e5
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents metadata about pending deletions in the Storage Container
+ * Manager (SCM): the total block size, the total size of replicated blocks,
+ * and the total number of blocks awaiting deletion.
+ */
+public class ScmPendingDeletion {
+  @JsonProperty("totalBlocksize")
+  private final long totalBlocksize;
+  @JsonProperty
+  private final long totalReplicatedBlockSize;
+  @JsonProperty("totalBlocksCount")
+  private final long totalBlocksCount;
+
+  public ScmPendingDeletion() {
+    this.totalBlocksize = 0;
+    this.totalReplicatedBlockSize = 0;
+    this.totalBlocksCount = 0;
+  }
+
+  public ScmPendingDeletion(long size, long replicatedSize, long totalBlocks) {
+    this.totalBlocksize = size;
+    this.totalReplicatedBlockSize = replicatedSize;
+    this.totalBlocksCount = totalBlocks;
+  }
+
+  public long getTotalBlocksize() {
+    return totalBlocksize;
+  }
+
+  public long getTotalReplicatedBlockSize() {
+    return totalReplicatedBlockSize;
+  }
+
+  public long getTotalBlocksCount() {
+    return totalBlocksCount;
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java
index 3d0aa3140671..17253115d6a6 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java
@@ -19,6 +19,8 @@
 
 import java.net.HttpURLConnection;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.ozone.recon.metrics.Metric;
 
 /**
@@ -48,12 +50,19 @@ HttpURLConnection getMetricsResponse(String api, String queryString)
   List<Metric> getMetricsInstant(String queryString) throws Exception;
 
   /**
-   * Returns a list of {@link Metric} for the given ranged query.
+   * Asynchronously fetches metrics for the given endpoint and query.
    *
-   * @param queryString query string with metric name, start time, end time,
-   *                    step and other filters.
-   * @return List of Json map of metrics response.
-   * @throws Exception exception
+   * @param endpoint    metrics endpoint to query.
+   * @param queryString query string with metric name and other filters.
+   * @return future completing with the JSON maps of the metrics response.
    */
-  List<Metric> getMetricsRanged(String queryString) throws Exception;
+  CompletableFuture<List<Map<String, Object>>> getMetricsAsync(String endpoint, String queryString);
+
+  /**
+   * Shuts down the service provider and releases any system resources or
+   * connections held by it.
+   */
+  void shutdown();
 }
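A consumer of the new contract composes the future rather than blocking; a minimal sketch combining it with the ReconUtils helpers (host and port are illustrative, assuming the default datanode HTTP port; provider is any MetricsServiceProvider, e.g. the JmxServiceProviderImpl added below):

    // Extract a single counter from the asynchronously fetched beans list.
    CompletableFuture<Long> pending = provider
        .getMetricsAsync("http://dn-host:9882/jmx",
            "Hadoop:service=HddsDatanode,name=BlockDeletingService")
        .thenApply(beans -> ReconUtils.extractMetricValue(
            ReconUtils.getMetricsData(beans,
                "Hadoop:service=HddsDatanode,name=BlockDeletingService"),
            "TotalPendingBlockBytes"));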
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java
new file mode 100644
index 000000000000..a23313fc7b54
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import javax.inject.Singleton;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.metrics.Metric;
+import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the JMX Metrics Service provider.
+ */
+@Singleton
+public class JmxServiceProviderImpl implements MetricsServiceProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JmxServiceProviderImpl.class);
+  private final CloseableHttpAsyncClient asyncHttpClient;
+  private final ObjectMapper objectMapper;
+
+  public static final String JMX_INSTANT_QUERY_API = "qry";
+  private URLConnectionFactory connectionFactory;
+  private ReconUtils reconUtils;
+  private static final int DEFAULT_TIMEOUT_MS = 60000;
+  private static final int MAX_TOTAL_CONNECTIONS = 100;
+  private static final int MAX_CONNECTIONS_PER_ROUTE = 10;
+
+  public JmxServiceProviderImpl(OzoneConfiguration configuration,
+      ReconUtils reconUtils) {
+    connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(configuration);
+    this.reconUtils = reconUtils;
+    this.asyncHttpClient = createAsyncHttpClient();
+    this.asyncHttpClient.start();
+    this.objectMapper = new ObjectMapper();
+  }
+
+  private CloseableHttpAsyncClient createAsyncHttpClient() {
+    try {
+      // Configure the IO reactor for non-blocking I/O
+      IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+          .setConnectTimeout(DEFAULT_TIMEOUT_MS)
+          .setSoTimeout(DEFAULT_TIMEOUT_MS)
+          .setIoThreadCount(Runtime.getRuntime().availableProcessors())
+          .build();
+
+      ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
+      PoolingNHttpClientConnectionManager connManager =
+          new PoolingNHttpClientConnectionManager(ioReactor);
+      connManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
+      connManager.setDefaultMaxPerRoute(MAX_CONNECTIONS_PER_ROUTE);
+
+      RequestConfig requestConfig = RequestConfig.custom()
+          .setConnectTimeout(DEFAULT_TIMEOUT_MS)
+          .setSocketTimeout(DEFAULT_TIMEOUT_MS)
+          .setConnectionRequestTimeout(DEFAULT_TIMEOUT_MS)
+          .build();
+
+      return HttpAsyncClients.custom()
+          .setConnectionManager(connManager)
+          .setDefaultRequestConfig(requestConfig)
+          .build();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create async HTTP client", e);
+    }
+  }
+
+  /**
+   * Returns {@link HttpURLConnection} after querying the metrics endpoint
+   * for the given metric. Not supported by this provider; always returns null.
+   *
+   * @param api api.
+   * @param queryString query string with metric name and other filters.
+   * @return HttpURLConnection
+   * @throws Exception exception
+   */
+  @Override
+  public HttpURLConnection getMetricsResponse(String api, String queryString)
+      throws Exception {
+    return null;
+  }
+
+  @Override
+  public List<Metric> getMetricsInstant(String queryString) throws Exception {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Asynchronously queries the JMX servlet at the given endpoint.
+   *
+   * @param jmxEndpoint base /jmx URL of the target process.
+   * @param queryString JMX bean query string.
+   * @return future completing with the "beans" maps of the JMX response.
+   */
+  @Override
+  public CompletableFuture<List<Map<String, Object>>> getMetricsAsync(String jmxEndpoint, String queryString) {
+    // Remove the trailing slash from the endpoint URL.
+    if (jmxEndpoint != null && jmxEndpoint.endsWith("/")) {
+      jmxEndpoint = jmxEndpoint.substring(0, jmxEndpoint.length() - 1);
+    }
+    return getMetrics(jmxEndpoint, JMX_INSTANT_QUERY_API, queryString);
+  }
+
+  @Override
+  public void shutdown() {
+    try {
+      LOG.info("Shutting down async HTTP client...");
+      asyncHttpClient.close();
+    } catch (IOException e) {
+      LOG.error("Error shutting down async HTTP client", e);
+    }
+  }
+
+  /**
+   * Issues the GET request for the given endpoint, api and query string,
+   * adapting the HTTP callback into a CompletableFuture.
+   */
+  private CompletableFuture<List<Map<String, Object>>> getMetrics(String jmxEndpoint, String api, String queryString) {
+    CompletableFuture<List<Map<String, Object>>> future = new CompletableFuture<>();
+
+    try {
+      String url = String.format("%s?%s=%s", jmxEndpoint, api, queryString);
+      HttpGet request = new HttpGet(url);
+
+      asyncHttpClient.execute(request, new FutureCallback<HttpResponse>() {
+        @Override
+        public void completed(HttpResponse response) {
+          try {
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode >= 200 && statusCode < 300) {
+              String responseBody = EntityUtils.toString(response.getEntity());
+              Map<String, Object> jsonResponse = objectMapper.readValue(
+                  responseBody, Map.class);
+              List<Map<String, Object>> beans =
+                  (List<Map<String, Object>>) jsonResponse.get("beans");
+              future.complete(beans);
+            } else {
+              future.completeExceptionally(
+                  new IOException("HTTP error code: " + statusCode));
+            }
+          } catch (Exception e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+        @Override
+        public void failed(Exception ex) {
+          LOG.debug("HTTP request failed for {}: {}", url, ex.getMessage());
+          future.completeExceptionally(ex);
+        }
+
+        @Override
+        public void cancelled() {
+          future.completeExceptionally(new IOException("Request cancelled"));
+        }
+      });
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+}
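For reference, the parser above assumes the Hadoop-style /jmx servlet layout, where matching MBeans arrive under a top-level "beans" array; a trimmed illustration of the expected exchange (host, port, and values are made up):

    // GET http://dn-host:9882/jmx?qry=Hadoop:service=HddsDatanode,name=BlockDeletingService
    // {
    //   "beans": [ {
    //     "name": "Hadoop:service=HddsDatanode,name=BlockDeletingService",
    //     "TotalPendingBlockBytes": 10
    //   } ]
    // }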
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java
index 64c00490613e..b5d2d081672c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.inject.Singleton;
 import javax.ws.rs.core.Response;
@@ -45,11 +46,9 @@
  * Implementation of the Prometheus Metrics Service provider.
  */
 @Singleton
-public class PrometheusServiceProviderImpl
-    implements MetricsServiceProvider {
+public class PrometheusServiceProviderImpl implements MetricsServiceProvider {
 
   public static final String PROMETHEUS_INSTANT_QUERY_API = "query";
-  public static final String PROMETHEUS_RANGED_QUERY_API = "query_range";
 
   private static final Logger LOG =
       LoggerFactory.getLogger(PrometheusServiceProviderImpl.class);
@@ -123,17 +122,14 @@ public List<Metric> getMetricsInstant(String queryString)
     return getMetrics(PROMETHEUS_INSTANT_QUERY_API, queryString);
   }
 
-  /**
-   * Returns a list of {@link Metric} for the given ranged query.
-   *
-   * @param queryString query string with metric name, start time, end time,
-   *                    step and other filters.
-   * @return List of Json map of metrics response.
-   * @throws Exception exception
-   */
   @Override
-  public List<Metric> getMetricsRanged(String queryString) throws Exception {
-    return getMetrics(PROMETHEUS_RANGED_QUERY_API, queryString);
+  public CompletableFuture<List<Map<String, Object>>> getMetricsAsync(String endpoint, String queryString) {
+    // Async collection is not supported by the Prometheus provider.
+    return null;
+  }
+
+  @Override
+  public void shutdown() {
   }
 
   /**
diff --git a/pom.xml b/pom.xml
index 4931b7002837..e4620601e7fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
     ${hdds.version}
    ${ozone.version}
    2.6.1
+    <httpasyncclient.version>4.1.5</httpasyncclient.version>
     <httpclient.version>4.5.14</httpclient.version>
     <httpcore.version>4.4.16</httpcore.version>
     1.77.0