Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
Expand Down Expand Up @@ -807,6 +809,12 @@ public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
return summary;
}

public boolean isPipelineCloseInProgress(UUID pipelineID) {
ClosePipelineCommandHandler handler = parentDatanodeStateMachine.getCommandDispatcher()
.getClosePipelineCommandHandler();
return handler.isPipelineInProgress(pipelineID);
}

/**
* Returns the count of the Execution.
* @return long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ public ClosePipelineCommandHandler(
this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
}

/**
* Returns true if pipeline is in progress, else false.
*
* @return boolean
*/
public boolean isPipelineInProgress(UUID pipelineID) {
Comment thread
sarvekshayr marked this conversation as resolved.
Outdated
return pipelinesInProgress.contains(pipelineID);
}

/**
* Handles a given SCM command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public CommandHandler getDeleteBlocksCommandHandler() {
return handlerMap.get(Type.deleteBlocksCommand);
}

@VisibleForTesting
Comment thread
sarvekshayr marked this conversation as resolved.
Outdated
public ClosePipelineCommandHandler getClosePipelineCommandHandler() {
return (ClosePipelineCommandHandler) handlerMap.get(Type.closePipelineCommand);
}

/**
* Dispatch the command to the correct handler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,15 @@ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoPr
private void triggerPipelineClose(RaftGroupId groupId, String detail,
ClosePipelineInfo.Reason reasonCode) {
PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());

if (context != null) {
if (context.isPipelineCloseInProgress(pipelineID.getId())) {
LOG.debug("Skipped triggering pipeline close for {} as it is already in progress. Reason: {}",
pipelineID.getId(), detail);
return;
}
}

ClosePipelineInfo.Builder closePipelineInfo =
ClosePipelineInfo.newBuilder()
.setPipelineID(pipelineID.getProtobuf())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
Expand All @@ -31,6 +35,11 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -132,6 +141,55 @@ void testCommandIdempotency() throws IOException {
.remove(any(), anyBoolean(), anyBoolean());
}

@Test
void testPendingPipelineClose() throws IOException, InterruptedException {
final List<DatanodeDetails> datanodes = getDatanodes();
final DatanodeDetails currentDatanode = datanodes.get(0);
final PipelineID pipelineID = PipelineID.randomId();
final UUID pipelineUUID = pipelineID.getId();
final SCMCommand<ClosePipelineCommandProto> command1 = new ClosePipelineCommand(pipelineID);
final SCMCommand<ClosePipelineCommandProto> command2 = new ClosePipelineCommand(pipelineID);
StateContext stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);

final boolean shouldDeleteRatisLogDirectory = true;
XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
Collection<RaftPeer> raftPeers = datanodes.stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);

CountDownLatch firstCommandStarted = new CountDownLatch(1);
CountDownLatch secondCommandSubmitted = new CountDownLatch(1);

doAnswer(invocation -> {
firstCommandStarted.countDown();
secondCommandSubmitted.await();
return null;
}).when(writeChannel).removeGroup(pipelineID.getProtobuf());

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

final ClosePipelineCommandHandler commandHandler =
new ClosePipelineCommandHandler((leader, tls) -> raftClient, singleThreadExecutor);
assertFalse(commandHandler.isPipelineInProgress(pipelineUUID));
commandHandler.handle(command1, ozoneContainer, stateContext, connectionManager);
assertTrue(firstCommandStarted.await(5, TimeUnit.SECONDS));
commandHandler.handle(command2, ozoneContainer, stateContext, connectionManager);
secondCommandSubmitted.countDown();

singleThreadExecutor.shutdown();
assertTrue(singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS));

// Only one command should have been processed due to duplicate prevention
assertEquals(1, commandHandler.getInvocationCount());
assertFalse(commandHandler.isPipelineInProgress(pipelineUUID));
}

private List<DatanodeDetails> getDatanodes() {
final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
Expand Down
Loading