Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
Expand Down Expand Up @@ -807,6 +809,12 @@ public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
return summary;
}

public boolean isPipelineCloseInProgress(UUID pipelineID) {
ClosePipelineCommandHandler handler = parentDatanodeStateMachine.getCommandDispatcher()
.getClosePipelineCommandHandler();
return handler.isPipelineCloseInProgress(pipelineID);
}

/**
* Returns the count of the Execution.
* @return long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ public ClosePipelineCommandHandler(
this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
}

  /**
   * Returns true if a close operation for the given pipeline is currently
   * in progress on this datanode, else false.
   *
   * @param pipelineID UUID of the pipeline being checked
   * @return true if the pipeline close is in progress, false otherwise
   */
  public boolean isPipelineCloseInProgress(UUID pipelineID) {
    return pipelinesInProgress.contains(pipelineID);
  }

/**
* Handles a given SCM command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public CommandHandler getDeleteBlocksCommandHandler() {
return handlerMap.get(Type.deleteBlocksCommand);
}

  /**
   * Returns the handler registered for {@code closePipelineCommand}.
   *
   * NOTE(review): assumes a {@link ClosePipelineCommandHandler} was registered
   * for Type.closePipelineCommand at dispatcher construction; the cast throws
   * ClassCastException (or this returns null) otherwise — confirm at wiring.
   */
  public ClosePipelineCommandHandler getClosePipelineCommandHandler() {
    return (ClosePipelineCommandHandler) handlerMap.get(Type.closePipelineCommand);
  }

/**
* Dispatch the command to the correct handler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,19 @@ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoPr
triggerPipelineClose(groupId, b.toString(), ClosePipelineInfo.Reason.PIPELINE_FAILED);
}

private void triggerPipelineClose(RaftGroupId groupId, String detail,
@VisibleForTesting
public void triggerPipelineClose(RaftGroupId groupId, String detail,
ClosePipelineInfo.Reason reasonCode) {
PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());

if (context != null) {
if (context.isPipelineCloseInProgress(pipelineID.getId())) {
LOG.debug("Skipped triggering pipeline close for {} as it is already in progress. Reason: {}",
pipelineID.getId(), detail);
return;
}
}

ClosePipelineInfo.Builder closePipelineInfo =
ClosePipelineInfo.newBuilder()
.setPipelineID(pipelineID.getProtobuf())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
Expand All @@ -31,6 +35,11 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -132,6 +141,55 @@ void testCommandIdempotency() throws IOException {
.remove(any(), anyBoolean(), anyBoolean());
}

@Test
void testPendingPipelineClose() throws IOException, InterruptedException {
final List<DatanodeDetails> datanodes = getDatanodes();
final DatanodeDetails currentDatanode = datanodes.get(0);
final PipelineID pipelineID = PipelineID.randomId();
final UUID pipelineUUID = pipelineID.getId();
final SCMCommand<ClosePipelineCommandProto> command1 = new ClosePipelineCommand(pipelineID);
final SCMCommand<ClosePipelineCommandProto> command2 = new ClosePipelineCommand(pipelineID);
StateContext stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);

final boolean shouldDeleteRatisLogDirectory = true;
XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
Collection<RaftPeer> raftPeers = datanodes.stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);

CountDownLatch firstCommandStarted = new CountDownLatch(1);
CountDownLatch secondCommandSubmitted = new CountDownLatch(1);

doAnswer(invocation -> {
firstCommandStarted.countDown();
secondCommandSubmitted.await();
return null;
}).when(writeChannel).removeGroup(pipelineID.getProtobuf());

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

final ClosePipelineCommandHandler commandHandler =
new ClosePipelineCommandHandler((leader, tls) -> raftClient, singleThreadExecutor);
assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
commandHandler.handle(command1, ozoneContainer, stateContext, connectionManager);
assertTrue(firstCommandStarted.await(5, TimeUnit.SECONDS));
commandHandler.handle(command2, ozoneContainer, stateContext, connectionManager);
secondCommandSubmitted.countDown();

singleThreadExecutor.shutdown();
assertTrue(singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS));

// Only one command should have been processed due to duplicate prevention
assertEquals(1, commandHandler.getInvocationCount());
assertFalse(commandHandler.isPipelineCloseInProgress(pipelineUUID));
}

private List<DatanodeDetails> getDatanodes() {
final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
Expand All @@ -56,6 +59,8 @@
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.ozone.test.GenericTestUtils;
Expand All @@ -66,6 +71,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.mockito.ArgumentCaptor;
import org.slf4j.event.Level;

/**
* Tests for Pipeline Closing.
Expand Down Expand Up @@ -256,6 +262,52 @@ void testPipelineCloseWithLogFailure()
verifyCloseForPipeline(openPipeline, actionsFromDatanode);
}

  /**
   * Verifies that XceiverServerRatis.triggerPipelineClose skips (and logs a
   * debug message for) every trigger for a pipeline whose close is already
   * marked in progress on the datanode's ClosePipelineCommandHandler.
   */
  @Test
  @SuppressWarnings("unchecked")
  void testPipelineCloseTriggersSkippedWhenAlreadyInProgress() throws Exception {
    ContainerInfo allocateContainer = containerManager
        .allocateContainer(RatisReplicationConfig.getInstance(
            ReplicationFactor.THREE), "newTestOwner");
    ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(allocateContainer,
        pipelineManager.getPipeline(allocateContainer.getPipelineID()));

    DatanodeStateMachine datanodeStateMachine = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine();
    XceiverServerRatis xceiverRatis = (XceiverServerRatis) datanodeStateMachine.getContainer().getWriteChannel();

    // The "skipped" message is emitted at DEBUG, so raise the level before
    // attaching the capturer.
    GenericTestUtils.setLogLevel(XceiverServerRatis.class, Level.DEBUG);
    GenericTestUtils.LogCapturer xceiverLogCapturer =
        GenericTestUtils.LogCapturer.captureLogs(XceiverServerRatis.class);

    RaftGroupId groupId = RaftGroupId.valueOf(containerWithPipeline.getPipeline().getId().getId());
    PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());

    ClosePipelineCommandHandler handler = datanodeStateMachine.getCommandDispatcher().getClosePipelineCommandHandler();

    // Reach into the handler's private in-progress set via reflection so the
    // test can simulate "close already running" without racing a real close.
    Field pipelinesInProgressField = handler.getClass().getDeclaredField("pipelinesInProgress");
    pipelinesInProgressField.setAccessible(true);
    Set<UUID> pipelinesInProgress = (Set<UUID>) pipelinesInProgressField.get(handler);

    try {
      pipelinesInProgress.add(pipelineID.getId());

      String detail = "test duplicate trigger ";
      int numOfDuplicateTriggers = 10;
      for (int i = 1; i <= numOfDuplicateTriggers; i++) {
        xceiverRatis.triggerPipelineClose(groupId, detail + i, ClosePipelineInfo.Reason.PIPELINE_FAILED);
      }

      // Every trigger must have been skipped with a debug log line.
      String xceiverLogs = xceiverLogCapturer.getOutput();
      int skippedCount = StringUtils.countMatches(xceiverLogs.toLowerCase(), "skipped triggering pipeline close");
      assertEquals(numOfDuplicateTriggers, skippedCount);
    } finally {
      // Undo the injected in-progress marker and release the log capturer,
      // then remove the pipeline so later tests start from a clean cluster.
      pipelinesInProgress.remove(pipelineID.getId());
      xceiverLogCapturer.stopCapturing();

      pipelineManager.closePipeline(containerWithPipeline.getPipeline().getId());
      pipelineManager.deletePipeline(containerWithPipeline.getPipeline().getId());
    }
  }

private boolean verifyCloseForPipeline(Pipeline pipeline,
PipelineActionsFromDatanode report) {
UUID uuidToFind = pipeline.getId().getId();
Expand Down