Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import {
BaseRequestPolicy,
RequestPolicy,
Expand All @@ -9,10 +10,8 @@ import {
WebResource,
HttpOperationResponse,
Constants,
delay,
RestError
} from "@azure/core-http";
import { AbortError } from "@azure/abort-controller";

/**
* @internal
Expand All @@ -25,21 +24,87 @@ export function throttlingRetryPolicy(): RequestPolicyFactory {
};
}

const StandardAbortMessage = "The operation was aborted.";

/**
* An executor for a function that returns a Promise that obeys both a timeout and an
* optional AbortSignal.
* @param actionFn - The callback that we want to resolve.
* @param timeoutMs - The number of milliseconds to allow before throwing an OperationTimeoutError.
* @param timeoutMessage - The message to place in the .description field for the thrown exception for Timeout.
* @param abortSignal - The abortSignal associated with containing operation.
*
* @internal
*/
export async function waitForTimeoutOrAbortOrResolve<T>(args: {
actionFn: () => Promise<T>;
timeoutMs: number;
timeoutMessage: string;
abortSignal: AbortSignalLike | undefined;
}): Promise<T> {
if (args.abortSignal && args.abortSignal.aborted) {
throw new AbortError(StandardAbortMessage);
}

let timer: any | undefined = undefined;
let clearAbortSignal: (() => void) | undefined = undefined;

const clearAbortSignalAndTimer = (): void => {
clearTimeout(timer);

if (clearAbortSignal) {
clearAbortSignal();
}
};

const abortOrTimeoutPromise = new Promise<T>((_resolve, reject) => {
clearAbortSignal = checkAndRegisterWithAbortSignal(reject, args.abortSignal);

timer = setTimeout(() => {
reject(new Error(args.timeoutMessage));
}, args.timeoutMs);
});

try {
return await Promise.race([abortOrTimeoutPromise, args.actionFn()]);
} finally {
clearAbortSignalAndTimer();
}
}

/**
* Maximum wait duration for the expected event to happen = `10000 ms`(default value is 10 seconds)(= maxWaitTimeInMilliseconds)
* Keep checking whether the predicate is true after every `1000 ms`(default value is 1 second) (= delayBetweenRetriesInMilliseconds)
* Registers listener to the abort event on the abortSignal to call your abortFn and
* returns a function that will clear the same listener.
*
* If abort signal is already aborted, then throws an AbortError and returns a function that does nothing
*
* @returns A function that removes any of our attached event listeners on the abort signal or an empty function if
* the abortSignal was not defined.
*
* @internal
*/
export async function checkWithTimeout(
predicate: () => boolean | Promise<boolean>,
delayBetweenRetriesInMilliseconds: number = 1000,
maxWaitTimeInMilliseconds: number = 10000
): Promise<boolean> {
const maxTime = Date.now() + maxWaitTimeInMilliseconds;
while (Date.now() < maxTime) {
if (await predicate()) return true;
await delay(delayBetweenRetriesInMilliseconds);
export function checkAndRegisterWithAbortSignal(
onAbortFn: (abortError: AbortError) => void,
abortSignal?: AbortSignalLike
): () => void {
if (abortSignal == null) {
return () => {
/** Nothing to do here, no abort signal */
};
}
return false;

if (abortSignal.aborted) {
throw new AbortError(StandardAbortMessage);
}

const onAbort = (): void => {
abortSignal.removeEventListener("abort", onAbort);
onAbortFn(new AbortError(StandardAbortMessage));
};

abortSignal.addEventListener("abort", onAbort);

return () => abortSignal.removeEventListener("abort", onAbort);
}

/**
Expand All @@ -63,16 +128,14 @@ export class ThrottlingRetryPolicy extends BaseRequestPolicy {
throw err;
}

const delayBetweenRechecksInMs = 1;
await checkWithTimeout(
() => httpRequest.abortSignal?.aborted === true,
delayBetweenRechecksInMs,
delayInMs
);
if (httpRequest.abortSignal?.aborted) {
throw new AbortError("The operation was aborted.");
}
return await this.sendRequest(httpRequest.clone());
return await waitForTimeoutOrAbortOrResolve({
timeoutMs: delayInMs,
abortSignal: httpRequest.abortSignal,
actionFn: async () => {
return await this.sendRequest(httpRequest.clone());
},
timeoutMessage: `ServiceBusy: Unable to fulfill the request in ${delayInMs}ms when retried.`
});
} else {
throw err;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,41 @@ describe("AppConfigurationClient", () => {
describe.only("simple usages", () => {
it("Add and query a setting without a label", async () => {
const key = recorder.getUniqueName("noLabelTests");
const numberOfSettings = 2000;
const promises = [];
try {
for (let index = 0; index < numberOfSettings; index++) {
promises.push(
client.addConfigurationSetting(
{
key: key + " " + +index,
value: "added"
},
{
abortSignal: AbortController.timeout(10000)
}
)
);
const numberOfSettings = 200;
const times = 1000;
for (let time = 0; time < times; time++) {
const promises = [];
try {
for (let index = 0; index < numberOfSettings; index++) {
promises.push(
client.addConfigurationSetting(
{
key: key + " " + +index,
value: "added"
},
{
abortSignal: AbortController.timeout(10000)
}
)
);
}
await Promise.all(promises);
} catch (error) {
console.log(error);
}
await Promise.all(promises);
} catch (error) {
console.log(error);
}

await client.deleteConfigurationSetting({ key });
await cleanupSampleValues([key], client);
});
});
});

async function cleanupSampleValues(keys: string[], client: AppConfigurationClient) {
const settingsIterator = client.listConfigurationSettings({
keyFilter: keys.join(",")
});

for await (const setting of settingsIterator) {
await client.deleteConfigurationSetting({ key: setting.key, label: setting.label });
}
}