Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 12 - Review client notifiers #9335

Merged
merged 7 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.eclipse.jetty.client.internal.HttpContentResponse;
import org.eclipse.jetty.client.transport.HttpConversation;
import org.eclipse.jetty.client.transport.HttpRequest;
import org.eclipse.jetty.client.transport.ResponseNotifier;
import org.eclipse.jetty.client.transport.ResponseListeners;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
Expand All @@ -44,13 +44,11 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler

private final HttpClient client;
private final int maxContentLength;
private final ResponseNotifier notifier;

protected AuthenticationProtocolHandler(HttpClient client, int maxContentLength)
{
this.client = client;
this.maxContentLength = maxContentLength;
this.notifier = new ResponseNotifier();
}

protected HttpClient getHttpClient()
Expand Down Expand Up @@ -274,19 +272,20 @@ private void forwardSuccessComplete(HttpRequest request, Response response)
{
HttpConversation conversation = request.getConversation();
conversation.updateResponseListeners(null);
notifier.forwardSuccessComplete(conversation.getResponseListeners(), request, response);
ResponseListeners responseListeners = conversation.getResponseListeners();
responseListeners.emitSuccessComplete(new Result(request, response));
}

private void forwardFailureComplete(HttpRequest request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = request.getConversation();
conversation.updateResponseListeners(null);
List<Response.ResponseListener> responseListeners = conversation.getResponseListeners();
ResponseListeners responseListeners = conversation.getResponseListeners();
if (responseFailure == null)
notifier.forwardSuccess(responseListeners, response);
responseListeners.emitSuccess(response);
else
notifier.forwardFailure(responseListeners, response, responseFailure);
notifier.notifyComplete(responseListeners, new Result(request, requestFailure, response, responseFailure));
responseListeners.emitFailure(response, responseFailure);
responseListeners.notifyComplete(new Result(request, requestFailure, response, responseFailure));
}

private List<Authentication.HeaderInfo> parseAuthenticateHeader(Response response, HttpHeader header)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@

package org.eclipse.jetty.client;

import java.util.List;

import org.eclipse.jetty.client.internal.HttpContentResponse;
import org.eclipse.jetty.client.transport.HttpConversation;
import org.eclipse.jetty.client.transport.HttpExchange;
import org.eclipse.jetty.client.transport.HttpRequest;
import org.eclipse.jetty.client.transport.ResponseNotifier;
import org.eclipse.jetty.client.transport.ResponseListeners;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
Expand All @@ -32,13 +30,6 @@ public class ContinueProtocolHandler implements ProtocolHandler
public static final String NAME = "continue";
private static final String ATTRIBUTE = ContinueProtocolHandler.class.getName() + ".100continue";

private final ResponseNotifier notifier;

public ContinueProtocolHandler()
{
this.notifier = new ResponseNotifier();
}

@Override
public String getName()
{
Expand Down Expand Up @@ -96,9 +87,9 @@ public void onSuccess(Response response)
// Server either does not support 100 Continue,
// or it does and wants to refuse the request content,
// or we got some other HTTP status code like a redirect.
List<Response.ResponseListener> listeners = exchange.getResponseListeners();
ResponseListeners listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
notifier.forwardSuccess(listeners, contentResponse);
listeners.emitSuccess(contentResponse);
exchange.proceed(new HttpRequestException("Expectation failed", request));
}
}
Expand All @@ -114,9 +105,9 @@ public void onFailure(Response response, Throwable failure)

HttpExchange exchange = conversation.getExchanges().peekLast();
assert exchange.getResponse() == response;
List<Response.ResponseListener> listeners = exchange.getResponseListeners();
ResponseListeners listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
notifier.forwardFailureComplete(listeners, exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure);
listeners.emitFailureComplete(new Result(exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@

package org.eclipse.jetty.client;

import java.util.List;

import org.eclipse.jetty.client.internal.HttpContentResponse;
import org.eclipse.jetty.client.transport.HttpConversation;
import org.eclipse.jetty.client.transport.HttpExchange;
import org.eclipse.jetty.client.transport.HttpRequest;
import org.eclipse.jetty.client.transport.ResponseNotifier;
import org.eclipse.jetty.client.transport.ResponseListeners;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;

Expand Down Expand Up @@ -54,8 +52,6 @@ protected void onEarlyHints(Request request, HttpFields responseHeaders)

private class EarlyHintsListener extends BufferingResponseListener
{
private final ResponseNotifier notifier = new ResponseNotifier();

@Override
public void onSuccess(Response response)
{
Expand Down Expand Up @@ -83,9 +79,9 @@ public void onFailure(Response response, Throwable failure)
HttpExchange exchange = conversation.getExchanges().peekLast();
if (exchange != null)
{
List<Response.ResponseListener> listeners = exchange.getResponseListeners();
ResponseListeners listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
notifier.forwardFailureComplete(listeners, exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure);
listeners.emitFailureComplete(new Result(exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.client.internal.HttpAuthenticationStore;
import org.eclipse.jetty.client.internal.NotifyingRequestListeners;
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.transport.HttpConversation;
import org.eclipse.jetty.client.transport.HttpDestination;
Expand All @@ -63,7 +64,6 @@
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
Expand Down Expand Up @@ -117,7 +117,7 @@ public class HttpClient extends ContainerLifeCycle

private final ConcurrentMap<Origin, HttpDestination> destinations = new ConcurrentHashMap<>();
private final ProtocolHandlers handlers = new ProtocolHandlers();
private final List<Request.Listener> requestListeners = new ArrayList<>();
private final RequestListeners requestListeners = new NotifyingRequestListeners();
private final ContentDecoder.Factories decoderFactories = new ContentDecoder.Factories();
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
private final HttpClientTransport transport;
Expand Down Expand Up @@ -156,16 +156,11 @@ public HttpClient(HttpClientTransport transport)
this.transport = Objects.requireNonNull(transport);
addBean(transport);
this.connector = ((AbstractHttpClientTransport)transport).getContainedBeans(ClientConnector.class).stream().findFirst().orElseThrow();
addBean(requestListeners);
addBean(handlers);
addBean(decoderFactories);
}

@Override
public void dump(Appendable out, String indent) throws IOException
sbordet marked this conversation as resolved.
Show resolved Hide resolved
{
dumpObjects(out, indent, new DumpableCollection("requestListeners", requestListeners));
}

public HttpClientTransport getTransport()
{
return transport;
Expand Down Expand Up @@ -259,12 +254,12 @@ protected void doStop() throws Exception
}

/**
* Returns a <em>non</em> thread-safe list of {@link Request.Listener}s that can be modified before
* performing requests.
* Returns a <em>non</em> thread-safe container of {@link Request.Listener}s
* that allows to add request listeners before performing requests.
*
* @return a list of {@link Request.Listener} that can be used to add and remove listeners
* @return a {@link RequestListeners} instance that can be used to add request listeners
*/
public List<Request.Listener> getRequestListeners()
public RequestListeners getRequestListeners()
{
return requestListeners;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,7 +26,6 @@
import org.eclipse.jetty.client.internal.HttpContentResponse;
import org.eclipse.jetty.client.transport.HttpConversation;
import org.eclipse.jetty.client.transport.HttpRequest;
import org.eclipse.jetty.client.transport.ResponseNotifier;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
Expand Down Expand Up @@ -71,12 +69,10 @@ public class HttpRedirector
private static final String ATTRIBUTE = HttpRedirector.class.getName() + ".redirects";

private final HttpClient client;
private final ResponseNotifier notifier;

public HttpRedirector(HttpClient client)
{
this.client = client;
this.notifier = new ResponseNotifier();
}

/**
Expand Down Expand Up @@ -330,7 +326,7 @@ private Request sendRedirect(HttpRequest httpRequest, Response response, Respons
LOG.debug("Could not redirect to {}, request body is not reproducible", location);
HttpConversation conversation = httpRequest.getConversation();
conversation.updateResponseListeners(null);
notifier.forwardSuccessComplete(conversation.getResponseListeners(), httpRequest, response);
conversation.getResponseListeners().emitSuccessComplete(new Result(httpRequest, response));
return null;
}

Expand Down Expand Up @@ -383,8 +379,6 @@ private void fail(Request request, Throwable requestFailure, Response response,
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.updateResponseListeners(null);
List<Response.ResponseListener> listeners = conversation.getResponseListeners();
notifier.notifyFailure(listeners, response, responseFailure);
notifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
conversation.getResponseListeners().emitFailureComplete(new Result(request, requestFailure, response, responseFailure));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@

package org.eclipse.jetty.client;

import java.util.List;

import org.eclipse.jetty.client.internal.HttpContentResponse;
import org.eclipse.jetty.client.transport.HttpConversation;
import org.eclipse.jetty.client.transport.HttpExchange;
import org.eclipse.jetty.client.transport.HttpRequest;
import org.eclipse.jetty.client.transport.ResponseNotifier;
import org.eclipse.jetty.client.transport.ResponseListeners;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;

Expand Down Expand Up @@ -54,8 +52,6 @@ protected void onProcessing(Request request, HttpFields responseHeaders)

private class ProcessingListener extends BufferingResponseListener
{
private final ResponseNotifier notifier = new ResponseNotifier();

@Override
public void onSuccess(Response response)
{
Expand Down Expand Up @@ -83,9 +79,9 @@ public void onFailure(Response response, Throwable failure)
HttpExchange exchange = conversation.getExchanges().peekLast();
if (exchange != null)
{
List<Response.ResponseListener> listeners = exchange.getResponseListeners();
ResponseListeners listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
notifier.forwardFailureComplete(listeners, exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure);
listeners.emitFailureComplete(new Result(exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,6 @@ default Request port(int port)
*/
Request followRedirects(boolean follow);

/**
* @param listenerClass the class of the listener, or null for all listeners classes
* @param <T> the type of listener class
* @return the listeners for request events of the given class
*/
<T extends RequestListener> List<T> getRequestListeners(Class<T> listenerClass);

/**
* @param listener a listener for request events
* @return this request object
Expand Down Expand Up @@ -402,22 +395,22 @@ default Request port(int port)
Request onResponseContentAsync(Response.AsyncContentListener listener);

/**
* @param listener a listener for response success event
* @param listener a listener for driving {@link org.eclipse.jetty.io.Content.Source}
* @return this request object
*/
Request onResponseSuccess(Response.SuccessListener listener);
Request onResponseContentSource(Response.ContentSourceListener listener);

/**
* @param listener a listener for response failure event
* @param listener a listener for response success event
* @return this request object
*/
Request onResponseFailure(Response.FailureListener listener);
Request onResponseSuccess(Response.SuccessListener listener);

/**
* @param listener a listener for driving {@link org.eclipse.jetty.io.Content.Source}
* @param listener a listener for response failure event
* @return this request object
*/
Request onResponseContentSource(Response.ContentSourceListener listener);
Request onResponseFailure(Response.FailureListener listener);

/**
* <p>Sets a handler for pushed resources.</p>
Expand Down
Loading