-
Notifications
You must be signed in to change notification settings - Fork 85
CronetBidirectionalStream #2164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 16 commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
f6a6245
Test for Brotli decompressor
carloseltuerto d7108fd
Merge branch 'main' into cronvoy063
carloseltuerto b9d9c10
Rollback config.cc
carloseltuerto c14bd73
Add BrotliTest
carloseltuerto db45ff3
Nits
carloseltuerto 1dde551
More nits
carloseltuerto 81128d8
Move includes to extension_registry.cc
carloseltuerto dc3331f
Merge branch 'main' into cronvoy063
carloseltuerto 3bbdc9a
Nit
carloseltuerto 70bceff
Merge branch 'main' into cronvoy063
carloseltuerto 6fc93ff
Merge branch 'main' into cronvoy063
carloseltuerto 1a1bb9f
Merge branch 'main' into cronvoy063
carloseltuerto d568d78
CronetBidirectionalStream implementation
carloseltuerto 33e332d
Remove deadline
carloseltuerto ace3c18
Fix nits
carloseltuerto d91c2b7
Fix BUILD error
carloseltuerto 2a96f6e
Merge branch 'main' into cronvoy063
carloseltuerto 894a25f
Cleanup and race condition fixes
carloseltuerto e5e5c07
Remove unwanted file
carloseltuerto 0fe8a89
Nit
carloseltuerto 59a032c
Nit
carloseltuerto 75f0728
Address comments
carloseltuerto 3c25e4b
Always use `mState.nextAction` as the `switch` argument
carloseltuerto 253458c
Nit
carloseltuerto 3a90968
Fix a test
carloseltuerto cc31877
Merge branch 'main' into cronvoy063
carloseltuerto 2faabcf
Fix some race conditions.
carloseltuerto bf03b08
Remove spurious file
carloseltuerto 5a72f08
Batch of comments addressed.
carloseltuerto ccaa901
Nits
carloseltuerto e480b4e
Address follow-up batch of comments
carloseltuerto bac674c
Temporarily delete files to facilitate the merge.
carloseltuerto ab6c804
Merge branch 'main' into cronvoy063
carloseltuerto 7889921
- Fix race condition occurring on Linux only.
carloseltuerto f2f6b6a
Add the Read State diagram
carloseltuerto 80ce727
Nits
carloseltuerto File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
242 changes: 242 additions & 0 deletions
242
library/java/org/chromium/net/impl/CancelProofEnvoyStream.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,242 @@ | ||
| package org.chromium.net.impl; | ||
|
|
||
| import androidx.annotation.IntDef; | ||
| import java.lang.annotation.Retention; | ||
| import java.lang.annotation.RetentionPolicy; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.IntUnaryOperator; | ||
| import io.envoyproxy.envoymobile.engine.EnvoyHTTPStream; | ||
|
|
||
| /** | ||
| * Consistency layer above the {@link EnvoyHTTPStream} preventing unwarranted Stream operations | ||
| * after a "cancel" operation. There are no "synchronized" - this is CAS based logic. | ||
| * | ||
| * <p>This contraption ensures that once a "cancel" operation is invoked, there will be no further | ||
| * operations allowed with the EnvoyHTTPStream - subsequent operations will be ignored silently. | ||
| * However, in the event that that one or more EnvoyHTTPStream operations are currently being | ||
| * executed, the "cancel" operation gets postponed: the last concurrent operation will invoke | ||
| * "cancel" at the end. | ||
| * | ||
| * <p>Instance of this class start with a state of "BUSY_STARTING". This ensure that if a cancel | ||
|
carloseltuerto marked this conversation as resolved.
Outdated
|
||
| * is invoked while the stream is being created, that cancel will be executed only once the stream | ||
| * is completely initialized. Doing otherwise leads to unpredictable outcomes. | ||
| */ | ||
|
RyanTheOptimist marked this conversation as resolved.
|
||
| final class CancelProofEnvoyStream { | ||
|
carloseltuerto marked this conversation as resolved.
carloseltuerto marked this conversation as resolved.
|
||
|
|
||
| @IntDef(flag = true, // Note: this is a bitmap - some states are concurrent. | ||
| value = {State.BUSY_STARTING, State.BUSY_SENDING_HEADERS, State.BUSY_READING_DATA, | ||
| State.BUSY_SENDING_DATA, State.CANCELLED}) | ||
| @Retention(RetentionPolicy.SOURCE) | ||
| private @interface State { | ||
| int BUSY_STARTING = 1; | ||
| int BUSY_SENDING_HEADERS = 1 << 1; | ||
| int BUSY_READING_DATA = 1 << 2; | ||
| int BUSY_SENDING_DATA = 1 << 3; | ||
| int CANCELLED = 1 << 4; | ||
| } | ||
|
|
||
| private static final BusyStateUnsetter BUSY_STARTING_UNSETTER = | ||
| new BusyStateUnsetter(State.BUSY_STARTING); | ||
|
|
||
| private static final BusyStateSetter BUSY_SENDING_HEADER_SETTER = | ||
| new BusyStateSetter(State.BUSY_SENDING_HEADERS); | ||
| private static final BusyStateUnsetter BUSY_SENDING_HEADER_UNSETTER = | ||
| new BusyStateUnsetter(State.BUSY_SENDING_HEADERS); | ||
|
|
||
| private static final BusyStateSetter BUSY_SENDING_DATA_SETTER = | ||
| new BusyStateSetter(State.BUSY_SENDING_DATA); | ||
| private static final BusyStateUnsetter BUSY_SENDING_DATA_UNSETTER = | ||
| new BusyStateUnsetter(State.BUSY_SENDING_DATA); | ||
|
|
||
| private static final BusyStateSetter BUSY_READING_DATA_SETTER = | ||
| new BusyStateSetter(State.BUSY_READING_DATA); | ||
| private static final BusyStateUnsetter BUSY_READING_DATA_UNSETTER = | ||
| new BusyStateUnsetter(State.BUSY_READING_DATA); | ||
|
|
||
| private final AtomicInteger mState = new AtomicInteger(State.BUSY_STARTING); | ||
| private volatile EnvoyHTTPStream mStream; // Cancel can come from any Thread. | ||
|
|
||
| /** | ||
| * Sets the stream. Can only be invoked once, and {@link #sendHeaders}, {@link #sendData}, | ||
| * {@link #readData} will fail if this method has not been invoked first. | ||
| */ | ||
| void setStream(EnvoyHTTPStream stream) { | ||
| mStream = stream; | ||
| if (!unsetBusyStarting()) { | ||
| mStream.cancel(); // Cancel was called meanwhile, so now this is honored. | ||
|
carloseltuerto marked this conversation as resolved.
|
||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Initiates the sending of the request headers if the state permits. | ||
| */ | ||
| void sendHeaders(Map<String, List<String>> envoyRequestHeaders, boolean endStream) { | ||
| if (!setBusySendingHeader()) { | ||
| return; // Already Cancelled - to late to send something. | ||
| } | ||
| mStream.sendHeaders(envoyRequestHeaders, endStream); | ||
| if (!unsetBusySendingHeaders()) { | ||
| mStream.cancel(); // Cancel was called meanwhile, so now this is honored. | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Initiates the sending of one chunk of the request body if the state permits. | ||
| */ | ||
| void sendData(ByteBuffer buffer, boolean finalChunk) { | ||
| if (!setBusySendingData()) { | ||
| return; // Already Cancelled - to late to send something. | ||
| } | ||
| // The Envoy Mobile library only cares about the capacity - must use the correct ByteBuffer | ||
|
carloseltuerto marked this conversation as resolved.
|
||
| if (buffer.position() == 0) { | ||
| mStream.sendData(buffer, buffer.remaining(), finalChunk); | ||
| } else { | ||
| ByteBuffer resizedBuffer = ByteBuffer.allocateDirect(buffer.remaining()); | ||
| buffer.mark(); | ||
| resizedBuffer.put(buffer); | ||
| buffer.reset(); | ||
| mStream.sendData(resizedBuffer, finalChunk); | ||
| } | ||
| if (!unsetBusySendingData()) { | ||
| mStream.cancel(); // Cancel was called meanwhile, so now this is honored. | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Initiates the reading of one chunk of the the request body if the state permits. | ||
| */ | ||
| void readData(int size) { | ||
| if (!setBusyReadingData()) { | ||
| return; // Already Cancelled - to late to read something. | ||
| } | ||
| mStream.readData(size); | ||
| if (!unsetBusyReadingData()) { | ||
| mStream.cancel(); // Cancel was called meanwhile, so now this is honored. | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Cancels the Stream if the state permits. Will be delayed when an operation is concurrently | ||
| * running. Idempotent and Thread Safe. | ||
| * | ||
| * @return true if "cancel" was/will be executed | ||
| */ | ||
| void cancel() { | ||
| @State int originalState; | ||
| @State int newState; | ||
| do { | ||
| originalState = mState.get(); | ||
| if ((originalState & State.CANCELLED) != 0) { | ||
| return; // Cancel already invoked. | ||
| } | ||
| newState = originalState | State.CANCELLED; | ||
| } while (!mState.compareAndSet(originalState, newState)); | ||
| if (newState == State.CANCELLED) { | ||
| // Was not busy with other EM operations - cancel right now. | ||
| mStream.cancel(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Unsets the busy starting state. | ||
| * | ||
| * @return true if not cancelled | ||
| */ | ||
| private boolean unsetBusyStarting() { | ||
| return (mState.updateAndGet(BUSY_STARTING_UNSETTER) & State.CANCELLED) != State.CANCELLED; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the busy sending header state if not already cancelled. | ||
| * | ||
| * @return true if not already cancelled | ||
| */ | ||
| private boolean setBusySendingHeader() { | ||
| return (mState.updateAndGet(BUSY_SENDING_HEADER_SETTER) & State.CANCELLED) == 0; | ||
| } | ||
|
|
||
| /** | ||
| * Unsets the busy sending header state. | ||
| * | ||
| * @return true if not cancelled | ||
| */ | ||
| private boolean unsetBusySendingHeaders() { | ||
| return (mState.updateAndGet(BUSY_SENDING_HEADER_UNSETTER) & State.CANCELLED) != State.CANCELLED; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the busy sending data state if not already cancelled. | ||
| * | ||
| * @return true if not already cancelled | ||
| */ | ||
| private boolean setBusySendingData() { | ||
| return (mState.updateAndGet(BUSY_SENDING_DATA_SETTER) & State.CANCELLED) == 0; | ||
| } | ||
|
|
||
| /** | ||
| * Unsets the sending data busy state. | ||
| * | ||
| * @return true if not cancelled | ||
| */ | ||
| private boolean unsetBusySendingData() { | ||
| return (mState.updateAndGet(BUSY_SENDING_DATA_UNSETTER) & State.CANCELLED) != State.CANCELLED; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the busy reading data state if not already cancelled. | ||
| * | ||
| * @return true if not already cancelled | ||
| */ | ||
| private boolean setBusyReadingData() { | ||
| return (mState.updateAndGet(BUSY_READING_DATA_SETTER) & State.CANCELLED) == 0; | ||
| } | ||
|
|
||
| /** | ||
| * Unsets the busy reading data state. | ||
| * | ||
| * @return true if not cancelled | ||
| */ | ||
| private boolean unsetBusyReadingData() { | ||
| return (mState.updateAndGet(BUSY_READING_DATA_UNSETTER) & State.CANCELLED) != State.CANCELLED; | ||
| } | ||
|
|
||
| private static class BusyStateSetter implements IntUnaryOperator { | ||
|
|
||
| @State private final int cancelBusyState; | ||
|
|
||
| BusyStateSetter(@State int cancelBusyState) { this.cancelBusyState = cancelBusyState; } | ||
|
|
||
| @Override | ||
| public int applyAsInt(@State int originalCancelBusyState) { | ||
| // If by mistake there are concurrent invocations of this method, then the second Thread will | ||
| // get this AssertionError. This condition would constitute a software bug: by contract, for a | ||
| // given method (readData or sendData), invocations can only happen "one at a time" since we | ||
| // have to wait for an EM callback before being allowed to invoke the given method again. For | ||
| // sendHeaders, the rule is even simpler: only one invocation. | ||
| assert (originalCancelBusyState & cancelBusyState) == 0; | ||
| // For this assert to trigger, is means that stream is not finished being initialized. It is | ||
| // a software bug: very likely setStream has not been invoked yet. | ||
| assert (originalCancelBusyState & State.BUSY_STARTING) == 0; | ||
| return (originalCancelBusyState & State.CANCELLED) != 0 | ||
| ? originalCancelBusyState | ||
| : originalCancelBusyState | cancelBusyState; | ||
| } | ||
| } | ||
|
|
||
| private static class BusyStateUnsetter implements IntUnaryOperator { | ||
|
|
||
| @State private final int cancelBusyState; | ||
|
|
||
| BusyStateUnsetter(@State int cancelBusyState) { this.cancelBusyState = cancelBusyState; } | ||
|
|
||
| @Override | ||
| public int applyAsInt(@State int originalCancelBusyState) { | ||
| // Triggering this assert means there is a bug in this class, or setStream was called twice. | ||
| assert (originalCancelBusyState & cancelBusyState) != 0; | ||
| return originalCancelBusyState & ~cancelBusyState; | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.