Skip to content
Merged
15 changes: 13 additions & 2 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ envoy_extension_package()

envoy_cc_library(
name = "ext_proc",
srcs = ["ext_proc.cc"],
hdrs = ["ext_proc.h"],
srcs = [
"ext_proc.cc",
"headers.cc",
],
hdrs = [
"ext_proc.h",
"headers.h",
],
deps = [
":client_interface",
"//include/envoy/http:filter_interface",
"//include/envoy/http:header_map_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/strings:str_format",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto",
],
)

Expand All @@ -27,6 +37,7 @@ envoy_cc_extension(
security_posture = "unknown",
status = "alpha",
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/common:factory_base_lib",
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>

#include "extensions/filters/http/ext_proc/client_impl.h"
#include "extensions/filters/http/ext_proc/ext_proc.h"

namespace Envoy {
Expand All @@ -11,11 +12,19 @@ namespace ExternalProcessing {

Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string&, Server::Configuration::FactoryContext&) {
const auto filter_config = std::make_shared<FilterConfig>(proto_config);
const std::string&, Server::Configuration::FactoryContext& context) {
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
Comment thread
gbrail marked this conversation as resolved.
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(timeout_ms));

return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) {
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config)});
return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), grpc_service, context.scope());

callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ExternalProcessingFilterConfig
ExternalProcessingFilterConfig() : FactoryBase(HttpFilterNames::get().ExternalProcessing) {}

private:
static constexpr uint64_t DefaultTimeout = 200;

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
Expand Down
107 changes: 106 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,116 @@
#include "extensions/filters/http/ext_proc/ext_proc.h"

#include "extensions/filters/http/ext_proc/headers.h"

#include "absl/strings/str_format.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

void Filter::onDestroy() {}
using envoy::service::ext_proc::v3alpha::ProcessingRequest;
using envoy::service::ext_proc::v3alpha::ProcessingResponse;

using Http::FilterHeadersStatus;
using Http::RequestHeaderMap;

static const std::string kErrorPrefix = "ext_proc error";

void Filter::onDestroy() {
if (stream_ && !stream_closed_) {
ENVOY_LOG(debug, "Closing gRPC stream to processing server");
stream_->close();
}
}

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_of_stream) {
// We're at the start, so start the stream and send a headers message
request_headers_ = &headers;
stream_ = client_->start(*this, config_->grpcTimeout());
ProcessingRequest req;
auto headers_req = req.mutable_request_headers();
buildHttpHeaders(headers, headers_req->mutable_headers());
headers_req->set_end_of_stream(end_of_stream);
request_state_ = FilterState::HEADERS;
stream_->send(std::move(req), false);
Comment thread
gbrail marked this conversation as resolved.

// Wait until we have a gRPC response before allowing any more callbacks
return FilterHeadersStatus::StopAllIterationAndWatermark;
}

void Filter::onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& r) {
auto response = std::move(r);
bool message_valid = false;
ENVOY_LOG(debug, "Received gRPC message. State = {}", request_state_);

// This next section will grow as we support the rest of the protocol
if (request_state_ == FilterState::HEADERS) {
if (response->has_request_headers()) {
Comment thread
gbrail marked this conversation as resolved.
ENVOY_LOG(debug, "applying request_headers response");
message_valid = true;
const auto headers_response = response->request_headers();
if (headers_response.has_response()) {
const auto& common_response = headers_response.response();
if (common_response.has_header_mutation()) {
applyHeaderMutations(common_response.header_mutation(), request_headers_);
}
}
}
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
}

if (!message_valid) {
// Ignore messages received out of order. However, close the stream to
// protect ourselves since the server is not following the protocol.
ENVOY_LOG(warn, "Spurious response message received on gRPC stream");
Comment thread
gbrail marked this conversation as resolved.
stream_closed_ = true;
stream_->close();
Comment thread
gbrail marked this conversation as resolved.
Outdated
}
}

void Filter::onGrpcError(Grpc::Status::GrpcStatus status) {
ENVOY_LOG(debug, "Received gRPC error on stream: {}", status);
stream_closed_ = true;
if (config_->failureModeAllow()) {
// Ignore this and treat as a successful close
onGrpcClose();
} else {
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, "", nullptr, absl::nullopt,
absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status));
break;
default:
// Nothing else to do
break;
}
}
}

void Filter::onGrpcClose() {
ENVOY_LOG(debug, "Received gRPC stream close");
stream_closed_ = true;
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
// Use a switch here now because there will be more than two
// cases very soon.
switch (request_state_) {
case HEADERS:
request_state_ = FilterState::IDLE;
decoder_callbacks_->continueDecoding();
break;
default:
// Nothing to do otherwise
break;
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
49 changes: 44 additions & 5 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <chrono>
#include <memory>

#include "envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.pb.h"
Expand All @@ -9,6 +10,7 @@
#include "common/common/logger.h"

#include "extensions/filters/http/common/pass_through_filter.h"
#include "extensions/filters/http/ext_proc/client.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -17,25 +19,62 @@ namespace ExternalProcessing {

class FilterConfig {
public:
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config)
: failure_mode_allow_(config.failure_mode_allow()) {}
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& config,
const std::chrono::milliseconds grpc_timeout)
: failure_mode_allow_(config.failure_mode_allow()), grpc_timeout_(grpc_timeout) {}

bool failureModeAllow() const { return failure_mode_allow_; }

const std::chrono::milliseconds& grpcTimeout() const { return grpc_timeout_; }

private:
const bool failure_mode_allow_;
const std::chrono::milliseconds grpc_timeout_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;

class Filter : public Logger::Loggable<Logger::Id::filter>, public Http::PassThroughFilter {
class Filter : public Logger::Loggable<Logger::Id::filter>,
public Http::PassThroughFilter,
public ExternalProcessorCallbacks {
enum FilterState {
Comment thread
gbrail marked this conversation as resolved.
Outdated
IDLE,
HEADERS,
Comment thread
gbrail marked this conversation as resolved.
};

public:
Filter(const FilterConfigSharedPtr& config) : config_(config) {}
Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client)
: config_(config), client_(std::move(client)) {}

void onDestroy() override;

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;

// ExternalProcessorCallbacks

void onReceiveMessage(
std::unique_ptr<envoy::service::ext_proc::v3alpha::ProcessingResponse>&& response) override;

void onGrpcError(Grpc::Status::GrpcStatus error) override;

void onGrpcClose() override;

private:
FilterConfigSharedPtr config_;
const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;

Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;

FilterState request_state_ = FilterState::IDLE;
ExternalProcessorStreamPtr stream_;
bool stream_closed_ = false;

Http::HeaderMap* request_headers_ = nullptr;
};

} // namespace ExternalProcessing
Expand Down
45 changes: 45 additions & 0 deletions source/extensions/filters/http/ext_proc/headers.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "extensions/filters/http/ext_proc/headers.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

using Http::LowerCaseString;

void buildHttpHeaders(const Http::HeaderMap& headers_in,
Comment thread
gbrail marked this conversation as resolved.
Outdated
envoy::config::core::v3::HeaderMap* headers_out) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Envoy style prefers mutable refs, e.g. envoy::config::core::v3::HeaderMap& headers_out. This applies here but also below to other functions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixed that.

headers_in.iterate([headers_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
auto new_header = headers_out->add_headers();
Comment thread
gbrail marked this conversation as resolved.
Outdated
new_header->set_key(std::string(e.key().getStringView()));
new_header->set_value(std::string(e.value().getStringView()));
return Http::HeaderMap::Iterate::Continue;
Comment thread
gbrail marked this conversation as resolved.
Outdated
});
}
Comment thread
htuch marked this conversation as resolved.
Outdated

void applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation,
Http::HeaderMap* headers) {
for (const auto& sh : mutation.set_headers()) {
if (!sh.has_header()) {
continue;
}
bool append = false;
if (sh.has_append()) {
append = sh.append().value();
}
if (append) {
headers->addCopy(LowerCaseString(sh.header().key()), sh.header().value());
} else {
headers->setCopy(LowerCaseString(sh.header().key()), sh.header().value());
}
}

for (const auto& rh : mutation.remove_headers()) {
headers->remove(LowerCaseString(rh));
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
20 changes: 20 additions & 0 deletions source/extensions/filters/http/ext_proc/headers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include "envoy/http/header_map.h"
#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

extern void buildHttpHeaders(const Http::HeaderMap& headers_in,
envoy::config::core::v3::HeaderMap* headers_out);

extern void applyHeaderMutations(const envoy::service::ext_proc::v3alpha::HeaderMutation& mutation,
Http::HeaderMap* headers);

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading