Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 57 additions & 34 deletions src/messaging/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,48 +206,71 @@ export class Messaging {
MessagingClientErrorCode.INVALID_ARGUMENT, 'dryRun must be a boolean');
}

const http2SessionHandler = this.useLegacyTransport ? undefined : new Http2SessionHandler(`https://${FCM_SEND_HOST}`)
const http2SessionHandler = this.useLegacyTransport ? undefined : new Http2SessionHandler(`https://${FCM_SEND_HOST}`);

return this.getUrlPath()
.then((urlPath) => {
const requests: Promise<SendResponse>[] = copy.map(async (message) => {
validateMessage(message);
const request: { message: Message; validate_only?: boolean } = { message };
if (dryRun) {
request.validate_only = true;
}

if (http2SessionHandler){
return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse(
FCM_SEND_HOST, urlPath, request, http2SessionHandler);
}
return this.messagingRequestHandler.invokeHttpRequestHandlerForSendResponse(FCM_SEND_HOST, urlPath, request);
});
return Promise.allSettled(requests);
})
.then((results) => {
const responses: SendResponse[] = [];
results.forEach(result => {
if (result.status === 'fulfilled') {
responses.push(result.value);
} else { // rejected
responses.push({ success: false, error: result.reason })
}
})
const successCount: number = responses.filter((resp) => resp.success).length;
return {
responses,
successCount,
failureCount: responses.length - successCount,
};
if (http2SessionHandler) {
let batchResponsePromise: Promise<PromiseSettledResult<SendResponse>[]>;
return new Promise((resolve: (result: PromiseSettledResult<SendResponse>[]) => void, reject) => {
// Start session listeners
http2SessionHandler.invoke().catch((error) => {
error.pendingBatchResponse =
batchResponsePromise ? batchResponsePromise.then(this.parseSendResponses) : undefined;
reject(error);
});

// Start making requests
const requests: Promise<SendResponse>[] = copy.map(async (message) => {
validateMessage(message);
const request: { message: Message; validate_only?: boolean; } = { message };
if (dryRun) {
request.validate_only = true;
}
return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse(
FCM_SEND_HOST, urlPath, request, http2SessionHandler);
});

// Resolve once all requests have completed
batchResponsePromise = Promise.allSettled(requests);
batchResponsePromise.then(resolve);
});
} else {
const requests: Promise<SendResponse>[] = copy.map(async (message) => {
validateMessage(message);
const request: { message: Message; validate_only?: boolean; } = { message };
if (dryRun) {
request.validate_only = true;
}
return this.messagingRequestHandler.invokeHttpRequestHandlerForSendResponse(
FCM_SEND_HOST, urlPath, request);
});
return Promise.allSettled(requests);
}
})
.then(this.parseSendResponses)
.finally(() => {
if (http2SessionHandler){
http2SessionHandler.close()
}
http2SessionHandler?.close();
});
}

private parseSendResponses(results: PromiseSettledResult<SendResponse>[]): BatchResponse {
const responses: SendResponse[] = [];
results.forEach(result => {
if (result.status === 'fulfilled') {
responses.push(result.value);
} else { // rejected
responses.push({ success: false, error: result.reason });
}
});
const successCount: number = responses.filter((resp) => resp.success).length;
return {
responses,
successCount,
failureCount: responses.length - successCount,
};
}

/**
* Sends the given multicast message to all the FCM registration tokens
* specified in it.
Expand Down
35 changes: 26 additions & 9 deletions src/utils/api-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,7 @@ class Http2RequestConfigImpl extends BaseRequestConfigImpl implements Http2Reque

public buildRequestOptions(): https.RequestOptions {
const parsed = this.buildUrl();
// TODO(b/401051826)
const protocol = parsed.protocol;

return {
Expand Down Expand Up @@ -1315,9 +1316,16 @@ export class ExponentialBackoffPoller<T> extends EventEmitter {
export class Http2SessionHandler {

private http2Session: http2.ClientHttp2Session
protected promise: Promise<void>
protected resolve: () => void;
protected reject: (_: any) => void;

constructor(url: string){
this.http2Session = this.createSession(url)
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
this.http2Session = this.createSession(url)
});
}

public createSession(url: string): http2.ClientHttp2Session {
Expand All @@ -1330,23 +1338,32 @@ export class Http2SessionHandler {
const http2Session = http2.connect(url, opts)

http2Session.on('goaway', (errorCode, _, opaqueData) => {
throw new FirebaseAppError(
AppErrorCodes.NETWORK_ERROR,
`Error while making requests: GOAWAY - ${opaqueData.toString()}, Error code: ${errorCode}`
);
this.reject(new FirebaseAppError(
AppErrorCodes.HTTP2_SESSION_ERROR,
`Error while making requests: GOAWAY - ${opaqueData?.toString()}, Error code: ${errorCode}`
));
})

http2Session.on('error', (error) => {
throw new FirebaseAppError(
AppErrorCodes.NETWORK_ERROR,
`Error while making requests: ${error}`
);
this.reject(new FirebaseAppError(
AppErrorCodes.HTTP2_SESSION_ERROR,
`Session error while making requests: ${error}`
));
})

http2Session.on('close', () => {
// Resolve current promise
this.resolve()
});
return http2Session
}
return this.http2Session
}

public invoke(): Promise<void> {
return this.promise
}

get session(): http2.ClientHttp2Session {
return this.http2Session
}
Expand Down
1 change: 1 addition & 0 deletions src/utils/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ export class AppErrorCodes {
public static INVALID_APP_OPTIONS = 'invalid-app-options';
public static INVALID_CREDENTIAL = 'invalid-credential';
public static NETWORK_ERROR = 'network-error';
public static HTTP2_SESSION_ERROR = 'http2-session-error';
public static NETWORK_TIMEOUT = 'network-timeout';
public static NO_APP = 'no-app';
public static UNABLE_TO_PARSE_RESPONSE = 'unable-to-parse-response';
Expand Down
18 changes: 11 additions & 7 deletions test/resources/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,11 @@ export interface MockHttp2Request {
}

export interface MockHttp2Response {
headers: http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader,
data: Buffer,
headers?: http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader,
data?: Buffer,
delay?: number,
error?: any
sessionError?: any
streamError?: any,
}

export class Http2Mocker {
Expand All @@ -340,12 +341,12 @@ export class Http2Mocker {
this.connectStub = sinon.stub(http2, 'connect');
this.connectStub.callsFake((_target: any, options: any) => {
const session = this.originalConnect('https://www.example.com', options);
session.request = this.createMockRequest()
session.request = this.createMockRequest(session)
return session;
})
}

private createMockRequest() {
private createMockRequest(session:http2.ClientHttp2Session) {
return (requestHeaders: http2.OutgoingHttpHeaders) => {
// Create a mock ClientHttp2Stream to return
const mockStream = new stream.Readable({
Expand All @@ -365,8 +366,11 @@ export class Http2Mocker {
const mockRes = this.mockResponses.shift();
if (mockRes) {
this.timeouts.push(setTimeout(() => {
if (mockRes.error) {
mockStream.emit('error', mockRes.error)
if (mockRes.sessionError) {
session.emit('error', mockRes.sessionError)
}
if (mockRes.streamError) {
mockStream.emit('error', mockRes.streamError)
}
else {
mockStream.emit('response', mockRes.headers);
Expand Down
29 changes: 29 additions & 0 deletions test/unit/messaging/messaging.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ function mockHttp2SendRequestError(
} as mocks.MockHttp2Response
}

function mockHttp2Error(streamError?: any, sessionError?:any): mocks.MockHttp2Response {
return {
streamError: streamError,
sessionError: sessionError
} as mocks.MockHttp2Response
}

function mockErrorResponse(
path: string,
Expand Down Expand Up @@ -906,6 +912,29 @@ describe('Messaging', () => {
});
});

it('should throw error with BatchResponse promise on session error event using HTTP/2', () => {
mockedHttp2Responses.push(mockHttp2SendRequestResponse('projects/projec_id/messages/1'))
const sessionError = 'MOCK_SESSION_ERROR'
mockedHttp2Responses.push(mockHttp2Error(
new Error(`MOCK_STREAM_ERROR caused by ${sessionError}`),
new Error(sessionError)
));
http2Mocker.http2Stub(mockedHttp2Responses)

return messaging.sendEach(
[validMessage, validMessage], true
).catch(async (error) => {
expect(error.errorInfo.code).to.equal('app/http2-session-error')
await error.pendingBatchResponse.then((response: BatchResponse) => {
expect(http2Mocker.requests.length).to.equal(2);
expect(response.failureCount).to.equal(1);
const responses = response.responses;
checkSendResponseSuccess(responses[0], 'projects/projec_id/messages/1');
checkSendResponseFailure(responses[1], 'app/network-error');
})
});
})

// This test was added to also verify https://github.com/firebase/firebase-admin-node/issues/1146
it('should be fulfilled when called with different message types using HTTP/2', () => {
const messageIds = [
Expand Down
43 changes: 41 additions & 2 deletions test/unit/utils/api-request.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,14 @@
} as mocks.MockHttp2Response
}

function mockHttp2Error(err: any): mocks.MockHttp2Response {
function mockHttp2Error(streamError?: any, sessionError?:any): mocks.MockHttp2Response {
return {
error: err
streamError: streamError,
sessionError: sessionError
} as mocks.MockHttp2Response
}


/**
* Returns a new RetryConfig instance for testing. This is same as the default
* RetryConfig, with the backOffFactor set to 0 to avoid delays.
Expand Down Expand Up @@ -2500,6 +2502,43 @@
http2SessionHandler: http2SessionHandler
}).should.eventually.be.rejectedWith(err).and.have.property('code', 'app/network-error');
});

it('should fail on session and stream errors', async () => {
const reqData = { request: 'data' };
const streamError = 'Error while making request: test stream error. Error code: AWFUL_STREAM_ERROR';
const sessionError = 'Session error while making requests: Error: AWFUL_SESSION_ERROR'
mockedHttp2Responses.push(mockHttp2Error(
{ message: 'test stream error', code: 'AWFUL_STREAM_ERROR' },
new Error('AWFUL_SESSION_ERROR')
));
http2Mocker.http2Stub(mockedHttp2Responses);

const client = new Http2Client();
http2SessionHandler = new Http2SessionHandler(mockHostUrl)

await client.send({
method: 'POST',
url: mockUrl,
headers: {
'authorization': 'Bearer token',
'My-Custom-Header': 'CustomValue',
},
data: reqData,
http2SessionHandler: http2SessionHandler,
}).should.eventually.be.rejectedWith(streamError).and.have.property('code', 'app/network-error')
.then(() => {
expect(http2Mocker.requests.length).to.equal(1);
expect(http2Mocker.requests[0].headers[':method']).to.equal('POST');
expect(http2Mocker.requests[0].headers[':scheme']).to.equal('https:');
expect(http2Mocker.requests[0].headers[':path']).to.equal(mockPath);
expect(JSON.parse(http2Mocker.requests[0].data)).to.deep.equal(reqData);
expect(http2Mocker.requests[0].headers.authorization).to.equal('Bearer token');
expect(http2Mocker.requests[0].headers['content-type']).to.contain('application/json');
expect(http2Mocker.requests[0].headers['My-Custom-Header']).to.equal('CustomValue');
});

await http2SessionHandler.invoke().should.eventually.be.rejectedWith(sessionError)
});
});

describe('AuthorizedHttpClient', () => {
Expand Down
Loading