From 6cda731d513a038c6492b3fdf8fd983f2f2d1cde Mon Sep 17 00:00:00 2001 From: Harsha Nalluru Date: Mon, 25 Jan 2021 17:54:37 -0800 Subject: [PATCH 1/2] [Service Bus] Bug fix: batching receiver upon a disconnect (#13374) ## Bug - `receiveMessages` method returned zero messages upon a connection refresh caused by a network disconnect(simulated in the test). - `OperationTimeout` error on message settlement after the disconnect These two failures made the "disconnect" test occasionally fail for the batching receiver. ## Cause `onDetached` on the batchReceivers is called 300ms after the connection refresh causing the recovered receive link to be closed. - If the message returned from the service took close to 300ms until reached the receiver since the refresh, `onDetached` is called to close the link leading to the loss of messages. - If the 300ms had elapsed right before the settlement, we'd see the OperationTimeout error on settlement since the receive link is closed. Investigated here https://github.com/Azure/azure-sdk-for-js/pull/13339 ## Fix - Call `onDetached` for the batching receivers before calling the refresh connection - And retain calling `onDetached` for the streaming receivers after the refresh connection ## Changes in the PR - [x] Refactored "calling onDetached" part - [x] Removed the 300ms delay since we don't see the utility - [x] Changelog - [x] TODO: What to do for sessions? - [x] Needs more investigation https://github.com/Azure/azure-sdk-for-js/pull/13374#discussion_r564139864, will be handled at #8875 --- sdk/servicebus/service-bus/CHANGELOG.md | 3 + .../service-bus/src/connectionContext.ts | 143 +++++++++++++----- .../service-bus/src/core/batchingReceiver.ts | 2 +- 3 files changed, 110 insertions(+), 38 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 26a04420a083..1652db71699f 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -4,6 +4,9 @@ - [Bug Fix] Response from the `ServiceBusAdministrationClient.getSubscriptionRuntimeProperties()` method had the message count properties to be zero. The bug has been fixed in [#13229](https://github.com/Azure/azure-sdk-for-js/pull/13229) +- [Bug Fix] Fixed a race condition where the `ServiceBusReceiver.receiveMessages` might lose messages and not return any if triggered right after the recovery from a network disruption. + The same race condition could also have led to an OperationTimeout error if attempted the message settlement. + [#13374](https://github.com/Azure/azure-sdk-for-js/pull/13374) ## 7.0.2 (2021-01-13) diff --git a/sdk/servicebus/service-bus/src/connectionContext.ts b/sdk/servicebus/service-bus/src/connectionContext.ts index 7657d8f941e1..7a34f4583d94 100644 --- a/sdk/servicebus/service-bus/src/connectionContext.ts +++ b/sdk/servicebus/service-bus/src/connectionContext.ts @@ -6,13 +6,18 @@ import { packageJsonInfo } from "./util/constants"; import { ConnectionConfig, ConnectionContextBase, - Constants, - CreateConnectionContextBaseParameters, - delay + CreateConnectionContextBaseParameters } from "@azure/core-amqp"; import { TokenCredential } from "@azure/core-auth"; import { ServiceBusClientOptions } from "./constructorHelpers"; -import { Connection, ConnectionEvents, EventContext, OnAmqpEvent } from "rhea-promise"; +import { + AmqpError, + Connection, + ConnectionError, + ConnectionEvents, + EventContext, + OnAmqpEvent +} from "rhea-promise"; import { MessageSender } from "./core/messageSender"; import { MessageSession } from "./session/messageSession"; import { MessageReceiver } from "./core/messageReceiver"; @@ -20,6 +25,7 @@ import { ManagementClient } from "./core/managementClient"; import { formatUserAgentPrefix } from "./util/utils"; import { getRuntimeInfo } from "./util/runtimeInfo"; import { SharedKeyCredential } from "./servicebusSharedKeyCredential"; +import { ReceiverType } from "./core/linkEntity"; /** * @internal @@ -130,6 +136,66 @@ type ConnectionContextMethods = Omit< > & ThisType; +/** + * @internal + * @hidden + * Helper method to call onDetached on the receivers from the connection context upon seeing an error. + */ +async function callOnDetachedOnReceivers( + connectionContext: ConnectionContext, + contextOrConnectionError: Error | ConnectionError | AmqpError | undefined, + receiverType: ReceiverType +) { + const detachCalls: Promise[] = []; + + for (const receiverName of Object.keys(connectionContext.messageReceivers)) { + const receiver = connectionContext.messageReceivers[receiverName]; + if (receiver && receiver.receiverType === receiverType) { + logger.verbose( + "[%s] calling detached on %s receiver '%s'.", + connectionContext.connection.id, + receiver.receiverType, + receiver.name + ); + detachCalls.push( + receiver.onDetached(contextOrConnectionError).catch((err) => { + logger.logError( + err, + "[%s] An error occurred while calling onDetached() on the %s receiver '%s'", + connectionContext.connection.id, + receiver.receiverType, + receiver.name + ); + }) + ); + } + } + + return Promise.all(detachCalls); +} + +/** + * @internal + * @hidden + * Helper method to get the number of receivers of specified type from the connectionContext. + */ +async function getNumberOfReceivers( + connectionContext: Pick, + receiverType: ReceiverType +) { + if (receiverType === "session") { + const receivers = connectionContext.messageSessions; + return Object.keys(receivers).length; + } + const receivers = connectionContext.messageReceivers; + const receiverNames = Object.keys(receivers); + const count = receiverNames.reduce( + (acc, name) => (receivers[name].receiverType === receiverType ? ++acc : acc), + 0 + ); + return count; +} + /** * @internal * @hidden @@ -325,7 +391,6 @@ export namespace ConnectionContext { // by cleaning up the timers and closing the links. // We don't call onDetached for sender after `refreshConnection()` // because any new send calls that potentially initialize links would also get affected if called later. - // TODO: do the same for batching receiver logger.verbose( `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` + `senders. We should not reconnect.` @@ -354,47 +419,51 @@ export namespace ConnectionContext { await Promise.all(detachCalls); } + // Calling onDetached on batching receivers for the same reasons as sender + const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching"); + if (!state.wasConnectionCloseCalled && numBatchingReceivers) { + logger.verbose( + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` + + `batching receivers. We should reconnect.` + ); + + // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation + await callOnDetachedOnReceivers( + connectionContext, + connectionError || contextError, + "batching" + ); + + // TODO: + // `callOnDetachedOnReceivers` handles "connectionContext.messageReceivers". + // ...What to do for sessions (connectionContext.messageSessions) ?? + } + await refreshConnection(connectionContext); waitForConnectionRefreshResolve(); waitForConnectionRefreshPromise = undefined; // The connection should always be brought back up if the sdk did not call connection.close() // and there was at least one receiver link on the connection before it went down. logger.verbose("[%s] state: %O", connectionContext.connectionId, state); - if (!state.wasConnectionCloseCalled && state.numReceivers) { + + // Calling onDetached on streaming receivers + const numStreamingReceivers = getNumberOfReceivers(connectionContext, "streaming"); + if (!state.wasConnectionCloseCalled && numStreamingReceivers) { logger.verbose( - `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` + - `receivers. We should reconnect.` + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numStreamingReceivers} ` + + `streaming receivers. We should reconnect.` ); - await delay(Constants.connectionReconnectDelay); - - const detachCalls: Promise[] = []; - - // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation - // and streaming receivers can decide whether to reconnect or not. - for (const receiverName of Object.keys(connectionContext.messageReceivers)) { - const receiver = connectionContext.messageReceivers[receiverName]; - if (receiver) { - logger.verbose( - "[%s] calling detached on %s receiver '%s'.", - connectionContext.connection.id, - receiver.receiverType, - receiver.name - ); - detachCalls.push( - receiver.onDetached(connectionError || contextError).catch((err) => { - logger.logError( - err, - "[%s] An error occurred while calling onDetached() on the %s receiver '%s'", - connectionContext.connection.id, - receiver.receiverType, - receiver.name - ); - }) - ); - } - } - await Promise.all(detachCalls); + // Calling `onDetached()` on streaming receivers after the refreshConnection() since `onDetached()` would + // recover the streaming receivers and that would only be possible after the connection is refreshed. + // + // This is different from the batching receiver since `onDetached()` for the batching receiver would + // return the outstanding messages and close the receive link. + await callOnDetachedOnReceivers( + connectionContext, + connectionError || contextError, + "streaming" + ); } }; diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index f3203ec0c78f..45bbfee55aaf 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -93,7 +93,7 @@ export class BatchingReceiver extends MessageReceiver { ); } - await this._batchingReceiverLite.close(connectionError); + this._batchingReceiverLite.close(connectionError); } /** From f70a63278bf52b026778933b526f28004bd7c166 Mon Sep 17 00:00:00 2001 From: SDKAuto Date: Tue, 26 Jan 2021 02:15:09 +0000 Subject: [PATCH 2/2] CodeGen from PR 12469 in Azure/azure-rest-api-specs Merge 05c6812b913d032d94495a41199825eb7f82eee6 into 9022265e19ebe2fa6c1955f6d99a51203897bf3d --- .../arm-marketplaceordering/LICENSE.txt | 2 +- .../arm-marketplaceordering/README.md | 9 ++++----- .../arm-marketplaceordering/rollup.config.js | 4 ++-- .../src/marketplaceOrderingAgreements.ts | 5 ++--- .../src/marketplaceOrderingAgreementsContext.ts | 5 ++--- .../arm-marketplaceordering/src/models/index.ts | 4 ++-- .../arm-marketplaceordering/src/models/mappers.ts | 4 ++-- .../src/models/marketplaceAgreementsMappers.ts | 5 ++--- .../src/models/operationsMappers.ts | 4 ++-- .../arm-marketplaceordering/src/models/parameters.ts | 5 ++--- .../arm-marketplaceordering/src/operations/index.ts | 5 ++--- .../src/operations/marketplaceAgreements.ts | 11 +++++------ .../src/operations/operations.ts | 8 +++++--- 13 files changed, 33 insertions(+), 38 deletions(-) diff --git a/sdk/marketplaceordering/arm-marketplaceordering/LICENSE.txt b/sdk/marketplaceordering/arm-marketplaceordering/LICENSE.txt index ea8fb1516028..2d3163745319 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/LICENSE.txt +++ b/sdk/marketplaceordering/arm-marketplaceordering/LICENSE.txt @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2020 Microsoft +Copyright (c) 2021 Microsoft Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/sdk/marketplaceordering/arm-marketplaceordering/README.md b/sdk/marketplaceordering/arm-marketplaceordering/README.md index 3a43a4a5cfcb..6542e1564936 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/README.md +++ b/sdk/marketplaceordering/arm-marketplaceordering/README.md @@ -15,7 +15,7 @@ npm install @azure/arm-marketplaceordering ### How to use -#### nodejs - Authentication, client creation and get marketplaceAgreements as an example written in TypeScript. +#### nodejs - client creation and get marketplaceAgreements as an example written in TypeScript. ##### Install @azure/ms-rest-nodeauth @@ -26,11 +26,10 @@ npm install @azure/ms-rest-nodeauth@"^3.0.0" ##### Sample code +While the below sample uses the interactive login, other authentication options can be found in the [README.md file of @azure/ms-rest-nodeauth](https://www.npmjs.com/package/@azure/ms-rest-nodeauth) package ```typescript -import * as msRest from "@azure/ms-rest-js"; -import * as msRestAzure from "@azure/ms-rest-azure-js"; -import * as msRestNodeAuth from "@azure/ms-rest-nodeauth"; -import { MarketplaceOrderingAgreements, MarketplaceOrderingAgreementsModels, MarketplaceOrderingAgreementsMappers } from "@azure/arm-marketplaceordering"; +const msRestNodeAuth = require("@azure/ms-rest-nodeauth"); +const { MarketplaceOrderingAgreements } = require("@azure/arm-marketplaceordering"); const subscriptionId = process.env["AZURE_SUBSCRIPTION_ID"]; msRestNodeAuth.interactiveLogin().then((creds) => { diff --git a/sdk/marketplaceordering/arm-marketplaceordering/rollup.config.js b/sdk/marketplaceordering/arm-marketplaceordering/rollup.config.js index 8bcf52f6116f..c04d21687542 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/rollup.config.js +++ b/sdk/marketplaceordering/arm-marketplaceordering/rollup.config.js @@ -21,8 +21,8 @@ const config = { "@azure/ms-rest-azure-js": "msRestAzure" }, banner: `/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is regenerated. diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreements.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreements.ts index 3c7a63b3770e..5865e7074265 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreements.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreements.ts @@ -1,7 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for - * license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreementsContext.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreementsContext.ts index 75ba980477a6..a92d6cccec02 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreementsContext.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/marketplaceOrderingAgreementsContext.ts @@ -1,7 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for - * license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/models/index.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/models/index.ts index 2c310d323eef..90aef904093a 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/models/index.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/models/index.ts @@ -1,6 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is regenerated. diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/models/mappers.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/models/mappers.ts index f5702ff6034c..471048b86575 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/models/mappers.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/models/mappers.ts @@ -1,6 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is regenerated. diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/models/marketplaceAgreementsMappers.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/models/marketplaceAgreementsMappers.ts index 10d9ac850590..e939bca2e2e9 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/models/marketplaceAgreementsMappers.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/models/marketplaceAgreementsMappers.ts @@ -1,6 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is regenerated. @@ -9,7 +9,6 @@ export { AgreementTerms, BaseResource, - CloudError, ErrorResponse, ErrorResponseError, Resource diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/models/operationsMappers.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/models/operationsMappers.ts index f95be41b77a7..ba8142bc9cac 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/models/operationsMappers.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/models/operationsMappers.ts @@ -1,6 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is regenerated. diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/models/parameters.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/models/parameters.ts index 2462ce8c76ff..754d990b3e44 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/models/parameters.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/models/parameters.ts @@ -1,7 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for - * license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/operations/index.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/operations/index.ts index 075720543718..cf7b4ef68238 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/operations/index.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/operations/index.ts @@ -1,7 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for - * license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/operations/marketplaceAgreements.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/operations/marketplaceAgreements.ts index bbedaac653da..80d7ff6b0112 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/operations/marketplaceAgreements.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/operations/marketplaceAgreements.ts @@ -1,7 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for - * license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is @@ -258,7 +257,7 @@ const getOperationSpec: msRest.OperationSpec = { bodyMapper: Mappers.AgreementTerms }, default: { - bodyMapper: Mappers.CloudError + bodyMapper: Mappers.ErrorResponse } }, serializer @@ -370,7 +369,7 @@ const getAgreementOperationSpec: msRest.OperationSpec = { bodyMapper: Mappers.AgreementTerms }, default: { - bodyMapper: Mappers.CloudError + bodyMapper: Mappers.ErrorResponse } }, serializer @@ -404,7 +403,7 @@ const listOperationSpec: msRest.OperationSpec = { } }, default: { - bodyMapper: Mappers.CloudError + bodyMapper: Mappers.ErrorResponse } }, serializer diff --git a/sdk/marketplaceordering/arm-marketplaceordering/src/operations/operations.ts b/sdk/marketplaceordering/arm-marketplaceordering/src/operations/operations.ts index ddcbbe2cc097..2e2824bb6c4a 100644 --- a/sdk/marketplaceordering/arm-marketplaceordering/src/operations/operations.ts +++ b/sdk/marketplaceordering/arm-marketplaceordering/src/operations/operations.ts @@ -1,7 +1,6 @@ /* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for - * license information. + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. * * Code generated by Microsoft (R) AutoRest Code Generator. * Changes may cause incorrect behavior and will be lost if the code is @@ -108,6 +107,9 @@ const listNextOperationSpec: msRest.OperationSpec = { urlParameters: [ Parameters.nextPageLink ], + queryParameters: [ + Parameters.apiVersion + ], headerParameters: [ Parameters.acceptLanguage ],