Expand Up @@ -632,7 +632,9 @@ public final class ScmConfigKeys {

public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY =
"net.topology.node.switch.mapping.impl";

public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL
= "hdds.container.ratis.statemachine.write.wait.interval";
public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT = 10 * 60 * 1000_1000_1000L;
Contributor:
TYPO: 1000_1000_1000L should be 1000_000_000L

Contributor (author):
Thanks, updated
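For reference, a minimal sketch of the corrected constant as the reviewer suggests, assuming the intended default is 10 minutes expressed in nanoseconds:

// 10 minutes = 10 * 60 seconds, and 1 second = 1_000_000_000 ns, so the default is 600_000_000_000 ns.
public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT =
    10 * 60 * 1000_000_000L;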

/**
* Never constructed.
*/
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3548,6 +3548,14 @@
Timeout for the request submitted directly to Ratis in datanode.
</description>
</property>
<property>
<name>hdds.container.ratis.statemachine.write.wait.interval</name>
<tag>OZONE, DATANODE</tag>
<value>10m</value>
<description>
Maximum time a write chunk operation may stay pending on the datanode Ratis write path before the state machine is marked unhealthy and the pending writes are cancelled.
</description>
</property>
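As an illustration only (not part of the patch), the same timeout can also be overridden programmatically; OzoneConfiguration and setTimeDuration are the standard Hadoop/Ozone configuration APIs, and the 5-minute value is an arbitrary example:

// Sketch: override hdds.container.ratis.statemachine.write.wait.interval in code.
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(
    ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
    5, TimeUnit.MINUTES); // default is 10m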
<property>
<name>hdds.datanode.slow.op.warning.threshold</name>
<tag>OZONE, DATANODE, PERFORMANCE</tag>
@@ -35,17 +35,22 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.HddsUtils;
@@ -187,13 +192,38 @@ long getStartTime() {
}
}

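/**
 * Tracks one pending write chunk: the executor future performing the write, the Raft future
 * returned to Ratis, and the submission time used by the long-running-write check.
 */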
static class WriteFutures {
private final Future<ContainerCommandResponseProto> writeChunkFuture;
private final CompletableFuture<Message> raftFuture;
private final long startTime;

WriteFutures(Future<ContainerCommandResponseProto> writeChunkFuture,
CompletableFuture<Message> raftFuture, long startTime) {
this.writeChunkFuture = writeChunkFuture;
this.raftFuture = raftFuture;
this.startTime = startTime;
}

public Future<ContainerCommandResponseProto> getWriteChunkFuture() {
return writeChunkFuture;
}

public CompletableFuture<Message> getRaftFuture() {
return raftFuture;
}

long getStartTime() {
return startTime;
}
}

private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long,
CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
private final NavigableMap<Long, WriteFutures> writeChunkFutureMap;
private final long writeChunkWaitMaxNs;

// keeps track of the containers created per pipeline
private final Map<Long, Long> container2BCSIDMap;
@@ -229,7 +259,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
this.containerController = containerController;
this.ratisServer = ratisServer;
metrics = CSMMetrics.create(gid);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.writeChunkFutureMap = new ConcurrentSkipListMap<>();
applyTransactionCompletionMap = new ConcurrentHashMap<>();
this.unhealthyContainers = ConcurrentHashMap.newKeySet();
long pendingRequestsBytesLimit = (long)conf.getStorageSize(
@@ -273,6 +303,8 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
this.waitOnBothFollowers = conf.getObject(
DatanodeConfiguration.class).waitOnAllFollowers();

this.writeChunkWaitMaxNs = conf.getTimeDuration(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT, TimeUnit.NANOSECONDS);
}

private void validatePeers() throws IOException {
@@ -542,6 +574,16 @@ private ContainerCommandResponseProto dispatchCommand(
private CompletableFuture<Message> writeStateMachineData(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
final WriteFutures previous = writeChunkFutureMap.get(entryIndex);
if (previous != null) {
// The state machine normally waits indefinitely; as a precaution, return the existing future if a retry arrives for the same index.
return previous.getRaftFuture();
}
try {
validateLongRunningWrite();
} catch (StorageContainerException e) {
return completeExceptionally(e);
}
final WriteChunkRequestProto write = requestProto.getWriteChunk();
RaftServer server = ratisServer.getServer();
Preconditions.checkArgument(!write.getData().isEmpty());
@@ -564,22 +606,25 @@ private CompletableFuture<Message> writeStateMachineData(
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread.
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() -> {
// ensure the write chunk happens asynchronously in writeChunkExecutor pool thread.
Future<ContainerCommandResponseProto> future = getChunkExecutor(
requestProto.getWriteChunk()).submit(() -> {
try {
try {
checkContainerHealthy(write.getBlockID().getContainerID(), true);
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, requestProto);
ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto);
handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
return result;
}
metrics.recordWriteStateMachineQueueingLatencyNs(
Time.monotonicNowNanos() - startTime);
return dispatchCommand(requestProto, context);
ContainerCommandResponseProto result = dispatchCommand(requestProto, context);
handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
return result;
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
Contributor:
This indentation change should be reverted.

entryIndex, write.getChunkData().getChunkName(), e);
metrics.incNumWriteDataFails();
// write chunks go in parallel. It's possible that one write chunk
@@ -588,55 +633,91 @@ private CompletableFuture<Message> writeStateMachineData(
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
throw e;
} finally {
// Remove the future from the writeChunkFutureMap once it finishes execution.
writeChunkFutureMap.remove(entryIndex);
}
}, getChunkExecutor(requestProto.getWriteChunk()));
});

writeChunkFutureMap.put(entryIndex, writeChunkFuture);
writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
// Remove the future once it finishes execution from the
// writeChunkFutureMap.
writeChunkFuture.thenApply(r -> {
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
// After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
// that should not crash the pipeline.
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
return raftFuture;
}

private void handleCommandResult(ContainerCommandRequestProto requestProto, long entryIndex, long startTime,
ContainerCommandResponseProto r, WriteChunkRequestProto write,
CompletableFuture<Message> raftFuture) {
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
// After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
// that should not crash the pipeline.
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
metrics.incNumWriteDataFails();
// If the write fails currently we mark the stateMachine as unhealthy.
// This leads to pipeline close. Any change in that behavior requires
// handling the entry for the write chunk in cache.
stateMachineHealthy.set(false);
unhealthyContainers.add(write.getBlockID().getContainerID());
raftFuture.completeExceptionally(sce);
} else {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
if (LOG.isDebugEnabled()) {
LOG.debug(getGroupId() +
": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
metrics.incNumWriteDataFails();
// If the write fails currently we mark the stateMachine as unhealthy.
// This leads to pipeline close. Any change in that behavior requires
// handling the entry for the write chunk in cache.
stateMachineHealthy.set(false);
unhealthyContainers.add(write.getBlockID().getContainerID());
raftFuture.completeExceptionally(sce);
} else {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
if (LOG.isDebugEnabled()) {
LOG.debug(getGroupId() +
": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName());
}
raftFuture.complete(r::toByteString);
metrics.recordWriteStateMachineCompletionNs(
Time.monotonicNowNanos() - startTime);
write.getChunkData().getChunkName());
}
raftFuture.complete(r::toByteString);
metrics.recordWriteStateMachineCompletionNs(
Time.monotonicNowNanos() - startTime);
}
}

writeChunkFutureMap.remove(entryIndex);
return r;
});
return raftFuture;
private void validateLongRunningWrite() throws StorageContainerException {
// get min valid write chunk operation's future context
Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
for (boolean found = false; !found;) {
writeFutureContextEntry = writeChunkFutureMap.firstEntry();
if (null == writeFutureContextEntry) {
return;
}
if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
// writeChunkFutureMap may contain a dangling, already-completed entry, because the task can remove its entry before the put that adds it runs
writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
} else {
found = true;
}
}
if (null == writeFutureContextEntry) {
return;
}
Contributor:
writeFutureContextEntry cannot be null here. (A simplified sketch of this method follows after its closing brace.)

// validate the elapsed time against the timeout (in nanoseconds)
long waitTime = Time.monotonicNowNanos() - writeFutureContextEntry.getValue().getStartTime();
if (waitTime > writeChunkWaitMaxNs) {
LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}", waitTime,
writeChunkWaitMaxNs, writeFutureContextEntry.getKey(), getGroupId());
stateMachineHealthy.set(false);
writeChunkFutureMap.forEach((key, value) -> {
LOG.error("Cancelling write chunk due to timeout {}ns crossing {}ns for index {}, groupId {}", waitTime,
Contributor:
Don't print forEach error here. The same message can be printed a thousand times if there are a thousand entries.

Contributor (author):
Removed. In any case, because the cancelled future fails with an interrupt exception, the caller already logs the index and group failure, so that case is covered.

writeChunkWaitMaxNs, key, getGroupId());
value.getWriteChunkFuture().cancel(true);
});
throw new StorageContainerException("Write chunk has taken " + waitTime + "ns crossing threshold "
+ writeChunkWaitMaxNs + "ns for index " + writeFutureContextEntry.getKey() + " groupId " + getGroupId(),
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
}
}
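Taking the two review comments above together (the redundant null check and the per-entry error log), a minimal sketch of how the method could read, using the same fields and helpers as the patch; illustrative only, not the final code:

private void validateLongRunningWrite() throws StorageContainerException {
  // Find the oldest entry whose write task is still running, dropping dangling
  // entries whose task already finished before the map put happened.
  Map.Entry<Long, WriteFutures> first = writeChunkFutureMap.firstEntry();
  while (first != null && first.getValue().getWriteChunkFuture().isDone()) {
    writeChunkFutureMap.remove(first.getKey());
    first = writeChunkFutureMap.firstEntry();
  }
  if (first == null) {
    return;
  }
  // Compare the elapsed time (nanoseconds) against the configured threshold.
  final long waitTime = Time.monotonicNowNanos() - first.getValue().getStartTime();
  if (waitTime > writeChunkWaitMaxNs) {
    LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}",
        waitTime, writeChunkWaitMaxNs, first.getKey(), getGroupId());
    stateMachineHealthy.set(false);
    // Cancel every pending write; the interrupted callers log their own index and group.
    writeChunkFutureMap.forEach((key, value) -> value.getWriteChunkFuture().cancel(true));
    throw new StorageContainerException("Write chunk has taken " + waitTime
        + "ns crossing threshold " + writeChunkWaitMaxNs + "ns for index " + first.getKey()
        + " groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
  }
}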

private StateMachine.DataChannel getStreamDataChannel(
@@ -819,9 +900,13 @@ private ByteString readStateMachineData(
*/
@Override
public CompletableFuture<Void> flush(long index) {
return CompletableFuture.allOf(
writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
.map(Map.Entry::getValue).toArray(CompletableFuture[]::new));
final SortedMap<Long, WriteFutures> head = writeChunkFutureMap.headMap(index + 1);
Contributor:
Minor: it may be better to use headMap(index, true). (A sketch using the inclusive form follows after this method.)

if (head.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.allOf(head.values().stream()
.map(WriteFutures::getRaftFuture)
.toArray(CompletableFuture[]::new));
}
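A minimal sketch of flush() using the inclusive headMap form suggested above; headMap(index, true) is equivalent to headMap(index + 1), so behaviour is unchanged:

@Override
public CompletableFuture<Void> flush(long index) {
  // Include the entry at `index` itself (inclusive upper bound).
  final SortedMap<Long, WriteFutures> head = writeChunkFutureMap.headMap(index, true);
  if (head.isEmpty()) {
    return CompletableFuture.completedFuture(null);
  }
  return CompletableFuture.allOf(head.values().stream()
      .map(WriteFutures::getRaftFuture)
      .toArray(CompletableFuture[]::new));
}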

/**
@@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@@ -30,8 +31,10 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -57,6 +60,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -263,4 +267,54 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent());
assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult());
}

@Test
public void testWriteTimeout() throws Exception {
RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class);
when(entry.getTerm()).thenReturn(1L);
when(entry.getIndex()).thenReturn(1L);
RaftProtos.LogEntryProto entryNext = mock(RaftProtos.LogEntryProto.class);
when(entryNext.getTerm()).thenReturn(1L);
when(entryNext.getIndex()).thenReturn(2L);
TransactionContext trx = mock(TransactionContext.class);
ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class);
when(trx.getStateMachineContext()).thenReturn(context);
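// Block dispatch so the first write never completes and the timeout/cancellation path is exercised.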
doAnswer(e -> {
try {
Thread.sleep(200000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw ie;
}
return null;
}).when(dispatcher).dispatch(any(), any());

when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
.setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
.setContainerID(1)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
AtomicReference<Throwable> throwable = new AtomicReference<>(null);
Function<Throwable, ? extends Message> throwableSetter = t -> {
throwable.set(t);
return null;
};
// Lower the writeChunkWaitMaxNs field via reflection so the timeout triggers quickly (1000 ns).
Field writeChunkWaitMaxNs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs");
writeChunkWaitMaxNs.setAccessible(true);
writeChunkWaitMaxNs.set(stateMachine, 1000L);
CompletableFuture<Message> firstWrite = stateMachine.write(entry, trx);
Thread.sleep(2000);
CompletableFuture<Message> secondWrite = stateMachine.write(entryNext, trx);
firstWrite.exceptionally(throwableSetter).get();
assertNotNull(throwable.get());
assertInstanceOf(InterruptedException.class, throwable.get());

secondWrite.exceptionally(throwableSetter).get();
assertNotNull(throwable.get());
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
}
}