Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions source/extensions/filters/network/thrift_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ envoy_cc_extension(
":framed_transport_lib",
":header_transport_lib",
":protocol_interface",
":protocol_options_config_lib",
":twitter_protocol_lib",
":unframed_transport_lib",
"//envoy/registry",
Expand Down Expand Up @@ -265,6 +266,15 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "protocol_options_config_lib",
hdrs = ["protocol_options_config.h"],
deps = [
":thrift_lib",
"//envoy/upstream:upstream_interface",
],
)

envoy_cc_library(
name = "thrift_lib",
hdrs = ["thrift.h"],
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/thrift_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "source/extensions/filters/network/common/factory_base.h"
#include "source/extensions/filters/network/thrift_proxy/conn_manager.h"
#include "source/extensions/filters/network/thrift_proxy/filters/filter.h"
#include "source/extensions/filters/network/thrift_proxy/protocol_options_config.h"
#include "source/extensions/filters/network/thrift_proxy/router/router_impl.h"
#include "source/extensions/filters/network/well_known_names.h"

Expand Down
11 changes: 0 additions & 11 deletions source/extensions/filters/network/thrift_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,6 @@ class Config {
virtual uint64_t maxRequestsPerConnection() const PURE;
};

/**
* Extends Upstream::ProtocolOptionsConfig with Thrift-specific cluster options.
*/
class ProtocolOptionsConfig : public Upstream::ProtocolOptionsConfig {
public:
~ProtocolOptionsConfig() override = default;

virtual TransportType transport(TransportType downstream_transport) const PURE;
virtual ProtocolType protocol(ProtocolType downstream_protocol) const PURE;
};

/**
* ConnectionManager is a Network::Filter that will perform Thrift request handling on a connection.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include "envoy/upstream/upstream.h"

#include "source/extensions/filters/network/thrift_proxy/thrift.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace ThriftProxy {

/**
* Extends Upstream::ProtocolOptionsConfig with Thrift-specific cluster options.
*/
class ProtocolOptionsConfig : public Upstream::ProtocolOptionsConfig {
public:
~ProtocolOptionsConfig() override = default;

virtual TransportType transport(TransportType downstream_transport) const PURE;
virtual ProtocolType protocol(ProtocolType downstream_protocol) const PURE;
};

} // namespace ThriftProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
7 changes: 5 additions & 2 deletions source/extensions/filters/network/thrift_proxy/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ envoy_cc_library(
"//envoy/router:router_interface",
"//envoy/tcp:conn_pool_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:logger_lib",
"//source/extensions/filters/network:well_known_names",
"//source/extensions/filters/network/thrift_proxy:app_exception_lib",
"//source/extensions/filters/network/thrift_proxy:metadata_lib",
"//source/extensions/filters/network/thrift_proxy:protocol_converter_lib",
"//source/extensions/filters/network/thrift_proxy:protocol_options_config_lib",
],
)

Expand All @@ -52,6 +56,7 @@ envoy_cc_library(
deps = [
":router_interface",
"//envoy/tcp:conn_pool_interface",
"//source/common/common:logger_lib",
"//source/extensions/filters/network/thrift_proxy:app_exception_lib",
"//source/extensions/filters/network/thrift_proxy:conn_manager_lib",
"//source/extensions/filters/network/thrift_proxy:thrift_object_interface",
Expand All @@ -72,11 +77,9 @@ envoy_cc_library(
"//envoy/upstream:cluster_manager_interface",
"//envoy/upstream:load_balancer_interface",
"//envoy/upstream:thread_local_cluster_interface",
"//source/common/common:logger_lib",
"//source/common/http:header_utility_lib",
"//source/common/router:metadatamatchcriteria_lib",
"//source/common/upstream:load_balancer_lib",
"//source/extensions/filters/network:well_known_names",
"//source/extensions/filters/network/thrift_proxy:app_exception_lib",
"//source/extensions/filters/network/thrift_proxy:conn_manager_lib",
"//source/extensions/filters/network/thrift_proxy:protocol_converter_lib",
Expand Down
90 changes: 89 additions & 1 deletion source/extensions/filters/network/thrift_proxy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
#include "envoy/tcp/conn_pool.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h"
#include "source/extensions/filters/network/thrift_proxy/metadata.h"
#include "source/extensions/filters/network/thrift_proxy/protocol_converter.h"
#include "source/extensions/filters/network/thrift_proxy/protocol_options_config.h"
#include "source/extensions/filters/network/well_known_names.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -101,7 +105,7 @@ struct RouterStats {
/**
* This interface is used by an upstream request to communicate its state.
*/
class RequestOwner : public ProtocolConverter {
class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::Id::thrift> {
public:
RequestOwner(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope)
Expand Down Expand Up @@ -181,6 +185,11 @@ class RequestOwner : public ProtocolConverter {
*/
Upstream::ClusterManager& clusterManager() { return cluster_manager_; }

/**
* @return Upstream::Cluster& the upstream cluster associated with the request.
*/
const Upstream::ClusterInfo& cluster() const { return *cluster_; }

/**
* Common stats.
*/
Expand Down Expand Up @@ -269,6 +278,85 @@ class RequestOwner : public ProtocolConverter {
recordClusterScopeHistogram(cluster, {upstream_rq_time_}, unit, value);
}

protected:
struct UpstreamRequestInfo {
bool passthrough_supported;
TransportType transport;
ProtocolType protocol;
absl::optional<Upstream::TcpPoolData> conn_pool_data;
};

struct PrepareUpstreamRequestResult {
absl::optional<AppException> exception;
absl::optional<UpstreamRequestInfo> upstream_request_info;
};

PrepareUpstreamRequestResult prepareUpstreamRequest(const std::string& cluster_name,
MessageMetadataSharedPtr& metadata,
TransportType transport,
ProtocolType protocol,
Upstream::LoadBalancerContext* lb_context) {
Upstream::ThreadLocalCluster* cluster = clusterManager().getThreadLocalCluster(cluster_name);
if (!cluster) {
ENVOY_LOG(debug, "unknown cluster '{}'", cluster_name);
stats().unknown_cluster_.inc();
return {AppException(AppExceptionType::InternalError,
fmt::format("unknown cluster '{}'", cluster_name)),
absl::nullopt};
}

cluster_ = cluster->info();
ENVOY_LOG(debug, "cluster '{}' match for method '{}'", cluster_name, metadata->methodName());

switch (metadata->messageType()) {
case MessageType::Call:
incRequestCall(*cluster_);
break;

case MessageType::Oneway:
incRequestOneWay(*cluster_);
break;

default:
incRequestInvalid(*cluster_);
break;
}

if (cluster_->maintenanceMode()) {
stats().upstream_rq_maintenance_mode_.inc();
return {AppException(AppExceptionType::InternalError,
fmt::format("maintenance mode for cluster '{}'", cluster_name)),
absl::nullopt};
}

const std::shared_ptr<const ProtocolOptionsConfig> options =
cluster_->extensionProtocolOptionsTyped<ProtocolOptionsConfig>(
NetworkFilterNames::get().ThriftProxy);

const TransportType final_transport = options ? options->transport(transport) : transport;
ASSERT(final_transport != TransportType::Auto);

const ProtocolType final_protocol = options ? options->protocol(protocol) : protocol;
ASSERT(final_protocol != ProtocolType::Auto);

auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, lb_context);
if (!conn_pool_data) {
stats().no_healthy_upstream_.inc();
return {AppException(AppExceptionType::InternalError,
fmt::format("no healthy upstream for '{}'", cluster_name)),
absl::nullopt};
}

const auto passthrough_supported =
transport == TransportType::Framed && final_transport == TransportType::Framed &&
protocol == final_protocol && final_protocol != ProtocolType::Twitter;
UpstreamRequestInfo result = {passthrough_supported, final_transport, final_protocol,
conn_pool_data};
return {absl::nullopt, result};
}

Upstream::ClusterInfoConstSharedPtr cluster_;

private:
void incClusterScopeCounter(const Upstream::ClusterInfo& cluster,
const Stats::StatNameVec& names) const {
Expand Down
Loading