Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding is_optional case to RLS #29259

Merged
merged 10 commits into from
Apr 1, 2022
50 changes: 22 additions & 28 deletions src/core/ext/xds/xds_cluster_specifier_plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void XdsRouteLookupClusterSpecifierPlugin::PopulateSymtab(
grpc_lookup_v1_RouteLookupConfig_getmsgdef(symtab);
}

absl::StatusOr<Json>
absl::StatusOr<std::string>
XdsRouteLookupClusterSpecifierPlugin::GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const {
Expand Down Expand Up @@ -83,7 +83,19 @@ XdsRouteLookupClusterSpecifierPlugin::GenerateLoadBalancingPolicyConfig(
policy["rls_experimental"] = std::move(rls_policy);
Json::Array policies;
policies.emplace_back(std::move(policy));
return Json(policies);
Json lb_policy_config = Json(policies);
grpc_error_handle parse_error = GRPC_ERROR_NONE;
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(lb_policy_config,
&parse_error);
if (parse_error != GRPC_ERROR_NONE) {
absl::Status status = absl::InvalidArgumentError(absl::StrCat(
kXdsRouteLookupClusterSpecifierPluginConfigName,
" ClusterSpecifierPlugin returned invalid LB policy config: ",
grpc_error_std_string(parse_error)));
GRPC_ERROR_UNREF(parse_error);
return status;
}
return lb_policy_config.Dump();
}

namespace {
Expand All @@ -95,6 +107,14 @@ PluginRegistryMap* g_plugin_registry = nullptr;

} // namespace

const XdsClusterSpecifierPluginImpl*
XdsClusterSpecifierPluginRegistry::GetPluginForType(
absl::string_view config_proto_type_name) {
auto it = g_plugin_registry->find(config_proto_type_name);
if (it == g_plugin_registry->end()) return nullptr;
return it->second.get();
}

void XdsClusterSpecifierPluginRegistry::PopulateSymtab(upb_DefPool* symtab) {
for (const auto& p : *g_plugin_registry) {
p.second->PopulateSymtab(symtab);
Expand All @@ -107,32 +127,6 @@ void XdsClusterSpecifierPluginRegistry::RegisterPlugin(
(*g_plugin_registry)[config_proto_type_name] = std::move(plugin);
}

absl::StatusOr<std::string>
XdsClusterSpecifierPluginRegistry::GenerateLoadBalancingPolicyConfig(
absl::string_view proto_type_name, upb_StringView serialized_plugin_config,
upb_Arena* arena, upb_DefPool* symtab) {
auto it = g_plugin_registry->find(proto_type_name);
if (it == g_plugin_registry->end()) {
return absl::InvalidArgumentError(
"Unable to locate the cluster specifier plugin in the registry");
}
auto lb_policy_config = it->second->GenerateLoadBalancingPolicyConfig(
serialized_plugin_config, arena, symtab);
if (!lb_policy_config.ok()) return lb_policy_config.status();
grpc_error_handle parse_error = GRPC_ERROR_NONE;
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(*lb_policy_config,
&parse_error);
if (parse_error != GRPC_ERROR_NONE) {
absl::Status status = absl::InvalidArgumentError(absl::StrCat(
proto_type_name,
" ClusterSpecifierPlugin returned invalid LB policy config: ",
grpc_error_std_string(parse_error)));
GRPC_ERROR_UNREF(parse_error);
return status;
}
return lb_policy_config->Dump();
}

void XdsClusterSpecifierPluginRegistry::Init() {
g_plugin_registry = new PluginRegistryMap;
RegisterPlugin(absl::make_unique<XdsRouteLookupClusterSpecifierPlugin>(),
Expand Down
12 changes: 5 additions & 7 deletions src/core/ext/xds/xds_cluster_specifier_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class XdsClusterSpecifierPluginImpl {
virtual void PopulateSymtab(upb_DefPool* symtab) const = 0;

// Returns the LB policy config in JSON form.
virtual absl::StatusOr<Json> GenerateLoadBalancingPolicyConfig(
virtual absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const = 0;
};
Expand All @@ -53,7 +53,7 @@ class XdsRouteLookupClusterSpecifierPlugin
: public XdsClusterSpecifierPluginImpl {
void PopulateSymtab(upb_DefPool* symtab) const override;

absl::StatusOr<Json> GenerateLoadBalancingPolicyConfig(
absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const override;
};
Expand All @@ -64,12 +64,10 @@ class XdsClusterSpecifierPluginRegistry {
std::unique_ptr<XdsClusterSpecifierPluginImpl> plugin,
absl::string_view config_proto_type_name);

static void PopulateSymtab(upb_DefPool* symtab);
static const XdsClusterSpecifierPluginImpl* GetPluginForType(
absl::string_view config_proto_type_name);

static absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
absl::string_view proto_type_name,
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab);
static void PopulateSymtab(upb_DefPool* symtab);

// Global init and shutdown.
static void Init();
Expand Down
48 changes: 35 additions & 13 deletions src/core/ext/xds/xds_route_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ std::string XdsRouteConfigResource::ToString() const {
for (const auto& it : cluster_specifier_plugin_map) {
parts.push_back(absl::StrFormat("%s={%s}\n", it.first, it.second));
}
parts.push_back("ignored_cluster_specifier_plugins={\n");
for (const auto& it : ignored_cluster_specifier_plugin_set) {
parts.push_back(it);
}
parts.push_back("}");
return absl::StrJoin(parts, "");
}
Expand Down Expand Up @@ -335,11 +339,22 @@ grpc_error_handle ClusterSpecifierPluginParse(
grpc_error_handle error =
ExtractExtensionTypeName(context, any, &plugin_type);
if (error != GRPC_ERROR_NONE) return error;
bool is_optional = true;
// bool is_optional =
// envoy_config_route_v3_ClusterSpecifierPlugin_is_optional(cluster_specifier_plugin[i]);
const XdsClusterSpecifierPluginImpl* cluster_specifier_plugin_impl =
XdsClusterSpecifierPluginRegistry::GetPluginForType(plugin_type);
if (cluster_specifier_plugin_impl == nullptr) {
if (is_optional) {
gpr_log(GPR_INFO, "donna in new case");
rds_update->ignored_cluster_specifier_plugin_set.emplace(name);
continue;
}
}
// Find the plugin and generate the policy.
auto lb_policy_config =
XdsClusterSpecifierPluginRegistry::GenerateLoadBalancingPolicyConfig(
plugin_type, google_protobuf_Any_value(any), context.arena,
context.symtab);
cluster_specifier_plugin_impl->GenerateLoadBalancingPolicyConfig(
google_protobuf_Any_value(any), context.arena, context.symtab);
if (!lb_policy_config.ok()) {
return absl_status_to_grpc_error(lb_policy_config.status());
}
Expand Down Expand Up @@ -688,6 +703,7 @@ grpc_error_handle RouteActionParse(
const std::map<std::string /*cluster_specifier_plugin_name*/,
std::string /*LB policy config*/>&
cluster_specifier_plugin_map,
const std::set<std::string>& ignored_cluster_specifier_plugin_set,
XdsRouteConfigResource::Route::RouteAction* route, bool* ignore_route) {
const envoy_config_route_v3_RouteAction* route_action =
envoy_config_route_v3_Route_route(route_msg);
Expand Down Expand Up @@ -773,15 +789,20 @@ grpc_error_handle RouteActionParse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains empty cluster specifier plugin name.");
}
if (cluster_specifier_plugin_map.find(plugin_name) ==
if (cluster_specifier_plugin_map.find(plugin_name) !=
cluster_specifier_plugin_map.end()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains cluster specifier plugin name not "
"configured.");
route->action.emplace<XdsRouteConfigResource::Route::RouteAction::
kClusterSpecifierPluginIndex>(
std::move(plugin_name));
} else {
if (ignored_cluster_specifier_plugin_set.find(plugin_name) ==
ignored_cluster_specifier_plugin_set.end()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains cluster specifier plugin name not "
"configured.");
}
*ignore_route = true;
}
route->action.emplace<XdsRouteConfigResource::Route::RouteAction::
kClusterSpecifierPluginIndex>(
std::move(plugin_name));
} else {
// No cluster or weighted_clusters or plugin found in RouteAction, ignore
// this route.
Expand Down Expand Up @@ -992,9 +1013,10 @@ grpc_error_handle XdsRouteConfigResource::Parse(
route.action.emplace<XdsRouteConfigResource::Route::RouteAction>();
auto& route_action =
absl::get<XdsRouteConfigResource::Route::RouteAction>(route.action);
error = RouteActionParse(context, routes[j],
rds_update->cluster_specifier_plugin_map,
&route_action, &ignore_route);
error = RouteActionParse(
context, routes[j], rds_update->cluster_specifier_plugin_map,
rds_update->ignored_cluster_specifier_plugin_set, &route_action,
&ignore_route);
if (error != GRPC_ERROR_NONE) return error;
if (ignore_route) continue;
if (route_action.retry_policy == absl::nullopt &&
Expand Down
5 changes: 4 additions & 1 deletion src/core/ext/xds/xds_route_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,13 @@ struct XdsRouteConfigResource {
std::map<std::string /*cluster_specifier_plugin_name*/,
std::string /*LB policy config*/>
cluster_specifier_plugin_map;
std::set<std::string> ignored_cluster_specifier_plugin_set;

bool operator==(const XdsRouteConfigResource& other) const {
return virtual_hosts == other.virtual_hosts &&
cluster_specifier_plugin_map == other.cluster_specifier_plugin_map;
cluster_specifier_plugin_map == other.cluster_specifier_plugin_map &&
ignored_cluster_specifier_plugin_set ==
other.ignored_cluster_specifier_plugin_set;
}
std::string ToString() const;

Expand Down
7 changes: 7 additions & 0 deletions src/proto/grpc/testing/xds/v3/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ message QueryParameterMatcher {
message ClusterSpecifierPlugin {
// The name of the plugin and its opaque configuration.
core.v3.TypedExtensionConfig extension = 1;

// If is_optional is not set and the plugin defined by this message is not
// a supported type, the containing resource is NACKed. If is_optional is
// set, the resource would not be NACKed for this reason. In this case,
// routes referencing this plugin's name would not be treated as an illegal
// configuration, but would result in a failure if the route is selected.
bool is_optional = 2;
}

// [#protodoc-title: HTTP route configuration]
Expand Down
56 changes: 49 additions & 7 deletions test/cpp/end2end/xds/xds_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7776,12 +7776,8 @@ TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUndefinedSpecifier) {
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_XDS_RLS_LB");
}

TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProto) {
// TODO(donnadionne): Doug is working on adding a new is_optional field to
// ClusterSpecifierPlugin in envoyproxy/envoy#20301. Once that goes in, the
// behavior we want in this case is that if is_optional is true, then we
// ignore that plugin and ignore any routes that refer to that plugin.
// However, if is_optional is false, then we want to NACK.
TEST_P(RlsTest,
XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProtoNotOptional) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_RLS_LB", "true");
const char* kNewClusterName = "new_cluster";
const char* kNewEdsServiceName = "new_eds_service_name";
Expand Down Expand Up @@ -7821,7 +7817,53 @@ TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProto) {
EXPECT_THAT(
response_state->error_message,
::testing::HasSubstr(
"Unable to locate the cluster specifier plugin in the registry"));
// TODO: change this back once is_optional is no longer hard-coded.
//"Unable to locate the cluster specifier plugin in the registry"));
"No valid routes specified."));
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_XDS_RLS_LB");
}

TEST_P(RlsTest,
XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProtoOptional) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_RLS_LB", "true");
const char* kNewClusterName = "new_cluster";
const char* kNewEdsServiceName = "new_eds_service_name";
// Populate new EDS resources.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
EdsResourceArgs args1({
{"locality0", CreateEndpointsForBackends(1, 2)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args1, kNewEdsServiceName));
// Populate new CDS resources.
Cluster new_cluster = default_cluster_;
new_cluster.set_name(kNewClusterName);
new_cluster.mutable_eds_cluster_config()->set_service_name(
kNewEdsServiceName);
balancer_->ads_service()->SetCdsResource(new_cluster);
// Prepare the RLSLookupConfig: change route configurations to use cluster
// specifier plugin.
RouteLookupConfig route_lookup_config;
RouteConfiguration new_route_config = default_route_config_;
auto* plugin = new_route_config.add_cluster_specifier_plugins();
plugin->mutable_extension()->set_name(kRlsClusterSpecifierPluginInstanceName);
// Instead of grpc.lookup.v1.RouteLookupClusterSpecifier, let's say we
// mistakenly packed the inner RouteLookupConfig instead.
plugin->mutable_extension()->mutable_typed_config()->PackFrom(
route_lookup_config);
plugin->set_is_optional(true);
auto* default_route =
new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
default_route->mutable_route()->set_cluster_specifier_plugin(
kRlsClusterSpecifierPluginInstanceName);
SetRouteConfiguration(balancer_.get(), new_route_config);
const auto response_state = WaitForRdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr("No valid routes specified."));
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_XDS_RLS_LB");
}

Expand Down