@@ -632,7 +632,9 @@ public final class ScmConfigKeys {

public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY =
"net.topology.node.switch.mapping.impl";

public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL
= "hdds.container.ratis.statemachine.write.wait.interval";
public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT = 10 * 60 * 1000_000_000L;
/**
* Never constructed.
*/
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3548,6 +3548,14 @@
Timeout for the request submitted directly to Ratis in datanode.
</description>
</property>
<property>
<name>hdds.container.ratis.statemachine.write.wait.interval</name>
<tag>OZONE, DATANODE</tag>
<value>10m</value>
<description>
Maximum time a container write chunk operation may stay pending in the Ratis state machine.
If the oldest pending write exceeds this interval, the state machine is marked unhealthy and
the pending writes are cancelled, leading to pipeline close.
</description>
</property>
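For reference, the state machine reads this interval with getTimeDuration in nanoseconds, so the 10m default above corresponds to HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT (10 * 60 * 1,000,000,000 ns). Below is a minimal sketch of reading and overriding the value; the class and variable names are illustrative and not part of this patch.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

public final class WriteWaitIntervalExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Optional override, e.g. a shorter interval for testing; "10m" remains the default otherwise.
    conf.set(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL, "30s");
    long waitNs = conf.getTimeDuration(
        ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
        ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT,
        TimeUnit.NANOSECONDS);
    // With the default of "10m" this resolves to 600,000,000,000 ns.
    System.out.println("write wait interval (ns): " + waitNs);
  }
}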
<property>
<name>hdds.datanode.slow.op.warning.threshold</name>
<tag>OZONE, DATANODE, PERFORMANCE</tag>
@@ -35,17 +35,22 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.HddsUtils;
@@ -187,13 +192,38 @@ long getStartTime() {
}
}

static class WriteFutures {
private final Future<ContainerCommandResponseProto> writeChunkFuture;
private final CompletableFuture<Message> raftFuture;
private final long startTime;

WriteFutures(Future<ContainerCommandResponseProto> writeChunkFuture,
CompletableFuture<Message> raftFuture, long startTime) {
this.writeChunkFuture = writeChunkFuture;
this.raftFuture = raftFuture;
this.startTime = startTime;
}

public Future<ContainerCommandResponseProto> getWriteChunkFuture() {
return writeChunkFuture;
}

public CompletableFuture<Message> getRaftFuture() {
return raftFuture;
}

long getStartTime() {
return startTime;
}
}

private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long,
CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
private final NavigableMap<Long, WriteFutures> writeChunkFutureMap;
private final long writeChunkWaitMaxNs;

// keeps track of the containers created per pipeline
private final Map<Long, Long> container2BCSIDMap;
Expand Down Expand Up @@ -229,7 +259,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
this.containerController = containerController;
this.ratisServer = ratisServer;
metrics = CSMMetrics.create(gid);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.writeChunkFutureMap = new ConcurrentSkipListMap<>();
applyTransactionCompletionMap = new ConcurrentHashMap<>();
this.unhealthyContainers = ConcurrentHashMap.newKeySet();
long pendingRequestsBytesLimit = (long)conf.getStorageSize(
Expand Down Expand Up @@ -273,6 +303,8 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
this.waitOnBothFollowers = conf.getObject(
DatanodeConfiguration.class).waitOnAllFollowers();

this.writeChunkWaitMaxNs = conf.getTimeDuration(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT, TimeUnit.NANOSECONDS);
}

private void validatePeers() throws IOException {
Expand Down Expand Up @@ -542,6 +574,16 @@ private ContainerCommandResponseProto dispatchCommand(
private CompletableFuture<Message> writeStateMachineData(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
final WriteFutures previous = writeChunkFutureMap.get(entryIndex);
if (previous != null) {
// The state machine normally waits indefinitely; as a precaution, return the existing future if this entry is retried.
return previous.getRaftFuture();
}
try {
validateLongRunningWrite();
} catch (StorageContainerException e) {
return completeExceptionally(e);
}
final WriteChunkRequestProto write = requestProto.getWriteChunk();
RaftServer server = ratisServer.getServer();
Preconditions.checkArgument(!write.getData().isEmpty());
@@ -564,19 +606,22 @@ private CompletableFuture<Message> writeStateMachineData(
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread.
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() -> {
// ensure the write chunk happens asynchronously in writeChunkExecutor pool thread.
Future<ContainerCommandResponseProto> future = getChunkExecutor(
requestProto.getWriteChunk()).submit(() -> {
try {
try {
checkContainerHealthy(write.getBlockID().getContainerID(), true);
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, requestProto);
ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto);
handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
return result;
}
metrics.recordWriteStateMachineQueueingLatencyNs(
Time.monotonicNowNanos() - startTime);
return dispatchCommand(requestProto, context);
ContainerCommandResponseProto result = dispatchCommand(requestProto, context);
handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
return result;
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
@@ -588,55 +633,87 @@ private CompletableFuture<Message> writeStateMachineData(
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
throw e;
} finally {
// Remove the future from the writeChunkFutureMap once it finishes execution.
writeChunkFutureMap.remove(entryIndex);
}
}, getChunkExecutor(requestProto.getWriteChunk()));
});

writeChunkFutureMap.put(entryIndex, writeChunkFuture);
writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
// Remove the future once it finishes execution from the
// writeChunkFutureMap.
writeChunkFuture.thenApply(r -> {
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
// After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
// that should not crash the pipeline.
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
return raftFuture;
}

private void handleCommandResult(ContainerCommandRequestProto requestProto, long entryIndex, long startTime,
ContainerCommandResponseProto r, WriteChunkRequestProto write,
CompletableFuture<Message> raftFuture) {
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
// Since concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
// should not crash the pipeline.
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
metrics.incNumWriteDataFails();
// If the write fails currently we mark the stateMachine as unhealthy.
// This leads to pipeline close. Any change in that behavior requires
// handling the entry for the write chunk in cache.
stateMachineHealthy.set(false);
unhealthyContainers.add(write.getBlockID().getContainerID());
raftFuture.completeExceptionally(sce);
} else {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
if (LOG.isDebugEnabled()) {
LOG.debug(getGroupId() +
": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
metrics.incNumWriteDataFails();
// If the write fails currently we mark the stateMachine as unhealthy.
// This leads to pipeline close. Any change in that behavior requires
// handling the entry for the write chunk in cache.
stateMachineHealthy.set(false);
unhealthyContainers.add(write.getBlockID().getContainerID());
raftFuture.completeExceptionally(sce);
} else {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
if (LOG.isDebugEnabled()) {
LOG.debug(getGroupId() +
": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName());
}
raftFuture.complete(r::toByteString);
metrics.recordWriteStateMachineCompletionNs(
Time.monotonicNowNanos() - startTime);
write.getChunkData().getChunkName());
}
raftFuture.complete(r::toByteString);
metrics.recordWriteStateMachineCompletionNs(
Time.monotonicNowNanos() - startTime);
}
}

writeChunkFutureMap.remove(entryIndex);
return r;
});
return raftFuture;
private void validateLongRunningWrite() throws StorageContainerException {
// find the oldest pending write chunk operation whose future has not yet completed
Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
for (boolean found = false; !found;) {
writeFutureContextEntry = writeChunkFutureMap.firstEntry();
if (null == writeFutureContextEntry) {
return;
}
if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
// writeChunkFutureMap may hold a dangling completed entry, since the remove in the task can run before the future is added to the map
writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
} else {
found = true;
}
}
// check whether the oldest pending write has exceeded the timeout (in nanoseconds)
long waitTime = Time.monotonicNowNanos() - writeFutureContextEntry.getValue().getStartTime();
if (waitTime > writeChunkWaitMaxNs) {
LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}, " +
"cancelling pending write chunk for this group", waitTime, writeChunkWaitMaxNs,
writeFutureContextEntry.getKey(), getGroupId());
stateMachineHealthy.set(false);
writeChunkFutureMap.forEach((key, value) -> {
value.getWriteChunkFuture().cancel(true);
});
throw new StorageContainerException("Write chunk has taken " + waitTime + "ns crossing threshold "
+ writeChunkWaitMaxNs + "ns for index " + writeFutureContextEntry.getKey() + " groupId " + getGroupId(),
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
}
}

private StateMachine.DataChannel getStreamDataChannel(
@@ -819,9 +896,13 @@ private ByteString readStateMachineData(
*/
@Override
public CompletableFuture<Void> flush(long index) {
return CompletableFuture.allOf(
writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
.map(Map.Entry::getValue).toArray(CompletableFuture[]::new));
final SortedMap<Long, WriteFutures> head = writeChunkFutureMap.headMap(index, true);
if (head.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.allOf(head.values().stream()
.map(WriteFutures::getRaftFuture)
.toArray(CompletableFuture[]::new));
}

/**
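The long-running-write check above follows a simple pattern: pending writes are kept in an ordered map keyed by Ratis log index, the oldest entry is inspected before each new write, and if it has been pending longer than the configured interval every pending write is cancelled and the state machine is marked unhealthy. A standalone, simplified sketch of that pattern follows; PendingWriteWatchdog and its members are illustrative names, not the patch's API.

import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;

final class PendingWriteWatchdog {

  static final class PendingWrite {
    final Future<?> task;
    final long startTimeNs;

    PendingWrite(Future<?> task, long startTimeNs) {
      this.task = task;
      this.startTimeNs = startTimeNs;
    }
  }

  // Log index -> pending write; an ordered map gives cheap access to the oldest entry.
  private final NavigableMap<Long, PendingWrite> pending = new ConcurrentSkipListMap<>();
  private final long maxWaitNs;

  PendingWriteWatchdog(long maxWaitNs) {
    this.maxWaitNs = maxWaitNs;
  }

  void register(long index, Future<?> task) {
    pending.put(index, new PendingWrite(task, System.nanoTime()));
  }

  void complete(long index) {
    pending.remove(index);
  }

  // Throws if the oldest pending write exceeded the limit; cancels every pending task first.
  void checkOldest() {
    Map.Entry<Long, PendingWrite> oldest;
    // Skip dangling entries whose task already finished (mirrors the cleanup loop in the patch).
    while ((oldest = pending.firstEntry()) != null && oldest.getValue().task.isDone()) {
      pending.remove(oldest.getKey());
    }
    if (oldest == null) {
      return;
    }
    long waitedNs = System.nanoTime() - oldest.getValue().startTimeNs;
    if (waitedNs > maxWaitNs) {
      pending.values().forEach(p -> p.task.cancel(true));
      throw new IllegalStateException(
          "write pending for " + waitedNs + "ns at index " + oldest.getKey());
    }
  }
}

An ordered map keeps firstEntry() and headMap() cheap, which is also why the patch switches writeChunkFutureMap from a ConcurrentHashMap to a ConcurrentSkipListMap.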
@@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@@ -30,8 +31,10 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -57,6 +60,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -263,4 +267,54 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent());
assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult());
}

@Test
public void testWriteTimeout() throws Exception {
RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class);
when(entry.getTerm()).thenReturn(1L);
when(entry.getIndex()).thenReturn(1L);
RaftProtos.LogEntryProto entryNext = mock(RaftProtos.LogEntryProto.class);
when(entryNext.getTerm()).thenReturn(1L);
when(entryNext.getIndex()).thenReturn(2L);
TransactionContext trx = mock(TransactionContext.class);
ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class);
when(trx.getStateMachineContext()).thenReturn(context);
doAnswer(e -> {
try {
Thread.sleep(200000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw ie;
}
return null;
}).when(dispatcher).dispatch(any(), any());

when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
.setBlockID(
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
.setContainerID(1)
.setDatanodeUuid(UUID.randomUUID().toString()).build());
AtomicReference<Throwable> throwable = new AtomicReference<>(null);
Function<Throwable, ? extends Message> throwableSetter = t -> {
throwable.set(t);
return null;
};
Field writeChunkWaitMaxNs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs");
writeChunkWaitMaxNs.setAccessible(true);
writeChunkWaitMaxNs.set(stateMachine, 1000_000_000);
CompletableFuture<Message> firstWrite = stateMachine.write(entry, trx);
Thread.sleep(2000);
CompletableFuture<Message> secondWrite = stateMachine.write(entryNext, trx);
firstWrite.exceptionally(throwableSetter).get();
assertNotNull(throwable.get());
assertInstanceOf(InterruptedException.class, throwable.get());

secondWrite.exceptionally(throwableSetter).get();
assertNotNull(throwable.get());
assertInstanceOf(StorageContainerException.class, throwable.get());
StorageContainerException sce = (StorageContainerException) throwable.get();
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
}
}