@@ -45,6 +45,7 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -190,18 +191,18 @@ long getStartTime() {
   }
 
   static class WriteFutureContext {
-    private final CompletableFuture<ContainerCommandResponseProto> writeChunkFuture;
+    private final Future<ContainerCommandResponseProto> writeChunkFuture;
     private final CompletableFuture<Message> raftFuture;
     private final long startTime;
 
-    WriteFutureContext(CompletableFuture<ContainerCommandResponseProto> writeChunkFuture,
+    WriteFutureContext(Future<ContainerCommandResponseProto> writeChunkFuture,
         CompletableFuture<Message> raftFuture, long startTime) {
       this.writeChunkFuture = writeChunkFuture;
       this.raftFuture = raftFuture;
       this.startTime = startTime;
     }
 
-    public CompletableFuture<ContainerCommandResponseProto> getWriteChunkFuture() {
+    public Future<ContainerCommandResponseProto> getWriteChunkFuture() {
       return writeChunkFuture;
     }
 
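Note: the holder's stored type widens from CompletableFuture to Future because, in the hunks below, the write-chunk task is handed straight to the executor via ExecutorService.submit(Callable), which returns a plain java.util.concurrent.Future, rather than via CompletableFuture.supplyAsync(Supplier, Executor). A minimal sketch of the two submission styles (the pool and task here are illustrative, not from this patch):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class SubmissionStyles {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Before: supplyAsync yields a composable CompletableFuture, so
        // result handling can be chained later with thenApply/whenComplete.
        CompletableFuture<String> composable =
            CompletableFuture.supplyAsync(() -> "chunk written", pool);
        // After: submit yields a plain Future; result handling has to happen
        // inside the task itself, as handleCommandResult does in this patch.
        Future<String> plain = pool.submit(() -> "chunk written");
        pool.shutdown();
      }
    }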
@@ -576,7 +577,7 @@ private CompletableFuture<Message> writeStateMachineData(
       return writeChunkFutureMap.get(entryIndex).getRaftFuture();
     }
     try {
-      validateLongRunningWrite(entryIndex);
+      validateLongRunningWrite();
     } catch (IOException e) {
       return completeExceptionally(e);
     }
@@ -602,82 +603,85 @@ private CompletableFuture<Message> writeStateMachineData(
         .setContainer2BCSIDMap(container2BCSIDMap)
         .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor pool thread.
+    Future<ContainerCommandResponseProto> future = getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+      try {
+        try {
+          checkContainerHealthy(write.getBlockID().getContainerID(), true);
+        } catch (StorageContainerException e) {
+          ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
+          return result;
+        }
+        metrics.recordWriteStateMachineQueueingLatencyNs(
+            Time.monotonicNowNanos() - startTime);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
+        return result;
+      } catch (Exception e) {
+        LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
             "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
-            entryIndex, write.getChunkData().getChunkName(), e);
-        metrics.incNumWriteDataFails();
-        // write chunks go in parallel. It's possible that one write chunk
-        // see the stateMachine is marked unhealthy by other parallel thread
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        stateMachineHealthy.set(false);
-        raftFuture.completeExceptionally(e);
-        throw e;
-      }
-    }, getChunkExecutor(requestProto.getWriteChunk()));
+            entryIndex, write.getChunkData().getChunkName(), e);
+        metrics.incNumWriteDataFails();
+        // Write chunks go in parallel. It's possible that one write chunk
+        // sees the stateMachine marked unhealthy by another parallel thread.
+        unhealthyContainers.add(write.getBlockID().getContainerID());
+        stateMachineHealthy.set(false);
+        raftFuture.completeExceptionally(e);
+        throw e;
+      } finally {
+        // Remove the future once it finishes execution from the writeChunkFutureMap.
+        writeChunkFutureMap.remove(entryIndex);
+      }
+    });
 
-    writeChunkFutureMap.put(entryIndex, new WriteFutureContext(writeChunkFuture, raftFuture, startTime));
+    writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, raftFuture, startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
           "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, long entryIndex, long startTime,
+      ContainerCommandResponseProto r, WriteChunkRequestProto write,
+      CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
-
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
   }
 
-  private void validateLongRunningWrite(long currIndex) throws IOException {
+  private void validateLongRunningWrite() throws IOException {
     // get min valid write chunk operation's future context
     Map.Entry<Long, WriteFutureContext> longWriteFutureContextEntry = writeChunkFutureMap.firstEntry();
     if (null == longWriteFutureContextEntry) {
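Note: the hunk above also changes when the bookkeeping entry is released. The old code removed the writeChunkFutureMap entry in a thenApply continuation, which is skipped when the upstream future completes exceptionally, so a failed write could leave its entry behind; the new code removes it in a finally block inside the submitted task, so cleanup runs on every exit path. A minimal, self-contained sketch of that pattern (names such as pending and doWrite are hypothetical, not from this patch):

    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class CleanupOnEveryPath {
      private final ConcurrentSkipListMap<Long, Future<String>> pending =
          new ConcurrentSkipListMap<>();
      private final ExecutorService pool = Executors.newSingleThreadExecutor();

      Future<String> submit(long index) {
        Future<String> f = pool.submit(() -> {
          try {
            return doWrite(index);   // may throw
          } finally {
            pending.remove(index);   // runs on success and on failure alike
          }
        });
        pending.put(index, f);
        return f;
      }

      private String doWrite(long index) { return "ok-" + index; }
    }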
@@ -880,7 +884,7 @@ private ByteString readStateMachineData(
   public CompletableFuture<Void> flush(long index) {
     return CompletableFuture.allOf(
         writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
-            .map(e -> e.getValue().getWriteChunkFuture()).toArray(CompletableFuture[]::new));
+            .map(e -> e.getValue().getRaftFuture()).toArray(CompletableFuture[]::new));
   }
 
   /**
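Note: with getWriteChunkFuture() now returning a plain Future, the old flush body above would no longer type-check against CompletableFuture.allOf, so flush awaits the raft futures instead; handleCommandResult completes those (normally or exceptionally) only after the chunk write has finished, so the barrier still covers the data. A sketch of the same aggregation, assuming a map shaped like writeChunkFutureMap (since ConcurrentSkipListMap keeps keys sorted, headMap is an alternative to the stream filter):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentSkipListMap;

    class FlushSketch {
      private final ConcurrentSkipListMap<Long, CompletableFuture<String>> raftFutures =
          new ConcurrentSkipListMap<>();

      // Completes once every pending entry at or below the given log index is done.
      CompletableFuture<Void> flush(long index) {
        return CompletableFuture.allOf(
            raftFutures.headMap(index, true).values().toArray(new CompletableFuture[0]));
      }
    }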