Skip to content
Closed
17 changes: 17 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3504,6 +3504,23 @@
If the buffer overflows, task reinitialization will be triggered.
</description>
</property>
<property>
<name>ozone.recon.dn.metrics.collection.thread.count</name>
<value>10</value>
<tag>OZONE, RECON, DN</tag>
<description>
The number of threads used to collect metrics from the JMX endpoint.
</description>
</property>
<property>
<name>ozone.recon.dn.metrics.collection.minimum.api.delay</name>
<value>30s</value>
<tag>OZONE, RECON, DN</tag>
<description>
Minimum delay between API calls before a new JMX collection task is started.
It acts as a rate limiter to avoid unnecessary task creation.
</description>
</property>
<property>
<name>ozone.scm.datanode.admin.monitor.interval</name>
<value>30s</value>
Expand Down
2 changes: 1 addition & 1 deletion hadoop-ozone/dist/src/main/compose/ozone/docker-config
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http
OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true

OZONE-SITE.XML_ozone.fs.hsync.enabled=true

OZONE-SITE.XML_ozone.recon.dn.metrics.collection.minimum.api.delay=5s
OZONE_CONF_DIR=/etc/hadoop
OZONE_LOG_DIR=/var/log/hadoop

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand All @@ -46,6 +48,8 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
Expand All @@ -66,6 +70,9 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.api.DataNodeMetricsService;
import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.ozone.test.GenericTestUtils;
Expand Down Expand Up @@ -100,6 +107,7 @@ public class TestStorageDistributionEndpoint {
private static final ObjectMapper MAPPER = new ObjectMapper();

private static final String STORAGE_DIST_ENDPOINT = "/api/v1/storageDistribution";
private static final String PENDING_DELETION_ENDPOINT = "/api/v1/pendingDeletion";

static List<Arguments> replicationConfigs() {
return Collections.singletonList(
Expand Down Expand Up @@ -131,16 +139,14 @@ public static void setup() throws Exception {
// Enhanced DataNode configuration to move pending deletion from SCM to DN faster
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
dnConf.setBlockDeletionInterval(Duration.ofMillis(100));
dnConf.setBlockDeletionInterval(Duration.ofMillis(10000));
// Increase block delete queue limit to allow more queued commands on DN
dnConf.setBlockDeleteQueueLimit(50);
// Reduce the interval for delete command worker processing
dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(100));
dnConf.setBlockDeleteCommandWorkerInterval(Duration.ofMillis(10000));
// Increase blocks deleted per interval to speed up deletion
dnConf.setBlockDeletionLimit(5000);
conf.setFromObject(dnConf);
// Increase DN delete threads for faster parallel processing
conf.setInt("ozone.datanode.block.delete.threads.max", 10);

recon = new ReconService(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
Expand Down Expand Up @@ -190,19 +196,107 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig)
}
}
waitForKeysCreated(replicationConfig);
Thread.sleep(10000);
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(STORAGE_DIST_ENDPOINT);
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
StorageCapacityDistributionResponse storageResponse =
MAPPER.readValue(response, StorageCapacityDistributionResponse.class);
GenericTestUtils.waitFor(() -> {
try {
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(STORAGE_DIST_ENDPOINT);
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
StorageCapacityDistributionResponse storageResponse =
MAPPER.readValue(response, StorageCapacityDistributionResponse.class);

assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
assertEquals(3, storageResponse.getDataNodeUsage().size());
assertEquals(20, storageResponse.getGlobalNamespace().getTotalKeys());
assertEquals(60, storageResponse.getGlobalNamespace().getTotalUsedSpace());
assertEquals(0, storageResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
assertEquals(60, storageResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
assertEquals(3, storageResponse.getDataNodeUsage().size());

return true;
} catch (Exception e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 5000, 30000);
closeAllContainers();
fs.delete(dir1, true);
GenericTestUtils.waitFor(() -> {
try {
syncDataFromOM();
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=om");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
Map<String, Integer> pendingDeletionMap =
MAPPER.readValue(response, Map.class);
assertEquals(30, pendingDeletionMap.get("totalSize"));
assertEquals(30, pendingDeletionMap.get("pendingDirectorySize") + pendingDeletionMap.get("pendingKeySize"));
return true;
} catch (Throwable e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 2000, 60000);

GenericTestUtils.waitFor(() -> {
try {
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=scm");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
ScmPendingDeletion pendingDeletion =
MAPPER.readValue(response, ScmPendingDeletion.class);
assertEquals(30, pendingDeletion.getTotalReplicatedBlockSize());
assertEquals(10, pendingDeletion.getTotalBlocksize());
assertEquals(10, pendingDeletion.getTotalBlocksCount());
return true;
} catch (Throwable e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 2000, 60000);
GenericTestUtils.waitFor(() -> {
try {
scm.getScmHAManager().asSCMHADBTransactionBuffer().flush();
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=dn");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
DataNodeMetricsServiceResponse pendingDeletion =
MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
assertNotNull(pendingDeletion);
assertEquals(30, pendingDeletion.getTotalPendingDeletion());
assertEquals(DataNodeMetricsService.MetricCollectionStatus.SUCCEEDED, pendingDeletion.getStatus());
assertEquals(pendingDeletion.getTotalNodesQueries(), pendingDeletion.getPendingDeletionPerDataNode().size());
assertEquals(0, pendingDeletion.getTotalNodeQueryFailures());
pendingDeletion.getPendingDeletionPerDataNode().forEach(dn -> {
assertEquals(10, dn.getPendingBlockSize());
});
return true;
} catch (Throwable e) {
LOG.debug("Waiting for storage distribution assertions to pass", e);
return false;
}
}, 2000, 60000);
cluster.getHddsDatanodes().get(0).stop();

GenericTestUtils.waitFor(() -> {
try {
StringBuilder urlBuilder = new StringBuilder();
urlBuilder.append(getReconWebAddress(conf))
.append(PENDING_DELETION_ENDPOINT + "?component=dn");
String response = TestReconEndpointUtil.makeHttpCall(conf, urlBuilder);
DataNodeMetricsServiceResponse pendingDeletion =
MAPPER.readValue(response, DataNodeMetricsServiceResponse.class);
assertNotNull(pendingDeletion);
assertEquals(1, pendingDeletion.getTotalNodeQueryFailures());
assertTrue(pendingDeletion.getPendingDeletionPerDataNode()
.stream()
.anyMatch(dn -> dn.getPendingBlockSize() == -1));
return true;
} catch (Throwable e) {
return false;
}
Comment thread
priyeshkaratha marked this conversation as resolved.
}, 2000, 30000);
}

private void verifyBlocksCreated(
Expand Down Expand Up @@ -266,6 +360,14 @@ private static void performOperationOnKeyContainers(
}
}

/**
 * Fires a CLOSE_CONTAINER event for every container currently tracked by
 * SCM, so pending block deletions can proceed against closed containers.
 */
private static void closeAllContainers() {
  scm.getContainerManager().getContainers().forEach(containerInfo ->
      scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER,
          containerInfo.containerID()));
}

@AfterEach
public void cleanup() {
assertDoesNotThrow(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
import org.apache.hadoop.ozone.recon.spi.MetricsServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.JmxServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -67,6 +68,15 @@ public MetricsServiceProvider getMetricsServiceProvider() {
return null;
}

/**
 * Returns a {@link MetricsServiceProvider} backed by
 * {@link JmxServiceProviderImpl} for collecting metrics over JMX.
 *
 * @param endpoint the JMX HTTP endpoint URL to collect metrics from
 * @return a new JmxServiceProviderImpl bound to the given endpoint
 */
public MetricsServiceProvider getJmxMetricsServiceProvider(String endpoint) {
return new JmxServiceProviderImpl(configuration, reconUtils, endpoint);
}

/**
* Returns the Prometheus endpoint if configured. Otherwise returns null.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ public final class ReconServerConfigKeys {
public static final int
OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;

// Number of worker threads Recon uses to collect datanode metrics from the
// JMX endpoint.
public static final String OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT =
"ozone.recon.dn.metrics.collection.thread.count";
public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT = 10;

// Minimum delay before a new JMX collection task may be started; behaves as
// a rate limiter to avoid unnecessary task creation.
public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY =
"ozone.recon.dn.metrics.collection.minimum.api.delay";
public static final String OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT = "30s";

/**
* Private constructor for utility class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,4 +847,31 @@ public static String constructObjectPathWithPrefix(long... ids) {
}
return pathBuilder.toString();
}

/**
 * Finds the entry for the given JMX bean in a parsed metrics list.
 *
 * @param metrics list of bean attribute maps (each expected to carry a
 *                "name" attribute), may be null
 * @param beanName fully qualified bean name to look up
 * @return the matching bean's attribute map, or null when metrics is null,
 *         beanName is null/empty, or no entry matches
 */
public static Map<String, Object> getMetricsData(List<Map<String, Object>> metrics, String beanName) {
  if (metrics == null || beanName == null || beanName.isEmpty()) {
    return null;
  }
  for (Map<String, Object> item : metrics) {
    // Compare via beanName to avoid an NPE when an entry has no "name" key.
    if (beanName.equals(item.get("name"))) {
      return item;
    }
  }
  return null;
}

/**
 * Extracts a numeric attribute from a JMX bean map as a long.
 *
 * Generalized from the original Long/Integer-only handling to accept any
 * {@link Number}, since JSON deserialization may produce other numeric
 * types; non-integral values are truncated by {@link Number#longValue()}.
 * Backward compatible: all previously-handled inputs yield the same result.
 *
 * @param metrics attribute map of a single JMX bean, may be null
 * @param keyName attribute name to read
 * @return the attribute value as a long, or -1 when the map is null, the
 *         key is null/empty or absent, or the value is not numeric
 */
public static long extractMetricValue(Map<String, Object> metrics, String keyName) {
  if (metrics == null || keyName == null || keyName.isEmpty()) {
    return -1;
  }
  Object value = metrics.get(keyName);
  if (value instanceof Number) {
    return ((Number) value).longValue();
  }
  return -1;
}
}
Loading
Loading