Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
260b0e6
[Event Hubs] Introduce timeoutInMs on RetryOptions (#4239)
ramya0820 Jul 11, 2019
c74f5e4
Merge branch 'master' of https://github.com/ramya0820/azure-sdk-for-js
Jul 13, 2019
00016cb
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 15, 2019
5876468
Bring over RequestResponse link creation and management from core-amqp
ramya0820 Jul 15, 2019
2f10541
Update _makeManagementRequest()
ramya0820 Jul 16, 2019
934130d
Simplify promise handling
ramya0820 Jul 16, 2019
a974e22
Address comments
ramya0820 Jul 16, 2019
41c5f08
Fix operationType
ramya0820 Jul 16, 2019
5b6d1e1
Update sendRequest() contract
ramya0820 Jul 16, 2019
80624c4
Revert whitespaces
ramya0820 Jul 16, 2019
ef7d712
Address comments
ramya0820 Jul 18, 2019
2ebcc2f
Rearrange code for clarity
ramya0820 Jul 18, 2019
5dec336
Update tests
ramya0820 Jul 18, 2019
ebd6e13
Dummy commit
ramya0820 Jul 19, 2019
4e31c97
Update
ramya0820 Jul 19, 2019
776680c
Update tests
Jul 19, 2019
4a088bc
Update tests
ramya0820 Jul 19, 2019
1617865
Revert "Rearrange code for clarity"
Jul 19, 2019
c64168c
Address comments
Jul 19, 2019
57f4f01
Address comments
Jul 19, 2019
71570d9
Merge branch 'issue-2835-p3' of https://github.com/ramya0820/azure-sd…
Jul 19, 2019
492fe0f
Revert timer clearance
Jul 19, 2019
c8a1887
Improve variable names
Jul 19, 2019
3fb5286
Use getRetryTimeoutInMs util
ramya0820 Jul 19, 2019
e16757e
Test tests [Tests not run yet]
ramya0820 Jul 19, 2019
e8e5cc6
Remove retry test from req-res tests
Jul 19, 2019
cc15d72
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Jul 22, 2019
868fafd
Allow abort during init
Jul 22, 2019
df19eae
Remove unused imports
Jul 22, 2019
d997887
Add test for error surfacing
Jul 23, 2019
f304522
Merge branch 'master' into issue-2835-p3
ramya0820 Jul 23, 2019
fb13acd
Remove abort event listener
Jul 23, 2019
c149776
Address comments
Jul 23, 2019
7e1036a
Address comments
Jul 23, 2019
2910d8b
Rearrange test
Jul 23, 2019
885ac8f
Update sdk/eventhub/event-hubs/src/managementClient.ts
ramya0820 Jul 23, 2019
a636a13
Address comments
ramya0820 Jul 23, 2019
590ece6
Improve test
Jul 23, 2019
9008fb2
Fix error message
Jul 23, 2019
e9232ee
Address comments
Jul 23, 2019
b7a78a1
Fix link receiver references
Jul 24, 2019
0ba8305
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Jul 24, 2019
c13de88
Remove abort on init
Jul 24, 2019
76aea7b
Address comments
ramya0820 Jul 25, 2019
cd11d1c
Fix typo
Jul 25, 2019
a5988d0
Remove unnecessary check
Jul 25, 2019
6ab619e
Remove unnecessary check
Jul 25, 2019
5dcb20e
Update try catch handling
Jul 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/core/core-amqp/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 1.0.0-preview.2.0 - Coming soon...

- `sendRequest()` function in the `RequestResponseLink` now excludes default retries and leaves it up to the users to implement it as necessary.

## 1.0.0-preview.1.0 - 28th June, 2019

This library is based off of the [@azure/amqp-common](https://www.npmjs.com/package/@azure/amqp-common)
Expand Down
281 changes: 122 additions & 159 deletions sdk/core/core-amqp/src/requestResponseLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import { AbortSignalLike, AbortError } from "@azure/abort-controller";
import * as Constants from "./util/constants";
import { retry, RetryConfig, RetryOperationType } from "./retry";
import {
Session,
Connection,
Expand All @@ -15,8 +14,7 @@ import {
SenderOptions,
ReceiverOptions,
ReceiverEvents,
ReqResLink,
generate_uuid
ReqResLink
} from "rhea-promise";
import { translate, ConditionStatusMapper } from "./errors";
import * as log from "./log";
Expand All @@ -32,19 +30,9 @@ export interface SendRequestOptions {
abortSignal?: AbortSignalLike;
/**
* @property {number} [timeoutInSeconds] Max time to wait for the operation to complete.
* Default: `10 seconds`.
* Default: `60 seconds`.
*/
timeoutInSeconds?: number;
/**
* @property {number} [maxRetries] Number of times the operation needs to be retried in case
* of error. Default: 3.
*/
maxRetries?: number;
/**
* @property {number} [delayInSeconds] Amount of time to wait in seconds before making the
* next attempt. Default: 15.
*/
delayInSeconds?: number;
/**
* @property {string} [requestName] Name of the request being performed.
*/
Expand Down Expand Up @@ -86,9 +74,7 @@ export class RequestResponseLink implements ReqResLink {

/**
* Sends the given request message and returns the received response. If the operation is not
* completed in the provided timeout in seconds `default: 10`, then the request will be retried
* linearly for the provided number of times `default: 3` with the provided delay in seconds
* `default: 15` between each attempt.
* completed in the provided timeout in seconds `default: 60`, then `OperationTimeoutError` is thrown.
*
* @param {Message} request The AMQP (request) message.
* @param {SendRequestOptions} [options] Options that can be provided while sending a request.
Expand All @@ -98,168 +84,145 @@ export class RequestResponseLink implements ReqResLink {
if (!options) options = {};

if (!options.timeoutInSeconds) {
options.timeoutInSeconds = 10;
options.timeoutInSeconds = Constants.defaultOperationTimeoutInSeconds;
}

let count: number = 0;
const aborter: AbortSignalLike | undefined = options && options.abortSignal;

const sendRequestPromise = () =>
new Promise<AmqpMessage>((resolve: any, reject: any) => {
let waitTimer: any;
let timeOver: boolean = false;
type NormalizedInfo = {
statusCode: number;
statusDescription: string;
errorCondition: string;
};

count++;
if (count !== 1) {
// Generate a new message_id every time after the first attempt
request.message_id = generate_uuid();
} else if (!request.message_id) {
// Set the message_id in the first attempt only if it is not set
request.message_id = generate_uuid();
}
return new Promise<AmqpMessage>((resolve: any, reject: any) => {
let waitTimer: any;
let timeOver: boolean = false;
type NormalizedInfo = {
statusCode: number;
statusDescription: string;
errorCondition: string;
};

const rejectOnAbort = () => {
const address = this.receiver.address || "address";
const requestName = options!.requestName;
const desc: string =
`[${this.connection.id}] The request "${requestName}" ` +
`to "${address}" has been cancelled by the user.`;
log.error(desc);
const error = new AbortError(
`The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.`
);
const rejectOnAbort = () => {
const address = this.receiver.address || "address";
const requestName = options!.requestName;
const desc: string =
`[${this.connection.id}] The request "${requestName}" ` +
`to "${address}" has been cancelled by the user.`;
log.error(desc);
const error = new AbortError(
`The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.`
);

reject(error);
};
reject(error);
};

const onAbort = () => {
// remove the event listener as this will be registered next time someone makes a request.
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
// safe to clear the timeout if it hasn't already occurred.
if (!timeOver) {
clearTimeout(waitTimer);
}
aborter!.removeEventListener("abort", onAbort);
const onAbort = () => {
// remove the event listener as this will be registered next time someone makes a request.
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
// safe to clear the timeout if it hasn't already occurred.
if (!timeOver) {
clearTimeout(waitTimer);
}
aborter!.removeEventListener("abort", onAbort);

rejectOnAbort();
};
rejectOnAbort();
};

if (aborter) {
// the aborter may have been triggered between request attempts
// so check if it was triggered and reject if needed.
if (aborter.aborted) {
return rejectOnAbort();
}
aborter.addEventListener("abort", onAbort);
if (aborter) {
// the aborter may have been triggered between request attempts
// so check if it was triggered and reject if needed.
if (aborter.aborted) {
return rejectOnAbort();
}
aborter.addEventListener("abort", onAbort);
}

// Handle different variations of property names in responses emitted by EventHubs and ServiceBus.
const getCodeDescriptionAndError = (props: any): NormalizedInfo => {
if (!props) props = {};
return {
statusCode: (props[Constants.statusCode] || props.statusCode) as number,
statusDescription: (props[Constants.statusDescription] ||
props.statusDescription) as string,
errorCondition: (props[Constants.errorCondition] || props.errorCondition) as string
};
// Handle different variations of property names in responses emitted by EventHubs and ServiceBus.
const getCodeDescriptionAndError = (props: any): NormalizedInfo => {
if (!props) props = {};
return {
statusCode: (props[Constants.statusCode] || props.statusCode) as number,
statusDescription: (props[Constants.statusDescription] ||
props.statusDescription) as string,
errorCondition: (props[Constants.errorCondition] || props.errorCondition) as string
};
};

const messageCallback = (context: EventContext) => {
// remove the event listeners as they will be registered next time when someone makes a request.
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
if (aborter) {
aborter.removeEventListener("abort", onAbort);
}
const info = getCodeDescriptionAndError(context.message!.application_properties);
const responseCorrelationId = context.message!.correlation_id;
log.reqres(
"[%s] %s response: ",
this.connection.id,
request.to || "$management",
context.message
);
if (info.statusCode > 199 && info.statusCode < 300) {
if (
request.message_id === responseCorrelationId ||
request.correlation_id === responseCorrelationId
) {
if (!timeOver) {
clearTimeout(waitTimer);
}
log.reqres(
"[%s] request-messageId | '%s' == '%s' | response-correlationId.",
this.connection.id,
request.message_id,
responseCorrelationId
);
return resolve(context.message);
} else {
log.error(
"[%s] request-messageId | '%s' != '%s' | response-correlationId. " +
"Hence dropping this response and waiting for the next one.",
this.connection.id,
request.message_id,
responseCorrelationId
);
const messageCallback = (context: EventContext) => {
// remove the event listeners as they will be registered next time when someone makes a request.
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
if (aborter) {
aborter.removeEventListener("abort", onAbort);
}
const info = getCodeDescriptionAndError(context.message!.application_properties);
const responseCorrelationId = context.message!.correlation_id;
log.reqres(
"[%s] %s response: ",
this.connection.id,
request.to || "$management",
context.message
);
if (info.statusCode > 199 && info.statusCode < 300) {
if (
request.message_id === responseCorrelationId ||
request.correlation_id === responseCorrelationId
) {
if (!timeOver) {
clearTimeout(waitTimer);
}
log.reqres(
"[%s] request-messageId | '%s' == '%s' | response-correlationId.",
this.connection.id,
request.message_id,
responseCorrelationId
);
return resolve(context.message);
} else {
const condition =
info.errorCondition ||
ConditionStatusMapper[info.statusCode] ||
"amqp:internal-error";
const e: AmqpError = {
condition: condition,
description: info.statusDescription
};
const error = translate(e);
log.error(error);
return reject(error);
}
};

const actionAfterTimeout = () => {
timeOver = true;
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
if (aborter) {
aborter.removeEventListener("abort", onAbort);
log.error(
"[%s] request-messageId | '%s' != '%s' | response-correlationId. " +
"Hence dropping this response and waiting for the next one.",
this.connection.id,
request.message_id,
responseCorrelationId
);
}
const address = this.receiver.address || "address";
const desc: string =
`The request with message_id "${request.message_id}" to "${address}" ` +
`endpoint timed out. Please try again later.`;
} else {
const condition =
info.errorCondition || ConditionStatusMapper[info.statusCode] || "amqp:internal-error";
const e: AmqpError = {
condition: ConditionStatusMapper[408],
description: desc
condition: condition,
description: info.statusDescription
};
return reject(translate(e));
const error = translate(e);
log.error(error);
return reject(error);
}
};

const actionAfterTimeout = () => {
timeOver = true;
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
if (aborter) {
aborter.removeEventListener("abort", onAbort);
}
const address = this.receiver.address || "address";
const desc: string =
`The request with message_id "${request.message_id}" to "${address}" ` +
`endpoint timed out. Please try again later.`;
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
return reject(translate(e));
};

this.receiver.on(ReceiverEvents.message, messageCallback);
waitTimer = setTimeout(actionAfterTimeout, options!.timeoutInSeconds! * 1000);
log.reqres(
"[%s] %s request sent: %O",
this.connection.id,
request.to || "$managment",
request
);
this.sender.send(request);
});
const config: RetryConfig<AmqpMessage> = {
operation: sendRequestPromise,
connectionId: this.connection.id,
operationType:
request.to && request.to === Constants.cbsEndpoint
? RetryOperationType.cbsAuth
: RetryOperationType.management,
delayInSeconds: options.delayInSeconds,
maxRetries: options.maxRetries
};
return retry<AmqpMessage>(config);
waitTimer = setTimeout(actionAfterTimeout, options!.timeoutInSeconds! * 1000);
this.receiver.on(ReceiverEvents.message, messageCallback);

log.reqres(
"[%s] %s request sent: %O",
this.connection.id,
request.to || "$managment",
request
);
this.sender.send(request);
});
}

/**
Expand Down
Loading