diff --git a/src/messaging/messaging.ts b/src/messaging/messaging.ts index 1c1e2a5887..8e61cd679a 100644 --- a/src/messaging/messaging.ts +++ b/src/messaging/messaging.ts @@ -17,7 +17,9 @@ import { App } from '../app'; import { deepCopy } from '../utils/deep-copy'; -import { ErrorInfo, MessagingClientErrorCode, FirebaseMessagingError } from '../utils/error'; +import { + ErrorInfo, MessagingClientErrorCode, FirebaseMessagingError, FirebaseMessagingSessionError +} from '../utils/error'; import * as utils from '../utils'; import * as validator from '../utils/validator'; import { validateMessage } from './messaging-internal'; @@ -206,48 +208,71 @@ export class Messaging { MessagingClientErrorCode.INVALID_ARGUMENT, 'dryRun must be a boolean'); } - const http2SessionHandler = this.useLegacyTransport ? undefined : new Http2SessionHandler(`https://${FCM_SEND_HOST}`) + const http2SessionHandler = this.useLegacyTransport ? undefined : new Http2SessionHandler(`https://${FCM_SEND_HOST}`); return this.getUrlPath() .then((urlPath) => { - const requests: Promise[] = copy.map(async (message) => { - validateMessage(message); - const request: { message: Message; validate_only?: boolean } = { message }; - if (dryRun) { - request.validate_only = true; - } - - if (http2SessionHandler){ - return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse( - FCM_SEND_HOST, urlPath, request, http2SessionHandler); - } - return this.messagingRequestHandler.invokeHttpRequestHandlerForSendResponse(FCM_SEND_HOST, urlPath, request); - }); - return Promise.allSettled(requests); - }) - .then((results) => { - const responses: SendResponse[] = []; - results.forEach(result => { - if (result.status === 'fulfilled') { - responses.push(result.value); - } else { // rejected - responses.push({ success: false, error: result.reason }) - } - }) - const successCount: number = responses.filter((resp) => resp.success).length; - return { - responses, - successCount, - failureCount: responses.length - successCount, - }; + if (http2SessionHandler) { + let sendResponsePromise: Promise[]>; + return new Promise((resolve: (result: PromiseSettledResult[]) => void, reject) => { + // Start session listeners + http2SessionHandler.invoke().catch((error) => { + const pendingBatchResponse = + sendResponsePromise ? sendResponsePromise.then(this.parseSendResponses) : undefined; + reject(new FirebaseMessagingSessionError(error, undefined, pendingBatchResponse)); + }); + + // Start making requests + const requests: Promise[] = copy.map(async (message) => { + validateMessage(message); + const request: { message: Message; validate_only?: boolean; } = { message }; + if (dryRun) { + request.validate_only = true; + } + return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse( + FCM_SEND_HOST, urlPath, request, http2SessionHandler); + }); + + // Resolve once all requests have completed + sendResponsePromise = Promise.allSettled(requests); + sendResponsePromise.then(resolve); + }); + } else { + const requests: Promise[] = copy.map(async (message) => { + validateMessage(message); + const request: { message: Message; validate_only?: boolean; } = { message }; + if (dryRun) { + request.validate_only = true; + } + return this.messagingRequestHandler.invokeHttpRequestHandlerForSendResponse( + FCM_SEND_HOST, urlPath, request); + }); + return Promise.allSettled(requests); + } }) + .then(this.parseSendResponses) .finally(() => { - if (http2SessionHandler){ - http2SessionHandler.close() - } + http2SessionHandler?.close(); }); } + private parseSendResponses(results: PromiseSettledResult[]): BatchResponse { + const responses: SendResponse[] = []; + results.forEach(result => { + if (result.status === 'fulfilled') { + responses.push(result.value); + } else { // rejected + responses.push({ success: false, error: result.reason }); + } + }); + const successCount: number = responses.filter((resp) => resp.success).length; + return { + responses, + successCount, + failureCount: responses.length - successCount, + }; + } + /** * Sends the given multicast message to all the FCM registration tokens * specified in it. diff --git a/src/utils/api-request.ts b/src/utils/api-request.ts index c5e284e414..ca3140a27d 100644 --- a/src/utils/api-request.ts +++ b/src/utils/api-request.ts @@ -1054,6 +1054,7 @@ class Http2RequestConfigImpl extends BaseRequestConfigImpl implements Http2Reque public buildRequestOptions(): https.RequestOptions { const parsed = this.buildUrl(); + // TODO(b/401051826) const protocol = parsed.protocol; return { @@ -1315,9 +1316,16 @@ export class ExponentialBackoffPoller extends EventEmitter { export class Http2SessionHandler { private http2Session: http2.ClientHttp2Session + protected promise: Promise + protected resolve: () => void; + protected reject: (_: any) => void; constructor(url: string){ - this.http2Session = this.createSession(url) + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + this.http2Session = this.createSession(url) + }); } public createSession(url: string): http2.ClientHttp2Session { @@ -1330,23 +1338,32 @@ export class Http2SessionHandler { const http2Session = http2.connect(url, opts) http2Session.on('goaway', (errorCode, _, opaqueData) => { - throw new FirebaseAppError( + this.reject(new FirebaseAppError( AppErrorCodes.NETWORK_ERROR, - `Error while making requests: GOAWAY - ${opaqueData.toString()}, Error code: ${errorCode}` - ); + `Error while making requests: GOAWAY - ${opaqueData?.toString()}, Error code: ${errorCode}` + )); }) http2Session.on('error', (error) => { - throw new FirebaseAppError( + this.reject(new FirebaseAppError( AppErrorCodes.NETWORK_ERROR, - `Error while making requests: ${error}` - ); + `Session error while making requests: ${error}` + )); }) + + http2Session.on('close', () => { + // Resolve current promise + this.resolve() + }); return http2Session } return this.http2Session } + public invoke(): Promise { + return this.promise + } + get session(): http2.ClientHttp2Session { return this.http2Session } diff --git a/src/utils/error.ts b/src/utils/error.ts index cf3736adcb..a80254b5b8 100644 --- a/src/utils/error.ts +++ b/src/utils/error.ts @@ -16,6 +16,7 @@ */ import { FirebaseError as FirebaseErrorInterface } from '../app'; +import { BatchResponse } from '../messaging/messaging-api'; import { deepCopy } from '../utils/deep-copy'; /** @@ -344,6 +345,38 @@ export class FirebaseMessagingError extends PrefixedFirebaseError { } } +export class FirebaseMessagingSessionError extends FirebaseMessagingError { + public pendingBatchResponse?: Promise; + /** + * + * @param info - The error code info. + * @param message - The error message. This will override the default message if provided. + * @param pendingBatchResponse - BatchResponse for pending messages when session error occured. + * @constructor + * @internal + */ + constructor(info: ErrorInfo, message?: string, pendingBatchResponse?: Promise) { + // Override default message if custom message provided. + super(info, message || info.message); + this.pendingBatchResponse = pendingBatchResponse; + + /* tslint:disable:max-line-length */ + // Set the prototype explicitly. See the following link for more details: + // https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work + /* tslint:enable:max-line-length */ + (this as any).__proto__ = FirebaseMessagingSessionError.prototype; + } + + /** @returns The object representation of the error. */ + public toJSON(): object { + return { + code: this.code, + message: this.message, + pendingBatchResponse: this.pendingBatchResponse, + }; + } +} + /** * Firebase project management error code structure. This extends PrefixedFirebaseError. */ diff --git a/test/resources/mocks.ts b/test/resources/mocks.ts index d4afddd879..f9d08023d2 100644 --- a/test/resources/mocks.ts +++ b/test/resources/mocks.ts @@ -322,10 +322,11 @@ export interface MockHttp2Request { } export interface MockHttp2Response { - headers: http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader, - data: Buffer, + headers?: http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader, + data?: Buffer, delay?: number, - error?: any + sessionError?: any + streamError?: any, } export class Http2Mocker { @@ -340,12 +341,12 @@ export class Http2Mocker { this.connectStub = sinon.stub(http2, 'connect'); this.connectStub.callsFake((_target: any, options: any) => { const session = this.originalConnect('https://www.example.com', options); - session.request = this.createMockRequest() + session.request = this.createMockRequest(session) return session; }) } - private createMockRequest() { + private createMockRequest(session:http2.ClientHttp2Session) { return (requestHeaders: http2.OutgoingHttpHeaders) => { // Create a mock ClientHttp2Stream to return const mockStream = new stream.Readable({ @@ -365,8 +366,11 @@ export class Http2Mocker { const mockRes = this.mockResponses.shift(); if (mockRes) { this.timeouts.push(setTimeout(() => { - if (mockRes.error) { - mockStream.emit('error', mockRes.error) + if (mockRes.sessionError) { + session.emit('error', mockRes.sessionError) + } + if (mockRes.streamError) { + mockStream.emit('error', mockRes.streamError) } else { mockStream.emit('response', mockRes.headers); diff --git a/test/unit/messaging/messaging.spec.ts b/test/unit/messaging/messaging.spec.ts index 488056488e..1ea0c059de 100644 --- a/test/unit/messaging/messaging.spec.ts +++ b/test/unit/messaging/messaging.spec.ts @@ -34,6 +34,7 @@ import { import { HttpClient } from '../../../src/utils/api-request'; import { getMetricsHeader, getSdkVersion } from '../../../src/utils/index'; import * as utils from '../utils'; +import { FirebaseMessagingSessionError } from '../../../src/utils/error'; chai.should(); chai.use(sinonChai); @@ -121,6 +122,12 @@ function mockHttp2SendRequestError( } as mocks.MockHttp2Response } +function mockHttp2Error(streamError?: any, sessionError?:any): mocks.MockHttp2Response { + return { + streamError: streamError, + sessionError: sessionError + } as mocks.MockHttp2Response +} function mockErrorResponse( path: string, @@ -906,6 +913,30 @@ describe('Messaging', () => { }); }); + it('should throw error with BatchResponse promise on session error event using HTTP/2', () => { + mockedHttp2Responses.push(mockHttp2SendRequestResponse('projects/projec_id/messages/1')) + const sessionError = 'MOCK_SESSION_ERROR' + mockedHttp2Responses.push(mockHttp2Error( + new Error(`MOCK_STREAM_ERROR caused by ${sessionError}`), + new Error(sessionError) + )); + http2Mocker.http2Stub(mockedHttp2Responses) + + return messaging.sendEach( + [validMessage, validMessage], true + ).catch(async (error: FirebaseMessagingSessionError) => { + expect(error.code).to.equal('messaging/app/network-error'); + expect(error.pendingBatchResponse).to.not.be.undefined; + await error.pendingBatchResponse?.then((response: BatchResponse) => { + expect(http2Mocker.requests.length).to.equal(2); + expect(response.failureCount).to.equal(1); + const responses = response.responses; + checkSendResponseSuccess(responses[0], 'projects/projec_id/messages/1'); + checkSendResponseFailure(responses[1], 'app/network-error'); + }) + }); + }) + // This test was added to also verify https://github.com/firebase/firebase-admin-node/issues/1146 it('should be fulfilled when called with different message types using HTTP/2', () => { const messageIds = [ diff --git a/test/unit/utils/api-request.spec.ts b/test/unit/utils/api-request.spec.ts index d64589f8c9..691f0f89e0 100644 --- a/test/unit/utils/api-request.spec.ts +++ b/test/unit/utils/api-request.spec.ts @@ -140,12 +140,14 @@ function mockHttp2SendRequestError( } as mocks.MockHttp2Response } -function mockHttp2Error(err: any): mocks.MockHttp2Response { +function mockHttp2Error(streamError?: any, sessionError?:any): mocks.MockHttp2Response { return { - error: err + streamError: streamError, + sessionError: sessionError } as mocks.MockHttp2Response } + /** * Returns a new RetryConfig instance for testing. This is same as the default * RetryConfig, with the backOffFactor set to 0 to avoid delays. @@ -2500,6 +2502,44 @@ describe('Http2Client', () => { http2SessionHandler: http2SessionHandler }).should.eventually.be.rejectedWith(err).and.have.property('code', 'app/network-error'); }); + + it('should fail on session and stream errors', async () => { + const reqData = { request: 'data' }; + const streamError = 'Error while making request: test stream error. Error code: AWFUL_STREAM_ERROR'; + const sessionError = 'Session error while making requests: Error: AWFUL_SESSION_ERROR' + mockedHttp2Responses.push(mockHttp2Error( + { message: 'test stream error', code: 'AWFUL_STREAM_ERROR' }, + new Error('AWFUL_SESSION_ERROR') + )); + http2Mocker.http2Stub(mockedHttp2Responses); + + const client = new Http2Client(); + http2SessionHandler = new Http2SessionHandler(mockHostUrl) + + await client.send({ + method: 'POST', + url: mockUrl, + headers: { + 'authorization': 'Bearer token', + 'My-Custom-Header': 'CustomValue', + }, + data: reqData, + http2SessionHandler: http2SessionHandler, + }).should.eventually.be.rejectedWith(streamError).and.have.property('code', 'app/network-error') + .then(() => { + expect(http2Mocker.requests.length).to.equal(1); + expect(http2Mocker.requests[0].headers[':method']).to.equal('POST'); + expect(http2Mocker.requests[0].headers[':scheme']).to.equal('https:'); + expect(http2Mocker.requests[0].headers[':path']).to.equal(mockPath); + expect(JSON.parse(http2Mocker.requests[0].data)).to.deep.equal(reqData); + expect(http2Mocker.requests[0].headers.authorization).to.equal('Bearer token'); + expect(http2Mocker.requests[0].headers['content-type']).to.contain('application/json'); + expect(http2Mocker.requests[0].headers['My-Custom-Header']).to.equal('CustomValue'); + }); + + await http2SessionHandler.invoke().should.eventually.be.rejectedWith(sessionError) + .and.have.property('code', 'app/network-error') + }); }); describe('AuthorizedHttpClient', () => {