Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public static void setRaftStorageDir(final RaftProperties properties,
final ConfigurationSource conf) {
String storageDir = haConf.getRatisStorageDir();
if (Strings.isNullOrEmpty(storageDir)) {
storageDir = ServerUtils.getDefaultRatisDirectory(conf);
File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
storageDir = (new File(metaDirPath, "scm-ha")).getPath();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's prefered to use SCMHA Configuration to set the storageDir

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #1739 this can be set through HA config.

}
RaftServerConfigKeys.setStorageDir(properties,
Collections.singletonList(new File(storageDir)));
Expand All @@ -100,9 +101,9 @@ private static void setRaftRpcProperties(final RaftProperties properties,
Rpc.setRequestTimeout(properties, TimeDuration.valueOf(
conf.getRatisRequestTimeout(), TimeUnit.MILLISECONDS));
Rpc.setTimeoutMin(properties, TimeDuration.valueOf(
conf.getRatisRequestMinTimeout(), TimeUnit.MILLISECONDS));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need to update these parameters. The cluster.waitForClusterToBeReady is supposed to wait for SCM become leader thus solve the not leader exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, we only need the wait leader in cluster.waitForClusterToBeReady. The problem here is, we've set the min/max timeout un-properly. I will create a separate jira for his fix.

conf.getLeaderElectionMinTimeout(), TimeUnit.MILLISECONDS));
Rpc.setTimeoutMax(properties, TimeDuration.valueOf(
conf.getRatisRequestMaxTimeout(), TimeUnit.MILLISECONDS));
conf.getLeaderElectionMaxTimeout(), TimeUnit.MILLISECONDS));
Rpc.setSlownessTimeout(properties, TimeDuration.valueOf(
conf.getRatisNodeFailureTimeout(), TimeUnit.MILLISECONDS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,12 @@ public long getRatisRequestTimeout() {
return ratisRequestTimeout;
}

public long getRatisRequestMinTimeout() {
return ratisRequestTimeout - 1000L;
}

public long getRatisRequestMaxTimeout() {
return ratisRequestTimeout + 1000L;
public long getLeaderElectionMinTimeout() {
return ratisLeaderElectionTimeout;
}

public long getRatisLeaderElectionTimeout() {
return ratisLeaderElectionTimeout;
public long getLeaderElectionMaxTimeout() {
return ratisLeaderElectionTimeout + 200L;
}

public long getRatisNodeFailureTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ private Object invokeLocal(Method method, Object[] args)
*/
private Object invokeRatis(Method method, Object[] args)
throws Exception {
LOG.trace("Invoking method {} on target {}", method, ratisHandler);
long startTime = Time.monotonicNowNanos();
final SCMRatisResponse response = ratisHandler.submitRequest(
SCMRatisRequest.of(requestType, method.getName(), args));
LOG.info("Invoking method {} on target {}, cost {}us",
method, ratisHandler, (Time.monotonicNowNanos() - startTime) / 1000.0);
if (response.isSuccess()) {
return response.getResult();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -195,10 +196,7 @@ public RaftPeerId getPeerId() {
InetAddress localHost = InetAddress.getLocalHost();

// fetch hosts from ozone.scm.names
List<String> hosts =
Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES))
.map(scmName -> HddsUtils.getHostName(scmName).get())
.collect(Collectors.toList());
List<String> hosts = parseHosts(conf);

final List<RaftPeer> raftPeers = new ArrayList<>();
for (int i = 0; i < hosts.size(); ++i) {
Expand Down Expand Up @@ -232,5 +230,29 @@ public RaftPeerId getPeerId() {

raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
}

private List<String> parseHosts(final ConfigurationSource conf)
throws UnknownHostException {
// fetch hosts from ozone.scm.names
List<String> hosts =
Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES))
.map(scmName -> HddsUtils.getHostName(scmName).get())
.collect(Collectors.toList());

// if this is not a conf for a multi-server raft cluster,
// it means we are in integration test, and need to augment
// the conf to help build a single-server raft cluster.
if (hosts.size() == 0) {
// ozone.scm.names is not set
hosts.add(InetAddress.getLocalHost().getHostName());
} else if (hosts.size() == 1) {
// ozone.scm.names is set, yet the conf may not be usable.
hosts.set(0, InetAddress.getLocalHost().getHostName());
}

LOG.info("fetch hosts {} from ozone.scm.names {}.",
hosts, conf.get(ScmConfigKeys.OZONE_SCM_NAMES));
return hosts;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,126 +181,73 @@ public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
@Override
public Pipeline getPipeline(PipelineID pipelineID)
throws PipelineNotFoundException {
lock.readLock().lock();
try {
return stateManager.getPipeline(pipelineID);
} finally {
lock.readLock().unlock();
}
return stateManager.getPipeline(pipelineID);
}

@Override
public boolean containsPipeline(PipelineID pipelineID) {
lock.readLock().lock();
try {
getPipeline(pipelineID);
return true;
} catch (PipelineNotFoundException e) {
return false;
} finally {
lock.readLock().unlock();
}
}

@Override
public List<Pipeline> getPipelines() {
lock.readLock().lock();
try {
return stateManager.getPipelines();
} finally {
lock.readLock().unlock();
}
return stateManager.getPipelines();
}

@Override
public List<Pipeline> getPipelines(ReplicationType type) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type);
} finally {
lock.readLock().unlock();
}
return stateManager.getPipelines(type);
}

@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, factor);
} finally {
lock.readLock().unlock();
}
return stateManager.getPipelines(type, factor);
}

@Override
public List<Pipeline> getPipelines(ReplicationType type,
Pipeline.PipelineState state) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, state);
} finally {
lock.readLock().unlock();
}
return stateManager.getPipelines(type, state);
}

@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor,
Pipeline.PipelineState state) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, factor, state);
} finally {
lock.readLock().unlock();
}
return stateManager.getPipelines(type, factor, state);
}

@Override
public List<Pipeline> getPipelines(
ReplicationType type, ReplicationFactor factor,
Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
Collection<PipelineID> excludePipelines) {
lock.readLock().lock();
try {
return stateManager
return stateManager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the dead lock solution seems is removing the read write lock? Is there an explanation why we can remove locks here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create a jira for this issue, can give the explanation of this solution there.

.getPipelines(type, factor, state, excludeDns, excludePipelines);
} finally {
lock.readLock().unlock();
}
}

@Override
public void addContainerToPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.addContainerToPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
stateManager.addContainerToPipeline(pipelineID, containerID);
}

@Override
public void removeContainerFromPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.removeContainerFromPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
stateManager.removeContainerFromPipeline(pipelineID, containerID);
}

@Override
public NavigableSet<ContainerID> getContainersInPipeline(
PipelineID pipelineID) throws IOException {
lock.readLock().lock();
try {
return stateManager.getContainers(pipelineID);
} finally {
lock.readLock().unlock();
}
return stateManager.getContainers(pipelineID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,6 @@ public static void quasiCloseContainer(ContainerManagerV2 containerManager,
public static StorageContainerManager getScmSimple(OzoneConfiguration conf)
throws IOException, AuthenticationException {
SCMConfigurator configurator = new SCMConfigurator();
configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
Copy link
Contributor

@amaliujia amaliujia Dec 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will need this line:
conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);

SCMHAManagerImpl will return isLeader immediately without enabling this HA:

    if (!SCMHAUtils.isSCMHAEnabled(conf)) {
      // When SCM HA is not enabled, the current SCM is always the leader.
      return Optional.of((long)0);
    }

Copy link
Contributor

@amaliujia amaliujia Dec 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So currently the cluster.waitForClusterToBeReady returns quickly even the SCM has not become leader yet.

I am guessing CI can still pass become you also have changed the rpc timeout parameters. So either the cluster wait for ready or the parameter change can solve the not leader problem.

I am not sure if we need both. Maybe we only need to keep cluster.waitForClusterToBeReady?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

return StorageContainerManager.createSCM(conf, configurator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -99,6 +100,12 @@ public void init() throws Exception {
o3fs = (OzoneFileSystem) FileSystem.get(new URI(rootPath), conf);
}

@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}

@Test
public void test() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,18 @@ public void waitForClusterToBeReady()
final int healthy = scm.getNodeCount(HEALTHY);
final boolean isNodeReady = healthy == hddsDatanodes.size();
final boolean exitSafeMode = !scm.isInSafeMode();
final boolean checkScmLeader = scm.checkLeader();

LOG.info("{}. Got {} of {} DN Heartbeats.",
isNodeReady ? "Nodes are ready" : "Waiting for nodes to be ready",
healthy, hddsDatanodes.size());
LOG.info(checkScmLeader ? "SCM became leader" :
"SCM has not become leader");
LOG.info(exitSafeMode ? "Cluster exits safe mode" :
"Waiting for cluster to exit safe mode",
healthy, hddsDatanodes.size());

return isNodeReady && exitSafeMode;
return isNodeReady && exitSafeMode && checkScmLeader;
}, 1000, waitForClusterToBeReadyTimeout);
}

Expand Down