Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ void sendData(ByteBuffer buffer, boolean finalChunk) {
if (buffer.position() == 0) {
mStream.sendData(buffer, buffer.remaining(), finalChunk);
} else {
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/2247): avoid ByteBuffer copies
ByteBuffer resizedBuffer = ByteBuffer.allocateDirect(buffer.remaining());
buffer.mark();
resizedBuffer.put(buffer);
Expand Down
72 changes: 70 additions & 2 deletions library/java/org/chromium/net/impl/CronetBidirectionalStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,71 @@

/**
* {@link BidirectionalStream} implementation using Envoy-Mobile stack.
*
* <p><b>C++ API differences between EM and Cronet</b>:
* <br>The Cronet C++ API was carved to make the Java implementation of BidirectionalStream
* straightforward. EM C++ API is more bare bone. The missing/different logic is therefore being
* handled by this class. Here are the main differences:
* <ul>
* <li>onStreamReady is called by the C++ on the Network Thread. EM does not have that.
* <li>For the request body, Cronet C++ does a callback once a ByteBuffer has been taking in charge.
* EM rather does a callback once it is ready to take in charge the next ByteBuffer.
* <li>The Cronet C++ "write" method accepts a list of ByteBuffers. EM C++ "write" method accepts
* a single ByteBuffer. This feature is important for QUIC - see Issue #2264
* <li>Cronet C++ systematically does a final "onReadCompleted" callback with an empty ByteBuffer.
* This is the way to tell the user that there is nothing more coming in. EM C++ does not do that:
* the last ByteBuffer might not be empty.
* <li>Cronet C++ does a single "onReadCompleted" callback with an empty ByteBuffer when there is no
* Response Body. EM C++ does nothing like that.
* <li>Cronet has a specific C++ API to destroy the stream (not just "cancel"). This allows Cronet
* to report an Error and quit immediately by invoking "destroy". EM only has Cancel. An EM stream
* is deemed destroyed only once one of the 3 terminating EM callbacks has been invoked.
* <li>When invoking "cancel" with Cronet, it is guaranteed that the Cronet C++ with invoke the
* "onCancel" callback. EM does not do the same: if the "endOfStream" for both "read" and "write"
* have been recorded by the EM C++, then invoking "cancel" just before receiving an EM terminal
* callback will not have "onCancel" to be invoked. For example, if a "cancel" is requested when
* executing "onData" callback method with "endOfStream == true", then this situation occurs:
* "onComplete" will be called, not "onCancel".
* </ul>
*
* <p><b>Implementation strategy</b>:
* <br>Implementation wise, the most noticeable difference between the Cronet implementation and
* this one is the avoidance of any java "synchronized". This implementation is based on "Compare
* And Swap" logic to guarantee correctness. The State of the Stream is kept in a single atomic
* Integer owned by {@link CronetBidirectionalState}. That state is a set of bits. Technically it
* could have been the conjunction of Enums held inside a single Integer. Using bits turned out
* to avoid more complex "if" logic. Still, the most important point here is the fact that the whole
* state is a single Atomic Integer: it eases the avoidance of race conditions, especially when
* "cancel" is involved.
* <ul>
* <li>When starting, the EM Engine itself might still not have finished its own initialisation.
* This implementation won't block. Instead, the Stream creation will be piggybacked on the Engine
* initialisation completion callback. The {@link #start} method is therefore non-blocking like all
* all other Stream methods.
* <li>The User "onStreamReady" callback is invoked by this class after creating the Stream, or
* after sending the Request Headers when requested to do so immediately. EM does not have that C++
* callback.
* <li>Since EM does not indicate when the last "sendData" was taken in charge by EM, then invoking
* "sendData" with "endOfStream == true" also takes care of scheduling the last User
* "onWriteCompleted" callback. This might look like incorrect, but in reality this does not affect
* at all the overall behaviour.
* <li>When invoking "sendData", if the position of the ByteBuffer is not zero, then the ByteBuffer
* is copied so the data starts at position zero. See Issue #2247.
* <li>EM does not expose a callback method where the last received ByteBuffer is empty. This Java
* implementation fakes that behaviour. When the logic figures out that EM has received its last
* ByteBuffer (usually not empty), then the State logic is set to wait for an ultimate "read" from
* the User. This can occur after the Stream has competed. Upon receiving the last User "read", a
* User "onReadCompleted" callback is immediately scheduled with an empty ByteBuffer.
* <li>The "cancel" request is asynchronous. It can happen in the 6 steps of the life cycle of a
* Stream: before starting, while starting, after starting, after receiving final "endOfStream",
* after receiving a terminating EM callback, and after finishing the request. For the first and
* last state, it is easy: nothing to do. When starting, any "cancel" must be postponed until
* started. A "cancel" after receiving final "endOfStream" may or may not be processed by the EM
* "onCancel" callback. In this case any terminal EM callback will complete the "cancel". If a
* "cancel" occurs after receiving a terminal EM callback then the User "onCanceled" callback is
* invoked immediately. And to avoid invoking further Stream methods once "cancel" has been invoked,
* a dedicated class handles this business: {@link CancelProofEnvoyStream}.
* </ul>
*/
public final class CronetBidirectionalStream
extends ExperimentalBidirectionalStream implements EnvoyHTTPCallbacks {
Expand All @@ -53,12 +118,14 @@ public final class CronetBidirectionalStream
private final Executor mExecutor;
private final VersionSafeCallbacks.BidirectionalStreamCallback mCallback;
private final String mInitialUrl;
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1641): Priority? What should we do.
private final int mInitialPriority;
private final String mMethod;
private final boolean mReadOnly; // if mInitialMethod is GET or HEAD, then this is true.
private final List<Map.Entry<String, String>> mRequestHeaders;
private final boolean mDelayRequestHeadersUntilFirstFlush;
private final Collection<Object> mRequestAnnotations;
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1521): implement traffic tagging.
private final boolean mTrafficStatsTagSet;
private final int mTrafficStatsTag;
private final boolean mTrafficStatsUidSet;
Expand Down Expand Up @@ -917,7 +984,8 @@ public void onData(ByteBuffer data, boolean endStream, EnvoyStreamIntel streamIn
case NextAction.INVOKE_ON_READ_COMPLETED:
ReadBuffer readBuffer = mLatestBufferRead.getAndSet(null);
ByteBuffer userBuffer = readBuffer.mByteBuffer;
// TODO: copy buffer on network Thread - consider doing on the user Thread.
// TODO: this copies buffer on the Network Thread - consider doing on the user Thread.
// Or even better, revamp EM API to avoid as much as possible copying ByteBuffers.
userBuffer.mark();
userBuffer.put(data); // NPE ==> BUG, BufferOverflowException ==> User not behaving.
userBuffer.reset();
Expand Down Expand Up @@ -963,7 +1031,7 @@ public void onError(int errorCode, String message, int attemptCount, EnvoyStream
mEnvoyFinalStreamIntel = finalStreamIntel;
switch (mState.nextAction(Event.ON_ERROR)) {
case NextAction.NOTIFY_USER_NETWORK_ERROR:
// TODO: fix error scheme.
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1594): fix error scheme.
onErrorReceived(errorCode, /* nativeError= */ -1,
/* nativeQuicError */ 0, message, finalStreamIntel.getReceivedByteCount());
break;
Expand Down
17 changes: 8 additions & 9 deletions test/java/org/chromium/net/BidirectionalStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ private ByteBuffer getDummyData() {
@Feature({"Cronet"})
@OnlyRunNativeCronet
public void testSimpleGetWithFlush() throws Exception {
// TODO(xunjieli): Use ParameterizedTest instead of the loop.
for (int i = 0; i < 2; i++) {
String url = Http2TestServer.getEchoStreamUrl();
TestBidirectionalStreamCallback callback = new TestBidirectionalStreamCallback() {
Expand Down Expand Up @@ -562,7 +561,6 @@ public void onStreamReady(BidirectionalStream stream) {
@Feature({"Cronet"})
@OnlyRunNativeCronet
public void testSimplePostWithFlushAfterOneWrite() throws Exception {
// TODO(xunjieli): Use ParameterizedTest instead of the loop.
for (int i = 0; i < 2; i++) {
String url = Http2TestServer.getEchoStreamUrl();
TestBidirectionalStreamCallback callback = new TestBidirectionalStreamCallback();
Expand Down Expand Up @@ -591,7 +589,6 @@ public void testSimplePostWithFlushAfterOneWrite() throws Exception {
@Feature({"Cronet"})
@OnlyRunNativeCronet
public void testSimplePostWithFlushTwice() throws Exception {
// TODO(xunjieli): Use ParameterizedTest instead of the loop.
for (int i = 0; i < 2; i++) {
String url = Http2TestServer.getEchoStreamUrl();
TestBidirectionalStreamCallback callback = new TestBidirectionalStreamCallback();
Expand Down Expand Up @@ -1216,7 +1213,7 @@ public void testSimpleGetBufferUpdates() throws Exception {
// The expected received bytes count is lower than it would be for the first request on the
// connection, because the server includes an HPACK dynamic table size update only in the
// first response HEADERS frame.
// TODO: fix expected ReceivedByteCount - quite unpredictable
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/2265): fix expected ReceivedByteCount
// runSimpleGetWithExpectedReceivedByteCount(27);
}

Expand Down Expand Up @@ -1300,8 +1297,9 @@ private void throwOrCancel(FailureType failureType, ResponseStep failureStep,
failureStep == ResponseStep.ON_READ_COMPLETED || failureStep == ResponseStep.ON_TRAILERS) {
// For steps after response headers are received, there will be
// connect timing metrics.
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/2192) uncomment this line
// MetricsTestUtil.checkTimingMetrics(metrics, startTime, endTime);
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/2192): flaky.
MetricsTestUtil.checkTimingMetrics(metrics, startTime, endTime);
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/2192): flaky.
MetricsTestUtil.checkHasConnectTiming(metrics, startTime, endTime, true);
assertTrue(metrics.getSentByteCount() > 0);
assertTrue(metrics.getReceivedByteCount() > 0);
Expand Down Expand Up @@ -1334,12 +1332,13 @@ private void throwOrCancel(FailureType failureType, ResponseStep failureStep,
@OnlyRunNativeCronet
@Ignore("Flaky: crashes EM")
public void testFailures() throws Exception {
// TODO: start time and end time are not set.
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/2192): start/end time are not set.
// throwOrCancel(FailureType.CANCEL_SYNC, ResponseStep.ON_STREAM_READY, false);
// throwOrCancel(FailureType.CANCEL_ASYNC, ResponseStep.ON_STREAM_READY, false);
// throwOrCancel(FailureType.CANCEL_ASYNC_WITHOUT_PAUSE, ResponseStep.ON_STREAM_READY, false);
// throwOrCancel(FailureType.THROW_SYNC, ResponseStep.ON_STREAM_READY, true);

// TODO(https://github.com/envoyproxy/envoy-mobile/issues/2192): start/end time are flaky.
throwOrCancel(FailureType.CANCEL_SYNC, ResponseStep.ON_RESPONSE_STARTED, false);
throwOrCancel(FailureType.CANCEL_ASYNC, ResponseStep.ON_RESPONSE_STARTED, false);
throwOrCancel(FailureType.CANCEL_ASYNC_WITHOUT_PAUSE, ResponseStep.ON_RESPONSE_STARTED, false);
Expand Down Expand Up @@ -1541,7 +1540,7 @@ public void testCronetEngineShutdownAfterStreamCancel() throws Exception {
@Feature({"Cronet"})
@Test
@OnlyRunNativeCronet
@Ignore("https://github.com/envoyproxy/envoy-mobile/issues/1550")
@Ignore("https://github.com/envoyproxy/envoy-mobile/issues/1594")
public void testErrorCodes() throws Exception {
// Non-BidirectionalStream specific error codes.
checkSpecificErrorCode(NetError.ERR_NAME_NOT_RESOLVED,
Expand All @@ -1561,7 +1560,7 @@ public void testErrorCodes() throws Exception {
checkSpecificErrorCode(NetError.ERR_TIMED_OUT, NetworkException.ERROR_TIMED_OUT, true);
checkSpecificErrorCode(NetError.ERR_ADDRESS_UNREACHABLE,
NetworkException.ERROR_ADDRESS_UNREACHABLE, false);
// TODO("enable")
// TODO(https://github.com/envoyproxy/envoy-mobile/issues/1594) Missing error - code this.
// BidirectionalStream specific retryable error codes.
// checkSpecificErrorCode(NetError.ERR_HTTP2_PING_FAILED, NetworkException.ERROR_OTHER, true);
// checkSpecificErrorCode(
Expand Down