Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
Expand All @@ -59,6 +60,8 @@ public abstract class ContainerData {
// For now, we support only KeyValueContainer.
private final ContainerType containerType;

private final AtomicBoolean immediateCloseActionSent = new AtomicBoolean(false);

// Unique identifier for the container
private final long containerID;

Expand Down Expand Up @@ -171,6 +174,10 @@ public long getContainerID() {
return containerID;
}

public AtomicBoolean getImmediateCloseActionSent() {
return immediateCloseActionSent;
}

/**
* Returns the path to base dir of the container.
* @return Path to base dir.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
Expand Down Expand Up @@ -580,7 +581,9 @@ public void validateContainerCommand(
*/
private void sendCloseContainerActionIfNeeded(Container container) {
// We have to find a more efficient way to close a container.
boolean isSpaceFull = isContainerFull(container) || isVolumeFull(container);
boolean isOpen = container != null && container.getContainerState() == State.OPEN;
boolean isVolumeFull = isOpen && isVolumeFullExcludingCommittedSpace(container);
boolean isSpaceFull = isContainerFull(container) || isVolumeFull;
boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
if (shouldClose) {
ContainerData containerData = container.getContainerData();
Expand All @@ -591,6 +594,21 @@ private void sendCloseContainerActionIfNeeded(Container container) {
.setContainerID(containerData.getContainerID())
.setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are closing the container because of full volume, we can set the reason to VOLUME_FULL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nandakumar131 Thanks for the review! Since VOLUME_FULL isn't strictly required, in the interest of time let's merge this PR? I'd like to avoid making a proto change in this one. We can make the improvement in a future PR if needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can do that as a followup.

context.addContainerActionIfAbsent(action);
if (isVolumeFull) {
HddsVolume volume = containerData.getVolume();
LOG.warn("Volume [{}] is full. containerID: {}. Volume usage: [{}].", volume, containerData.getContainerID(),
volume.getCurrentUsage());
}
AtomicBoolean immediateCloseActionSent = containerData.getImmediateCloseActionSent();
// if an immediate heartbeat has not been triggered already, trigger it now
if (immediateCloseActionSent.compareAndSet(false, true)) {
context.getParent().triggerHeartbeat();
if (isVolumeFull) {
// log only if volume is full
// don't want to log if only container is full because that is expected to happen frequently
LOG.warn("Triggered immediate heartbeat because of full volume.");
}
}
}
}

Expand All @@ -608,20 +626,8 @@ private boolean isContainerFull(Container container) {
}
}

private boolean isVolumeFull(Container container) {
boolean isOpen = Optional.ofNullable(container)
.map(cont -> cont.getContainerState() == ContainerDataProto.State.OPEN)
.orElse(Boolean.FALSE);
if (isOpen) {
HddsVolume volume = container.getContainerData().getVolume();
StorageLocationReport volumeReport = volume.getReport();
boolean full = volumeReport.getUsableSpace() <= 0;
if (full) {
LOG.info("Container {} volume is full: {}", container.getContainerData().getContainerID(), volumeReport);
}
return full;
}
return false;
private boolean isVolumeFullExcludingCommittedSpace(Container container) {
return container.getContainerData().getVolume().isVolumeFull();
}

private boolean isContainerUnhealthy(Container container) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
Expand Down Expand Up @@ -194,6 +195,12 @@ public VolumeInfoMetrics getVolumeInfoStats() {
return volumeInfoMetrics;
}

public boolean isVolumeFull() {
SpaceUsageSource currentUsage = getCurrentUsage();
// if the volume is failed, this method will implicitly return true because available space will be 0
return currentUsage.getAvailable() - getFreeSpaceToSpare(currentUsage.getCapacity()) <= 0;
}

@Override
protected StorageLocationReport.Builder reportBuilder() {
StorageLocationReport.Builder builder = super.reportBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
Expand Down Expand Up @@ -125,6 +126,16 @@ public static void init() {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(new OzoneConfiguration());
}

/**
* Tests that close container action is sent when a container is full. First two containers are created. Then we
* write to one of them to confirm normal writes are successful. Then we increase the used space of both containers
* such that they're close to full, and write to both of them simultaneously. The expectation is that close
* container action should be added for both of them and two immediate heartbeats should be sent. Next, we write
* again to the first container. This time the close container action should be queued but immediate heartbeat
* should not be sent because of throttling. This confirms that the throttling is per container.
* @param layout
* @throws IOException
*/
@ContainerLayoutTestInfo.ContainerTest
public void testContainerCloseActionWhenFull(
ContainerLayoutVersion layout) throws IOException {
Expand All @@ -141,16 +152,23 @@ public void testContainerCloseActionWhenFull(
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
// create both containers
KeyValueContainerData containerData = new KeyValueContainerData(1L,
layout,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
dd.getUuidString());
KeyValueContainerData containerData2 = new KeyValueContainerData(2L,
layout, (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(), dd.getUuidString());
Container container = new KeyValueContainer(containerData, conf);
Container container2 = new KeyValueContainer(containerData2, conf);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
container2.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
containerSet.addContainer(container);
containerSet.addContainer(container2);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
Expand All @@ -159,6 +177,7 @@ public void testContainerCloseActionWhenFull(
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER));
}
// write successfully to first container
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics, null);
hddsDispatcher.setClusterId(scmId.toString());
Expand All @@ -168,15 +187,29 @@ public void testContainerCloseActionWhenFull(
responseOne.getResult());
verify(context, times(0))
.addContainerActionIfAbsent(any(ContainerAction.class));
// increment used space of both containers
containerData.setBytesUsed(Double.valueOf(
StorageUnit.MB.toBytes(950)).longValue());
containerData2.setBytesUsed(Double.valueOf(StorageUnit.MB.toBytes(950)).longValue());
ContainerCommandResponseProto responseTwo = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
ContainerCommandResponseProto responseThree = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 2L, 1L), null);
assertEquals(ContainerProtos.Result.SUCCESS,
responseTwo.getResult());
verify(context, times(1))
assertEquals(ContainerProtos.Result.SUCCESS, responseThree.getResult());
// container action should be added for both containers
verify(context, times(2))
.addContainerActionIfAbsent(any(ContainerAction.class));

DatanodeStateMachine stateMachine = context.getParent();
// immediate heartbeat should be triggered for both the containers
verify(stateMachine, times(2)).triggerHeartbeat();

// if we write again to container 1, the container action should get added but heartbeat should not get triggered
// again because of throttling
hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 3L), null);
verify(context, times(3)).addContainerActionIfAbsent(any(ContainerAction.class));
verify(stateMachine, times(2)).triggerHeartbeat(); // was called twice before
} finally {
volumeSet.shutdown();
ContainerMetrics.remove();
Expand Down Expand Up @@ -276,6 +309,7 @@ public void testContainerCloseActionWhenVolumeFull(
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
DatanodeStateMachine stateMachine = context.getParent();
// create a 50 byte container
// available (160) > 100 (min free space) + 50 (container size)
KeyValueContainerData containerData = new KeyValueContainerData(1L,
Expand All @@ -300,14 +334,21 @@ public void testContainerCloseActionWhenVolumeFull(
conf, containerSet, volumeSet, handlers, context, metrics, null);
hddsDispatcher.setClusterId(scmId.toString());
containerData.getVolume().getVolumeUsage()
.ifPresent(usage -> usage.incrementUsedSpace(50));
usedSpace.addAndGet(50);
.ifPresent(usage -> usage.incrementUsedSpace(60));
usedSpace.addAndGet(60);
ContainerCommandResponseProto response = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
verify(context, times(1))
.addContainerActionIfAbsent(any(ContainerAction.class));
// verify that immediate heartbeat is triggered
verify(stateMachine, times(1)).triggerHeartbeat();
// the volume has reached the min free space boundary but this time the heartbeat should not be triggered because
// of throttling
hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
verify(context, times(2)).addContainerActionIfAbsent(any(ContainerAction.class));
verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before

// try creating another container now as the volume used has crossed
// threshold
Expand Down
Loading