Skip to content

Commit

Permalink
Revert "Revert "Adding is_optional case to RLS (grpc#29259)" (grpc#29299
Browse files Browse the repository at this point in the history
)"

This reverts commit a6419dd.
  • Loading branch information
markdroth committed Apr 12, 2022
1 parent c64d6f3 commit 37a8482
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 59 deletions.
54 changes: 26 additions & 28 deletions src/core/ext/xds/xds_cluster_specifier_plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void XdsRouteLookupClusterSpecifierPlugin::PopulateSymtab(
grpc_lookup_v1_RouteLookupConfig_getmsgdef(symtab);
}

absl::StatusOr<Json>
absl::StatusOr<std::string>
XdsRouteLookupClusterSpecifierPlugin::GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const {
Expand Down Expand Up @@ -83,7 +83,23 @@ XdsRouteLookupClusterSpecifierPlugin::GenerateLoadBalancingPolicyConfig(
policy["rls_experimental"] = std::move(rls_policy);
Json::Array policies;
policies.emplace_back(std::move(policy));
return Json(policies);
Json lb_policy_config(std::move(policies));
grpc_error_handle parse_error = GRPC_ERROR_NONE;
// TODO(roth): If/when we ever add a second plugin, refactor this code
// somehow such that we automatically validate the resulting config against
// the gRPC LB policy registry instead of requiring each plugin to do that
// itself.
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(lb_policy_config,
&parse_error);
if (parse_error != GRPC_ERROR_NONE) {
absl::Status status = absl::InvalidArgumentError(absl::StrCat(
kXdsRouteLookupClusterSpecifierPluginConfigName,
" ClusterSpecifierPlugin returned invalid LB policy config: ",
grpc_error_std_string(parse_error)));
GRPC_ERROR_UNREF(parse_error);
return status;
}
return lb_policy_config.Dump();
}

namespace {
Expand All @@ -95,6 +111,14 @@ PluginRegistryMap* g_plugin_registry = nullptr;

} // namespace

const XdsClusterSpecifierPluginImpl*
XdsClusterSpecifierPluginRegistry::GetPluginForType(
absl::string_view config_proto_type_name) {
auto it = g_plugin_registry->find(config_proto_type_name);
if (it == g_plugin_registry->end()) return nullptr;
return it->second.get();
}

void XdsClusterSpecifierPluginRegistry::PopulateSymtab(upb_DefPool* symtab) {
for (const auto& p : *g_plugin_registry) {
p.second->PopulateSymtab(symtab);
Expand All @@ -107,32 +131,6 @@ void XdsClusterSpecifierPluginRegistry::RegisterPlugin(
(*g_plugin_registry)[config_proto_type_name] = std::move(plugin);
}

absl::StatusOr<std::string>
XdsClusterSpecifierPluginRegistry::GenerateLoadBalancingPolicyConfig(
absl::string_view proto_type_name, upb_StringView serialized_plugin_config,
upb_Arena* arena, upb_DefPool* symtab) {
auto it = g_plugin_registry->find(proto_type_name);
if (it == g_plugin_registry->end()) {
return absl::InvalidArgumentError(
"Unable to locate the cluster specifier plugin in the registry");
}
auto lb_policy_config = it->second->GenerateLoadBalancingPolicyConfig(
serialized_plugin_config, arena, symtab);
if (!lb_policy_config.ok()) return lb_policy_config.status();
grpc_error_handle parse_error = GRPC_ERROR_NONE;
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(*lb_policy_config,
&parse_error);
if (parse_error != GRPC_ERROR_NONE) {
absl::Status status = absl::InvalidArgumentError(absl::StrCat(
proto_type_name,
" ClusterSpecifierPlugin returned invalid LB policy config: ",
grpc_error_std_string(parse_error)));
GRPC_ERROR_UNREF(parse_error);
return status;
}
return lb_policy_config->Dump();
}

void XdsClusterSpecifierPluginRegistry::Init() {
g_plugin_registry = new PluginRegistryMap;
RegisterPlugin(absl::make_unique<XdsRouteLookupClusterSpecifierPlugin>(),
Expand Down
10 changes: 4 additions & 6 deletions src/core/ext/xds/xds_cluster_specifier_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class XdsClusterSpecifierPluginImpl {
virtual void PopulateSymtab(upb_DefPool* symtab) const = 0;

// Returns the LB policy config in JSON form.
virtual absl::StatusOr<Json> GenerateLoadBalancingPolicyConfig(
virtual absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const = 0;
};
Expand All @@ -53,7 +53,7 @@ class XdsRouteLookupClusterSpecifierPlugin
: public XdsClusterSpecifierPluginImpl {
void PopulateSymtab(upb_DefPool* symtab) const override;

absl::StatusOr<Json> GenerateLoadBalancingPolicyConfig(
absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const override;
};
Expand All @@ -66,10 +66,8 @@ class XdsClusterSpecifierPluginRegistry {

static void PopulateSymtab(upb_DefPool* symtab);

static absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
absl::string_view proto_type_name,
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab);
static const XdsClusterSpecifierPluginImpl* GetPluginForType(
absl::string_view config_proto_type_name);

// Global init and shutdown.
static void Init();
Expand Down
47 changes: 33 additions & 14 deletions src/core/ext/xds/xds_route_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,26 +325,43 @@ grpc_error_handle ClusterSpecifierPluginParse(
cluster_specifier_plugin[i]);
std::string name = UpbStringToStdString(
envoy_config_core_v3_TypedExtensionConfig_name(extension));
if (rds_update->cluster_specifier_plugin_map.find(name) !=
rds_update->cluster_specifier_plugin_map.end()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"Duplicated definition of cluster_specifier_plugin ", name));
}
const google_protobuf_Any* any =
envoy_config_core_v3_TypedExtensionConfig_typed_config(extension);
if (any == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"could not obtrain TypedExtensionConfig for plugin config");
"Could not obtrain TypedExtensionConfig for plugin config.");
}
absl::string_view plugin_type;
grpc_error_handle error =
ExtractExtensionTypeName(context, any, &plugin_type);
if (error != GRPC_ERROR_NONE) return error;
// Find the plugin and generate the policy.
auto lb_policy_config =
XdsClusterSpecifierPluginRegistry::GenerateLoadBalancingPolicyConfig(
plugin_type, google_protobuf_Any_value(any), context.arena,
context.symtab);
if (!lb_policy_config.ok()) {
return absl_status_to_grpc_error(lb_policy_config.status());
bool is_optional = envoy_config_route_v3_ClusterSpecifierPlugin_is_optional(
cluster_specifier_plugin[i]);
const XdsClusterSpecifierPluginImpl* cluster_specifier_plugin_impl =
XdsClusterSpecifierPluginRegistry::GetPluginForType(plugin_type);
std::string lb_policy_config;
if (cluster_specifier_plugin_impl == nullptr) {
if (!is_optional) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("Unknown ClusterSpecifierPlugin type ", plugin_type));
}
// Optional plugin, leave lb_policy_config empty.
} else {
auto config =
cluster_specifier_plugin_impl->GenerateLoadBalancingPolicyConfig(
google_protobuf_Any_value(any), context.arena, context.symtab);
if (!config.ok()) {
return absl_status_to_grpc_error(config.status());
}
lb_policy_config = std::move(*config);
}
rds_update->cluster_specifier_plugin_map[std::move(name)] =
std::move(lb_policy_config.value());
std::move(lb_policy_config);
}
return GRPC_ERROR_NONE;
}
Expand Down Expand Up @@ -773,12 +790,14 @@ grpc_error_handle RouteActionParse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains empty cluster specifier plugin name.");
}
if (cluster_specifier_plugin_map.find(plugin_name) ==
cluster_specifier_plugin_map.end()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains cluster specifier plugin name not "
"configured.");
auto it = cluster_specifier_plugin_map.find(plugin_name);
if (it == cluster_specifier_plugin_map.end()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("RouteAction cluster contains cluster specifier plugin "
"name not configured: ",
plugin_name));
}
if (it->second.empty()) *ignore_route = true;
route->action.emplace<XdsRouteConfigResource::Route::RouteAction::
kClusterSpecifierPluginIndex>(
std::move(plugin_name));
Expand Down
7 changes: 7 additions & 0 deletions src/proto/grpc/testing/xds/v3/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ message QueryParameterMatcher {
message ClusterSpecifierPlugin {
// The name of the plugin and its opaque configuration.
core.v3.TypedExtensionConfig extension = 1;

// If is_optional is not set and the plugin defined by this message is not
// a supported type, the containing resource is NACKed. If is_optional is
// set, the resource would not be NACKed for this reason. In this case,
// routes referencing this plugin's name would not be treated as an illegal
// configuration, but would result in a failure if the route is selected.
bool is_optional = 2;
}

// [#protodoc-title: HTTP route configuration]
Expand Down
84 changes: 73 additions & 11 deletions test/cpp/end2end/xds/xds_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7672,15 +7672,49 @@ TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUndefinedSpecifier) {
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr("RouteAction cluster contains cluster "
"specifier plugin name not configured."));
"specifier plugin name not configured:"));
}

TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProto) {
// TODO(donnadionne): Doug is working on adding a new is_optional field to
// ClusterSpecifierPlugin in envoyproxy/envoy#20301. Once that goes in, the
// behavior we want in this case is that if is_optional is true, then we
// ignore that plugin and ignore any routes that refer to that plugin.
// However, if is_optional is false, then we want to NACK.
TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksDuplicateSpecifier) {
ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_RLS_LB");
// Prepare the RLSLookupConfig: change route configurations to use cluster
// specifier plugin.
RouteLookupConfig route_lookup_config;
auto* key_builder = route_lookup_config.add_grpc_keybuilders();
auto* name = key_builder->add_names();
name->set_service(kRlsServiceValue);
name->set_method(kRlsMethodValue);
auto* header = key_builder->add_headers();
header->set_key(kRlsTestKey);
header->add_names(kRlsTestKey1);
route_lookup_config.set_lookup_service(
absl::StrCat("localhost:", rls_server_->port()));
route_lookup_config.set_cache_size_bytes(5000);
RouteLookupClusterSpecifier rls;
*rls.mutable_route_lookup_config() = std::move(route_lookup_config);
RouteConfiguration new_route_config = default_route_config_;
auto* plugin = new_route_config.add_cluster_specifier_plugins();
plugin->mutable_extension()->set_name(kRlsClusterSpecifierPluginInstanceName);
plugin->mutable_extension()->mutable_typed_config()->PackFrom(rls);
auto* duplicate_plugin = new_route_config.add_cluster_specifier_plugins();
duplicate_plugin->mutable_extension()->set_name(
kRlsClusterSpecifierPluginInstanceName);
duplicate_plugin->mutable_extension()->mutable_typed_config()->PackFrom(rls);
auto* default_route =
new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
default_route->mutable_route()->set_cluster_specifier_plugin(
kRlsClusterSpecifierPluginInstanceName);
SetRouteConfiguration(balancer_.get(), new_route_config);
const auto response_state = WaitForRdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(absl::StrCat(
"Duplicated definition of cluster_specifier_plugin ",
kRlsClusterSpecifierPluginInstanceName)));
}

TEST_P(RlsTest,
XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProtoNotOptional) {
ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_RLS_LB");
// Prepare the RLSLookupConfig: change route configurations to use cluster
// specifier plugin.
Expand All @@ -7699,10 +7733,38 @@ TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProto) {
SetRouteConfiguration(balancer_.get(), new_route_config);
const auto response_state = WaitForRdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(
response_state->error_message,
::testing::HasSubstr(
"Unable to locate the cluster specifier plugin in the registry"));
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr("Unknown ClusterSpecifierPlugin type "
"grpc.lookup.v1.RouteLookupConfig"));
}

TEST_P(RlsTest,
XdsRoutingClusterSpecifierPluginIgnoreUnknownSpecifierProtoOptional) {
ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_RLS_LB");
CreateAndStartBackends(1);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Prepare the RLSLookupConfig: change route configurations to use cluster
// specifier plugin.
RouteLookupConfig route_lookup_config;
RouteConfiguration new_route_config = default_route_config_;
auto* plugin = new_route_config.add_cluster_specifier_plugins();
plugin->mutable_extension()->set_name(kRlsClusterSpecifierPluginInstanceName);
// Instead of grpc.lookup.v1.RouteLookupClusterSpecifier, let's say we
// mistakenly packed the inner RouteLookupConfig instead.
plugin->mutable_extension()->mutable_typed_config()->PackFrom(
route_lookup_config);
plugin->set_is_optional(true);
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route->mutable_route()->set_cluster_specifier_plugin(
kRlsClusterSpecifierPluginInstanceName);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultClusterName);
SetRouteConfiguration(balancer_.get(), new_route_config);
// Ensure we ignore the cluster specifier plugin and send traffic according to
// the default route.
WaitForAllBackends();
}

TEST_P(RlsTest, XdsRoutingRlsClusterSpecifierPluginNacksRequiredMatch) {
Expand Down

0 comments on commit 37a8482

Please sign in to comment.