BlockManager.java

@@ -24,17 +24,14 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.ozone.common.BlockGroup;

 /**
  *
  * Block APIs.
  * Container is transparent to these APIs.
  */
-public interface BlockManager extends Closeable,
-    EventHandler<SafeModeStatus> {
+public interface BlockManager extends Closeable {
   /**
    * Allocates a new block for a given size.
    * @param size - Block Size

BlockManagerImpl.java
@@ -32,10 +32,8 @@
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
 import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -46,10 +44,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.UniqueId;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -72,6 +67,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   // Currently only user of the block service is Ozone, CBlock manages blocks
   // by itself and does not rely on the Block service offered by SCM.

+  private final StorageContainerManager scm;
   private final PipelineManager pipelineManager;
   private final ContainerManagerV2 containerManager;

@@ -81,7 +77,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final SCMBlockDeletingService blockDeletingService;

   private ObjectName mxBean;
-  private SafeModePrecheck safeModePrecheck;
   private PipelineChoosePolicy pipelineChoosePolicy;

   /**
@@ -94,6 +89,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   public BlockManagerImpl(final ConfigurationSource conf,
       final StorageContainerManager scm) {
     Objects.requireNonNull(scm, "SCM cannot be null");
+    this.scm = scm;
     this.pipelineManager = scm.getPipelineManager();
     this.containerManager = scm.getContainerManager();
     this.pipelineChoosePolicy = scm.getPipelineChoosePolicy();
@@ -116,9 +112,8 @@ public BlockManagerImpl(final ConfigurationSource conf,
         TimeUnit.MILLISECONDS);
     blockDeletingService =
         new SCMBlockDeletingService(deletedBlockLog, containerManager,
-            scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
-            serviceTimeout, conf);
-    safeModePrecheck = new SafeModePrecheck(conf);
+            scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
+            svcInterval, serviceTimeout, conf);
   }

   /**
@@ -158,7 +153,10 @@ public AllocatedBlock allocateBlock(final long size, ReplicationType type,
     if (LOG.isTraceEnabled()) {
       LOG.trace("Size : {} , type : {}, factor : {} ", size, type, factor);
     }
-    ScmUtils.preCheck(ScmOps.allocateBlock, safeModePrecheck);
+    if (scm.getScmContext().isInSafeMode()) {
+      throw new SCMException("SafeModePrecheck failed for allocateBlock",
+          SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
+    }
     if (size < 0 || size > containerSize) {
       LOG.warn("Invalid block size requested : {}", size);
       throw new SCMException("Unsupported block size: " + size,
@@ -294,8 +292,10 @@ private AllocatedBlock newBlock(ContainerInfo containerInfo)
   @Override
   public void deleteBlocks(List<BlockGroup> keyBlocksInfoList)
       throws IOException {
-    ScmUtils.preCheck(ScmOps.deleteBlock, safeModePrecheck);
-
+    if (scm.getScmContext().isInSafeMode()) {
+      throw new SCMException("SafeModePrecheck failed for deleteBlocks",
+          SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
+    }
     Map<Long, List<Long>> containerBlocks = new HashMap<>();
     // TODO: track the block size info so that we can reclaim the container
     // TODO: used space when the block is deleted.
@@ -365,26 +365,13 @@ public SCMBlockDeletingService getSCMBlockDeletingService() {
     return this.blockDeletingService;
   }

-  /**
-   * Returns status of scm safe mode determined by SAFE_MODE_STATUS event.
-   * */
-  public boolean isScmInSafeMode() {
-    return this.safeModePrecheck.isInSafeMode();
-  }
-
   /**
    * Get class logger.
    * */
   public static Logger getLogger() {
     return LOG;
   }

-  @Override
-  public void onMessage(SafeModeStatus status,
-      EventPublisher publisher) {
-    this.safeModePrecheck.setInSafeMode(status.isInSafeMode());
-  }
-
   /**
    * This class uses system current time milliseconds to generate unique id.
    */
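
Note: the hunks above replace the event-driven SafeModePrecheck with direct queries against the shared SCMContext. A minimal sketch of the SCMContext surface this diff relies on follows; the real class lives in org.apache.hadoop.hdds.scm.ha, and the fields and defaults shown here are assumptions for illustration, not the actual implementation.

    import org.apache.ratis.protocol.exceptions.NotLeaderException;

    // Illustrative sketch only; the real SCMContext may differ in fields,
    // defaults, and synchronization.
    public final class SCMContextSketch {
      private volatile boolean inSafeMode = true; // updated by the safe mode manager
      private volatile long term = 0;             // updated on leader election

      // Replaces the SAFE_MODE_STATUS event subscription that BlockManagerImpl
      // previously held through its (now removed) onMessage handler.
      public boolean isInSafeMode() {
        return inSafeMode;
      }

      // Term of the current leader SCM. The real implementation consults the
      // Ratis server role and throws NotLeaderException when this SCM has
      // lost leadership, which is why callers in the later hunks catch it.
      public long getTerm() throws NotLeaderException {
        return term;
      }
    }

With this in place, allocateBlock and deleteBlocks can fail fast during safe mode without BlockManager having to implement EventHandler itself.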

SCMBlockDeletingService.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -40,10 +41,12 @@
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -64,19 +67,22 @@ public class SCMBlockDeletingService extends BackgroundService {
   private final ContainerManagerV2 containerManager;
   private final NodeManager nodeManager;
   private final EventPublisher eventPublisher;
+  private final SCMContext scmContext;

   private int blockDeleteLimitSize;

+  @SuppressWarnings("parameternumber")
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
       ContainerManagerV2 containerManager, NodeManager nodeManager,
-      EventPublisher eventPublisher, Duration interval, long serviceTimeout,
-      ConfigurationSource conf) {
+      EventPublisher eventPublisher, SCMContext scmContext,
+      Duration interval, long serviceTimeout, ConfigurationSource conf) {
     super("SCMBlockDeletingService", interval.toMillis(), TimeUnit.MILLISECONDS,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
     this.containerManager = containerManager;
     this.nodeManager = nodeManager;
     this.eventPublisher = eventPublisher;
+    this.scmContext = scmContext;

     blockDeleteLimitSize =
         conf.getObject(ScmConfig.class).getBlockDeletionLimit();
@@ -146,9 +152,10 @@ public EmptyTaskResult call() throws Exception {
           // We should stop caching new commands if num of un-processed
           // command is bigger than a limit, e.g 50. In case datanode goes
           // offline for sometime, the cached commands be flooded.
+          SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
+          command.setTerm(scmContext.getTerm());
[Review comment from a Contributor]

Could we also catch the NotLeaderException like other places do? There are some other places where we don't catch this exception, so it gets swallowed by the IOException handler:

        } catch (NotLeaderException nle) {

        } catch (IOException e) {
          // We may tolerate a number of failures for sometime
          // but if it continues to fail, at some point we need to raise
          // an exception and probably fail the SCM ? At present, it simply
          // continues to retry the scanning.
          LOG.error("Failed to get block deletion transactions from delTX log",
              e);
          return EmptyTaskResult.newResult();
        }

[Reply from the Contributor who authored the PR]
Sure, I will fix them in the next patch.
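
For context on the fix being discussed: NotLeaderException is a Ratis RaftException, which extends IOException, so its catch block must come before the generic IOException handler or it will be swallowed. A minimal sketch of the intended shape, mirroring the variables of the surrounding method (the exact log messages are illustrative):

    try {
      // Stamp the command with the current leader term before dispatch.
      SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
      command.setTerm(scmContext.getTerm());
      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
          new CommandForDatanode<>(dnId, command));
    } catch (NotLeaderException nle) {
      // Losing leadership is expected during failover; skip the run quietly.
      LOG.warn("Skip current run, since not leader any more.", nle);
      return EmptyTaskResult.newResult();
    } catch (IOException e) {
      // Genuine I/O failures still surface as errors; the background task
      // retries on its next scheduled run.
      LOG.error("Failed to get block deletion transactions from delTX log", e);
      return EmptyTaskResult.newResult();
    }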

          eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-              new CommandForDatanode<>(dnId,
-                  new DeleteBlocksCommand(dnTXs)));
+              new CommandForDatanode<>(dnId, command));
          if (LOG.isDebugEnabled()) {
            LOG.debug(
                "Added delete block command for datanode {} in the queue,"
@@ -170,6 +177,9 @@ public EmptyTaskResult call() throws Exception {
               transactions.getBlocksDeleted(),
               transactions.getDatanodeTransactionMap().size(),
               Time.monotonicNow() - startTime);
+        } catch (NotLeaderException nle) {
+          LOG.warn("Skip current run, since not leader any more.", nle);
+          return EmptyTaskResult.newResult();
         } catch (IOException e) {
           // We may tolerate a number of failures for sometime
           // but if it continues to fail, at some point we need to raise

AbstractContainerReportHandler.java
@@ -27,10 +27,13 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;

 import java.io.IOException;
@@ -46,6 +49,7 @@
 public class AbstractContainerReportHandler {

   private final ContainerManagerV2 containerManager;
+  private final SCMContext scmContext;
   private final Logger logger;

   /**
@@ -56,10 +60,13 @@ public class AbstractContainerReportHandler {
    * @param logger Logger to be used for logging
    */
   AbstractContainerReportHandler(final ContainerManagerV2 containerManager,
+                                 final SCMContext scmContext,
                                  final Logger logger) {
     Preconditions.checkNotNull(containerManager);
+    Preconditions.checkNotNull(scmContext);
     Preconditions.checkNotNull(logger);
     this.containerManager = containerManager;
+    this.scmContext = scmContext;
     this.logger = logger;
   }

@@ -317,11 +324,17 @@ protected ContainerManagerV2 getContainerManager() {

   protected void deleteReplica(ContainerID containerID, DatanodeDetails dn,
       EventPublisher publisher, String reason) {
-    final DeleteContainerCommand deleteCommand =
-        new DeleteContainerCommand(containerID.getId(), true);
-    final CommandForDatanode datanodeCommand = new CommandForDatanode<>(
-        dn.getUuid(), deleteCommand);
-    publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    SCMCommand<?> command = new DeleteContainerCommand(
+        containerID.getId(), true);
+    try {
+      command.setTerm(scmContext.getTerm());
+    } catch (NotLeaderException nle) {
+      logger.warn("Skip sending delete container command," +
+          " since not leader SCM", nle);
+      return;
+    }
+    publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+        new CommandForDatanode<>(dn.getUuid(), command));
     logger.info("Sending delete container command for " + reason +
         " container {} to datanode {}", containerID.getId(), dn);
   }

CloseContainerEventHandler.java
@@ -23,13 +23,15 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,11 +53,14 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {

   private final PipelineManager pipelineManager;
   private final ContainerManagerV2 containerManager;
+  private final SCMContext scmContext;

   public CloseContainerEventHandler(final PipelineManager pipelineManager,
-      final ContainerManagerV2 containerManager) {
+      final ContainerManagerV2 containerManager,
+      final SCMContext scmContext) {
     this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
+    this.scmContext = scmContext;
   }

   @Override
@@ -74,19 +79,21 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
           .getContainer(containerID);
       // Send close command to datanodes, if the container is in CLOSING state
       if (container.getState() == LifeCycleState.CLOSING) {
+        SCMCommand<?> command = new CloseContainerCommand(
+            containerID.getId(), container.getPipelineID());
+        command.setTerm(scmContext.getTerm());

-        final CloseContainerCommand closeContainerCommand =
-            new CloseContainerCommand(
-                containerID.getId(), container.getPipelineID());
-
-        getNodes(container).forEach(node -> publisher.fireEvent(
-            DATANODE_COMMAND,
-            new CommandForDatanode<>(node.getUuid(), closeContainerCommand)));
+        getNodes(container).forEach(node ->
+            publisher.fireEvent(DATANODE_COMMAND,
+                new CommandForDatanode<>(node.getUuid(), command)));
       } else {
         LOG.warn("Cannot close container {}, which is in {} state.",
             containerID, container.getState());
       }

+    } catch (NotLeaderException nle) {
+      LOG.warn("Skip sending close container command,"
+          + " since current SCM is not leader.", nle);
     } catch (IOException | InvalidStateTransitionException ex) {
       LOG.error("Failed to close the container {}.", containerID, ex);
     }
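
Both report handlers now stamp outgoing datanode commands with the leader term before firing them. The apparent intent, stated here as an assumption rather than something this diff spells out, is that a datanode can discard commands issued by a deposed SCM leader by comparing terms:

    // Illustrative only: an assumed datanode-side guard, not actual Ozone code.
    // A command carrying a term lower than the highest term the datanode has
    // seen was issued before a leader change and can be dropped safely.
    static boolean shouldExecute(long commandTerm, long highestTermSeen) {
      return commandTerm >= highestTermSeen;
    }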

ContainerReportHandler.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -73,8 +74,9 @@ public class ContainerReportHandler extends AbstractContainerReportHandler
    */
   public ContainerReportHandler(final NodeManager nodeManager,
       final ContainerManagerV2 containerManager,
+      final SCMContext scmContext,
       OzoneConfiguration conf) {
-    super(containerManager, LOG);
+    super(containerManager, scmContext, LOG);
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;

@@ -88,7 +90,7 @@ public ContainerReportHandler(final NodeManager nodeManager,

   public ContainerReportHandler(final NodeManager nodeManager,
       final ContainerManagerV2 containerManager) {
-    this(nodeManager, containerManager, null);
+    this(nodeManager, containerManager, SCMContext.emptyContext(), null);
   }

   /**

IncrementalContainerReportHandler.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
     .ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -47,8 +48,9 @@ public class IncrementalContainerReportHandler extends

   public IncrementalContainerReportHandler(
       final NodeManager nodeManager,
-      final ContainerManagerV2 containerManager) {
-    super(containerManager, LOG);
+      final ContainerManagerV2 containerManager,
+      final SCMContext scmContext) {
+    super(containerManager, scmContext, LOG);
     this.nodeManager = nodeManager;
   }
