Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions library/common/event/provisional_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ envoy_status_t ProvisionalDispatcher::post(Event::PostCb callback) {
return ENVOY_SUCCESS;
}

Event::SchedulableCallbackPtr
ProvisionalDispatcher::createSchedulableCallback(std::function<void()> cb) {
RELEASE_ASSERT(
isThreadSafe(),
"ProvisionalDispatcher::createSchedulableCallback must be called from a threadsafe context");
return event_dispatcher_->createSchedulableCallback(cb);
}

bool ProvisionalDispatcher::isThreadSafe() const {
// Doesn't require locking because if a thread has a stale view of drained_, then by definition
// this wasn't a threadsafe call.
Expand Down
9 changes: 9 additions & 0 deletions library/common/event/provisional_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ class ProvisionalDispatcher : public ScopeTracker {
*/
virtual envoy_status_t post(Event::PostCb callback);

/**
* Allocates a schedulable callback. @see SchedulableCallback for docs on how to use the wrapped
* callback.
* @param cb supplies the callback to invoke when the SchedulableCallback is triggered on the
* event loop.
* Must be called from context where ProvisionalDispatcher::isThreadSafe() is true.
*/
virtual Event::SchedulableCallbackPtr createSchedulableCallback(std::function<void()> cb);

/**
* @return false before the Event::Dispatcher is running, otherwise the result of the
* underlying call to Event::Dispatcher::isThreadSafe().
Expand Down
9 changes: 7 additions & 2 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,14 @@ void Client::sendData(envoy_stream_t stream, envoy_data data, bool end_stream) {
if (direct_stream->explicit_flow_control_ && !end_stream) {
if (direct_stream->read_disable_count_ == 0) {
// If there is still buffer space after the write, notify the sender
// that send window is available.
// that send window is available, on the next dispatcher iteration so
// that repeated writes do not starve reads.
direct_stream->wants_write_notification_ = false;
direct_stream->callbacks_->onSendWindowAvailable();
// A new callback must be scheduled each time to capture any changes to the
// DirectStream's callbacks from call to call.
scheduled_callback_ = dispatcher_.createSchedulableCallback(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't remember offhand: do we need to create a new one each time, or only the one time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice. That seems to work. Done!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apparently it didn't work - can we call out why we always need to create in a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

[direct_stream] { direct_stream->callbacks_->onSendWindowAvailable(); });
scheduled_callback_->scheduleCallbackNextIteration();
} else {
// Otherwise, make sure the stack will send a notification when the
// buffers are drained.
Expand Down
1 change: 1 addition & 0 deletions library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {

ApiListener& api_listener_;
Event::ProvisionalDispatcher& dispatcher_;
Event::SchedulableCallbackPtr scheduled_callback_;
HttpClientStats stats_;
// The set of open streams, which can safely have request data sent on them
// or response data received.
Expand Down
23 changes: 21 additions & 2 deletions test/common/http/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,20 @@ TEST_P(ClientTest, BasicStreamTrailers) {
}

TEST_P(ClientTest, MultipleDataStream) {
Event::MockDispatcher dispatcher;
ON_CALL(dispatcher_, drain).WillByDefault([&](Event::Dispatcher& event_dispatcher) {
dispatcher_.Event::ProvisionalDispatcher::drain(event_dispatcher);
});
dispatcher_.drain(dispatcher);
Event::MockSchedulableCallback* process_buffered_data_callback = nullptr;
if (explicit_flow_control_) {
process_buffered_data_callback = new Event::MockSchedulableCallback(&dispatcher);
EXPECT_CALL(*process_buffered_data_callback, scheduleCallbackNextIteration());
ON_CALL(dispatcher_, createSchedulableCallback).WillByDefault([&](std::function<void()> cb) {
return dispatcher_.Event::ProvisionalDispatcher::createSchedulableCallback(cb);
});
}

cc_.end_stream_with_headers_ = false;

envoy_headers c_headers = defaultRequestHeaders();
Expand All @@ -361,8 +375,13 @@ TEST_P(ClientTest, MultipleDataStream) {
EXPECT_CALL(dispatcher_, popTrackedObject(_));
EXPECT_CALL(*request_decoder_, decodeData(BufferStringEqual("request body"), false));
http_client_.sendData(stream_, c_data, false);
// The buffer is not full: expect an on_send_window_available call in explicit_flow_control mode.
EXPECT_EQ(cc_.on_send_window_available_calls, explicit_flow_control_ ? 1 : 0);
EXPECT_EQ(cc_.on_send_window_available_calls, 0);
if (explicit_flow_control_) {
EXPECT_TRUE(process_buffered_data_callback->enabled_);
process_buffered_data_callback->invokeCallback();
// The buffer is not full: expect an on_send_window_available call.
EXPECT_EQ(cc_.on_send_window_available_calls, 1);
}

// Send second request data.
EXPECT_CALL(dispatcher_, pushTrackedObject(_));
Expand Down
1 change: 1 addition & 0 deletions test/common/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class MockProvisionalDispatcher : public ProvisionalDispatcher {
MOCK_METHOD(void, drain, (Event::Dispatcher & event_dispatcher));
MOCK_METHOD(void, deferredDelete_, (DeferredDeletable * to_delete));
MOCK_METHOD(envoy_status_t, post_, (std::function<void()> callback));
MOCK_METHOD(Event::SchedulableCallbackPtr, createSchedulableCallback, (std::function<void()> cb));
MOCK_METHOD(bool, isThreadSafe, (), (const));
MOCK_METHOD(void, pushTrackedObject, (const ScopeTrackedObject* object));
MOCK_METHOD(void, popTrackedObject, (const ScopeTrackedObject* expected_object));
Expand Down
114 changes: 114 additions & 0 deletions test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package org.chromium.net.testing;

import static io.envoyproxy.envoymobile.engine.EnvoyConfiguration.TrustChainVerification.ACCEPT_UNTRUSTED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.chromium.net.testing.CronetTestRule.SERVER_CERT_PEM;
import static org.chromium.net.testing.CronetTestRule.SERVER_KEY_PKCS8_PEM;

import android.content.Context;
import androidx.test.core.app.ApplicationProvider;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import io.envoyproxy.envoymobile.AndroidEngineBuilder;
import io.envoyproxy.envoymobile.Engine;
import io.envoyproxy.envoymobile.LogLevel;
import io.envoyproxy.envoymobile.RequestHeaders;
import io.envoyproxy.envoymobile.RequestHeadersBuilder;
import io.envoyproxy.envoymobile.RequestMethod;
import io.envoyproxy.envoymobile.Stream;
import io.envoyproxy.envoymobile.UpstreamHttpProtocol;
import io.envoyproxy.envoymobile.engine.AndroidJniLibrary;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(AndroidJUnit4.class)
public class AndroidEnvoyExplicitH2FlowTest {

private Engine engine;

@BeforeClass
public static void loadJniLibrary() {
AndroidJniLibrary.loadTestLibrary();
}

@Before
public void setUpEngine() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Context appContext = ApplicationProvider.getApplicationContext();
engine = new AndroidEngineBuilder(appContext)
.setTrustChainVerification(ACCEPT_UNTRUSTED)
.addLogLevel(LogLevel.DEBUG)
.setOnEngineRunning(() -> {
latch.countDown();
return null;
})
.build();
Http2TestServer.startHttp2TestServer(appContext, SERVER_CERT_PEM, SERVER_KEY_PKCS8_PEM);
latch.await(); // Don't launch a request before initialization has completed.
}

@After
public void shutdown() throws Exception {
engine.terminate();
Http2TestServer.shutdownHttp2TestServer();
}

@Test
public void continuousWrite_withCancelOnResponseHeaders() throws Exception {
URL url = new URL(Http2TestServer.getEchoAllHeadersUrl());
RequestHeadersBuilder requestHeadersBuilder = new RequestHeadersBuilder(
RequestMethod.POST, url.getProtocol(), url.getAuthority(), url.getPath());
RequestHeaders requestHeaders =
requestHeadersBuilder.addUpstreamHttpProtocol(UpstreamHttpProtocol.HTTP2).build();

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Stream> stream = new AtomicReference<>();
final AtomicInteger bufferSent = new AtomicInteger(0);

// Loop 100,000 times which should be long enough to wait for the server's
// response headers to arrive.
final int numWrites = 100000;
stream.set(
engine.streamClient()
.newStreamPrototype()
.setExplicitFlowControl(true)
.setOnSendWindowAvailable((streamIntel -> {
ByteBuffer bf = ByteBuffer.allocateDirect(1);
bf.put((byte)'a');
if (bufferSent.incrementAndGet() == numWrites) {
stream.get().close(bf);
} else {
stream.get().sendData(bf);
}
return null;
}))
.setOnResponseHeaders((responseHeaders, endStream, ignored) -> {
// This was getting executed, even in the initial test, but only
// after all the data was sent. With the fix, this should happen
// before all the data is sent which is checked in the assert
// below.
stream.get().cancel();
return null;
})
.setOnCancel((ignored) -> {
latch.countDown();
return null;
})
.start(Runnable::run) // direct executor - all the logic runs on the EM Network Thread.
.sendHeaders(requestHeaders, false));
ByteBuffer bf = ByteBuffer.allocateDirect(1);
bf.put((byte)'a');
stream.get().sendData(bf);

latch.await();

assertThat(bufferSent.get()).isNotEqualTo(numWrites);
}
}
19 changes: 19 additions & 0 deletions test/java/org/chromium/net/testing/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,22 @@ envoy_mobile_android_test(
"//library/kotlin/io/envoyproxy/envoymobile:envoy_lib",
],
)

envoy_mobile_android_test(
name = "temporary_test",
srcs = [
"AndroidEnvoyExplicitH2FlowTest.java",
],
exec_properties = {
# TODO(lfpino): Remove this once the sandboxNetwork=off works for ipv4 localhost addresses.
"sandboxNetwork": "standard",
},
native_deps = [
"//library/common/jni:libndk_envoy_jni.so",
"//library/common/jni:libndk_envoy_jni.jnilib",
],
deps = [
":testing",
"//library/kotlin/io/envoyproxy/envoymobile:envoy_lib",
],
)