Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ protected FSImage(Configuration conf,

this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
FSImageFormatProtobuf.initParallelLoad(conf);
}

void format(FSNamesystem fsn, String clusterId, boolean force)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ public void load(File file, boolean requireSameLayoutVersion)
* the layout version.
*/
public static LoaderDelegator newLoader(Configuration conf, FSNamesystem fsn) {

return new LoaderDelegator(conf, fsn);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public final class FSImageFormatProtobuf {
private static final Logger LOG = LoggerFactory
.getLogger(FSImageFormatProtobuf.class);

private static volatile boolean enableParallelLoad = false;

public static final class LoaderContext {
private SerialNumberManager.StringTable stringTable;
private final ArrayList<INodeReference> refList = Lists.newArrayList();
Expand Down Expand Up @@ -582,9 +584,7 @@ private void loadErasureCodingSection(InputStream in)
}

private static boolean enableParallelSaveAndLoad(Configuration conf) {
boolean loadInParallel =
conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
boolean loadInParallel = enableParallelLoad;
boolean compressionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
Expand All @@ -600,6 +600,20 @@ private static boolean enableParallelSaveAndLoad(Configuration conf) {
return loadInParallel;
}

public static void initParallelLoad(Configuration conf) {
enableParallelLoad =
conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
}

public static void refreshParallelSaveAndLoad(boolean enable) {
enableParallelLoad = enable;
}

public static boolean getEnableParallelLoad() {
return enableParallelLoad;
}

public static final class Saver {
public static final int CHECK_CANCEL_INTERVAL = 4096;
private boolean writeSubSections = false;
Expand Down Expand Up @@ -640,10 +654,6 @@ public int getInodesPerSubSection() {
return inodesPerSubSection;
}

public boolean shouldWriteSubSections() {
return writeSubSections;
}

/**
* Commit the length and offset of a fsimage section to the summary index,
* including the sub section, which will be committed before the section is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
Expand Down Expand Up @@ -325,7 +327,8 @@ public enum OperationCategory {
DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY));
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
DFS_IMAGE_PARALLEL_LOAD_KEY));

private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
Expand Down Expand Up @@ -2184,6 +2187,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
reconfBlockPlacementPolicy();
return newVal;
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
return reconfigureParallelLoad(newVal);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
Expand Down Expand Up @@ -2359,6 +2364,17 @@ String reconfigureSPSModeEvent(String newVal, String property)
return newVal;
}

String reconfigureParallelLoad(String newVal) {
boolean enableParallelLoad;
if (newVal == null) {
enableParallelLoad = DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
} else {
enableParallelLoad = Boolean.parseBoolean(newVal);
}
FSImageFormatProtobuf.refreshParallelSaveAndLoad(enableParallelLoad);
return Boolean.toString(enableParallelLoad);
}

@Override // ReconfigurableBase
protected Configuration getNewConf() {
return new HdfsConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.Before;
import org.junit.After;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.junit.Assert.*;

import org.slf4j.Logger;
Expand Down Expand Up @@ -378,6 +379,21 @@ public void testBlockInvalidateLimitAfterReconfigured()
datanodeManager.getBlockInvalidateLimit());
}

@Test
public void testEnableParallelLoadAfterReconfigured()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();

// By default, enableParallelLoad is false
assertEquals(false, FSImageFormatProtobuf.getEnableParallelLoad());

nameNode.reconfigureProperty(DFS_IMAGE_PARALLEL_LOAD_KEY,
Boolean.toString(true));

// After reconfigured, enableParallelLoad is true
assertEquals(true, FSImageFormatProtobuf.getEnableParallelLoad());
}

@After
public void shutDown() throws IOException {
if (cluster != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
Expand Down Expand Up @@ -429,11 +430,12 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(12, outs.size());
assertEquals(13, outs.size());
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(4));
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(5));
assertEquals(errs.size(), 0);
}

Expand Down