Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/client/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package nighthawk.client;

import "google/protobuf/duration.proto";
import "google/rpc/status.proto";
import "validate/validate.proto";

Expand Down
32 changes: 32 additions & 0 deletions api/sink/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library")
load("@envoy_api//bazel:api_build_system.bzl", "api_cc_py_proto_library")

licenses(["notice"]) # Apache 2

api_cc_py_proto_library(
name = "sink",
srcs = [
"sink.proto",
],
visibility = ["//visibility:public"],
deps = [
"//api/client:base",
"@envoy_api//envoy/config/core/v3:pkg",
],
)

cc_grpc_library(
name = "sink_grpc_lib",
srcs = [
":sink",
],
generate_mocks = True,
grpc_only = True,
proto_only = False,
use_external = False,
visibility = ["//visibility:public"],
well_known_protos = True,
deps = [
":sink_cc_proto",
],
)
37 changes: 37 additions & 0 deletions api/sink/sink.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
syntax = "proto3";

package nighthawk;

import "api/client/service.proto";
import "validate/validate.proto";

// Encapsulates an ExecutionResponse.
message StoreExecutionRequest {
// Response contains the effective execution id, which will serve as the lookup key.
nighthawk.client.ExecutionResponse execution_response = 1;
}

// Empty return value message, that serves as a future extension point.
message StoreExecutionResponse {
}

message SinkRequest {
// Unique id for lookup purposes. Required.
string execution_id = 1 [(validate.rules).string.min_len = 1];
}

message SinkResponse {
// Response associated to the requested execution id.
nighthawk.client.ExecutionResponse execution_response = 1;
}

service NighthawkSink {
// Accepts a stream of execution responses, which is the return value of
// NighthawkService.ExecutionStream. Workers can forward their results using this method.
rpc StoreExecutionResponseStream(stream StoreExecutionRequest) returns (StoreExecutionResponse) {
}

// Gets the stored responses associated to an execution, keyed by execution id.
rpc SinkRequestStream(stream SinkRequest) returns (stream SinkResponse) {
}
}
16 changes: 16 additions & 0 deletions include/nighthawk/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ envoy_basic_cc_library(
],
)

envoy_basic_cc_library(
name = "nighthawk_sink_client",
hdrs = [
"nighthawk_sink_client.h",
],
include_prefix = "nighthawk/common",
deps = [
"//api/sink:sink_cc_proto",
"//api/sink:sink_grpc_lib",
"@envoy//include/envoy/common:base_includes",
"@envoy//source/common/common:assert_lib_with_external_headers",
"@envoy//source/common/common:statusor_lib_with_external_headers",
"@envoy//source/common/protobuf:protobuf_with_external_headers",
],
)

envoy_basic_cc_library(
name = "request_lib",
hdrs = [
Expand Down
42 changes: 42 additions & 0 deletions include/nighthawk/common/nighthawk_sink_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once
#include "envoy/common/pure.h"

#include "external/envoy/source/common/common/statusor.h"
#include "external/envoy/source/common/protobuf/protobuf.h"

#include "api/sink/sink.grpc.pb.h"

namespace Nighthawk {

/**
* Interface of a gRPC sink service client.
*/
class NighthawkSinkClient {
public:
virtual ~NighthawkSinkClient() = default;

/**
* @brief Store an execution response.
*
* @param nighthawk_sink_stub Used to open a channel to the sink service.
* @param store_execution_request Provide the message that the sink should store.
* @return absl::StatusOr<nighthawk::StoreExecutionResponse>
*/
virtual absl::StatusOr<nighthawk::StoreExecutionResponse> StoreExecutionResponseStream(
nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::StoreExecutionRequest& store_execution_request) const PURE;

/**
* Look up ExecutionResponse messages in the sink.
*
* @param nighthawk_sink_stub Used to open a channel to the sink service.
* @param sink_request Provide the message that the sink should handle.
* @return absl::StatusOr<nighthawk::SinkResponse> Either a status indicating failure, or
* a SinkResponse upon success.
*/
virtual absl::StatusOr<nighthawk::SinkResponse>
SinkRequestStream(nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::SinkRequest& sink_request) const PURE;
};

} // namespace Nighthawk
20 changes: 20 additions & 0 deletions source/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "nighthawk_sink_client_impl",
srcs = [
"nighthawk_sink_client_impl.cc",
],
hdrs = [
"nighthawk_sink_client_impl.h",
],
repository = "@envoy",
visibility = ["//visibility:public"],
deps = [
"//api/client:base_cc_proto",
"//api/client:grpc_service_lib",
"//include/nighthawk/common:nighthawk_sink_client",
"@envoy//source/common/common:assert_lib_with_external_headers",
"@envoy//source/common/common:statusor_lib_with_external_headers",
"@envoy//source/common/protobuf:protobuf_with_external_headers",
],
)

envoy_cc_library(
name = "request_impl_lib",
hdrs = [
Expand Down
70 changes: 70 additions & 0 deletions source/common/nighthawk_sink_client_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include "common/nighthawk_sink_client_impl.h"

#include "external/envoy/source/common/common/assert.h"

namespace Nighthawk {

absl::StatusOr<nighthawk::StoreExecutionResponse>
NighthawkSinkClientImpl::StoreExecutionResponseStream(
nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::StoreExecutionRequest& store_execution_request) const {
::grpc::ClientContext context;
::nighthawk::StoreExecutionResponse store_execution_response;
std::shared_ptr<::grpc::ClientWriterInterface<::nighthawk::StoreExecutionRequest>> stream(
nighthawk_sink_stub.StoreExecutionResponseStream(&context, &store_execution_response));
if (!stream->Write(store_execution_request)) {
return absl::UnavailableError("Failed to write request to the Nighthawk Sink gRPC channel.");
} else if (!stream->WritesDone()) {
return absl::InternalError("WritesDone() failed on the Nighthawk Sink gRPC channel.");
}
::grpc::Status status = stream->Finish();
if (!status.ok()) {
return absl::Status(static_cast<absl::StatusCode>(status.error_code()), status.error_message());
}
return store_execution_response;
}

absl::StatusOr<nighthawk::SinkResponse> NighthawkSinkClientImpl::SinkRequestStream(
nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::SinkRequest& sink_request) const {
nighthawk::SinkResponse response;

::grpc::ClientContext context;
std::shared_ptr<
::grpc::ClientReaderWriterInterface<nighthawk::SinkRequest, nighthawk::SinkResponse>>
stream(nighthawk_sink_stub.SinkRequestStream(&context));

if (!stream->Write(sink_request)) {
return absl::UnavailableError("Failed to write request to the Nighthawk Sink gRPC channel.");
} else if (!stream->WritesDone()) {
return absl::InternalError("WritesDone() failed on the Nighthawk Sink gRPC channel.");
}

bool got_response = false;
while (stream->Read(&response)) {
/*
At the proto api level we support returning a stream of results. The sink service proto api
reflects this, and accepts what NighthawkService. ExecutionStream returns as a parameter
(though we wrap it in StoreExecutionRequest messages for extensibility purposes). So this
implies a stream, and not a single message.

Having said that, today we constrain what we return to a single message in the implementations
where this is relevant. That's why we assert here, to make sure that stays put until an
explicit choice is made otherwise.

Why do this? The intent of NighthawkService. ExecutionStream was to be able to stream
intermediate updates some day. So having streams in the api's keeps the door open on streaming
intermediary updates, without forcing a change the proto api.
*/
RELEASE_ASSERT(!got_response,
"Sink Service has started responding with more than one message.");
got_response = true;
}
::grpc::Status status = stream->Finish();
if (!status.ok()) {
return absl::Status(static_cast<absl::StatusCode>(status.error_code()), status.error_message());
}
return response;
}

} // namespace Nighthawk
25 changes: 25 additions & 0 deletions source/common/nighthawk_sink_client_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "nighthawk/common/nighthawk_sink_client.h"

#include "external/envoy/source/common/common/statusor.h"
#include "external/envoy/source/common/protobuf/protobuf.h"

namespace Nighthawk {

/**
* Implements a the gRPC sink client interface.
*
* This class is stateless and may be called from multiple threads. Furthermore, the same gRPC stub
* is safe to use from multiple threads simultaneously.
*/
class NighthawkSinkClientImpl : public NighthawkSinkClient {
public:
absl::StatusOr<nighthawk::StoreExecutionResponse> StoreExecutionResponseStream(
nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::StoreExecutionRequest& store_execution_request) const override;

absl::StatusOr<nighthawk::SinkResponse>
SinkRequestStream(nighthawk::NighthawkSink::StubInterface& nighthawk_sink_stub,
const nighthawk::SinkRequest& sink_request) const override;
};

} // namespace Nighthawk
10 changes: 10 additions & 0 deletions test/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "nighthawk_sink_client_test",
srcs = ["nighthawk_sink_client_test.cc"],
repository = "@envoy",
deps = [
"//source/common:nighthawk_sink_client_impl",
"@com_github_grpc_grpc//:grpc++_test",
],
)

envoy_cc_test(
name = "sink_test",
srcs = ["sink_test.cc"],
Expand Down
Loading