diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 658a0d67d5ab..3dad6045698e 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3504,6 +3504,23 @@ If the buffer overflows, task reinitialization will be triggered. + + ozone.recon.dn.metrics.collection.thread.count + 10 + OZONE, RECON, DN + + The number of threads running to get metrics from jmx endpoint. + + + + ozone.recon.dn.metrics.collection.minimum.api.delay + 30s + OZONE, RECON, DN + + Minimum delay in API to start a new task for Jmx collection. + It behaves like a rate limiter to avoid unnecessary task creation. + + ozone.scm.datanode.admin.monitor.interval 30s diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config index f2a9e0447932..81e05a7f2791 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config @@ -56,7 +56,7 @@ OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true OZONE-SITE.XML_ozone.fs.hsync.enabled=true - +OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5s OZONE_CONF_DIR=/etc/hadoop OZONE_LOG_DIR=/var/log/hadoop diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java index c43958fe1549..34605b2a7eee 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java @@ -30,11 +30,13 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; @@ -46,6 +48,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.HddsDatanodeService; @@ -66,6 +70,9 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion; import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import org.apache.ozone.test.GenericTestUtils; @@ -100,6 +107,7 @@ public class TestStorageDistributionEndpoint { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String STORAGE_DIST_ENDPOINT = "/api/v1/storageDistribution"; + private static final String PENDING_DELETION_ENDPOINT = "/api/v1/pendingDeletion"; static List replicationConfigs() { return Collections.singletonList( @@ -131,16 +139,14 @@ 
public static void setup() throws Exception { // Enhanced DataNode configuration to move pending deletion from SCM to DN faster DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class); - dnConf.setBlockDeletionInterval(Duration.ofMillis(100)); + dnConf.setBlockDeletionInterval(Duration.ofMillis(10000)); // Increase block delete queue limit to allow more queued commands on DN dnConf.setBlockDeleteQueueLimit(50); // Reduce the interval for delete command worker processing - dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100)); + dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(10000)); // Increase blocks deleted per interval to speed up deletion dnConf.setBlockDeletionLimit(5000); conf.setFromObject(dnConf); - // Increase DN delete threads for faster parallel processing - conf.setInt("ozone.datanode.block.delete.threads.max", 10); recon = new ReconService(conf); cluster = MiniOzoneCluster.newBuilder(conf) @@ -190,19 +196,107 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) } } waitForKeysCreated(replicationConfig); - Thread.sleep(10000); - StringBuilder urlBuilder = new StringBuilder(); - urlBuilder.append(getReconWebAddress(conf)) - .append(STORAGE_DIST_ENDPOINT); - String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); - StorageCapacityDistributionResponse storageResponse = - MAPPER.readValue(response, StorageCapacityDistributionResponse.class); + GenericTestUtils.waitFor(() -> { + try { + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(STORAGE_DIST_ENDPOINT); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + StorageCapacityDistributionResponse storageResponse = + MAPPER.readValue(response, StorageCapacityDistributionResponse.class); - assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys()); - assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace()); - 
assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes()); - assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes()); - assertEquals(3, storageResponse.getDataNodeUsage().size()); + assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys()); + assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace()); + assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes()); + assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes()); + assertEquals(3, storageResponse.getDataNodeUsage().size()); + + return true; + } catch (Exception e) { + LOG.debug("Waiting for storage distribution assertions to pass", e); + return false; + } + }, 5000, 30000); + closeAllContainers(); + fs.delete(dir1, true); + GenericTestUtils.waitFor(() -> { + try { + syncDataFromOM(); + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT + "?component=om"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + Map pendingDeletionMap = + MAPPER.readValue(response, Map.class); + assertEquals(30, pendingDeletionMap.get("totalSize")); + assertEquals(30, pendingDeletionMap.get("pendingDirectorySize") + pendingDeletionMap.get("pendingKeySize")); + return true; + } catch (Throwable e) { + LOG.debug("Waiting for storage distribution assertions to pass", e); + return false; + } + }, 2000, 60000); + + GenericTestUtils.waitFor(() -> { + try { + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT + "?component=scm"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + ScmPendingDeletion pendingDeletion = + MAPPER.readValue(response, ScmPendingDeletion.class); + assertEquals(30, pendingDeletion.getTotalReplicatedBlockSize()); + assertEquals(10, pendingDeletion.getTotalBlocksize()); + assertEquals(10, 
pendingDeletion.getTotalBlocksCount()); + return true; + } catch (Throwable e) { + LOG.debug("Waiting for pending deletion assertions to pass", e); + return false; + } + }, 2000, 60000); + GenericTestUtils.waitFor(() -> { + try { + scm.getScmHAManager().asSCMHADBTransactionBuffer().flush(); + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT + "?component=dn"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + DataNodeMetricsServiceResponse pendingDeletion = + MAPPER.readValue(response, DataNodeMetricsServiceResponse.class); + assertNotNull(pendingDeletion); + assertEquals(30, pendingDeletion.getTotalPendingDeletion()); + assertEquals(DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED, pendingDeletion.getStatus()); + assertEquals(pendingDeletion.getTotalNodesQueried(), pendingDeletion.getPendingDeletionPerDataNode().size()); + assertEquals(0, pendingDeletion.getTotalNodeQueryFailures()); + pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> { + assertEquals(10, dn.getPendingBlockSize()); + }); + return true; + } catch (Throwable e) { + LOG.debug("Waiting for pending deletion assertions to pass", e); + return false; + } + }, 2000, 60000); + cluster.getHddsDatanodes().get(0).stop(); + + GenericTestUtils.waitFor(() -> { + try { + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT + "?component=dn"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + DataNodeMetricsServiceResponse pendingDeletion = + MAPPER.readValue(response, DataNodeMetricsServiceResponse.class); + assertNotNull(pendingDeletion); + assertEquals(1, pendingDeletion.getTotalNodeQueryFailures()); + assertTrue(pendingDeletion.getPendingDeletionPerDataNode() + .stream() + .anyMatch(dn -> dn.getPendingBlockSize() == -1)); + return true; + } catch (Throwable e) { + return false;
+ } + }, 2000, 30000); } private void verifyBlocksCreated( @@ -266,6 +360,14 @@ private static void performOperationOnKeyContainers( } } + private static void closeAllContainers() { + for (ContainerInfo container : + scm.getContainerManager().getContainers()) { + scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER, + container.containerID()); + } + } + @AfterEach public void cleanup() { assertDoesNotThrow(() -> { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java index d09c01ea72f9..2baf0511ee39 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.recon.ReconConfigKeys; import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider; +import org.apache.hadoop.ozone.recon.spi.impl.JmxServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,15 @@ public MetricsServiceProvider getMetricsServiceProvider() { return null; } + /** + * Returns the configured MetricsServiceProvider implementation for Jmx. + * @param endpoint + * @return MetricsServiceProvider instance for Jmx + */ + public MetricsServiceProvider getJmxMetricsServiceProvider(String endpoint) { + return new JmxServiceProviderImpl(configuration, reconUtils, endpoint); + } + /** * Returns the Prometheus endpoint if configured. Otherwise returns null. 
* diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index d3f684238712..8fa11caa00f2 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -191,6 +191,14 @@ public final class ReconServerConfigKeys { public static final int OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3; + public static final String OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT = + "ozone.recon.dn.metrics.collection.thread.count"; + public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT = 10; + + public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY = + "ozone.recon.dn.metrics.collection.minimum.api.delay"; + public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = "30s"; + /** * Private constructor for utility class. */ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java index ccc92648f117..293a4d77a069 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java @@ -847,4 +847,31 @@ public static String constructObjectPathWithPrefix(long... 
ids) { } return pathBuilder.toString(); } + + public static Map getMetricsData(List> metrics, String beanName) { + if (metrics == null || StringUtils.isEmpty(beanName)) { + return null; + } + for (Map item :metrics) { + if (item.get("name").equals(beanName)) { + return item; + } + } + return null; + } + + public static long extractMetricValue(Map metrics, String keyName) { + if (metrics == null || StringUtils.isEmpty(keyName)) { + return -1; + } + Object value = metrics.get(keyName); + if (value instanceof Long) { + return (long) value; + } + if (value instanceof Integer) { + Integer intValue = (Integer) value; + return intValue.longValue(); + } + return -1; + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java new file mode 100644 index 000000000000..9daa0524ae41 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * The DataNodeMetricsService class is responsible for collecting and managing + * metrics related to datanodes in an Ozone Recon environment. 
Specifically, + * it gathers metrics about pending block deletions from the datanodes and + * provides aggregated results. + * This service tracks the status of metric collection tasks and provides + * an interface to query the state and results of these tasks. + * The metrics collection process involves communicating with each datanode, + * fetching their pending deletion metrics, and aggregating the data. + */ + +@Singleton +public class DataNodeMetricsService { + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private final ExecutorService executorService; + private final ReconNodeManager reconNodeManager; + private final boolean httpsEnabled; + private final int minimumApiDelay; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private Long totalPendingDeletion = 0L; + private List pendingDeletionList; + private int totalNodesQueried; + private int totalNodesFailed; + private long lastCollectionEndTime; + private static final int REQUEST_TIMEOUT = 60000; + + @Inject + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, + OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { + reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + int threadCount = config.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT, + OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT); + minimumApiDelay = (int) config.getTimeDuration(OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, TimeUnit.MILLISECONDS); + + httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); + lastCollectionEndTime = 0; + executorService = Executors.newFixedThreadPool(threadCount, + new ThreadFactoryBuilder().setNameFormat("DataNodeMetricsCollectionTasksThread-%d") + .build()); + 
this.metricsServiceProviderFactory = metricsServiceProviderFactory; + } + + public synchronized void startTask() { + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection task is already in progress. Skipping new task."); + return; + } + if (lastCollectionEndTime > System.currentTimeMillis() - minimumApiDelay) { + LOG.info("Skipping metrics collection task due to last collection time being less than {} seconds ago.", + minimumApiDelay / 1000); + return; + } + totalNodesFailed = 0; + Set<DatanodeDetails> nodes = reconNodeManager.getNodeStats().keySet(); + + if (nodes.isEmpty()) { + LOG.warn("No datanodes found to query for metrics collection"); + initializeTaskState(0); + currentStatus = MetricCollectionStatus.SUCCEEDED; + return; + } + + LOG.info("Starting metrics collection task for {} datanodes", nodes.size()); + initializeTaskState(nodes.size()); + Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> dataNodeFutures = + submitMetricsCollectionTasks(nodes); + collectMetricsWithTimeout(dataNodeFutures, nodes.size()); + // Add any remaining unfinished tasks as failed entries + addFailedEntries(dataNodeFutures); + currentStatus = MetricCollectionStatus.SUCCEEDED; + lastCollectionEndTime = System.currentTimeMillis(); + LOG.info("Metrics collection completed. Queried: {}, Failed: {}", + totalNodesQueried, totalNodesFailed); + } + + /** + * Initializes the state for a new metrics collection task. + */ + private void initializeTaskState(int nodeCount) { + pendingDeletionList = new ArrayList<>(nodeCount); + totalPendingDeletion = 0L; + currentStatus = MetricCollectionStatus.IN_PROGRESS; + } + + /** + * Submits metrics collection tasks for all datanodes.
+ */ + private Map> submitMetricsCollectionTasks( + Set nodes) { + Map> futures = new HashMap<>(); + for (DatanodeDetails node : nodes) { + DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask( + node, httpsEnabled, metricsServiceProviderFactory); + DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics( + node.getHostName(), node.getUuidString(), -1L); + futures.put(key, executorService.submit(task)); + } + + totalNodesQueried = futures.size(); + LOG.debug("Submitted {} metrics collection tasks", totalNodesQueried); + + return futures; + } + + /** + * Collects metrics from completed tasks with a global timeout. + * Uses iterator to safely remove entries while iterating. + */ + private void collectMetricsWithTimeout( + Map> futures, + int nodeCount) { + // Calculate timeout: half of total request timeout for all nodes + long maximumTaskRunningTimeMs = (long) REQUEST_TIMEOUT * nodeCount / 2; + long startTime = System.currentTimeMillis(); + long pollIntervalMs = 200; + while (!futures.isEmpty()) { + if (hasTimedOut(startTime, maximumTaskRunningTimeMs)) { + LOG.warn("Stopping metrics collection task due to timeout. " + + "Remaining tasks: {}", futures.size()); + break; + } + processCompletedTasks(futures); + sleepBetweenPolls(pollIntervalMs); + } + } + + /** + * Checks if the task has exceeded the maximum allowed running time. + */ + private boolean hasTimedOut(long startTime, long maximumTaskRunningTimeMs) { + return (System.currentTimeMillis() - startTime) > maximumTaskRunningTimeMs; + } + + /** + * Processes all completed tasks and removes them from the futures map. + * Uses iterator to avoid ConcurrentModificationException. 
+ */ + private void processCompletedTasks( + Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures) { + Iterator<Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>>> + iterator = futures.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> entry = + iterator.next(); + + if (entry.getValue().isDone()) { + processCompletedTask(entry.getKey(), entry.getValue()); + iterator.remove(); + } + } + } + + /** + * Processes a single completed task and updates the metrics. + */ + private void processCompletedTask(DatanodePendingDeletionMetrics key, + Future<DatanodePendingDeletionMetrics> future) { + try { + DatanodePendingDeletionMetrics result = future.get(REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + if (result.getPendingBlockSize() < 0) { + totalNodesFailed += 1; + } else { + totalPendingDeletion += result.getPendingBlockSize(); + } + pendingDeletionList.add(result); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + logTaskFailure(key, e); + totalNodesFailed += 1; + // Add the key with -1 indicating failure + pendingDeletionList.add(key); + } + } + + /** + * Logs appropriate error message based on exception type. + */ + private void logTaskFailure(DatanodePendingDeletionMetrics key, Exception e) { + String errorType = getErrorType(e); + LOG.error("Task {} for datanode {} [{}]: {}", + errorType, key.getHostName(), key.getDatanodeUuid(), e.getMessage()); + } + + /** + * Returns a human-readable error type description. + */ + private String getErrorType(Exception e) { + if (e instanceof ExecutionException) { + return "execution failed"; + } else if (e instanceof InterruptedException) { + return "interrupted"; + } else if (e instanceof TimeoutException) { + return "timed out"; + } + return "failed"; + } + + /** + * Adds all remaining unfinished tasks as failed entries. + */ + private void addFailedEntries( + Map<DatanodePendingDeletionMetrics, Future<DatanodePendingDeletionMetrics>> futures) { + for (DatanodePendingDeletionMetrics key : futures.keySet()) { + pendingDeletionList.add(key); + totalNodesFailed++; + } + } + + /** + * Sleeps for the specified interval, handling interruptions gracefully.
+ */ + private void sleepBetweenPolls(long intervalMs) { + try { + Thread.sleep(intervalMs); + } catch (InterruptedException e) { + LOG.warn("Polling sleep interrupted: {}", e.getMessage()); + Thread.currentThread().interrupt(); // Restore interrupt status + } + } + + public synchronized DataNodeMetricsServiceResponse getCollectedMetrics() { + if (currentStatus == MetricCollectionStatus.NOT_STARTED || currentStatus == MetricCollectionStatus.IN_PROGRESS) { + return DataNodeMetricsServiceResponse.newBuilder() + .setStatus(MetricCollectionStatus.IN_PROGRESS) + .build(); + } + return DataNodeMetricsServiceResponse.newBuilder() + .setStatus(currentStatus) + .setPendingDeletion(pendingDeletionList) + .setTotalPendingDeletion(totalPendingDeletion) + .setTotalNodesQueried(totalNodesQueried) + .setTotalNodeQueryFailures(totalNodesFailed) + .build(); + } + + public MetricCollectionStatus getTaskStatus() { + return currentStatus; + } + + /** + * Enum representing the status of a metric collection task. + * This enum is used to describe the various stages in the lifecycle of + * a metric collection operation. 
+ */ + public enum MetricCollectionStatus { + SUCCEEDED, IN_PROGRESS, NOT_STARTED + } + + @PreDestroy + public void shutdown() { + LOG.info("Shutting down DataNodeMetricsService executor..."); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java new file mode 100644 index 000000000000..e47dc0554671 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * REST API endpoint that provides metrics and information related to + * pending deletions. It responds to requests on the "/pendingDeletion" path + * and produces application/json responses. + */ +@Path("/pendingDeletion") +@Produces("application/json") +@AdminOnly +public class PendingDeletionEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(PendingDeletionEndpoint.class); + private final ReconGlobalMetricsService reconGlobalMetricsService; + private final DataNodeMetricsService dataNodeMetricsService; + private final StorageContainerLocationProtocol scmClient; + + @Inject + public PendingDeletionEndpoint(ReconGlobalMetricsService reconGlobalMetricsService, + DataNodeMetricsService dataNodeMetricsService, + StorageContainerLocationProtocol scmClient) { + this.reconGlobalMetricsService = reconGlobalMetricsService; + this.dataNodeMetricsService = dataNodeMetricsService; + this.scmClient = scmClient; + } + + @GET + public Response getPendingDeletionByComponent(@QueryParam("component") String component) { + if (component == null || component.isEmpty()) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("component query parameter is required").build(); + } + final String normalizedComponent = component.trim().toLowerCase(); + switch (normalizedComponent) { + case "dn": + return 
handleDataNodeMetrics(); + case "scm": + return handleScmPendingDeletion(); + case "om": + return handleOmPendingDeletion(); + default: + return Response.status(Response.Status.BAD_REQUEST) + .entity("component query parameter must be one of dn, scm, om").build(); + } + } + + private Response handleDataNodeMetrics() { + DataNodeMetricsService.MetricCollectionStatus status = dataNodeMetricsService.getTaskStatus(); + if (status != DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) { + CompletableFuture.runAsync(dataNodeMetricsService::startTask); + } + DataNodeMetricsServiceResponse response = dataNodeMetricsService.getCollectedMetrics(); + if (response.getStatus() == DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) { + return Response.accepted(response).build(); + } else { + return Response.ok(response).build(); + } + } + + private Response handleScmPendingDeletion() { + try { + HddsProtos.DeletedBlocksTransactionSummary summary = scmClient.getDeletedBlockSummary(); + if (summary == null) { + return Response.noContent() + .build(); + } + ScmPendingDeletion pendingDeletion = new ScmPendingDeletion( + summary.getTotalBlockSize(), + summary.getTotalBlockReplicatedSize(), + summary.getTotalBlockCount()); + return Response.ok(pendingDeletion).build(); + } catch (Exception e) { + LOG.error("Error getting SCM pending deletion", e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Error retrieving SCM pending deletion: " + e.getMessage()) + .build(); + } + } + + private Response handleOmPendingDeletion() { + Map pendingDeletion = reconGlobalMetricsService.calculatePendingSizes(); + return Response.ok(pendingDeletion).build(); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java new file mode 100644 index 000000000000..6ec9cda0a59e --- /dev/null 
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService; + +/** + * Represents a response from the DataNodeMetricsService. + * This class encapsulates the result of a metrics collection task, + * including the collection status, total pending deletions across all data nodes, + * and details about pending deletions for each data node. + * + * Instances of this class are created using the {@link Builder} class. 
+ */ +public class DataNodeMetricsServiceResponse { + @JsonProperty("status") + private DataNodeMetricsService.MetricCollectionStatus status; + @JsonProperty("totalPendingDeletion") + private Long totalPendingDeletion; + @JsonProperty("pendingDeletionPerDataNode") + private List pendingDeletionPerDataNode; + @JsonProperty("totalNodesQueried") + private int totalNodesQueried; + @JsonProperty("totalNodeQueriesFailed") + private long totalNodeQueryFailures; + + public DataNodeMetricsServiceResponse(Builder builder) { + this.status = builder.status; + this.totalPendingDeletion = builder.totalPendingDeletion; + this.pendingDeletionPerDataNode = builder.pendingDeletion; + this.totalNodesQueried = builder.totalNodesQueried; + this.totalNodeQueryFailures = builder.totalNodeQueryFailures; + } + + public DataNodeMetricsServiceResponse() { + this.status = DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED; + this.totalPendingDeletion = 0L; + this.pendingDeletionPerDataNode = null; + this.totalNodesQueried = 0; + this.totalNodeQueryFailures = 0; + } + + public DataNodeMetricsService.MetricCollectionStatus getStatus() { + return status; + } + + public Long getTotalPendingDeletion() { + return totalPendingDeletion; + } + + public List getPendingDeletionPerDataNode() { + return pendingDeletionPerDataNode; + } + + public int getTotalNodesQueried() { + return totalNodesQueried; + } + + public long getTotalNodeQueryFailures() { + return totalNodeQueryFailures; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder class for constructing instances of {@link DataNodeMetricsServiceResponse}. + * This class provides a fluent interface for setting the various properties + * of a DataNodeMetricsServiceResponse object before creating a new immutable instance. + * The Builder is designed to be used in a staged and intuitive manner. + * The properties that can be configured include: + * - Status of the metric collection process. 
+ * - Total number of blocks pending deletion across all data nodes. + * - Metrics related to pending deletions from individual data nodes. + */ + public static final class Builder { + private DataNodeMetricsService.MetricCollectionStatus status; + private Long totalPendingDeletion; + private List pendingDeletion; + private int totalNodesQueried; + private long totalNodeQueryFailures; + + public Builder setStatus(DataNodeMetricsService.MetricCollectionStatus status) { + this.status = status; + return this; + } + + public Builder setTotalPendingDeletion(Long totalPendingDeletion) { + this.totalPendingDeletion = totalPendingDeletion; + return this; + } + + public Builder setPendingDeletion(List pendingDeletion) { + this.pendingDeletion = pendingDeletion; + return this; + } + + public Builder setTotalNodesQueried(int totalNodesQueried) { + this.totalNodesQueried = totalNodesQueried; + return this; + } + + public Builder setTotalNodeQueryFailures(long totalNodeQueryFailures) { + this.totalNodeQueryFailures = totalNodeQueryFailures; + return this; + } + + public DataNodeMetricsServiceResponse build() { + return new DataNodeMetricsServiceResponse(this); + } + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java new file mode 100644 index 000000000000..964add573096 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Represents pending deletion metrics for a datanode. + * This class encapsulates information about blocks pending deletion on a specific datanode. + */ +public class DatanodePendingDeletionMetrics { + + @JsonProperty("hostName") + private final String hostName; + + @JsonProperty("datanodeUuid") + private final String datanodeUuid; + + @JsonProperty("pendingBlockSize") + private final long pendingBlockSize; + + @JsonCreator + public DatanodePendingDeletionMetrics( + @JsonProperty("hostName") String hostName, + @JsonProperty("datanodeUuid") String datanodeUuid, + @JsonProperty("pendingBlockSize") long pendingBlockSize) { + this.hostName = hostName; + this.datanodeUuid = datanodeUuid; + this.pendingBlockSize = pendingBlockSize; + } + + public String getHostName() { + return hostName; + } + + public long getPendingBlockSize() { + return pendingBlockSize; + } + + public String getDatanodeUuid() { + return datanodeUuid; + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java new file mode 100644 index 000000000000..edab570e20e5 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Represents metadata related to pending deletions in the storage container manager (SCM). + * This class encapsulates information such as the total block size, the total size of replicated blocks, + * and the total number of blocks awaiting deletion. 
+ */ +public class ScmPendingDeletion { + @JsonProperty("totalBlocksize") + private final long totalBlocksize; + @JsonProperty + private final long totalReplicatedBlockSize; + @JsonProperty("totalBlocksCount") + private final long totalBlocksCount; + + public ScmPendingDeletion() { + this.totalBlocksize = 0; + this.totalReplicatedBlockSize = 0; + this.totalBlocksCount = 0; + } + + public ScmPendingDeletion(long size, long replicatedSize, long totalBlocks) { + this.totalBlocksize = size; + this.totalReplicatedBlockSize = replicatedSize; + this.totalBlocksCount = totalBlocks; + } + + public long getTotalBlocksize() { + return totalBlocksize; + } + + public long getTotalReplicatedBlockSize() { + return totalReplicatedBlockSize; + } + + public long getTotalBlocksCount() { + return totalBlocksCount; + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java index 3d0aa3140671..7ddba027bfec 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java @@ -19,6 +19,7 @@ import java.net.HttpURLConnection; import java.util.List; +import java.util.Map; import org.apache.hadoop.ozone.recon.metrics.Metric; /** @@ -48,12 +49,12 @@ HttpURLConnection getMetricsResponse(String api, String queryString) List getMetricsInstant(String queryString) throws Exception; /** - * Returns a list of {@link Metric} for the given ranged query. + * Returns a list of {@link Map} for the given query. * * @param queryString query string with metric name, start time, end time, * step and other filters. * @return List of Json map of metrics response. 
* @throws Exception exception */ - List getMetricsRanged(String queryString) throws Exception; + List> getMetrics(String queryString) throws Exception; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java new file mode 100644 index 000000000000..0426838050be --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.spi.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.inject.Singleton; +import javax.ws.rs.core.Response; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdfs.web.URLConnectionFactory; +import org.apache.hadoop.ozone.recon.ReconUtils; +import org.apache.hadoop.ozone.recon.metrics.Metric; +import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider; + +/** + * Implementation of the JMX Metrics Service provider. + */ +@Singleton +public class JmxServiceProviderImpl implements MetricsServiceProvider { + + public static final String JMX_INSTANT_QUERY_API = "qry"; + private URLConnectionFactory connectionFactory; + private final String jmxEndpoint; + private ReconUtils reconUtils; + + public JmxServiceProviderImpl(OzoneConfiguration configuration, + ReconUtils reconUtils, String jmxEndpoint) { + + connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(configuration); + // Remove the trailing slash from endpoint url. + if (jmxEndpoint != null && jmxEndpoint.endsWith("/")) { + jmxEndpoint = jmxEndpoint.substring(0, jmxEndpoint.length() - 1); + } + this.jmxEndpoint = jmxEndpoint; + this.reconUtils = reconUtils; + } + + /** + * Returns {@link HttpURLConnection} after querying Metrics endpoint for the + * given metric. + * + * @param api api. + * @param queryString query string with metric name and other filters.
+ * @return HttpURLConnection + * @throws Exception exception + */ + @Override + public HttpURLConnection getMetricsResponse(String api, String queryString) + throws Exception { + String url = String.format("%s?%s=%s", jmxEndpoint, api, + queryString); + return reconUtils.makeHttpCall(connectionFactory, + url, false); + } + + @Override + public List getMetricsInstant(String queryString) throws Exception { + return Collections.emptyList(); + } + + /** + * Returns a list of JSON maps (JMX bean attributes) for the given instant query. + * + * @param queryString query string with metric name and other filters. + * @return List of Json map of metrics response. + * @throws Exception exception + */ + @Override + public List> getMetrics(String queryString) + throws Exception { + return getMetrics(JMX_INSTANT_QUERY_API, queryString); + } + + /** + * Returns a list of JSON maps (JMX bean attributes) for the given api and query string. + * + * @param api api + * @param queryString query string with metric name and other filters. + * @return List of Json map of metrics response.
+ * @throws Exception + */ + private List> getMetrics(String api, String queryString) + throws Exception { + HttpURLConnection urlConnection = + getMetricsResponse(api, queryString); + if (Response.Status.fromStatusCode(urlConnection.getResponseCode()) + .getFamily() == Response.Status.Family.SUCCESSFUL) { + InputStream inputStream = urlConnection.getInputStream(); + ObjectMapper mapper = new ObjectMapper(); + Map jsonMap = mapper.readValue(inputStream, Map.class); + inputStream.close(); + Object beansObj = jsonMap.get("beans"); + if (beansObj instanceof List) { + return (List>) beansObj; + } + } + return Collections.emptyList(); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java index 64c00490613e..c1df1c8d3dc6 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -49,7 +50,6 @@ public class PrometheusServiceProviderImpl implements MetricsServiceProvider { public static final String PROMETHEUS_INSTANT_QUERY_API = "query"; - public static final String PROMETHEUS_RANGED_QUERY_API = "query_range"; private static final Logger LOG = LoggerFactory.getLogger(PrometheusServiceProviderImpl.class); @@ -123,17 +123,9 @@ public List getMetricsInstant(String queryString) return getMetrics(PROMETHEUS_INSTANT_QUERY_API, queryString); } - /** - * Returns a list of {@link Metric} for the given ranged query. - * - * @param queryString query string with metric name, start time, end time, - * step and other filters. 
- * @return List of Json map of metrics response. - * @throws Exception exception - */ @Override - public List getMetricsRanged(String queryString) throws Exception { - return getMetrics(PROMETHEUS_RANGED_QUERY_API, queryString); + public List> getMetrics(String queryString) throws Exception { + return Collections.emptyList(); } /** diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java new file mode 100644 index 000000000000..abc94edb5bc8 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.tasks; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; +import org.apache.hadoop.ozone.recon.ReconUtils; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Task for collecting pending deletion metrics from a DataNode using JMX. + * This class implements the Callable interface and retrieves pending deletion + * information (e.g., total pending block bytes) from a DataNode by invoking its + * JMX endpoint. The metrics are parsed and encapsulated in the + * {@link DatanodePendingDeletionMetrics} object. + */ +public class DataNodeMetricsCollectionTask implements Callable { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsCollectionTask.class); + private final DatanodeDetails nodeDetails; + private final boolean httpsEnabled; + private final MetricsServiceProvider metricsServiceProvider; + private static final String BEAN_NAME = "Hadoop:service=HddsDatanode,name=BlockDeletingService"; + + public DataNodeMetricsCollectionTask(DatanodeDetails nodeDetails, boolean httpsEnabled, + MetricsServiceProviderFactory factory) { + this.nodeDetails = nodeDetails; + this.httpsEnabled = httpsEnabled; + this.metricsServiceProvider = factory.getJmxMetricsServiceProvider(getJmxMetricsUrl()); + } + + @Override + public DatanodePendingDeletionMetrics call() { + LOG.debug("Collecting pending deletion metrics from DataNode {}", nodeDetails.getHostName()); + try { + List> metrics = metricsServiceProvider.getMetrics(BEAN_NAME); + if (metrics == null) { + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), 
nodeDetails.getUuidString(), -1L); + } + Map deletionMetrics = ReconUtils.getMetricsData(metrics, BEAN_NAME); + long pendingBlockSize = ReconUtils.extractMetricValue(deletionMetrics, "TotalPendingBlockBytes"); + + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), pendingBlockSize); + + } catch (Exception e) { + LOG.error("Connection timeout while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); + } + } + + private String getJmxMetricsUrl() { + String protocol = httpsEnabled ? "https" : "http"; + Name portName = httpsEnabled ? DatanodeDetails.Port.Name.HTTPS : DatanodeDetails.Port.Name.HTTP; + return String.format("%s://%s:%d/jmx", + protocol, + nodeDetails.getHostName(), + nodeDetails.getPort(portName).getValue()); + } +}