Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions packages/grpc-js-xds/interop/xds-interop-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,36 @@ function getIPv6Addresses(): string[] {
return ipv6Addresses;
}

interface ConfiguredMetrics {
qps?: number;
applicationUtilization?: number;
eps?: number;
}

function createInBandMetricsInterceptor(metrics: ConfiguredMetrics) {
return function inBandMetricsInterceptor(methodDescriptor: grpc.ServerMethodDefinition<any, any>, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall {
const metricsRecorder = call.getMetricsRecorder()
if (metrics.qps) {
metricsRecorder.recordQpsMetric(metrics.qps);
}
if (metrics.applicationUtilization) {
metricsRecorder.recordApplicationUtilizationMetric(metrics.applicationUtilization);
}
if (metrics.eps) {
metricsRecorder.recordEpsMetric(metrics.eps);
}
return new grpc.ServerInterceptingCall(call);
}
}

async function main() {
const argv = yargs
.string(['port', 'maintenance_port', 'address_type', 'secure_mode'])
.string(['port', 'maintenance_port', 'address_type', 'secure_mode', 'metrics_mode'])
.number(['qps', 'application_utilization', 'eps'])
.demandOption(['port'])
.default('address_type', 'IPV4_IPV6')
.default('secure_mode', 'false')
.default('metrics_mode', 'NONE')
.parse()
console.log('Starting xDS interop server. Args: ', argv);
const healthImpl = new HealthImplementation({'': 'NOT_SERVING'});
Expand All @@ -253,7 +277,28 @@ async function main() {
}
const reflection = new ReflectionService(packageDefinition, {
services: ['grpc.testing.TestService']
})
});
let metricInterceptor: grpc.ServerInterceptor | null = null;
const metricsMode = argv.metrics_mode.toUpperCase();
let metricRecorder: grpc.ServerMetricRecorder | null = null;
if (metricsMode === 'IN_BAND') {
metricInterceptor = createInBandMetricsInterceptor({
qps: argv.qps,
applicationUtilization: argv.application_utilization,
eps: argv.eps
});
} else if (metricsMode === 'OUT_OF_BAND') {
metricRecorder = new grpc.ServerMetricRecorder();
if (argv.qps) {
metricRecorder.setQpsMetric(argv.qps);
}
if (argv.application_utilization) {
metricRecorder.setApplicationUtilizationMetric(argv.application_utilization);
}
if (argv.eps) {
metricRecorder.setEpsMetric(argv.eps);
}
}
const addressType = argv.address_type.toUpperCase();
const secureMode = argv.secure_mode.toLowerCase() == 'true';
if (secureMode) {
Expand All @@ -266,19 +311,33 @@ async function main() {
reflection.addToServer(maintenanceServer);
grpc.addAdminServicesToServer(maintenanceServer);

const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]});
const interceptorList = [testInfoInterceptor];
if (metricInterceptor) {
interceptorList.push(metricInterceptor);
}
const server = new grpc_xds.XdsServer({interceptors: interceptorList});
server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
if (metricRecorder) {
metricRecorder.addToServer(server);
}
const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure());
await Promise.all([
serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()),
serverBindPromise(server, `0.0.0.0:${argv.port}`, xdsCreds)
]);
} else {
const interceptorList = [unifiedInterceptor];
if (metricInterceptor) {
interceptorList.push(metricInterceptor);
}
const server = new grpc.Server({interceptors: [unifiedInterceptor]});
server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl);
healthImpl.addToServer(server);
reflection.addToServer(server);
grpc.addAdminServicesToServer(server);
if (metricRecorder) {
metricRecorder.addToServer(server);
}
server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler);
const creds = grpc.ServerCredentials.createInsecure();
switch (addressType) {
Expand Down
Loading