Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
260b0e6
[Event Hubs] Introduce timeoutInMs on RetryOptions (#4239)
ramya0820 Jul 11, 2019
c74f5e4
Merge branch 'master' of https://github.com/ramya0820/azure-sdk-for-js
Jul 13, 2019
00016cb
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 15, 2019
5c12c13
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 16, 2019
2081256
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
Jul 17, 2019
f3511e0
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 19, 2019
ee0994f
Update retryOptions
ramya0820 Jul 23, 2019
f5cf034
Update constructor
ramya0820 Jul 23, 2019
340c8bd
Update types
ramya0820 Jul 23, 2019
558f67e
Minor errors
Jul 23, 2019
6131e97
Include api.md file
Jul 23, 2019
8acb8ea
Use timeOutInMs
ramya0820 Jul 23, 2019
0263f0c
Merge branch 'issue-4266' of https://github.com/ramya0820/azure-sdk-f…
ramya0820 Jul 23, 2019
c6e5d02
Fix typo
Jul 23, 2019
251b26c
Merge branch 'master' into issue-4266
ramya0820 Jul 23, 2019
d51b4c6
Remove comma
Jul 25, 2019
11b8c38
Merge branch 'issue-4266' of https://github.com/ramya0820/azure-sdk-f…
Jul 25, 2019
a5a2f3d
Address comments
Jul 26, 2019
91c5bc3
Add comment
Jul 26, 2019
ea840e3
Remove _initRetryOptions
Jul 29, 2019
6a05bad
Fix import
Jul 29, 2019
94a58e1
Simplify getOwnerLevel check
Jul 29, 2019
4a25bd9
Simplify retryOptions handling in sender
Jul 29, 2019
c2bf51a
Update retryInterval -> delayInMs
Jul 29, 2019
09d5d42
Add api.md file
Jul 29, 2019
9a9c5f8
Update management requests to include exponential retry related options
Jul 29, 2019
2d6c112
Revert rename
Jul 29, 2019
aa495cd
Update retryInterval checks
Jul 29, 2019
1b8c7cb
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Jul 29, 2019
d623472
Add api.md file
Jul 29, 2019
fdf729a
Uniformize usage
Jul 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/core/core-amqp/src/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ export async function retry<T>(config: RetryConfig<T>): Promise<T> {
if (config.minExponentialRetryDelayInMs == undefined || config.minExponentialRetryDelayInMs < 0) {
config.minExponentialRetryDelayInMs = defaultMinDelayForExponentialRetryInMs;
}
if (config.retryPolicy == undefined) {
config.retryPolicy = RetryPolicy.LinearRetryPolicy;
}
let lastError: MessagingError | undefined;
let result: any;
let success = false;
Expand Down
4 changes: 4 additions & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { EventHubConnectionConfig } from '@azure/core-amqp';
import { MessagingError } from '@azure/core-amqp';
import { Receiver } from 'rhea-promise';
import { ReceiverOptions } from 'rhea-promise';
import { RetryPolicy } from '@azure/core-amqp';
import { Sender } from 'rhea-promise';
import { SharedKeyCredential } from '@azure/core-amqp';
import { TokenCredential } from '@azure/core-amqp';
Expand Down Expand Up @@ -190,8 +191,11 @@ export class ReceiveHandler {

// @public
export interface RetryOptions {
maxExponentialRetryDelayInMs?: number;
maxRetries?: number;
minExponentialRetryDelayInMs?: number;
retryInterval?: number;
retryPolicy?: RetryPolicy;
timeoutInMs?: number;
}

Expand Down
27 changes: 16 additions & 11 deletions sdk/eventhub/event-hubs/src/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import {
SharedKeyCredential,
ConnectionConfig,
isTokenCredential,
Constants
Constants,
RetryPolicy
} from "@azure/core-amqp";

import { ConnectionContext } from "./connectionContext";
Expand Down Expand Up @@ -40,16 +41,20 @@ export interface RetryOptions {
* A minimum value of 60 seconds will be used if a value not greater than this is provided.
*/
timeoutInMs?: number;
// /**
// * The maximum value the `retryInterval` gets incremented exponentially between retries.
// * Not applicable, when `isExponential` is set to `false`.
// */
// maxRetryInterval?: number;
// /**
// * Boolean denoting if the `retryInterval` should be incremented exponentially between
// * retries or kept the same.
// */
// isExponential?: boolean;
/**
* @property {RetryPolicy} [retryPolicy] Denotes which retry policy to apply. If undefined, defaults to `LinearRetryPolicy`
*/
retryPolicy?: RetryPolicy;
/**
* @property {number} [maxExponentialRetryDelayInMs] Denotes the maximum delay between retries
* that the retry attempts will be capped at. Applicable only when performing exponential retry.
*/
maxExponentialRetryDelayInMs?: number;
/**
* @property {number} [minExponentialRetryDelayInMs] Denotes the minimum delay between retries
* to use. Applicable only when performing exponential retry.
*/
minExponentialRetryDelayInMs?: number;
}

export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number {
Expand Down
66 changes: 33 additions & 33 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ import {
import { EventData, toAmqpMessage } from "./eventData";
import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { SendOptions, EventHubProducerOptions } from "./eventHubClient";
import {
SendOptions,
EventHubProducerOptions,
getRetryAttemptTimeoutInMs,
RetryOptions
} from "./eventHubClient";
import { AbortSignalLike, AbortError } from "@azure/abort-controller";
import { EventDataBatch } from "./eventDataBatch";
import { getRetryAttemptTimeoutInMs, RetryOptions } from "./eventHubClient";

/**
* @ignore
Expand Down Expand Up @@ -347,12 +351,14 @@ export class EventHubSender extends LinkEntity {
* @returns Promise<number>
* @throws {AbortError} Thrown if the operation is cancelled via the abortSignal.
*/
async getMaxMessageSize(options?: {
retryOptions?: RetryOptions;
abortSignal?: AbortSignalLike;
}): Promise<number> {
const abortSignal = options && options.abortSignal;
const retryOptions = options && options.retryOptions;
async getMaxMessageSize(
options: {
retryOptions?: RetryOptions;
abortSignal?: AbortSignalLike;
} = {}
): Promise<number> {
const abortSignal = options.abortSignal;
const retryOptions = options.retryOptions || {};
if (this.isOpen()) {
return this._sender!.maxMessageSize;
}
Expand Down Expand Up @@ -386,23 +392,18 @@ export class EventHubSender extends LinkEntity {
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
const maxRetries =
retryOptions && typeof retryOptions.maxRetries === "number"
? retryOptions.maxRetries
: Constants.defaultMaxRetries;
const retryInterval =
retryOptions &&
typeof retryOptions.retryInterval === "number" &&
retryOptions.retryInterval > 0
? retryOptions.retryInterval / 1000
: Constants.defaultDelayBetweenOperationRetriesInSeconds;

const config: RetryConfig<void> = {
operation: () => this._init(),
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
maxRetries: maxRetries,
delayInSeconds: retryInterval
maxRetries: retryOptions.maxRetries,
delayInSeconds:
typeof retryOptions.retryInterval === "number"
? retryOptions.retryInterval / 1000
: undefined,
retryPolicy: retryOptions.retryPolicy,
minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs,
maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs
};

return retry<void>(config);
Expand Down Expand Up @@ -555,6 +556,7 @@ export class EventHubSender extends LinkEntity {
options: SendOptions & EventHubProducerOptions = {}
): Promise<void> {
const abortSignal: AbortSignalLike | undefined = options.abortSignal;
const retryOptions = options.retryOptions || {};
const sendEventPromise = () =>
new Promise<void>(async (resolve, reject) => {
let waitTimer: any;
Expand All @@ -567,9 +569,8 @@ export class EventHubSender extends LinkEntity {

const rejectOnAbort = () => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${
this.name
}" with ` + `address "${this.address}" has been cancelled by the user.`;
`[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
log.error(desc);
return reject(new AbortError("The send operation has been cancelled by the user."));
};
Expand Down Expand Up @@ -742,19 +743,18 @@ export class EventHubSender extends LinkEntity {
}
});

const maxRetries = options.retryOptions && options.retryOptions.maxRetries;
const delayInSeconds =
options.retryOptions &&
options.retryOptions.retryInterval &&
options.retryOptions.retryInterval >= 0
? options.retryOptions.retryInterval / 1000
: Constants.defaultDelayBetweenOperationRetriesInSeconds;
const config: RetryConfig<void> = {
operation: sendEventPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.sendMessage,
maxRetries: maxRetries,
delayInSeconds: delayInSeconds
maxRetries: retryOptions.maxRetries,
delayInSeconds:
typeof retryOptions.retryInterval === "number"
? retryOptions.retryInterval / 1000
: undefined,
retryPolicy: retryOptions.retryPolicy,
minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs,
maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs
};
return retry<void>(config);
}
Expand Down
18 changes: 9 additions & 9 deletions sdk/eventhub/event-hubs/src/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ export class ManagementClient extends LinkEntity {
requestName?: string;
} = {}
): Promise<any> {
const retryOptions = options.retryOptions || {};
try {
const aborter: AbortSignalLike | undefined = options && options.abortSignal;

Expand Down Expand Up @@ -414,19 +415,18 @@ export class ManagementClient extends LinkEntity {
}
});

const maxRetries = options.retryOptions && options.retryOptions.maxRetries;
const delayInSeconds =
options.retryOptions &&
options.retryOptions.retryInterval &&
options.retryOptions.retryInterval >= 0
? options.retryOptions.retryInterval / 1000
: Constants.defaultDelayBetweenOperationRetriesInSeconds;
const config: RetryConfig<Message> = {
operation: sendOperationPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.management,
maxRetries: maxRetries,
delayInSeconds: delayInSeconds
maxRetries: retryOptions.maxRetries,
delayInSeconds:
typeof retryOptions.retryInterval === "number"
? retryOptions.retryInterval / 1000
: undefined,
retryPolicy: retryOptions.retryPolicy,
minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs,
maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs
};
return (await retry<Message>(config)).body;
} catch (err) {
Expand Down
36 changes: 12 additions & 24 deletions sdk/eventhub/event-hubs/src/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class EventHubConsumer {
/**
* @property The set of retry options to configure the receiveBatch operation.
*/
private _retryOptions: Required<Pick<RetryOptions, "maxRetries" | "retryInterval">>;
private _retryOptions: RetryOptions;

/**
* @property Returns `true` if the consumer is closed. This can happen either because the consumer
Expand Down Expand Up @@ -112,7 +112,7 @@ export class EventHubConsumer {
* @readonly
*/
get ownerLevel(): number | undefined {
return this._receiverOptions && this._receiverOptions.ownerLevel;
return this._receiverOptions.ownerLevel;
}

/**
Expand All @@ -139,7 +139,7 @@ export class EventHubConsumer {
this._consumerGroup = consumerGroup;
this._partitionId = partitionId;
this._receiverOptions = options || {};
this._retryOptions = this._initRetryOptions(this._receiverOptions.retryOptions);
this._retryOptions = this._receiverOptions.retryOptions || {};
this._baseConsumer = new EventHubReceiver(
context,
consumerGroup,
Expand Down Expand Up @@ -370,7 +370,7 @@ export class EventHubConsumer {
);

const addTimeout = (): void => {
let msg = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
const msg = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
Comment thread
ramya-rao-a marked this conversation as resolved.
log.batching(
msg,
this._context.connectionId,
Expand Down Expand Up @@ -404,10 +404,16 @@ export class EventHubConsumer {
const config: RetryConfig<ReceivedEventData[]> = {
connectionHost: this._context.config.host,
connectionId: this._context.connectionId,
delayInSeconds: retryOptions.retryInterval,
delayInSeconds:
typeof retryOptions.retryInterval === "number" && retryOptions.retryInterval > 0
? retryOptions.retryInterval / 1000
: Constants.defaultDelayBetweenOperationRetriesInSeconds,
operation: retrieveEvents,
operationType: RetryOperationType.receiveMessage,
maxRetries: retryOptions.maxRetries
maxRetries: retryOptions.maxRetries,
retryPolicy: retryOptions.retryPolicy,
minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs,
maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs
};
return retry<ReceivedEventData[]>(config);
}
Expand Down Expand Up @@ -436,24 +442,6 @@ export class EventHubConsumer {
}
}

private _initRetryOptions(
retryOptions: RetryOptions = {}
): Required<Pick<RetryOptions, "maxRetries" | "retryInterval">> {
const maxRetries =
typeof retryOptions.maxRetries === "number"
? retryOptions.maxRetries
: Constants.defaultMaxRetries;
const retryInterval =
typeof retryOptions.retryInterval === "number" && retryOptions.retryInterval > 0
? retryOptions.retryInterval / 1000
: Constants.defaultDelayBetweenOperationRetriesInSeconds;

return {
maxRetries,
retryInterval
};
}

private _throwIfAlreadyReceiving(): void {
if (this.isReceivingMessages) {
const errorMessage = `The EventHubConsumer for "${this._context.config.entityPath}" is already receiving messages.`;
Expand Down