Skip to content

Commit

Permalink
Issue #4331 Close Complete
Browse files Browse the repository at this point in the history
Added async close to HttpWriter and ResponseWriter
Always use async close, with blocker if necessary.

Signed-off-by: Greg Wilkins <[email protected]>
  • Loading branch information
gregw committed Dec 5, 2019
1 parent a699aa0 commit 186164c
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public EncodingHttpWriter(HttpOutput out, String encoding)
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
if (length == 0 && out.isAllContentWritten()) // TODO why is this needed?
{
out.close();
return;
Expand Down
85 changes: 16 additions & 69 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.eclipse.jetty.server;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -288,42 +287,6 @@ public void closedBySendError()
}
}

public void complete(Closeable wrapper, Callback callback)
{
if (wrapper == this || wrapper == null)
{
// If there is no wrapper, then complete is just an normal async close
close(callback);
return;
}

// otherwise we must close the wrapper, but all calls to close() will now
// be treated as async anyway.
synchronized (_channelState)
{
_completing = true;
_closeCallback = Callback.combine(_closeCallback, callback);
}

try
{
wrapper.close();
}
catch (Throwable th)
{
LOG.ignore(th);
}

// If the wrapper intercepted the close, then initiate directly
boolean closed;
synchronized (_channelState)
{
closed = _state == State.CLOSED || _state == State.CLOSING;
}
if (!closed)
close(null);
}

public void close(Callback callback)
{
synchronized (_channelState)
Expand Down Expand Up @@ -393,36 +356,17 @@ public Callback callback()
@Override
public void close() throws IOException
{
Callback callback = null;
synchronized (_channelState)
{
if (_completing)
// Completion has started so all closes are async
close(null);
// Else handle with blocking unless already closed.
else if (_state == State.CLOSED)
return;
}

// This is a completion close, so we will handle without blocking.
if (callback != null)
try (Blocker blocker = _writeBlocker.acquire())
{
close(callback);
close(blocker);
blocker.block();
}
else
catch (Throwable failure)
{
try (Blocker blocker = _writeBlocker.acquire())
{
close(blocker);
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}

Expand Down Expand Up @@ -1211,6 +1155,7 @@ public void setWriteListener(WriteListener writeListener)
{
if (_state != State.OPEN)
throw new IllegalStateException("!OPEN");
_state = State.READY;
_writeListener = writeListener;
wake = _channel.getState().onWritePossible();
}
Expand Down Expand Up @@ -1296,7 +1241,6 @@ public void run()
closed();
}
}

}

@Override
Expand Down Expand Up @@ -1337,7 +1281,6 @@ protected void onCompleteSuccess()

case UNREADY:
_state = _last ? State.CLOSED : State.READY;
// TODO should we close first and then call OWP?
close = true;
wake = _channel.getState().onWritePossible();
break;
Expand All @@ -1351,9 +1294,13 @@ protected void onCompleteSuccess()
}

if (close)
HttpOutput.this.close(null);

if (wake)
{
if (wake)
HttpOutput.this.close(Callback.from(() -> _channel.execute(_channel))); // TODO can we call directly? Why execute?
else
HttpOutput.this.close(null);
}
else if (wake)
_channel.execute(_channel); // TODO can we call directly? Why execute?
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import java.io.Writer;

import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.Callback;

/**
*
*/
public abstract class HttpWriter extends Writer
{
public static final int MAX_OUTPUT_CHARS = 512;
public static final int MAX_OUTPUT_CHARS = 512; // TODO should this be configurable? super size is 1024

final HttpOutput _out;
final ByteArrayOutputStream2 _bytes;
Expand All @@ -38,7 +39,7 @@ public HttpWriter(HttpOutput out)
{
_out = out;
_chars = new char[MAX_OUTPUT_CHARS];
_bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS);
_bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS); // TODO should this be pooled - or do we just recycle the writer?
}

@Override
Expand All @@ -47,6 +48,11 @@ public void close() throws IOException
_out.close();
}

public void close(Callback callback)
{
_out.close(callback);
}

@Override
public void flush() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public Iso88591HttpWriter(HttpOutput out)
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
if (length == 0 && out.isAllContentWritten()) // TODO why is this needed?
{
close();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,13 +830,16 @@ public void closeOutput() throws IOException
{
if (_outputType == OutputType.WRITER)
_writer.close();
if (!_out.isClosed())
else
_out.close();
}

public void closeOutput(Callback callback)
{
_out.complete((_outputType == OutputType.WRITER) ? _writer : _out, callback);
if (_outputType == OutputType.WRITER)
_writer.close(callback);
else
_out.close(callback);
}

public long getLongContentLength()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

Expand Down Expand Up @@ -171,6 +172,15 @@ public void close()
}
}

public void close(Callback callback)
{
synchronized (lock)
{
_isClosed = true;
}
_httpWriter.close(callback);
}

@Override
public void write(int c)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public Utf8HttpWriter(HttpOutput out)
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
if (length == 0 && out.isAllContentWritten()) // TODO why is this needed?
{
close();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr
assertThat(content, containsString(message));

// Check that a thread is held busy in write
assertThat(_threadPool.getBusyThreads(), Matchers.greaterThan(base));
assertThat(_threadPool.getBusyThreads(), Matchers.greaterThan(base)); // TODO why is this the case for async?

// Getting the Delayed callback will free the thread
PendingCallback delay = X.exchange(null, 10, TimeUnit.SECONDS);
Expand Down

0 comments on commit 186164c

Please sign in to comment.