SCMHAConfiguration.java
@@ -53,7 +53,7 @@ public class SCMHAConfiguration {
private String ratisBindHost = "0.0.0.0";

@Config(key = "ratis.bind.port",
-type = ConfigType.STRING,
+type = ConfigType.INT,
defaultValue = "9865",
tags = {OZONE, SCM, HA, RATIS},
description = "Port used by SCM for Ratis Server."
@@ -78,7 +78,7 @@ public class SCMHAConfiguration {
description = "The size of the raft segment used by Apache Ratis on" +
" SCM. (16 KB by default)"
)
-private double raftSegmentSize = 16L * 1024L;
+private long raftSegmentSize = 16L * 1024L;

@Config(key = "ratis.segment.preallocated.size",
type = ConfigType.SIZE,
@@ -87,7 +87,7 @@
description = "The size of the buffer which is preallocated for" +
" raft segment used by Apache Ratis on SCM.(16 KB by default)"
)
-private double raftSegmentPreAllocatedSize = 16 * 1024;
+private long raftSegmentPreAllocatedSize = 16 * 1024;

@Config(key = "ratis.log.appender.queue.num-elements",
type = ConfigType.INT,
@@ -103,7 +103,7 @@
tags = {SCM, OZONE, HA, RATIS},
description = "Byte limit for Raft's Log Worker queue."
)
-private double raftLogAppenderQueueByteLimit = 32 * 1024 * 1024;
+private int raftLogAppenderQueueByteLimit = 32 * 1024 * 1024;

@Config(key = "ratis.log.purge.gap",
type = ConfigType.INT,
@@ -174,19 +174,19 @@ public String getRatisRpcType() {
}

public long getRaftSegmentSize() {
-return (long)raftSegmentSize;
+return raftSegmentSize;
}

public long getRaftSegmentPreAllocatedSize() {
-return (long)raftSegmentPreAllocatedSize;
+return raftSegmentPreAllocatedSize;
}

public int getRaftLogAppenderQueueNum() {
return raftLogAppenderQueueNum;
}

public int getRaftLogAppenderQueueByteLimit() {
-return (int)raftLogAppenderQueueByteLimit;
+return raftLogAppenderQueueByteLimit;
}

public int getRaftLogPurgeGap() {
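Why these type corrections matter: Ozone's @Config framework parses each key with the parser for its declared ConfigType and injects the parsed value straight into the annotated field, so an INT key needs an int field and a SIZE key needs a long field; backing them with double and casting in the getters is a type mismatch, not a rounding choice. A minimal sketch of how the typed object is materialized — getObject is the real OzoneConfiguration entry point, while the "ozone.scm.ha" prefix and the full "ratis.segment.size" key are assumptions about this class's @ConfigGroup:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public final class ScmHaConfigDemo {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Unset keys fall back to each @Config defaultValue;
        // SIZE values accept human-readable suffixes like "32KB".
        conf.set("ozone.scm.ha.ratis.segment.size", "32KB"); // assumed key

        // getObject() walks the @Config annotations, parses each value
        // with its declared ConfigType, and injects it into the field.
        SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
        System.out.println(haConf.getRaftSegmentSize()); // 32768
      }
    }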
SCMMetadataStoreImpl.java
Expand Up @@ -102,7 +102,11 @@ public void start(OzoneConfiguration config)

pipelineTable = PIPELINES.getTable(store);

+checkTableStatus(pipelineTable, PIPELINES.getName());
+
containerTable = CONTAINERS.getTable(store);
+
+checkTableStatus(containerTable, CONTAINERS.getName());
}
}

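The two checkTableStatus calls added here fail fast when RocksDB hands back no reference for a table definition, instead of letting a null table surface later as an NPE. The helper's body is not part of this hunk; a plausible sketch consistent with the call sites, where the Table type is org.apache.hadoop.hdds.utils.db.Table and the message text and LOG field are assumptions:

    private void checkTableStatus(Table<?, ?> table, String name)
        throws IOException {
      // A null reference means the DBStore definition and the on-disk
      // database disagree; report it now rather than failing later.
      if (table == null) {
        String msg = "Inconsistent DB state, table " + name
            + " is missing from the metadata store.";
        LOG.error(msg);
        throw new IOException(msg);
      }
    }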
PipelineManagerV2Impl.java
@@ -160,7 +160,7 @@ public Pipeline createPipeline(ReplicationType type,

@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
-    List<DatanodeDetails> nodes) {
+      List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipeline for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
@@ -207,6 +207,7 @@ public List<Pipeline> getPipelines() {

@Override
public List<Pipeline> getPipelines(ReplicationType type) {
+lock.readLock().lock();
try {
return stateManager.getPipelines(type);
} finally {
@@ -582,6 +583,17 @@ public void allowPipelineCreation() {
this.pipelineCreationAllowed.set(true);
}

+@VisibleForTesting
+public void setPipelineProvider(ReplicationType replicationType,
+PipelineProvider provider) {
+pipelineFactory.setProvider(replicationType, provider);
+}
+
+@VisibleForTesting
+public StateManager getStateManager() {
+return stateManager;
+}

private void setBackgroundPipelineCreator(
BackgroundPipelineCreator backgroundPipelineCreator) {
this.backgroundPipelineCreator = backgroundPipelineCreator;
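These @VisibleForTesting hooks let a test swap the pipeline provider for a mock before any pipeline is created, and peek at the internal state manager. A hypothetical wiring — MockRatisPipelineProvider is real (TestBlockManager imports it later in this diff), but its constructor arguments here are an assumption:

    // Route RATIS pipeline creation through a mock so the test
    // never has to stand up a real Ratis ring.
    pipelineManager.setPipelineProvider(
        HddsProtos.ReplicationType.RATIS,
        new MockRatisPipelineProvider(nodeManager,
            pipelineManager.getStateManager(), conf));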
SCMConfigurator.java
@@ -21,6 +21,7 @@


import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -51,7 +52,8 @@
* ReplicationManager replicationManager;
* SCMSafeModeManager scmSafeModeManager;
* CertificateServer certificateServer;
-* SCMMetadata scmMetadataStore.
+* SCMMetadata scmMetadataStore;
+* SCMHAManager scmHAManager.
*
* If any of these are *not* specified then the default version of these
* managers are used by SCM.
@@ -67,6 +69,7 @@ public final class SCMConfigurator {
private CertificateServer certificateServer;
private SCMMetadataStore metadataStore;
private NetworkTopology networkTopology;
+private SCMHAManager scmHAManager;

/**
* Allows user to specify a version of Node manager to use with this SCM.
@@ -148,6 +151,15 @@ public void setNetworkTopology(NetworkTopology networkTopology) {
this.networkTopology = networkTopology;
}

+/**
+* Allows user to specify a custom version of SCMHAManager to be
+* used with this SCM.
+* @param scmHaMgr - SCMHAManager.
+*/
+public void setSCMHAManager(SCMHAManager scmHaMgr) {
+this.scmHAManager = scmHaMgr;
+}

/**
* Gets SCM Node Manager.
* @return Node Manager.
@@ -219,4 +231,12 @@ public SCMMetadataStore getMetadataStore() {
public NetworkTopology getNetworkTopology() {
return networkTopology;
}

+/**
+* Get SCMHAManager.
+* @return SCMHAManager.
+*/
+public SCMHAManager getSCMHAManager() {
+return scmHAManager;
+}
}
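SCMConfigurator stays a plain holder: a non-null field set by the caller wins, while a null one makes StorageContainerManager build its default implementation (see initializeSystemManagers below). The new HA hook is used exactly this way by the test utilities later in this diff:

    SCMConfigurator configurator = new SCMConfigurator();
    configurator.setSCMHAManager(MockSCMHAManager.getInstance());
    StorageContainerManager scm =
        StorageContainerManager.createSCM(conf, configurator);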
StorageContainerManager.java
@@ -51,6 +51,8 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
@@ -94,7 +96,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -170,6 +172,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final SCMStorageConfig scmStorageConfig;

private SCMMetadataStore scmMetadataStore;
+private SCMHAManager scmHAManager;

private final EventQueue eventQueue;
/*
@@ -237,7 +240,7 @@ private StorageContainerManager(OzoneConfiguration conf)
* @param configurator - configurator
*/
private StorageContainerManager(OzoneConfiguration conf,
-    SCMConfigurator configurator)
+      SCMConfigurator configurator)
throws IOException, AuthenticationException {
super(HddsVersionInfo.HDDS_VERSION_INFO);

@@ -439,6 +442,12 @@ private void initializeSystemManagers(OzoneConfiguration conf,
clusterMap = new NetworkTopologyImpl(conf);
}

+if (configurator.getSCMHAManager() != null) {
+scmHAManager = configurator.getSCMHAManager();
+} else {
+scmHAManager = new SCMHAManagerImpl(conf);
+}

if(configurator.getScmNodeManager() != null) {
scmNodeManager = configurator.getScmNodeManager();
} else {
@@ -455,7 +464,10 @@
pipelineManager = configurator.getPipelineManager();
} else {
pipelineManager =
-new SCMPipelineManager(conf, scmNodeManager,
+PipelineManagerV2Impl.newPipelineManager(
+conf,
+scmHAManager,
+scmNodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue);
}
@@ -825,6 +837,8 @@ public void start() throws IOException {
scmRatisServer.start();
}

+scmHAManager.start();

ms = HddsServerUtil
.initializeMetrics(configuration, "StorageContainerManager");

@@ -957,6 +971,12 @@ public void stop() {
ms.stop();
}

+try {
+scmHAManager.shutdown();
+} catch (Exception ex) {
+LOG.error("SCM HA Manager stop failed", ex);
+}

scmSafeModeManager.stop();
}

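SCMHAManager's own definition is outside this diff, but the call sites above pin down its minimal contract: it is injected or constructed in initializeSystemManagers, start()ed after the Ratis server in start(), and shutdown() during stop(), where any exception is logged and swallowed so the rest of shutdown proceeds. An inferred sketch of that surface — the throws clauses are assumptions:

    package org.apache.hadoop.hdds.scm.ha;

    import java.io.IOException;

    public interface SCMHAManager {
      // Invoked from StorageContainerManager.start().
      void start() throws IOException;

      // Invoked from StorageContainerManager.stop(); the caller catches
      // and logs anything thrown here.
      void shutdown() throws IOException;
    }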
HddsTestUtils.java
@@ -17,23 +17,16 @@
*/
package org.apache.hadoop.hdds.scm;

-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
-import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;

/**
* Stateless helper functions for Hdds tests.
@@ -74,24 +67,6 @@ private HddsTestUtils() {
TestUtils.getContainerReports(containers));
}

-public static StorageContainerManager getScm(OzoneConfiguration conf)
-throws IOException, AuthenticationException {
-conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
-conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
-conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
-conf.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
-SCMStorageConfig scmStore = new SCMStorageConfig(conf);
-if(scmStore.getState() != Storage.StorageState.INITIALIZED) {
-String clusterId = UUID.randomUUID().toString();
-String scmId = UUID.randomUUID().toString();
-scmStore.setClusterId(clusterId);
-scmStore.setScmId(scmId);
-// writes the version file properties
-scmStore.initialize();
-}
-return StorageContainerManager.createSCM(conf);
-}

/**
* Creates list of ContainerInfo.
*
TestUtils.java
@@ -34,6 +34,7 @@
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -455,6 +456,22 @@ public static void quasiCloseContainer(ContainerManager containerManager,

}

+/**
+* Construct and returns StorageContainerManager instance using the given
+* configuration.
+*
+* @param conf OzoneConfiguration
+* @return StorageContainerManager instance
+* @throws IOException
+* @throws AuthenticationException
+*/
+public static StorageContainerManager getScmSimple(OzoneConfiguration conf)
+throws IOException, AuthenticationException {
+SCMConfigurator configurator = new SCMConfigurator();
+configurator.setSCMHAManager(MockSCMHAManager.getInstance());
+return StorageContainerManager.createSCM(conf, configurator);
+}

/**
* Construct and returns StorageContainerManager instance using the given
* configuration. The ports used by this StorageContainerManager are
@@ -467,7 +484,9 @@
*/
public static StorageContainerManager getScm(OzoneConfiguration conf)
throws IOException, AuthenticationException {
-return getScm(conf, new SCMConfigurator());
+SCMConfigurator configurator = new SCMConfigurator();
+configurator.setSCMHAManager(MockSCMHAManager.getInstance());
+return getScm(conf, configurator);
}

/**
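Both helpers now hand every test an SCM whose HA manager is mocked, so nothing in the test path talks to a real Ratis ring; getScmSimple is the variant that skips getScm's port randomization. A typical hypothetical use in a test:

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
    // HA behavior comes from MockSCMHAManager, injected by the helper.
    StorageContainerManager scm = TestUtils.getScmSimple(conf);
    scm.start();
    // ... exercise SCM APIs ...
    scm.stop();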
TestBlockManager.java
@@ -41,12 +41,14 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -76,8 +78,9 @@ public class TestBlockManager {
private StorageContainerManager scm;
private SCMContainerManager mapping;
private MockNodeManager nodeManager;
-private SCMPipelineManager pipelineManager;
+private PipelineManagerV2Impl pipelineManager;
private BlockManagerImpl blockManager;
+private SCMHAManager scmHAManager;
private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
private static HddsProtos.ReplicationFactor factor;
private static HddsProtos.ReplicationType type;
@@ -105,14 +108,20 @@ public void setUp() throws Exception {
conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 5,
TimeUnit.SECONDS);

-// Override the default Node Manager in SCM with this Mock Node Manager.
+// Override the default Node Manager and SCMHAManager
+// in SCM with the Mock one.
nodeManager = new MockNodeManager(true, 10);
+scmHAManager = MockSCMHAManager.getInstance();

eventQueue = new EventQueue();

scmMetadataStore = new SCMMetadataStoreImpl(conf);
scmMetadataStore.start(conf);
pipelineManager =
-new SCMPipelineManager(conf, nodeManager,
+PipelineManagerV2Impl.newPipelineManager(
+conf,
+scmHAManager,
+nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue);
pipelineManager.allowPipelineCreation();
@@ -140,6 +149,7 @@ public void emitSafeModeStatus() {
configurator.setContainerManager(containerManager);
configurator.setScmSafeModeManager(safeModeManager);
configurator.setMetadataStore(scmMetadataStore);
+configurator.setSCMHAManager(scmHAManager);
scm = TestUtils.getScm(conf, configurator);

// Initialize these fields so that the tests can pass.
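After this setUp the block manager runs against a fully in-process stack: mock datanodes, a mock HA manager, and PipelineManagerV2Impl backed by a real RocksDB pipeline table. A representative test against that stack — the allocateBlock signature, owner string, and assertions here are assumptions:

    @Test
    public void testAllocateBlock() throws Exception {
      // Needs at least one open pipeline before a block can be placed.
      pipelineManager.createPipeline(type, factor);
      AllocatedBlock block = blockManager.allocateBlock(
          DEFAULT_BLOCK_SIZE, type, factor, "test-owner", new ExcludeList());
      Assert.assertNotNull(block);
      Assert.assertNotNull(block.getPipeline());
    }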