Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode;


import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.GetSpaceUsed;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
Expand Down Expand Up @@ -349,7 +352,8 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY));
FS_GETSPACEUSED_JITTER_KEY,
FS_GETSPACEUSED_CLASSNAME));

public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

Expand Down Expand Up @@ -683,6 +687,7 @@ public String reconfigurePropertyImpl(String property, String newVal)
return reconfSlowDiskParameters(property, newVal);
case FS_DU_INTERVAL_KEY:
case FS_GETSPACEUSED_JITTER_KEY:
case FS_GETSPACEUSED_CLASSNAME:
return reconfDfsUsageParameters(property, newVal);
default:
break;
Expand Down Expand Up @@ -879,7 +884,7 @@ private String reconfDfsUsageParameters(String property, String newVal)
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(interval, null);
value.updateDfsUsageConfig(interval, null, null);
}
}
} else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
Expand All @@ -891,13 +896,25 @@ private String reconfDfsUsageParameters(String property, String newVal)
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(null, jitter);
value.updateDfsUsageConfig(null, jitter, null);
}
}
} else if (property.equals(FS_GETSPACEUSED_CLASSNAME)) {
Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
Class<? extends GetSpaceUsed> klass = (newVal == null ? DU.class :
Class.forName(newVal).asSubclass(GetSpaceUsed.class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@singer-bin Thanks for updating the PR.
We may need to use WindowsGetSpaceUsed.class if it is windows os.

if (Shell.WINDOWS) {
result = WindowsGetSpaceUsed.class;
} else {
result = DU.class;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for your review, I will revise it right away. @tasanuma

result = klass.getName();
List<FsVolumeImpl> volumeList = data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(null, null, klass);
}
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException | IOException e) {
} catch (IllegalArgumentException | IOException | ClassNotFoundException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -240,7 +241,8 @@ public void run() {
SHUTDOWN_HOOK_PRIORITY);
}

public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
public void updateDfsUsageConfig(Long interval, Long jitter, Class<? extends GetSpaceUsed> klass)
throws IOException {
// Close the old dfsUsage if it is CachingGetSpaceUsed.
if (dfsUsage instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed) dfsUsage).close();
Expand All @@ -255,6 +257,10 @@ public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException
FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
}

if (klass != null) {
config.setClass(FS_GETSPACEUSED_CLASSNAME, klass, CachingGetSpaceUsed.class);
}
// Start new dfsUsage.
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
.setVolume(volume)
Expand All @@ -269,6 +275,16 @@ public GetSpaceUsed getDfsUsage() {
return dfsUsage;
}

public void refreshSpaceUsedKlass(Configuration conf) throws IOException {
((CachingGetSpaceUsed) dfsUsage).close();
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
.setVolume(volume)
.setPath(bpDir)
.setConf(conf)
.setInitialUsed(loadDfsUsed())
.build();
}

private synchronized static void initializeAddReplicaPool(Configuration conf,
FsDatasetImpl dataset) {
if (addReplicaThreadPool == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.fs.CachingGetSpaceUsed;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
Expand Down Expand Up @@ -58,7 +60,6 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
Expand Down Expand Up @@ -86,6 +87,7 @@ public class TestDataNodeReconfiguration {
private final int NUM_NAME_NODE = 1;
private final int NUM_DATA_NODE = 10;
private MiniDFSCluster cluster;
private static long counter = 0;

@Before
public void Setup() throws IOException {
Expand Down Expand Up @@ -756,4 +758,33 @@ public void testDfsUsageParameters() throws ReconfigurationException {
}
}
}

public static class DummyCachingGetSpaceUsed extends CachingGetSpaceUsed {
public DummyCachingGetSpaceUsed(Builder builder) throws IOException {
super(builder.setInterval(1000).setJitter(0L));
}

@Override
protected void refresh() {
counter++;
}
}

@Test
public void testDfsUsageKlass() throws ReconfigurationException, InterruptedException {

long lastCounter = counter;
Thread.sleep(5000);
assertEquals(lastCounter, counter);

for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
dn.reconfigurePropertyImpl(FS_GETSPACEUSED_CLASSNAME,
DummyCachingGetSpaceUsed.class.getName());
}

lastCounter = counter;
Thread.sleep(5000);
assertTrue(counter > lastCounter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(18, outs.size());
assertEquals(19, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}

Expand Down