diff --git a/sdk/core/core-amqp/changelog.md b/sdk/core/core-amqp/changelog.md index f27c5b4c337d..6fe74f59aed4 100644 --- a/sdk/core/core-amqp/changelog.md +++ b/sdk/core/core-amqp/changelog.md @@ -1,3 +1,7 @@ +## 1.0.0-preview.2.0 - Coming soon... + +- `sendRequest()` function in the `RequestResponseLink` now excludes default retries and leaves it up to the users to implement it as necessary. + ## 1.0.0-preview.1.0 - 28th June, 2019 This library is based off of the [@azure/amqp-common](https://www.npmjs.com/package/@azure/amqp-common) diff --git a/sdk/core/core-amqp/src/requestResponseLink.ts b/sdk/core/core-amqp/src/requestResponseLink.ts index 1b3e18a25b87..e7fa21cf4576 100644 --- a/sdk/core/core-amqp/src/requestResponseLink.ts +++ b/sdk/core/core-amqp/src/requestResponseLink.ts @@ -3,7 +3,6 @@ import { AbortSignalLike, AbortError } from "@azure/abort-controller"; import * as Constants from "./util/constants"; -import { retry, RetryConfig, RetryOperationType } from "./retry"; import { Session, Connection, @@ -15,8 +14,7 @@ import { SenderOptions, ReceiverOptions, ReceiverEvents, - ReqResLink, - generate_uuid + ReqResLink } from "rhea-promise"; import { translate, ConditionStatusMapper } from "./errors"; import * as log from "./log"; @@ -32,19 +30,9 @@ export interface SendRequestOptions { abortSignal?: AbortSignalLike; /** * @property {number} [timeoutInSeconds] Max time to wait for the operation to complete. - * Default: `10 seconds`. + * Default: `60 seconds`. */ timeoutInSeconds?: number; - /** - * @property {number} [maxRetries] Number of times the operation needs to be retried in case - * of error. Default: 3. - */ - maxRetries?: number; - /** - * @property {number} [delayInSeconds] Amount of time to wait in seconds before making the - * next attempt. Default: 15. - */ - delayInSeconds?: number; /** * @property {string} [requestName] Name of the request being performed. */ @@ -86,9 +74,7 @@ export class RequestResponseLink implements ReqResLink { /** * Sends the given request message and returns the received response. If the operation is not - * completed in the provided timeout in seconds `default: 10`, then the request will be retried - * linearly for the provided number of times `default: 3` with the provided delay in seconds - * `default: 15` between each attempt. + * completed in the provided timeout in seconds `default: 60`, then `OperationTimeoutError` is thrown. * * @param {Message} request The AMQP (request) message. * @param {SendRequestOptions} [options] Options that can be provided while sending a request. @@ -98,168 +84,145 @@ export class RequestResponseLink implements ReqResLink { if (!options) options = {}; if (!options.timeoutInSeconds) { - options.timeoutInSeconds = 10; + options.timeoutInSeconds = Constants.defaultOperationTimeoutInSeconds; } - let count: number = 0; const aborter: AbortSignalLike | undefined = options && options.abortSignal; - const sendRequestPromise = () => - new Promise((resolve: any, reject: any) => { - let waitTimer: any; - let timeOver: boolean = false; - type NormalizedInfo = { - statusCode: number; - statusDescription: string; - errorCondition: string; - }; - - count++; - if (count !== 1) { - // Generate a new message_id every time after the first attempt - request.message_id = generate_uuid(); - } else if (!request.message_id) { - // Set the message_id in the first attempt only if it is not set - request.message_id = generate_uuid(); - } + return new Promise((resolve: any, reject: any) => { + let waitTimer: any; + let timeOver: boolean = false; + type NormalizedInfo = { + statusCode: number; + statusDescription: string; + errorCondition: string; + }; - const rejectOnAbort = () => { - const address = this.receiver.address || "address"; - const requestName = options!.requestName; - const desc: string = - `[${this.connection.id}] The request "${requestName}" ` + - `to "${address}" has been cancelled by the user.`; - log.error(desc); - const error = new AbortError( - `The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.` - ); + const rejectOnAbort = () => { + const address = this.receiver.address || "address"; + const requestName = options!.requestName; + const desc: string = + `[${this.connection.id}] The request "${requestName}" ` + + `to "${address}" has been cancelled by the user.`; + log.error(desc); + const error = new AbortError( + `The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.` + ); - reject(error); - }; + reject(error); + }; - const onAbort = () => { - // remove the event listener as this will be registered next time someone makes a request. - this.receiver.removeListener(ReceiverEvents.message, messageCallback); - // safe to clear the timeout if it hasn't already occurred. - if (!timeOver) { - clearTimeout(waitTimer); - } - aborter!.removeEventListener("abort", onAbort); + const onAbort = () => { + // remove the event listener as this will be registered next time someone makes a request. + this.receiver.removeListener(ReceiverEvents.message, messageCallback); + // safe to clear the timeout if it hasn't already occurred. + if (!timeOver) { + clearTimeout(waitTimer); + } + aborter!.removeEventListener("abort", onAbort); - rejectOnAbort(); - }; + rejectOnAbort(); + }; - if (aborter) { - // the aborter may have been triggered between request attempts - // so check if it was triggered and reject if needed. - if (aborter.aborted) { - return rejectOnAbort(); - } - aborter.addEventListener("abort", onAbort); + if (aborter) { + // the aborter may have been triggered between request attempts + // so check if it was triggered and reject if needed. + if (aborter.aborted) { + return rejectOnAbort(); } + aborter.addEventListener("abort", onAbort); + } - // Handle different variations of property names in responses emitted by EventHubs and ServiceBus. - const getCodeDescriptionAndError = (props: any): NormalizedInfo => { - if (!props) props = {}; - return { - statusCode: (props[Constants.statusCode] || props.statusCode) as number, - statusDescription: (props[Constants.statusDescription] || - props.statusDescription) as string, - errorCondition: (props[Constants.errorCondition] || props.errorCondition) as string - }; + // Handle different variations of property names in responses emitted by EventHubs and ServiceBus. + const getCodeDescriptionAndError = (props: any): NormalizedInfo => { + if (!props) props = {}; + return { + statusCode: (props[Constants.statusCode] || props.statusCode) as number, + statusDescription: (props[Constants.statusDescription] || + props.statusDescription) as string, + errorCondition: (props[Constants.errorCondition] || props.errorCondition) as string }; + }; - const messageCallback = (context: EventContext) => { - // remove the event listeners as they will be registered next time when someone makes a request. - this.receiver.removeListener(ReceiverEvents.message, messageCallback); - if (aborter) { - aborter.removeEventListener("abort", onAbort); - } - const info = getCodeDescriptionAndError(context.message!.application_properties); - const responseCorrelationId = context.message!.correlation_id; - log.reqres( - "[%s] %s response: ", - this.connection.id, - request.to || "$management", - context.message - ); - if (info.statusCode > 199 && info.statusCode < 300) { - if ( - request.message_id === responseCorrelationId || - request.correlation_id === responseCorrelationId - ) { - if (!timeOver) { - clearTimeout(waitTimer); - } - log.reqres( - "[%s] request-messageId | '%s' == '%s' | response-correlationId.", - this.connection.id, - request.message_id, - responseCorrelationId - ); - return resolve(context.message); - } else { - log.error( - "[%s] request-messageId | '%s' != '%s' | response-correlationId. " + - "Hence dropping this response and waiting for the next one.", - this.connection.id, - request.message_id, - responseCorrelationId - ); + const messageCallback = (context: EventContext) => { + // remove the event listeners as they will be registered next time when someone makes a request. + this.receiver.removeListener(ReceiverEvents.message, messageCallback); + if (aborter) { + aborter.removeEventListener("abort", onAbort); + } + const info = getCodeDescriptionAndError(context.message!.application_properties); + const responseCorrelationId = context.message!.correlation_id; + log.reqres( + "[%s] %s response: ", + this.connection.id, + request.to || "$management", + context.message + ); + if (info.statusCode > 199 && info.statusCode < 300) { + if ( + request.message_id === responseCorrelationId || + request.correlation_id === responseCorrelationId + ) { + if (!timeOver) { + clearTimeout(waitTimer); } + log.reqres( + "[%s] request-messageId | '%s' == '%s' | response-correlationId.", + this.connection.id, + request.message_id, + responseCorrelationId + ); + return resolve(context.message); } else { - const condition = - info.errorCondition || - ConditionStatusMapper[info.statusCode] || - "amqp:internal-error"; - const e: AmqpError = { - condition: condition, - description: info.statusDescription - }; - const error = translate(e); - log.error(error); - return reject(error); - } - }; - - const actionAfterTimeout = () => { - timeOver = true; - this.receiver.removeListener(ReceiverEvents.message, messageCallback); - if (aborter) { - aborter.removeEventListener("abort", onAbort); + log.error( + "[%s] request-messageId | '%s' != '%s' | response-correlationId. " + + "Hence dropping this response and waiting for the next one.", + this.connection.id, + request.message_id, + responseCorrelationId + ); } - const address = this.receiver.address || "address"; - const desc: string = - `The request with message_id "${request.message_id}" to "${address}" ` + - `endpoint timed out. Please try again later.`; + } else { + const condition = + info.errorCondition || ConditionStatusMapper[info.statusCode] || "amqp:internal-error"; const e: AmqpError = { - condition: ConditionStatusMapper[408], - description: desc + condition: condition, + description: info.statusDescription }; - return reject(translate(e)); + const error = translate(e); + log.error(error); + return reject(error); + } + }; + + const actionAfterTimeout = () => { + timeOver = true; + this.receiver.removeListener(ReceiverEvents.message, messageCallback); + if (aborter) { + aborter.removeEventListener("abort", onAbort); + } + const address = this.receiver.address || "address"; + const desc: string = + `The request with message_id "${request.message_id}" to "${address}" ` + + `endpoint timed out. Please try again later.`; + const e: Error = { + name: "OperationTimeoutError", + message: desc }; + return reject(translate(e)); + }; - this.receiver.on(ReceiverEvents.message, messageCallback); - waitTimer = setTimeout(actionAfterTimeout, options!.timeoutInSeconds! * 1000); - log.reqres( - "[%s] %s request sent: %O", - this.connection.id, - request.to || "$managment", - request - ); - this.sender.send(request); - }); - const config: RetryConfig = { - operation: sendRequestPromise, - connectionId: this.connection.id, - operationType: - request.to && request.to === Constants.cbsEndpoint - ? RetryOperationType.cbsAuth - : RetryOperationType.management, - delayInSeconds: options.delayInSeconds, - maxRetries: options.maxRetries - }; - return retry(config); + waitTimer = setTimeout(actionAfterTimeout, options!.timeoutInSeconds! * 1000); + this.receiver.on(ReceiverEvents.message, messageCallback); + + log.reqres( + "[%s] %s request sent: %O", + this.connection.id, + request.to || "$managment", + request + ); + this.sender.send(request); + }); } /** diff --git a/sdk/core/core-amqp/test/requestResponse.spec.ts b/sdk/core/core-amqp/test/requestResponse.spec.ts index 5dc5a54a8e1e..fa231dbb5015 100644 --- a/sdk/core/core-amqp/test/requestResponse.spec.ts +++ b/sdk/core/core-amqp/test/requestResponse.spec.ts @@ -2,11 +2,18 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. import * as assert from "assert"; -import { RequestResponseLink, AmqpMessage, ErrorNameConditionMapper } from "../src"; -import { Connection } from "rhea-promise"; +import { + RequestResponseLink, + AmqpMessage, + ErrorNameConditionMapper, + RetryConfig, + RetryOperationType, + retry +} from "../src"; +import { Connection, Message } from "rhea-promise"; import { stub } from "sinon"; import EventEmitter from "events"; -import { AbortController } from "@azure/abort-controller"; +import { AbortController, AbortSignalLike } from "@azure/abort-controller"; describe("RequestResponseLink", function() { it("should send a request and receive a response correctly", async function() { @@ -53,11 +60,11 @@ describe("RequestResponseLink", function() { assert.equal(response.correlation_id, req.message_id); }); - it("should send a request and receive a response correctly", async function() { + it("should surface error up through retry", async function() { const connectionStub = stub(new Connection()); const rcvr = new EventEmitter(); let messageId: string = ""; - let counter = 0; + let count = 0; connectionStub.createSession.resolves({ connection: { id: "connection-1" @@ -65,12 +72,7 @@ describe("RequestResponseLink", function() { createSender: () => { return Promise.resolve({ send: (request: any) => { - counter++; - if (counter != 1) { - assert.notEqual(messageId, undefined); - assert.notEqual(request.message_id, undefined); - assert.notEqual(messageId, request.message_id); - } + count++; messageId = request.message_id; } }); @@ -98,7 +100,7 @@ describe("RequestResponseLink", function() { } } }); - }, 2000); + }, 500); setTimeout(() => { rcvr.emit("message", { message: { @@ -112,12 +114,26 @@ describe("RequestResponseLink", function() { body: "Hello World!!" } }); - }, 4000); - const response = await link.sendRequest(request, { - delayInSeconds: 1, - timeoutInSeconds: 5 - }); - assert.equal(response.correlation_id, messageId); + }, 1000); + + const sendRequestPromise = async (): Promise => { + return await link.sendRequest(request, { + timeoutInSeconds: 5 + }); + }; + + const config: RetryConfig = { + operation: sendRequestPromise, + connectionId: "connection-1", + operationType: RetryOperationType.management, + maxRetries: 3, + delayInSeconds: 1 + }; + + const message = await retry(config); + assert.equal(count, 2, "It should retry twice"); + assert.equal(message == undefined, false, "It should return a valid message"); + assert.equal(message.body, "Hello World!!", `Message '${message.body}' is not as expected`); }); it("should abort a request and response correctly", async function() { @@ -171,15 +187,19 @@ describe("RequestResponseLink", function() { /The foo operation has been cancelled by the user.$/, "gi" ); - assert.equal(expectedErrorRegex.test(err.message), true); + assert.equal(err.name, "AbortError", `Error name ${err.name} is not as expected`); + assert.equal( + expectedErrorRegex.test(err.message), + true, + `Incorrect error received "${err.message}"` + ); } }); - it("should abort a request and response correctly when retried", async function() { + it("should abort a request and response correctly when abort signal is already fired", async function() { const connectionStub = stub(new Connection()); const rcvr = new EventEmitter(); - let messageId: string = ""; - let counter = 0; + let req: any = {}; connectionStub.createSession.resolves({ connection: { id: "connection-1" @@ -187,13 +207,7 @@ describe("RequestResponseLink", function() { createSender: () => { return Promise.resolve({ send: (request: any) => { - counter++; - if (counter != 1) { - assert.notEqual(messageId, undefined); - assert.notEqual(request.message_id, undefined); - assert.notEqual(messageId, request.message_id); - } - messageId = request.message_id; + req = request; } }); }, @@ -211,20 +225,7 @@ describe("RequestResponseLink", function() { setTimeout(() => { rcvr.emit("message", { message: { - correlation_id: messageId, - application_properties: { - statusCode: 500, - errorCondition: ErrorNameConditionMapper.InternalServerError, - statusDescription: "Please retry later.", - "com.microsoft:tracking-id": "1" - } - } - }); - }, 2000); - setTimeout(() => { - rcvr.emit("message", { - message: { - correlation_id: messageId, + correlation_id: req.message_id, application_properties: { statusCode: 200, errorCondition: null, @@ -234,19 +235,19 @@ describe("RequestResponseLink", function() { body: "Hello World!!" } }); - }, 4000); + }, 2000); try { const controller = new AbortController(); - const signal = controller.signal; - setTimeout(controller.abort.bind(controller), 100); - await link.sendRequest(request, { - delayInSeconds: 1, - timeoutInSeconds: 5, - abortSignal: signal // cancel between request attempts - }); + const signal: AbortSignalLike = controller.signal; + controller.abort(); + await link.sendRequest(request, { abortSignal: signal, requestName: "foo" }); throw new Error(`Test failure`); } catch (err) { - const expectedErrorRegex = new RegExp(/The operation has been cancelled by the user.$/, "gi"); + const expectedErrorRegex = new RegExp( + /The foo operation has been cancelled by the user.$/, + "gi" + ); + assert.equal(err.name, "AbortError", `Error name ${err.name} is not as expected`); assert.equal( expectedErrorRegex.test(err.message), true, diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 26e615be262b..66cae060d583 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -7,7 +7,10 @@ import { defaultLock, translate, Constants, - SendRequestOptions + SendRequestOptions, + retry, + RetryConfig, + RetryOperationType } from "@azure/core-amqp"; import { Message, @@ -15,13 +18,14 @@ import { SenderEvents, ReceiverEvents, SenderOptions, - ReceiverOptions + ReceiverOptions, + generate_uuid } from "rhea-promise"; import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import * as log from "./log"; import { RetryOptions, getRetryAttemptTimeoutInMs } from "./eventHubClient"; -import { AbortSignalLike } from "@azure/abort-controller"; +import { AbortSignalLike, AbortError } from "@azure/abort-controller"; /** * Describes the runtime information of an Event Hub. * @interface HubRuntimeInformation @@ -303,34 +307,125 @@ export class ManagementClient extends LinkEntity { */ private async _makeManagementRequest( request: Message, - options?: { retryOptions?: RetryOptions; abortSignal?: AbortSignalLike; requestName?: string } + options: { + retryOptions?: RetryOptions; + abortSignal?: AbortSignalLike; + requestName?: string; + } = {} ): Promise { try { - log.mgmt( - "[%s] Acquiring lock to get the management req res link.", - this._context.connectionId - ); - await defaultLock.acquire(this.managementLock, () => { - return this._init(); - }); + const aborter: AbortSignalLike | undefined = options && options.abortSignal; - if (!options) { - options = {}; - } + const sendOperationPromise = () => + new Promise(async (resolve, reject) => { + let count = 0; + + const retryTimeoutInMs = getRetryAttemptTimeoutInMs(options.retryOptions); + let timeTakenByInit = 0; + + const rejectOnAbort = () => { + const requestName = options.requestName; + const desc: string = + `[${this._context.connectionId}] The request "${requestName}" ` + + `to has been cancelled by the user.`; + log.error(desc); + const error = new AbortError( + `The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.` + ); + + reject(error); + }; + + if (aborter) { + if (aborter.aborted) { + return rejectOnAbort(); + } + } + + if (!this._isMgmtRequestResponseLinkOpen()) { + log.mgmt( + "[%s] Acquiring lock to get the management req res link.", + this._context.connectionId + ); + + const initOperationStartTime = Date.now(); + + const actionAfterTimeout = () => { + const desc: string = `The request with message_id "${request.message_id}" timed out. Please try again later.`; + const e: Error = { + name: "OperationTimeoutError", + message: desc + }; + + return reject(translate(e)); + }; + + const waitTimer = setTimeout(actionAfterTimeout, retryTimeoutInMs); + + try { + await defaultLock.acquire(this.managementLock, () => { + return this._init(); + }); + } catch (err) { + return reject(translate(err)); + } finally { + clearTimeout(waitTimer); + } + timeTakenByInit = Date.now() - initOperationStartTime; + } + + const remainingOperationTimeoutInMs = retryTimeoutInMs - timeTakenByInit; + + const sendRequestOptions: SendRequestOptions = { + abortSignal: options.abortSignal, + requestName: options.requestName, + timeoutInSeconds: remainingOperationTimeoutInMs / 1000 + }; + + count++; + if (count !== 1) { + // Generate a new message_id every time after the first attempt + request.message_id = generate_uuid(); + } else if (!request.message_id) { + // Set the message_id in the first attempt only if it is not set + request.message_id = generate_uuid(); + } + + try { + const result = await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions); + resolve(result); + } catch (err) { + err = translate(err); + const address = + this._mgmtReqResLink || this._mgmtReqResLink!.sender.address + ? "address" + : this._mgmtReqResLink!.sender.address; + log.error( + "[%s] An error occurred during send on management request-response link with address " + + "'%s': %O", + this._context.connectionId, + address, + err + ); + reject(err); + } + }); - const sendRequestOptions: SendRequestOptions = { - maxRetries: options.retryOptions && options.retryOptions.maxRetries, - abortSignal: options.abortSignal, - requestName: options.requestName, - timeoutInSeconds: getRetryAttemptTimeoutInMs(options.retryOptions) / 1000, - delayInSeconds: - options.retryOptions && - options.retryOptions.retryInterval && - options.retryOptions.retryInterval >= 0 - ? options.retryOptions.retryInterval / 1000 - : undefined + const maxRetries = options.retryOptions && options.retryOptions.maxRetries; + const delayInSeconds = + options.retryOptions && + options.retryOptions.retryInterval && + options.retryOptions.retryInterval >= 0 + ? options.retryOptions.retryInterval / 1000 + : Constants.defaultDelayBetweenOperationRetriesInSeconds; + const config: RetryConfig = { + operation: sendOperationPromise, + connectionId: this._context.connectionId, + operationType: RetryOperationType.management, + maxRetries: maxRetries, + delayInSeconds: delayInSeconds }; - return (await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions)).body; + return (await retry(config)).body; } catch (err) { err = translate(err); log.error("An error occurred while making the request to $management endpoint: %O", err);