Skip to content
21 changes: 17 additions & 4 deletions api/envoy/config/filter/network/thrift_proxy/v2alpha1/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,24 @@ message Route {
RouteAction route = 2 [(validate.rules).message.required = true, (gogoproto.nullable) = false];
}

// [#comment:next free field: 2]
// [#comment:next free field: 4]
message RouteMatch {
// If specified, the route must exactly match the request method name. As a special case, an
// empty string matches any request method name.
string method = 1;
oneof match_specifier {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will allow Thrift requests to be routed based on either method name or service name. @fishcakez and @rgs1, do you think we'll need to be able to route based on the combination of both, e.g. service_name=X and method_name=Y?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, you can do this already with method_name=service:method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks. That solves my one concern.

option (validate.required) = true;

// If specified, the route must exactly match the request method name. As a special case, an
// empty string matches any request method name.
string method_name = 1;

// If specified, the route must have the service name as the request method name prefix. As a
// special case, an empty string matches any service name. Only relevant when service
// multiplexing.
string service_name = 2;
}

// Inverts whatever matching is done in match_specifier. Cannot be combined with wildcard matching
// as that would result in routes never being matched.
bool invert = 3;
}

// [#comment:next free field: 2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/thread_local_cluster.h"

#include "common/common/utility.h"

#include "extensions/filters/network/thrift_proxy/app_exception_impl.h"

namespace Envoy {
Expand All @@ -24,14 +26,45 @@ RouteConstSharedPtr RouteEntryImplBase::clusterEntry() const { return shared_fro

MethodNameRouteEntryImpl::MethodNameRouteEntryImpl(
const envoy::config::filter::network::thrift_proxy::v2alpha1::Route& route)
: RouteEntryImplBase(route), method_name_(route.match().method()) {}
: RouteEntryImplBase(route), method_name_(route.match().method_name()),
invert_(route.match().invert()) {
if (method_name_.empty() && invert_) {
throw EnvoyException("Cannot have an empty method name with inversion enabled");
}
}

RouteConstSharedPtr MethodNameRouteEntryImpl::matches(const MessageMetadata& metadata) const {
if (method_name_.empty()) {
bool matches =
method_name_.empty() || (metadata.hasMethodName() && metadata.methodName() == method_name_);

if (matches ^ invert_) {
return clusterEntry();
}

if (metadata.hasMethodName() && metadata.methodName() == method_name_) {
return nullptr;
}

ServiceNameRouteEntryImpl::ServiceNameRouteEntryImpl(
const envoy::config::filter::network::thrift_proxy::v2alpha1::Route& route)
: RouteEntryImplBase(route), invert_(route.match().invert()) {
const std::string service_name = route.match().service_name();
if (service_name.empty() && invert_) {
throw EnvoyException("Cannot have an empty service name with inversion enabled");
}

if (!service_name.empty() && !StringUtil::endsWith(service_name, ":")) {
service_name_ = service_name + ":";
} else {
service_name_ = service_name;
}
}

RouteConstSharedPtr ServiceNameRouteEntryImpl::matches(const MessageMetadata& metadata) const {
bool matches = service_name_.empty() ||
(metadata.hasMethodName() &&
StringUtil::startsWith(metadata.methodName().c_str(), service_name_));

if (matches ^ invert_) {
return clusterEntry();
}

Expand All @@ -40,8 +73,19 @@ RouteConstSharedPtr MethodNameRouteEntryImpl::matches(const MessageMetadata& met

RouteMatcher::RouteMatcher(
const envoy::config::filter::network::thrift_proxy::v2alpha1::RouteConfiguration& config) {
using envoy::config::filter::network::thrift_proxy::v2alpha1::RouteMatch;

for (const auto& route : config.routes()) {
routes_.emplace_back(new MethodNameRouteEntryImpl(route));
switch (route.match().match_specifier_case()) {
case RouteMatch::MatchSpecifierCase::kMethodName:
routes_.emplace_back(new MethodNameRouteEntryImpl(route));
break;
case RouteMatch::MatchSpecifierCase::kServiceName:
routes_.emplace_back(new ServiceNameRouteEntryImpl(route));
break;
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,27 @@ class MethodNameRouteEntryImpl : public RouteEntryImplBase {

const std::string& methodName() const { return method_name_; }

// RoutEntryImplBase
// RouteEntryImplBase
RouteConstSharedPtr matches(const MessageMetadata& metadata) const override;

private:
const std::string method_name_;
const bool invert_;
};

class ServiceNameRouteEntryImpl : public RouteEntryImplBase {
public:
ServiceNameRouteEntryImpl(
const envoy::config::filter::network::thrift_proxy::v2alpha1::Route& route);

const std::string& serviceName() const { return service_name_; }

// RouteEntryImplBase
RouteConstSharedPtr matches(const MessageMetadata& metadata) const override;

private:
std::string service_name_;
const bool invert_;
};

class RouteMatcher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ stat_prefix: test
name: "routes"
routes:
- match:
method: name
method_name: name
route:
cluster: cluster
)EOF";
Expand Down
61 changes: 53 additions & 8 deletions test/extensions/filters/network/thrift_proxy/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,18 @@ class ThriftConnManagerIntegrationTest
route_config:
name: "routes"
routes:
- match: {}
- match:
service_name: "svcname"
route:
cluster: "cluster_0"
- match:
method_name: "execute"
route:
cluster: "cluster_1"
- match:
method_name: "poke"
route:
cluster: "cluster_2"
)EOF";
}

Expand All @@ -73,8 +82,7 @@ class ThriftConnManagerIntegrationTest
preparePayloads(result_mode, "execute");
ASSERT(request_bytes_.length() > 0);
ASSERT(response_bytes_.length() > 0);

BaseIntegrationTest::initialize();
initializeCommon();
}

void initializeOneway() {
Expand All @@ -84,6 +92,24 @@ class ThriftConnManagerIntegrationTest
ASSERT(request_bytes_.length() > 0);
ASSERT(response_bytes_.length() == 0);

initializeCommon();
}

// We allocate as many upstreams as there are clusters, with each upstream being allocated
// to clusters in the order they're defined in the bootstrap config.
void initializeCommon() {
setUpstreamCount(3);

config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) {
auto* c1 = bootstrap.mutable_static_resources()->add_clusters();
c1->MergeFrom(bootstrap.static_resources().clusters()[0]);
c1->set_name("cluster_1");

auto* c2 = bootstrap.mutable_static_resources()->add_clusters();
c2->MergeFrom(bootstrap.static_resources().clusters()[0]);
c2->set_name("cluster_2");
});

BaseIntegrationTest::initialize();
}

Expand Down Expand Up @@ -140,6 +166,20 @@ class ThriftConnManagerIntegrationTest
}
}

// Multiplexed requests are handled by the service name route match,
// while oneway's are handled by the "poke" method. All other requests
// are handled by "execute".
FakeUpstream* getExpectedUpstream(bool oneway) {
int upstreamIdx = 1;
if (multiplexed_) {
upstreamIdx = 0;
} else if (oneway) {
upstreamIdx = 2;
}

return fake_upstreams_[upstreamIdx].get();
}

std::string transport_;
std::string protocol_;
bool multiplexed_;
Expand Down Expand Up @@ -176,7 +216,8 @@ TEST_P(ThriftConnManagerIntegrationTest, Success) {
tcp_client->write(request_bytes_.toString());

FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
FakeUpstream* expected_upstream = getExpectedUpstream(false);
ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection));
std::string data;
ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data));
Buffer::OwnedImpl upstream_request(data);
Expand All @@ -201,8 +242,9 @@ TEST_P(ThriftConnManagerIntegrationTest, IDLException) {
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
tcp_client->write(request_bytes_.toString());

FakeUpstream* expected_upstream = getExpectedUpstream(false);
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection));
std::string data;
ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data));
Buffer::OwnedImpl upstream_request(data);
Expand All @@ -227,8 +269,9 @@ TEST_P(ThriftConnManagerIntegrationTest, Exception) {
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
tcp_client->write(request_bytes_.toString());

FakeUpstream* expected_upstream = getExpectedUpstream(false);
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection));
std::string data;
ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data));
Buffer::OwnedImpl upstream_request(data);
Expand All @@ -253,8 +296,9 @@ TEST_P(ThriftConnManagerIntegrationTest, Oneway) {
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
tcp_client->write(request_bytes_.toString());

FakeUpstream* expected_upstream = getExpectedUpstream(true);
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection));
std::string data;
ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data));
Buffer::OwnedImpl upstream_request(data);
Expand All @@ -274,8 +318,9 @@ TEST_P(ThriftConnManagerIntegrationTest, OnewayEarlyClose) {
tcp_client->write(request_bytes_.toString());
tcp_client->close();

FakeUpstream* expected_upstream = getExpectedUpstream(true);
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(expected_upstream->waitForRawConnection(fake_upstream_connection));
std::string data;
ASSERT_TRUE(fake_upstream_connection->waitForData(request_bytes_.length(), &data));
Buffer::OwnedImpl upstream_request(data);
Expand Down
Loading