diff --git a/gulpfile.ts b/gulpfile.ts index f570b5ad3..d0a416ddb 100644 --- a/gulpfile.ts +++ b/gulpfile.ts @@ -23,7 +23,7 @@ import * as reflection from './packages/grpc-reflection/gulpfile'; import * as protobuf from './packages/proto-loader/gulpfile'; import * as internalTest from './test/gulpfile'; -const installAll = gulp.series(protobuf.install, jsCore.install, healthCheck.install, internalTest.install, jsXds.install, reflection.install); +const installAll = gulp.series(protobuf.install, jsCore.install, healthCheck.install, reflection.install, internalTest.install, jsXds.install); const lint = gulp.parallel(jsCore.lint); diff --git a/packages/grpc-health-check/package.json b/packages/grpc-health-check/package.json index 181057d6d..36d3b1059 100644 --- a/packages/grpc-health-check/package.json +++ b/packages/grpc-health-check/package.json @@ -35,6 +35,7 @@ "license": "Apache-2.0", "devDependencies": { "@grpc/grpc-js": "file:../grpc-js", + "@types/mocha": "^10.0.10", "typescript": "^5.2.2" } } diff --git a/packages/grpc-js-xds/interop/Dockerfile b/packages/grpc-js-xds/interop/test-client.Dockerfile similarity index 80% rename from packages/grpc-js-xds/interop/Dockerfile rename to packages/grpc-js-xds/interop/test-client.Dockerfile index 98f3cbad6..8ef6105f4 100644 --- a/packages/grpc-js-xds/interop/Dockerfile +++ b/packages/grpc-js-xds/interop/test-client.Dockerfile @@ -14,7 +14,7 @@ # Dockerfile for building the xDS interop client. To build the image, run the # following command from grpc-node directory: -# docker build -t -f packages/grpc-js-xds/interop/Dockerfile . +# docker build -t -f packages/grpc-js-xds/interop/test-client.Dockerfile . FROM node:18-slim as build @@ -26,16 +26,23 @@ WORKDIR /node/src/grpc-node/packages/proto-loader RUN npm install WORKDIR /node/src/grpc-node/packages/grpc-js RUN npm install +WORKDIR /node/src/grpc-node/packages/grpc-health-check +RUN npm install +WORKDIR /node/src/grpc-node/packages/grpc-reflection +RUN npm install WORKDIR /node/src/grpc-node/packages/grpc-js-xds RUN npm install FROM gcr.io/distroless/nodejs18-debian11:latest WORKDIR /node/src/grpc-node COPY --from=build /node/src/grpc-node/packages/proto-loader ./packages/proto-loader/ +COPY --from=build /node/src/grpc-node/packages/grpc-health-check ./packages/grpc-health-check/ +COPY --from=build /node/src/grpc-node/packages/grpc-reflection ./packages/grpc-reflection/ COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/ COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/ ENV GRPC_VERBOSITY="DEBUG" ENV GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager,cds_balancer,xds_cluster_resolver,xds_cluster_impl,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds,outlier_detection,server,server_call,ring_hash +ENV NODE_XDS_INTEROP_VERBOSITY=1 ENTRYPOINT [ "/nodejs/bin/node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client" ] diff --git a/packages/grpc-js-xds/interop/test-server.Dockerfile b/packages/grpc-js-xds/interop/test-server.Dockerfile new file mode 100644 index 000000000..59fdc7007 --- /dev/null +++ b/packages/grpc-js-xds/interop/test-server.Dockerfile @@ -0,0 +1,54 @@ +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Dockerfile for building the xDS interop client. To build the image, run the +# following command from grpc-node directory: +# docker build -t -f packages/grpc-js-xds/interop/test-server.Dockerfile . + +FROM node:18-slim as build + +# Make a grpc-node directory and copy the repo into it. +WORKDIR /node/src/grpc-node +COPY . . + +WORKDIR /node/src/grpc-node/packages/proto-loader +RUN npm install +WORKDIR /node/src/grpc-node/packages/grpc-js +RUN npm install +WORKDIR /node/src/grpc-node/packages/grpc-health-check +RUN npm install +WORKDIR /node/src/grpc-node/packages/grpc-reflection +RUN npm install +WORKDIR /node/src/grpc-node/packages/grpc-js-xds +RUN npm install + +ENV TINI_VERSION v0.19.0 +ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini +RUN chmod +x /tini + +FROM gcr.io/distroless/nodejs18-debian11:latest +WORKDIR /node/src/grpc-node +COPY --from=build /node/src/grpc-node/packages/proto-loader ./packages/proto-loader/ +COPY --from=build /node/src/grpc-node/packages/grpc-health-check ./packages/grpc-health-check/ +COPY --from=build /node/src/grpc-node/packages/grpc-reflection ./packages/grpc-reflection/ +COPY --from=build /node/src/grpc-node/packages/grpc-js ./packages/grpc-js/ +COPY --from=build /node/src/grpc-node/packages/grpc-js-xds ./packages/grpc-js-xds/ + +ENV GRPC_VERBOSITY="DEBUG" +ENV GRPC_TRACE=xds_client,server,xds_server + +# tini serves as PID 1 and enables the server to properly respond to signals. +COPY --from=build /tini /tini + +ENTRYPOINT [ "/tini", "-g", "-vv", "--", "/nodejs/bin/node", "/node/src/grpc-node/packages/grpc-js-xds/build/interop/xds-interop-server" ] diff --git a/packages/grpc-js-xds/interop/xds-interop-server.ts b/packages/grpc-js-xds/interop/xds-interop-server.ts new file mode 100644 index 000000000..5ec772365 --- /dev/null +++ b/packages/grpc-js-xds/interop/xds-interop-server.ts @@ -0,0 +1,308 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as grpc from '@grpc/grpc-js'; + +import * as grpc_xds from '../src'; + +import { ProtoGrpcType } from './generated/test'; + +import * as protoLoader from '@grpc/proto-loader'; +import * as yargs from 'yargs'; +import * as os from 'os'; +import { HealthImplementation } from 'grpc-health-check'; +import { Empty__Output, Empty } from './generated/grpc/testing/Empty'; +import { SimpleRequest__Output } from './generated/grpc/testing/SimpleRequest'; +import { SimpleResponse } from './generated/grpc/testing/SimpleResponse'; +import { ReflectionService } from '@grpc/reflection'; + +const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', { + keepCase: true, + defaults: true, + oneofs: true, + json: true, + longs: String, + enums: String, + includeDirs: [__dirname + '/../../proto'] +}); + +const loadedProto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType; + +function setAsyncTimeout(delayMs: number): Promise { + return new Promise(resolve => { + setTimeout(() => { + resolve(); + }, delayMs); + }); +} + +const HOSTNAME = os.hostname(); + +const TEST_SERVICE_NAME = '/grpc.testing.TestService/'; + +function testInfoInterceptor(methodDescriptor: grpc.ServerMethodDefinition, call: grpc.ServerInterceptingCallInterface) { + const listener: grpc.ServerListener = { + onReceiveMetadata: async (metadata, next) => { + let attemptNum = 0; + const attemptNumHeader = metadata.get('grpc-previous-rpc-attempts'); + if (attemptNumHeader.length > 0) { + attemptNum = Number(attemptNumHeader[0]); + if (Number.isNaN(attemptNum)) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: 'Invalid format for grpc-previous-rpc-attempts header: ' + attemptNumHeader[0] + }); + return; + } + } + const rpcBehavior = metadata.get('rpc-behavior').filter(v => typeof v === 'string').join(','); + for (const value of rpcBehavior.split(',')) { + let behaviorEntry: string; + if (value.startsWith('hostname=')) { + const splitValue = value.split(' '); + if (splitValue.length > 1) { + if (splitValue[0].substring('hostname='.length) !== HOSTNAME) { + continue; + } + behaviorEntry = splitValue[1]; + } else { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: 'Invalid format for rpc-behavior header: ' + value + }); + return; + } + } else { + behaviorEntry = value; + } + if (behaviorEntry.startsWith('sleep-')) { + const delaySec = Number(behaviorEntry.substring('sleep-'.length)); + if (Number.isNaN(delaySec)) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: 'Invalid format for rpc-behavior header: ' + value + }); + return; + } + await setAsyncTimeout(delaySec * 1000); + } + if (behaviorEntry === 'keep-open') { + return; + } + if (behaviorEntry.startsWith('error-code-')) { + const errorCode = Number(behaviorEntry.substring('error-code-'.length)); + if (Number.isNaN(errorCode)) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: 'Invalid format for rpc-behavior header: ' + value + }); + return; + } + call.sendStatus({ + code: errorCode, + details: 'RPC failed as directed by rpc-behavior header value ' + value + }); + return; + } + if (behaviorEntry.startsWith('succeed-on-retry-attempt-')) { + const targetAttempt = Number(behaviorEntry.substring('succeed-on-retry-attempt-'.length)); + if (Number.isNaN(targetAttempt)) { + call.sendStatus({ + code: grpc.status.INVALID_ARGUMENT, + details: 'Invalid format for rpc-behavior header: ' + value + }); + return; + } + if (attemptNum === targetAttempt) { + next(metadata); + return; + } + } + } + next(metadata); + } + }; + const responder: grpc.Responder = { + start: next => { + next(listener); + }, + sendMetadata: (metadata, next) => { + metadata.add('hostname', HOSTNAME); + next(metadata); + } + } + return new grpc.ServerInterceptingCall(call, responder); +}; + +function adminServiceInterceptor(methodDescriptor: grpc.ServerMethodDefinition, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall { + const listener: grpc.ServerListener = { + onReceiveMessage: (message, next) => { + console.log(`Received request to method ${methodDescriptor.path}: ${JSON.stringify(message)}`); + next(message); + } + } + const responder: grpc.Responder = { + start: next => { + next(listener); + } + }; + return new grpc.ServerInterceptingCall(call, responder); +} + +function unifiedInterceptor(methodDescriptor: grpc.ServerMethodDefinition, call: grpc.ServerInterceptingCallInterface): grpc.ServerInterceptingCall { + if (methodDescriptor.path.startsWith(TEST_SERVICE_NAME)) { + return testInfoInterceptor(methodDescriptor, call); + } else { + return adminServiceInterceptor(methodDescriptor, call); + } +} + +const testServiceHandler = { + EmptyCall: (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { + callback(null, {}); + }, + UnaryCall: (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { + callback(null, { + hostname: HOSTNAME, + payload: { + body: Buffer.from('0'.repeat(call.request.response_size)) + } + }); + } +}; + +function serverBindPromise(server: grpc.Server, port: string, credentials: grpc.ServerCredentials): Promise { + return new Promise((resolve, reject) => { + server.bindAsync(port, credentials, (error, port) => { + if (error) { + reject(error); + } else { + resolve(port); + } + }) + }) +} + +function getIPv4Address(): string | null { + for (const [name, addressList] of Object.entries(os.networkInterfaces())) { + if (name === 'lo' || !addressList) { + continue; + } + for (const address of addressList) { + if (address.family === 'IPv4') { + return address.address; + } + } + } + return null; +} + +function getIPv6Addresses(): string[] { + const ipv6Addresses: string[] = []; + for (const [name, addressList] of Object.entries(os.networkInterfaces())) { + if (name === 'lo' || !addressList) { + continue; + } + for (const address of addressList) { + if (address.family === 'IPv6') { + ipv6Addresses.push(address.address); + } + } + } + return ipv6Addresses; +} + +async function main() { + const argv = yargs + .string(['port', 'maintenance_port', 'address_type']) + .boolean(['secure_mode']) + .demandOption(['port']) + .default('address_type', 'IPV4_IPV6') + .default('secure_mode', false) + .parse() + console.log('Starting xDS interop server. Args: ', argv); + const healthImpl = new HealthImplementation({'': 'NOT_SERVING'}); + const xdsUpdateHealthServiceImpl = { + SetServing(call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) { + healthImpl.setStatus('', 'SERVING'); + callback(null, {}); + }, + SetNotServing(call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) { + healthImpl.setStatus('', 'NOT_SERVING'); + callback(null, {}); + } + } + const reflection = new ReflectionService(packageDefinition, { + services: ['grpc.testing.TestService'] + }) + const addressType = argv.address_type.toUpperCase(); + if (argv.secure_mode) { + if (addressType !== 'IPV4_IPV6') { + throw new Error('Secure mode only supports IPV4_IPV6 address type'); + } + const maintenanceServer = new grpc.Server({interceptors: [adminServiceInterceptor]}); + maintenanceServer.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl) + healthImpl.addToServer(maintenanceServer); + reflection.addToServer(maintenanceServer); + grpc.addAdminServicesToServer(maintenanceServer); + + const server = new grpc_xds.XdsServer({interceptors: [testInfoInterceptor]}); + server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler); + const xdsCreds = new grpc_xds.XdsServerCredentials(grpc.ServerCredentials.createInsecure()); + await Promise.all([ + serverBindPromise(maintenanceServer, `[::]:${argv.maintenance_port}`, grpc.ServerCredentials.createInsecure()), + serverBindPromise(server, `[::]:${argv.port}`, xdsCreds) + ]); + } else { + const server = new grpc.Server({interceptors: [unifiedInterceptor]}); + server.addService(loadedProto.grpc.testing.XdsUpdateHealthService.service, xdsUpdateHealthServiceImpl); + healthImpl.addToServer(server); + reflection.addToServer(server); + grpc.addAdminServicesToServer(server); + server.addService(loadedProto.grpc.testing.TestService.service, testServiceHandler); + const creds = grpc.ServerCredentials.createInsecure(); + switch (addressType) { + case 'IPV4_IPV6': + await serverBindPromise(server, `[::]:${argv.port}`, creds); + break; + case 'IPV4': + await serverBindPromise(server, `127.0.0.1:${argv.port}`, creds); + const address = getIPv4Address(); + if (address) { + await serverBindPromise(server, `${address}:${argv.port}`, creds); + } + break; + case 'IPV6': + await serverBindPromise(server, `[::1]:${argv.port}`, creds); + for (const address of getIPv6Addresses()) { + try { + await serverBindPromise(server, `[${address}]:${argv.port}`, creds); + } catch (e) { + console.log(`Binding ${address} failed with error ${(e as Error).message}`); + } + } + break; + default: + throw new Error(`Unknown address type: ${argv.address_type}`); + } + } + healthImpl.setStatus('', 'SERVING'); +} + +if (require.main === module) { + main(); +} diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index 1e35ac828..5ad27db90 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -38,8 +38,10 @@ "@types/gulp-mocha": "0.0.32", "@types/mocha": "^5.2.6", "@types/node": ">=20.11.20", + "@grpc/reflection": "file:../grpc-reflection", "@types/yargs": "^15.0.5", "find-free-ports": "^3.1.1", + "grpc-health-check": "file:../grpc-health-check", "gts": "^5.0.1", "ncp": "^2.0.0", "typescript": "^5.1.3", diff --git a/packages/grpc-js-xds/scripts/psm-interop-build-node.sh b/packages/grpc-js-xds/scripts/psm-interop-build-node.sh index d52206f0e..49812ace2 100755 --- a/packages/grpc-js-xds/scripts/psm-interop-build-node.sh +++ b/packages/grpc-js-xds/scripts/psm-interop-build-node.sh @@ -28,11 +28,12 @@ set -eo pipefail # Writes the output of docker image build stdout, stderr ####################################### psm::lang::build_docker_images() { - local client_dockerfile="packages/grpc-js-xds/interop/Dockerfile" + local client_dockerfile="packages/grpc-js-xds/interop/test-client.Dockerfile" + local server_dockerfile="packages/grpc-js-xds/interop/test-server.Dockerfile" cd "${SRC_DIR}" psm::tools::run_verbose git submodule update --init --recursive psm::tools::run_verbose git submodule status - psm::build::docker_images_generic "${client_dockerfile}" + psm::build::docker_images_generic "${client_dockerfile}" "${server_dockerfile}" } diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index e9f2f6966..e7001db1f 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -44,6 +44,7 @@ import { makeClientConstructor, MethodDefinition, Serialize, + ServerMethodDefinition, ServiceDefinition, } from './make-client'; import { Metadata, MetadataOptions, MetadataValue } from './metadata'; @@ -181,6 +182,7 @@ export { ServerWritableStream, ServerDuplexStream, ServerErrorResponse, + ServerMethodDefinition, ServiceDefinition, UntypedHandleCall, UntypedServiceImplementation, diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index 8e45a8082..518b30ea6 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -337,6 +337,7 @@ export interface ServerInterceptingCallInterface { export class ServerInterceptingCall implements ServerInterceptingCallInterface { private responder: FullResponder; private processingMetadata = false; + private sentMetadata = false; private processingMessage = false; private pendingMessage: any = null; private pendingMessageCallback: (() => void) | null = null; @@ -395,6 +396,7 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { } sendMetadata(metadata: Metadata): void { this.processingMetadata = true; + this.sentMetadata = true; this.responder.sendMetadata(metadata, interceptedMetadata => { this.processingMetadata = false; this.nextCall.sendMetadata(interceptedMetadata); @@ -404,6 +406,9 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { } sendMessage(message: any, callback: () => void): void { this.processingMessage = true; + if (!this.sentMetadata) { + this.sendMetadata(new Metadata()); + } this.responder.sendMessage(message, interceptedMessage => { this.processingMessage = false; if (this.processingMetadata) { diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index cb60943ce..b187765b8 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -99,6 +99,10 @@ const { HTTP2_HEADER_PATH } = http2.constants; const TRACER_NAME = 'server'; const kMaxAge = Buffer.from('max_age'); +function serverCallTrace(text: string) { + logging.trace(LogVerbosity.DEBUG, 'server_call', text); +} + type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer; interface BindResult { @@ -1248,7 +1252,7 @@ export class Server { } private _retrieveHandler(path: string): Handler | null { - this.trace( + serverCallTrace( 'Received call to method ' + path + ' at address ' + @@ -1258,7 +1262,7 @@ export class Server { const handler = this.handlers.get(path); if (handler === undefined) { - this.trace( + serverCallTrace( 'No handler registered for method ' + path + '. Sending UNIMPLEMENTED status.' diff --git a/packages/grpc-js/test/test-server-interceptors.ts b/packages/grpc-js/test/test-server-interceptors.ts index 5d4038599..4a4abb6fa 100644 --- a/packages/grpc-js/test/test-server-interceptors.ts +++ b/packages/grpc-js/test/test-server-interceptors.ts @@ -186,7 +186,6 @@ describe('Server interceptors', () => { call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData ) => { - call.sendMetadata(new grpc.Metadata()); callback(null, call.request); }, }); diff --git a/packages/grpc-reflection/package.json b/packages/grpc-reflection/package.json index af96dfd9d..d423b0d44 100644 --- a/packages/grpc-reflection/package.json +++ b/packages/grpc-reflection/package.json @@ -39,6 +39,7 @@ }, "devDependencies": { "@grpc/grpc-js": "file:../grpc-js", + "@types/mocha": "^10.0.10", "copyfiles": "^2.4.1", "typescript": "^5.2.2" }