Skip to content

Commit

Permalink
Issue #4331 Close Complete
Browse files Browse the repository at this point in the history
Code cleanup.  Use a CLOSE state rather than non null closedCallback to be clearer that it is a state.
Renamed close(Callback) to complete(Callback)
Renamed and simplified closed() to completed()

Signed-off-by: Greg Wilkins <[email protected]>
  • Loading branch information
gregw committed Dec 13, 2019
1 parent 5774a73 commit a6723a2
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public boolean handle()
// TODO that is done.

// Set a close callback on the HttpOutput to make it an async callback
_response.closeOutput(Callback.from(_state::completed));
_response.completeOutput(Callback.from(_state::completed));

break;
}
Expand Down Expand Up @@ -1212,7 +1212,7 @@ public void failed(final Throwable x)
@Override
public void succeeded()
{
_response.getHttpOutput().closed();
_response.getHttpOutput().completed();
super.failed(x);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,9 @@ public void sendError(int code, String message)
if (LOG.isDebugEnabled())
LOG.debug("sendError {}", toStringLocked());

if (_outputState != OutputState.OPEN)
throw new IllegalStateException(_outputState.toString());

switch (_state)
{
case HANDLING:
Expand Down Expand Up @@ -969,7 +972,7 @@ protected void completed()
}

// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().closed();
_channel.getResponse().getHttpOutput().completed();

if (event != null)
{
Expand Down
135 changes: 70 additions & 65 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
| v | v
UNREADY +---ASYNC
TODO rework this for the CLOSE state:
OPEN/BLOCKING----close---------+ CLOSED/BLOCKING
/ | ^ \ ^
Expand All @@ -109,9 +110,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
swl = setWriteListener
w = write
<>close = close sets _closeCallback only.
owc = onWriteComplete(false,null),_closeCallback==null
owcC = onWriteComplete(false,null),_closeCallback!=null
<>close = close sets _completeCallback only.
owc = onWriteComplete(false,null),_completeCallback==null
owcC = onWriteComplete(false,null),_completeCallback!=null
isf = isReady()==false
ist = osReady()==true
*/
Expand All @@ -128,6 +129,7 @@ enum ApiState
enum State
{
OPEN, // Open
CLOSE, // Close needed from onWriteCompletion
CLOSING, // Close in progress after close API called
CLOSED // Closed
}
Expand Down Expand Up @@ -297,7 +299,7 @@ void onWriteComplete(boolean last, Throwable failure)
LOG.debug("onWriteComplete", failure);

boolean wake = false;
Callback callback = null;
Callback closedCallback = null;
boolean release = false;
ByteBuffer closeContent = null;
synchronized (_channelState)
Expand All @@ -309,14 +311,16 @@ void onWriteComplete(boolean last, Throwable failure)
if (_state == State.CLOSING || last || failure != null)
{
_state = State.CLOSED;
callback = _closedCallback;
closedCallback = _closedCallback;
_closedCallback = null;
release = true;
}

// Did somebody call close(Callback) while we were writing?
if (_closedCallback != null && _state == State.OPEN)
// Did somebody require a close while we were writing?
if (_state == State.CLOSE)
{
// We can now send a (probably empty) last buffer and then when it completes
// onWriteCompletion will be called again to actually execute the _completeCallback
_state = State.CLOSING;
closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
}
Expand Down Expand Up @@ -357,18 +361,18 @@ void onWriteComplete(boolean last, Throwable failure)

if (closeContent != null)
{
channelWrite(closeContent, true, new AsyncCloseCB(Callback.NOOP));
channelWrite(closeContent, true, new WriteCompleteCB());
return;
}

try
{
if (callback != null)
if (closedCallback != null)
{
if (failure == null)
callback.succeeded();
closedCallback.succeeded();
else
callback.failed(failure);
closedCallback.failed(failure);
}
}
finally
Expand All @@ -378,30 +382,23 @@ void onWriteComplete(boolean last, Throwable failure)
if (wake)
_channel.execute(_channel); // TODO can we call directly? Why execute?
}

}

public void closedBySendError()
public void softClose()
{
synchronized (_channelState)
{
switch (_apiState)
{
case BLOCKING:
case BLOCKED:
case READY:
case ASYNC:
_softClose = true;
return;

default:
throw new IllegalStateException(stateString());
}
_softClose = true;
}
}

public void close(Callback callback)
public void complete(Callback callback)
{
// This method is invoked for the COMPLETE action handling in
// HttpChannel.handle. The callback passed typically will call completed
// to finish the request cycle and so may need to asynchronously wait for:
// a pending/blocked operation to finish and then either an async close or
// wait for an application close to complete.
boolean succeeded = false;
Throwable error = null;
ByteBuffer content = null;
Expand All @@ -413,6 +410,7 @@ public void close(Callback callback)
succeeded = true;
break;

case CLOSE:
case CLOSING:
_closedCallback = Callback.combine(_closedCallback, callback);
break;
Expand All @@ -424,16 +422,20 @@ public void close(Callback callback)
break;
}

_closedCallback = Callback.combine(_closedCallback, callback);

switch (_apiState)
{
case BLOCKING:
// Output is idle blocking state, but we still do an async close
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;

case ASYNC:
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
Expand All @@ -442,7 +444,9 @@ public void close(Callback callback)
case BLOCKED:
case UNREADY:
case PENDING:
_closedCallback = Callback.combine(_closedCallback, callback);
// Output is currently doing an operation, so we just move to state to trigger close when
// that operation completes
_state = State.CLOSE;
break;
}
break;
Expand All @@ -465,7 +469,19 @@ public void close(Callback callback)
}

if (content != null)
channelWrite(content, true, new AsyncCloseCB(callback));
channelWrite(content, true, new WriteCompleteCB());
}

/**
* Called to indicate that the request cycle has been completed.
*/
public void completed()
{
synchronized (_channelState)
{
_state = State.CLOSED;
}
releaseBuffer();
}

@Override
Expand Down Expand Up @@ -503,10 +519,10 @@ else if (_state != State.CLOSED)
// An async operation is in progress, so we soft close now
_softClose = true;

// If we are OPEN and we will not close in onWriteComplete,
if (_state == State.OPEN && _closedCallback == null)
// then use a NOOP to trigger a close from onWriteComplete
_closedCallback = Callback.NOOP;
// If we are OPEN,
if (_state == State.OPEN)
// then trigger a close from onWriteComplete
_state = State.CLOSE;
break;

case ASYNC:
Expand All @@ -518,7 +534,12 @@ else if (_state != State.CLOSED)
break;

case BLOCKED:
// A blocking operation is in progress. Let's just block until it is complete
// A blocking operation is in progress.
// If we are still OPEN
if (_state == State.OPEN)
// we will close from onWriteComplete
_state = State.CLOSE;
// and block until CLOSED
blocker = _writeBlocker.acquire();
_closedCallback = Callback.combine(_closedCallback, blocker);
break;
Expand Down Expand Up @@ -567,7 +588,7 @@ else if (_state != State.CLOSED)
if (blocker == null)
{
// Do an async close
channelWrite(content, true, new AsyncCloseCB(Callback.NOOP));
channelWrite(content, true, new WriteCompleteCB());
}
else
{
Expand All @@ -586,29 +607,6 @@ else if (_state != State.CLOSED)
}
}

/**
* Called to indicate that the output has been closed externally
* via other means that may not have involve writing
* the last chunk.
*/
public void closed()
{
// TODO do we really need this - if so document why!!!!!
Callback callback = null;
synchronized (_channelState)
{
if (_state != State.CLOSED)
{
callback = _closedCallback; // TODO is this ever non null????
_closedCallback = null;
_state = State.CLOSED;
}
}
releaseBuffer();
if (callback != null)
callback.succeeded();
}

public ByteBuffer getBuffer()
{
return _aggregate;
Expand Down Expand Up @@ -713,7 +711,6 @@ public void flush() throws IOException

private void checkWritable() throws EofException
{
// TODO check this
if (_softClose)
throw new EofException("Closed");

Expand Down Expand Up @@ -1478,8 +1475,7 @@ public void run()
}
finally
{
// Initiate an async close
close(Callback.NOOP);
IO.close(this);
}
}

Expand Down Expand Up @@ -1844,25 +1840,34 @@ protected long getIdleTimeout()
}
}

private class AsyncCloseCB extends Callback.Nested
private class WriteCompleteCB implements Callback
{
public AsyncCloseCB(Callback callback)
final Callback _callback;

public WriteCompleteCB()
{
super(callback);
this(null);
}

public WriteCompleteCB(Callback callback)
{
_callback = callback;
}

@Override
public void succeeded()
{
onWriteComplete(true, null);
super.succeeded();
if (_callback != null)
_callback.succeeded();
}

@Override
public void failed(Throwable x)
{
onWriteComplete(true, x);
super.failed(x);
if (_callback != null)
_callback.succeeded();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public void close() throws IOException
_out.close();
}

public void close(Callback callback)
public void complete(Callback callback)
{
_out.close(callback);
_out.complete(callback);
}

@Override
Expand Down
Loading

0 comments on commit a6723a2

Please sign in to comment.