Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Reference clusters for ConfigSelector lifetime #2883

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 79 additions & 29 deletions packages/grpc-js-xds/src/resolver-xds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ const RETRY_CODES: {[key: string]: status} = {
export const XDS_CONFIG_KEY = `${experimental.SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX}.xds_config`;
export const XDS_CLIENT_KEY = 'grpc.internal.xds_client';

/**
* Tracks a dynamic subscription to a cluster that is currently or previously
* referenced in a RouteConfiguration.
*/
class ClusterRef {
private refCount = 0;
constructor(private unsubscribe: () => void) {}

ref() {
this.refCount += 1;
}

unref() {
this.refCount -= 1;
if (this.refCount <= 0) {
this.unsubscribe();
}
}

hasRef() {
return this.refCount > 0;
}
}

class XdsResolver implements Resolver {

private listenerResourceName: string | null = null;
Expand All @@ -93,6 +117,7 @@ class XdsResolver implements Resolver {

private xdsConfigWatcher: XdsConfigWatcher;
private xdsDependencyManager: XdsDependencyManager | null = null;
private clusterRefs: Map<string, ClusterRef> = new Map();

constructor(
private target: GrpcUri,
Expand Down Expand Up @@ -123,11 +148,20 @@ class XdsResolver implements Resolver {
}
}

private pruneUnusedClusters() {
for (const [cluster, clusterRef] of this.clusterRefs) {
if (!clusterRef.hasRef()) {
this.clusterRefs.delete(cluster);
}
}
}

private async handleXdsConfig(xdsConfig: XdsConfig) {
/* We need to load the xxhash API before this function finishes, because
* it is invoked in the config selector, which can be called immediately
* after this function returns. */
await loadXxhashApi();
this.pruneUnusedClusters();
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, xdsConfig.listener.api_listener!.api_listener!.value);
const configDefaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout;
let defaultTimeout: Duration | undefined = undefined;
Expand Down Expand Up @@ -312,44 +346,60 @@ class XdsResolver implements Resolver {
const routeMatcher = getPredicateForMatcher(route.match!);
matchList.push({matcher: routeMatcher, action: routeAction});
}
const configSelector: ConfigSelector = (methodName, metadata, channelId) => {
for (const {matcher, action} of matchList) {
if (matcher.apply(methodName, metadata)) {
const clusterResult = action.getCluster();
const unrefCluster = this.xdsDependencyManager!.addClusterSubscription(clusterResult.name);
const onCommitted = () => {
unrefCluster();
}
let hash: string;
if (EXPERIMENTAL_RING_HASH) {
hash = `${action.getHash(metadata, channelId)}`;
} else {
hash = '';
for (const cluster of allConfigClusters) {
let clusterRef = this.clusterRefs.get(cluster);
if (!clusterRef) {
clusterRef = new ClusterRef(this.xdsDependencyManager!.addClusterSubscription(cluster));
this.clusterRefs.set(cluster, clusterRef);
}
clusterRef.ref();
}
const configSelector: ConfigSelector = {
invoke: (methodName, metadata, channelId) => {
for (const {matcher, action} of matchList) {
if (matcher.apply(methodName, metadata)) {
const clusterResult = action.getCluster();
const clusterRef = this.clusterRefs.get(clusterResult.name)!;
clusterRef.ref();
const onCommitted = () => {
clusterRef.unref();
}
let hash: string;
if (EXPERIMENTAL_RING_HASH) {
hash = `${action.getHash(metadata, channelId)}`;
} else {
hash = '';
}
return {
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name, hash: hash},
status: status.OK,
dynamicFilterFactories: clusterResult.dynamicFilterFactories
};
}
return {
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name, hash: hash},
status: status.OK,
dynamicFilterFactories: clusterResult.dynamicFilterFactories
};
}
return {
methodConfig: {name: []},
// These fields won't be used here, but they're set because of some TypeScript weirdness
pickInformation: {cluster: '', hash: ''},
status: status.UNAVAILABLE,
dynamicFilterFactories: []
};
},
unref: () => {
for (const cluster of allConfigClusters) {
this.clusterRefs.get(cluster)?.unref();
}
}
return {
methodConfig: {name: []},
// These fields won't be used here, but they're set because of some TypeScript weirdness
pickInformation: {cluster: '', hash: ''},
status: status.UNAVAILABLE,
dynamicFilterFactories: []
};
};
}
trace('Created ConfigSelector with configuration:');
for (const {matcher, action} of matchList) {
trace(matcher.toString());
trace('=> ' + action.toString());
}
const clusterConfigMap: {[key: string]: {child_policy: LoadBalancingConfig[]}} = {};
for (const clusterName of allConfigClusters) {
for (const clusterName of this.clusterRefs.keys()) {
clusterConfigMap[clusterName] = {child_policy: [{cds: {cluster: clusterName}}]};
}
const lbPolicyConfig = {xds_cluster_manager: {children: clusterConfigMap}};
Expand Down
5 changes: 4 additions & 1 deletion packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ export class InternalChannel {
'Address resolution succeeded'
);
}
this.configSelector?.unref();
this.configSelector = configSelector;
this.currentResolutionError = null;
/* We process the queue asynchronously to ensure that the corresponding
Expand Down Expand Up @@ -568,7 +569,7 @@ export class InternalChannel {
if (this.configSelector) {
return {
type: 'SUCCESS',
config: this.configSelector(method, metadata, this.randomChannelId),
config: this.configSelector.invoke(method, metadata, this.randomChannelId),
};
} else {
if (this.currentResolutionError) {
Expand Down Expand Up @@ -790,6 +791,8 @@ export class InternalChannel {
}

this.subchannelPool.unrefUnusedSubchannels();
this.configSelector?.unref();
this.configSelector = null;
}

getTarget() {
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js/src/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ export interface CallConfig {
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata, channelId: number): CallConfig;
invoke(methodName: string, metadata: Metadata, channelId: number): CallConfig;
unref(): void;
}

/**
Expand Down
74 changes: 39 additions & 35 deletions packages/grpc-js/src/resolving-load-balancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,43 +103,46 @@ function findMatchingConfig(
function getDefaultConfigSelector(
serviceConfig: ServiceConfig | null
): ConfigSelector {
return function defaultConfigSelector(
methodName: string,
metadata: Metadata
) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
/* Check for the following in order, and return the first method
* config that matches:
* 1. A name that exactly matches the service and method
* 2. A name with no method set that matches the service
* 3. An empty name
*/
for (const matchLevel of NAME_MATCH_LEVEL_ORDER) {
const matchingConfig = findMatchingConfig(
service,
method,
serviceConfig.methodConfig,
matchLevel
);
if (matchingConfig) {
return {
methodConfig: matchingConfig,
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
return {
invoke(
methodName: string,
metadata: Metadata
) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
/* Check for the following in order, and return the first method
* config that matches:
* 1. A name that exactly matches the service and method
* 2. A name with no method set that matches the service
* 3. An empty name
*/
for (const matchLevel of NAME_MATCH_LEVEL_ORDER) {
const matchingConfig = findMatchingConfig(
service,
method,
serviceConfig.methodConfig,
matchLevel
);
if (matchingConfig) {
return {
methodConfig: matchingConfig,
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
}
}
}
}
return {
methodConfig: { name: [] },
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
return {
methodConfig: { name: [] },
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
},
unref() {}
};
}

Expand Down Expand Up @@ -298,6 +301,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
configSelector?.unref();
return;
}
this.childLoadBalancer.updateAddressList(
Expand Down
Loading