diff --git a/api/client/service.proto b/api/client/service.proto index 6b95397de..ca4a88f22 100644 --- a/api/client/service.proto +++ b/api/client/service.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package nighthawk.client; +import "google/protobuf/duration.proto"; import "google/rpc/status.proto"; import "validate/validate.proto"; diff --git a/api/sink/BUILD b/api/sink/BUILD new file mode 100644 index 000000000..f3a01a41d --- /dev/null +++ b/api/sink/BUILD @@ -0,0 +1,32 @@ +load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") +load("@envoy_api//bazel:api_build_system.bzl", "api_cc_py_proto_library") + +licenses(["notice"]) # Apache 2 + +api_cc_py_proto_library( + name = "sink", + srcs = [ + "sink.proto", + ], + visibility = ["//visibility:public"], + deps = [ + "//api/client:base", + "@envoy_api//envoy/config/core/v3:pkg", + ], +) + +cc_grpc_library( + name = "sink_grpc_lib", + srcs = [ + ":sink", + ], + generate_mocks = True, + grpc_only = True, + proto_only = False, + use_external = False, + visibility = ["//visibility:public"], + well_known_protos = True, + deps = [ + ":sink_cc_proto", + ], +) diff --git a/api/sink/sink.proto b/api/sink/sink.proto new file mode 100644 index 000000000..df98bd505 --- /dev/null +++ b/api/sink/sink.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package nighthawk; + +import "api/client/service.proto"; +import "validate/validate.proto"; + +// Encapsulates an ExecutionResponse. +message StoreExecutionRequest { + // Response contains the effective execution id, which will serve as the lookup key. + nighthawk.client.ExecutionResponse execution_response = 1; +} + +// Empty return value message, that serves as a future extension point. +message StoreExecutionResponse { +} + +message SinkRequest { + // Unique id for lookup purposes. Required. + string execution_id = 1 [(validate.rules).string.min_len = 1]; +} + +message SinkResponse { + // Response associated to the requested execution id. + nighthawk.client.ExecutionResponse execution_response = 1; +} + +service NighthawkSink { + // Accepts a stream of execution responses, which is the return value of + // NighthawkService.ExecutionStream. Workers can forward their results using this method. + rpc StoreExecutionResponseStream(stream StoreExecutionRequest) returns (StoreExecutionResponse) { + } + + // Gets the stored responses associated to an execution, keyed by execution id. + rpc SinkRequestStream(stream SinkRequest) returns (stream SinkResponse) { + } +} \ No newline at end of file diff --git a/include/nighthawk/common/BUILD b/include/nighthawk/common/BUILD index eeea25cdd..2534b23e2 100644 --- a/include/nighthawk/common/BUILD +++ b/include/nighthawk/common/BUILD @@ -54,6 +54,22 @@ envoy_basic_cc_library( ], ) +envoy_basic_cc_library( + name = "nighthawk_sink_client", + hdrs = [ + "nighthawk_sink_client.h", + ], + include_prefix = "nighthawk/common", + deps = [ + "//api/sink:sink_cc_proto", + "//api/sink:sink_grpc_lib", + "@envoy//include/envoy/common:base_includes", + "@envoy//source/common/common:assert_lib_with_external_headers", + "@envoy//source/common/common:statusor_lib_with_external_headers", + "@envoy//source/common/protobuf:protobuf_with_external_headers", + ], +) + envoy_basic_cc_library( name = "request_lib", hdrs = [ diff --git a/include/nighthawk/common/nighthawk_sink_client.h b/include/nighthawk/common/nighthawk_sink_client.h new file mode 100644 index 000000000..fae300db7 --- /dev/null +++ b/include/nighthawk/common/nighthawk_sink_client.h @@ -0,0 +1,42 @@ +#pragma once +#include "envoy/common/pure.h" + +#include "external/envoy/source/common/common/statusor.h" +#include "external/envoy/source/common/protobuf/protobuf.h" + +#include "api/sink/sink.grpc.pb.h" + +namespace Nighthawk { + +/** + * Interface of a gRPC sink service client. + */ +class NighthawkSinkClient { +public: + virtual ~NighthawkSinkClient() = default; + + /** + * @brief Store an execution response. + * + * @param nighthawk_sink_stub Used to open a channel to the sink service. + * @param store_execution_request Provide the message that the sink should store. + * @return absl::StatusOr + */ + virtual absl::StatusOr StoreExecutionResponseStream( + nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub, + const nighthawk::StoreExecutionRequest& store_execution_request) const PURE; + + /** + * Look up ExecutionResponse messages in the sink. + * + * @param nighthawk_sink_stub Used to open a channel to the sink service. + * @param sink_request Provide the message that the sink should handle. + * @return absl::StatusOr Either a status indicating failure, or + * a SinkResponse upon success. + */ + virtual absl::StatusOr + SinkRequestStream(nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub, + const nighthawk::SinkRequest& sink_request) const PURE; +}; + +} // namespace Nighthawk diff --git a/source/common/BUILD b/source/common/BUILD index c1076ae73..5432495ee 100644 --- a/source/common/BUILD +++ b/source/common/BUILD @@ -28,6 +28,26 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "nighthawk_sink_client_impl", + srcs = [ + "nighthawk_sink_client_impl.cc", + ], + hdrs = [ + "nighthawk_sink_client_impl.h", + ], + repository = "@envoy", + visibility = ["//visibility:public"], + deps = [ + "//api/client:base_cc_proto", + "//api/client:grpc_service_lib", + "//include/nighthawk/common:nighthawk_sink_client", + "@envoy//source/common/common:assert_lib_with_external_headers", + "@envoy//source/common/common:statusor_lib_with_external_headers", + "@envoy//source/common/protobuf:protobuf_with_external_headers", + ], +) + envoy_cc_library( name = "request_impl_lib", hdrs = [ diff --git a/source/common/nighthawk_sink_client_impl.cc b/source/common/nighthawk_sink_client_impl.cc new file mode 100644 index 000000000..726197c78 --- /dev/null +++ b/source/common/nighthawk_sink_client_impl.cc @@ -0,0 +1,70 @@ +#include "common/nighthawk_sink_client_impl.h" + +#include "external/envoy/source/common/common/assert.h" + +namespace Nighthawk { + +absl::StatusOr +NighthawkSinkClientImpl::StoreExecutionResponseStream( + nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub, + const nighthawk::StoreExecutionRequest& store_execution_request) const { + ::grpc::ClientContext context; + ::nighthawk::StoreExecutionResponse store_execution_response; + std::shared_ptr<::grpc::ClientWriterInterface<::nighthawk::StoreExecutionRequest>> stream( + nighthawk_sink_stub.StoreExecutionResponseStream(&context, &store_execution_response)); + if (!stream->Write(store_execution_request)) { + return absl::UnavailableError("Failed to write request to the Nighthawk Sink gRPC channel."); + } else if (!stream->WritesDone()) { + return absl::InternalError("WritesDone() failed on the Nighthawk Sink gRPC channel."); + } + ::grpc::Status status = stream->Finish(); + if (!status.ok()) { + return absl::Status(static_cast(status.error_code()), status.error_message()); + } + return store_execution_response; +} + +absl::StatusOr NighthawkSinkClientImpl::SinkRequestStream( + nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub, + const nighthawk::SinkRequest& sink_request) const { + nighthawk::SinkResponse response; + + ::grpc::ClientContext context; + std::shared_ptr< + ::grpc::ClientReaderWriterInterface> + stream(nighthawk_sink_stub.SinkRequestStream(&context)); + + if (!stream->Write(sink_request)) { + return absl::UnavailableError("Failed to write request to the Nighthawk Sink gRPC channel."); + } else if (!stream->WritesDone()) { + return absl::InternalError("WritesDone() failed on the Nighthawk Sink gRPC channel."); + } + + bool got_response = false; + while (stream->Read(&response)) { + /* + At the proto api level we support returning a stream of results. The sink service proto api + reflects this, and accepts what NighthawkService. ExecutionStream returns as a parameter + (though we wrap it in StoreExecutionRequest messages for extensibility purposes). So this + implies a stream, and not a single message. + + Having said that, today we constrain what we return to a single message in the implementations + where this is relevant. That's why we assert here, to make sure that stays put until an + explicit choice is made otherwise. + + Why do this? The intent of NighthawkService. ExecutionStream was to be able to stream + intermediate updates some day. So having streams in the api's keeps the door open on streaming + intermediary updates, without forcing a change the proto api. + */ + RELEASE_ASSERT(!got_response, + "Sink Service has started responding with more than one message."); + got_response = true; + } + ::grpc::Status status = stream->Finish(); + if (!status.ok()) { + return absl::Status(static_cast(status.error_code()), status.error_message()); + } + return response; +} + +} // namespace Nighthawk diff --git a/source/common/nighthawk_sink_client_impl.h b/source/common/nighthawk_sink_client_impl.h new file mode 100644 index 000000000..f41b7ad56 --- /dev/null +++ b/source/common/nighthawk_sink_client_impl.h @@ -0,0 +1,25 @@ +#include "nighthawk/common/nighthawk_sink_client.h" + +#include "external/envoy/source/common/common/statusor.h" +#include "external/envoy/source/common/protobuf/protobuf.h" + +namespace Nighthawk { + +/** + * Implements a the gRPC sink client interface. + * + * This class is stateless and may be called from multiple threads. Furthermore, the same gRPC stub + * is safe to use from multiple threads simultaneously. + */ +class NighthawkSinkClientImpl : public NighthawkSinkClient { +public: + absl::StatusOr StoreExecutionResponseStream( + nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub, + const nighthawk::StoreExecutionRequest& store_execution_request) const override; + + absl::StatusOr + SinkRequestStream(nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub, + const nighthawk::SinkRequest& sink_request) const override; +}; + +} // namespace Nighthawk diff --git a/test/common/BUILD b/test/common/BUILD index f7bb0908e..b5d3cec81 100644 --- a/test/common/BUILD +++ b/test/common/BUILD @@ -39,6 +39,16 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "nighthawk_sink_client_test", + srcs = ["nighthawk_sink_client_test.cc"], + repository = "@envoy", + deps = [ + "//source/common:nighthawk_sink_client_impl", + "@com_github_grpc_grpc//:grpc++_test", + ], +) + envoy_cc_test( name = "sink_test", srcs = ["sink_test.cc"], diff --git a/test/common/nighthawk_sink_client_test.cc b/test/common/nighthawk_sink_client_test.cc new file mode 100644 index 000000000..2bbafeb51 --- /dev/null +++ b/test/common/nighthawk_sink_client_test.cc @@ -0,0 +1,299 @@ +#include "external/envoy/source/common/protobuf/protobuf.h" + +#include "api/sink/sink.grpc.pb.h" +#include "api/sink/sink_mock.grpc.pb.h" + +#include "common/nighthawk_sink_client_impl.h" + +#include "grpcpp/test/mock_stream.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Nighthawk { + +namespace { + +using ::Envoy::Protobuf::util::MessageDifferencer; +using ::nighthawk::SinkRequest; +using ::nighthawk::SinkResponse; +using ::nighthawk::StoreExecutionRequest; +using ::nighthawk::StoreExecutionResponse; +using ::testing::_; +using ::testing::DoAll; +using ::testing::HasSubstr; +using ::testing::Return; +using ::testing::SaveArg; +using ::testing::SetArgPointee; + +TEST(StoreExecutionResponseStream, UsesSpecifiedExecutionResponseArguments) { + StoreExecutionRequest observed_request_1; + StoreExecutionRequest observed_request_2; + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, StoreExecutionResponseStreamRaw) + .WillOnce([&observed_request_1](grpc::ClientContext*, ::nighthawk::StoreExecutionResponse*) { + auto* mock_writer = new grpc::testing::MockClientWriter(); + EXPECT_CALL(*mock_writer, Write(_, _)) + .WillOnce(::testing::DoAll(::testing::SaveArg<0>(&observed_request_1), Return(true))); + EXPECT_CALL(*mock_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_writer, Finish()).WillOnce(Return(::grpc::Status::OK)); + return mock_writer; + }) + .WillOnce([&observed_request_2](grpc::ClientContext*, ::nighthawk::StoreExecutionResponse*) { + auto* mock_writer = new grpc::testing::MockClientWriter(); + EXPECT_CALL(*mock_writer, Write(_, _)) + .WillOnce(::testing::DoAll(::testing::SaveArg<0>(&observed_request_2), Return(true))); + EXPECT_CALL(*mock_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_writer, Finish()).WillOnce(Return(::grpc::Status::OK)); + return mock_writer; + }); + + NighthawkSinkClientImpl client; + StoreExecutionRequest request_1, request_2; + nighthawk::client::ExecutionResponse execution_response_1, execution_response_2; + nighthawk::client::Counter* counter = + execution_response_1.mutable_output()->add_results()->add_counters(); + counter->set_name("test_1"); + counter->set_value(1); + counter = execution_response_2.mutable_output()->add_results()->add_counters(); + counter->set_name("test_2"); + counter->set_value(2); + *request_1.mutable_execution_response() = execution_response_1; + *request_2.mutable_execution_response() = execution_response_2; + + absl::StatusOr response_1 = + client.StoreExecutionResponseStream(mock_nighthawk_sink_stub, request_1); + absl::StatusOr response_2 = + client.StoreExecutionResponseStream(mock_nighthawk_sink_stub, request_2); + EXPECT_EQ(observed_request_1.DebugString(), request_1.DebugString()); + EXPECT_EQ(observed_request_2.DebugString(), request_2.DebugString()); + EXPECT_TRUE(MessageDifferencer::Equivalent(observed_request_1, request_1)); + EXPECT_TRUE(MessageDifferencer::Equivalent(observed_request_2, request_2)); +} + +TEST(StoreExecutionResponseStream, ReturnsResponseSuccessfully) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, StoreExecutionResponseStreamRaw) + .WillOnce([](grpc::ClientContext*, ::nighthawk::StoreExecutionResponse*) { + auto* mock_writer = new grpc::testing::MockClientWriter(); + EXPECT_CALL(*mock_writer, Write(_, _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_writer, Finish()).WillOnce(Return(::grpc::Status::OK)); + return mock_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response = + client.StoreExecutionResponseStream(mock_nighthawk_sink_stub, {}); + EXPECT_TRUE(response.ok()); +} + +TEST(StoreExecutionResponseStream, ReturnsErrorIfNighthawkServiceWriteFails) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, StoreExecutionResponseStreamRaw) + .WillOnce([](grpc::ClientContext*, ::nighthawk::StoreExecutionResponse*) { + auto* mock_writer = new grpc::testing::MockClientWriter(); + EXPECT_CALL(*mock_writer, Write(_, _)).WillOnce(Return(false)); + return mock_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response = + client.StoreExecutionResponseStream(mock_nighthawk_sink_stub, {}); + ASSERT_FALSE(response.ok()); + EXPECT_EQ(response.status().code(), absl::StatusCode::kUnavailable); + EXPECT_THAT(response.status().message(), HasSubstr("Failed to write")); +} + +TEST(StoreExecutionResponseStream, ReturnsErrorIfNighthawkServiceWritesDoneFails) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, StoreExecutionResponseStreamRaw) + .WillOnce([](grpc::ClientContext*, ::nighthawk::StoreExecutionResponse*) { + auto* mock_writer = new grpc::testing::MockClientWriter(); + EXPECT_CALL(*mock_writer, Write(_, _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_writer, WritesDone()).WillOnce(Return(false)); + return mock_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response = + client.StoreExecutionResponseStream(mock_nighthawk_sink_stub, {}); + ASSERT_FALSE(response.ok()); + EXPECT_EQ(response.status().code(), absl::StatusCode::kInternal); + EXPECT_THAT(response.status().message(), HasSubstr("WritesDone() failed")); +} + +TEST(StoreExecutionResponseStream, PropagatesErrorIfNighthawkServiceGrpcStreamClosesAbnormally) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, StoreExecutionResponseStreamRaw) + .WillOnce([](grpc::ClientContext*, ::nighthawk::StoreExecutionResponse*) { + auto* mock_writer = new grpc::testing::MockClientWriter(); + EXPECT_CALL(*mock_writer, Write(_, _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_writer, Finish()) + .WillOnce( + Return(::grpc::Status(::grpc::PERMISSION_DENIED, "Finish failure status message"))); + return mock_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response = + client.StoreExecutionResponseStream(mock_nighthawk_sink_stub, {}); + ASSERT_FALSE(response.ok()); + EXPECT_EQ(response.status().code(), absl::StatusCode::kPermissionDenied); + EXPECT_THAT(response.status().message(), HasSubstr("Finish failure status message")); +} + +TEST(SinkRequest, UsesSpecifiedCommandLineOptions) { + SinkRequest request; + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, SinkRequestStreamRaw) + .WillOnce([&request](grpc::ClientContext*) { + auto* mock_reader_writer = + new grpc::testing::MockClientReaderWriter(); + // SinkRequest currently expects Read to return true exactly once. + EXPECT_CALL(*mock_reader_writer, Read(_)).WillOnce(Return(true)).WillOnce(Return(false)); + // Capture the Nighthawk request SinkRequest sends on the channel. + EXPECT_CALL(*mock_reader_writer, Write(_, _)) + .WillOnce(::testing::DoAll(::testing::SaveArg<0>(&request), Return(true))); + EXPECT_CALL(*mock_reader_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, Finish()).WillOnce(Return(::grpc::Status::OK)); + return mock_reader_writer; + }); + + ::nighthawk::SinkRequest sink_request; + *(sink_request.mutable_execution_id()) = "abc"; + NighthawkSinkClientImpl client; + absl::StatusOr distributed_response_or = + client.SinkRequestStream(mock_nighthawk_sink_stub, sink_request); + EXPECT_TRUE(distributed_response_or.ok()); + EXPECT_EQ(request.execution_id(), "abc"); +} + +TEST(SinkRequest, ReturnsNighthawkResponseSuccessfully) { + SinkResponse expected_response; + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, SinkRequestStreamRaw) + .WillOnce([&expected_response](grpc::ClientContext*) { + auto* mock_reader_writer = + new grpc::testing::MockClientReaderWriter(); + // SinkRequest currently expects Read to return true exactly once. + // Capture the gRPC response proto as it is written to the output parameter. + EXPECT_CALL(*mock_reader_writer, Read(_)) + .WillOnce(DoAll(SetArgPointee<0>(expected_response), Return(true))) + .WillOnce(Return(false)); + EXPECT_CALL(*mock_reader_writer, Write(_, _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, Finish()).WillOnce(Return(::grpc::Status::OK)); + return mock_reader_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response_or = + client.SinkRequestStream(mock_nighthawk_sink_stub, ::nighthawk::SinkRequest()); + EXPECT_TRUE(response_or.ok()); + SinkResponse actual_response = response_or.value(); + EXPECT_TRUE(MessageDifferencer::Equivalent(actual_response, expected_response)); + EXPECT_EQ(actual_response.DebugString(), expected_response.DebugString()); +} + +TEST(SinkRequest, WillFinishIfNighthawkServiceDoesNotSendResponse) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, SinkRequestStreamRaw).WillOnce([](grpc::ClientContext*) { + auto* mock_reader_writer = + new grpc::testing::MockClientReaderWriter(); + EXPECT_CALL(*mock_reader_writer, Read(_)).WillOnce(Return(false)); + EXPECT_CALL(*mock_reader_writer, Write(_, _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, Finish()).WillOnce(Return(::grpc::Status::OK)); + return mock_reader_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response_or = + client.SinkRequestStream(mock_nighthawk_sink_stub, ::nighthawk::SinkRequest()); + EXPECT_TRUE(response_or.ok()); +} + +TEST(SinkRequest, ReturnsErrorIfNighthawkServiceWriteFails) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, SinkRequestStreamRaw).WillOnce([](grpc::ClientContext*) { + auto* mock_reader_writer = + new grpc::testing::MockClientReaderWriter(); + EXPECT_CALL(*mock_reader_writer, Write(_, _)).WillOnce(Return(false)); + return mock_reader_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response_or = + client.SinkRequestStream(mock_nighthawk_sink_stub, ::nighthawk::SinkRequest()); + ASSERT_FALSE(response_or.ok()); + EXPECT_EQ(response_or.status().code(), absl::StatusCode::kUnavailable); + EXPECT_THAT(response_or.status().message(), HasSubstr("Failed to write")); +} + +TEST(SinkRequest, ReturnsErrorIfNighthawkServiceWritesDoneFails) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, SinkRequestStreamRaw).WillOnce([](grpc::ClientContext*) { + auto* mock_reader_writer = + new grpc::testing::MockClientReaderWriter(); + EXPECT_CALL(*mock_reader_writer, Write(_, _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, WritesDone()).WillOnce(Return(false)); + return mock_reader_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response_or = + client.SinkRequestStream(mock_nighthawk_sink_stub, ::nighthawk::SinkRequest()); + ASSERT_FALSE(response_or.ok()); + EXPECT_EQ(response_or.status().code(), absl::StatusCode::kInternal); + EXPECT_THAT(response_or.status().message(), HasSubstr("WritesDone() failed")); +} + +TEST(SinkRequest, PropagatesErrorIfNighthawkServiceGrpcStreamClosesAbnormally) { + nighthawk::MockNighthawkSinkStub mock_nighthawk_sink_stub; + // Configure the mock Nighthawk Service stub to return an inner mock channel when the code under + // test requests a channel. Set call expectations on the inner mock channel. + EXPECT_CALL(mock_nighthawk_sink_stub, SinkRequestStreamRaw).WillOnce([](grpc::ClientContext*) { + auto* mock_reader_writer = + new grpc::testing::MockClientReaderWriter(); + // SinkRequest currently expects Read to return true exactly once. + EXPECT_CALL(*mock_reader_writer, Read(_)).WillOnce(Return(true)).WillOnce(Return(false)); + EXPECT_CALL(*mock_reader_writer, Write(_, _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, WritesDone()).WillOnce(Return(true)); + EXPECT_CALL(*mock_reader_writer, Finish()) + .WillOnce( + Return(::grpc::Status(::grpc::PERMISSION_DENIED, "Finish failure status message"))); + return mock_reader_writer; + }); + + NighthawkSinkClientImpl client; + absl::StatusOr response_or = + client.SinkRequestStream(mock_nighthawk_sink_stub, ::nighthawk::SinkRequest()); + ASSERT_FALSE(response_or.ok()); + EXPECT_EQ(response_or.status().code(), absl::StatusCode::kPermissionDenied); + EXPECT_THAT(response_or.status().message(), HasSubstr("Finish failure status message")); +} + +} // namespace +} // namespace Nighthawk