3232import java .util .Collection ;
3333import java .util .Collections ;
3434import java .util .Iterator ;
35- import java .util .List ;
35+ import java .util .Map ;
3636import java .util .Set ;
3737import java .util .concurrent .CompletableFuture ;
3838import java .util .concurrent .ConcurrentHashMap ;
4545import org .apache .hadoop .crypto .Encryptor ;
4646import org .apache .hadoop .fs .Path ;
4747import org .apache .hadoop .hbase .io .asyncfs .FanOutOneBlockAsyncDFSOutputHelper .CancelOnClose ;
48+ import org .apache .hadoop .hbase .io .asyncfs .monitor .StreamSlowMonitor ;
4849import org .apache .hadoop .hbase .util .CancelableProgressable ;
50+ import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
4951import org .apache .hadoop .hbase .util .RecoverLeaseFSUtils ;
5052import org .apache .hadoop .hdfs .DFSClient ;
5153import org .apache .hadoop .hdfs .DistributedFileSystem ;
6870import org .apache .hbase .thirdparty .io .netty .channel .ChannelHandler .Sharable ;
6971import org .apache .hbase .thirdparty .io .netty .channel .ChannelHandlerContext ;
7072import org .apache .hbase .thirdparty .io .netty .channel .ChannelId ;
73+ import org .apache .hbase .thirdparty .io .netty .channel .ChannelOutboundInvoker ;
7174import org .apache .hbase .thirdparty .io .netty .channel .SimpleChannelInboundHandler ;
7275import org .apache .hbase .thirdparty .io .netty .handler .codec .protobuf .ProtobufVarint32FrameDecoder ;
7376import org .apache .hbase .thirdparty .io .netty .handler .timeout .IdleStateEvent ;
@@ -121,7 +124,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
121124
122125 private final Encryptor encryptor ;
123126
124- private final List <Channel > datanodeList ;
127+ private final Map <Channel , DatanodeInfo > datanodeInfoMap ;
125128
126129 private final DataChecksum summer ;
127130
@@ -137,17 +140,22 @@ private static final class Callback {
137140
138141 // should be backed by a thread safe collection
139142 private final Set <ChannelId > unfinishedReplicas ;
143+ private final long packetDataLen ;
144+ private final long flushTimestamp ;
145+ private long lastAckTimestamp = -1 ;
140146
// Builds the bookkeeping entry for one outstanding write. It stores the future and acked length,
// records packetDataLen, and stamps flushTimestamp with EnvironmentEdgeManager.currentTime() at
// construction. The ids of the given replica channels are collected into unfinishedReplicas.
// NOTE(review): this text is a diff rendering; each '-' line is the old form and the '+' line
// that follows it is the replacement (the signature gains the packetDataLen parameter).
141147 public Callback (CompletableFuture <Long > future , long ackedLength ,
142- Collection <Channel > replicas ) {
148+ final Collection <Channel > replicas , long packetDataLen ) {
143149 this .future = future ;
144150 this .ackedLength = ackedLength ;
151+ this .packetDataLen = packetDataLen ;
152+ this .flushTimestamp = EnvironmentEdgeManager .currentTime ();
// No replicas to wait for: use the shared immutable empty set instead of allocating one.
145153 if (replicas .isEmpty ()) {
146154 this .unfinishedReplicas = Collections .emptySet ();
147155 } else {
// Otherwise create a ConcurrentHashMap-backed set, sized from the replica count, and seed it
// with the id of every replica channel.
148156 this .unfinishedReplicas =
149157 Collections .newSetFromMap (new ConcurrentHashMap <ChannelId , Boolean >(replicas .size ()));
150- replicas .stream ().map (c -> c . id () ).forEachOrdered (unfinishedReplicas ::add );
158+ replicas .stream ().map (Channel :: id ).forEachOrdered (unfinishedReplicas ::add );
151159 }
152160 }
153161 }
@@ -177,13 +185,19 @@ private enum State {
177185
178186 private volatile State state ;
179187
188+ private final StreamSlowMonitor streamSlowMonitor ;
189+
180190 // all lock-free to make it run faster
181191 private void completed (Channel channel ) {
182192 for (Iterator <Callback > iter = waitingAckQueue .iterator (); iter .hasNext ();) {
183193 Callback c = iter .next ();
184194 // if the current unfinished replicas does not contain us then it means that we have already
185195 // acked this one, let's iterate to find the one we have not acked yet.
186196 if (c .unfinishedReplicas .remove (channel .id ())) {
197+ long current = EnvironmentEdgeManager .currentTime ();
198+ streamSlowMonitor .checkProcessTimeAndSpeed (datanodeInfoMap .get (channel ), c .packetDataLen ,
199+ current - c .flushTimestamp , c .lastAckTimestamp , c .unfinishedReplicas .size ());
200+ c .lastAckTimestamp = current ;
187201 if (c .unfinishedReplicas .isEmpty ()) {
188202 // we need to remove first before complete the future. It is possible that after we
189203 // complete the future the upper layer will call close immediately before we remove the
@@ -246,7 +260,7 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
246260 }
247261 break ;
248262 }
249- datanodeList . forEach ( ch -> ch . close () );
263+ datanodeInfoMap . keySet (). forEach ( ChannelOutboundInvoker :: close );
250264 }
251265
252266 @ Sharable
@@ -314,7 +328,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
314328
315329 private void setupReceiver (int timeoutMs ) {
316330 AckHandler ackHandler = new AckHandler (timeoutMs );
317- for (Channel ch : datanodeList ) {
331+ for (Channel ch : datanodeInfoMap . keySet () ) {
318332 ch .pipeline ().addLast (
319333 new IdleStateHandler (timeoutMs , timeoutMs / 2 , 0 , TimeUnit .MILLISECONDS ),
320334 new ProtobufVarint32FrameDecoder (),
@@ -325,8 +339,8 @@ private void setupReceiver(int timeoutMs) {
325339
326340 FanOutOneBlockAsyncDFSOutput (Configuration conf ,DistributedFileSystem dfs ,
327341 DFSClient client , ClientProtocol namenode , String clientName , String src , long fileId ,
328- LocatedBlock locatedBlock , Encryptor encryptor , List <Channel > datanodeList ,
329- DataChecksum summer , ByteBufAllocator alloc ) {
342+ LocatedBlock locatedBlock , Encryptor encryptor , Map <Channel , DatanodeInfo > datanodeInfoMap ,
343+ DataChecksum summer , ByteBufAllocator alloc , StreamSlowMonitor streamSlowMonitor ) {
330344 this .conf = conf ;
331345 this .dfs = dfs ;
332346 this .client = client ;
@@ -337,13 +351,14 @@ private void setupReceiver(int timeoutMs) {
337351 this .block = locatedBlock .getBlock ();
338352 this .locations = locatedBlock .getLocations ();
339353 this .encryptor = encryptor ;
340- this .datanodeList = datanodeList ;
354+ this .datanodeInfoMap = datanodeInfoMap ;
341355 this .summer = summer ;
342356 this .maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer .getBytesPerChecksum ());
343357 this .alloc = alloc ;
344358 this .buf = alloc .directBuffer (sendBufSizePRedictor .initialSize ());
345359 this .state = State .STREAMING ;
346360 setupReceiver (conf .getInt (DFS_CLIENT_SOCKET_TIMEOUT_KEY , READ_TIMEOUT ));
361+ this .streamSlowMonitor = streamSlowMonitor ;
347362 }
348363
349364 @ Override
@@ -395,7 +410,8 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
395410 ByteBuf headerBuf = alloc .buffer (headerLen );
396411 header .putInBuffer (headerBuf .nioBuffer (0 , headerLen ));
397412 headerBuf .writerIndex (headerLen );
398- Callback c = new Callback (future , nextPacketOffsetInBlock + dataLen , datanodeList );
413+ Callback c = new Callback (future , nextPacketOffsetInBlock + dataLen ,
414+ datanodeInfoMap .keySet (), dataLen );
399415 waitingAckQueue .addLast (c );
400416 // recheck again after we pushed the callback to queue
401417 if (state != State .STREAMING && waitingAckQueue .peekFirst () == c ) {
@@ -404,7 +420,9 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
404420 waitingAckQueue .removeFirst ();
405421 return ;
406422 }
407- datanodeList .forEach (ch -> {
423+ // TODO: we should perhaps measure time taken per DN here;
424+ // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
425+ datanodeInfoMap .keySet ().forEach (ch -> {
408426 ch .write (headerBuf .retainedDuplicate ());
409427 ch .write (checksumBuf .retainedDuplicate ());
410428 ch .writeAndFlush (dataBuf .retainedDuplicate ());
@@ -426,7 +444,7 @@ private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
426444 long lengthAfterFlush = nextPacketOffsetInBlock + dataLen ;
427445 Callback lastFlush = waitingAckQueue .peekLast ();
428446 if (lastFlush != null ) {
429- Callback c = new Callback (future , lengthAfterFlush , Collections .emptyList () );
447+ Callback c = new Callback (future , lengthAfterFlush , Collections .emptySet (), dataLen );
430448 waitingAckQueue .addLast (c );
431449 // recheck here if we have already removed the previous callback from the queue
432450 if (waitingAckQueue .peekFirst () == c ) {
@@ -526,8 +544,8 @@ private void endBlock() throws IOException {
526544 header .putInBuffer (headerBuf .nioBuffer (0 , headerLen ));
527545 headerBuf .writerIndex (headerLen );
528546 CompletableFuture <Long > future = new CompletableFuture <>();
529- waitingAckQueue .add (new Callback (future , finalizedLength , datanodeList ));
530- datanodeList .forEach (ch -> ch .writeAndFlush (headerBuf .retainedDuplicate ()));
547+ waitingAckQueue .add (new Callback (future , finalizedLength , datanodeInfoMap . keySet (), 0 ));
548+ datanodeInfoMap . keySet () .forEach (ch -> ch .writeAndFlush (headerBuf .retainedDuplicate ()));
531549 headerBuf .release ();
532550 try {
533551 future .get ();
@@ -544,13 +562,14 @@ private void endBlock() throws IOException {
544562 * The close method to use when an error has occurred. For now we just call recoverFileLease.
545563 */
546564 @ Override
565+ @ SuppressWarnings ("FutureReturnValueIgnored" )
547566 public void recoverAndClose (CancelableProgressable reporter ) throws IOException {
548567 if (buf != null ) {
549568 buf .release ();
550569 buf = null ;
551570 }
552- datanodeList . forEach ( ch -> ch . close () );
553- datanodeList .forEach (ch -> ch .closeFuture ().awaitUninterruptibly ());
571+ datanodeInfoMap . keySet (). forEach ( ChannelOutboundInvoker :: close );
572+ datanodeInfoMap . keySet () .forEach (ch -> ch .closeFuture ().awaitUninterruptibly ());
554573 endFileLease (client , fileId );
555574 RecoverLeaseFSUtils .recoverFileLease (dfs , new Path (src ), conf ,
556575 reporter == null ? new CancelOnClose (client ) : reporter );
@@ -561,11 +580,12 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
561580 * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
562581 */
563582 @ Override
// The value returned by each channel's close() call below is discarded; completion is awaited
// separately through closeFuture(). Presumably that is what this suppression is for - confirm.
583+ @ SuppressWarnings ("FutureReturnValueIgnored" )
564584 public void close () throws IOException {
// endBlock() runs first; the state is switched to CLOSED only after it returns without throwing.
565585 endBlock ();
566586 state = State .CLOSED ;
// Two passes over the datanode channels: first request close on every channel, then block
// until each channel's close future completes. The '-' lines are the old datanodeList form and
// the '+' lines iterate the keys of datanodeInfoMap instead.
567- datanodeList . forEach ( ch -> ch . close () );
568- datanodeList .forEach (ch -> ch .closeFuture ().awaitUninterruptibly ());
587+ datanodeInfoMap . keySet (). forEach ( ChannelOutboundInvoker :: close );
588+ datanodeInfoMap . keySet () .forEach (ch -> ch .closeFuture ().awaitUninterruptibly ());
// Set the block's byte count to ackedBlockLength before passing the block to completeFile.
569589 block .setNumBytes (ackedBlockLength );
570590 completeFile (client , namenode , src , clientName , block , fileId );
571591 }
0 commit comments