Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions library/common/http/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ Dispatcher::Dispatcher(std::atomic<envoy_network_t>& preferred_network)
address_(std::make_shared<Network::Address::SyntheticAddressImpl>()) {}

envoy_status_t Dispatcher::startStream(envoy_stream_t new_stream_handle,
envoy_http_callbacks bridge_callbacks,
envoy_stream_options) {
envoy_http_callbacks bridge_callbacks) {
post([this, new_stream_handle, bridge_callbacks]() -> void {
Dispatcher::DirectStreamSharedPtr direct_stream{new DirectStream(new_stream_handle, *this)};
direct_stream->callbacks_ =
Expand Down
4 changes: 1 addition & 3 deletions library/common/http/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
* there is no guarantee it will ever functionally represent an open stream.
* @param stream, the stream to start.
* @param bridge_callbacks, wrapper for callbacks for events on this stream.
* @param stream_options, the config options to start the stream with.
* @return envoy_stream_t handle to the stream being created.
*/
envoy_status_t startStream(envoy_stream_t stream, envoy_http_callbacks bridge_callbacks,
envoy_stream_options stream_options);
envoy_status_t startStream(envoy_stream_t stream, envoy_http_callbacks bridge_callbacks);

/**
* Send headers over an open HTTP stream. This method can be invoked once and needs to be called
Expand Down
5 changes: 2 additions & 3 deletions library/common/jni_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ extern "C" JNIEXPORT jlong JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibr
}

extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibrary_startStream(
JNIEnv* env, jclass, jlong stream_handle, jobject j_context, jboolean buffer_for_retry) {
JNIEnv* env, jclass, jlong stream_handle, jobject j_context) {

jclass jcls_JvmCallbackContext = env->GetObjectClass(j_context);

Expand All @@ -302,9 +302,8 @@ extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibra
envoy_http_callbacks native_callbacks = {jvm_on_headers, jvm_on_data, jvm_on_metadata,
jvm_on_trailers, jvm_on_error, jvm_on_complete,
jvm_on_cancel, retained_context};
envoy_stream_options stream_options = {buffer_for_retry == JNI_TRUE ? true : false};
envoy_status_t result =
start_stream(static_cast<envoy_stream_t>(stream_handle), native_callbacks, stream_options);
start_stream(static_cast<envoy_stream_t>(stream_handle), native_callbacks);
if (result != ENVOY_SUCCESS) {
env->DeleteGlobalRef(retained_context); // No callbacks are fired and we need to release
}
Expand Down
5 changes: 2 additions & 3 deletions library/common/main_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ static std::atomic<envoy_network_t> preferred_network_{ENVOY_NET_GENERIC};

envoy_stream_t init_stream(envoy_engine_t) { return current_stream_handle_++; }

envoy_status_t start_stream(envoy_stream_t stream, envoy_http_callbacks callbacks,
envoy_stream_options stream_options) {
envoy_status_t start_stream(envoy_stream_t stream, envoy_http_callbacks callbacks) {
if (auto e = engine_.lock()) {
return e->httpDispatcher().startStream(stream, callbacks, stream_options);
return e->httpDispatcher().startStream(stream, callbacks);
}
return ENVOY_FAILURE;
}
Expand Down
4 changes: 1 addition & 3 deletions library/common/main_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ envoy_stream_t init_stream(envoy_engine_t);
* can occur.
* @param stream, handle to the stream to be started.
* @param callbacks, the callbacks that will run the stream callbacks.
* @param options, DEPRECATED.
* @return envoy_stream, with a stream handle and a success status, or a failure status.
*/
envoy_status_t start_stream(envoy_stream_t, envoy_http_callbacks callbacks,
envoy_stream_options stream_options);
envoy_status_t start_stream(envoy_stream_t, envoy_http_callbacks callbacks);

/**
* Send headers over an open HTTP stream. This method can be invoked once and needs to be called
Expand Down
4 changes: 0 additions & 4 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,6 @@ typedef struct {
envoy_data message;
} envoy_error;

typedef struct {
bool buffer_body_for_retry;
} envoy_stream_options;

#ifdef __cplusplus
extern "C" { // function pointers
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ public interface EnvoyEngine {
/**
* Creates a new stream with the provided callbacks.
*
* @param callbacks The callbacks for receiving callbacks from the stream.
* @param bufferForRetry Whether this stream should be buffered to support
* future retries. Must be true for requests that support
* retrying.
* @param callbacks The callbacks for receiving callbacks from the stream.
* @return A stream that may be used for sending data.
*/
EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean bufferForRetry);
EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks);

/**
* Run the Envoy engine with the provided yaml string and log level.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@ public EnvoyEngineImpl() {
/**
* Creates a new stream with the provided callbacks.
*
* @param callbacks The callbacks for the stream.
* @param bufferForRetry Whether this stream should be buffered to support
* future retries. Must be true for requests that support
* retrying.
* @param callbacks The callbacks for the stream.
* @return A stream that may be used for sending data.
*/
@Override
public EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean bufferForRetry) {
public EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks) {
long streamHandle = JniLibrary.initStream(engineHandle);
return new EnvoyHTTPStream(streamHandle, callbacks, bufferForRetry);
return new EnvoyHTTPStream(streamHandle, callbacks);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ public class EnvoyHTTPStream {
private final long streamHandle;
private final JvmCallbackContext callbacksContext;

EnvoyHTTPStream(long streamHandle, EnvoyHTTPCallbacks callbacks, boolean bufferForRetry) {
EnvoyHTTPStream(long streamHandle, EnvoyHTTPCallbacks callbacks) {
this.streamHandle = streamHandle;
callbacksContext = new JvmCallbackContext(callbacks);
JniLibrary.startStream(streamHandle, callbacksContext, bufferForRetry);
JniLibrary.startStream(streamHandle, callbacksContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,13 @@ private static class JavaLoader {
* Open an underlying HTTP stream. Note: Streams must be started before other
* other interaction can can occur.
*
* @param stream, handle to the stream to be started.
* @param context, context that contains dispatch logic to fire callbacks
* callbacks.
* @param bufferForRetry Whether this stream should be buffered to support
* future retries. Must be true for requests that support
* retrying.
* @param stream, handle to the stream to be started.
* @param context, context that contains dispatch logic to fire callbacks
* callbacks.
* @return envoy_stream, with a stream handle and a success status, or a failure
* status.
*/
protected static native int startStream(long stream, JvmCallbackContext context,
boolean bufferForRetry);
protected static native int startStream(long stream, JvmCallbackContext context);

/**
* Send headers over an open HTTP stream. This method can be invoked once and
Expand Down
12 changes: 6 additions & 6 deletions library/kotlin/src/io/envoyproxy/envoymobile/EnvoyClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ enum class LogLevel(internal val level: String) {
* Wrapper class that allows for easy calling of Envoy's JNI interface in native Java.
*/
class Envoy private constructor(
private val engine: EnvoyEngine,
internal val envoyConfiguration: EnvoyConfiguration?,
internal val configurationYAML: String?,
internal val logLevel: LogLevel
private val engine: EnvoyEngine,
internal val envoyConfiguration: EnvoyConfiguration?,
internal val configurationYAML: String?,
internal val logLevel: LogLevel
) : HTTPClient {

constructor(engine: EnvoyEngine, envoyConfiguration: EnvoyConfiguration, logLevel: LogLevel = LogLevel.INFO) : this(engine, envoyConfiguration, null, logLevel)
Expand All @@ -36,13 +36,13 @@ class Envoy private constructor(
init {
if (envoyConfiguration == null) {
engine.runWithConfig(configurationYAML, logLevel.level)
}else {
} else {
engine.runWithConfig(envoyConfiguration, logLevel.level)
}
}

override fun send(request: Request, responseHandler: ResponseHandler): StreamEmitter {
val stream = engine.startStream(responseHandler.underlyingCallbacks, request.retryPolicy != null)
val stream = engine.startStream(responseHandler.underlyingCallbacks)
stream.sendHeaders(request.outboundHeaders(), false)
return EnvoyStreamEmitter(stream)
}
Expand Down
69 changes: 12 additions & 57 deletions library/kotlin/test/io/envoyproxy/envoymobile/EnvoyClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package io.envoyproxy.envoymobile
import io.envoyproxy.envoymobile.engine.EnvoyConfiguration
import io.envoyproxy.envoymobile.engine.EnvoyEngine
import io.envoyproxy.envoymobile.engine.EnvoyHTTPStream
import java.nio.ByteBuffer
import java.util.concurrent.Executor
import org.junit.Test
import org.mockito.ArgumentMatchers.any
import org.mockito.ArgumentMatchers.anyBoolean
import org.mockito.ArgumentMatchers.eq
import org.mockito.Mockito.`when`
import org.mockito.Mockito.mock
import org.mockito.Mockito.verify
import java.nio.ByteBuffer
import java.util.concurrent.Executor

class EnvoyClientTest {

Expand All @@ -21,7 +19,7 @@ class EnvoyClientTest {

@Test
fun `starting a stream on envoy sends headers`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val expectedHeaders = mapOf(
Expand All @@ -46,7 +44,7 @@ class EnvoyClientTest {

@Test
fun `sending data on stream stream forwards data to the underlying stream`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val emitter = envoy.send(
Expand All @@ -67,7 +65,7 @@ class EnvoyClientTest {

@Test
fun `sending metadata on stream forwards metadata to the underlying stream`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val metadata = mapOf("key_1" to listOf("value_a"))
Expand All @@ -87,7 +85,7 @@ class EnvoyClientTest {

@Test
fun `closing stream sends empty data to the underlying stream`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val emitter = envoy.send(
Expand All @@ -106,7 +104,7 @@ class EnvoyClientTest {

@Test
fun `closing stream with trailers sends trailers to the underlying stream `() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val trailers = mapOf("key_1" to listOf("value_a"))
Expand All @@ -126,7 +124,7 @@ class EnvoyClientTest {

@Test
fun `sending request on envoy sends headers`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val expectedHeaders = mapOf(
Expand All @@ -152,7 +150,7 @@ class EnvoyClientTest {

@Test
fun `sending request on envoy passes the body buffer`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val body = ByteBuffer.allocate(0)
Expand All @@ -171,7 +169,7 @@ class EnvoyClientTest {

@Test
fun `sending request on envoy without trailers sends empty trailers`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val body = ByteBuffer.allocate(0)
Expand All @@ -190,49 +188,7 @@ class EnvoyClientTest {

@Test
fun `sending request on envoy sends trailers`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
val envoy = Envoy(engine, config)

val trailers = mapOf("key_1" to listOf("value_a"))
envoy.send(
RequestBuilder(
method = RequestMethod.POST,
scheme = "https",
authority = "api.foo.com",
path = "foo")
.build(),
ByteBuffer.allocate(0),
trailers,
ResponseHandler(Executor {}))

verify(stream).sendTrailers(trailers)
}

@Test
fun `sending request with retryPolicy creates a stream with buffering`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
val envoy = Envoy(engine, config)

val trailers = mapOf("key_1" to listOf("value_a"))
envoy.send(
RequestBuilder(
method = RequestMethod.POST,
scheme = "https",
authority = "api.foo.com",
path = "foo")
.addRetryPolicy(RetryPolicy(23, listOf(RetryRule.STATUS_5XX, RetryRule.CONNECT_FAILURE), 1234))
.build(),
ByteBuffer.allocate(0),
trailers,
ResponseHandler(Executor {}))

verify(stream).sendTrailers(trailers)
verify(engine).startStream(any(), eq(true))
}

@Test
fun `sending request without retryPolicy creates a stream without buffering`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val trailers = mapOf("key_1" to listOf("value_a"))
Expand All @@ -248,12 +204,11 @@ class EnvoyClientTest {
ResponseHandler(Executor {}))

verify(stream).sendTrailers(trailers)
verify(engine).startStream(any(), eq(false))
}

@Test
fun `cancelling stream cancels the underlying stream`() {
`when`(engine.startStream(any(), anyBoolean())).thenReturn(stream)
`when`(engine.startStream(any())).thenReturn(stream)
val envoy = Envoy(engine, config)

val trailers = mapOf("key_1" to listOf("value_a"))
Expand Down
11 changes: 2 additions & 9 deletions library/objective-c/EnvoyEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,8 @@ typedef NSDictionary<NSString *, NSArray<NSString *> *> EnvoyHeaders;

@param handle Underlying handle of the HTTP stream owned by an Envoy engine.
@param callbacks The callbacks for the stream.
@param bufferForRetry Whether this stream should be buffered to support future retries. Must be
true for requests that support retrying.
*/
- (instancetype)initWithHandle:(intptr_t)handle
callbacks:(EnvoyHTTPCallbacks *)callbacks
bufferForRetry:(BOOL)bufferForRetry;
- (instancetype)initWithHandle:(intptr_t)handle callbacks:(EnvoyHTTPCallbacks *)callbacks;

/**
Send headers over the provided stream.
Expand Down Expand Up @@ -189,11 +185,8 @@ typedef NSDictionary<NSString *, NSArray<NSString *> *> EnvoyHeaders;
Opens a new HTTP stream attached to this engine.

@param callbacks Handler for observing stream events.
@param bufferForRetry Whether this stream should be buffered to support future retries. Must be
true for requests that support retrying.
*/
- (id<EnvoyHTTPStream>)startStreamWithCallbacks:(EnvoyHTTPCallbacks *)callbacks
bufferForRetry:(BOOL)bufferForRetry;
- (id<EnvoyHTTPStream>)startStreamWithCallbacks:(EnvoyHTTPCallbacks *)callbacks;

@end

Expand Down
6 changes: 2 additions & 4 deletions library/objective-c/EnvoyEngineImpl.m
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ - (int)runWithConfigYAML:(NSString *)configYAML logLevel:(NSString *)logLevel {
}
}

- (id<EnvoyHTTPStream>)startStreamWithCallbacks:(EnvoyHTTPCallbacks *)callbacks
bufferForRetry:(BOOL)bufferForRetry {
- (id<EnvoyHTTPStream>)startStreamWithCallbacks:(EnvoyHTTPCallbacks *)callbacks {
return [[EnvoyHTTPStreamImpl alloc] initWithHandle:init_stream(_engineHandle)
callbacks:callbacks
bufferForRetry:bufferForRetry];
callbacks:callbacks];
}

@end
7 changes: 2 additions & 5 deletions library/objective-c/EnvoyHTTPStreamImpl.m
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ @implementation EnvoyHTTPStreamImpl {
envoy_stream_t _streamHandle;
}

- (instancetype)initWithHandle:(envoy_stream_t)handle
callbacks:(EnvoyHTTPCallbacks *)callbacks
bufferForRetry:(BOOL)bufferForRetry {
- (instancetype)initWithHandle:(envoy_stream_t)handle callbacks:(EnvoyHTTPCallbacks *)callbacks {
self = [super init];
if (!self) {
return nil;
Expand All @@ -209,8 +207,7 @@ - (instancetype)initWithHandle:(envoy_stream_t)handle
// We need create the native-held strong ref on this stream before we call start_stream because
// start_stream could result in a reset that would release the native ref.
_strongSelf = self;
envoy_stream_options stream_options = {bufferForRetry};
envoy_status_t result = start_stream(_streamHandle, native_callbacks, stream_options);
envoy_status_t result = start_stream(_streamHandle, native_callbacks);
if (result != ENVOY_SUCCESS) {
_strongSelf = nil;
return nil;
Expand Down
Loading