Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e8d933d
Add a raw (Buffer::Instance) gRPC interface.
jplevyak Apr 9, 2019
80c5ecf
use NOT_REACHED_GCOVR_EXCL_LINE in place of exceptions.
jplevyak Apr 12, 2019
c64fcd0
Fix formating.
jplevyak Apr 12, 2019
3a80415
Update to the new Buffer-based gRPC API.
jplevyak Apr 19, 2019
2210ddc
Add missing file.
jplevyak Apr 19, 2019
45f1e17
Change serializeBody -> serializeToGrpcFrame as requested.
jplevyak Apr 24, 2019
bf6d032
Remove isGrpcHeaderRequired.
jplevyak Apr 25, 2019
e4cb012
Remove the Untyped versions and remove the Typed decorator adding
jplevyak Apr 25, 2019
1564b4c
Remove unused API isGrpcHeaderRequired.
jplevyak Apr 26, 2019
74fad14
Move the function to create the GrpcFrameHeader into Common.
jplevyak Apr 26, 2019
e9760ec
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak May 10, 2019
e80e477
Update merge with master.
jplevyak May 13, 2019
74a50ce
Address typo.
jplevyak May 13, 2019
91b9081
Address comments.
jplevyak May 22, 2019
8f670d5
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak May 22, 2019
70a1688
Address comments.
jplevyak May 22, 2019
b04ee92
Address comments.
jplevyak May 22, 2019
cf58e00
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak May 28, 2019
c5cbb52
Add test for additional coverage.
jplevyak Jun 4, 2019
bdf29de
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak Jun 5, 2019
00116b2
Always check return from grpc::ByteBuffer::Dump().
jplevyak Jun 5, 2019
7bd3307
Merge remote-tracking branch 'upstream/master' into raw-grpc-interface
jplevyak Jun 5, 2019
dab4d57
Address comments.
jplevyak Jun 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/envoy/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ envoy_cc_library(
external_deps = ["abseil_optional"],
deps = [
":status",
"//include/envoy/buffer:buffer_interface",
"//include/envoy/http:header_map_interface",
"//include/envoy/tracing:http_tracer_interface",
"//source/common/common:assert_lib",
"//source/common/protobuf",
],
)
Expand Down
95 changes: 30 additions & 65 deletions include/envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

#include <chrono>

#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"
#include "envoy/grpc/status.h"
#include "envoy/http/header_map.h"
#include "envoy/tracing/http_tracer.h"

#include "common/common/assert.h"
#include "common/protobuf/protobuf.h"

#include "absl/types/optional.h"
Expand All @@ -30,18 +32,18 @@ class AsyncRequest {
/**
* An in-flight gRPC stream.
*/
class AsyncStream {
class RawAsyncStream {
public:
virtual ~AsyncStream() {}
virtual ~RawAsyncStream() {}

/**
* Send request message to the stream.
* @param request protobuf serializable message.
* @param request serialized message.
* @param end_stream close the stream locally. No further methods may be invoked on the stream
* object, but callbacks may still be received until the stream is closed
* remotely.
*/
virtual void sendMessage(const Protobuf::Message& request, bool end_stream) PURE;
virtual void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) PURE;

/**
* Close the stream locally and send an empty DATA frame to the remote. No further methods may be
Expand All @@ -57,29 +59,22 @@ class AsyncStream {
virtual void resetStream() PURE;
};

class AsyncRequestCallbacks {
class RawAsyncRequestCallbacks {
public:
virtual ~AsyncRequestCallbacks() {}
virtual ~RawAsyncRequestCallbacks() {}

/**
* Called when populating the headers to send with initial metadata.
* @param metadata initial metadata reference.
*/
virtual void onCreateInitialMetadata(Http::HeaderMap& metadata) PURE;

/**
* Factory for empty response messages.
* @return ProtobufTypes::MessagePtr a Protobuf::Message with the response
* type for the request.
*/
virtual ProtobufTypes::MessagePtr createEmptyResponse() PURE;

/**
* Called when the async gRPC request succeeds. No further callbacks will be invoked.
* @param response the gRPC response.
* @param response the gRPC response bytes.
* @param span a tracing span to fill with extra tags.
*/
virtual void onSuccessUntyped(ProtobufTypes::MessagePtr&& response, Tracing::Span& span) PURE;
virtual void onSuccessRaw(Buffer::InstancePtr&& response, Tracing::Span& span) PURE;

/**
* Called when the async gRPC request fails. No further callbacks will be invoked.
Expand All @@ -91,37 +86,16 @@ class AsyncRequestCallbacks {
Tracing::Span& span) PURE;
};

// Templatized variant of AsyncRequestCallbacks.
template <class ResponseType> class TypedAsyncRequestCallbacks : public AsyncRequestCallbacks {
public:
ProtobufTypes::MessagePtr createEmptyResponse() override {
return std::make_unique<ResponseType>();
}

virtual void onSuccess(std::unique_ptr<ResponseType>&& response, Tracing::Span& span) PURE;

void onSuccessUntyped(ProtobufTypes::MessagePtr&& response, Tracing::Span& span) override {
onSuccess(std::unique_ptr<ResponseType>(dynamic_cast<ResponseType*>(response.release())), span);
}
};

/**
* Notifies caller of async gRPC stream status.
* Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by
* AsyncStream.close(), AsyncStreamCallbacks can continue to receive events until the remote
* to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no
* further callbacks will be invoked.
*/
class AsyncStreamCallbacks {
class RawAsyncStreamCallbacks {
public:
virtual ~AsyncStreamCallbacks() {}

/**
* Factory for empty response messages.
* @return ProtobufTypes::MessagePtr a Protobuf::Message with the response
* type for the stream.
*/
virtual ProtobufTypes::MessagePtr createEmptyResponse() PURE;
virtual ~RawAsyncStreamCallbacks() {}

/**
* Called when populating the headers to send with initial metadata.
Expand All @@ -139,8 +113,10 @@ class AsyncStreamCallbacks {
/**
* Called when an async gRPC message is received.
* @param response the gRPC message.
* @return bool which is true if the message well formed and false otherwise which will cause
the stream to shutdown with an INTERNAL error.
*/
virtual void onReceiveMessageUntyped(ProtobufTypes::MessagePtr&& message) PURE;
virtual bool onReceiveMessageRaw(Buffer::InstancePtr&& response) PURE;

/**
* Called when trailing metadata is received. This will also be called on non-Ok grpc-status
Expand All @@ -159,60 +135,49 @@ class AsyncStreamCallbacks {
virtual void onRemoteClose(Status::GrpcStatus status, const std::string& message) PURE;
};

// Templatized variant of AsyncStreamCallbacks.
template <class ResponseType> class TypedAsyncStreamCallbacks : public AsyncStreamCallbacks {
public:
ProtobufTypes::MessagePtr createEmptyResponse() override {
return std::make_unique<ResponseType>();
}

virtual void onReceiveMessage(std::unique_ptr<ResponseType>&& message) PURE;

void onReceiveMessageUntyped(ProtobufTypes::MessagePtr&& message) override {
onReceiveMessage(std::unique_ptr<ResponseType>(dynamic_cast<ResponseType*>(message.release())));
}
};

/**
* Supports sending gRPC requests and receiving responses asynchronously. This can be used to
* implement either plain gRPC or streaming gRPC calls.
*/
class AsyncClient {
class RawAsyncClient {
public:
virtual ~AsyncClient() {}
virtual ~RawAsyncClient() {}

/**
* Start a gRPC unary RPC asynchronously.
* @param service_method protobuf descriptor of gRPC service method.
* @param request protobuf serializable message.
* @param service_full_name full name of the service (i.e. service_method.service()->full_name()).
* @param method_name name of the method (i.e. service_method.name()).
* @param request serialized message.
* @param callbacks the callbacks to be notified of RPC status.
* @param parent_span the current parent tracing context.
* @param timeout supplies the request timeout.
* @return a request handle or nullptr if no request could be started. NOTE: In this case
* onFailure() has already been called inline. The client owns the request and the
* handle should just be used to cancel.
*/
virtual AsyncRequest* send(const Protobuf::MethodDescriptor& service_method,
const Protobuf::Message& request, AsyncRequestCallbacks& callbacks,
Tracing::Span& parent_span,
const absl::optional<std::chrono::milliseconds>& timeout) PURE;
virtual AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
Tracing::Span& parent_span,
const absl::optional<std::chrono::milliseconds>& timeout) PURE;

/**
* Start a gRPC stream asynchronously.
* TODO(mattklein123): Determine if tracing should be added to streaming requests.
* @param service_method protobuf descriptor of gRPC service method.
* @param service_full_name full name of the service (i.e. service_method.service()->full_name()).
* @param method_name name of the method (i.e. service_method.name()).
* @param callbacks the callbacks to be notified of stream status.
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case
* onRemoteClose() has already been called inline. The client owns the stream and
* the handle can be used to send more messages or finish the stream. It is expected that
* closeStream() is invoked by the caller to notify the client that the stream resources
* may be reclaimed.
*/
virtual AsyncStream* start(const Protobuf::MethodDescriptor& service_method,
AsyncStreamCallbacks& callbacks) PURE;
virtual RawAsyncStream* startRaw(absl::string_view service_full_name,
absl::string_view method_name,
RawAsyncStreamCallbacks& callbacks) PURE;
};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
typedef std::unique_ptr<RawAsyncClient> RawAsyncClientPtr;

} // namespace Grpc
} // namespace Envoy
8 changes: 4 additions & 4 deletions include/envoy/grpc/async_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
namespace Envoy {
namespace Grpc {

// Per-service factory for Grpc::AsyncClients. This factory is thread aware and will instantiate
// Per-service factory for Grpc::RawAsyncClients. This factory is thread aware and will instantiate
// with thread local state. Clients will use ThreadLocal::Instance::dispatcher() for event handling.
class AsyncClientFactory {
public:
virtual ~AsyncClientFactory() {}

/**
* Create a gRPC::AsyncClient.
* @return AsyncClientPtr async client.
* Create a gRPC::RawAsyncClient.
* @return RawAsyncClientPtr async client.
*/
virtual AsyncClientPtr create() PURE;
virtual RawAsyncClientPtr create() PURE;
};

typedef std::unique_ptr<AsyncClientFactory> AsyncClientFactoryPtr;
Expand Down
3 changes: 3 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ void OwnedImpl::commit(RawSlice* iovecs, uint64_t num_iovecs) {
if (slice_index < 0) {
// There was no slice containing any data, so rewind the iterator at the first slice.
slice_index = 0;
if (!slices_[0]) {
return;
}
}

// Next, scan forward and attempt to match the slices against iovecs.
Expand Down
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ envoy_cc_library(
"//source/common/common:backoff_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:token_bucket_impl_lib",
"//source/common/grpc:async_client_lib",
"//source/common/protobuf",
],
)
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/delta_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Envoy {
namespace Config {

DeltaSubscriptionImpl::DeltaSubscriptionImpl(
const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
absl::string_view type_url, Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings, SubscriptionCallbacks& callbacks,
Expand Down
4 changes: 2 additions & 2 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class DeltaSubscriptionImpl : public Subscription,
public GrpcStreamCallbacks<envoy::api::v2::DeltaDiscoveryResponse>,
public Logger::Loggable<Logger::Id::config> {
public:
DeltaSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher,
DeltaSubscriptionImpl(const LocalInfo::LocalInfo& local_info,
Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
absl::string_view type_url, Runtime::RandomGenerator& random,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings,
Expand Down
4 changes: 2 additions & 2 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
namespace Envoy {
namespace Config {

GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher,
GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings)
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class GrpcMuxImpl : public GrpcMux,
public GrpcStreamCallbacks<envoy::api::v2::DiscoveryResponse>,
public Logger::Loggable<Logger::Id::config> {
public:
GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings);
Expand Down
9 changes: 5 additions & 4 deletions source/common/config/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "common/common/backoff_strategy.h"
#include "common/common/token_bucket_impl.h"
#include "common/config/utility.h"
#include "common/grpc/typed_async_client.h"

namespace Envoy {
namespace Config {
Expand All @@ -16,10 +17,10 @@ namespace Config {
// xDS variants). Reestablishes the gRPC channel when necessary, and provides rate limiting of
// requests.
template <class RequestProto, class ResponseProto>
class GrpcStream : public Grpc::TypedAsyncStreamCallbacks<ResponseProto>,
class GrpcStream : public Grpc::AsyncStreamCallbacks<ResponseProto>,
public Logger::Loggable<Logger::Id::config> {
public:
GrpcStream(GrpcStreamCallbacks<ResponseProto>* callbacks, Grpc::AsyncClientPtr async_client,
GrpcStream(GrpcStreamCallbacks<ResponseProto>* callbacks, Grpc::RawAsyncClientPtr async_client,
const Protobuf::MethodDescriptor& service_method, Runtime::RandomGenerator& random,
Event::Dispatcher& dispatcher, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings)
Expand Down Expand Up @@ -131,8 +132,8 @@ class GrpcStream : public Grpc::TypedAsyncStreamCallbacks<ResponseProto>,
const uint32_t RETRY_INITIAL_DELAY_MS = 500;
const uint32_t RETRY_MAX_DELAY_MS = 30000; // Do not cross more than 30s

Grpc::AsyncClientPtr async_client_;
Grpc::AsyncStream* stream_{};
Grpc::AsyncClient<RequestProto, ResponseProto> async_client_;
Grpc::AsyncStream<RequestProto> stream_{};
const Protobuf::MethodDescriptor& service_method_;
ControlPlaneStats control_plane_stats_;

Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Config {

class GrpcSubscriptionImpl : public Config::Subscription {
public:
GrpcSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
GrpcSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random,
const Protobuf::MethodDescriptor& service_method, absl::string_view type_url,
SubscriptionCallbacks& callbacks, SubscriptionStats stats,
Expand Down
17 changes: 17 additions & 0 deletions source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,27 @@ load(

envoy_package()

envoy_cc_library(
name = "typed_async_client_lib",
srcs = ["typed_async_client.cc"],
hdrs = ["typed_async_client.h"],
deps = [
":codec_lib",
":common_lib",
"//include/envoy/grpc:async_client_interface",
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/http:async_client_lib",
],
)

envoy_cc_library(
name = "async_client_lib",
srcs = ["async_client_impl.cc"],
hdrs = ["async_client_impl.h"],
deps = [
":codec_lib",
":common_lib",
":typed_async_client_lib",
"//include/envoy/grpc:async_client_interface",
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/http:async_client_lib",
Expand Down Expand Up @@ -70,6 +84,7 @@ envoy_cc_library(
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/buffer:buffer_lib",
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:enum_to_int",
Expand Down Expand Up @@ -113,6 +128,8 @@ envoy_cc_library(
deps = [
":common_lib",
":google_grpc_creds_lib",
":google_grpc_utils_lib",
":typed_async_client_lib",
"//include/envoy/api:api_interface",
"//include/envoy/grpc:google_grpc_creds_interface",
"//include/envoy/thread:thread_interface",
Expand Down
Loading