@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -197,6 +198,7 @@ long getStartTime() {
private final ExecutorService executor;
private final List<ThreadPoolExecutor> chunkExecutors;
private final Map<Long, Long> applyTransactionCompletionMap;
private final Set<Long> unhealthyContainers;
private final Cache<Long, ByteString> stateMachineDataCache;
private final AtomicBoolean stateMachineHealthy;

@@ -226,6 +228,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
metrics = CSMMetrics.create(gid);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
applyTransactionCompletionMap = new ConcurrentHashMap<>();
this.unhealthyContainers = ConcurrentHashMap.newKeySet();
long pendingRequestsBytesLimit = (long)conf.getStorageSize(
OzoneConfigKeys.HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT,
OzoneConfigKeys.HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT,
@@ -360,6 +363,13 @@ public boolean isStateMachineHealthy() {
return stateMachineHealthy.get();
}

private void checkContainerHealthy(long containerId) throws StorageContainerException {
if (!isStateMachineHealthy() && unhealthyContainers.contains(containerId)) {
throw new StorageContainerException(String.format("Prev writes to container %d failed, stopping all writes to " +
"container", containerId), ContainerProtos.Result.CONTAINER_UNHEALTHY);
}
}

@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
@@ -554,6 +564,11 @@ private CompletableFuture<Message> writeStateMachineData(
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() -> {
try {
try {
checkContainerHealthy(write.getBlockID().getContainerID());
Contributor

IMO, if the statemachine itself is unhealthy, we should reject any further writeStateMachineData() calls, so there is no need to check for a specific container. This is valid for applyTransaction() but not here.

Contributor Author

This function gets called on the follower as well. We should not block writes to other containers; that would lead to a lot of containers getting impacted because of one container's failure.

Contributor Author

Since we are returning an error response, applyTransaction will still go through; there we wouldn't find the container to be unhealthy, so we might end up applying a putBlock transaction without actually writing the data.

Contributor

As discussed, we should avoid additional requests for the pipeline in the writeData case. ApplyTransaction continuing for data that is already present is fine.
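
A minimal sketch of the two approaches discussed in this thread, assuming the isStateMachineHealthy(), unhealthyContainers, StorageContainerException, and ContainerProtos members visible in this class; the method names below are illustrative only and are not part of the patch:

// Reviewer's suggestion: once the statemachine is unhealthy, reject every
// further writeStateMachineData() call for the pipeline, regardless of container.
private void rejectAllWritesWhenUnhealthy() throws StorageContainerException {
  if (!isStateMachineHealthy()) {
    throw new StorageContainerException("StateMachine is unhealthy, rejecting further writes",
        ContainerProtos.Result.CONTAINER_UNHEALTHY);
  }
}

// Approach taken by this patch: reject only writes to containers whose earlier
// writes already failed, so a single bad container does not block the other
// containers served by the same pipeline (this path also runs on followers).
private void rejectWritesToFailedContainer(long containerId) throws StorageContainerException {
  if (!isStateMachineHealthy() && unhealthyContainers.contains(containerId)) {
    throw new StorageContainerException("Previous writes to container " + containerId + " failed",
        ContainerProtos.Result.CONTAINER_UNHEALTHY);
  }
}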

} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, requestProto);
}
metrics.recordWriteStateMachineQueueingLatencyNs(
Time.monotonicNowNanos() - startTime);
return dispatchCommand(requestProto, context);
@@ -564,6 +579,7 @@ private CompletableFuture<Message> writeStateMachineData(
metrics.incNumWriteDataFails();
// write chunks go in parallel. It's possible that one write chunk
// see the stateMachine is marked unhealthy by other parallel thread
unhealthyContainers.add(write.getBlockID().getContainerID());
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
throw e;
@@ -596,6 +612,7 @@ private CompletableFuture<Message> writeStateMachineData(
// This leads to pipeline close. Any change in that behavior requires
// handling the entry for the write chunk in cache.
stateMachineHealthy.set(false);
unhealthyContainers.add(write.getBlockID().getContainerID());
raftFuture.completeExceptionally(sce);
} else {
metrics.incNumBytesWrittenCount(
@@ -763,6 +780,7 @@ private ByteString readStateMachineData(
+ "{} Container Result: {}", getGroupId(), response.getCmdType(), index,
response.getMessage(), response.getResult());
stateMachineHealthy.set(false);
unhealthyContainers.add(requestProto.getContainerID());
throw sce;
}

@@ -945,6 +963,7 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
try {
try {
this.validatePeers();
this.checkContainerHealthy(containerId);
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}
@@ -1031,6 +1050,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + index
+ " for " + requestProto.getCmdType(), e);
stateMachineHealthy.compareAndSet(true, false);
unhealthyContainers.add(requestProto.getContainerID());
metrics.incNumApplyTransactionsFails();
applyTransactionFuture.completeExceptionally(e);
};
@@ -1065,6 +1085,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
stateMachineHealthy.compareAndSet(true, false);
unhealthyContainers.add(requestProto.getContainerID());
ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole());
} else {
if (LOG.isDebugEnabled()) {