Skip to content
Merged
8 changes: 5 additions & 3 deletions api/envoy/service/ext_proc/v3alpha/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import "envoy/config/core/v3/base.proto";
import "envoy/extensions/filters/http/ext_proc/v3alpha/processing_mode.proto";
import "envoy/type/v3/http_status.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

import "udpa/annotations/status.proto";
Expand Down Expand Up @@ -289,10 +288,13 @@ message GrpcStatus {
// Change HTTP headers or trailers by appending, replacing, or removing
// headers.
message HeaderMutation {
// Add or replace HTTP headers.
// Add or replace HTTP headers. Attempts to set the value of
// any "x-envoy" header, and attempts to set the ":method",
// ":authority", ":scheme", or "host" headers will be ignored.
repeated config.core.v3.HeaderValueOption set_headers = 1;

// Remove these HTTP headers.
// Remove these HTTP headers. Attempts to remove system headers --
// any header starting with ":", plus "host" -- will be ignored.
repeated string remove_headers = 2;
}

Expand Down
21 changes: 21 additions & 0 deletions docs/root/configuration/http/http_filters/ext_proc_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,24 @@ messages, and the server must reply with
:ref:`ProcessingResponse <envoy_v3_api_msg_service.ext_proc.v3alpha.ProcessingResponse>`.

This filter is a work in progress. In its current state, it actually does nothing.

Statistics
----------
This filter outputs statistics in the
*http.<stat_prefix>.ext_proc.* namespace. The :ref:`stat prefix
<envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.stat_prefix>`
comes from the owning HTTP connection manager.

The following statistics are supported:

.. csv-table::
:header: Name, Type, Description
:widths: auto

streams_started, Counter, The number of gRPC streams that have been started to send to the external processing service
streams_msgs_sent, Counter, The number of messages sent on those streams
streams_msgs_received, Counter, The number of messages received on those streams
spurious_msgs_received, Counter, The number of unexpected messages received that violated the protocol
streams_closed, Counter, The number of streams successfully closed on either end
streams_failed, Counter, The number of times a stream produced a gRPC error
failure_mode_allowed, Counter, The number of times an error was ignored due to configuration

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions source/common/http/header_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,10 @@ Http::Status HeaderUtility::checkRequiredHeaders(const Http::RequestHeaderMap& h
return Http::okStatus();
}

bool HeaderUtility::isRemovableHeader(absl::string_view header) {
return (header.empty() || header[0] != ':') &&
!absl::EqualsIgnoreCase(header, Headers::get().HostLegacy.get());
}

} // namespace Http
} // namespace Envoy
7 changes: 7 additions & 0 deletions source/common/http/header_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ class HeaderUtility {
* missing.
*/
static Http::Status checkRequiredHeaders(const Http::RequestHeaderMap& headers);

/**
* Returns true if a header may be safely removed without causing additional
* problems. Effectively, header names beginning with ":" and the "host" header
* may not be removed.
*/
static bool isRemovableHeader(absl::string_view header);
};
} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ envoy_cc_library(
deps = [
":header_formatter_lib",
"//include/envoy/http:header_map_interface",
"//source/common/http:header_utility_lib",
"//source/common/http:headers_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/header_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/config/core/v3/base.pb.h"

#include "common/common/assert.h"
#include "common/http/header_utility.h"
#include "common/http/headers.h"
#include "common/protobuf/utility.h"

Expand Down Expand Up @@ -258,7 +259,7 @@ HeaderParserPtr HeaderParser::configure(
// We reject :-prefix (e.g. :path) removal here. This is dangerous, since other aspects of
// request finalization assume their existence and they are needed for well-formedness in most
// cases.
if (header[0] == ':' || Http::LowerCaseString(header).get() == "host") {
if (!Http::HeaderUtility::isRemovableHeader(header)) {
throw EnvoyException(":-prefixed or host headers may not be removed");
}
header_parser->headers_to_remove_.emplace_back(header);
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/filters/http/ext_authz/ext_authz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ void Filter::onComplete(Filters::Common::ExtAuthz::ResponsePtr&& response) {
for (const auto& header : response->headers_to_remove) {
// We don't allow removing any :-prefixed headers, nor Host, as removing
// them would make the request malformed.
if (absl::StartsWithIgnoreCase(absl::string_view(header.get()), ":") ||
header == Http::Headers::get().HostLegacy) {
if (!Http::HeaderUtility::isRemovableHeader(header.get())) {
continue;
}
ENVOY_STREAM_LOG(trace, "'{}'", *callbacks_, header.get());
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ envoy_cc_library(
srcs = ["ext_proc.cc"],
hdrs = ["ext_proc.h"],
deps = [
":client_interface",
":mutation_utils_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/http:header_map_interface",
"//include/envoy/stats:stats_macros",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto",
],
)
Expand All @@ -27,6 +32,7 @@ envoy_cc_extension(
security_posture = "unknown",
status = "alpha",
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/common:factory_base_lib",
Expand All @@ -43,6 +49,18 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "mutation_utils_lib",
srcs = ["mutation_utils.cc"],
hdrs = ["mutation_utils.h"],
deps = [
"//include/envoy/http:header_map_interface",
"//source/common/http:header_utility_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_lib",
srcs = ["client_impl.cc"],
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>

#include "extensions/filters/http/ext_proc/client_impl.h"
#include "extensions/filters/http/ext_proc/ext_proc.h"

namespace Envoy {
Expand All @@ -11,11 +12,19 @@ namespace ExternalProcessing {

Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string&, Server::Configuration::FactoryContext&) {
const auto filter_config = std::make_shared<FilterConfig>(proto_config);
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(timeout_ms), context.scope(), stats_prefix);

return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) {
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config)});
return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope());

callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ExternalProcessingFilterConfig
ExternalProcessingFilterConfig() : FactoryBase(HttpFilterNames::get().ExternalProcessing) {}

private:
static constexpr uint64_t DefaultTimeout = 200;

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
Expand Down
124 changes: 123 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,133 @@
#include "extensions/filters/http/ext_proc/ext_proc.h"

#include "extensions/filters/http/ext_proc/mutation_utils.h"

#include "absl/strings/str_format.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

void Filter::onDestroy() {}
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

void Filter::closeStream() {
if (!stream_closed_) {
if (stream_) {
ENVOY_LOG(debug, "Closing gRPC stream to processing server");
stream_->close();
stats_.streams_closed_.inc();
}
stream_closed_ = true;
}
}

void Filter::onDestroy() { closeStream(); }

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of_stream) {
// We're at the start, so start the stream and send a headers message
request_headers_ = &headers;
stream_ = client_->start(*this, config_->grpcTimeout());
stats_.streams_started_.inc();
ProcessingRequest req;
auto* headers_req = req.mutable_request_headers();
MutationUtils::buildHttpHeaders(headers, *headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
request_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();

// Wait until we have a gRPC response before allowing any more callbacks
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto& headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
MutationUtils::applyHeaderMutations(common_response.header_mutation(), *request_headers_);
}
}
} else if (response->has_immediate_response()) {
// To be implemented later. Leave stream open to allow people to implement
// correct servers that don't break us.
message_valid = true;
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
}

if (message_valid) {
stats_.stream_msgs_received_.inc();
} else {
stats_.spurious_msgs_received_.inc();
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
closeStream();
}
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
stats_.streams_failed_.inc();
if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
stats_.failure_mode_allowed_.inc();
} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
}
}

void Filter::onGrpcClose() {
ENVOY_LOG(debug, "Received gRPC stream close");
stream_closed_ = true;
stats_.streams_closed_.inc();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case FilterState::HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
Loading