Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine HttpChannel.Listener #9872

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
private final Thread[] _acceptors;
private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
private final HttpChannelListeners _httpChannelListeners = new HttpChannelListeners();
private Shutdown _shutdown;
private HttpChannelListeners _httpChannelListeners = new HttpChannelListeners();
private long _idleTimeout = 30000;
private long _shutdownIdleTimeout = 1000L;
private String _defaultProtocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,95 +128,116 @@ interface Factory
* <p>Listener instances that are set as a bean on the {@link Connector} are
* efficiently added to {@link HttpChannel}.
*/
public interface Listener extends EventListener
interface Listener extends EventListener
{
/**
* Invoked just after the HTTP request line and headers have been parsed.
* Invoked just after the HTTP request line and headers have been parsed
* (i.e. from within the call to {@link HttpChannel#onRequest(MetaData.Request)}).
Comment on lines +134 to +135
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap with <p>.

*
* <p>
* This the state of the request from the network, and does not include
* any request customizations (eg: forwarding, secure, etc)
* </p>
*
* <p>
* Implementations must not block!
* </p>
*
* @param request the request object
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think calling read() and demand() should not be a worry -- if called wrongly it's the user fault.

* @see HttpChannel#onRequest(MetaData.Request)
*/
default void onRequestBegin(Request request)
{
}

/**
* Invoked just before calling the server handler tree.
* Invoked just before calling the server handler tree (i.e. just before the {@link Runnable}
* returned from {@link HttpChannel#onRequest(MetaData.Request)} is run).
*
* <p>
* This is the final state of the request before the handlers are called.
* This includes any request customization.
* </p>
*
* @param request the request object
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
* @see HttpChannel#onRequest(MetaData.Request)
*/
default void onBeforeHandling(Request request)
{
}

/**
* Invoked after application handling
* Invoked after application handling (i.e. just after the call to the {@link Runnable} returned from
* {@link HttpChannel#onRequest(MetaData.Request)} returns).
*
* @param request the request object
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
* @param handled if the server handlers handled the request
* @param failure the exception thrown by the application
* @see Handler#handle(Request, Response, Callback)
* @see HttpChannel#onRequest(MetaData.Request)
*/
default void onAfterHandling(Request request, boolean handled, Throwable failure)
{
}

/**
* Invoked every time a request content chunk has been parsed, just before
* making it available to the application.
*
* <p>
* TODO: make notes about Trailers / EOF / Errors
* </p>
* making it available to the application (i.e. from within a call to
* {@link Request#read()}).
*
* @param request the request object
* @param chunk a request content chunk TODO: make bytebuffer a slice?
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
* @param chunk a request content chunk, including {@link org.eclipse.jetty.io.Content.Chunk.Error}
* and {@link org.eclipse.jetty.http.Trailers} chunks.
* If a reference to the chunk (or its {@link ByteBuffer}) is kept,
* then {@link Content.Chunk#retain()} must be called.
* @see Request#read()
*/
default void onRequestRead(Request request, Content.Chunk chunk)
{
}

/**
* Invoked just before the response line is written to the network.
* Invoked just before the response is line written to the network (i.e. from
* within the first call to {@link Response#write(boolean, ByteBuffer, Callback)}).
*
* @param request the request object
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
* @param status the response status
* @param response the response object
* @param response the immutable fields of the response object
* @see Response#write(boolean, ByteBuffer, Callback)
*/
default void onResponseCommitted(Request request, int status, HttpFields response)
{
}

/**
* Invoked after a response content chunk has been written to the network.
* Invoked before each response content chunk has been written (i.e. from
* within the any call to {@link Response#write(boolean, ByteBuffer, Callback)}).
*
* @param request the request object
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
* @param last indicating last write
* @param content a {@link ByteBuffer#slice() slice} of the response content chunk
* @param content The {@link ByteBuffer} of the response content chunk (readonly).
* @see Response#write(boolean, ByteBuffer, Callback)
*/
default void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
}

/**
* Invoked after each response content chunk has been written
* (i.e. immediately before calling the {@link Callback} passed to
* {@link Response#write(boolean, ByteBuffer, Callback)}).
*
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
* @param failure if there was a failure to write the given content
* @see Response#write(boolean, ByteBuffer, Callback)
*/
default void onResponseWrite(Request request, boolean last, ByteBuffer content, Throwable failure)
default void onResponseWriteComplete(Request request, Throwable failure)
{
}

/**
* Invoked when the request <em>and</em> response processing are complete,
* just before the request and response will be recycled.
* just before the request and response will be recycled (i.e. after the
* {@link Runnable} return from {@link HttpChannel#onRequest(MetaData.Request)}
* has returned and the {@link Callback} passed to {@link Handler#handle(Request, Response, Callback)}
* has been completed).
*
* @param request the request object
* @param request the request object. The {@code read()} and {@code demand(Runnable)} methods must not be called by the listener.
* @param failure if there was a failure to complete
*/
default void onComplete(Request request, Throwable failure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class HttpChannelListeners implements HttpChannel.Listener
private static final MethodType LISTENER_TYPE_ON_REQUEST_READ = methodType(Void.TYPE, Request.class, Content.Chunk.class);

private static final MethodType LISTENER_TYPE_ON_RESPONSE_COMMITTED = methodType(Void.TYPE, Request.class, Integer.TYPE, HttpFields.class);
private static final MethodType LISTENER_TYPE_ON_RESPONSE_WRITE = methodType(Void.TYPE, Request.class, Boolean.TYPE, ByteBuffer.class, Throwable.class);
private static final MethodType LISTENER_TYPE_ON_RESPONSE_WRITE = methodType(Void.TYPE, Request.class, Boolean.TYPE, ByteBuffer.class);
private static final MethodType LISTENER_TYPE_ON_RESPONSE_WRITE_COMPLETE = methodType(Void.TYPE, Request.class, Throwable.class);

private static final MethodType LISTENER_TYPE_ON_COMPLETE = methodType(Void.TYPE, Request.class, Throwable.class);

Expand All @@ -57,6 +58,7 @@ public class HttpChannelListeners implements HttpChannel.Listener

private static final MethodHandle LISTENER_HANDLER_ON_RESPONSE_COMMITTED;
private static final MethodHandle LISTENER_HANDLER_ON_RESPONSE_WRITE;
private static final MethodHandle LISTENER_HANDLER_ON_RESPONSE_WRITE_COMPLETE;

private static final MethodHandle LISTENER_HANDLER_ON_COMPLETE;

Expand All @@ -73,14 +75,11 @@ public class HttpChannelListeners implements HttpChannel.Listener

LISTENER_HANDLER_ON_RESPONSE_COMMITTED = lookup.findVirtual(HttpChannel.Listener.class, "onResponseCommitted", LISTENER_TYPE_ON_RESPONSE_COMMITTED);
LISTENER_HANDLER_ON_RESPONSE_WRITE = lookup.findVirtual(HttpChannel.Listener.class, "onResponseWrite", LISTENER_TYPE_ON_RESPONSE_WRITE);
LISTENER_HANDLER_ON_RESPONSE_WRITE_COMPLETE = lookup.findVirtual(HttpChannel.Listener.class, "onResponseWriteComplete", LISTENER_TYPE_ON_RESPONSE_WRITE_COMPLETE);

LISTENER_HANDLER_ON_COMPLETE = lookup.findVirtual(HttpChannel.Listener.class, "onComplete", LISTENER_TYPE_ON_COMPLETE);
}
catch (NoSuchMethodException e)
{
throw new RuntimeException(e);
}
catch (IllegalAccessException e)
catch (NoSuchMethodException | IllegalAccessException e)
{
throw new RuntimeException(e);
}
Expand All @@ -93,7 +92,9 @@ public class HttpChannelListeners implements HttpChannel.Listener
private MethodHandle onRequestReadHandle;
private MethodHandle onResponseCommittedHandle;
private MethodHandle onResponseWriteHandle;
private MethodHandle onResponseWriteCompleteHandle;
private MethodHandle onCompleteHandle;
private boolean invokeOnResponseWrite;

public HttpChannelListeners()
{
Expand All @@ -110,8 +111,10 @@ public void set(Collection<HttpChannel.Listener> listeners)

onResponseCommittedHandle = MethodHandles.empty(LISTENER_TYPE_ON_RESPONSE_COMMITTED);
onResponseWriteHandle = MethodHandles.empty(LISTENER_TYPE_ON_RESPONSE_WRITE);
onResponseWriteCompleteHandle = MethodHandles.empty(LISTENER_TYPE_ON_RESPONSE_WRITE_COMPLETE);

onCompleteHandle = MethodHandles.empty(LISTENER_TYPE_ON_COMPLETE);
invokeOnResponseWrite = false;

if (listeners == null)
return;
Expand All @@ -131,7 +134,12 @@ public void set(Collection<HttpChannel.Listener> listeners)
if (notDefault(LISTENER_HANDLER_ON_RESPONSE_COMMITTED, listener))
onResponseCommittedHandle = MethodHandles.foldArguments(onResponseCommittedHandle, LISTENER_HANDLER_ON_RESPONSE_COMMITTED.bindTo(listener));
if (notDefault(LISTENER_HANDLER_ON_RESPONSE_WRITE, listener))
{
invokeOnResponseWrite = true;
onResponseWriteHandle = MethodHandles.foldArguments(onResponseWriteHandle, LISTENER_HANDLER_ON_RESPONSE_WRITE.bindTo(listener));
}
if (notDefault(LISTENER_HANDLER_ON_RESPONSE_WRITE_COMPLETE, listener))
onResponseWriteCompleteHandle = MethodHandles.foldArguments(onResponseWriteCompleteHandle, LISTENER_HANDLER_ON_RESPONSE_WRITE_COMPLETE.bindTo(listener));
if (notDefault(LISTENER_HANDLER_ON_COMPLETE, listener))
onCompleteHandle = MethodHandles.foldArguments(onCompleteHandle, LISTENER_HANDLER_ON_COMPLETE.bindTo(listener));
}
Expand Down Expand Up @@ -219,11 +227,26 @@ public void onResponseCommitted(Request request, int status, HttpFields response
}

@Override
public void onResponseWrite(Request request, boolean last, ByteBuffer content, Throwable failure)
public void onResponseWrite(Request request, boolean last, ByteBuffer content)
{
try
{
if (invokeOnResponseWrite)
onResponseWriteHandle.invoke(request, last, content.asReadOnlyBuffer());
}
catch (Throwable ignore)
{
if (LOG.isTraceEnabled())
LOG.trace("IGNORED", ignore);
}
}

@Override
public void onResponseWriteComplete(Request request, Throwable throwable)
{
try
{
onResponseWriteHandle.invoke(request, last, content, failure);
onResponseWriteCompleteHandle.invoke(request, throwable);
}
catch (Throwable ignore)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private enum StreamSendState
private final SerializedInvoker _serializedInvoker;
private final Attributes _requestAttributes = new Attributes.Lazy();
private final ResponseHttpFields _responseHeaders = new ResponseHttpFields();
private final HttpChannel.Listener _combinedListener;
private final HttpChannel.Listener _httpChannelListeners;
private Thread _handling;
private boolean _handled;
private StreamSendState _streamSendState = StreamSendState.SENDING;
Expand All @@ -127,7 +127,7 @@ public HttpChannelState(ConnectionMetaData connectionMetaData)
_serializedInvoker = new HttpChannelSerializedInvoker();

Connector connector = connectionMetaData.getConnector();
_combinedListener = (connector instanceof AbstractConnector)
_httpChannelListeners = (connector instanceof AbstractConnector)
? ((AbstractConnector)connector).getHttpChannelListeners()
: AbstractConnector.NOOP_LISTENER;
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public Runnable onRequest(MetaData.Request request)
if (getHttpConfiguration().getSendDateHeader())
responseHeaders.add(getConnectionMetaData().getConnector().getServer().getDateField());

_combinedListener.onRequestBegin(_request);
_httpChannelListeners.onRequestBegin(_request);

// This is deliberately not serialized to allow a handler to block.
return _handlerInvoker;
Expand Down Expand Up @@ -578,19 +578,21 @@ public void run()
if (customized != request && server.getRequestLog() != null)
request.setLoggedRequest(customized);

_combinedListener.onBeforeHandling(request);
_httpChannelListeners.onBeforeHandling(request);

handled = server.handle(customized, response, request._callback);
if (!handled)
Response.writeError(customized, response, request._callback, HttpStatus.NOT_FOUND_404);
}
catch (Throwable t)
{
request._callback.failed(t);
thrownFailure = t;
request._callback.failed(thrownFailure);
}
finally
{
_httpChannelListeners.onAfterHandling(request, handled, thrownFailure);
}

_combinedListener.onAfterHandling(request, handled, thrownFailure);

HttpStream stream;
Throwable failure;
Expand Down Expand Up @@ -683,7 +685,7 @@ private void completeStream(HttpStream stream, Throwable failure)
}
finally
{
_combinedListener.onComplete(_request, failure);
_httpChannelListeners.onComplete(_request, failure);
// This is THE ONLY PLACE the stream is succeeded or failed.
if (failure == null)
stream.succeeded();
Expand Down Expand Up @@ -719,7 +721,7 @@ public static class ChannelRequest implements Attributes, Request
_id = httpChannelState.getHttpStream().getId(); // Copy ID now, as stream will ultimately be nulled
_connectionMetaData = httpChannelState.getConnectionMetaData();
_metaData = Objects.requireNonNull(metaData);
_listener = httpChannelState._combinedListener;
_listener = httpChannelState._httpChannelListeners;
_lock = httpChannelState._lock;
}

Expand Down Expand Up @@ -1039,7 +1041,7 @@ public static class ChannelResponse implements Response, Callback
private ChannelResponse(HttpChannelState httpChannelState, ChannelRequest request)
{
_request = request;
_listener = httpChannelState._combinedListener;
_listener = httpChannelState._httpChannelListeners;
}

private void lockedPrepareErrorResponse()
Expand Down Expand Up @@ -1172,10 +1174,9 @@ else if (contentLength >= 0)
else if (failure != null)
{
Throwable throwable = failure;
ByteBuffer slice = content != null ? content.slice() : BufferUtil.EMPTY_BUFFER;
httpChannelState._serializedInvoker.run(() ->
{
_listener.onResponseWrite(_request, last, slice, throwable);
_listener.onResponseWriteComplete(_request, throwable);
callback.failed(throwable);
});
}
Expand All @@ -1197,12 +1198,9 @@ else if (failure != null)
{
if (LOG.isDebugEnabled())
LOG.debug("writing last={} {} {}", last, BufferUtil.toDetailString(content), this);
ByteBuffer slice = content != null ? content.slice() : BufferUtil.EMPTY_BUFFER;
Callback listenerCallback = Callback.from(() ->
{
_listener.onResponseWrite(_request, last, slice, null);
}, this);
stream.send(_request._metaData, responseMetaData, last, content, listenerCallback);

_listener.onResponseWrite(_request, last, content);
stream.send(_request._metaData, responseMetaData, last, content, this);
}
}

Expand All @@ -1229,7 +1227,10 @@ public void succeeded()
httpChannel.lockedStreamSendCompleted(true);
}
if (callback != null)
{
_listener.onResponseWriteComplete(_request, null);
httpChannel._serializedInvoker.run(callback::succeeded);
}
}

/**
Expand Down Expand Up @@ -1257,7 +1258,10 @@ public void failed(Throwable x)
httpChannel.lockedStreamSendCompleted(false);
}
if (callback != null)
{
_listener.onResponseWriteComplete(_request, x);
httpChannel._serializedInvoker.run(() -> callback.failed(x));
}
}

@Override
Expand Down
Loading