Set a max number of requests per connection #4809
Comments
I guess this can be done by allowing the user to set the initial stream id number.
An alternative approach that would not need significant resources would be to just randomly close connections after every request, potentially by having a request customizer that added the …
Hi, as this functionality was not available, as a workaround we tried to set localStreamIds using reflection. For example, when we needed 1 stream per connection we set localStreamIds to 2147483647 and sent requests one by one, and it worked fine. But when we send 2 requests in parallel, neither request is sent to the server and we get an "invalid stream id" error from the generateHeaders() method of the HeadersGenerator class.

So, for this we thought of closing the session by overriding the onStreamCreated() method of BufferingFlowControlStrategy when the stream id comes out negative:

@Override …

But as we are closing the session, even our first request on stream id 2147483647 is not sent to the server. So is there any way by which we can flush the pending requests and wait for their responses before closing the connection?
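For reference, a minimal sketch of the reflection hack described above. It assumes HTTP2Session keeps its local stream id counter in a private AtomicInteger field named localStreamIds, as the comment suggests; that is an internal, version-dependent detail and this approach is unsupported by design:

```java
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.http2.HTTP2Session;

public final class StreamIdReflectionHack
{
    // Forces the next locally-initiated stream to use the given id.
    // Setting it to 2147483647 (Integer.MAX_VALUE) means the very next
    // stream id allocation overflows, which would explain the
    // "invalid stream id" error seen with two parallel requests.
    public static void setLocalStreamIds(HTTP2Session session, int value) throws Exception
    {
        Field field = HTTP2Session.class.getDeclaredField("localStreamIds");
        field.setAccessible(true);
        ((AtomicInteger)field.get(session)).set(value);
    }
}
```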
@prateekkohli2112 I don't think setting the … I see two solutions: …
Would it be OK if the connection is not used anymore, but not closed immediately? I.e., the connection pool does not return it to the application, and it will eventually idle time out.
As per your suggestion we are now not using localStreamIds and are maintaining a counter for the number of requests per connection. But we cannot wait for the connection to close after an idle timeout, because we have enabled PINGs as well, so our connection will never close. To handle this, once the requests for a connection are exhausted we close the connection in the protected SendFailure send(Connection connection, HttpExchange exchange) method of HttpDestinationOverHTTP2 as below:

getConnectionPool().remove(connection);
new Thread(() -> …

Is this the correct way to close a connection?
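The snippet above is cut off; here is a hedged reconstruction of what such a workaround might look like. The names requestCounts and maxRequestsPerConnection are hypothetical, and the busy wait on the stream count is exactly what the next comment advises against:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.http2.client.http.HttpDestinationOverHTTP2;

public class CountingHttpDestinationOverHTTP2 extends HttpDestinationOverHTTP2
{
    private final Map<Connection, Integer> requestCounts = new ConcurrentHashMap<>();
    private final int maxRequestsPerConnection;

    public CountingHttpDestinationOverHTTP2(HttpClient client, Origin origin, int maxRequestsPerConnection)
    {
        super(client, origin);
        this.maxRequestsPerConnection = maxRequestsPerConnection;
    }

    @Override
    protected SendFailure send(Connection connection, HttpExchange exchange)
    {
        int count = requestCounts.merge(connection, 1, Integer::sum);
        SendFailure failure = super.send(connection, exchange);
        if (count >= maxRequestsPerConnection)
        {
            requestCounts.remove(connection);
            getConnectionPool().remove(connection);
            new Thread(() ->
            {
                HttpConnectionOverHTTP2 http2Connection = (HttpConnectionOverHTTP2)connection;
                // Busy wait until all in-flight streams complete (CPU intensive).
                while (!http2Connection.getSession().getStreams().isEmpty())
                {
                    // spin
                }
                connection.close();
            }).start();
        }
        return failure;
    }
}
```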
@prateekkohli2112 the busy loop waiting for the stream count to go to zero is a CPU burner, so I don't particularly recommend it. Call …

I think we will implement this in our connection pools; after all, we need to implement the counter to avoid overflowing the stream ids.
Thanks @sbordet. We had to use the while loop over the streams to make sure that there was no request/response left before closing the connection. Would you be able to suggest any other workaround for checking that all requests/responses have completed before closing the connection, one that does not consume much CPU?
And also, when can we expect this change in the connection pool?
@prateekkohli2112 you would need to write a custom connection pool to count calls to …

This is a low-priority change, so I have no date for this issue to be fixed.
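For the earlier question about avoiding the busy loop: one low-CPU option is to re-check the stream count on a scheduler instead of spinning. A minimal sketch, assuming a polling latency of ~100 ms is acceptable (DeferredCloser and the interval are made up for illustration):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http2.api.Session;

public final class DeferredCloser
{
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor();

    // Closes the connection once the HTTP/2 session has no active streams,
    // polling every 100 ms instead of burning a CPU in a busy loop.
    public static void closeWhenIdle(Connection connection, Session session)
    {
        SCHEDULER.schedule(() ->
        {
            if (session.getStreams().isEmpty())
                connection.close();
            else
                closeWhenIdle(connection, session);
        }, 100, TimeUnit.MILLISECONDS);
    }
}
```

Note that this only avoids the CPU cost; the acquire/close race discussed below still has to be handled by the pool itself.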
Hello @sbordet, I am using the code above to close the connection:

new Thread(() -> …

But I am getting AsynchronousCloseException multiple times when I am sending high traffic.
Your snippet of code above is inherently unsafe: the moment after you found …

As I said, the best option is to write your own connection pool; we plan to do a similar change in ours.
Thanks @sbordet. Just a small doubt: is it possible that, while I am closing the connection in release() once the number of streams has been exhausted for that connection, another request acquires that connection? How can I handle that?
You write your own connection pool, or wait for this issue to be fixed.
Hi, as per your suggestion we have implemented a custom pool in which we keep a counter in the activate() method and remove the connection from the pool once maximumNumberOfRequests has been reached for a connection. In the release() method we keep another counter, by which we close the connection by setting the idle timeout once the count has reached maximumNumberOfRequests.

After running it for about an hour, I start getting AsynchronousCloseExceptions/request timeouts in my Jetty logs and ClientAbortExceptions/ClosedChannelExceptions in my Tomcat server logs. Am I still doing something wrong in closing the connection? My whole system starts behaving abnormally after some time.
Probably - the logic above seems wrong to me, but there is too little to tell.
Below is my complete custom pool:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.util.Callback;

public class CustomRoundRobinConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
{
    private final List<Entry> entries;
    private int maxMultiplex;
    private final ConcurrentHashMap<Connection, Integer> requestCountActivateMap;
    private final ConcurrentHashMap<Connection, Integer> requestCountReleaseMap;
    private int index;
    private final int numberOfStreamsPerConnection;
    private final Set<Connection> expiredConnections;

    public CustomRoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester, int numberOfStreamsPerConnection)
    {
        this(destination, maxConnections, requester, 1, numberOfStreamsPerConnection);
    }

    public CustomRoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester, int maxMultiplex, int numberOfStreamsPerConnection)
    {
        super(destination, maxConnections, requester);
        expiredConnections = new HashSet<>();
        requestCountActivateMap = new ConcurrentHashMap<>();
        requestCountReleaseMap = new ConcurrentHashMap<>();
        this.numberOfStreamsPerConnection = numberOfStreamsPerConnection;
        entries = new ArrayList<>(maxConnections);
        for (int i = 0; i < maxConnections; ++i)
        {
            entries.add(new Entry());
        }
        this.maxMultiplex = maxMultiplex;
    }

    @Override
    public int getMaxMultiplex()
    {
        synchronized (this)
        {
            return maxMultiplex;
        }
    }

    @Override
    public void setMaxMultiplex(int maxMultiplex)
    {
        synchronized (this)
        {
            this.maxMultiplex = maxMultiplex;
        }
    }

    @Override
    protected void onCreated(Connection connection)
    {
        synchronized (this)
        {
            for (Entry entry : entries)
            {
                if (entry.connection == null)
                {
                    // Start both counters at 1 for the new connection.
                    requestCountActivateMap.put(connection, 1);
                    requestCountReleaseMap.put(connection, 1);
                    entry.connection = connection;
                    break;
                }
            }
        }
        idle(connection, false);
    }

    @Override
    protected Connection activate()
    {
        Connection connection = null;
        synchronized (this)
        {
            // Round-robin scan for an entry with spare multiplexing capacity.
            int offset = 0;
            int capacity = getMaxConnectionCount();
            while (offset < capacity)
            {
                int idx = index + offset;
                if (idx >= capacity)
                    idx -= capacity;
                Entry entry = entries.get(idx);
                if (entry.connection == null)
                    break;
                if (entry.active < getMaxMultiplex())
                {
                    ++entry.active;
                    ++entry.used;
                    connection = entry.connection;
                    index += offset + 1;
                    if (index >= capacity)
                        index -= capacity;
                    break;
                }
                ++offset;
            }
            if (connection != null)
            {
                if (requestCountActivateMap.containsKey(connection))
                {
                    Integer numberOfStreams = requestCountActivateMap.get(connection);
                    if (numberOfStreams == numberOfStreamsPerConnection)
                    {
                        // Stream budget spent: remove the connection from the
                        // pool but still hand it out for this last acquisition,
                        // tracking it as "expired".
                        remove(connection);
                        expiredConnections.add(connection);
                        return active(connection);
                    }
                    requestCountActivateMap.put(connection, ++numberOfStreams);
                    return active(connection);
                }
            }
            return connection == null ? null : active(connection);
        }
    }

    @Override
    public boolean isActive(Connection connection)
    {
        synchronized (this)
        {
            for (Entry entry : entries)
            {
                if (entry.connection == connection)
                    return entry.active > 0;
            }
            // Expired connections have been removed from entries but may
            // still have in-flight requests.
            return expiredConnections.contains(connection);
        }
    }

    @Override
    public boolean release(Connection connection)
    {
        boolean found = false;
        boolean idle = false;
        synchronized (this)
        {
            for (Entry entry : entries)
            {
                if (entry.connection == connection)
                {
                    found = true;
                    int active = --entry.active;
                    idle = active == 0;
                    break;
                }
            }
            if (!found)
            {
                if (!expiredConnections.contains(connection))
                {
                    requestCountActivateMap.remove(connection);
                    requestCountReleaseMap.remove(connection);
                    System.out.println("Connection to remove abruptly: " + connection);
                    return false;
                }
            }
            Integer numberOfStreams = requestCountReleaseMap.get(connection);
            HttpConnectionOverHTTP2 conn = (HttpConnectionOverHTTP2)connection;
            HTTP2Session http2Session = (HTTP2Session)conn.getSession();
            if (numberOfStreams == numberOfStreamsPerConnection)
            {
                // Last release for this connection: let it die via a short
                // idle timeout instead of closing it immediately.
                expiredConnections.remove(connection);
                requestCountActivateMap.remove(connection);
                requestCountReleaseMap.remove(connection);
                http2Session.getEndPoint().setIdleTimeout(990);
                return true;
            }
            requestCountReleaseMap.put(connection, ++numberOfStreams);
        }
        released(connection);
        if (idle)
            return idle(connection, isClosed());
        return true;
    }

    @Override
    public boolean remove(Connection connection)
    {
        boolean found = false;
        synchronized (this)
        {
            for (Entry entry : entries)
            {
                if (entry.connection == connection)
                {
                    found = true;
                    entry.reset();
                    break;
                }
            }
        }
        if (found)
        {
            released(connection);
            removed(connection);
        }
        return found;
    }

    private static class Entry
    {
        private Connection connection;
        private int active;
        private long used;

        private void reset()
        {
            connection = null;
            active = 0;
            used = 0;
        }

        @Override
        public String toString()
        {
            return String.format("{u=%d,c=%s}", used, connection);
        }
    }
}
```
Update: we tried both connection.close() and http2Session.getEndPoint().setIdleTimeout(990) for closing the connection, but in some cases we still get AsynchronousCloseException. We don't really know whether connection.close() closes the connection without waiting for streams to complete, and similarly with the idle timeout, how reliable it is to keep an inactive connection open for about 1 second.
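If the goal is a graceful shutdown, another option worth considering is to close the HTTP/2 session with a GOAWAY frame rather than closing the endpoint directly. A minimal sketch, assuming the peer honors GOAWAY and lets in-flight streams finish:

```java
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;

public final class GracefulClose
{
    // Sends a GOAWAY with NO_ERROR: the peer should open no new streams
    // on this connection, while already-started streams may complete.
    public static void goAway(Session session)
    {
        session.close(ErrorCode.NO_ERROR.code, "max_requests_reached", Callback.NOOP);
    }
}
```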
@prateekkohli2112 this has now been implemented.
Improved Pool.reserve(int) logic to take into account the fact that an entry can accommodate maxMultiplex acquires. This reduces connection openings for HTTP/2 in case of spikes of requests. Signed-off-by: Simone Bordet <[email protected]>
Added test case for idle connections not used for any request. Signed-off-by: Simone Bordet <[email protected]>
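For later readers: once the fix is available, per-connection usage limits can be configured on the client's connection pool instead of hand-rolling one. A sketch assuming a Jetty release where AbstractConnectionPool exposes setMaxUsageCount(int); the pool sizes 8 and 1024 are arbitrary example values, so check your version's javadoc for the exact API:

```java
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;

public final class MaxUsageExample
{
    public static HttpClient newClient(int maxRequestsPerConnection) throws Exception
    {
        HttpClientTransportOverHTTP2 transport = new HttpClientTransportOverHTTP2(new HTTP2Client());
        transport.setConnectionPoolFactory(destination ->
        {
            MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, 8, destination, 1024);
            // Retire each connection after this many requests.
            pool.setMaxUsageCount(maxRequestsPerConnection);
            return pool;
        });
        HttpClient httpClient = new HttpClient(transport, null); // 9.4-style constructor
        httpClient.start();
        return httpClient;
    }
}
```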
Jetty version: 9.4.24.v20191120
Java version: jdk1.8.0_201
OS: Linux
This was already discussed in issue #4651.
Actually, we are planning to have an L4 load balancer between the Jetty client and the HTTP server. The load balancer routes traffic based on established connections with the backend servers: it maintains a mapping between a client connection and a backend server connection, and every request from a particular client is routed over a specific connection.
Now, for dynamically scaling the backend servers, we would want these client connections to have a limited age, so as to avoid starving a backend server of traffic.
We were thinking of doing this based on a limited number of streams on a single connection, and that's why we need to set either the max number of local streams or an initial number from which stream ids should start.