Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #10956 - Send Reset NO_ERROR for HTTP/2 #11029

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,8 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Stream.class);

private final AutoLock lock = new AutoLock();
private final Deque<Data> dataQueue = new ArrayDeque<>(1);
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final long creationNanoTime = NanoTime.now();
Expand All @@ -71,9 +68,6 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
private final MetaData.Request request;
private final boolean local;
private Callback sendCallback;
private Throwable failure;
private boolean localReset;
private boolean remoteReset;
private Listener listener;
private long dataLength;
private boolean dataDemand;
Expand All @@ -82,6 +76,15 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
private long idleTimeout;
private long expireNanoTime = Long.MAX_VALUE;

// The following fields are protected by the lock
private final AutoLock lock = new AutoLock();
private final Deque<Data> dataQueue = new ArrayDeque<>(1);
private CloseState closeState = CloseState.NOT_CLOSED;
private Throwable failure;
private boolean localReset;
private boolean remoteReset;
private boolean resetNoError;

public HTTP2Stream(HTTP2Session session, int streamId, MetaData.Request request, boolean local)
{
this.session = session;
Expand Down Expand Up @@ -116,6 +119,24 @@ public Object getAttachment()
return attachment.get();
}

public void resetNoError()
{
boolean sendResetNoError = false;
try (AutoLock ignored = lock.lock())
{
if (!localReset && !remoteReset && !resetNoError)
{
switch (closeState)
{
case LOCALLY_CLOSED -> sendResetNoError = true; // send it now
case NOT_CLOSED, LOCALLY_CLOSING -> resetNoError = true; // send it later
}
}
}
if (sendResetNoError)
reset(new ResetFrame(streamId, ErrorCode.NO_ERROR.code), Callback.NOOP);
}

@Override
public void setAttachment(Object attachment)
{
Expand Down Expand Up @@ -166,6 +187,7 @@ public void reset(ResetFrame frame, Callback callback)
Throwable resetFailure = null;
try (AutoLock ignored = lock.lock())
{
resetNoError = false;
if (isReset())
{
resetFailure = failure;
Expand Down Expand Up @@ -248,19 +270,27 @@ public boolean isResetOrFailed()
@Override
public boolean isClosed()
{
return closeState.get() == CloseState.CLOSED;
try (AutoLock ignored = lock.lock())
{
return closeState == CloseState.CLOSED;
}
}

@Override
public boolean isRemotelyClosed()
{
CloseState state = closeState.get();
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
try (AutoLock ignored = lock.lock())
{
return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSING || closeState == CloseState.CLOSED;
}
}

public boolean isLocallyClosed()
{
return closeState.get() == CloseState.LOCALLY_CLOSED;
try (AutoLock ignored = lock.lock())
{
return closeState == CloseState.LOCALLY_CLOSED;
}
}

public void commit()
Expand Down Expand Up @@ -659,23 +689,21 @@ public boolean updateClose(boolean update, CloseState.Event event)

private boolean updateCloseAfterReceived()
{
while (true)
try (AutoLock ignored = lock.lock())
{
CloseState current = closeState.get();
CloseState current = closeState;
switch (current)
{
case NOT_CLOSED ->
{
if (closeState.compareAndSet(current, CloseState.REMOTELY_CLOSED))
return false;
closeState = CloseState.REMOTELY_CLOSED;
return false;
}
case LOCALLY_CLOSING ->
{
if (closeState.compareAndSet(current, CloseState.CLOSING))
{
updateStreamCount(0, 1);
return false;
}
closeState = CloseState.CLOSING;
updateStreamCount(0, 1);
return false;
}
case LOCALLY_CLOSED ->
{
Expand All @@ -692,23 +720,21 @@ private boolean updateCloseAfterReceived()

private boolean updateCloseBeforeSend()
{
while (true)
try (AutoLock ignored = lock.lock())
{
CloseState current = closeState.get();
CloseState current = closeState;
switch (current)
{
case NOT_CLOSED ->
{
if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSING))
return false;
closeState = CloseState.LOCALLY_CLOSING;
return false;
}
case REMOTELY_CLOSED ->
{
if (closeState.compareAndSet(current, CloseState.CLOSING))
{
updateStreamCount(0, 1);
return false;
}
closeState = CloseState.CLOSING;
updateStreamCount(0, 1);
return false;
}
default ->
{
Expand All @@ -720,15 +746,16 @@ private boolean updateCloseBeforeSend()

private boolean updateCloseAfterSend()
{
while (true)
boolean sendResetNoError;
try (AutoLock ignored = lock.lock())
{
CloseState current = closeState.get();
CloseState current = closeState;
switch (current)
{
case NOT_CLOSED, LOCALLY_CLOSING ->
{
if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSED))
return false;
closeState = CloseState.LOCALLY_CLOSED;
sendResetNoError = resetNoError;
}
case REMOTELY_CLOSED, CLOSING ->
{
Expand All @@ -741,6 +768,10 @@ private boolean updateCloseAfterSend()
}
}
}

if (sendResetNoError)
reset(new ResetFrame(streamId, ErrorCode.NO_ERROR.code), Callback.NOOP);
return false;
}

public int getSendWindow()
Expand All @@ -766,7 +797,13 @@ public int updateRecvWindow(int delta)
@Override
public void close()
{
CloseState oldState = closeState.getAndSet(CloseState.CLOSED);
CloseState oldState;
try (AutoLock ignored = lock.lock())
{
oldState = closeState;
closeState = CloseState.CLOSED;
}

if (oldState != CloseState.CLOSED)
{
int deltaClosing = oldState == CloseState.CLOSING ? -1 : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
{
private static final Logger LOG = LoggerFactory.getLogger(HttpStreamOverHTTP2.class);

private final AutoLock lock = new AutoLock();
private final AutoLock _lock = new AutoLock();
private final HTTP2ServerConnection _connection;
private final HttpChannel _httpChannel;
private final HTTP2Stream _stream;
private MetaData.Request _requestMetaData;
private MetaData.Response _responseMetaData;
private TunnelSupport tunnelSupport;
private TunnelSupport _tunnelSupport;
private Content.Chunk _chunk;
private Content.Chunk _trailer;
private boolean committed;
private boolean _committed;
private boolean _demand;
private boolean _expects100Continue;

Expand All @@ -90,7 +90,7 @@ public Runnable onRequest(HeadersFrame frame)

if (frame.isEndStream())
{
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_chunk = Content.Chunk.EOF;
}
Expand All @@ -101,7 +101,7 @@ public Runnable onRequest(HeadersFrame frame)
_expects100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());

if (_requestMetaData instanceof MetaData.ConnectRequest)
tunnelSupport = new TunnelSupportOverHTTP2(_requestMetaData.getProtocol());
_tunnelSupport = new TunnelSupportOverHTTP2(_requestMetaData.getProtocol());

if (LOG.isDebugEnabled())
{
Expand Down Expand Up @@ -143,17 +143,27 @@ private void onBadMessage(HttpException x)
// TODO
}

@Override
public void willRead()
{
if (_expects100Continue)
{
_expects100Continue = false;
send(_requestMetaData, HttpGenerator.CONTINUE_100_INFO, false, null, Callback.NOOP);
}
}

@Override
public Content.Chunk read()
{
// Tunnel requests do not have HTTP content, avoid
// returning chunks meant for a different protocol.
if (tunnelSupport != null)
if (_tunnelSupport != null)
return null;

// Check if there already is a chunk, e.g. EOF.
Content.Chunk chunk;
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
chunk = _chunk;
_chunk = Content.Chunk.next(chunk);
Expand All @@ -169,7 +179,7 @@ public Content.Chunk read()
if (data.frame().isEndStream())
{
Content.Chunk trailer;
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
trailer = _trailer;
if (trailer != null)
Expand All @@ -191,7 +201,7 @@ public Content.Chunk read()
if (_expects100Continue && chunk.hasRemaining())
_expects100Continue = false;

try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_chunk = Content.Chunk.next(chunk);
}
Expand All @@ -203,7 +213,7 @@ public void demand()
{
boolean notify = false;
boolean demand = false;
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
if (_chunk != null || _trailer != null)
notify = true;
Expand All @@ -218,19 +228,15 @@ else if (!_demand)
}
else if (demand)
{
if (_expects100Continue)
{
_expects100Continue = false;
send(_requestMetaData, HttpGenerator.CONTINUE_100_INFO, false, null, Callback.NOOP);
}
willRead();
_stream.demand();
}
}

@Override
public Runnable onDataAvailable()
{
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_demand = false;
}
Expand All @@ -249,7 +255,7 @@ public Runnable onDataAvailable()
public Runnable onTrailer(HeadersFrame frame)
{
HttpFields trailers = frame.getMetaData().getHttpFields().asImmutable();
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_trailer = new Trailers(trailers);
}
Expand Down Expand Up @@ -319,7 +325,7 @@ private void sendHeaders(MetaData.Request request, MetaData.Response response, B
}
else
{
committed = true;
_committed = true;
if (last)
{
long realContentLength = BufferUtil.length(content);
Expand Down Expand Up @@ -550,7 +556,7 @@ private void sendTrailersFrame(MetaData metaData, Callback callback)
@Override
public boolean isCommitted()
{
return committed;
return _committed;
}

@Override
Expand All @@ -563,23 +569,17 @@ public boolean isIdle()
@Override
public TunnelSupport getTunnelSupport()
{
return tunnelSupport;
return _tunnelSupport;
}

@Override
public Throwable consumeAvailable()
{
if (tunnelSupport != null)
if (_tunnelSupport != null)
return null;
Throwable result = HttpStream.consumeAvailable(this, _httpChannel.getConnectionMetaData().getHttpConfiguration());
if (result != null)
{
_trailer = null;
if (_chunk != null)
_chunk.release();
_chunk = Content.Chunk.from(result, true);
}
return result;

_stream.resetNoError();
return null;
}

@Override
Expand Down Expand Up @@ -618,7 +618,7 @@ public void succeeded()
}
else
{
EndPoint endPoint = tunnelSupport.getEndPoint();
EndPoint endPoint = _tunnelSupport.getEndPoint();
_stream.setAttachment(endPoint);
endPoint.upgrade(connection);
}
Expand Down
Loading