DeleteBlocksCommandHandler.java
@@ -135,12 +135,22 @@ public void handle(SCMCommand command, OzoneContainer container,
SCMCommandProto.Type.deleteBlocksCommand, command.getType());
return;
}

DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command,
container, context, connectionManager);
try {
DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command,
container, context, connectionManager);
deleteCommandQueues.add(cmd);
} catch (IllegalStateException e) {
String dnId = context.getParent().getDatanodeDetails().getUuidString();
Consumer<CommandStatus> updateFailure = (cmdStatus) -> {
cmdStatus.markAsFailed();
ContainerBlocksDeletionACKProto emptyACK =
ContainerBlocksDeletionACKProto
.newBuilder()
.setDnId(dnId)
.build();
((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(emptyACK);
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), updateFailure, LOG);
LOG.warn("Command is discarded because the command queue is full");
}
}
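
Note on the hunk above: handle() now builds the DeleteCmdInfo before the try block so the catch handler can still reference cmd when enqueueing fails. The catch branch relies on deleteCommandQueues being capacity-bounded, so add() throws IllegalStateException once the queue is full; in that case the command status is marked FAILED and an ACK carrying only the datanode UUID (no per-transaction results) is attached before the command is dropped. A minimal, self-contained sketch of that queue behaviour, using plain java.util.concurrent rather than Ozone code:

// Sketch only (not Ozone code): a capacity-bounded BlockingQueue rejects add()
// with IllegalStateException once it is full, while offer() merely returns false.
// This is the condition the new catch block converts into a FAILED command status.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueSketch {
  public static void main(String[] args) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    queue.add("first");                          // fits; the queue is now full
    System.out.println(queue.offer("second"));   // prints false, element rejected quietly
    try {
      queue.add("third");                        // capacity exhausted
    } catch (IllegalStateException e) {
      System.out.println("queue full: " + e.getMessage());
    }
  }
}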
@@ -382,9 +392,13 @@ private void processCmd(DeleteCmdInfo cmd) {
} finally {
final ContainerBlocksDeletionACKProto deleteAck =
blockDeletionACK;
final boolean status = cmdExecuted;
final boolean executedStatus = cmdExecuted;
Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
cmdStatus.setStatus(status);
if (executedStatus) {
cmdStatus.markAsExecuted();
} else {
cmdStatus.markAsFailed();
}
((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(deleteAck);
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG);
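
A reading note on the finally block above, not part of the patch: cmdExecuted is presumably assigned earlier in the surrounding try/catch, so it is not effectively final, and a final local copy is needed before the statusUpdater lambda can capture it; the rename from status to executedStatus also makes the new branch read naturally. A minimal sketch of the capture rule, with hypothetical names:

// Sketch of the lambda-capture rule that motivates the final local copy above.
import java.util.function.Consumer;

public class CaptureSketch {
  public static void main(String[] args) {
    boolean cmdExecuted = false;
    cmdExecuted = true;   // reassigned, so cmdExecuted is not effectively final
    // Consumer<String> bad = s -> System.out.println(cmdExecuted); // would not compile
    final boolean executedStatus = cmdExecuted;  // snapshot that the lambda may capture
    Consumer<String> ok = s -> System.out.println(s + executedStatus);
    ok.accept("executed=");   // prints "executed=true"
  }
}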
CommandStatus.java
@@ -63,12 +63,22 @@ public String getMsg() {
*
* @param status
*/
public void setStatus(Status status) {
private void setStatus(Status status) {
this.status = status;
}

public void setStatus(boolean cmdExecuted) {
setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
/**
* Marks the command status as executed.
*/
public void markAsExecuted() {
setStatus(Status.EXECUTED);
}

/**
* Marks the command status as failed.
*/
public void markAsFailed() {
setStatus(Status.FAILED);
}

/**
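The net effect of this hunk is an API tightening in CommandStatus: the Status setter becomes private, and call sites use the intent-revealing markAsExecuted()/markAsFailed() instead of the old setStatus(boolean). A condensed, hypothetical sketch of the resulting shape (not the real class, which also tracks command type, id, message and protobuf conversion):

// Hypothetical condensed sketch of the refactored status API.
public class StatusSketch {
  enum Status { PENDING, EXECUTED, FAILED }

  private Status status = Status.PENDING;

  private void setStatus(Status status) {  // no longer callable from outside
    this.status = status;
  }

  public void markAsExecuted() {
    setStatus(Status.EXECUTED);
  }

  public void markAsFailed() {
    setStatus(Status.FAILED);
  }

  public static void main(String[] args) {
    StatusSketch s = new StatusSketch();
    s.markAsFailed();                      // replaces the old setStatus(false)
    System.out.println(s.status);          // FAILED
  }
}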
TestDeleteBlocksCommandHandler.java
@@ -16,23 +16,35 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.SchemaHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -41,6 +53,7 @@
.DeleteBlockTransactionResult;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
@@ -51,6 +64,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.BLOCK_DELETE_COMMAND_WORKER_INTERVAL;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.BLOCK_DELETE_COMMAND_WORKER_INTERVAL_DEFAULT;
@@ -70,7 +84,8 @@
*/
@Timeout(300)
public class TestDeleteBlocksCommandHandler {

@TempDir
private Path folder;
private OzoneConfiguration conf;
private ContainerLayoutVersion layout;
private OzoneContainer ozoneContainer;
@@ -278,6 +293,43 @@ public void testDeleteCmdWorkerInterval(
Assertions.assertEquals(deleteCmdWorker.getInterval(), 4000);
}

@Test
public void testDeleteBlockCommandHandleWhenDeleteCommandQueuesFull()
throws IOException {
int blockDeleteQueueLimit = 5;
// Setting up the test environment
OzoneConfiguration configuration = new OzoneConfiguration();
configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.toString());
DatanodeDetails datanodeDetails = MockDatanodeDetails.randomDatanodeDetails();
DatanodeConfiguration dnConf =
configuration.getObject(DatanodeConfiguration.class);
OzoneContainer container = ContainerTestUtils.getOzoneContainer(datanodeDetails, configuration);
DatanodeStateMachine stateMachine = Mockito.mock(DatanodeStateMachine.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(datanodeDetails);
StateContext context = new StateContext(configuration,
Mockito.mock(DatanodeStateMachine.DatanodeStates.class),
stateMachine, "");

// Set Queue limit
dnConf.setBlockDeleteQueueLimit(blockDeleteQueueLimit);
handler = new DeleteBlocksCommandHandler(
container, configuration, dnConf, "");

// Check if the command status is as expected: PENDING when queue is not full, FAILED when queue is full
for (int i = 0; i < blockDeleteQueueLimit + 2; i++) {
DeleteBlocksCommand deleteBlocksCommand = new DeleteBlocksCommand(emptyList());
context.addCommand(deleteBlocksCommand);
handler.handle(deleteBlocksCommand, container, context, Mockito.mock(SCMConnectionManager.class));
CommandStatus cmdStatus = context.getCmdStatus(deleteBlocksCommand.getId());
if (i < blockDeleteQueueLimit) {
Assertions.assertEquals(cmdStatus.getStatus(), Status.PENDING);
} else {
Assertions.assertEquals(cmdStatus.getStatus(), Status.FAILED);
Assertions.assertEquals(cmdStatus.getProtoBufMessage().getBlockDeletionAck().getResultsCount(), 0);
}
}
}

private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID,
long containerID) {
return DeletedBlocksTransaction.newBuilder()