Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e8d933d
Add a raw (Buffer::Instance) gRPC interface.
jplevyak Apr 9, 2019
80c5ecf
use NOT_REACHED_GCOVR_EXCL_LINE in place of exceptions.
jplevyak Apr 12, 2019
c64fcd0
Fix formating.
jplevyak Apr 12, 2019
3a80415
Update to the new Buffer-based gRPC API.
jplevyak Apr 19, 2019
2210ddc
Add missing file.
jplevyak Apr 19, 2019
45f1e17
Change serializeBody -> serializeToGrpcFrame as requested.
jplevyak Apr 24, 2019
bf6d032
Remove isGrpcHeaderRequired.
jplevyak Apr 25, 2019
e4cb012
Remove the Untyped versions and remove the Typed decorator adding
jplevyak Apr 25, 2019
1564b4c
Remove unused API isGrpcHeaderRequired.
jplevyak Apr 26, 2019
74fad14
Move the function to create the GrpcFrameHeader into Common.
jplevyak Apr 26, 2019
e9760ec
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak May 10, 2019
e80e477
Update merge with master.
jplevyak May 13, 2019
74a50ce
Address typo.
jplevyak May 13, 2019
91b9081
Address comments.
jplevyak May 22, 2019
8f670d5
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak May 22, 2019
70a1688
Address comments.
jplevyak May 22, 2019
b04ee92
Address comments.
jplevyak May 22, 2019
cf58e00
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak May 28, 2019
c5cbb52
Add test for additional coverage.
jplevyak Jun 4, 2019
bdf29de
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak Jun 5, 2019
00116b2
Always check return from grpc::ByteBuffer::Dump().
jplevyak Jun 5, 2019
7bd3307
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak Jun 5, 2019
dab4d57
Address comments.
jplevyak Jun 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/envoy/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ envoy_cc_library(
external_deps = ["abseil_optional"],
deps = [
":status",
"//include/envoy/buffer:buffer_interface",
"//include/envoy/http:header_map_interface",
"//include/envoy/tracing:http_tracer_interface",
"//source/common/protobuf",
Expand Down
112 changes: 98 additions & 14 deletions include/envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <chrono>

#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"
#include "envoy/grpc/status.h"
#include "envoy/http/header_map.h"
Expand Down Expand Up @@ -41,7 +42,16 @@ class AsyncStream {
* object, but callbacks may still be received until the stream is closed
* remotely.
*/
virtual void sendMessage(const Protobuf::Message& request, bool end_stream) PURE;
virtual void sendMessage(const Protobuf::Message& request, bool end_stream);

/**
* Send request message to the stream.
* @param request serializalized message.
* @param end_stream close the stream locally. No further methods may be invoked on the stream
* object, but callbacks may still be received until the stream is closed
* remotely.
*/
virtual void sendRawMessage(Buffer::InstancePtr request, bool end_stream) PURE;
Comment thread
jplevyak marked this conversation as resolved.
Outdated

/**
* Close the stream locally and send an empty DATA frame to the remote. No further methods may be
Expand All @@ -57,9 +67,9 @@ class AsyncStream {
virtual void resetStream() PURE;
};

class AsyncRequestCallbacks {
class RawAsyncRequestCallbacks {
public:
virtual ~AsyncRequestCallbacks() {}
virtual ~RawAsyncRequestCallbacks() {}

/**
* Called when populating the headers to send with initial metadata.
Expand All @@ -71,15 +81,18 @@ class AsyncRequestCallbacks {
* Factory for empty response messages.
* @return ProtobufTypes::MessagePtr a Protobuf::Message with the response
* type for the request.
* NB: createEmptyResponse will not be called if onSuccessRaw() is overriden.
*/
virtual ProtobufTypes::MessagePtr createEmptyResponse() PURE;
virtual ProtobufTypes::MessagePtr createEmptyResponse() {
throw EnvoyException("AsyncRequestCallbacks::createEmptyResponse must be overriden");
Comment thread
jplevyak marked this conversation as resolved.
Outdated
}

/**
* Called when the async gRPC request succeeds. No further callbacks will be invoked.
* @param response the gRPC response.
* @param response the gRPC response bytes.
* @param span a tracing span to fill with extra tags.
*/
virtual void onSuccessUntyped(ProtobufTypes::MessagePtr&& response, Tracing::Span& span) PURE;
virtual void onSuccessRaw(Buffer::InstancePtr response, Tracing::Span& span) PURE;

/**
* Called when the async gRPC request fails. No further callbacks will be invoked.
Expand All @@ -91,6 +104,20 @@ class AsyncRequestCallbacks {
Tracing::Span& span) PURE;
};

class AsyncRequestCallbacks : public RawAsyncRequestCallbacks {
Comment thread
jplevyak marked this conversation as resolved.
Outdated
public:
virtual ~AsyncRequestCallbacks() {}

void onSuccessRaw(Buffer::InstancePtr response, Tracing::Span& span) override;
Comment thread
jplevyak marked this conversation as resolved.
Outdated
/**
* Called when the async gRPC request succeeds. No further callbacks will be invoked.
* @param response the gRPC response.
* @param span a tracing span to fill with extra tags.
* NB: requires overriding createEmptyResponse().
*/
virtual void onSuccessUntyped(ProtobufTypes::MessagePtr&& response, Tracing::Span& span) PURE;
Comment thread
jplevyak marked this conversation as resolved.
Outdated
};

// Templatized variant of AsyncRequestCallbacks.
template <class ResponseType> class TypedAsyncRequestCallbacks : public AsyncRequestCallbacks {
public:
Expand All @@ -108,20 +135,23 @@ template <class ResponseType> class TypedAsyncRequestCallbacks : public AsyncReq
/**
* Notifies caller of async gRPC stream status.
* Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by
* AsyncStream.close(), AsyncStreamCallbacks can continue to receive events until the remote
* AsyncStream.close(), RawAsyncStreamCallbacks can continue to receive events until the remote
* to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no
* further callbacks will be invoked.
*/
class AsyncStreamCallbacks {
class RawAsyncStreamCallbacks {
public:
virtual ~AsyncStreamCallbacks() {}
virtual ~RawAsyncStreamCallbacks() {}

/**
* Factory for empty response messages.
* @return ProtobufTypes::MessagePtr a Protobuf::Message with the response
* type for the stream.
* NB: createEmptyResponse will not be called if onRecieveRawMessage() is overriden.
*/
virtual ProtobufTypes::MessagePtr createEmptyResponse() PURE;
virtual ProtobufTypes::MessagePtr createEmptyResponse() {
throw EnvoyException("AsyncStreamCallbacks::createEmptyResponse must be overriden");
Comment thread
jplevyak marked this conversation as resolved.
Outdated
}

/**
* Called when populating the headers to send with initial metadata.
Expand All @@ -139,8 +169,10 @@ class AsyncStreamCallbacks {
/**
* Called when an async gRPC message is received.
* @param response the gRPC message.
* @return bool which is true if the message well formed and false otherwise which will cause
the stream to shutdown with an INTERNAL error.
*/
virtual void onReceiveMessageUntyped(ProtobufTypes::MessagePtr&& message) PURE;
virtual bool onReceiveRawMessage(Buffer::InstancePtr response) PURE;

/**
* Called when trailing metadata is received. This will also be called on non-Ok grpc-status
Expand All @@ -152,13 +184,33 @@ class AsyncStreamCallbacks {
/**
* Called when the remote closes or an error occurs on the gRPC stream. The stream is
* considered remotely closed after this invocation and no further callbacks will be
* invoked. In addition, no further stream operations are permitted.
{ * invoked. In addition, no further stream operations are permitted.
* @param status the gRPC status.
* @param message the gRPC status message or empty string if not present.
*/
virtual void onRemoteClose(Status::GrpcStatus status, const std::string& message) PURE;
};

/**
* Notifies caller of async gRPC stream status.
* Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by
* AsyncStream.close(), AsyncStreamCallbacks can continue to receive events until the remote
* to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no
* further callbacks will be invoked.
*/
class AsyncStreamCallbacks : public RawAsyncStreamCallbacks {
public:
virtual ~AsyncStreamCallbacks() {}

bool onReceiveRawMessage(Buffer::InstancePtr response) override;
/**
* Called when an async gRPC message is received.
* @param response the gRPC message.
* NB: requires overriding createEmptyResponse().
*/
virtual void onReceiveMessageUntyped(ProtobufTypes::MessagePtr&& message) PURE;
};

// Templatized variant of AsyncStreamCallbacks.
template <class ResponseType> class TypedAsyncStreamCallbacks : public AsyncStreamCallbacks {
public:
Expand Down Expand Up @@ -195,7 +247,24 @@ class AsyncClient {
virtual AsyncRequest* send(const Protobuf::MethodDescriptor& service_method,
const Protobuf::Message& request, AsyncRequestCallbacks& callbacks,
Tracing::Span& parent_span,
const absl::optional<std::chrono::milliseconds>& timeout) PURE;
const absl::optional<std::chrono::milliseconds>& timeout);

/**
* Start a gRPC unary RPC asynchronously.
* @param service_full_name full name of the service (i.e. service_method.service()->full_name()).
* @param method_name name of the method (i.e. service_method.name()).
* @param request serialized message.
* @param callbacks the callbacks to be notified of RPC status.
* @param parent_span the current parent tracing context.
* @param timeout supplies the request timeout.
* @return a request handle or nullptr if no request could be started. NOTE: In this case
* onFailure() has already been called inline. The client owns the request and the
* handle should just be used to cancel.
*/
virtual AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
Comment thread
jplevyak marked this conversation as resolved.
Buffer::InstancePtr request, RawAsyncRequestCallbacks& callbacks,
Tracing::Span& parent_span,
const absl::optional<std::chrono::milliseconds>& timeout) PURE;

/**
* Start a gRPC stream asynchronously.
Expand All @@ -209,7 +278,22 @@ class AsyncClient {
* may be reclaimed.
*/
virtual AsyncStream* start(const Protobuf::MethodDescriptor& service_method,
AsyncStreamCallbacks& callbacks) PURE;
AsyncStreamCallbacks& callbacks);

/**
* Start a gRPC stream asynchronously.
* TODO(mattklein123): Determine if tracing should be added to streaming requests.
* @param service_full_name full name of the service (i.e. service_method.service()->full_name()).
* @param method_name name of the method (i.e. service_method.name()).
* @param callbacks the callbacks to be notified of stream status.
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case
* onRemoteClose() has already been called inline. The client owns the stream and
* the handle can be used to send more messages or finish the stream. It is expected that
* closeStream() is invoked by the caller to notify the client that the stream resources
* may be reclaimed.
*/
virtual AsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name,
RawAsyncStreamCallbacks& callbacks) PURE;
};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
Expand Down
4 changes: 3 additions & 1 deletion source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ void OwnedImpl::commit(RawSlice* iovecs, uint64_t num_iovecs) {
slice_index--;
}
if (slice_index < 0) {
// There was no slice containing any data, so rewind the iterator at the first slice.
slice_index = 0;
if (!slices_[0]) {
Comment thread
jplevyak marked this conversation as resolved.
return;
}
}

// Next, scan forward and attempt to match the slices against iovecs.
Expand Down
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ envoy_cc_library(
"//source/common/common:backoff_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:token_bucket_impl_lib",
"//source/common/grpc:async_client_lib",
"//source/common/protobuf",
],
)
Expand Down
13 changes: 11 additions & 2 deletions source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ envoy_package()

envoy_cc_library(
name = "async_client_lib",
srcs = ["async_client_impl.cc"],
srcs = [
"async_client.cc",
"async_client_impl.cc",
],
hdrs = ["async_client_impl.h"],
deps = [
":codec_lib",
Expand All @@ -28,6 +31,7 @@ envoy_cc_library(
hdrs = ["async_client_manager_impl.h"],
deps = [
":async_client_lib",
":common_lib",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/singleton:manager_interface",
"//include/envoy/thread_local:thread_local_interface",
Expand Down Expand Up @@ -61,7 +65,10 @@ envoy_cc_library(
name = "common_lib",
srcs = ["common.cc"],
hdrs = ["common.h"],
external_deps = ["abseil_optional"],
external_deps = [
"abseil_optional",
"grpc",
Comment thread
jplevyak marked this conversation as resolved.
Outdated
],
deps = [
"//include/envoy/http:header_map_interface",
"//include/envoy/http:message_interface",
Expand Down Expand Up @@ -91,6 +98,8 @@ envoy_cc_library(
"grpc",
],
deps = [
":async_client_lib",
":common_lib",
":google_grpc_creds_lib",
"//include/envoy/api:api_interface",
"//include/envoy/grpc:google_grpc_creds_interface",
Expand Down
55 changes: 55 additions & 0 deletions source/common/grpc/async_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "envoy/grpc/async_client.h"
Comment thread
jplevyak marked this conversation as resolved.
Outdated

#include "common/buffer/zero_copy_input_stream_impl.h"
#include "common/common/utility.h"
#include "common/grpc/common.h"
#include "common/http/utility.h"

namespace Envoy {
namespace Grpc {

void AsyncStream::sendMessage(const Protobuf::Message& request, bool end_stream) {
sendRawMessage(Common::serializeBody(request), end_stream);
}

AsyncRequest* AsyncClient::send(const Protobuf::MethodDescriptor& service_method,
const Protobuf::Message& request, AsyncRequestCallbacks& callbacks,
Tracing::Span& parent_span,
const absl::optional<std::chrono::milliseconds>& timeout) {
return sendRaw(service_method.service()->full_name(), service_method.name(),
Common::serializeBody(request), callbacks, parent_span, timeout);
}

AsyncStream* AsyncClient::start(const Protobuf::MethodDescriptor& service_method,
AsyncStreamCallbacks& callbacks) {
return startRaw(service_method.service()->full_name(), service_method.name(), callbacks);
}

void AsyncRequestCallbacks::onSuccessRaw(Buffer::InstancePtr response, Tracing::Span& span) {
ProtobufTypes::MessagePtr response_message = createEmptyResponse();
// TODO(htuch): Need to add support for compressed responses as well here.
if (response->length() > 0) {
Buffer::ZeroCopyInputStreamImpl stream(std::move(response));
if (!response_message->ParseFromZeroCopyStream(&stream)) {
onFailure(Status::GrpcStatus::Internal, "", span);
return;
}
}
onSuccessUntyped(std::move(response_message), span);
}

bool AsyncStreamCallbacks::onReceiveRawMessage(Buffer::InstancePtr response) {
ProtobufTypes::MessagePtr response_message = createEmptyResponse();
// TODO(htuch): Need to add support for compressed responses as well here.
if (response->length() > 0) {
Buffer::ZeroCopyInputStreamImpl stream(std::move(response));
if (!response_message->ParseFromZeroCopyStream(&stream)) {
return false;
}
}
onReceiveMessageUntyped(std::move(response_message));
return true;
}

} // namespace Grpc
} // namespace Envoy
Loading