Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMHAManagerImpl.class);

private final SCMRatisServerImpl ratisServer;
private final SCMRatisServer ratisServer;
private final ConfigurationSource conf;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -144,7 +143,6 @@ public static PipelineManagerV2Impl newPipelineManager(
@Override
public Pipeline createPipeline(ReplicationType type,
ReplicationFactor factor) throws IOException {
checkLeader();
if (!isPipelineCreationAllowed() && factor != ReplicationFactor.ONE) {
LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
"complete");
Expand Down Expand Up @@ -275,7 +273,6 @@ public List<Pipeline> getPipelines(
@Override
public void addContainerToPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
checkLeader();
lock.writeLock().lock();
try {
stateManager.addContainerToPipeline(pipelineID, containerID);
Expand All @@ -287,7 +284,6 @@ public void addContainerToPipeline(
@Override
public void removeContainerFromPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
checkLeader();
lock.writeLock().lock();
try {
stateManager.removeContainerFromPipeline(pipelineID, containerID);
Expand All @@ -299,7 +295,6 @@ public void removeContainerFromPipeline(
@Override
public NavigableSet<ContainerID> getContainersInPipeline(
PipelineID pipelineID) throws IOException {
checkLeader();
lock.readLock().lock();
try {
return stateManager.getContainers(pipelineID);
Expand All @@ -310,13 +305,11 @@ public NavigableSet<ContainerID> getContainersInPipeline(

@Override
public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
checkLeader();
return stateManager.getNumberOfContainers(pipelineID);
}

@Override
public void openPipeline(PipelineID pipelineId) throws IOException {
checkLeader();
lock.writeLock().lock();
try {
Pipeline pipeline = stateManager.getPipeline(pipelineId);
Expand All @@ -342,7 +335,6 @@ public void openPipeline(PipelineID pipelineId) throws IOException {
* @throws IOException
*/
protected void removePipeline(Pipeline pipeline) throws IOException {
checkLeader();
pipelineFactory.close(pipeline.getType(), pipeline);
PipelineID pipelineID = pipeline.getId();
lock.writeLock().lock();
Expand All @@ -364,7 +356,6 @@ protected void removePipeline(Pipeline pipeline) throws IOException {
*/
protected void closeContainersForPipeline(final PipelineID pipelineId)
throws IOException {
checkLeader();
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
for (ContainerID containerID : containerIDs) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
Expand All @@ -380,7 +371,6 @@ protected void closeContainersForPipeline(final PipelineID pipelineId)
@Override
public void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException {
checkLeader();
PipelineID pipelineID = pipeline.getId();
lock.writeLock().lock();
try {
Expand Down Expand Up @@ -410,8 +400,6 @@ public void closePipeline(Pipeline pipeline, boolean onTimeout)
@Override
public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException {
checkLeader();

Instant currentTime = Instant.now();
Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
Expand Down Expand Up @@ -484,7 +472,6 @@ public int minPipelineLimit(Pipeline pipeline) {
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
checkLeader();
stateManager.updatePipelineState(pipelineID.getProtobuf(),
HddsProtos.PipelineState.PIPELINE_OPEN);
}
Expand All @@ -498,7 +485,6 @@ public void activatePipeline(PipelineID pipelineID)
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
checkLeader();
stateManager.updatePipelineState(pipelineID.getProtobuf(),
HddsProtos.PipelineState.PIPELINE_DORMANT);
}
Expand All @@ -513,7 +499,6 @@ public void deactivatePipeline(PipelineID pipelineID)
@Override
public void waitPipelineReady(PipelineID pipelineID, long timeout)
throws IOException {
checkLeader();
long st = Time.monotonicNow();
if (timeout == 0) {
timeout = pipelineWaitDefaultTimeout;
Expand Down Expand Up @@ -546,7 +531,6 @@ public void waitPipelineReady(PipelineID pipelineID, long timeout)

@Override
public Map<String, Integer> getPipelineInfo() throws NotLeaderException {
checkLeader();
final Map<String, Integer> pipelineInfo = new HashMap<>();
for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
pipelineInfo.put(state.toString(), 0);
Expand Down Expand Up @@ -632,21 +616,10 @@ public void setPipelineProvider(ReplicationType replicationType,
public StateManager getStateManager() {
return stateManager;
}

public void setScmhaManager(SCMHAManager scmhaManager) {
this.scmhaManager = scmhaManager;
}

/**
* return term of underlying RaftServer if role of SCM is leader.
* @throws NotLeaderException when it's not leader.
*/
private long checkLeader() throws NotLeaderException {
Optional<Long> termOpt = scmhaManager.isLeader();
if (!termOpt.isPresent()) {
throw scmhaManager.triggerNotLeaderException();
}
return termOpt.get();
@VisibleForTesting
public SCMHAManager getScmhaManager() {
return scmhaManager;
}

private void setBackgroundPipelineCreator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ public static void quasiCloseContainer(ContainerManager containerManager,
public static StorageContainerManager getScmSimple(OzoneConfiguration conf)
throws IOException, AuthenticationException {
SCMConfigurator configurator = new SCMConfigurator();
configurator.setSCMHAManager(MockSCMHAManager.getInstance());
configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
return StorageContainerManager.createSCM(conf, configurator);
}

Expand All @@ -492,7 +492,7 @@ public static StorageContainerManager getScmSimple(OzoneConfiguration conf)
public static StorageContainerManager getScm(OzoneConfiguration conf)
throws IOException, AuthenticationException {
SCMConfigurator configurator = new SCMConfigurator();
configurator.setSCMHAManager(MockSCMHAManager.getInstance());
configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
return getScm(conf, configurator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void setUp() throws Exception {
// Override the default Node Manager and SCMHAManager
// in SCM with the Mock one.
nodeManager = new MockNodeManager(true, 10);
scmHAManager = MockSCMHAManager.getInstance();
scmHAManager = MockSCMHAManager.getInstance(true);

eventQueue = new EventQueue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static void setUp() throws Exception {
pipelineManager =
PipelineManagerV2Impl.newPipelineManager(
configuration,
MockSCMHAManager.getInstance(),
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void setUp() throws Exception {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
containerManager = new ContainerManagerImpl(conf,
MockSCMHAManager.getInstance(), pipelineManager,
MockSCMHAManager.getInstance(true), pipelineManager,
SCMDBDefinition.CONTAINERS.getTable(dbStore));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static void setUp() throws Exception {
SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(conf);
pipelineManager = PipelineManagerV2Impl.newPipelineManager(
conf,
MockSCMHAManager.getInstance(),
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
new EventQueue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,16 @@ public final class MockSCMHAManager implements SCMHAManager {
private final SCMRatisServer ratisServer;
private boolean isLeader;

public static SCMHAManager getInstance() {
return new MockSCMHAManager();
}

public static SCMHAManager getLeaderInstance() {
MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
mockSCMHAManager.setIsLeader(true);
return mockSCMHAManager;
}

public static SCMHAManager getFollowerInstance() {
MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
mockSCMHAManager.setIsLeader(false);
return mockSCMHAManager;
public static SCMHAManager getInstance(boolean isLeader) {
return new MockSCMHAManager(isLeader);
}

/**
* Creates MockSCMHAManager instance.
*/
private MockSCMHAManager() {
private MockSCMHAManager(boolean isLeader) {
this.ratisServer = new MockRatisServer();
this.isLeader = true;
this.isLeader = isLeader;
}

@Override
Expand Down Expand Up @@ -127,7 +115,7 @@ public NotLeaderException triggerNotLeaderException() {
null, new ArrayList<>());
}

private static class MockRatisServer implements SCMRatisServer {
private class MockRatisServer implements SCMRatisServer {

private Map<RequestType, Object> handlers =
new EnumMap<>(RequestType.class);
Expand All @@ -148,27 +136,40 @@ public SCMRatisResponse submitRequest(final SCMRatisRequest request)
final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf(
RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
RaftClientReply reply;
try {
final Message result = process(request);
reply = RaftClientReply.newBuilder()
.setClientId(ClientId.randomId())
.setServerId(raftId)
.setGroupId(RaftGroupId.emptyGroupId())
.setCallId(1L)
.setSuccess(true)
.setMessage(result)
.setException(null)
.setLogIndex(1L)
.build();
} catch (Exception ex) {
if (isLeader().isPresent()) {
try {
final Message result = process(request);
reply = RaftClientReply.newBuilder()
.setClientId(ClientId.randomId())
.setServerId(raftId)
.setGroupId(RaftGroupId.emptyGroupId())
.setCallId(1L)
.setSuccess(true)
.setMessage(result)
.setException(null)
.setLogIndex(1L)
.build();
} catch (Exception ex) {
reply = RaftClientReply.newBuilder()
.setClientId(ClientId.randomId())
.setServerId(raftId)
.setGroupId(RaftGroupId.emptyGroupId())
.setCallId(1L)
.setSuccess(false)
.setMessage(Message.EMPTY)
.setException(new StateMachineException(raftId, ex))
.setLogIndex(1L)
.build();
}
} else {
reply = RaftClientReply.newBuilder()
.setClientId(ClientId.randomId())
.setServerId(raftId)
.setGroupId(RaftGroupId.emptyGroupId())
.setCallId(1L)
.setSuccess(false)
.setMessage(Message.EMPTY)
.setException(new StateMachineException(raftId, ex))
.setException(triggerNotLeaderException())
.setLogIndex(1L)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ SCMContainerManager createContainerManager(ConfigurationSource config,
PipelineManager pipelineManager =
PipelineManagerV2Impl.newPipelineManager(
config,
MockSCMHAManager.getInstance(),
MockSCMHAManager.getInstance(true),
scmNodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue);
Expand Down
Loading