From 7e6eb86f9db982da158ce0739137976860fb532e Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Tue, 25 Nov 2025 23:32:59 +0530 Subject: [PATCH 01/12] HDDS-14010. Adding endpooint to get pending deletion data. --- .../TestStorageDistributionEndpoint.java | 32 +++-- hadoop-ozone/recon/pom.xml | 17 +++ .../recon/api/DataNodeMetricsService.java | 126 ++++++++++++++++++ .../recon/api/PendingDeletionEndpoint.java | 120 +++++++++++++++++ .../types/DataNodeMetricsServiceResponse.java | 96 +++++++++++++ .../types/DatanodePendingDeletionMetrics.java | 55 ++++++++ .../recon/api/types/ScmPendingDeletion.java | 40 ++++++ .../tasks/DataNodeMetricsCollectionTask.java | 102 ++++++++++++++ 8 files changed, 576 insertions(+), 12 deletions(-) create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java index c43958fe1549..9e66dbd196cb 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java @@ 
-190,19 +190,27 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) } } waitForKeysCreated(replicationConfig); - Thread.sleep(10000); - StringBuilder urlBuilder = new StringBuilder(); - urlBuilder.append(getReconWebAddress(conf)) - .append(STORAGE_DIST_ENDPOINT); - String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); - StorageCapacityDistributionResponse storageResponse = - MAPPER.readValue(response, StorageCapacityDistributionResponse.class); + GenericTestUtils.waitFor(() -> { + try { + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(STORAGE_DIST_ENDPOINT); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + StorageCapacityDistributionResponse storageResponse = + MAPPER.readValue(response, StorageCapacityDistributionResponse.class); - assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys()); - assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace()); - assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes()); - assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes()); - assertEquals(3, storageResponse.getDataNodeUsage().size()); + assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys()); + assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace()); + assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes()); + assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes()); + assertEquals(3, storageResponse.getDataNodeUsage().size()); + + return true; + } catch (Exception e) { + LOG.debug("Waiting for storage distribution assertions to pass", e); + return false; + } + }, 5000, 30000); } private void verifyBlocksCreated( diff --git a/hadoop-ozone/recon/pom.xml b/hadoop-ozone/recon/pom.xml index 9a0936ebf194..0e5f522cfdff 100644 --- a/hadoop-ozone/recon/pom.xml +++ b/hadoop-ozone/recon/pom.xml @@ -126,6 
+126,23 @@ org.apache.hadoop hadoop-hdfs-client + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + commons-logging + commons-logging + + + + + org.apache.httpcomponents + httpcore + ${httpcore.version} + org.apache.ozone hdds-common diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java new file mode 100644 index 000000000000..60b66fbfc797 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The DataNodeMetricsService class is responsible for collecting and managing + * metrics related to datanodes in an Ozone Recon environment. Specifically, + * it gathers metrics about pending block deletions from the datanodes and + * provides aggregated results. + * This service tracks the status of metric collection tasks and provides + * an interface to query the state and results of these tasks. + * The metrics collection process involves communicating with each datanode, + * fetching their pending deletion metrics, and aggregating the data. 
+ */ +@Singleton +public class DataNodeMetricsService { + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private Long totalPendingDeletion = 0L; + private List pendingDeletionList; + private final ReconNodeManager reconNodeManager; + + @Inject + public DataNodeMetricsService(OzoneStorageContainerManager reconSCM) { + reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + } + + public void startTask() { + Set nodes = reconNodeManager.getNodeStats().keySet(); + pendingDeletionList = new ArrayList<>(); + totalPendingDeletion = 0L; + currentStatus = MetricCollectionStatus.IN_PROGRESS; + ExecutorService executor = Executors.newFixedThreadPool(10); + List> futures = new ArrayList<>(); + for (DatanodeDetails node : nodes) { + String hostName = node.getHostName(); + String uuid = node.getUuidString(); + int port = node.getPort(DatanodeDetails.Port.Name.HTTP).getValue(); + DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask(hostName, port, uuid); + futures.add(executor.submit(task)); + } + boolean hasTimedOut = false; + for (Future future : futures) { + try { + DatanodePendingDeletionMetrics result = future.get(30, TimeUnit.SECONDS); + totalPendingDeletion += result.getPendingBlockSize(); + pendingDeletionList.add(result); + } catch (TimeoutException e) { + hasTimedOut = true; + LOG.error("Task timed out after " + 30 + " seconds: {}", e.getMessage()); + } catch (Exception e) { + System.err.println("Task failed or was interrupted: " + e.getMessage()); + } + } + executor.shutdownNow(); + if (hasTimedOut) { + currentStatus = MetricCollectionStatus.FAILED; + } else { + currentStatus = MetricCollectionStatus.SUCCEEDED; + } + } + + public DataNodeMetricsServiceResponse getCollectedMetrics() { + if (currentStatus == MetricCollectionStatus.SUCCEEDED) { + currentStatus = MetricCollectionStatus.NOT_STARTED; + return 
DataNodeMetricsServiceResponse.newBuilder() + .setStatus(MetricCollectionStatus.SUCCEEDED) + .setPendingDeletion(pendingDeletionList) + .setTotalPendingDeletion(totalPendingDeletion) + .build(); + } else { + DataNodeMetricsServiceResponse response = DataNodeMetricsServiceResponse.newBuilder() + .setStatus(currentStatus) + .build(); + currentStatus = MetricCollectionStatus.NOT_STARTED; + return response; + } + } + + public MetricCollectionStatus getTaskStatus() { + return currentStatus; + } + + /** + * Enum representing the status of a metric collection task. + * This enum is used to describe the various stages in the lifecycle of + * a metric collection operation. + */ + public enum MetricCollectionStatus { + SUCCEEDED, FAILED, IN_PROGRESS, NOT_STARTED + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java new file mode 100644 index 000000000000..18130ceec981 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.api; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * REST API endpoint that provides metrics and information related to + * pending deletions. It responds to requests on the "/pendingDeletion" path + * and produces application/json responses. + */ +@Path("/pendingDeletion") +@Produces("application/json") +@AdminOnly +public class PendingDeletionEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(PendingDeletionEndpoint.class); + private final ReconGlobalMetricsService reconGlobalMetricsService; + private final DataNodeMetricsService dataNodeMetricsService; + private final StorageContainerLocationProtocol scmClient; + + @Inject + public PendingDeletionEndpoint(ReconGlobalMetricsService reconGlobalMetricsService, + DataNodeMetricsService dataNodeMetricsService, + StorageContainerLocationProtocol scmClient) { + this.reconGlobalMetricsService = reconGlobalMetricsService; + this.dataNodeMetricsService = dataNodeMetricsService; + this.scmClient = scmClient; + } + + @GET + public Response getPendingDeletionByComponent(@QueryParam("component") String component) { + if (component.isEmpty()) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("component query parameter is required").build(); + } + final String normalizedComponent = component.trim().toLowerCase(); + switch (normalizedComponent) { + case "dn": + return handleDataNodeMetrics(); + case 
"scm": + return handleScmPendingDeletion(); + case "om": + return handleOmPendingDeletion(); + default: + return Response.status(Response.Status.BAD_REQUEST) + .entity("component query parameter must be one of dn, scm, om").build(); + } + } + + private Response handleDataNodeMetrics() { + DataNodeMetricsService.MetricCollectionStatus status = dataNodeMetricsService.getTaskStatus(); + if (status == DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED) { + CompletableFuture.runAsync(dataNodeMetricsService::startTask); + return Response.accepted(DataNodeMetricsServiceResponse.newBuilder() + .setStatus(DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) + .build() + ).build(); + } else if (status == DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED) { + return Response.ok(dataNodeMetricsService.getCollectedMetrics()).build(); + } else { + return Response.accepted(DataNodeMetricsServiceResponse.newBuilder() + .setStatus(status) + .build() + ).build(); + } + } + + private Response handleScmPendingDeletion() { + try { + HddsProtos.DeletedBlocksTransactionSummary summary = scmClient.getDeletedBlockSummary(); + if (summary == null) { + return Response.noContent() + .build(); + } + ScmPendingDeletion pendingDeletion = new ScmPendingDeletion( + summary.getTotalBlockSize(), + summary.getTotalBlockReplicatedSize(), + summary.getTotalBlockCount()); + return Response.ok(pendingDeletion).build(); + } catch (Exception e) { + LOG.error("Error getting SCM pending deletion", e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Error retrieving SCM pending deletion: " + e.getMessage()) + .build(); + } + } + + private Response handleOmPendingDeletion() { + Map pendingDeletion = reconGlobalMetricsService.calculatePendingSizes(); + return Response.ok(pendingDeletion).build(); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java new file mode 100644 index 000000000000..e51aefdd7165 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService; + +/** + * Represents a response from the DataNodeMetricsService. + * This class encapsulates the result of a metrics collection task, + * including the collection status, total pending deletions across all data nodes, + * and details about pending deletions for each data node. + * + * Instances of this class are created using the {@link Builder} class. 
+ */ +public class DataNodeMetricsServiceResponse { + @JsonProperty("status") + private DataNodeMetricsService.MetricCollectionStatus status; + @JsonProperty("totalPendingDeletion") + private Long totalPendingDeletion; + @JsonProperty("pendingDeletionPerDataNode") + private List pendingDeletionPerDataNode; + + public DataNodeMetricsServiceResponse(Builder builder) { + this.status = builder.status; + this.totalPendingDeletion = builder.totalPendingDeletion; + this.pendingDeletionPerDataNode = builder.pendingDeletion; + } + + public DataNodeMetricsService.MetricCollectionStatus getStatus() { + return status; + } + + public Long getTotalPendingDeletion() { + return totalPendingDeletion; + } + + public List getPendingDeletionPerDataNode() { + return pendingDeletionPerDataNode; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder class for constructing instances of {@link DataNodeMetricsServiceResponse}. + * This class provides a fluent interface for setting the various properties + * of a DataNodeMetricsServiceResponse object before creating a new immutable instance. + * The Builder is designed to be used in a staged and intuitive manner. + * The properties that can be configured include: + * - Status of the metric collection process. + * - Total number of blocks pending deletion across all data nodes. + * - Metrics related to pending deletions from individual data nodes. 
+ */ + public static final class Builder { + private DataNodeMetricsService.MetricCollectionStatus status; + private Long totalPendingDeletion; + private List pendingDeletion; + + public Builder setStatus(DataNodeMetricsService.MetricCollectionStatus status) { + this.status = status; + return this; + } + + public Builder setTotalPendingDeletion(Long totalPendingDeletion) { + this.totalPendingDeletion = totalPendingDeletion; + return this; + } + + public Builder setPendingDeletion(List pendingDeletion) { + this.pendingDeletion = pendingDeletion; + return this; + } + + public DataNodeMetricsServiceResponse build() { + return new DataNodeMetricsServiceResponse(this); + } + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java new file mode 100644 index 000000000000..9e352565f036 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Represents pending deletion metrics for a datanode. + * This class encapsulates information about blocks pending deletion on a specific datanode. + */ +public class DatanodePendingDeletionMetrics { + + @JsonProperty("hostName") + private final String hostName; + + @JsonProperty("datanodeUuid") + private final String datanodeUuid; + + @JsonProperty("pendingBlockSize") + private final long pendingBlockSize; + + public DatanodePendingDeletionMetrics(String hostName, String datanodeUuid, long pendingBlockSize) { + this.hostName = hostName; + this.datanodeUuid = datanodeUuid; + this.pendingBlockSize = pendingBlockSize; + } + + public String getHostName() { + return hostName; + } + + public long getPendingBlockSize() { + return pendingBlockSize; + } + + public String getDatanodeUuid() { + return datanodeUuid; + } +} + diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java new file mode 100644 index 000000000000..fde44289183a --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Represents metadata related to pending deletions in the storage container manager (SCM). + * This class encapsulates information such as the total block size, the total size of replicated blocks, + * and the total number of blocks awaiting deletion. + */ +public class ScmPendingDeletion { + @JsonProperty("totalBlocksize") + private final long totalBlocksize; + @JsonProperty + private final long totalReplicatedBlockSize; + @JsonProperty("totalBlocksCount") + private final long totalBlocksCount; + + public ScmPendingDeletion(long size, long replicatedSize, long totalBlocks) { + this.totalBlocksize = size; + this.totalReplicatedBlockSize = replicatedSize; + this.totalBlocksCount = totalBlocks; + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java new file mode 100644 index 000000000000..da8438c306f4 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Task for collecting pending deletion metrics from a DataNode using JMX. + * This class implements the Callable interface and retrieves pending deletion + * information (e.g., total pending block bytes) from a DataNode by invoking its + * JMX endpoint. The metrics are parsed and encapsulated in the + * {@link DatanodePendingDeletionMetrics} object. 
+ */ +public class DataNodeMetricsCollectionTask implements Callable { + + private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsCollectionTask.class); + private final String host; + private final int port; + private final String nodeUuid; + private static ObjectMapper objectMapper = new ObjectMapper(); + + public DataNodeMetricsCollectionTask(String host, int port, String nodeUuid) { + this.host = host; + this.port = port; + this.nodeUuid = nodeUuid; + } + + @Override + public DatanodePendingDeletionMetrics call() throws Exception { + LOG.debug("Collecting pending deletion metrics from DataNode {}:{}", host, port); + try (CloseableHttpClient httpClient = HttpClients.custom() + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectTimeout(60000) + .setSocketTimeout(60000) + .build()) + .build()) { + + InputStream in = httpClient.execute(new HttpGet(getJmxMetricsUrl())).getEntity().getContent(); + byte[] responseBytes = IOUtils.toByteArray(in); + String jsonResponse = new String(responseBytes, StandardCharsets.UTF_8); + return new DatanodePendingDeletionMetrics(host, nodeUuid, + parseMetrics(jsonResponse, "BlockDeletingService", "TotalPendingBlockBytes")); + } + } + + private static long parseMetrics(String jsonResponse, String serviceName, String keyName) + throws IOException { + if (jsonResponse == null || jsonResponse.isEmpty()) { + return -1L; + } + JsonNode root = objectMapper.readTree(jsonResponse); + JsonNode beans = root.get("beans"); + if (beans != null && beans.isArray()) { + // Find the bean matching the service name + for (JsonNode bean : beans) { + String beanName = bean.path("name").asText(""); + if (beanName.contains(serviceName)) { + // Extract and return the metric value from the bean + return extractMetrics(bean, keyName); + } + } + } + return -1L; + } + + private static long extractMetrics(JsonNode beanNode, String keyName) { + return beanNode.path(keyName).asLong(0L); + } + + private String getJmxMetricsUrl() { + return 
String.format("%s://%s:%d/jmx?qry=Hadoop:service=%s,name=%s", "http", + host, port, "HddsDatanode", "BlockDeletingService"); + } +} From fd21837b2e3efcf8d81d8822dd88c62facdbb3d3 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Tue, 2 Dec 2025 21:48:59 +0530 Subject: [PATCH 02/12] adding testcase for pending deletion --- .../TestStorageDistributionEndpoint.java | 79 ++++++++++++++++++- .../recon/api/DataNodeMetricsService.java | 12 ++- .../types/DataNodeMetricsServiceResponse.java | 6 ++ .../types/DatanodePendingDeletionMetrics.java | 10 ++- .../recon/api/types/ScmPendingDeletion.java | 18 +++++ 5 files changed, 115 insertions(+), 10 deletions(-) diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java index 9e66dbd196cb..241169329a51 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java @@ -35,6 +35,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; @@ -46,6 +47,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.HddsDatanodeService; @@ -66,6 +69,9 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import 
org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion; import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import org.apache.ozone.test.GenericTestUtils; @@ -100,6 +106,7 @@ public class TestStorageDistributionEndpoint { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String STORAGE_DIST_ENDPOINT = "/api/v1/storageDistribution"; + private static final String PENDING_DELETION_ENDPOINT = "/api/v1/pendingDeletion"; static List replicationConfigs() { return Collections.singletonList( @@ -131,16 +138,14 @@ public static void setup() throws Exception { // Enhanced DataNode configuration to move pending deletion from SCM to DN faster DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class); - dnConf.setBlockDeletionInterval(Duration.ofMillis(100)); + dnConf.setBlockDeletionInterval(Duration.ofMillis(10000)); // Increase block delete queue limit to allow more queued commands on DN dnConf.setBlockDeleteQueueLimit(50); // Reduce the interval for delete command worker processing - dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100)); + dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(10000)); // Increase blocks deleted per interval to speed up deletion dnConf.setBlockDeletionLimit(5000); conf.setFromObject(dnConf); - // Increase DN delete threads for faster parallel processing - conf.setInt("ozone.datanode.block.delete.threads.max", 10); recon = new ReconService(conf); cluster = MiniOzoneCluster.newBuilder(conf) @@ -211,6 +216,64 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) 
return false; } }, 5000, 30000); + closeAllContainers(); + fs.delete(dir1, true); + GenericTestUtils.waitFor(() -> { + try { + syncDataFromOM(); + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT+"?component=om"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + Map pendingDeletionMap = + MAPPER.readValue(response, Map.class); + assertEquals(30, pendingDeletionMap.get("totalSize")); + assertEquals(30, pendingDeletionMap.get("pendingDirectorySize") + pendingDeletionMap.get("pendingKeySize")); + return true; + } catch (Throwable e) { + LOG.debug("Waiting for storage distribution assertions to pass", e); + return false; + } + }, 2000, 60000); + + GenericTestUtils.waitFor(() -> { + try { + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT+"?component=scm"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + ScmPendingDeletion pendingDeletion = + MAPPER.readValue(response, ScmPendingDeletion.class); + assertEquals(30, pendingDeletion.getTotalReplicatedBlockSize()); + assertEquals(10, pendingDeletion.getTotalBlocksize()); + assertEquals(10, pendingDeletion.getTotalBlocksCount()); + return true; + } catch (Throwable e) { + LOG.debug("Waiting for storage distribution assertions to pass", e); + return false; + } + }, 2000, 60000); + GenericTestUtils.waitFor(() -> { + try { + scm.getScmHAManager().asSCMHADBTransactionBuffer().flush(); + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT+"?component=dn"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + DataNodeMetricsServiceResponse pendingDeletion = + MAPPER.readValue(response, DataNodeMetricsServiceResponse.class); + assertNotNull(pendingDeletion); + assertEquals(30, pendingDeletion.getTotalPendingDeletion()); 
+ assertEquals(DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED, pendingDeletion.getStatus()); + pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> { + assertEquals(10, dn.getPendingBlockSize()); + }); + return true; + } catch (Throwable e) { + LOG.debug("Waiting for storage distribution assertions to pass", e); + return false; + } + }, 2000, 60000); } private void verifyBlocksCreated( @@ -274,6 +337,14 @@ private static void performOperationOnKeyContainers( } } + private static void closeAllContainers() { + for (ContainerInfo container : + scm.getContainerManager().getContainers()) { + scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER, + container.containerID()); + } + } + @AfterEach public void cleanup() { assertDoesNotThrow(() -> { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index 60b66fbfc797..1c468fb7cb17 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -17,6 +17,9 @@ package org.apache.hadoop.ozone.recon.api; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY; + import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -27,6 +30,7 @@ import java.util.concurrent.TimeoutException; import javax.inject.Inject; import javax.inject.Singleton; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; @@ -35,7 +39,6 @@ import 
org.apache.hadoop.ozone.recon.tasks.DataNodeMetricsCollectionTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * The DataNodeMetricsService class is responsible for collecting and managing * metrics related to datanodes in an Ozone Recon environment. Specifically, @@ -46,6 +49,7 @@ * The metrics collection process involves communicating with each datanode, * fetching their pending deletion metrics, and aggregating the data. */ + @Singleton public class DataNodeMetricsService { private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); @@ -53,10 +57,12 @@ public class DataNodeMetricsService { private Long totalPendingDeletion = 0L; private List pendingDeletionList; private final ReconNodeManager reconNodeManager; + private final int threadCount; @Inject - public DataNodeMetricsService(OzoneStorageContainerManager reconSCM) { + public DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfiguration conf) { reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); + threadCount = conf.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY, OZONE_RECON_TASK_THREAD_COUNT_DEFAULT); } public void startTask() { @@ -64,7 +70,7 @@ public void startTask() { pendingDeletionList = new ArrayList<>(); totalPendingDeletion = 0L; currentStatus = MetricCollectionStatus.IN_PROGRESS; - ExecutorService executor = Executors.newFixedThreadPool(10); + ExecutorService executor = Executors.newFixedThreadPool(threadCount); List> futures = new ArrayList<>(); for (DatanodeDetails node : nodes) { String hostName = node.getHostName(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java index e51aefdd7165..5e9afe88f7fa 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java +++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java @@ -43,6 +43,12 @@ public DataNodeMetricsServiceResponse(Builder builder) { this.pendingDeletionPerDataNode = builder.pendingDeletion; } + public DataNodeMetricsServiceResponse() { + this.status = DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED; + this.totalPendingDeletion = 0L; + this.pendingDeletionPerDataNode = null; + } + public DataNodeMetricsService.MetricCollectionStatus getStatus() { return status; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java index 9e352565f036..ca70ed5dc60a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.recon.api.types; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; /** @@ -34,7 +35,11 @@ public class DatanodePendingDeletionMetrics { @JsonProperty("pendingBlockSize") private final long pendingBlockSize; - public DatanodePendingDeletionMetrics(String hostName, String datanodeUuid, long pendingBlockSize) { + @JsonCreator + public DatanodePendingDeletionMetrics( + @JsonProperty("hostName") String hostName, + @JsonProperty("datanodeUuid") String datanodeUuid, + @JsonProperty("pendingBlockSize") long pendingBlockSize) { this.hostName = hostName; this.datanodeUuid = datanodeUuid; this.pendingBlockSize = pendingBlockSize; @@ -51,5 +56,4 @@ public long getPendingBlockSize() { public String getDatanodeUuid() { return datanodeUuid; } -} - +} \ No newline at end of file diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java index fde44289183a..edab570e20e5 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ScmPendingDeletion.java @@ -32,9 +32,27 @@ public class ScmPendingDeletion { @JsonProperty("totalBlocksCount") private final long totalBlocksCount; + public ScmPendingDeletion() { + this.totalBlocksize = 0; + this.totalReplicatedBlockSize = 0; + this.totalBlocksCount = 0; + } + public ScmPendingDeletion(long size, long replicatedSize, long totalBlocks) { this.totalBlocksize = size; this.totalReplicatedBlockSize = replicatedSize; this.totalBlocksCount = totalBlocks; } + + public long getTotalBlocksize() { + return totalBlocksize; + } + + public long getTotalReplicatedBlockSize() { + return totalReplicatedBlockSize; + } + + public long getTotalBlocksCount() { + return totalBlocksCount; + } } From 79a0d5fe4a47044b0f158655387f75d8a3edc899 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Tue, 2 Dec 2025 22:26:28 +0530 Subject: [PATCH 03/12] fixing checkstyle --- .../hadoop/ozone/recon/TestStorageDistributionEndpoint.java | 6 +++--- .../recon/api/types/DatanodePendingDeletionMetrics.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java index 241169329a51..52f27e4840e4 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java +++ 
b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java @@ -223,7 +223,7 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) syncDataFromOM(); StringBuilder urlBuilder = new StringBuilder(); urlBuilder.append(getReconWebAddress(conf)) - .append(PENDING_DELETION_ENDPOINT+"?component=om"); + .append(PENDING_DELETION_ENDPOINT + "?component=om"); String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); Map pendingDeletionMap = MAPPER.readValue(response, Map.class); @@ -240,7 +240,7 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) try { StringBuilder urlBuilder = new StringBuilder(); urlBuilder.append(getReconWebAddress(conf)) - .append(PENDING_DELETION_ENDPOINT+"?component=scm"); + .append(PENDING_DELETION_ENDPOINT + "?component=scm"); String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); ScmPendingDeletion pendingDeletion = MAPPER.readValue(response, ScmPendingDeletion.class); @@ -258,7 +258,7 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) scm.getScmHAManager().asSCMHADBTransactionBuffer().flush(); StringBuilder urlBuilder = new StringBuilder(); urlBuilder.append(getReconWebAddress(conf)) - .append(PENDING_DELETION_ENDPOINT+"?component=dn"); + .append(PENDING_DELETION_ENDPOINT + "?component=dn"); String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); DataNodeMetricsServiceResponse pendingDeletion = MAPPER.readValue(response, DataNodeMetricsServiceResponse.class); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java index ca70ed5dc60a..964add573096 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java +++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodePendingDeletionMetrics.java @@ -56,4 +56,4 @@ public long getPendingBlockSize() { public String getDatanodeUuid() { return datanodeUuid; } -} \ No newline at end of file +} From 93dd60a1f58cdedcba80dfe7be590696eedbfd2a Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Thu, 4 Dec 2025 15:16:31 +0530 Subject: [PATCH 04/12] addressing review comments. --- .../src/main/resources/ozone-default.xml | 34 ++++++++++ .../ozone/recon/ReconServerConfigKeys.java | 16 +++++ .../recon/api/DataNodeMetricsService.java | 36 +++++++--- .../tasks/DataNodeMetricsCollectionTask.java | 66 ++++++++++++++++--- 4 files changed, 135 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 658a0d67d5ab..f3a942c1efb8 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3504,6 +3504,40 @@ If the buffer overflows, task reinitialization will be triggered. + + ozone.recon.dn.metrics.collection.thread.count + 1 + OZONE, RECON, DN + + The number of threads running to get metrics from jmx endpoint. + + + + ozone.recon.dn.http.request.timeout + 30 + OZONE, RECON, DN + + Timeout for the request submitted directly to datanode metrics. + + + + ozone.recon.dn.http.connection.timeout + 30 + OZONE, RECON, DN + + The maximum time allowed for a client to establish + the initial physical connection (TCP handshake) with the dn. + + + + ozone.recon.dn.http.socket.timeout + 30 + OZONE, RECON, DN + + The maximum time of inactivity allowed between any two consecutive data packets + when exchanging data with a server, after the connection is established. 
+ + ozone.scm.datanode.admin.monitor.interval 30s diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index d3f684238712..a47fd7c6ea42 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -191,6 +191,22 @@ public final class ReconServerConfigKeys { public static final int OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3; + public static final String OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT = + "ozone.recon.dn.metrics.collection.thread.count"; + public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT = 10; + + public static final String OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT = + "ozone.recon.dn.http.request.timeout"; + public static final int OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT = 60; + + public static final String OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT = + "ozone.recon.dn.http.connection.timeout"; + public static final int OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT = 10; + + public static final String OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT = + "ozone.recon.dn.http.socket.timeout"; + public static final int OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT = 30; + /** * Private constructor for utility class. 
*/ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index 1c468fb7cb17..087e60340f86 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -17,8 +17,14 @@ package org.apache.hadoop.ozone.recon.api; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_DEFAULT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT; import java.util.ArrayList; import java.util.List; @@ -58,36 +64,48 @@ public class DataNodeMetricsService { private List pendingDeletionList; private final ReconNodeManager reconNodeManager; private final int threadCount; + private final int httpRequestTimeout; + private final int httpConnectionTimeout; + private final int httpSocketTimeout; @Inject public 
DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfiguration conf) { reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); - threadCount = conf.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY, OZONE_RECON_TASK_THREAD_COUNT_DEFAULT); + threadCount = conf.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT, + OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT); + httpRequestTimeout = conf.getInt(OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT, OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT); + httpConnectionTimeout = conf.getInt(OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT, + OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT); + httpSocketTimeout = conf.getInt(OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT, OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT); } public void startTask() { Set nodes = reconNodeManager.getNodeStats().keySet(); - pendingDeletionList = new ArrayList<>(); + pendingDeletionList = new ArrayList<>(nodes.size()); totalPendingDeletion = 0L; currentStatus = MetricCollectionStatus.IN_PROGRESS; ExecutorService executor = Executors.newFixedThreadPool(threadCount); List> futures = new ArrayList<>(); for (DatanodeDetails node : nodes) { - String hostName = node.getHostName(); - String uuid = node.getUuidString(); int port = node.getPort(DatanodeDetails.Port.Name.HTTP).getValue(); - DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask(hostName, port, uuid); + DataNodeMetricsCollectionTask task = DataNodeMetricsCollectionTask.newBuilder() + .setHost(node.getHostName()) + .setPort(port) + .setNodeUuid(node.getUuidString()) + .setHttpConnectionTimeout(httpConnectionTimeout) + .setHttpSocketTimeout(httpSocketTimeout) + .build(); futures.add(executor.submit(task)); } boolean hasTimedOut = false; for (Future future : futures) { try { - DatanodePendingDeletionMetrics result = future.get(30, TimeUnit.SECONDS); + DatanodePendingDeletionMetrics result = future.get(httpRequestTimeout, TimeUnit.SECONDS); totalPendingDeletion += result.getPendingBlockSize(); 
pendingDeletionList.add(result); } catch (TimeoutException e) { hasTimedOut = true; - LOG.error("Task timed out after " + 30 + " seconds: {}", e.getMessage()); + LOG.error("Task timed out after " + httpRequestTimeout + " seconds: {}", e.getMessage()); } catch (Exception e) { System.err.println("Task failed or was interrupted: " + e.getMessage()); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java index da8438c306f4..9543fcdd0361 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java @@ -46,11 +46,16 @@ public class DataNodeMetricsCollectionTask implements Callable Date: Thu, 4 Dec 2025 18:09:16 +0530 Subject: [PATCH 05/12] handling https and json parsing --- .../apache/hadoop/ozone/recon/ReconUtils.java | 46 +++++++ .../recon/api/DataNodeMetricsService.java | 11 +- .../tasks/DataNodeMetricsCollectionTask.java | 128 ++++++++++-------- 3 files changed, 123 insertions(+), 62 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java index ccc92648f117..eccbe6b39821 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java @@ -27,6 +27,8 @@ import static org.jooq.impl.DSL.select; import static org.jooq.impl.DSL.using; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.inject.Singleton; @@ -847,4 +849,48 @@ public static 
String constructObjectPathWithPrefix(long... ids) { } return pathBuilder.toString(); } + + public static long parseMetrics(String jsonResponse, String serviceName, String keyName) { + ObjectMapper objectMapper = new ObjectMapper(); + if (jsonResponse == null || jsonResponse.isEmpty()) { + log.warn("Empty or null JSON response for service: {}", serviceName); + return -1L; + } + + try { + JsonNode root = objectMapper.readTree(jsonResponse); + if (root == null) { + log.warn("Failed to parse JSON response for service: {}", serviceName); + return -1L; + } + + JsonNode beans = root.get("beans"); + if (beans == null || !beans.isArray()) { + log.warn("No 'beans' array found in JSON response for service: {}", serviceName); + return -1L; + } + + // Find the bean matching the service name + for (JsonNode bean : beans) { + String beanName = bean.path("name").asText(""); + if (beanName.contains(serviceName)) { + // Extract and return the metric value from the bean + return extractMetrics(bean, keyName); + } + } + + log.warn("Service '{}' not found in JMX beans", serviceName); + return -1L; + } catch (IOException e) { + log.error("Failed to parse JSON response for service: {}", serviceName, e); + return -1L; + } catch (Exception e) { + log.error("Unexpected error parsing metrics for service: {}", serviceName, e); + return -1L; + } + } + + private static long extractMetrics(JsonNode beanNode, String keyName) { + return beanNode.path(keyName).asLong(0L); + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index 087e60340f86..9be0f6ba8a43 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import 
org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.server.http.HttpConfig; import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; @@ -67,6 +68,7 @@ public class DataNodeMetricsService { private final int httpRequestTimeout; private final int httpConnectionTimeout; private final int httpSocketTimeout; + private final boolean httpsEnabled; @Inject public DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfiguration conf) { @@ -77,6 +79,7 @@ public DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfig httpConnectionTimeout = conf.getInt(OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT, OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT); httpSocketTimeout = conf.getInt(OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT, OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT); + httpsEnabled = HttpConfig.getHttpPolicy(conf).isHttpsEnabled(); } public void startTask() { @@ -87,13 +90,11 @@ public void startTask() { ExecutorService executor = Executors.newFixedThreadPool(threadCount); List> futures = new ArrayList<>(); for (DatanodeDetails node : nodes) { - int port = node.getPort(DatanodeDetails.Port.Name.HTTP).getValue(); DataNodeMetricsCollectionTask task = DataNodeMetricsCollectionTask.newBuilder() - .setHost(node.getHostName()) - .setPort(port) - .setNodeUuid(node.getUuidString()) + .setNodeDetails(node) .setHttpConnectionTimeout(httpConnectionTimeout) .setHttpSocketTimeout(httpSocketTimeout) + .setHttpsEnabled(httpsEnabled) .build(); futures.add(executor.submit(task)); } @@ -110,7 +111,7 @@ public void startTask() { System.err.println("Task failed or was interrupted: " + e.getMessage()); } } - executor.shutdownNow(); + executor.shutdown(); if (hasTimedOut) { currentStatus = MetricCollectionStatus.FAILED; } else 
{ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java index 9543fcdd0361..47c63b4e83ff 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java @@ -17,16 +17,20 @@ package org.apache.hadoop.ozone.recon.tasks; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.InputStream; +import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.util.concurrent.Callable; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name; +import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.http.HttpEntity; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; +import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; @@ -42,25 +46,22 @@ public class DataNodeMetricsCollectionTask implements Callable { private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsCollectionTask.class); - private final String host; - private final int port; - private final String nodeUuid; - private static ObjectMapper objectMapper = new ObjectMapper(); + private final DatanodeDetails nodeDetails; private final int httpConnectionTimeout; private final int httpSocketTimeout; + private final boolean httpsEnabled; public DataNodeMetricsCollectionTask(Builder builder) { - 
this.host = builder.host; - this.port = builder.port; - this.nodeUuid = builder.nodeUuid; - - httpConnectionTimeout = builder.httpConnectionTimeout; - httpSocketTimeout = builder.httpSocketTimeout; + this.nodeDetails = builder.nodeDetails; + this.httpConnectionTimeout = builder.httpConnectionTimeout; + this.httpSocketTimeout = builder.httpSocketTimeout; + this.httpsEnabled = builder.httpsEnabled; } @Override - public DatanodePendingDeletionMetrics call() throws Exception { - LOG.debug("Collecting pending deletion metrics from DataNode {}:{}", host, port); + public DatanodePendingDeletionMetrics call() { + + LOG.debug("Collecting pending deletion metrics from DataNode {}", nodeDetails.getHostName()); try (CloseableHttpClient httpClient = HttpClients.custom() .setDefaultRequestConfig(RequestConfig.custom() .setConnectTimeout(httpConnectionTimeout) @@ -68,41 +69,60 @@ public DatanodePendingDeletionMetrics call() throws Exception { .build()) .build()) { - InputStream in = httpClient.execute(new HttpGet(getJmxMetricsUrl())).getEntity().getContent(); - byte[] responseBytes = IOUtils.toByteArray(in); - String jsonResponse = new String(responseBytes, StandardCharsets.UTF_8); - return new DatanodePendingDeletionMetrics(host, nodeUuid, - parseMetrics(jsonResponse, "BlockDeletingService", "TotalPendingBlockBytes")); - } - } + HttpGet httpGet = new HttpGet(getJmxMetricsUrl()); + try { + HttpEntity entity = httpClient.execute(httpGet).getEntity(); + if (entity == null) { + LOG.warn("Empty response received from DataNode {}", nodeDetails.getHostName()); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); + } - private static long parseMetrics(String jsonResponse, String serviceName, String keyName) - throws IOException { - if (jsonResponse == null || jsonResponse.isEmpty()) { - return -1L; - } - JsonNode root = objectMapper.readTree(jsonResponse); - JsonNode beans = root.get("beans"); - if (beans != null && 
beans.isArray()) { - // Find the bean matching the service name - for (JsonNode bean : beans) { - String beanName = bean.path("name").asText(""); - if (beanName.contains(serviceName)) { - // Extract and return the metric value from the bean - return extractMetrics(bean, keyName); + InputStream in = entity.getContent(); + if (in == null) { + LOG.warn("No content stream available from DataNode {}", nodeDetails.getHostName()); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); + } + + byte[] responseBytes = IOUtils.toByteArray(in); + if (responseBytes.length == 0) { + LOG.warn("Empty response body received from DataNode {}", nodeDetails.getHostName()); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); } + String jsonResponse = new String(responseBytes, StandardCharsets.UTF_8); + long metrics = ReconUtils.parseMetrics( + jsonResponse, "BlockDeletingService", "TotalPendingBlockBytes"); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), metrics); + } finally { + httpGet.releaseConnection(); } + } catch (ConnectTimeoutException e) { + LOG.error("Connection timeout while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); + } catch (SocketTimeoutException e) { + LOG.error("Socket timeout while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); + } catch (IOException e) { + LOG.error("IO error while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); + } catch (Exception e) { + LOG.error("Unexpected error while collecting metrics from 
DataNode {}", nodeDetails.getHostName(), e); + return new DatanodePendingDeletionMetrics( + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); } - return -1L; - } - - private static long extractMetrics(JsonNode beanNode, String keyName) { - return beanNode.path(keyName).asLong(0L); } private String getJmxMetricsUrl() { - return String.format( - "%s://%s:%d/jmx?qry=Hadoop:service=HddsDatanode,name=BlockDeletingService", "http", host, port); + String protocol = httpsEnabled ? "https" : "http"; + Name portName = httpsEnabled ? DatanodeDetails.Port.Name.HTTPS : DatanodeDetails.Port.Name.HTTP; + return String.format("%s://%s:%d/jmx?qry=Hadoop:service=HddsDatanode,name=BlockDeletingService", + protocol, nodeDetails.getHostName(), nodeDetails.getPort(portName).getValue()); } public static Builder newBuilder() { @@ -114,24 +134,13 @@ public static Builder newBuilder() { * Provides methods to set configuration parameters for the task. */ public static class Builder { - private String host; - private int port; - private String nodeUuid; + private DatanodeDetails nodeDetails; private int httpConnectionTimeout; private int httpSocketTimeout; + private boolean httpsEnabled; - public Builder setHost(String host) { - this.host = host; - return this; - } - - public Builder setPort(int port) { - this.port = port; - return this; - } - - public Builder setNodeUuid(String nodeUuid) { - this.nodeUuid = nodeUuid; + public Builder setNodeDetails(DatanodeDetails nodeDetails) { + this.nodeDetails = nodeDetails; return this; } @@ -145,6 +154,11 @@ public Builder setHttpSocketTimeout(int httpSocketTimeout) { return this; } + public Builder setHttpsEnabled(boolean httpsEnabled) { + this.httpsEnabled = httpsEnabled; + return this; + } + public DataNodeMetricsCollectionTask build() { return new DataNodeMetricsCollectionTask(this); } From fa268ce264efe9ef9b42e29b5dd9e79f800f3ab6 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Thu, 4 Dec 2025 18:20:43 +0530 Subject: [PATCH 06/12] 
minor code fixes for config --- hadoop-hdds/common/src/main/resources/ozone-default.xml | 6 +++--- .../ozone/recon/tasks/DataNodeMetricsCollectionTask.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index f3a942c1efb8..b8cd29ca54f1 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3506,7 +3506,7 @@ ozone.recon.dn.metrics.collection.thread.count - 1 + 10 OZONE, RECON, DN The number of threads running to get metrics from jmx endpoint. @@ -3514,7 +3514,7 @@ ozone.recon.dn.http.request.timeout - 30 + 60 OZONE, RECON, DN Timeout for the request submitted directly to datanode metrics. @@ -3522,7 +3522,7 @@ ozone.recon.dn.http.connection.timeout - 30 + 10 OZONE, RECON, DN The maximum time allowed for a client to establish diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java index 47c63b4e83ff..853b0ded9163 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java @@ -64,8 +64,8 @@ public DatanodePendingDeletionMetrics call() { LOG.debug("Collecting pending deletion metrics from DataNode {}", nodeDetails.getHostName()); try (CloseableHttpClient httpClient = HttpClients.custom() .setDefaultRequestConfig(RequestConfig.custom() - .setConnectTimeout(httpConnectionTimeout) - .setSocketTimeout(httpSocketTimeout) + .setConnectTimeout(httpConnectionTimeout * 1000) + .setSocketTimeout(httpSocketTimeout * 1000) .build()) .build()) { From f3db9e55b88d4d1e72b390a752428e1a3c0f4164 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha 
Date: Mon, 8 Dec 2025 12:35:16 +0530 Subject: [PATCH 07/12] Handle failures in better way --- .../TestStorageDistributionEndpoint.java | 23 ++ .../recon/api/DataNodeMetricsService.java | 210 ++++++++++++++++-- .../types/DataNodeMetricsServiceResponse.java | 28 +++ .../tasks/DataNodeMetricsCollectionTask.java | 9 + 4 files changed, 249 insertions(+), 21 deletions(-) diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java index 52f27e4840e4..34605b2a7eee 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestStorageDistributionEndpoint.java @@ -30,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.Duration; @@ -265,6 +266,8 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) assertNotNull(pendingDeletion); assertEquals(30, pendingDeletion.getTotalPendingDeletion()); assertEquals(DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED, pendingDeletion.getStatus()); + assertEquals(pendingDeletion.getTotalNodesQueries(), pendingDeletion.getPendingDeletionPerDataNode().size()); + assertEquals(0, pendingDeletion.getTotalNodeQueryFailures()); pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> { assertEquals(10, dn.getPendingBlockSize()); }); @@ -274,6 +277,26 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig) return false; } }, 2000, 60000); + 
cluster.getHddsDatanodes().get(0).stop(); + + GenericTestUtils.waitFor(() -> { + try { + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append(getReconWebAddress(conf)) + .append(PENDING_DELETION_ENDPOINT + "?component=dn"); + String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder); + DataNodeMetricsServiceResponse pendingDeletion = + MAPPER.readValue(response, DataNodeMetricsServiceResponse.class); + assertNotNull(pendingDeletion); + assertEquals(1, pendingDeletion.getTotalNodeQueryFailures()); + assertTrue(pendingDeletion.getPendingDeletionPerDataNode() + .stream() + .anyMatch(dn -> dn.getPendingBlockSize() == -1)); + return true; + } catch (Throwable e) { + return false; + } + }, 2000, 30000); } private void verifyBlocksCreated( diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index 9be0f6ba8a43..2a8197813bd4 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -26,14 +26,20 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.PreDestroy; import 
javax.inject.Inject; import javax.inject.Singleton; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -60,6 +66,7 @@ @Singleton public class DataNodeMetricsService { private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); + private static ExecutorService executorService; private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; private Long totalPendingDeletion = 0L; private List pendingDeletionList; @@ -69,6 +76,8 @@ public class DataNodeMetricsService { private final int httpConnectionTimeout; private final int httpSocketTimeout; private final boolean httpsEnabled; + private int totalNodesQueried; + private int totalNodesFailed; @Inject public DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfiguration conf) { @@ -80,42 +89,181 @@ public DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfig OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT); httpSocketTimeout = conf.getInt(OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT, OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT); httpsEnabled = HttpConfig.getHttpPolicy(conf).isHttpsEnabled(); + executorService = Executors.newFixedThreadPool(threadCount, + new ThreadFactoryBuilder().setNameFormat("DataNodeMetricsCollectionTasksThread-%d") + .build()); } public void startTask() { + totalNodesFailed = 0; Set nodes = reconNodeManager.getNodeStats().keySet(); - pendingDeletionList = new ArrayList<>(nodes.size()); + + if (nodes.isEmpty()) { + LOG.warn("No datanodes found to query for metrics collection"); + initializeTaskState(0); + currentStatus = MetricCollectionStatus.SUCCEEDED; + return; + } + + LOG.info("Starting metrics collection task for {} datanodes", nodes.size()); + initializeTaskState(nodes.size()); + Map> dataNodeFutures = + submitMetricsCollectionTasks(nodes); + collectMetricsWithTimeout(dataNodeFutures, nodes.size()); + // Add any remaining unfinished tasks as failed entries + addFailedEntries(dataNodeFutures); + currentStatus = 
MetricCollectionStatus.SUCCEEDED; + LOG.info("Metrics collection completed. Queried: {}, Failed: {}", + totalNodesQueried, totalNodesFailed); + } + + /** + * Initializes the state for a new metrics collection task. + */ + private void initializeTaskState(int nodeCount) { + pendingDeletionList = new ArrayList<>(nodeCount); totalPendingDeletion = 0L; currentStatus = MetricCollectionStatus.IN_PROGRESS; - ExecutorService executor = Executors.newFixedThreadPool(threadCount); - List> futures = new ArrayList<>(); + } + + /** + * Submits metrics collection tasks for all datanodes. + */ + private Map> submitMetricsCollectionTasks( + Set nodes) { + Map> futures = new HashMap<>(); for (DatanodeDetails node : nodes) { DataNodeMetricsCollectionTask task = DataNodeMetricsCollectionTask.newBuilder() .setNodeDetails(node) .setHttpConnectionTimeout(httpConnectionTimeout) .setHttpSocketTimeout(httpSocketTimeout) + .setHttpRequestTimeout(httpRequestTimeout) .setHttpsEnabled(httpsEnabled) .build(); - futures.add(executor.submit(task)); + DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics( + node.getHostName(), node.getUuidString(), -1L); + futures.put(key, executorService.submit(task)); + } + + totalNodesQueried = futures.size(); + LOG.debug("Submitted {} metrics collection tasks", totalNodesQueried); + + return futures; + } + + /** + * Collects metrics from completed tasks with a global timeout. + * Uses iterator to safely remove entries while iterating. + */ + private void collectMetricsWithTimeout( + Map> futures, + int nodeCount) { + // Calculate timeout: half of total request timeout for all nodes + long maximumTaskRunningTimeMs = (long) httpRequestTimeout * 1000 * nodeCount / 2; + long startTime = System.currentTimeMillis(); + long pollIntervalMs = 200; + while (!futures.isEmpty()) { + if (hasTimedOut(startTime, maximumTaskRunningTimeMs)) { + LOG.warn("Stopping metrics collection task due to timeout. 
" + + "Remaining tasks: {}", futures.size()); + break; + } + processCompletedTasks(futures); + sleepBetweenPolls(pollIntervalMs); + } + } + + /** + * Checks if the task has exceeded the maximum allowed running time. + */ + private boolean hasTimedOut(long startTime, long maximumTaskRunningTimeMs) { + return (System.currentTimeMillis() - startTime) > maximumTaskRunningTimeMs; + } + + /** + * Processes all completed tasks and removes them from the futures map. + * Uses iterator to avoid ConcurrentModificationException. + */ + private void processCompletedTasks( + Map> futures) { + Iterator>> + iterator = futures.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry> entry = + iterator.next(); + + if (entry.getValue().isDone()) { + processCompletedTask(entry.getKey(), entry.getValue()); + iterator.remove(); + } } - boolean hasTimedOut = false; - for (Future future : futures) { - try { - DatanodePendingDeletionMetrics result = future.get(httpRequestTimeout, TimeUnit.SECONDS); + } + + /** + * Processes a single completed task and updates the metrics. 
+ */ + private void processCompletedTask(DatanodePendingDeletionMetrics key, + Future future) { + try { + DatanodePendingDeletionMetrics result = future.get(httpRequestTimeout, TimeUnit.SECONDS); + if (result.getPendingBlockSize() < 0) { + totalNodesFailed += 1; + } else { totalPendingDeletion += result.getPendingBlockSize(); - pendingDeletionList.add(result); - } catch (TimeoutException e) { - hasTimedOut = true; - LOG.error("Task timed out after " + httpRequestTimeout + " seconds: {}", e.getMessage()); - } catch (Exception e) { - System.err.println("Task failed or was interrupted: " + e.getMessage()); } + pendingDeletionList.add(result); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + logTaskFailure(key, e); + totalNodesFailed += 1; + // Add the key with -1 indicating failure + pendingDeletionList.add(key); } - executor.shutdown(); - if (hasTimedOut) { - currentStatus = MetricCollectionStatus.FAILED; - } else { - currentStatus = MetricCollectionStatus.SUCCEEDED; + } + + /** + * Logs appropriate error message based on exception type. + */ + private void logTaskFailure(DatanodePendingDeletionMetrics key, Exception e) { + String errorType = getErrorType(e); + LOG.error("Task {} for datanode {} [{}]: {}", + errorType, key.getHostName(), key.getDatanodeUuid(), e.getMessage()); + } + + /** + * Returns a human-readable error type description. + */ + private String getErrorType(Exception e) { + if (e instanceof ExecutionException) { + return "execution failed"; + } else if (e instanceof InterruptedException) { + return "interrupted"; + } else if (e instanceof TimeoutException) { + return "timed out"; + } + return "failed"; + } + + /** + * Adds all remaining unfinished tasks as failed entries. 
+ */ + private void addFailedEntries( + Map> futures) { + for (DatanodePendingDeletionMetrics key : futures.keySet()) { + pendingDeletionList.add(key); + totalNodesFailed++; + } + } + + /** + * Sleeps for the specified interval, handling interruptions gracefully. + */ + private void sleepBetweenPolls(long intervalMs) { + try { + Thread.sleep(intervalMs); + } catch (InterruptedException e) { + LOG.warn("Polling sleep interrupted: {}", e.getMessage()); + Thread.currentThread().interrupt(); // Restore interrupt status } } @@ -126,10 +274,16 @@ public DataNodeMetricsServiceResponse getCollectedMetrics() { .setStatus(MetricCollectionStatus.SUCCEEDED) .setPendingDeletion(pendingDeletionList) .setTotalPendingDeletion(totalPendingDeletion) + .setTotalNodesQueries(totalNodesQueried) + .setTotalNodeQueryFailures(totalNodesFailed) .build(); } else { - DataNodeMetricsServiceResponse response = DataNodeMetricsServiceResponse.newBuilder() + DataNodeMetricsServiceResponse response = DataNodeMetricsServiceResponse.newBuilder() .setStatus(currentStatus) + .setTotalNodesQueries(totalNodesQueried) + .setTotalNodeQueryFailures(totalNodesFailed) + .setTotalPendingDeletion(totalPendingDeletion) + .setPendingDeletion(pendingDeletionList) .build(); currentStatus = MetricCollectionStatus.NOT_STARTED; return response; @@ -146,6 +300,20 @@ public MetricCollectionStatus getTaskStatus() { * a metric collection operation. 
*/ public enum MetricCollectionStatus { - SUCCEEDED, FAILED, IN_PROGRESS, NOT_STARTED + SUCCEEDED, IN_PROGRESS, NOT_STARTED + } + + @PreDestroy + public void shutdown() { + LOG.info("Shutting down DataNodeMetricsService executor..."); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java index 5e9afe88f7fa..d333d1b82c44 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java @@ -36,17 +36,25 @@ public class DataNodeMetricsServiceResponse { private Long totalPendingDeletion; @JsonProperty("pendingDeletionPerDataNode") private List pendingDeletionPerDataNode; + @JsonProperty("totalNodesQueries") + private int totalNodesQueries; + @JsonProperty("totalNodeQueriesFailed") + private long totalNodeQueryFailures; public DataNodeMetricsServiceResponse(Builder builder) { this.status = builder.status; this.totalPendingDeletion = builder.totalPendingDeletion; this.pendingDeletionPerDataNode = builder.pendingDeletion; + this.totalNodesQueries = builder.totalNodesQueries; + this.totalNodeQueryFailures = builder.totalNodeQueryFailures; } public DataNodeMetricsServiceResponse() { this.status = DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED; this.totalPendingDeletion = 0L; this.pendingDeletionPerDataNode = null; + this.totalNodesQueries = 0; + this.totalNodeQueryFailures = 0; } public DataNodeMetricsService.MetricCollectionStatus getStatus() { @@ -61,6 +69,14 @@ 
public List getPendingDeletionPerDataNode() { return pendingDeletionPerDataNode; } + public int getTotalNodesQueries() { + return totalNodesQueries; + } + + public long getTotalNodeQueryFailures() { + return totalNodeQueryFailures; + } + public static Builder newBuilder() { return new Builder(); } @@ -79,6 +95,8 @@ public static final class Builder { private DataNodeMetricsService.MetricCollectionStatus status; private Long totalPendingDeletion; private List pendingDeletion; + private int totalNodesQueries; + private long totalNodeQueryFailures; public Builder setStatus(DataNodeMetricsService.MetricCollectionStatus status) { this.status = status; @@ -95,6 +113,16 @@ public Builder setPendingDeletion(List pendingDe return this; } + public Builder setTotalNodesQueries(int totalNodesQueries) { + this.totalNodesQueries = totalNodesQueries; + return this; + } + + public Builder setTotalNodeQueryFailures(long totalNodeQueryFailures) { + this.totalNodeQueryFailures = totalNodeQueryFailures; + return this; + } + public DataNodeMetricsServiceResponse build() { return new DataNodeMetricsServiceResponse(this); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java index 853b0ded9163..4a44ac2499c6 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java @@ -49,6 +49,7 @@ public class DataNodeMetricsCollectionTask implements Callable Date: Mon, 8 Dec 2025 13:25:53 +0530 Subject: [PATCH 08/12] fixing ci errors --- .../apache/hadoop/ozone/recon/api/DataNodeMetricsService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index 2a8197813bd4..f7d8875c8b4c 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -66,7 +66,7 @@ @Singleton public class DataNodeMetricsService { private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); - private static ExecutorService executorService; + private final ExecutorService executorService; private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; private Long totalPendingDeletion = 0L; private List pendingDeletionList; From 69e4763b29a62405f7c5777acd6fb6152dccbdc2 Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Tue, 9 Dec 2025 14:02:49 +0530 Subject: [PATCH 09/12] adding delay for triggering metrics collection task --- .../src/main/resources/ozone-default.xml | 9 ++++ .../dist/src/main/compose/ozone/docker-config | 2 +- .../ozone/recon/ReconServerConfigKeys.java | 4 ++ .../recon/api/DataNodeMetricsService.java | 48 +++++++++++-------- .../recon/api/PendingDeletionEndpoint.java | 15 ++---- 5 files changed, 45 insertions(+), 33 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index b8cd29ca54f1..b67e1dfd02d9 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3538,6 +3538,15 @@ when exchanging data with a server, after the connection is established. + + ozone.recon.dn.metrics.collection.minimum.api.delay + 30 + OZONE, RECON, DN + + Minimum delay in API to start a new task for Jmx collection. + It behaves like a rate limiter to avoid unnecessary task creation. 
+ + ozone.scm.datanode.admin.monitor.interval 30s diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config index f2a9e0447932..f6c2e81cbbcb 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config @@ -56,7 +56,7 @@ OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true OZONE-SITE.XML_ozone.fs.hsync.enabled=true - +OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5 OZONE_CONF_DIR=/etc/hadoop OZONE_LOG_DIR=/var/log/hadoop diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index a47fd7c6ea42..96a4bd915761 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -207,6 +207,10 @@ public final class ReconServerConfigKeys { "ozone.recon.dn.http.socket.timeout"; public static final int OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT = 30; + public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY = + "ozone.recon.dn.metrics.collection.minimum.api.delay"; + public static final int OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = 30; + /** * Private constructor for utility class. 
*/ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index f7d8875c8b4c..b418e3141cf5 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT; @@ -71,30 +73,43 @@ public class DataNodeMetricsService { private Long totalPendingDeletion = 0L; private List pendingDeletionList; private final ReconNodeManager reconNodeManager; - private final int threadCount; private final int httpRequestTimeout; private final int httpConnectionTimeout; private final int httpSocketTimeout; private final boolean httpsEnabled; + private final int minimumApiDelay; private int totalNodesQueried; private int totalNodesFailed; + private long lastCollectionEndTime; @Inject public DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfiguration conf) { reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); - threadCount = 
conf.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT, + int threadCount = conf.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT, OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT); httpRequestTimeout = conf.getInt(OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT, OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT); httpConnectionTimeout = conf.getInt(OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT, OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT); httpSocketTimeout = conf.getInt(OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT, OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT); httpsEnabled = HttpConfig.getHttpPolicy(conf).isHttpsEnabled(); + lastCollectionEndTime = 0; executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder().setNameFormat("DataNodeMetricsCollectionTasksThread-%d") .build()); + minimumApiDelay = conf.getInt(OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT); } public void startTask() { + if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { + LOG.warn("Metrics collection task is already in progress. Skipping new task."); + return; + } + if (lastCollectionEndTime > System.currentTimeMillis() - 1000 * minimumApiDelay) { + LOG.info("Skipping metrics collection task due to last collection time being more than {} seconds ago.", + minimumApiDelay); + return; + } totalNodesFailed = 0; Set nodes = reconNodeManager.getNodeStats().keySet(); @@ -113,6 +128,7 @@ public void startTask() { // Add any remaining unfinished tasks as failed entries addFailedEntries(dataNodeFutures); currentStatus = MetricCollectionStatus.SUCCEEDED; + lastCollectionEndTime = System.currentTimeMillis(); LOG.info("Metrics collection completed. 
Queried: {}, Failed: {}", totalNodesQueried, totalNodesFailed); } @@ -268,26 +284,18 @@ private void sleepBetweenPolls(long intervalMs) { } public DataNodeMetricsServiceResponse getCollectedMetrics() { - if (currentStatus == MetricCollectionStatus.SUCCEEDED) { - currentStatus = MetricCollectionStatus.NOT_STARTED; + if (currentStatus == MetricCollectionStatus.NOT_STARTED || currentStatus == MetricCollectionStatus.IN_PROGRESS) { return DataNodeMetricsServiceResponse.newBuilder() - .setStatus(MetricCollectionStatus.SUCCEEDED) - .setPendingDeletion(pendingDeletionList) - .setTotalPendingDeletion(totalPendingDeletion) - .setTotalNodesQueries(totalNodesQueried) - .setTotalNodeQueryFailures(totalNodesFailed) - .build(); - } else { - DataNodeMetricsServiceResponse response = DataNodeMetricsServiceResponse.newBuilder() - .setStatus(currentStatus) - .setTotalNodesQueries(totalNodesQueried) - .setTotalNodeQueryFailures(totalNodesFailed) - .setTotalPendingDeletion(totalPendingDeletion) - .setPendingDeletion(pendingDeletionList) - .build(); - currentStatus = MetricCollectionStatus.NOT_STARTED; - return response; + .setStatus(MetricCollectionStatus.IN_PROGRESS) + .build(); } + return DataNodeMetricsServiceResponse.newBuilder() + .setStatus(currentStatus) + .setPendingDeletion(pendingDeletionList) + .setTotalPendingDeletion(totalPendingDeletion) + .setTotalNodesQueries(totalNodesQueried) + .setTotalNodeQueryFailures(totalNodesFailed) + .build(); } public MetricCollectionStatus getTaskStatus() { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java index 18130ceec981..b535ea5c2c11 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java @@ -27,7 +27,6 @@ import 
javax.ws.rs.core.Response; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,19 +76,11 @@ public Response getPendingDeletionByComponent(@QueryParam("component") String co private Response handleDataNodeMetrics() { DataNodeMetricsService.MetricCollectionStatus status = dataNodeMetricsService.getTaskStatus(); - if (status == DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED) { - CompletableFuture.runAsync(dataNodeMetricsService::startTask); - return Response.accepted(DataNodeMetricsServiceResponse.newBuilder() - .setStatus(DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) - .build() - ).build(); - } else if (status == DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED) { + CompletableFuture.runAsync(dataNodeMetricsService::startTask); + if (status == DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED) { return Response.ok(dataNodeMetricsService.getCollectedMetrics()).build(); } else { - return Response.accepted(DataNodeMetricsServiceResponse.newBuilder() - .setStatus(status) - .build() - ).build(); + return Response.accepted(dataNodeMetricsService.getCollectedMetrics()).build(); } } From 776ac472ba503ec2a9c81414b585e37024caca6f Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Wed, 10 Dec 2025 11:38:40 +0530 Subject: [PATCH 10/12] Refactoring code based on reviews --- .../src/main/resources/ozone-default.xml | 26 ---- .../recon/MetricsServiceProviderFactory.java | 10 ++ .../ozone/recon/ReconServerConfigKeys.java | 14 +- .../apache/hadoop/ozone/recon/ReconUtils.java | 59 +++----- .../recon/api/DataNodeMetricsService.java | 54 +++---- .../recon/spi/MetricsServiceProvider.java | 5 +- .../spi/impl/JmxServiceProviderImpl.java | 118 +++++++++++++++ 
.../impl/PrometheusServiceProviderImpl.java | 14 +- .../tasks/DataNodeMetricsCollectionTask.java | 143 ++++-------------- 9 files changed, 203 insertions(+), 240 deletions(-) create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index b67e1dfd02d9..11f2ab2292c7 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3512,32 +3512,6 @@ The number of threads running to get metrics from jmx endpoint. - - ozone.recon.dn.http.request.timeout - 60 - OZONE, RECON, DN - - Timeout for the request submitted directly to datanode metrics. - - - - ozone.recon.dn.http.connection.timeout - 10 - OZONE, RECON, DN - - The maximum time allowed for a client to establish - the initial physical connection (TCP handshake) with the dn. - - - - ozone.recon.dn.http.socket.timeout - 30 - OZONE, RECON, DN - - The maximum time of inactivity allowed between any two consecutive data packets - when exchanging data with a server, after the connection is established. 
- - ozone.recon.dn.metrics.collection.minimum.api.delay 30 diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java index d09c01ea72f9..2baf0511ee39 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/MetricsServiceProviderFactory.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.recon.ReconConfigKeys; import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider; +import org.apache.hadoop.ozone.recon.spi.impl.JmxServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,15 @@ public MetricsServiceProvider getMetricsServiceProvider() { return null; } + /** + * Returns the configured MetricsServiceProvider implementation for Jmx. + * @param endpoint + * @return MetricsServiceProvider instance for Jmx + */ + public MetricsServiceProvider getJmxMetricsServiceProvider(String endpoint) { + return new JmxServiceProviderImpl(configuration, reconUtils, endpoint); + } + /** * Returns the Prometheus endpoint if configured. Otherwise returns null. 
* diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index 96a4bd915761..8fa11caa00f2 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -195,21 +195,9 @@ public final class ReconServerConfigKeys { "ozone.recon.dn.metrics.collection.thread.count"; public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT = 10; - public static final String OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT = - "ozone.recon.dn.http.request.timeout"; - public static final int OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT = 60; - - public static final String OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT = - "ozone.recon.dn.http.connection.timeout"; - public static final int OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT = 10; - - public static final String OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT = - "ozone.recon.dn.http.socket.timeout"; - public static final int OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT = 30; - public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY = "ozone.recon.dn.metrics.collection.minimum.api.delay"; - public static final int OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = 30; + public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = "30s"; /** * Private constructor for utility class. 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java index eccbe6b39821..293a4d77a069 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java @@ -27,8 +27,6 @@ import static org.jooq.impl.DSL.select; import static org.jooq.impl.DSL.using; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.inject.Singleton; @@ -850,47 +848,30 @@ public static String constructObjectPathWithPrefix(long... ids) { return pathBuilder.toString(); } - public static long parseMetrics(String jsonResponse, String serviceName, String keyName) { - ObjectMapper objectMapper = new ObjectMapper(); - if (jsonResponse == null || jsonResponse.isEmpty()) { - log.warn("Empty or null JSON response for service: {}", serviceName); - return -1L; + public static Map getMetricsData(List> metrics, String beanName) { + if (metrics == null || StringUtils.isEmpty(beanName)) { + return null; } - - try { - JsonNode root = objectMapper.readTree(jsonResponse); - if (root == null) { - log.warn("Failed to parse JSON response for service: {}", serviceName); - return -1L; - } - - JsonNode beans = root.get("beans"); - if (beans == null || !beans.isArray()) { - log.warn("No 'beans' array found in JSON response for service: {}", serviceName); - return -1L; - } - - // Find the bean matching the service name - for (JsonNode bean : beans) { - String beanName = bean.path("name").asText(""); - if (beanName.contains(serviceName)) { - // Extract and return the metric value from the bean - return extractMetrics(bean, keyName); - } + for (Map item :metrics) { + if (item.get("name").equals(beanName)) { + return item; } - - 
log.warn("Service '{}' not found in JMX beans", serviceName); - return -1L; - } catch (IOException e) { - log.error("Failed to parse JSON response for service: {}", serviceName, e); - return -1L; - } catch (Exception e) { - log.error("Unexpected error parsing metrics for service: {}", serviceName, e); - return -1L; } + return null; } - private static long extractMetrics(JsonNode beanNode, String keyName) { - return beanNode.path(keyName).asLong(0L); + public static long extractMetricValue(Map metrics, String keyName) { + if (metrics == null || StringUtils.isEmpty(keyName)) { + return -1; + } + Object value = metrics.get(keyName); + if (value instanceof Long) { + return (long) value; + } + if (value instanceof Integer) { + Integer intValue = (Integer) value; + return intValue.longValue(); + } + return -1; } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index b418e3141cf5..20d1452076a6 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -17,12 +17,6 @@ package org.apache.hadoop.ozone.recon.api; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT; import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT; @@ -48,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; import org.apache.hadoop.hdds.server.http.HttpConfig; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; @@ -69,35 +64,35 @@ public class DataNodeMetricsService { private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); private final ExecutorService executorService; - private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; - private Long totalPendingDeletion = 0L; - private List pendingDeletionList; private final ReconNodeManager reconNodeManager; - private final int httpRequestTimeout; - private final int httpConnectionTimeout; - private final int httpSocketTimeout; private final boolean httpsEnabled; private final int minimumApiDelay; + private final MetricsServiceProviderFactory metricsServiceProviderFactory; + private MetricCollectionStatus currentStatus = MetricCollectionStatus.NOT_STARTED; + private Long totalPendingDeletion = 0L; + private List pendingDeletionList; private int totalNodesQueried; private int totalNodesFailed; private long lastCollectionEndTime; + private static final int REQUEST_TIMEOUT = 60000; @Inject - public DataNodeMetricsService(OzoneStorageContainerManager reconSCM, OzoneConfiguration conf) { + public DataNodeMetricsService( + OzoneStorageContainerManager reconSCM, 
+ OzoneConfiguration config, + MetricsServiceProviderFactory metricsServiceProviderFactory) { reconNodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); - int threadCount = conf.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT, + int threadCount = config.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT, OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT); - httpRequestTimeout = conf.getInt(OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT, OZONE_RECON_DN_HTTP_REQUEST_TIMEOUT_DEFAULT); - httpConnectionTimeout = conf.getInt(OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT, - OZONE_RECON_DN_HTTP_CONNECTION_TIMEOUT_DEFAULT); - httpSocketTimeout = conf.getInt(OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT, OZONE_RECON_DN_HTTP_SOCKET_TIMEOUT_DEFAULT); - httpsEnabled = HttpConfig.getHttpPolicy(conf).isHttpsEnabled(); + minimumApiDelay = (int) config.getTimeDuration(OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, + OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT, TimeUnit.MILLISECONDS); + + httpsEnabled = HttpConfig.getHttpPolicy(config).isHttpsEnabled(); lastCollectionEndTime = 0; executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder().setNameFormat("DataNodeMetricsCollectionTasksThread-%d") .build()); - minimumApiDelay = conf.getInt(OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, - OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT); + this.metricsServiceProviderFactory = metricsServiceProviderFactory; } public void startTask() { @@ -105,9 +100,9 @@ public void startTask() { LOG.warn("Metrics collection task is already in progress. 
Skipping new task."); return; } - if (lastCollectionEndTime > System.currentTimeMillis() - 1000 * minimumApiDelay) { + if (lastCollectionEndTime > System.currentTimeMillis() - minimumApiDelay) { LOG.info("Skipping metrics collection task due to last collection time being more than {} seconds ago.", - minimumApiDelay); + minimumApiDelay / 1000); return; } totalNodesFailed = 0; @@ -149,13 +144,8 @@ private Map nodes) { Map> futures = new HashMap<>(); for (DatanodeDetails node : nodes) { - DataNodeMetricsCollectionTask task = DataNodeMetricsCollectionTask.newBuilder() - .setNodeDetails(node) - .setHttpConnectionTimeout(httpConnectionTimeout) - .setHttpSocketTimeout(httpSocketTimeout) - .setHttpRequestTimeout(httpRequestTimeout) - .setHttpsEnabled(httpsEnabled) - .build(); + DataNodeMetricsCollectionTask task = new DataNodeMetricsCollectionTask( + node, httpsEnabled, metricsServiceProviderFactory); DatanodePendingDeletionMetrics key = new DatanodePendingDeletionMetrics( node.getHostName(), node.getUuidString(), -1L); futures.put(key, executorService.submit(task)); @@ -175,7 +165,7 @@ private void collectMetricsWithTimeout( Map> futures, int nodeCount) { // Calculate timeout: half of total request timeout for all nodes - long maximumTaskRunningTimeMs = (long) httpRequestTimeout * 1000 * nodeCount / 2; + long maximumTaskRunningTimeMs = (long) REQUEST_TIMEOUT * nodeCount / 2; long startTime = System.currentTimeMillis(); long pollIntervalMs = 200; while (!futures.isEmpty()) { @@ -222,7 +212,7 @@ private void processCompletedTasks( private void processCompletedTask(DatanodePendingDeletionMetrics key, Future future) { try { - DatanodePendingDeletionMetrics result = future.get(httpRequestTimeout, TimeUnit.SECONDS); + DatanodePendingDeletionMetrics result = future.get(REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); if (result.getPendingBlockSize() < 0) { totalNodesFailed += 1; } else { diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java index 3d0aa3140671..7ddba027bfec 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/MetricsServiceProvider.java @@ -19,6 +19,7 @@ import java.net.HttpURLConnection; import java.util.List; +import java.util.Map; import org.apache.hadoop.ozone.recon.metrics.Metric; /** @@ -48,12 +49,12 @@ HttpURLConnection getMetricsResponse(String api, String queryString) List getMetricsInstant(String queryString) throws Exception; /** - * Returns a list of {@link Metric} for the given ranged query. + * Returns a list of {@link Map} for the given query. * * @param queryString query string with metric name, start time, end time, * step and other filters. * @return List of Json map of metrics response. * @throws Exception exception */ - List getMetricsRanged(String queryString) throws Exception; + List> getMetrics(String queryString) throws Exception; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java new file mode 100644 index 000000000000..0426838050be --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/JmxServiceProviderImpl.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
* You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.spi.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.inject.Singleton; +import javax.ws.rs.core.Response; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdfs.web.URLConnectionFactory; +import org.apache.hadoop.ozone.recon.ReconUtils; +import org.apache.hadoop.ozone.recon.metrics.Metric; +import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider; + +/** + * Implementation of the JMX Metrics Service provider. + */ +@Singleton +public class JmxServiceProviderImpl implements MetricsServiceProvider { + + public static final String JMX_INSTANT_QUERY_API = "qry"; + private URLConnectionFactory connectionFactory; + private final String jmxEndpoint; + private ReconUtils reconUtils; + + public JmxServiceProviderImpl(OzoneConfiguration configuration, + ReconUtils reconUtils, String jmxEndpoint) { + + connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(configuration); + // Remove the trailing slash from endpoint url. + if (jmxEndpoint != null && jmxEndpoint.endsWith("/")) { + jmxEndpoint = jmxEndpoint.substring(0, jmxEndpoint.length() - 1); + } + this.jmxEndpoint = jmxEndpoint; + this.reconUtils = reconUtils; + } + + /** + * Returns {@link HttpURLConnection} after querying Metrics endpoint for the + * given metric. + * + * @param api api. 
+ * @param queryString query string with metric name and other filters. + * @return HttpURLConnection + * @throws Exception exception + */ + @Override + public HttpURLConnection getMetricsResponse(String api, String queryString) + throws Exception { + String url = String.format("%s?%s=%s", jmxEndpoint, api, + queryString); + return reconUtils.makeHttpCall(connectionFactory, + url, false); + } + + @Override + public List getMetricsInstant(String queryString) throws Exception { + return Collections.emptyList(); + } + + /** + * Returns a list of {@link Metric} for the given instant query. + * + * @param queryString query string with metric name and other filters. + * @return List of Json map of metrics response. + * @throws Exception exception + */ + @Override + public List> getMetrics(String queryString) + throws Exception { + return getMetrics(JMX_INSTANT_QUERY_API, queryString); + } + + /** + * Returns a list of {@link Metric} for the given api and query string. + * + * @param api api + * @param queryString query string with metric name and other filters. + * @return List of Json map of metrics response. 
+ * @throws Exception + */ + private List> getMetrics(String api, String queryString) + throws Exception { + HttpURLConnection urlConnection = + getMetricsResponse(api, queryString); + if (Response.Status.fromStatusCode(urlConnection.getResponseCode()) + .getFamily() == Response.Status.Family.SUCCESSFUL) { + InputStream inputStream = urlConnection.getInputStream(); + ObjectMapper mapper = new ObjectMapper(); + Map jsonMap = mapper.readValue(inputStream, Map.class); + inputStream.close(); + Object beansObj = jsonMap.get("beans"); + if (beansObj instanceof List) { + return (List>) beansObj; + } + } + return Collections.emptyList(); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java index 64c00490613e..c1df1c8d3dc6 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/PrometheusServiceProviderImpl.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -49,7 +50,6 @@ public class PrometheusServiceProviderImpl implements MetricsServiceProvider { public static final String PROMETHEUS_INSTANT_QUERY_API = "query"; - public static final String PROMETHEUS_RANGED_QUERY_API = "query_range"; private static final Logger LOG = LoggerFactory.getLogger(PrometheusServiceProviderImpl.class); @@ -123,17 +123,9 @@ public List getMetricsInstant(String queryString) return getMetrics(PROMETHEUS_INSTANT_QUERY_API, queryString); } - /** - * Returns a list of {@link Metric} for the given ranged query. - * - * @param queryString query string with metric name, start time, end time, - * step and other filters. 
- * @return List of Json map of metrics response. - * @throws Exception exception - */ @Override - public List getMetricsRanged(String queryString) throws Exception { - return getMetrics(PROMETHEUS_RANGED_QUERY_API, queryString); + public List> getMetrics(String queryString) throws Exception { + return Collections.emptyList(); } /** diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java index 4a44ac2499c6..abc94edb5bc8 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DataNodeMetricsCollectionTask.java @@ -17,22 +17,15 @@ package org.apache.hadoop.ozone.recon.tasks; -import java.io.IOException; -import java.io.InputStream; -import java.net.SocketTimeoutException; -import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name; +import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; -import org.apache.http.HttpEntity; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; +import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,75 +40,34 @@ public class DataNodeMetricsCollectionTask implements Callable> metrics = 
metricsServiceProvider.getMetrics(BEAN_NAME); + if (metrics == null) { return new DatanodePendingDeletionMetrics( - nodeDetails.getHostName(), nodeDetails.getUuidString(), metrics); - } finally { - httpGet.releaseConnection(); + nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); } - } catch (ConnectTimeoutException e) { - LOG.error("Connection timeout while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); - return new DatanodePendingDeletionMetrics( - nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); - } catch (SocketTimeoutException e) { - LOG.error("Socket timeout while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); - return new DatanodePendingDeletionMetrics( - nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); - } catch (IOException e) { - LOG.error("IO error while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); + Map deletionMetrics = ReconUtils.getMetricsData(metrics, BEAN_NAME); + long pendingBlockSize = ReconUtils.extractMetricValue(deletionMetrics, "TotalPendingBlockBytes"); + return new DatanodePendingDeletionMetrics( - nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); + nodeDetails.getHostName(), nodeDetails.getUuidString(), pendingBlockSize); + } catch (Exception e) { - LOG.error("Unexpected error while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); + LOG.error("Error while collecting metrics from DataNode {}", nodeDetails.getHostName(), e); return new DatanodePendingDeletionMetrics( nodeDetails.getHostName(), nodeDetails.getUuidString(), -1L); } @@ -124,52 +76,9 @@ public DatanodePendingDeletionMetrics call() { private String getJmxMetricsUrl() { String protocol = httpsEnabled ? 
DatanodeDetails.Port.Name.HTTPS : DatanodeDetails.Port.Name.HTTP; - return String.format("%s://%s:%d/jmx?qry=Hadoop:service=HddsDatanode,name=BlockDeletingService", - protocol, nodeDetails.getHostName(), nodeDetails.getPort(portName).getValue()); - } - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder class used to construct instances of {@link DataNodeMetricsCollectionTask}. - * Provides methods to set configuration parameters for the task. - */ - public static class Builder { - private DatanodeDetails nodeDetails; - private int httpConnectionTimeout; - private int httpSocketTimeout; - private int httpRequestTimeout; - private boolean httpsEnabled; - - public Builder setNodeDetails(DatanodeDetails nodeDetails) { - this.nodeDetails = nodeDetails; - return this; - } - - public Builder setHttpConnectionTimeout(int httpConnectionTimeout) { - this.httpConnectionTimeout = httpConnectionTimeout; - return this; - } - - public Builder setHttpSocketTimeout(int httpSocketTimeout) { - this.httpSocketTimeout = httpSocketTimeout; - return this; - } - - public Builder setHttpRequestTimeout(int httpRequestTimeout) { - this.httpRequestTimeout = httpRequestTimeout; - return this; - } - - public Builder setHttpsEnabled(boolean httpsEnabled) { - this.httpsEnabled = httpsEnabled; - return this; - } - - public DataNodeMetricsCollectionTask build() { - return new DataNodeMetricsCollectionTask(this); - } + return String.format("%s://%s:%d/jmx", + protocol, + nodeDetails.getHostName(), + nodeDetails.getPort(portName).getValue()); } } From 3e490227548c9ee25fb04298984d387fada9600b Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Wed, 10 Dec 2025 12:29:55 +0530 Subject: [PATCH 11/12] fixing bugs found during testing --- hadoop-hdds/common/src/main/resources/ozone-default.xml | 2 +- hadoop-ozone/dist/src/main/compose/ozone/docker-config | 2 +- .../apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java | 2 +- 3 files changed, 3 insertions(+), 3 
deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 11f2ab2292c7..3dad6045698e 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3514,7 +3514,7 @@ ozone.recon.dn.metrics.collection.minimum.api.delay - 30 + 30s OZONE, RECON, DN Minimum delay in API to start a new task for Jmx collection. diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config index f6c2e81cbbcb..81e05a7f2791 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config @@ -56,7 +56,7 @@ OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true OZONE-SITE.XML_ozone.fs.hsync.enabled=true -OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5 +OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5s OZONE_CONF_DIR=/etc/hadoop OZONE_LOG_DIR=/var/log/hadoop diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java index b535ea5c2c11..42814ed91e3e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java @@ -56,7 +56,7 @@ public PendingDeletionEndpoint(ReconGlobalMetricsService reconGlobalMetricsServi @GET public Response getPendingDeletionByComponent(@QueryParam("component") String component) { - if (component.isEmpty()) { + if (component == null || component.isEmpty()) { return Response.status(Response.Status.BAD_REQUEST) .entity("component query parameter is required").build(); } From 
f58f4e44db0b3d154b68e28196b91798c54afe8c Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Wed, 10 Dec 2025 13:22:49 +0530 Subject: [PATCH 12/12] fixing build errors and minor issues --- hadoop-ozone/recon/pom.xml | 17 ----------------- .../recon/api/DataNodeMetricsService.java | 6 +++--- .../recon/api/PendingDeletionEndpoint.java | 12 ++++++++---- .../types/DataNodeMetricsServiceResponse.java | 18 +++++++++--------- 4 files changed, 20 insertions(+), 33 deletions(-) diff --git a/hadoop-ozone/recon/pom.xml b/hadoop-ozone/recon/pom.xml index 0e5f522cfdff..9a0936ebf194 100644 --- a/hadoop-ozone/recon/pom.xml +++ b/hadoop-ozone/recon/pom.xml @@ -126,23 +126,6 @@ org.apache.hadoop hadoop-hdfs-client - - org.apache.httpcomponents - httpclient - ${httpclient.version} - - - - commons-logging - commons-logging - - - - - org.apache.httpcomponents - httpcore - ${httpcore.version} - org.apache.ozone hdds-common diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index 20d1452076a6..9daa0524ae41 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -95,7 +95,7 @@ public DataNodeMetricsService( this.metricsServiceProviderFactory = metricsServiceProviderFactory; } - public void startTask() { + public synchronized void startTask() { if (currentStatus == MetricCollectionStatus.IN_PROGRESS) { LOG.warn("Metrics collection task is already in progress. 
Skipping new task."); return; @@ -273,7 +273,7 @@ private void sleepBetweenPolls(long intervalMs) { } } - public DataNodeMetricsServiceResponse getCollectedMetrics() { + public synchronized DataNodeMetricsServiceResponse getCollectedMetrics() { if (currentStatus == MetricCollectionStatus.NOT_STARTED || currentStatus == MetricCollectionStatus.IN_PROGRESS) { return DataNodeMetricsServiceResponse.newBuilder() .setStatus(MetricCollectionStatus.IN_PROGRESS) @@ -283,7 +283,7 @@ public DataNodeMetricsServiceResponse getCollectedMetrics() { .setStatus(currentStatus) .setPendingDeletion(pendingDeletionList) .setTotalPendingDeletion(totalPendingDeletion) - .setTotalNodesQueries(totalNodesQueried) + .setTotalNodesQueried(totalNodesQueried) .setTotalNodeQueryFailures(totalNodesFailed) .build(); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java index 42814ed91e3e..e47dc0554671 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java @@ -27,6 +27,7 @@ import javax.ws.rs.core.Response; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,11 +77,14 @@ public Response getPendingDeletionByComponent(@QueryParam("component") String co private Response handleDataNodeMetrics() { DataNodeMetricsService.MetricCollectionStatus status = dataNodeMetricsService.getTaskStatus(); - CompletableFuture.runAsync(dataNodeMetricsService::startTask); - if (status == 
DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED) { - return Response.ok(dataNodeMetricsService.getCollectedMetrics()).build(); + if (status != DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) { + CompletableFuture.runAsync(dataNodeMetricsService::startTask); + } + DataNodeMetricsServiceResponse response = dataNodeMetricsService.getCollectedMetrics(); + if (response.getStatus() == DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS) { + return Response.accepted(response).build(); } else { - return Response.accepted(dataNodeMetricsService.getCollectedMetrics()).build(); + return Response.ok(response).build(); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java index d333d1b82c44..6ec9cda0a59e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DataNodeMetricsServiceResponse.java @@ -36,8 +36,8 @@ public class DataNodeMetricsServiceResponse { private Long totalPendingDeletion; @JsonProperty("pendingDeletionPerDataNode") private List pendingDeletionPerDataNode; - @JsonProperty("totalNodesQueries") - private int totalNodesQueries; + @JsonProperty("totalNodesQueried") + private int totalNodesQueried; @JsonProperty("totalNodeQueriesFailed") private long totalNodeQueryFailures; @@ -45,7 +45,7 @@ public DataNodeMetricsServiceResponse(Builder builder) { this.status = builder.status; this.totalPendingDeletion = builder.totalPendingDeletion; this.pendingDeletionPerDataNode = builder.pendingDeletion; - this.totalNodesQueries = builder.totalNodesQueries; + this.totalNodesQueried = builder.totalNodesQueried; this.totalNodeQueryFailures = builder.totalNodeQueryFailures; } @@ -53,7 +53,7 @@ public DataNodeMetricsServiceResponse() 
{ this.status = DataNodeMetricsService.MetricCollectionStatus.NOT_STARTED; this.totalPendingDeletion = 0L; this.pendingDeletionPerDataNode = null; - this.totalNodesQueries = 0; + this.totalNodesQueried = 0; this.totalNodeQueryFailures = 0; } @@ -69,8 +69,8 @@ public List getPendingDeletionPerDataNode() { return pendingDeletionPerDataNode; } - public int getTotalNodesQueries() { - return totalNodesQueries; + public int getTotalNodesQueried() { + return totalNodesQueried; } public long getTotalNodeQueryFailures() { @@ -95,7 +95,7 @@ public static final class Builder { private DataNodeMetricsService.MetricCollectionStatus status; private Long totalPendingDeletion; private List pendingDeletion; - private int totalNodesQueries; + private int totalNodesQueried; private long totalNodeQueryFailures; public Builder setStatus(DataNodeMetricsService.MetricCollectionStatus status) { @@ -113,8 +113,8 @@ public Builder setPendingDeletion(List pendingDe return this; } - public Builder setTotalNodesQueries(int totalNodesQueries) { - this.totalNodesQueries = totalNodesQueries; + public Builder setTotalNodesQueried(int totalNodesQueried) { + this.totalNodesQueried = totalNodesQueried; return this; }