Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3504,6 +3504,23 @@
If the buffer overflows, task reinitialization will be triggered.
</description>
</property>
<property>
<name>ozone.recon.dn.metrics.collection.thread.count</name>
<value>10</value>
<tag>OZONE, RECON, DN</tag>
  <description>
    Number of threads in the pool that Recon uses to collect metrics
    from DataNode JMX endpoints.
  </description>
</property>
<property>
<name>ozone.recon.dn.metrics.collection.minimum.api.delay</name>
<value>30s</value>
<tag>OZONE, RECON, DN</tag>
  <description>
    Minimum delay before the API may start a new JMX collection task.
    This acts as a rate limiter to avoid creating unnecessary tasks.
  </description>
</property>
<property>
<name>ozone.scm.datanode.admin.monitor.interval</name>
<value>30s</value>
Expand Down
2 changes: 1 addition & 1 deletion hadoop-ozone/dist/src/main/compose/ozone/docker-config
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http
OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true

OZONE-SITE.XML_ozone.fs.hsync.enabled=true

OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5s
OZONE_CONF_DIR=/etc/hadoop
OZONE_LOG_DIR=/var/log/hadoop

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.recon.TestReconEndpointUtil.getReconWebAddress;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand All @@ -46,6 +49,8 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
Expand All @@ -66,6 +71,9 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService;
import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.ozone.test.GenericTestUtils;
Expand Down Expand Up @@ -100,6 +108,7 @@ public class TestStorageDistributionEndpoint {
private static final ObjectMapper MAPPER = new ObjectMapper();

private static final String STORAGE_DIST_ENDPOINT = "/api/v1/storageDistribution";
private static final String PENDING_DELETION_ENDPOINT = "/api/v1/pendingDeletion";

static List<Arguments> replicationConfigs() {
return Collections.singletonList(
Expand All @@ -110,17 +119,14 @@ static List<Arguments> replicationConfigs() {
@BeforeAll
public static void setup() throws Exception {
conf = new OzoneConfiguration();
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, 100,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
100, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS);
conf.setLong(OZONE_SCM_HA_RATIS_SNAPSHOT_GAP, 1L);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 50,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 50, TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL, 500, TimeUnit.MILLISECONDS);
conf.set(ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY, "1s");

// Enhanced SCM configuration for faster block deletion processing
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
Expand All @@ -129,18 +135,9 @@ public static void setup() throws Exception {
conf.set(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0s");

// Enhanced DataNode configuration to move pending deletion from SCM to DN faster
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionInterval(Duration.ofMillis(100));
// Increase block delete queue limit to allow more queued commands on DN
dnConf.setBlockDeleteQueueLimit(50);
// Reduce the interval for delete command worker processing
dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100));
// Increase blocks deleted per interval to speed up deletion
dnConf.setBlockDeletionLimit(5000);
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionInterval(Duration.ofMillis(30000));
conf.setFromObject(dnConf);
// Increase DN delete threads for faster parallel processing
conf.setInt("ozone.datanode.block.delete.threads.max", 10);

recon = new ReconService(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
Expand Down Expand Up @@ -190,19 +187,110 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig)
}
}
waitForKeysCreated(replicationConfig);
Thread.sleep(10000);
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(STORAGE_DIST_ENDPOINT);
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
StorageCapacityDistributionResponse storageResponse =
MAPPER.readValue(response, StorageCapacityDistributionResponse.class);
GenericTestUtils.waitFor(() -> {
try {
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(STORAGE_DIST_ENDPOINT);
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
StorageCapacityDistributionResponse storageResponse =
MAPPER.readValue(response, StorageCapacityDistributionResponse.class);

assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
assertEquals(3, storageResponse.getDataNodeUsage().size());
assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
assertEquals(3, storageResponse.getDataNodeUsage().size());

return true;
} catch (Exception e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 1000, 30000);
closeAllContainers();
fs.delete(dir1, true);
GenericTestUtils.waitFor(() -> {
try {
syncDataFromOM();
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=om");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
Map<String, Integer> pendingDeletionMap =
MAPPER.readValue(response, Map.class);
assertEquals(30, pendingDeletionMap.get("totalSize"));
assertEquals(30, pendingDeletionMap.get("pendingDirectorySize") + pendingDeletionMap.get("pendingKeySize"));
return true;
} catch (Throwable e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 1000, 30000);

GenericTestUtils.waitFor(() -> {
try {
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=scm");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
ScmPendingDeletion pendingDeletion =
MAPPER.readValue(response, ScmPendingDeletion.class);
assertEquals(30, pendingDeletion.getTotalReplicatedBlockSize());
assertEquals(10, pendingDeletion.getTotalBlocksize());
assertEquals(10, pendingDeletion.getTotalBlocksCount());
return true;
} catch (Throwable e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 2000, 30000);
GenericTestUtils.waitFor(() ->
scm.getClientProtocolServer().getDeletedBlockSummary().getTotalBlockCount() == 0,
1000, 30000);
GenericTestUtils.waitFor(() -> {
try {
scm.getScmHAManager().asSCMHADBTransactionBuffer().flush();
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=dn");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
DataNodeMetricsServiceResponse pendingDeletion =
MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
assertNotNull(pendingDeletion);
assertEquals(30, pendingDeletion.getTotalPendingDeletion());
assertEquals(DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED, pendingDeletion.getStatus());
assertEquals(pendingDeletion.getTotalNodesQueried(), pendingDeletion.getPendingDeletionPerDataNode().size());
assertEquals(0, pendingDeletion.getTotalNodeQueryFailures());
pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> {
assertEquals(10, dn.getPendingBlockSize());
});
return true;
} catch (Throwable e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 2000, 60000);
cluster.getHddsDatanodes().get(0).stop();

GenericTestUtils.waitFor(() -> {
try {
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=dn");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
DataNodeMetricsServiceResponse pendingDeletion =
MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
assertNotNull(pendingDeletion);
assertEquals(1, pendingDeletion.getTotalNodeQueryFailures());
assertTrue(pendingDeletion.getPendingDeletionPerDataNode()
.stream()
.anyMatch(dn -> dn.getPendingBlockSize() == -1));
return true;
} catch (Throwable e) {
return false;
}
Comment thread
priyeshkaratha marked this conversation as resolved.
Comment thread
priyeshkaratha marked this conversation as resolved.
}, 2000, 60000);
}

private void verifyBlocksCreated(
Expand Down Expand Up @@ -266,6 +354,14 @@ private static void performOperationOnKeyContainers(
}
}

private static void closeAllContainers() {
for (ContainerInfo container :
scm.getContainerManager().getContainers()) {
scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER,
container.containerID());
}
}

@AfterEach
public void cleanup() {
assertDoesNotThrow(() -> {
Expand Down
20 changes: 20 additions & 0 deletions hadoop-ozone/recon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>${httpasyncclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>${httpcore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.JmxServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -67,6 +68,14 @@ public MetricsServiceProvider getMetricsServiceProvider() {
return null;
}

/**
* Returns the configured MetricsServiceProvider implementation for Jmx.
* @return MetricsServiceProvider instance for Jmx
*/
public MetricsServiceProvider getJmxMetricsServiceProvider() {
return new JmxServiceProviderImpl(configuration, reconUtils);
}

/**
* Returns the Prometheus endpoint if configured. Otherwise returns null.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ public final class ReconServerConfigKeys {
public static final int
OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;

  /** Number of worker threads Recon uses to collect DataNode metrics over JMX. */
  public static final String OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT =
      "ozone.recon.dn.metrics.collection.thread.count";
  // Default pool size; matches the entry in ozone-default.xml.
  public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT = 10;

  /**
   * Minimum delay before the API starts a new JMX collection task; acts as a
   * rate limiter to avoid creating unnecessary tasks (see ozone-default.xml).
   */
  public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY =
      "ozone.recon.dn.metrics.collection.minimum.api.delay";
  // Duration string understood by Hadoop's time-duration configuration parser.
  public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = "30s";

/**
* Private constructor for utility class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,4 +847,31 @@ public static String constructObjectPathWithPrefix(long... ids) {
}
return pathBuilder.toString();
}

public static Map<String, Object> getMetricsData(List<Map<String, Object>> metrics, String beanName) {
if (metrics == null || StringUtils.isEmpty(beanName)) {
return null;
}
for (Map<String, Object> item :metrics) {
if (item.get("name").equals(beanName)) {
return item;
}
}
return null;
}

public static long extractMetricValue(Map<String, Object> metrics, String keyName) {
if (metrics == null || StringUtils.isEmpty(keyName)) {
return -1;
}
Object value = metrics.get(keyName);
if (value instanceof Long) {
return (long) value;
}
if (value instanceof Integer) {
Integer intValue = (Integer) value;
return intValue.longValue();
}
return -1;
Comment on lines +867 to +875
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The extractMetricValue method currently handles only Long and Integer types. JMX metric values can be other Number subtypes or even strings. To make this utility more robust, it should be updated to handle any Number type and also attempt to parse String values.

    Object value = metrics.get(keyName);
    if (value instanceof Number) {
      return ((Number) value).longValue();
    }
    if (value instanceof String) {
      try {
        return Long.parseLong((String) value);
      } catch (NumberFormatException e) {
        // Not a number string, fall through to return -1
      }
    }
    return -1;

}
}
Loading
Loading