Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions api/client/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package nighthawk.client;

import "google/protobuf/duration.proto";
import "google/rpc/status.proto";
import "validate/validate.proto";

Expand Down Expand Up @@ -45,3 +46,34 @@ service NighthawkService {
rpc ExecutionStream(stream ExecutionRequest) returns (stream ExecutionResponse) {
}
}

// Encapsulates an ExecutionResponse.
message StoreExecutionRequest {
// Response contains the effective execution id, which will serve as the lookup key.
ExecutionResponse execution_response = 1;
}

// Empty return value message, that serves as a future extension point.
message StoreExecutionResponse {
}

message SinkRequest {
// Unique id for lookup purposes.
Comment thread
mum4k marked this conversation as resolved.
Outdated
string execution_id = 1;
}

message SinkResponse {
// Response associated to the requested execution id.
ExecutionResponse execution_response = 1;
}

service NighthawkSink {
Comment thread
mum4k marked this conversation as resolved.
Outdated
// Accepts a stream of execution responses, which is the return value of
// NighthawkService.ExecutionStream. Workers can forward their results using this method.
rpc StoreExecutionResponseStream(stream StoreExecutionRequest) returns (StoreExecutionResponse) {
}

// Gets the stored responses associated to an execution, keyed by execution id.
rpc SinkRequestStream(stream SinkRequest) returns (stream SinkResponse) {
}
}
16 changes: 16 additions & 0 deletions include/nighthawk/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ envoy_basic_cc_library(
],
)

envoy_basic_cc_library(
name = "nighthawk_sink_client",
hdrs = [
"nighthawk_sink_client.h",
],
include_prefix = "nighthawk/common",
deps = [
"//api/client:base_cc_proto",
"//api/client:grpc_service_lib",
"@envoy//include/envoy/common:base_includes",
"@envoy//source/common/common:assert_lib_with_external_headers",
"@envoy//source/common/common:statusor_lib_with_external_headers",
"@envoy//source/common/protobuf:protobuf_with_external_headers",
],
)

envoy_basic_cc_library(
name = "request_lib",
hdrs = [
Expand Down
43 changes: 43 additions & 0 deletions include/nighthawk/common/nighthawk_sink_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once
#include "envoy/common/pure.h"

#include "external/envoy/source/common/common/statusor.h"
#include "external/envoy/source/common/protobuf/protobuf.h"

#include "api/client/options.pb.h"
#include "api/client/service.grpc.pb.h"

namespace Nighthawk {

/**
* Interface of a gRPC sink service client.
*/
class NighthawkSinkClient {
public:
virtual ~NighthawkSinkClient() = default;

/**
* @brief Store an execution response.
*
* @param nighthawk_sink_stub Used to open a channel to the sink service.
* @param store_execution_request Provide the message that the sink should store.
* @return absl::StatusOr<nighthawk::client::StoreExecutionResponse>
*/
virtual absl::StatusOr<nighthawk::client::StoreExecutionResponse> StoreExecutionResponseStream(
Comment thread
mum4k marked this conversation as resolved.
Outdated
nighthawk::client::NighthawkSink::StubInterface* nighthawk_sink_stub,
Comment thread
mum4k marked this conversation as resolved.
Outdated
const nighthawk::client::StoreExecutionRequest& store_execution_request) const PURE;

/**
* Look up ExecutionResponse messages in the sink.
*
* @param nighthawk_sink_stub Used to open a channel to the sink service.
* @param sink_request Provide the message that the sink should handle.
* @return absl::StatusOr<nighthawk::client::SinkResponse> Either a status indicating failure, or
* a SinkResponse upon success.
*/
virtual absl::StatusOr<nighthawk::client::SinkResponse>
SinkRequestStream(nighthawk::client::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::client::SinkRequest& sink_request) const PURE;
};

} // namespace Nighthawk
20 changes: 20 additions & 0 deletions source/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "nighthawk_sink_client_impl",
srcs = [
"nighthawk_sink_client_impl.cc",
],
hdrs = [
"nighthawk_sink_client_impl.h",
],
repository = "@envoy",
visibility = ["//visibility:public"],
deps = [
"//api/client:base_cc_proto",
"//api/client:grpc_service_lib",
"//include/nighthawk/common:nighthawk_sink_client",
"@envoy//source/common/common:assert_lib_with_external_headers",
"@envoy//source/common/common:statusor_lib_with_external_headers",
"@envoy//source/common/protobuf:protobuf_with_external_headers",
],
)

envoy_cc_library(
name = "request_impl_lib",
hdrs = [
Expand Down
56 changes: 56 additions & 0 deletions source/common/nighthawk_sink_client_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "common/nighthawk_sink_client_impl.h"

#include "external/envoy/source/common/common/assert.h"

namespace Nighthawk {

absl::StatusOr<nighthawk::client::StoreExecutionResponse>
NighthawkSinkClientImpl::StoreExecutionResponseStream(
nighthawk::client::NighthawkSink::StubInterface* nighthawk_sink_stub,
const nighthawk::client::StoreExecutionRequest& store_execution_request) const {
::grpc::ClientContext context;
::nighthawk::client::StoreExecutionResponse store_execution_response;
std::shared_ptr<::grpc::ClientWriterInterface<::nighthawk::client::StoreExecutionRequest>> stream(
nighthawk_sink_stub->StoreExecutionResponseStream(&context, &store_execution_response));
if (!stream->Write(store_execution_request)) {
return absl::UnavailableError("Failed to write request to the Nighthawk Sink gRPC channel.");
} else if (!stream->WritesDone()) {
return absl::InternalError("WritesDone() failed on the Nighthawk Sink gRPC channel.");
}
::grpc::Status status = stream->Finish();
if (!status.ok()) {
return absl::Status(static_cast<absl::StatusCode>(status.error_code()), status.error_message());
}
return store_execution_response;
}

absl::StatusOr<nighthawk::client::SinkResponse> NighthawkSinkClientImpl::SinkRequestStream(
nighthawk::client::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::client::SinkRequest& sink_request) const {
nighthawk::client::SinkResponse response;

::grpc::ClientContext context;
std::shared_ptr<::grpc::ClientReaderWriterInterface<nighthawk::client::SinkRequest,
nighthawk::client::SinkResponse>>
stream(nighthawk_sink_stub.SinkRequestStream(&context));

if (!stream->Write(sink_request)) {
return absl::UnavailableError("Failed to write request to the Nighthawk Sink gRPC channel.");
} else if (!stream->WritesDone()) {
return absl::InternalError("WritesDone() failed on the Nighthawk Sink gRPC channel.");
}

bool got_response = false;
while (stream->Read(&response)) {
RELEASE_ASSERT(!got_response,
Comment thread
mum4k marked this conversation as resolved.
Comment thread
mum4k marked this conversation as resolved.
"Sink Service has started responding with more than one message.");
got_response = true;
}
::grpc::Status status = stream->Finish();
if (!status.ok()) {
return absl::Status(static_cast<absl::StatusCode>(status.error_code()), status.error_message());
}
return response;
}

} // namespace Nighthawk
28 changes: 28 additions & 0 deletions source/common/nighthawk_sink_client_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "nighthawk/common/nighthawk_sink_client.h"

#include "external/envoy/source/common/common/statusor.h"
#include "external/envoy/source/common/protobuf/protobuf.h"

#include "api/client/options.pb.h"
#include "api/client/service.grpc.pb.h"

namespace Nighthawk {

/**
* Implements a the gRPC sink client interface.
*
* This class is stateless and may be called from multiple threads. Furthermore, the same gRPC stub
* is safe to use from multiple threads simultaneously.
*/
class NighthawkSinkClientImpl : public NighthawkSinkClient {
public:
absl::StatusOr<nighthawk::client::StoreExecutionResponse> StoreExecutionResponseStream(
nighthawk::client::NighthawkSink::StubInterface* nighthawk_sink_stub,
const nighthawk::client::StoreExecutionRequest& store_execution_request) const override;

absl::StatusOr<nighthawk::client::SinkResponse>
SinkRequestStream(nighthawk::client::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::client::SinkRequest& sink_request) const override;
};

} // namespace Nighthawk
10 changes: 10 additions & 0 deletions test/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "nighthawk_sink_client_test",
srcs = ["nighthawk_sink_client_test.cc"],
repository = "@envoy",
deps = [
"//source/common:nighthawk_sink_client_impl",
"@com_github_grpc_grpc//:grpc++_test",
],
)

envoy_cc_test(
name = "sink_test",
srcs = ["sink_test.cc"],
Expand Down
Loading