diff --git a/library/common/event/provisional_dispatcher.cc b/library/common/event/provisional_dispatcher.cc index 3eb72709b4..f44d8203f5 100644 --- a/library/common/event/provisional_dispatcher.cc +++ b/library/common/event/provisional_dispatcher.cc @@ -44,6 +44,14 @@ envoy_status_t ProvisionalDispatcher::post(Event::PostCb callback) { return ENVOY_SUCCESS; } +Event::SchedulableCallbackPtr +ProvisionalDispatcher::createSchedulableCallback(std::function cb) { + RELEASE_ASSERT( + isThreadSafe(), + "ProvisionalDispatcher::createSchedulableCallback must be called from a threadsafe context"); + return event_dispatcher_->createSchedulableCallback(cb); +} + bool ProvisionalDispatcher::isThreadSafe() const { // Doesn't require locking because if a thread has a stale view of drained_, then by definition // this wasn't a threadsafe call. diff --git a/library/common/event/provisional_dispatcher.h b/library/common/event/provisional_dispatcher.h index 3b13e9ef45..28538bef3d 100644 --- a/library/common/event/provisional_dispatcher.h +++ b/library/common/event/provisional_dispatcher.h @@ -41,6 +41,15 @@ class ProvisionalDispatcher : public ScopeTracker { */ virtual envoy_status_t post(Event::PostCb callback); + /** + * Allocates a schedulable callback. @see SchedulableCallback for docs on how to use the wrapped + * callback. + * @param cb supplies the callback to invoke when the SchedulableCallback is triggered on the + * event loop. + * Must be called from context where ProvisionalDispatcher::isThreadSafe() is true. + */ + virtual Event::SchedulableCallbackPtr createSchedulableCallback(std::function cb); + /** * @return false before the Event::Dispatcher is running, otherwise the result of the * underlying call to Event::Dispatcher::isThreadSafe(). diff --git a/library/common/http/client.cc b/library/common/http/client.cc index 951d219b0c..a16738cfd0 100644 --- a/library/common/http/client.cc +++ b/library/common/http/client.cc @@ -471,9 +471,14 @@ void Client::sendData(envoy_stream_t stream, envoy_data data, bool end_stream) { if (direct_stream->explicit_flow_control_ && !end_stream) { if (direct_stream->read_disable_count_ == 0) { // If there is still buffer space after the write, notify the sender - // that send window is available. + // that send window is available, on the next dispatcher iteration so + // that repeated writes do not starve reads. direct_stream->wants_write_notification_ = false; - direct_stream->callbacks_->onSendWindowAvailable(); + // A new callback must be scheduled each time to capture any changes to the + // DirectStream's callbacks from call to call. + scheduled_callback_ = dispatcher_.createSchedulableCallback( + [direct_stream] { direct_stream->callbacks_->onSendWindowAvailable(); }); + scheduled_callback_->scheduleCallbackNextIteration(); } else { // Otherwise, make sure the stack will send a notification when the // buffers are drained. diff --git a/library/common/http/client.h b/library/common/http/client.h index f2a75998d8..757a3f2693 100644 --- a/library/common/http/client.h +++ b/library/common/http/client.h @@ -321,6 +321,7 @@ class Client : public Logger::Loggable { ApiListener& api_listener_; Event::ProvisionalDispatcher& dispatcher_; + Event::SchedulableCallbackPtr scheduled_callback_; HttpClientStats stats_; // The set of open streams, which can safely have request data sent on them // or response data received. diff --git a/test/common/http/client_test.cc b/test/common/http/client_test.cc index 391e6fa079..600d73404b 100644 --- a/test/common/http/client_test.cc +++ b/test/common/http/client_test.cc @@ -335,6 +335,20 @@ TEST_P(ClientTest, BasicStreamTrailers) { } TEST_P(ClientTest, MultipleDataStream) { + Event::MockDispatcher dispatcher; + ON_CALL(dispatcher_, drain).WillByDefault([&](Event::Dispatcher& event_dispatcher) { + dispatcher_.Event::ProvisionalDispatcher::drain(event_dispatcher); + }); + dispatcher_.drain(dispatcher); + Event::MockSchedulableCallback* process_buffered_data_callback = nullptr; + if (explicit_flow_control_) { + process_buffered_data_callback = new Event::MockSchedulableCallback(&dispatcher); + EXPECT_CALL(*process_buffered_data_callback, scheduleCallbackNextIteration()); + ON_CALL(dispatcher_, createSchedulableCallback).WillByDefault([&](std::function cb) { + return dispatcher_.Event::ProvisionalDispatcher::createSchedulableCallback(cb); + }); + } + cc_.end_stream_with_headers_ = false; envoy_headers c_headers = defaultRequestHeaders(); @@ -361,8 +375,13 @@ TEST_P(ClientTest, MultipleDataStream) { EXPECT_CALL(dispatcher_, popTrackedObject(_)); EXPECT_CALL(*request_decoder_, decodeData(BufferStringEqual("request body"), false)); http_client_.sendData(stream_, c_data, false); - // The buffer is not full: expect an on_send_window_available call in explicit_flow_control mode. - EXPECT_EQ(cc_.on_send_window_available_calls, explicit_flow_control_ ? 1 : 0); + EXPECT_EQ(cc_.on_send_window_available_calls, 0); + if (explicit_flow_control_) { + EXPECT_TRUE(process_buffered_data_callback->enabled_); + process_buffered_data_callback->invokeCallback(); + // The buffer is not full: expect an on_send_window_available call. + EXPECT_EQ(cc_.on_send_window_available_calls, 1); + } // Send second request data. EXPECT_CALL(dispatcher_, pushTrackedObject(_)); diff --git a/test/common/mocks/event/mocks.h b/test/common/mocks/event/mocks.h index 95253451ee..411479e5ce 100644 --- a/test/common/mocks/event/mocks.h +++ b/test/common/mocks/event/mocks.h @@ -38,6 +38,7 @@ class MockProvisionalDispatcher : public ProvisionalDispatcher { MOCK_METHOD(void, drain, (Event::Dispatcher & event_dispatcher)); MOCK_METHOD(void, deferredDelete_, (DeferredDeletable * to_delete)); MOCK_METHOD(envoy_status_t, post_, (std::function callback)); + MOCK_METHOD(Event::SchedulableCallbackPtr, createSchedulableCallback, (std::function cb)); MOCK_METHOD(bool, isThreadSafe, (), (const)); MOCK_METHOD(void, pushTrackedObject, (const ScopeTrackedObject* object)); MOCK_METHOD(void, popTrackedObject, (const ScopeTrackedObject* expected_object)); diff --git a/test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java b/test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java new file mode 100644 index 0000000000..1582cabbab --- /dev/null +++ b/test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java @@ -0,0 +1,114 @@ +package org.chromium.net.testing; + +import static io.envoyproxy.envoymobile.engine.EnvoyConfiguration.TrustChainVerification.ACCEPT_UNTRUSTED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.chromium.net.testing.CronetTestRule.SERVER_CERT_PEM; +import static org.chromium.net.testing.CronetTestRule.SERVER_KEY_PKCS8_PEM; + +import android.content.Context; +import androidx.test.core.app.ApplicationProvider; +import androidx.test.ext.junit.runners.AndroidJUnit4; +import io.envoyproxy.envoymobile.AndroidEngineBuilder; +import io.envoyproxy.envoymobile.Engine; +import io.envoyproxy.envoymobile.LogLevel; +import io.envoyproxy.envoymobile.RequestHeaders; +import io.envoyproxy.envoymobile.RequestHeadersBuilder; +import io.envoyproxy.envoymobile.RequestMethod; +import io.envoyproxy.envoymobile.Stream; +import io.envoyproxy.envoymobile.UpstreamHttpProtocol; +import io.envoyproxy.envoymobile.engine.AndroidJniLibrary; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(AndroidJUnit4.class) +public class AndroidEnvoyExplicitH2FlowTest { + + private Engine engine; + + @BeforeClass + public static void loadJniLibrary() { + AndroidJniLibrary.loadTestLibrary(); + } + + @Before + public void setUpEngine() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + Context appContext = ApplicationProvider.getApplicationContext(); + engine = new AndroidEngineBuilder(appContext) + .setTrustChainVerification(ACCEPT_UNTRUSTED) + .addLogLevel(LogLevel.DEBUG) + .setOnEngineRunning(() -> { + latch.countDown(); + return null; + }) + .build(); + Http2TestServer.startHttp2TestServer(appContext, SERVER_CERT_PEM, SERVER_KEY_PKCS8_PEM); + latch.await(); // Don't launch a request before initialization has completed. + } + + @After + public void shutdown() throws Exception { + engine.terminate(); + Http2TestServer.shutdownHttp2TestServer(); + } + + @Test + public void continuousWrite_withCancelOnResponseHeaders() throws Exception { + URL url = new URL(Http2TestServer.getEchoAllHeadersUrl()); + RequestHeadersBuilder requestHeadersBuilder = new RequestHeadersBuilder( + RequestMethod.POST, url.getProtocol(), url.getAuthority(), url.getPath()); + RequestHeaders requestHeaders = + requestHeadersBuilder.addUpstreamHttpProtocol(UpstreamHttpProtocol.HTTP2).build(); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference stream = new AtomicReference<>(); + final AtomicInteger bufferSent = new AtomicInteger(0); + + // Loop 100,000 times which should be long enough to wait for the server's + // response headers to arrive. + final int numWrites = 100000; + stream.set( + engine.streamClient() + .newStreamPrototype() + .setExplicitFlowControl(true) + .setOnSendWindowAvailable((streamIntel -> { + ByteBuffer bf = ByteBuffer.allocateDirect(1); + bf.put((byte)'a'); + if (bufferSent.incrementAndGet() == numWrites) { + stream.get().close(bf); + } else { + stream.get().sendData(bf); + } + return null; + })) + .setOnResponseHeaders((responseHeaders, endStream, ignored) -> { + // This was getting executed, even in the initial test, but only + // after all the data was sent. With the fix, this should happen + // before all the data is sent which is checked in the assert + // below. + stream.get().cancel(); + return null; + }) + .setOnCancel((ignored) -> { + latch.countDown(); + return null; + }) + .start(Runnable::run) // direct executor - all the logic runs on the EM Network Thread. + .sendHeaders(requestHeaders, false)); + ByteBuffer bf = ByteBuffer.allocateDirect(1); + bf.put((byte)'a'); + stream.get().sendData(bf); + + latch.await(); + + assertThat(bufferSent.get()).isNotEqualTo(numWrites); + } +} diff --git a/test/java/org/chromium/net/testing/BUILD b/test/java/org/chromium/net/testing/BUILD index f36f0ea28c..27e7e9fc9f 100644 --- a/test/java/org/chromium/net/testing/BUILD +++ b/test/java/org/chromium/net/testing/BUILD @@ -90,3 +90,22 @@ envoy_mobile_android_test( "//library/kotlin/io/envoyproxy/envoymobile:envoy_lib", ], ) + +envoy_mobile_android_test( + name = "temporary_test", + srcs = [ + "AndroidEnvoyExplicitH2FlowTest.java", + ], + exec_properties = { + # TODO(lfpino): Remove this once the sandboxNetwork=off works for ipv4 localhost addresses. + "sandboxNetwork": "standard", + }, + native_deps = [ + "//library/common/jni:libndk_envoy_jni.so", + "//library/common/jni:libndk_envoy_jni.jnilib", + ], + deps = [ + ":testing", + "//library/kotlin/io/envoyproxy/envoymobile:envoy_lib", + ], +)