-
Notifications
You must be signed in to change notification settings - Fork 440
TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write #257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -129,7 +129,6 @@ | |
| import io.netty.handler.codec.TooLongFrameException; | ||
| import io.netty.handler.codec.http.DefaultFullHttpResponse; | ||
| import io.netty.handler.codec.http.DefaultHttpResponse; | ||
| import io.netty.handler.codec.http.FullHttpRequest; | ||
| import io.netty.handler.codec.http.FullHttpResponse; | ||
| import io.netty.handler.codec.http.HttpHeaders; | ||
| import io.netty.handler.codec.http.HttpObjectAggregator; | ||
|
|
@@ -306,21 +305,28 @@ public ReduceMapFileCount(ReduceContext rc) { | |
|
|
||
| @Override | ||
| public void operationComplete(ChannelFuture future) throws Exception { | ||
| Channel ch = future.channel(); | ||
| if (!future.isSuccess()) { | ||
| future.channel().close(); | ||
| ch.close(); | ||
| return; | ||
| } | ||
| int waitCount = this.reduceContext.getMapsToWait().decrementAndGet(); | ||
| if (waitCount == 0) { | ||
| LOG.debug("Finished with all map outputs"); | ||
| /* | ||
| * LastHttpContent.EMPTY_LAST_CONTENT can only be written when there are no remaining maps to send, | ||
| * this is the only time we can finish the HTTP response. | ||
| */ | ||
| ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we add a comment explaining why this is required here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack, this is the most important part of this patch, added a commit with a code comment
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this issue mainly due to the netty upgrade (4.x)?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, absolutely, this issue is because of the incorrect usage of netty4 APIs (investigation details are on Jira ticket) |
||
| metrics.operationComplete(future); | ||
| // Let the idle timer handler close keep-alive connections | ||
| if (reduceContext.getKeepAlive()) { | ||
| ChannelPipeline pipeline = future.channel().pipeline(); | ||
| ChannelPipeline pipeline = ch.pipeline(); | ||
| TimeoutHandler timeoutHandler = | ||
| (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER); | ||
| timeoutHandler.setEnabledTimeout(true); | ||
| } else { | ||
| future.channel().close(); | ||
| ch.close(); | ||
| } | ||
| } else { | ||
| SHUFFLE.sendMap(reduceContext); | ||
|
|
@@ -993,12 +999,11 @@ public void channelActive(ChannelHandlerContext ctx) | |
| @Override | ||
| public void channelRead(ChannelHandlerContext ctx, Object message) | ||
| throws Exception { | ||
| FullHttpRequest request = (FullHttpRequest) message; | ||
| HttpRequest request = (HttpRequest) message; | ||
| handleRequest(ctx, request); | ||
| request.release(); | ||
| } | ||
|
|
||
| private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) | ||
| private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) | ||
| throws IOException, Exception { | ||
| if (request.getMethod() != GET) { | ||
| sendError(ctx, METHOD_NOT_ALLOWED); | ||
|
|
@@ -1123,13 +1128,9 @@ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request) | |
| for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) { | ||
| ChannelFuture nextMap = sendMap(reduceContext); | ||
| if(nextMap == null) { | ||
| // by this special message flushed, we can make sure the whole response is finished | ||
| ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); | ||
| return; | ||
| } | ||
| } | ||
| // by this special message flushed, we can make sure the whole response is finished | ||
| ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); | ||
| } | ||
|
|
||
| private boolean isNullOrEmpty(List<String> entries) { | ||
|
|
@@ -1496,7 +1497,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, | |
| DataOutputBuffer dobRange = new DataOutputBuffer(); | ||
| // Indicate how many record to be written | ||
| WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1); | ||
| ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength())); | ||
| ch.writeAndFlush(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength())); | ||
| for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) { | ||
| TezIndexRecord index = outputInfo.getIndex(reduce); | ||
| // Records are only valid if they have a non-zero part length | ||
|
|
@@ -1511,7 +1512,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, | |
| DataOutputBuffer dob = new DataOutputBuffer(); | ||
| header.write(dob); | ||
| // Free the memory needed to store the spill and index records | ||
| ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); | ||
| ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength())); | ||
| } | ||
| outputInfo.finish(); | ||
|
|
||
|
|
@@ -1531,14 +1532,14 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, | |
| rangeOffset, rangePartLength, manageOsCache, readaheadLength, | ||
| readaheadPool, spillFile.getAbsolutePath(), | ||
| shuffleBufferSize, shuffleTransferToAllowed); | ||
| writeFuture = ch.write(partition); | ||
| writeFuture = ch.writeAndFlush(partition); | ||
| } else { | ||
| // HTTPS cannot be done with zero copy. | ||
| final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, | ||
| rangeOffset, rangePartLength, sslFileBufferSize, | ||
| manageOsCache, readaheadLength, readaheadPool, | ||
| spillFile.getAbsolutePath()); | ||
| writeFuture = ch.write(chunk); | ||
| writeFuture = ch.writeAndFlush(chunk); | ||
| } | ||
| metrics.shuffleConnections.incr(); | ||
| metrics.shuffleOutputBytes.incr(rangePartLength); // optimistic | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is in the hot path, should we enclose this in
`if (LOG.isDebugEnabled()) { }`?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This happens once per shuffle request, I guess, and the logging parameters don't involve expensive operations, so LOG.isDebugEnabled vs. LOG.debug is essentially one method call vs. another. I don't feel we need to be extremely cautious in this case.